I. YÊU CẦU BÀI LAB
1.1. Giới thiệu bài lab
Bạn đã bao giờ nhận được cảnh báo từ hệ thống về hàng trăm lần đăng nhập thất bại chỉ trong vài phút chưa? Đó chính là dấu hiệu của tấn công brute force – một trong những mối đe dọa phổ biến nhất nhắm vào các máy chủ hiện nay.
Bài lab này xây dựng một hệ thống giám sát và tự động ngăn chặn brute force hoàn chỉnh. Khi phát hiện một địa chỉ IP có từ 5 lần đăng nhập thất bại trong vòng 5 phút, hệ thống sẽ tự động ghi nhận sự kiện và block IP đó ngay lập tức.
Thiết bị sử dụng:
1.1. Giới thiệu bài lab
Bạn đã bao giờ nhận được cảnh báo từ hệ thống về hàng trăm lần đăng nhập thất bại chỉ trong vài phút chưa? Đó chính là dấu hiệu của tấn công brute force – một trong những mối đe dọa phổ biến nhất nhắm vào các máy chủ hiện nay.
Bài lab này xây dựng một hệ thống giám sát và tự động ngăn chặn brute force hoàn chỉnh. Khi phát hiện một địa chỉ IP có từ 5 lần đăng nhập thất bại trong vòng 5 phút, hệ thống sẽ tự động ghi nhận sự kiện và block IP đó ngay lập tức.
Thiết bị sử dụng:
- Máy ảo Ubuntu 20.04 LTS (4GB RAM, 2 CPU)
- ELK Stack 7.17.15 (Elasticsearch, Logstash, Kibana)
- Python 3 và Flask cho webhook service
1.2. Mô tả vấn đề cần giải quyết
Hacker thường sử dụng các công cụ tự động để thử hàng ngàn tổ hợp username/password nhằm xâm nhập vào hệ thống. Khi bị tấn công brute force, hậu quả có thể rất nghiêm trọng: tài khoản bị chiếm đoạt, dữ liệu bị đánh cắp, hoặc máy chủ bị sử dụng làm bàn đạp tấn công các hệ thống khác.
Vấn đề đặt ra là: làm thế nào để phát hiện kịp thời các hành vi brute force và tự động ngăn chặn chúng mà không cần sự can thiệp thủ công của nhân viên bảo mật? Bài lab này giải quyết vấn đề bằng cách xây dựng một pipeline tự động từ thu thập log, phân tích phát hiện đến hành động ngăn chặn.
Hình 1: Kiến trúc tổng thể hệ thống phát hiện và ngăn chặn Brute Force
II. TRIỂN KHAI
2.1. Giải pháp tổng thể
Hệ thống sử dụng ELK Stack làm nền tảng xử lý log trung tâm kết hợp với webhook service để block IP. Dữ liệu log authentication được Logstash đọc từ file hệ thống, parse và gửi vào Elasticsearch. Một detection rule trên Kibana phát hiện các IP có số lần thất bại vượt ngưỡng. Khi phát hiện, watcher sẽ kích hoạt webhook service để thực hiện chặn IP.
2.2. Cài đặt ELK Stack
Bước 1: Tạo thư mục dự án và file docker-compose.yml
yaml
version: '3.3'
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.17.15
container_name: elasticsearch
environment:
- discovery.type=single-node
- xpack.security.enabled=false
ports:
- "9200:9200"
networks:
- lab
logstash:
image: docker.elastic.co/logstash/logstash:7.17.15
container_name: logstash
ports:
- "5514:5514/udp"
volumes:
- ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro
- /var/log/auth:/var/log/auth:ro
depends_on:
- elasticsearch
networks:
- lab
kibana:
image: docker.elastic.co/kibana/kibana:7.17.15
container_name: kibana
ports:
- "5601:5601"
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
depends_on:
- elasticsearch
networks:
- lab
networks:
lab:
driver: bridge
Bước 2: Cấu hình Logstash
ruby
input {
file {
path => "/var/log/auth/auth.log"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter {
grok {
match => {
"message" => [
"%{TIMESTAMP_ISO8601:timestamp} src=%{IP:src_ip} user=%{USER:user} status=%{WORD:status} message=%{GREEDYDATA:message}"
]
}
}
date {
match => [ "timestamp", "ISO8601" ]
target => "@timestamp"
}
mutate {
add_field => { "[event][outcome]" => "%{status}" }
}
}
output {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "auth-logs-%{+YYYY.MM.dd}"
}
}
Bước 3: Khởi động stack
bash
docker-compose up -d
Hình 2: Ba container elasticsearch, logstash, kibana đang chạy
2.3. Chuẩn bị dữ liệu mô phỏng
Tạo file log mô phỏng hành vi tấn công:
bash
sudo mkdir -p /var/log/auth
sudo tee /var/log/auth/auth.log > /dev/null << 'EOF'
2026-06-10T10:00:00Z src=10.0.0.100 user=admin status=failed message="Failed password"
2026-06-10T10:01:00Z src=10.0.0.100 user=admin status=failed message="Failed password"
2026-06-10T10:02:00Z src=10.0.0.100 user=admin status=failed message="Failed password"
2026-06-10T10:03:00Z src=10.0.0.100 user=admin status=failed message="Failed password"
2026-06-10T10:04:00Z src=10.0.0.100 user=admin status=failed message="Failed password"
EOF
2.4. Tạo Detection Rule trên Kibana
Bước 1: Khởi tạo detection engine
bash
curl -X POST "http://localhost:5601/api/detection_engine/index" -H 'kbn-xsrf: true'
Bước 2: Tạo rule phát hiện brute force
bash
curl -X POST "http://localhost:5601/api/detection_engine/rules" -H 'kbn-xsrf: true' -H 'Content-Type: application/json' -d'
{
"rule_id": "bruteforce-detection-001",
"name": "Brute Force Detection - Failed Login > 5 times",
"index": ["auth-logs-*"],
"query": "event.outcome: failed",
"severity": "high",
"risk_score": 75,
"type": "query",
"interval": "5m",
"from": "now-5m",
"enabled": true
}'
Hình 3: Detection rule đã được tạo thành công trên Kibana
2.5. Xây dựng Webhook Block Service
Tạo webhook service bằng Flask:
python
# webhook_server.py
from flask import Flask, request, jsonify
from datetime import datetime
app = Flask(__name__)
blocked_ips = set()
@app.route('/block', methods=['POST'])
def block_ip():
data = request.json
ip = data.get('ip')
if ip and ip not in blocked_ips:
blocked_ips.add(ip)
with open('block_audit.log', 'a') as f:
f.write(f"{datetime.now()} - Blocked IP: {ip}\n")
return {"status": "blocked", "ip": ip}, 200
return {"status": "already_blocked"}, 200
@app.route('/health', methods=['GET'])
def health():
return {"status": "running", "blocked_count": len(blocked_ips)}
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8080)
Chạy service:
bash
pip3 install flask
python3 webhook_server.py &
2.6. Tạo Watcher Alert kết hợp Block IP
bash
curl -X PUT "http://localhost:9200/_watcher/watch/bruteforce_block_alert" -H 'Content-Type: application/json' -d'
{
"trigger": { "schedule": { "interval": "5m" } },
"input": {
"search": {
"request": {
"indices": ["auth-logs-*"],
"body": {
"query": {
"bool": {
"must": [
{ "term": { "event.outcome": "failed" } },
{ "range": { "@timestamp": { "gte": "now-5m" } } }
]
}
},
"aggs": {
"failed_by_ip": {
"terms": { "field": "src_ip.keyword", "size": 10, "min_doc_count": 5 }
}
}
}
}
}
},
"condition": {
"compare": { "ctx.payload.aggregations.failed_by_ip.buckets.0.doc_count": { "gt": 4 } }
},
"actions": {
"block_ip": {
"webhook": {
"host": "host.docker.internal",
"port": 8080,
"method": "post",
"path": "/block",
"body": "{\"ip\": \"{{ctx.payload.aggregations.failed_by_ip.buckets.0.key}}\"}"
}
},
"log_audit": {
"logging": {
"text": "BRUTE FORCE BLOCKED: IP {{ctx.payload.aggregations.failed_by_ip.buckets.0.key}}"
}
}
}
}'
Hình 4: Watcher alert đã được tạo với trạng thái active
III. KẾT LUẬN
3.1. Kết quả đạt được
Sau khi chạy kịch bản mô phỏng 5 lần đăng nhập thất bại từ IP 10.0.0.100, hệ thống đã phát hiện chính xác và thực hiện block IP thành công. Watcher ghi nhận log "BRUTE FORCE BLOCKED: IP 10.0.0.100 with 5 failed attempts", webhook service cũng ghi nhận hành động block. Toàn bộ quy trình từ lúc tấn công đến lúc bị chặn đã được kiểm chứng end-to-end.
Hình 5: Kết quả watcher history hiển thị đã phát hiện và block IP 10.0.0.100 thành công
3.2. Giá trị mang lại
Hệ thống này giúp doanh nghiệp tự động hóa hoàn toàn quy trình phát hiện và ứng phó với tấn công brute force. Thay vì phải ngồi canh log cả ngày, đội ngũ bảo mật có thể tập trung vào các nhiệm vụ quan trọng hơn. Thời gian phát hiện và xử lý sự cố được rút ngắn từ hàng giờ xuống còn vài phút. Giải pháp có thể mở rộng để kết nối với các firewall thật như FortiGate, pfSense, hoặc tích hợp thêm kênh cảnh báo qua Telegram, Slack.
Hacker thường sử dụng các công cụ tự động để thử hàng ngàn tổ hợp username/password nhằm xâm nhập vào hệ thống. Khi bị tấn công brute force, hậu quả có thể rất nghiêm trọng: tài khoản bị chiếm đoạt, dữ liệu bị đánh cắp, hoặc máy chủ bị sử dụng làm bàn đạp tấn công các hệ thống khác.
Vấn đề đặt ra là: làm thế nào để phát hiện kịp thời các hành vi brute force và tự động ngăn chặn chúng mà không cần sự can thiệp thủ công của nhân viên bảo mật? Bài lab này giải quyết vấn đề bằng cách xây dựng một pipeline tự động từ thu thập log, phân tích phát hiện đến hành động ngăn chặn.
Hình 1: Kiến trúc tổng thể hệ thống phát hiện và ngăn chặn Brute Force
II. TRIỂN KHAI
2.1. Giải pháp tổng thể
Hệ thống sử dụng ELK Stack làm nền tảng xử lý log trung tâm kết hợp với webhook service để block IP. Dữ liệu log authentication được Logstash đọc từ file hệ thống, parse và gửi vào Elasticsearch. Một detection rule trên Kibana phát hiện các IP có số lần thất bại vượt ngưỡng. Khi phát hiện, watcher sẽ kích hoạt webhook service để thực hiện chặn IP.
2.2. Cài đặt ELK Stack
Bước 1: Tạo thư mục dự án và file docker-compose.yml
yaml
version: '3.3'
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.17.15
container_name: elasticsearch
environment:
- discovery.type=single-node
- xpack.security.enabled=false
ports:
- "9200:9200"
networks:
- lab
logstash:
image: docker.elastic.co/logstash/logstash:7.17.15
container_name: logstash
ports:
- "5514:5514/udp"
volumes:
- ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro
- /var/log/auth:/var/log/auth:ro
depends_on:
- elasticsearch
networks:
- lab
kibana:
image: docker.elastic.co/kibana/kibana:7.17.15
container_name: kibana
ports:
- "5601:5601"
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
depends_on:
- elasticsearch
networks:
- lab
networks:
lab:
driver: bridge
Bước 2: Cấu hình Logstash
ruby
input {
file {
path => "/var/log/auth/auth.log"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter {
grok {
match => {
"message" => [
"%{TIMESTAMP_ISO8601:timestamp} src=%{IP:src_ip} user=%{USER:user} status=%{WORD:status} message=%{GREEDYDATA:message}"
]
}
}
date {
match => [ "timestamp", "ISO8601" ]
target => "@timestamp"
}
mutate {
add_field => { "[event][outcome]" => "%{status}" }
}
}
output {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "auth-logs-%{+YYYY.MM.dd}"
}
}
Bước 3: Khởi động stack
bash
docker-compose up -d
Hình 2: Ba container elasticsearch, logstash, kibana đang chạy
2.3. Chuẩn bị dữ liệu mô phỏng
Tạo file log mô phỏng hành vi tấn công:
bash
sudo mkdir -p /var/log/auth
sudo tee /var/log/auth/auth.log > /dev/null << 'EOF'
2026-06-10T10:00:00Z src=10.0.0.100 user=admin status=failed message="Failed password"
2026-06-10T10:01:00Z src=10.0.0.100 user=admin status=failed message="Failed password"
2026-06-10T10:02:00Z src=10.0.0.100 user=admin status=failed message="Failed password"
2026-06-10T10:03:00Z src=10.0.0.100 user=admin status=failed message="Failed password"
2026-06-10T10:04:00Z src=10.0.0.100 user=admin status=failed message="Failed password"
EOF
2.4. Tạo Detection Rule trên Kibana
Bước 1: Khởi tạo detection engine
bash
curl -X POST "http://localhost:5601/api/detection_engine/index" -H 'kbn-xsrf: true'
Bước 2: Tạo rule phát hiện brute force
bash
curl -X POST "http://localhost:5601/api/detection_engine/rules" -H 'kbn-xsrf: true' -H 'Content-Type: application/json' -d'
{
"rule_id": "bruteforce-detection-001",
"name": "Brute Force Detection - Failed Login > 5 times",
"index": ["auth-logs-*"],
"query": "event.outcome: failed",
"severity": "high",
"risk_score": 75,
"type": "query",
"interval": "5m",
"from": "now-5m",
"enabled": true
}'
Hình 3: Detection rule đã được tạo thành công trên Kibana
2.5. Xây dựng Webhook Block Service
Tạo webhook service bằng Flask:
python
# webhook_server.py
from flask import Flask, request, jsonify
from datetime import datetime
app = Flask(__name__)
blocked_ips = set()
@app.route('/block', methods=['POST'])
def block_ip():
data = request.json
ip = data.get('ip')
if ip and ip not in blocked_ips:
blocked_ips.add(ip)
with open('block_audit.log', 'a') as f:
f.write(f"{datetime.now()} - Blocked IP: {ip}\n")
return {"status": "blocked", "ip": ip}, 200
return {"status": "already_blocked"}, 200
@app.route('/health', methods=['GET'])
def health():
return {"status": "running", "blocked_count": len(blocked_ips)}
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8080)
Chạy service:
bash
pip3 install flask
python3 webhook_server.py &
2.6. Tạo Watcher Alert kết hợp Block IP
bash
curl -X PUT "http://localhost:9200/_watcher/watch/bruteforce_block_alert" -H 'Content-Type: application/json' -d'
{
"trigger": { "schedule": { "interval": "5m" } },
"input": {
"search": {
"request": {
"indices": ["auth-logs-*"],
"body": {
"query": {
"bool": {
"must": [
{ "term": { "event.outcome": "failed" } },
{ "range": { "@timestamp": { "gte": "now-5m" } } }
]
}
},
"aggs": {
"failed_by_ip": {
"terms": { "field": "src_ip.keyword", "size": 10, "min_doc_count": 5 }
}
}
}
}
}
},
"condition": {
"compare": { "ctx.payload.aggregations.failed_by_ip.buckets.0.doc_count": { "gt": 4 } }
},
"actions": {
"block_ip": {
"webhook": {
"host": "host.docker.internal",
"port": 8080,
"method": "post",
"path": "/block",
"body": "{\"ip\": \"{{ctx.payload.aggregations.failed_by_ip.buckets.0.key}}\"}"
}
},
"log_audit": {
"logging": {
"text": "BRUTE FORCE BLOCKED: IP {{ctx.payload.aggregations.failed_by_ip.buckets.0.key}}"
}
}
}
}'
Hình 4: Watcher alert đã được tạo với trạng thái active
III. KẾT LUẬN
3.1. Kết quả đạt được
Sau khi chạy kịch bản mô phỏng 5 lần đăng nhập thất bại từ IP 10.0.0.100, hệ thống đã phát hiện chính xác và thực hiện block IP thành công. Watcher ghi nhận log "BRUTE FORCE BLOCKED: IP 10.0.0.100 with 5 failed attempts", webhook service cũng ghi nhận hành động block. Toàn bộ quy trình từ lúc tấn công đến lúc bị chặn đã được kiểm chứng end-to-end.
Hình 5: Kết quả watcher history hiển thị đã phát hiện và block IP 10.0.0.100 thành công
3.2. Giá trị mang lại
Hệ thống này giúp doanh nghiệp tự động hóa hoàn toàn quy trình phát hiện và ứng phó với tấn công brute force. Thay vì phải ngồi canh log cả ngày, đội ngũ bảo mật có thể tập trung vào các nhiệm vụ quan trọng hơn. Thời gian phát hiện và xử lý sự cố được rút ngắn từ hàng giờ xuống còn vài phút. Giải pháp có thể mở rộng để kết nối với các firewall thật như FortiGate, pfSense, hoặc tích hợp thêm kênh cảnh báo qua Telegram, Slack.