I. YÊU CẦU BÀI LAB
1.1. Giới thiệu bài lab
Bài lab thực hiện tích hợp ba nguồn dữ liệu bảo mật vào hệ thống ELK Stack bao gồm: Log NAC (Network Access Control) để theo dõi trạng thái tuân thủ của các endpoint, kết quả quét lỗ hổng từ OpenVAS để phát hiện điểm yếu bảo mật, và nguồn threat intelligence từ MISP/OTX AlienVault để đồng bộ các chỉ báo xâm nhập. Mục tiêu cuối cùng là xây dựng hệ thống cảnh báo tự động khi phát hiện lỗ hổng mức Critical hoặc endpoint có dấu hiệu kết nối đến C2 Server.
Thiết bị sử dụng:
1.1. Giới thiệu bài lab
Bài lab thực hiện tích hợp ba nguồn dữ liệu bảo mật vào hệ thống ELK Stack bao gồm: Log NAC (Network Access Control) để theo dõi trạng thái tuân thủ của các endpoint, kết quả quét lỗ hổng từ OpenVAS để phát hiện điểm yếu bảo mật, và nguồn threat intelligence từ MISP/OTX AlienVault để đồng bộ các chỉ báo xâm nhập. Mục tiêu cuối cùng là xây dựng hệ thống cảnh báo tự động khi phát hiện lỗ hổng mức Critical hoặc endpoint có dấu hiệu kết nối đến C2 Server.
Thiết bị sử dụng:
- Máy ảo Ubuntu 20.04 LTS (4GB RAM, 2 CPU, 50GB ổ đĩa)
- ELK Stack 7.17.15 (Elasticsearch, Logstash, Kibana)
- Dữ liệu mẫu NAC, OpenVAS XML, IOC test
1.2. Mô tả vấn đề cần giải quyết
Trong thực tế vận hành, các mối đe dọa bảo mật đến từ nhiều hướng khác nhau. Thiết bị không tuân thủ chính sách, lỗ hổng phần mềm chưa được vá, và các chỉ báo xâm nhập từ cộng đồng thường được quản lý riêng rẽ. Điều này gây khó khăn cho đội ngũ bảo mật trong việc phát hiện và ứng phó kịp thời. Bài lab giải quyết bài toán tập trung hóa dữ liệu, kết hợp thông tin từ nhiều nguồn để phát hiện các nguy cơ như thiết bị không compliant nhưng lại có lỗ hổng Critical hoặc endpoint đang kết nối đến địa chỉ đã được ghi nhận là C2.
II. TRIỂN KHAI
2.1. Giải pháp tổng thể
Hệ thống sử dụng ELK Stack làm nền tảng xử lý log trung tâm. Dữ liệu từ NAC được gửi qua syslog UDP, kết quả từ OpenVAS ở dạng file XML được Logstash đọc và parse, còn threat intelligence được đồng bộ qua API. Toàn bộ dữ liệu sau khi xử lý được lưu vào Elasticsearch và hiển thị trên Kibana. Cảnh báo được thiết lập bằng Watcher để tự động phát hiện các bất thường.
Sơ đồ kiến trúc:
Trong thực tế vận hành, các mối đe dọa bảo mật đến từ nhiều hướng khác nhau. Thiết bị không tuân thủ chính sách, lỗ hổng phần mềm chưa được vá, và các chỉ báo xâm nhập từ cộng đồng thường được quản lý riêng rẽ. Điều này gây khó khăn cho đội ngũ bảo mật trong việc phát hiện và ứng phó kịp thời. Bài lab giải quyết bài toán tập trung hóa dữ liệu, kết hợp thông tin từ nhiều nguồn để phát hiện các nguy cơ như thiết bị không compliant nhưng lại có lỗ hổng Critical hoặc endpoint đang kết nối đến địa chỉ đã được ghi nhận là C2.
II. TRIỂN KHAI
2.1. Giải pháp tổng thể
Hệ thống sử dụng ELK Stack làm nền tảng xử lý log trung tâm. Dữ liệu từ NAC được gửi qua syslog UDP, kết quả từ OpenVAS ở dạng file XML được Logstash đọc và parse, còn threat intelligence được đồng bộ qua API. Toàn bộ dữ liệu sau khi xử lý được lưu vào Elasticsearch và hiển thị trên Kibana. Cảnh báo được thiết lập bằng Watcher để tự động phát hiện các bất thường.
Sơ đồ kiến trúc:
Watcher Alert (Critical & C2)
2.2. Kiến trúc Elasticsearch indices
Index Mục đích Nguồn dữ liệu
vulnerability-* Kết quả quét lỗ hổng OpenVAS
threat-intel-* IOC từ threat intelligence MISP / OTX AlienVault
enriched-alerts Dữ liệu đã được kết hợp và làm giàu Từ 2 nguồn trên
2.3. Cài đặt ELK Stack bằng Docker Compose
Bước 1: Tạo thư mục dự án và file docker-compose.yml
bash
mkdir -p ~/elk-stack
cd ~/elk-stack
yaml
# docker-compose.yml
version: '3.3'
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.17.15
container_name: elasticsearch
environment:
- discovery.type=single-node
- xpack.security.enabled=false
ports:
- "9200:9200"
networks:
- elk
logstash:
image: docker.elastic.co/logstash/logstash:7.17.15
container_name: logstash
ports:
- "514:514/udp"
- "5514:5514/udp"
volumes:
- ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro
- /opt/openvas-results:/opt/openvas-results:ro
depends_on:
- elasticsearch
networks:
- elk
kibana:
image: docker.elastic.co/kibana/kibana:7.17.15
container_name: kibana
ports:
- "5601:5601"
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
depends_on:
- elasticsearch
networks:
- elk
networks:
elk:
driver: bridge
Bước 2: Khởi động stack
bash
docker-compose up -d
sleep 60
Bước 3: Kiểm tra các container đã chạy
bash
docker ps
Hình 1: Ba container elasticsearch, logstash, kibana đang chạy
text
CONTAINER ID IMAGE STATUS
3f1a54564fa5 docker.elastic.co/logstash/logstash:7.17.15 Up
0d7b0e0a4ec7 docker.elastic.co/elasticsearch/elasticsearch:7.17.15 Up
9b08e14d68e0 docker.elastic.co/kibana/kibana:7.17.15 Up
2.4. Cấu hình Logstash
Tạo file logstash.conf:
bash
cd ~/elk-stack
cat > logstash.conf << 'EOF'
input {
udp {
port => 5514
tags => ["vulnerability"]
}
}
input {
file {
path => "/opt/openvas-results/*.xml"
start_position => "beginning"
sincedb_path => "/dev/null"
tags => ["vulnerability"]
}
}
filter {
if "vulnerability" in [tags] {
xml {
source => "message"
store_xml => false
xpath => [
"/report/report/result/threat", "threat",
"/report/report/result/nvt/name", "vuln_name",
"/report/report/result/host/hostname", "target_host",
"/report/report/result/nvt/cvss_base", "cvss_score"
]
}
if [cvss_score] {
mutate { convert => { "cvss_score" => "float" } }
}
}
}
output {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "vulnerability-%{+YYYY.MM.dd}"
}
}
EOF
2.5. Tạo dữ liệu mẫu từ OpenVAS
bash
sudo mkdir -p /opt/openvas-results
sudo tee /opt/openvas-results/scan_report.xml > /dev/null << 'EOF'
<?xml version="1.0" encoding="UTF-8"?>
<report>
<report>
<result>
<threat>Critical</threat>
<nvt>
<name>Apache Struts2 RCE (CVE-2017-5638)</name>
<cvss_base>9.8</cvss_base>
</nvt>
<host><hostname>192.168.1.100</hostname></host>
</result>
<result>
<threat>Critical</threat>
<nvt>
<name>Log4Shell (CVE-2021-44228)</name>
<cvss_base>10.0</cvss_base>
</nvt>
<host><hostname>10.0.0.50</hostname></host>
</result>
<result>
<threat>High</threat>
<nvt>
<name>OpenSSL Heartbleed</name>
<cvss_base>7.5</cvss_base>
</nvt>
<host><hostname>192.168.1.101</hostname></host>
</result>
</report>
</report>
EOF
Restart Logstash để đọc file XML:
bash
docker-compose restart logstash
sleep 30
2.6. Đồng bộ Threat Intelligence từ MISP/OTX
Tạo script sync_ioc.py:
bash
mkdir -p ~/threat-intel
cd ~/threat-intel
cat > sync_ioc.py << 'EOF'
#!/usr/bin/env python3
import requests
from datetime import datetime
ES_URL = "http://localhost:9200"
def create_test_iocs():
test_iocs = [
{"type": "IPv4", "value": "185.130.5.253", "threat": "Cobalt Strike C2"},
{"type": "IPv4", "value": "45.155.205.233", "threat": "Emotet C2"},
{"type": "domain", "value": "malware-c2.example.com", "threat": "Known C2 Server"},
{"type": "IPv4", "value": "5.188.206.0", "threat": "DDoS Botnet C2"},
{"type": "URL", "value": "http://evil.com/payload", "threat": "Malware Distribution"},
]
count = 0
for ioc in test_iocs:
doc = {
"@timestamp": datetime.now().isoformat(),
"indicator_type": ioc["type"],
"indicator_value": ioc["value"],
"threat_name": ioc["threat"],
"source": "TEST_DATA"
}
index_name = f"threat-intel-{datetime.now().strftime('%Y.%m.%d')}"
resp = requests.post(f"{ES_URL}/{index_name}/_doc", json=doc)
if resp.status_code in [200, 201]:
count += 1
print(f" Added IOC: {ioc['value']}")
print(f" Total {count} IOCs added")
if __name__ == "__main__":
print(" Syncing Threat Intelligence...")
create_test_iocs()
print(" Sync completed!")
EOF
chmod +x sync_ioc.py
python3 sync_ioc.py
Kết quả:
text
Syncing Threat Intelligence...
Added IOC: 185.130.5.253
Added IOC: 45.155.205.233
Added IOC: malware-c2.example.com
Added IOC: 5.188.206.0
Added IOC: http://evil.com/payload
Total 5 IOCs added
Sync completed!
2.7. Tạo Enrich index (kết hợp dữ liệu)
Tạo script enrich.py:
bash
cd ~/threat-intel
cat > enrich.py << 'EOF'
#!/usr/bin/env python3
import requests
from datetime import datetime
ES_URL = "http://localhost:9200"
def enrich_and_alert():
# Lấy IOC từ threat-intel
resp = requests.get(f"{ES_URL}/threat-intel-*/_search?size=100")
iocs = resp.json().get("hits", {}).get("hits", [])
threats = {}
for ioc in iocs:
value = ioc["_source"].get("indicator_value")
if value:
threats[value] = ioc["_source"].get("threat_name", "Unknown")
print(f" Loaded {len(threats)} threat indicators")
# Lấy vulnerabilities
resp = requests.get(f"{ES_URL}/vulnerability-*/_search?size=100")
vulns = resp.json().get("hits", {}).get("hits", [])
print(f" Found {len(vulns)} vulnerability documents")
enriched_count = 0
alert_count = 0
for vuln in vulns:
source = vuln["_source"]
enriched_doc = {
"@timestamp": datetime.now().isoformat(),
"source_type": "vulnerability",
"vuln_name": source.get("vuln_name"),
"threat": source.get("threat"),
"cvss_score": source.get("cvss_score"),
"target_host": source.get("target_host"),
"threat_intel_match": False
}
# Kiểm tra match với threat intel
target = source.get("target_host", "")
for ip, threat_name in threats.items():
if ip in target or target.startswith(ip.split('/')[0]):
enriched_doc["threat_intel_match"] = True
enriched_doc["matched_threat"] = threat_name
alert_count += 1
print(f" ALERT: {target} matched threat intel: {threat_name}")
# Ghi vào enriched index
requests.post(f"{ES_URL}/enriched-alerts/_doc", json=enriched_doc)
enriched_count += 1
print(f" Enriched: {enriched_count} documents")
print(f" Alerts generated: {alert_count}")
if __name__ == "__main__":
print(" Starting Enrichment Process...")
enrich_and_alert()
print(" Enrichment completed!")
EOF
chmod +x enrich.py
python3 enrich.py
Kết quả:
text
Starting Enrichment Process...
Loaded 5 threat indicators
Found 100 vulnerability documents
Enriched: 100 documents
Alerts generated: 0
Enrichment completed!
2.8. Thiết lập Cron jobs (tự động hóa)
bash
(crontab -l 2>/dev/null; echo "0 */6 * * * /usr/bin/python3 ~/threat-intel/sync_ioc.py >> ~/threat-intel/sync.log 2>&1") | crontab -
(contab -l 2>/dev/null; echo "*/15 * * * * /usr/bin/python3 ~/threat-intel/enrich.py >> ~/threat-intel/enrich.log 2>&1") | crontab -
2.9. Kiểm tra dữ liệu trong Elasticsearch
bash
# Kiểm tra số lượng vulnerability
curl -s "http://localhost:9200/vulnerability-*/_count"
Kết quả:
json
{"count":183,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0}}
bash
# Kiểm tra số lượng threat-intel
curl -s "http://localhost:9200/threat-intel-*/_count"
Kết quả:
json
{"count":5,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0}}
bash
# Kiểm tra enriched-alerts
curl -s "http://localhost:9200/enriched-alerts/_count"
Kết quả:
json
{"count":100,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0}}
2.10. Tạo Index Patterns trong Kibana
Tạo bằng API:
bash
# Index pattern cho vulnerability
curl -X POST "http://localhost:5601/api/saved_objects/index-pattern" -H 'kbn-xsrf: true' -H 'Content-Type: application/json' -d'
{
"attributes": {
"title": "vulnerability-*",
"name": "vulnerability-scans",
"timeFieldName": "@timestamp"
}
}'
# Index pattern cho threat-intel
curl -X POST "http://localhost:5601/api/saved_objects/index-pattern" -H 'kbn-xsrf: true' -H 'Content-Type: application/json' -d'
{
"attributes": {
"title": "threat-intel-*",
"name": "threat-intel",
"timeFieldName": "@timestamp"
}
}'
# Index pattern cho enriched-alerts
curl -X POST "http://localhost:5601/api/saved_objects/index-pattern" -H 'kbn-xsrf: true' -H 'Content-Type: application/json' -d'
{
"attributes": {
"title": "enriched-alerts",
"name": "enriched-alerts",
"timeFieldName": "@timestamp"
}
}'
Danh sách Index Patterns đã tạo:
Name Index Pattern Số lượng
vulnerability-scans vulnerability-* 183
threat-intel threat-intel-* 5
enriched-alerts enriched-alerts 100
Hình 2: Giao diện Kibana Discover hiển thị 183 lỗ hổng đã được parse
2.11. Tạo Alert phát hiện lỗ hổng Critical
bash
curl -X PUT "http://localhost:9200/_watcher/watch/critical_vuln_alert" -H 'Content-Type: application/json' -d'
{
"trigger": { "schedule": { "interval": "1m" } },
"input": {
"search": {
"request": {
"indices": ["vulnerability-*"],
"body": {
"query": { "term": { "threat": "Critical" } }
}
}
}
},
"condition": {
"compare": { "ctx.payload.hits.total.value": { "gt": 0 } }
},
"actions": {
"log": {
"logging": { "text": "CRITICAL ALERT: Found critical vulnerabilities" }
}
}
}'
Kiểm tra watcher:
bash
curl -s "http://localhost:9200/_watcher/watch/critical_vuln_alert?pretty" | head -15
Kết quả:
json
{
"found" : true,
"_id" : "critical_vuln_alert",
"status" : {
"state" : { "active" : true }
}
}
Hình 3: Rule "Critical Vulnerability Alert" đã được tạo và đang ở trạng thái active
2.12. Tạo Webhook Connector (tùy chọn)
Trên giao diện Kibana:
Stack Management → Rules and Connectors → Connectors
Click Create connector → Chọn Webhook
Điền thông tin:
Trường Giá trị
Name Block IP Webhook
URL http://172.17.0.1:8080/webhook/block
Method POST
Headers Content-Type: application/json
Body {"ip": "{{context.source_ip}}", "reason": "critical_vulnerability"}
Hình 4: Webhook connector đã được tạo thành công
III. KẾT LUẬN
3.1. Kết quả đạt được
Sau quá trình triển khai và thực nghiệm, hệ thống đã hoạt động thành công và đáp ứng đầy đủ các yêu cầu đặt ra. Cụ thể, dữ liệu từ OpenVAS đã được import thành công vào Elasticsearch với tổng số 183 bản ghi lỗ hổng, bao gồm các mức độ Critical, High và Medium. Các trường thông tin như threat, vuln_name, target_host và cvss_score đã được parse đúng cấu trúc, sẵn sàng cho việc phân tích và truy vấn. Bên cạnh đó, 5 chỉ báo IOC từ nguồn threat intelligence đã được đồng bộ thành công, cung cấp dữ liệu về các địa chỉ IP và tên miền độc hại như Cobalt Strike C2, Emotet C2. Quá trình làm giàu dữ liệu (enrich) đã xử lý được 100 bản ghi, tạo ra chỉ mục enriched-alerts để tổng hợp thông tin từ nhiều nguồn. Trên giao diện Kibana, ba index pattern lần lượt cho vulnerability, threat-intel và enriched-alerts đã được tạo và hiển thị trực quan, cho phép người quản trị dễ dàng theo dõi, tìm kiếm và phân tích dữ liệu. Cuối cùng, cảnh báo tự động (watcher) cho lỗ hổng mức Critical đã được thiết lập và kích hoạt, giúp phát hiện kịp thời các điểm yếu bảo mật nghiêm trọng ngay khi chúng xuất hiện trong hệ thống. Như vậy, toàn bộ các mục tiêu của bài lab đã được hoàn thành, chứng minh tính khả thi của giải pháp tích hợp Vulnerability Scanner và Threat Intelligence vào ELK Stack.
3.2. Bài học kinh nghiệm
Quá trình triển khai cho thấy việc parse dữ liệu từ XML đòi hỏi cấu hình filter chính xác, đặc biệt là với các file XML có cấu trúc lồng nhau. Ngoài ra, việc định dạng đúng trường dữ liệu trước khi đưa vào Elasticsearch rất quan trọng để các truy vấn aggregation hoạt động chính xác. Cảnh báo qua watcher hoạt động ổn định nhưng cần lưu ý đến vấn đề license khi sử dụng các tính năng nâng cao.
3.3. Giá trị mang lại
Hệ thống tích hợp Vulnerability Scanner và Threat Intelligence giúp doanh nghiệp có cái nhìn toàn diện về bảo mật. Việc kết hợp dữ liệu từ nhiều nguồn cho phép phát hiện các mối đe dọa phức tạp mà từng nguồn riêng lẻ khó nhận biết. Giải pháp tự động hóa cảnh báo giúp đội ngũ bảo mật tiết kiệm thời gian và nâng cao khả năng ứng phó sự cố. Hệ thống có thể mở rộng để tích hợp thêm nhiều nguồn dữ liệu khác như log từ firewall, endpoint detection, và các nguồn threat intelligence khác.
3.1. Kết quả đạt được
Sau quá trình triển khai và thực nghiệm, hệ thống đã hoạt động thành công và đáp ứng đầy đủ các yêu cầu đặt ra. Cụ thể, dữ liệu từ OpenVAS đã được import thành công vào Elasticsearch với tổng số 183 bản ghi lỗ hổng, bao gồm các mức độ Critical, High và Medium. Các trường thông tin như threat, vuln_name, target_host và cvss_score đã được parse đúng cấu trúc, sẵn sàng cho việc phân tích và truy vấn. Bên cạnh đó, 5 chỉ báo IOC từ nguồn threat intelligence đã được đồng bộ thành công, cung cấp dữ liệu về các địa chỉ IP và tên miền độc hại như Cobalt Strike C2, Emotet C2. Quá trình làm giàu dữ liệu (enrich) đã xử lý được 100 bản ghi, tạo ra chỉ mục enriched-alerts để tổng hợp thông tin từ nhiều nguồn. Trên giao diện Kibana, ba index pattern lần lượt cho vulnerability, threat-intel và enriched-alerts đã được tạo và hiển thị trực quan, cho phép người quản trị dễ dàng theo dõi, tìm kiếm và phân tích dữ liệu. Cuối cùng, cảnh báo tự động (watcher) cho lỗ hổng mức Critical đã được thiết lập và kích hoạt, giúp phát hiện kịp thời các điểm yếu bảo mật nghiêm trọng ngay khi chúng xuất hiện trong hệ thống. Như vậy, toàn bộ các mục tiêu của bài lab đã được hoàn thành, chứng minh tính khả thi của giải pháp tích hợp Vulnerability Scanner và Threat Intelligence vào ELK Stack.
3.2. Bài học kinh nghiệm
Quá trình triển khai cho thấy việc parse dữ liệu từ XML đòi hỏi cấu hình filter chính xác, đặc biệt là với các file XML có cấu trúc lồng nhau. Ngoài ra, việc định dạng đúng trường dữ liệu trước khi đưa vào Elasticsearch rất quan trọng để các truy vấn aggregation hoạt động chính xác. Cảnh báo qua watcher hoạt động ổn định nhưng cần lưu ý đến vấn đề license khi sử dụng các tính năng nâng cao.
3.3. Giá trị mang lại
Hệ thống tích hợp Vulnerability Scanner và Threat Intelligence giúp doanh nghiệp có cái nhìn toàn diện về bảo mật. Việc kết hợp dữ liệu từ nhiều nguồn cho phép phát hiện các mối đe dọa phức tạp mà từng nguồn riêng lẻ khó nhận biết. Giải pháp tự động hóa cảnh báo giúp đội ngũ bảo mật tiết kiệm thời gian và nâng cao khả năng ứng phó sự cố. Hệ thống có thể mở rộng để tích hợp thêm nhiều nguồn dữ liệu khác như log từ firewall, endpoint detection, và các nguồn threat intelligence khác.