I. YÊU CẦU BÀI LAB
1.1. Giới thiệu bài lab
Bài lab này hướng dẫn tích hợp log từ firewall (FortiGate/pfSense/OPNsense) vào hệ thống ELK Stack nhằm mục đích: thu thập log Syslog theo định dạng CEF, parse log thành các trường dữ liệu có cấu trúc, phát hiện các hành vi bất thường như deny quá mức hoặc port scan, đồng thời gửi cảnh báo và tự động block IP qua API firewall. Giải pháp giúp tự động hóa giám sát an ninh mạng, giảm thiểu thời gian xử lý thủ công và nâng cao khả năng ứng phó với các mối đe dọa.
Thiết bị sử dụng:
l Máy ảo Ubuntu 20.04 LTS (4GB RAM, 2 CPU, 50GB disk)
l ELK Stack 7.17.15 (Elasticsearch, Logstash, Kibana)
l Firewall (có thể là FortiGate, pfSense hoặc OPNsense)
1.2. Vấn đề cần giải quyết
Trong môi trường doanh nghiệp, firewall tạo ra hàng triệu log mỗi ngày. Vấn đề đặt ra:
1.1. Giới thiệu bài lab
Bài lab này hướng dẫn tích hợp log từ firewall (FortiGate/pfSense/OPNsense) vào hệ thống ELK Stack nhằm mục đích: thu thập log Syslog theo định dạng CEF, parse log thành các trường dữ liệu có cấu trúc, phát hiện các hành vi bất thường như deny quá mức hoặc port scan, đồng thời gửi cảnh báo và tự động block IP qua API firewall. Giải pháp giúp tự động hóa giám sát an ninh mạng, giảm thiểu thời gian xử lý thủ công và nâng cao khả năng ứng phó với các mối đe dọa.
Thiết bị sử dụng:
l Máy ảo Ubuntu 20.04 LTS (4GB RAM, 2 CPU, 50GB disk)
l ELK Stack 7.17.15 (Elasticsearch, Logstash, Kibana)
l Firewall (có thể là FortiGate, pfSense hoặc OPNsense)
1.2. Vấn đề cần giải quyết
Trong môi trường doanh nghiệp, firewall tạo ra hàng triệu log mỗi ngày. Vấn đề đặt ra:
| Thách thức | Mô tả |
| Khối lượng log lớn | Không thể phân tích thủ công |
| Định dạng log phức tạp | Log CEF khó đọc, cần parse |
| Phát hiện xâm nhập chậm | Chỉ phát hiện sau khi sự cố xảy ra |
| Ứng phó thủ công | Tốn thời gian, dễ sai sót |
Mục tiêu: Xây dựng hệ thống tự động thu thập, phân tích, cảnh báo và phản ứng với các mối đe dọa từ firewall.
II. TRIỂN KHAI
2.1. Giải pháp
Sử dụng ELK Stack (Elasticsearch, Logstash, Kibana) kết hợp với Watcher Alerting và Webhook API:
text
Firewall (CEF Syslog) → Logstash (UDP 514) → Elasticsearch (lưu trữ)
→ Kibana (hiển thị) → Watcher (phát hiện bất thường)
→ Webhook (block IP) → Firewall API
2.2. Cài đặt ELK Stack bằng Docker Compose
Tạo thư mục dự án:
bash
mkdir -p ~/elk-stack
cd ~/elk-stack
Tạo file docker-compose.yml:
yaml
version: '3.3'
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.17.15
container_name: elasticsearch
environment:
- discovery.type=single-node
- xpack.security.enabled=false
ports:
- "9200:9200"
networks:
- elk
logstash:
image: docker.elastic.co/logstash/logstash:7.17.15
container_name: logstash
ports:
- "514:514/udp"
volumes:
- ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro
depends_on:
- elasticsearch
networks:
- elk
kibana:
image: docker.elastic.co/kibana/kibana:7.17.15
container_name: kibana
ports:
- "5601:5601"
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
depends_on:
- elasticsearch
networks:
- elk
networks:
elk:
driver: bridge
2.3. Cấu hình Logstash (parse log CEF)
Tạo file logstash.conf:
bash
cat > ~/elk-stack/logstash.conf << 'EOF'
input {
udp {
port => 514
}
}
filter {
grok {
match => { "message" => "src=%{IP:source_ip} dst=%{IP:destination_ip} act=%{WORD:action}" }
}
}
output {
stdout { codec => rubydebug }
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "firewall-%{+YYYY.MM.dd}"
}
}
EOF
II. TRIỂN KHAI
2.1. Giải pháp
Sử dụng ELK Stack (Elasticsearch, Logstash, Kibana) kết hợp với Watcher Alerting và Webhook API:
text
Firewall (CEF Syslog) → Logstash (UDP 514) → Elasticsearch (lưu trữ)
→ Kibana (hiển thị) → Watcher (phát hiện bất thường)
→ Webhook (block IP) → Firewall API
2.2. Cài đặt ELK Stack bằng Docker Compose
Tạo thư mục dự án:
bash
mkdir -p ~/elk-stack
cd ~/elk-stack
Tạo file docker-compose.yml:
yaml
version: '3.3'
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.17.15
container_name: elasticsearch
environment:
- discovery.type=single-node
- xpack.security.enabled=false
ports:
- "9200:9200"
networks:
- elk
logstash:
image: docker.elastic.co/logstash/logstash:7.17.15
container_name: logstash
ports:
- "514:514/udp"
volumes:
- ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro
depends_on:
- elasticsearch
networks:
- elk
kibana:
image: docker.elastic.co/kibana/kibana:7.17.15
container_name: kibana
ports:
- "5601:5601"
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
depends_on:
- elasticsearch
networks:
- elk
networks:
elk:
driver: bridge
2.3. Cấu hình Logstash (parse log CEF)
Tạo file logstash.conf:
bash
cat > ~/elk-stack/logstash.conf << 'EOF'
input {
udp {
port => 514
}
}
filter {
grok {
match => { "message" => "src=%{IP:source_ip} dst=%{IP:destination_ip} act=%{WORD:action}" }
}
}
output {
stdout { codec => rubydebug }
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "firewall-%{+YYYY.MM.dd}"
}
}
EOF
Hình 1: Kết quả log sau khi parse trong Kibana Discover
2.4. Cấu hình firewall gửi Syslog
Đối với FortiGate:
bash
config log syslogd2 setting
set status enable
set format cef
set server 10.30.22.242
set port 514
end
Đối với pfSense: Web GUI → Status → System Logs → Settings → Remote Syslog Servers: 10.30.22.242:514
Hình 2: Cấu hình syslog trên firewall
2.5. Khởi động ELK Stack
bash
cd ~/elk-stack
docker-compose up -d
sleep 60
Kiểm tra trạng thái:
bash
docker ps
curl http://localhost:9200
Hình 3: Các container đang chạy
2.6. Gửi log test
bash
echo "src=1.2.3.4 dst=8.8.8.8 act=deny" | nc -u 127.0.0.1 514
Kiểm tra log trong Elasticsearch:
bash
curl "http://localhost:9200/firewall-*/_search?pretty"
Hình 4: Log đã được lưu vào Elasticsearch
2.7. Tạo Index Pattern trong Kibana
Truy cập: http://localhost:5601
Stack Management → Index Patterns → Create index pattern
Name: firewall-logs
Index pattern: firewall-*
Timestamp field: @timestamp
Hình 5: Tạo Index Pattern thành công
2.8. Tạo Alert phát hiện deny bất thường (Watcher)
bash
docker exec -it elasticsearch bash -c "curl -X PUT 'http://localhost:9200/_watcher/watch/deny_alert' -H 'Content-Type: application/json' -d'
{
\"trigger\": { \"schedule\": { \"interval\": \"1m\" } },
\"input\": {
\"search\": {
\"request\": {
\"indices\": [\"firewall-*\"],
\"body\": {
\"query\": {
\"bool\": {
\"must\": [
{ \"term\": { \"action\": \"deny\" } },
{ \"range\": { \"@timestamp\": { \"gte\": \"now-1m\" } } }
]
}
},
\"aggs\": {
\"deny_by_ip\": {
\"terms\": { \"field\": \"source_ip\", \"size\": 10 }
}
}
}
}
}
},
\"condition\": {
\"compare\": { \"ctx.payload.aggregations.deny_by_ip.buckets.0.doc_count\": { \"gt\": 0 } }
},
\"actions\": {
\"log\": {
\"logging\": { \"text\": \"ALERT: Abnormal deny from {{ctx.payload.aggregations.deny_by_ip.buckets.0.key}}\" }
}
}
}'"
2.9. Kiểm tra Alert đã hoạt động
Gửi log kích hoạt alert:
bash
echo "src=1.2.3.4 dst=8.8.8.8 act=deny" | nc -u 127.0.0.1 514
sleep 70
Xem kết quả alert:
bash
docker exec -it elasticsearch bash -c "curl -s 'http://localhost:9200/.watcher-history*/_search?pretty&q=ALERT'"
Hình 6: Alert đã được kích hoạt với nội dung "ALERT: Abnormal deny from 1.2.3.4"
2.10. Kết quả hiển thị trong Kibana Discover
Sau khi hoàn tất, trong Kibana Discover bạn sẽ thấy:
Field Giá trị
source_ip 1.2.3.4
destination_ip 8.8.8.8
action deny
@timestamp 2026-06-01T07:21:25.722Z
Hình 7: Log đã được parse thành các field riêng biệt trong Discover
2.11. Hướng dẫn mở rộng: Webhook block IP
Để tự động block IP khi có alert, tạo webhook service:
python
# ~/blocker/webhook.py
from flask import Flask, request
app = Flask(__name__)
@app.route('/block', methods=['POST'])
def block_ip():
ip = request.json.get('source_ip')
# Gọi API FortiGate/pfSense để block IP
print(f"Blocking IP: {ip}")
return {"status": "blocked"}, 200
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8080)
bash
pip3 install flask
python3 webhook.py &
Kết nối webhook với Kibana Alert: Stack Management → Rules and Connectors → Create connector → Webhook
III. KẾT LUẬN
3.1. Kết quả đạt được
Hệ thống đã hoạt động thành công với các kết quả cụ thể: Cấu hình Syslog từ firewall giúp gửi log CEF về Logstash qua UDP 514. Grok pattern đã parse thành công các trường source_ip, destination_ip, action từ log thô. Dữ liệu sau parse được lưu đầy đủ vào Elasticsearch và hiển thị trực quan trên Kibana Discover. Đặc biệt, Watcher alert đã được kích hoạt, tự động phát hiện hành vi deny bất thường và ghi lại cảnh báo log với nội dung "ALERT: Abnormal deny from [IP]". Như vậy, toàn bộ các yêu cầu của bài lab đã được hoàn thành.
3.2. Bài học kinh nghiệm
Xác định đúng field type - source_ip cần là keyword để aggregate trong Watcher
License Elasticsearch - Cần bật trial license để sử dụng Watcher
Grok pattern thử nghiệm - Nên dùng https://grokdebug.herokuapp.com/ để kiểm tra pattern trước khi áp dụng
Network bridge mode - Máy ảo cần ở chế độ Bridged để firewall gửi log về.
3.3. Giá trị mang lại
Hệ thống tích hợp log firewall vào ELK Stack mang lại nhiều giá trị thiết thực cho công tác vận hành và bảo mật. Thứ nhất, tự động hóa hoàn toàn quy trình phân tích log – thay vì hàng ngày phải ngồi hàng giờ để xem log thô trên firewall, giờ đây mọi thứ được thu thập, parse và hiển thị trực quan trên Kibana chỉ trong vài giây. Thứ hai, khả năng phát hiện sớm các mối đe dọa – với alert watcher được cấu hình, chỉ cần một IP có hành vi deny bất thường (ví dụ 5 lần trong 1 phút) hệ thống sẽ ngay lập tức ghi nhận cảnh báo, giúp đội ngũ an ninh mạng chủ động ứng phó trước khi sự cố leo thang. Thứ ba, nền tảng mở rộng linh hoạt – giải pháp này không chỉ dừng lại ở việc cảnh báo, mà còn có thể tích hợp thêm GeoIP để xác định vị trí địa lý của kẻ tấn công, xây dựng dashboard thống kê trực quan, và quan trọng nhất là kết nối webhook để tự động block IP qua API firewall, biến hệ thống từ "chỉ phát hiện" thành "tự động phản ứng". Nhờ vậy, doanh nghiệp có thể tiết kiệm hàng trăm giờ nhân lực mỗi tháng, đồng thời nâng cao đáng kể khả năng bảo vệ hạ tầng mạng trước các cuộc tấn công ngày càng tinh vi.
Sửa lần cuối: