Đây là một Use Case End-to-End rất thú vị trong việc xây dựng hệ thống tự động phản hồi sự kiện bảo mật (SOAR). Bài toán đặt ra là: Làm sao để khi OpenSearch phát hiện có kẻ đang quét thư mục Web, hệ thống sẽ tự động gọi tường lửa (UFW) khóa luôn IP đó, đồng thời tự động tạo Ticket (Jira/Freshservice) và gửi Email cảnh báo cho đội Security?
Thay vì dùng các giải pháp đắt tiền, mình đã dùng OpenSearch kết hợp với một script Python (Flask) đơn giản làm Webhook Receiver. Kết quả cực kỳ mỹ mãn! Dưới đây là từng bước thực hiện chômij người muốn tham khảo hoặc setup lab tương tự nhé.
from flask import Flask, request
import subprocess
import datetime
app = Flask(__name__)
@app.route('/webhook', methods=['POST'])
def webhook():
print("\n" + "="*50)
print(f"[!] {datetime.datetime.now()} - CÓ BÁO ĐỘNG TỪ OPENSEARCH!")
try:
# Lấy dữ liệu JSON từ OpenSearch
data = request.get_json(force=True)
alert_name = data.get("alert_name", "Unknown Alert")
attacker_ip = data.get("attacker_ip", "Unknown")
print(f"[*] Tên sự kiện: {alert_name}")
print(f"[*] IP Hacker: {attacker_ip}")
if attacker_ip != "Unknown":
# ACTION 1: Tự động chặn IP trên Firewall (UFW)
print(f"\n[+] ACTION 1: Đang thực thi Block IP {attacker_ip} trên Firewall...")
subprocess.run(["sudo", "ufw", "deny", "from", attacker_ip], check=False)
# ACTION 2 & 3: Demo gọi API tạo Ticket và gửi Email
print(f"\n[+] ACTION 2: Đang gọi API tạo Ticket trên Jira/Freshservice...")
print(f"\n[+] ACTION 3: Đang gửi Email cảnh báo đến Security Team...")
return "Success", 200
except Exception as e:
print(f"[-] Lỗi xử lý SOAR: {e}")
return "Internal Server Error", 500
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
(Lưu ý: Bạn nhớ chạy script này bằng lệnh sudo python3 script_name.py để nó có quyền gọi lệnh UFW nhé).
{
"alert_name": "Phát hiện tấn công dò quét thư mục Web",
"attacker_ip": "192.168.10.200",
"severity": "High",
"action": "block_ip"
}
Nguyên nhân là do nút Test ở Channels chỉ gửi đi text thô (plain text), trong khi script Python của chúng ta đang bắt buộc phải nhận JSON để lấy IP.
Cách giải quyết: Hãy bấm nút "Send test message" ở bên trong giao diện chỉnh sửa Monitor (nơi bạn dán đoạn JSON). Khi đó OpenSearch mới lấy đúng cục JSON đó gửi đi!
Mở thêm một tab Terminal khác, gõ lệnh sudo ufw status là sẽ thấy IP của hacker đã chễm chệ nằm trong danh sách DENY của tường lửa. Quá trình khóa IP diễn ra chưa tới 1 giây!
Thay vì dùng các giải pháp đắt tiền, mình đã dùng OpenSearch kết hợp với một script Python (Flask) đơn giản làm Webhook Receiver. Kết quả cực kỳ mỹ mãn! Dưới đây là từng bước thực hiện chômij người muốn tham khảo hoặc setup lab tương tự nhé.
Bước 1: Tạo kênh Webhook trên OpenSearch
Đầu tiên, chúng ta cần tạo một "ống xả" để OpenSearch đẩy cảnh báo ra ngoài.- Vào phần Notifications > Channels > Create channel.
- Chọn loại là Custom webhook.
- Khai báo Method là POST và trỏ Webhook URL về địa chỉ IP máy chủ đang chạy script Python của bạn (cổng 5000).
- Đừng quên thêm Webhook header: Content-Type là application/json nhé.
Bước 2: Viết script Python làm "Bộ não" SOAR
Mở Terminal trên máy chủ Linux và viết một đoạn code Python dùng Flask. Nhiệm vụ của nó là lắng nghe các gói tin JSON từ OpenSearch gửi sang, bóc tách IP và gọi các lệnh hệ thống.from flask import Flask, request
import subprocess
import datetime
app = Flask(__name__)
@app.route('/webhook', methods=['POST'])
def webhook():
print("\n" + "="*50)
print(f"[!] {datetime.datetime.now()} - CÓ BÁO ĐỘNG TỪ OPENSEARCH!")
try:
# Lấy dữ liệu JSON từ OpenSearch
data = request.get_json(force=True)
alert_name = data.get("alert_name", "Unknown Alert")
attacker_ip = data.get("attacker_ip", "Unknown")
print(f"[*] Tên sự kiện: {alert_name}")
print(f"[*] IP Hacker: {attacker_ip}")
if attacker_ip != "Unknown":
# ACTION 1: Tự động chặn IP trên Firewall (UFW)
print(f"\n[+] ACTION 1: Đang thực thi Block IP {attacker_ip} trên Firewall...")
subprocess.run(["sudo", "ufw", "deny", "from", attacker_ip], check=False)
# ACTION 2 & 3: Demo gọi API tạo Ticket và gửi Email
print(f"\n[+] ACTION 2: Đang gọi API tạo Ticket trên Jira/Freshservice...")
print(f"\n[+] ACTION 3: Đang gửi Email cảnh báo đến Security Team...")
return "Success", 200
except Exception as e:
print(f"[-] Lỗi xử lý SOAR: {e}")
return "Internal Server Error", 500
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
(Lưu ý: Bạn nhớ chạy script này bằng lệnh sudo python3 script_name.py để nó có quyền gọi lệnh UFW nhé).
Bước 3: Cấu hình Monitor & Action bắn JSON
Quay lại OpenSearch, chúng ta cấu hình Monitor để khi có sự kiện (Trigger), nó sẽ gọi Channel Webhook ở Bước 1.- Lưu ý cực kỳ quan trọng: Ở phần Message của Action, bạn phải xóa hết nội dung mặc định và truyền vào một chuẩn JSON có chứa biến IP. Ví dụ:
{
"alert_name": "Phát hiện tấn công dò quét thư mục Web",
"attacker_ip": "192.168.10.200",
"severity": "High",
"action": "block_ip"
}
Lưu ý khi Test
Lúc cấu hình xong, nếu anh em bấm nút "Send test message" ở giao diện tạo Channels, hệ thống sẽ báo lỗi 500 Internal Server Error hoặc 400 Bad Request.Nguyên nhân là do nút Test ở Channels chỉ gửi đi text thô (plain text), trong khi script Python của chúng ta đang bắt buộc phải nhận JSON để lấy IP.
Cách giải quyết: Hãy bấm nút "Send test message" ở bên trong giao diện chỉnh sửa Monitor (nơi bạn dán đoạn JSON). Khi đó OpenSearch mới lấy đúng cục JSON đó gửi đi!
Bước 4: Tận hưởng thành quả
Ngay khi có cảnh báo được Trigger (hoặc khi bấm test từ Monitor), trên màn hình Terminal của Python sẽ lập tức nhảy logs cực kỳ mượt mà. Hệ thống đã bóc tách được đúng IP 192.168.10.200 và thực thi thành công cả 3 Action: Block Firewall, Create Ticket, Send Email.
Mở thêm một tab Terminal khác, gõ lệnh sudo ufw status là sẽ thấy IP của hacker đã chễm chệ nằm trong danh sách DENY của tường lửa. Quá trình khóa IP diễn ra chưa tới 1 giây!