SIEM/Log Management Tự động phát hiện và ngăn chặn tấn công Brute Force với ELK Stack

I. YÊU CẦU BÀI LAB
1.1. Giới thiệu bài lab

Bạn đã bao giờ nhận được cảnh báo từ hệ thống về hàng trăm lần đăng nhập thất bại chỉ trong vài phút chưa? Đó chính là dấu hiệu của tấn công brute force – một trong những mối đe dọa phổ biến nhất nhắm vào các máy chủ hiện nay.
Bài lab này xây dựng một hệ thống giám sát và tự động ngăn chặn brute force hoàn chỉnh. Khi phát hiện một địa chỉ IP có từ 5 lần đăng nhập thất bại trong vòng 5 phút, hệ thống sẽ tự động ghi nhận sự kiện và block IP đó ngay lập tức.

Thiết bị sử dụng:​
  • Máy ảo Ubuntu 20.04 LTS (4GB RAM, 2 CPU)​
  • ELK Stack 7.17.15 (Elasticsearch, Logstash, Kibana)​
  • Python 3 và Flask cho webhook service​
1.2. Mô tả vấn đề cần giải quyết
Hacker thường sử dụng các công cụ tự động để thử hàng ngàn tổ hợp username/password nhằm xâm nhập vào hệ thống. Khi bị tấn công brute force, hậu quả có thể rất nghiêm trọng: tài khoản bị chiếm đoạt, dữ liệu bị đánh cắp, hoặc máy chủ bị sử dụng làm bàn đạp tấn công các hệ thống khác.
Vấn đề đặt ra là: làm thế nào để phát hiện kịp thời các hành vi brute force và tự động ngăn chặn chúng mà không cần sự can thiệp thủ công của nhân viên bảo mật? Bài lab này giải quyết vấn đề bằng cách xây dựng một pipeline tự động từ thu thập log, phân tích phát hiện đến hành động ngăn chặn.
1781092224392.png

Hình 1: Kiến trúc tổng thể hệ thống phát hiện và ngăn chặn Brute Force
II. TRIỂN KHAI
2.1. Giải pháp tổng thể

Hệ thống sử dụng ELK Stack làm nền tảng xử lý log trung tâm kết hợp với webhook service để block IP. Dữ liệu log authentication được Logstash đọc từ file hệ thống, parse và gửi vào Elasticsearch. Một detection rule trên Kibana phát hiện các IP có số lần thất bại vượt ngưỡng. Khi phát hiện, watcher sẽ kích hoạt webhook service để thực hiện chặn IP.

2.2. Cài đặt ELK Stack
Bước 1: Tạo thư mục dự án và file docker-compose.yml

yaml

version: '3.3'

services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.17.15
container_name: elasticsearch
environment:
- discovery.type=single-node
- xpack.security.enabled=false
ports:
- "9200:9200"
networks:
- lab

logstash:
image: docker.elastic.co/logstash/logstash:7.17.15
container_name: logstash
ports:
- "5514:5514/udp"
volumes:
- ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro
- /var/log/auth:/var/log/auth:ro
depends_on:
- elasticsearch
networks:
- lab
kibana:
image: docker.elastic.co/kibana/kibana:7.17.15
container_name: kibana
ports:
- "5601:5601"
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
depends_on:
- elasticsearch
networks:
- lab

networks:
lab:
driver: bridge

Bước 2: Cấu hình Logstash

ruby
input {
file {
path => "/var/log/auth/auth.log"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}

filter {
grok {
match => {
"message" => [
"%{TIMESTAMP_ISO8601:timestamp} src=%{IP:src_ip} user=%{USER:user} status=%{WORD:status} message=%{GREEDYDATA:message}"
]
}
}

date {
match => [ "timestamp", "ISO8601" ]
target => "@timestamp"
}

mutate {
add_field => { "[event][outcome]" => "%{status}" }
}
}
output {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "auth-logs-%{+YYYY.MM.dd}"
}
}

Bước 3: Khởi động stack

bash

docker-compose up -d

1781087960494.png

Hình 2: Ba container elasticsearch, logstash, kibana đang chạy

2.3. Chuẩn bị dữ liệu mô phỏng

Tạo file log mô phỏng hành vi tấn công:

bash

sudo mkdir -p /var/log/auth
sudo tee /var/log/auth/auth.log > /dev/null << 'EOF'
2026-06-10T10:00:00Z src=10.0.0.100 user=admin status=failed message="Failed password"
2026-06-10T10:01:00Z src=10.0.0.100 user=admin status=failed message="Failed password"
2026-06-10T10:02:00Z src=10.0.0.100 user=admin status=failed message="Failed password"
2026-06-10T10:03:00Z src=10.0.0.100 user=admin status=failed message="Failed password"
2026-06-10T10:04:00Z src=10.0.0.100 user=admin status=failed message="Failed password"
EOF

2.4. Tạo Detection Rule trên Kibana

Bước 1: Khởi tạo detection engine

bash

curl -X POST "http://localhost:5601/api/detection_engine/index" -H 'kbn-xsrf: true'

Bước 2: Tạo rule phát hiện brute force

bash

curl -X POST "http://localhost:5601/api/detection_engine/rules" -H 'kbn-xsrf: true' -H 'Content-Type: application/json' -d'
{
"rule_id": "bruteforce-detection-001",
"name": "Brute Force Detection - Failed Login > 5 times",
"index": ["auth-logs-*"],
"query": "event.outcome: failed",
"severity": "high",
"risk_score": 75,
"type": "query",
"interval": "5m",
"from": "now-5m",
"enabled": true
}'

1781089585621.png

Hình 3: Detection rule đã được tạo thành công trên Kibana

2.5. Xây dựng Webhook Block Service
Tạo webhook service bằng Flask:

python

# webhook_server.py
from flask import Flask, request, jsonify
from datetime import datetime

app = Flask(__name__)
blocked_ips = set()

@app.route('/block', methods=['POST'])
def block_ip():
data = request.json
ip = data.get('ip')
if ip and ip not in blocked_ips:
blocked_ips.add(ip)
with open('block_audit.log', 'a') as f:
f.write(f"{datetime.now()} - Blocked IP: {ip}\n")
return {"status": "blocked", "ip": ip}, 200
return {"status": "already_blocked"}, 200

@app.route('/health', methods=['GET'])
def health():
return {"status": "running", "blocked_count": len(blocked_ips)}

if __name__ == '__main__':
app.run(host='0.0.0.0', port=8080)

Chạy service:

bash

pip3 install flask
python3 webhook_server.py &

2.6. Tạo Watcher Alert kết hợp Block IP

bash
curl -X PUT "http://localhost:9200/_watcher/watch/bruteforce_block_alert" -H 'Content-Type: application/json' -d'
{
"trigger": { "schedule": { "interval": "5m" } },
"input": {
"search": {
"request": {
"indices": ["auth-logs-*"],
"body": {
"query": {
"bool": {
"must": [
{ "term": { "event.outcome": "failed" } },
{ "range": { "@timestamp": { "gte": "now-5m" } } }
]
}
},
"aggs": {
"failed_by_ip": {
"terms": { "field": "src_ip.keyword", "size": 10, "min_doc_count": 5 }
}
}
}
}
}
},
"condition": {
"compare": { "ctx.payload.aggregations.failed_by_ip.buckets.0.doc_count": { "gt": 4 } }
},
"actions": {
"block_ip": {
"webhook": {
"host": "host.docker.internal",
"port": 8080,
"method": "post",
"path": "/block",
"body": "{\"ip\": \"{{ctx.payload.aggregations.failed_by_ip.buckets.0.key}}\"}"
}
},
"log_audit": {
"logging": {
"text": "BRUTE FORCE BLOCKED: IP {{ctx.payload.aggregations.failed_by_ip.buckets.0.key}}"
}
}
}
}'

1781089684301.png

Hình 4: Watcher alert đã được tạo với trạng thái active

III. KẾT LUẬN
3.1. Kết quả đạt được

Sau khi chạy kịch bản mô phỏng 5 lần đăng nhập thất bại từ IP 10.0.0.100, hệ thống đã phát hiện chính xác và thực hiện block IP thành công. Watcher ghi nhận log "BRUTE FORCE BLOCKED: IP 10.0.0.100 with 5 failed attempts", webhook service cũng ghi nhận hành động block. Toàn bộ quy trình từ lúc tấn công đến lúc bị chặn đã được kiểm chứng end-to-end.

1781091752040.png

Hình 5: Kết quả watcher history hiển thị đã phát hiện và block IP 10.0.0.100 thành công
3.2. Giá trị mang lại
Hệ thống này giúp doanh nghiệp tự động hóa hoàn toàn quy trình phát hiện và ứng phó với tấn công brute force. Thay vì phải ngồi canh log cả ngày, đội ngũ bảo mật có thể tập trung vào các nhiệm vụ quan trọng hơn. Thời gian phát hiện và xử lý sự cố được rút ngắn từ hàng giờ xuống còn vài phút. Giải pháp có thể mở rộng để kết nối với các firewall thật như FortiGate, pfSense, hoặc tích hợp thêm kênh cảnh báo qua Telegram, Slack.​
 
Back
Top