SIEM/Log Management Làm giàu dữ liệu log: GeoIP, Threat Intel Enrich, Remove Noise


I. YÊU CẦU BÀI LAB & THỰC TRẠNG HẠ TẦNG

1. Mô tả sơ bộ bài lab

Trong kiến trúc giám sát an ninh thông tin (SIEM/SOC), dữ liệu log thô (Raw log) thu thập về máy chủ thường thiếu ngữ cảnh bảo mật. Nếu chỉ dựa vào địa chỉ IP nguồn hoặc chuỗi request thuần túy, kỹ sư SOC sẽ mất rất nhiều thời gian để tra cứu thủ công xem IP đó thuộc quốc gia nào, có nằm trong danh sách đen (Blacklist) hay không.

Bài lab này hướng dẫn xây dựng một hệ thống Log Enrichment Pipeline tự động bằng Python. Hệ thống sẽ đọc trực tiếp log từ Web Server (Nginx), tiến hành chuẩn hóa cấu trúc, lọc bỏ dữ liệu nhiễu, tra cứu vị trí địa lý (GeoIP), đối chiếu tri thức đe dọa (Threat Intelligence) và tự động gắn nhãn cảnh báo (Security Tagging) theo thời gian thực.

Môi trường thực hiện hạ tầng:
  • Hệ điều hành: Ubuntu 20.04 LTS​
  • Tài nguyên tối thiểu: 2 vCPUs, 2 GB RAM, 20 GB Disk​

2. Bản chất vấn đề cần giải quyết

Hệ thống giám sát nếu để cấu hình log mặc định sẽ đối mặt với 4 lỗ hổng lớn trong quy trình vận hành:
Thực trạng log thô
Hệ quả kỹ thuật
Tác động vận hành SOC
Thiếu ngữ cảnh GeoIP
Log chỉ lưu IP dạng số (45.142.212.50), không có quốc gia/thành phố.
Không thể khoanh vùng hoặc phát hiện các kết nối bất thường từ nước ngoài.
Không đối chiếu Threat Intel
Không nhận diện được các IP thuộc mạng lưới Botnet, Tor Exit Node hoặc C2 Server.
Bỏ sót các dấu hiệu xâm nhập sớm (Indicator of Compromise - IoC).
Nhiễu dữ liệu (Log Noise)Các request Healthcheck, Monitoring chiếm đến 70% dung lượng file log.Lãng phí tài nguyên lưu trữ, làm loãng dữ liệu khi điều tra sự cố.
Dữ liệu phẳng, không phân loạiToàn bộ log có mức độ ưu tiên như nhau, không có cơ chế dán nhãn nguy hiểm.Kỹ sư SOC phản ứng chậm, không thể thiết lập bộ lọc cảnh báo ưu tiên (Trigger Alert).

Mục tiêu giải pháp: Thiết lập một tiến trình chạy nền (Background Service) hoạt động liên tục, tiêu tốn ít tài nguyên phần cứng (RAM < 30MB) để xử lý triệt để các vấn đề trên và xuất ra định dạng JSON chuẩn hóa.

II. PHƯƠNG ÁN XỬ LÝ & TRIỂN KHAI CHI TIẾT
1. Kiến trúc luồng dữ liệu tổng thể

Hệ thống hoạt động theo mô hình Pipeline khép kín gồm 3 tầng:
  • Tầng dữ liệu vào (Input Layer): Theo dõi liên tục tệp tin log thô /var/log/nginx/access.log.
  • Tầng xử lý (Processing Layer): Trích xuất các trường dữ liệu bằng biểu thức chính quy (Regex) ➔ Sàng lọc log nhiễu ➔ Truy vấn cơ sở dữ liệu local (GeoIP, Threat Intel) ➔ Giới hạn độ dài ký tự tránh tấn công từ chối dịch vụ vào tầng lưu trữ (Buffer Overflow/Log Injection) ➔ Phân tích dấu hiệu tấn công dựa trên Signature để gắn tag bảo mật.
  • Tầng dữ liệu ra (Output Layer): Ghi dữ liệu đã làm giàu dưới dạng cấu trúc JSON vào tệp /var/log/enriched/output.log và duy trì hoạt động thông qua systemd.
1780053318360.png

Hình 1: Kiến trúc Pipeline tổng thể
2. Các bước triển khai cấu hình

Bước 1: Khởi tạo cấu trúc thư mục dự án

Mở Terminal trên máy chủ Ubuntu và chạy các lệnh sau để thiết lập không gian làm việc:

Bash

# Tạo cấu trúc thư mục lưu trữ mã nguồn và cấu hình
mkdir -p ~/log-enricher-project/{config,logs,data}cd ~/log-enricher-project
# Khởi tạo các thư mục lưu trữ log hệ thống
sudo mkdir -p /var/log/nginx /var/log/enriched
sudo chown -R $USER:$USER /var/log/enriched
echo "[OK] Đã khởi tạo cấu trúc thư mục hệ thống."

Bước 2: Thiết lập cơ sở dữ liệu vị trí địa lý (GeoIP)

Để đảm bảo pipeline hoạt động với tốc độ cao và có khả năng chạy offline trong các phân vùng mạng cô lập (Air-gapped network), chúng ta xây dựng cấu trúc tra cứu cục bộ (Local Lookup Database) dạng Key-Value:

Python

# Tạo tệp tin: config/geoip_db.py
GEOIP_DB = {
"45.142.212.50": {
"country": "Russian Federation",
"country_code": "RU",
"city": "Moscow",
"continent": "Europe",
"latitude": 55.7558,
"longitude": 37.6173,
"timezone": "Europe/Moscow"
},
"185.130.5.253": {
"country": "Netherlands",
"country_code": "NL",
"city": "Amsterdam",
"continent": "Europe",
"latitude": 52.3676,
"longitude": 4.9041,
"timezone": "Europe/Amsterdam"
},
"94.102.61.78": {
"country": "United States",
"country_code": "US",
"city": "New York",
"continent": "North America",
"latitude": 40.7128,
"longitude": -74.0060,
"timezone": "America/New_York"
}
}
Bước 3: Thiết lập cơ sở dữ liệu tri thức đe dọa (Threat Intelligence)
Định nghĩa danh sách các IP độc hại đã được các tổ chức an ninh mạng ghi nhận, đi kèm với chỉ số đánh giá mức độ rủi ro (risk_score từ 0 đến 100):​
Python
# Tạo tệp tin: config/threat_db.py
THREAT_DB = {
"45.142.212.50": {
"type": "Malware C2 Server",
"risk_score": 85,
"category": "botnet",
"first_seen": "2024-01-15",
"confidence": "high",
"malware_family": "TrickBot",
"actions_recommended": ["block", "alert", "investigate"]
},
"185.130.5.253": {
"type": "Port Scanner",
"risk_score": 65,
"category": "reconnaissance",
"first_seen": "2024-03-20",
"confidence": "medium",
"actions_recommended": ["monitor", "rate_limit"]
},
"94.102.61.78": {
"type": "Spambot",
"risk_score": 90,
"category": "spam",
"first_seen": "2023-11-10",
"confidence": "high",
"actions_recommended": ["block", "alert"]
}
}

Bước 4: Định nghĩa mẫu nhận diện mã độc và lọc nhiễu

Xây dựng tập luật Regex dùng để loại bỏ các log dịch vụ thông thường và nhận diện các hành vi dò quét, tấn công ứng dụng Web (SQLi, XSS, Path Traversal):
Python
# Tạo tệp tin: config/noise_patterns.py
NOISE_PATTERNS = [
r'health[-_]?check',
r'kube-probe',
r'prometheus',
r'ELB-HealthChecker',
r'/healthz$',
r'/metrics$',
r'/ping$',
r'heartbeat'
]

ATTACK_PATTERNS = [
# SQL Injection (SQLi)
r'union.*select',
r'select.*from',
r'drop.*table',
r'--',

# Cross-Site Scripting (XSS)
r'<script',
r'javascript:',
r'onerror=',

# Command Injection & Web Shell
r'/bin/bash',
r'/bin/sh',
r'cmd\.exe',
r'wget\s+http',
r'curl\s+http',

# Path Traversal
r'\.\./\.\./',

# Dấu hiệu quét từ công cụ tự động
r'sqlmap',
r'nmap',
r'nikto',
r'burp'
]

Bước 5: Viết mã nguồn xử lý chính cho Pipeline

Tệp tin core xử lý logic đọc file theo cơ chế con trỏ (tailing file), bóc tách, làm giàu dữ liệu và xuất log:
Python
# Tạo tệp tin: log_enricher.py#!/usr/bin/env python3import reimport jsonimport timeimport osimport sysfrom datetime import datetime
from config.geoip_db import GEOIP_DBfrom config.threat_db import THREAT_DBfrom config.noise_patterns import NOISE_PATTERNS, ATTACK_PATTERNS

LOG_FILE = "/var/log/nginx/access.log"
OUTPUT_FILE = "/var/log/enriched/output.log"
POSITION_FILE = "/var/log/enriched/position.txt"

MAX_FIELD_LENGTH = 1000
MAX_UA_LENGTH = 255
class LogEnricher:
def __init__(self):
self.stats = {
"total_processed": 0,
"noise_filtered": 0,
"threat_detected": 0,
"critical_events": 0
}

def parse_nginx_log(self, line):
# Định dạng Combined Log Format của Nginx
pattern = r'^(\d+\.\d+\.\d+\.\d+) - - \[([^\]]+)\] "(\S+) (\S+) HTTP/\d\.\d" (\d+) (\d+) "([^"]*)" "([^"]*)"'
match = re.match(pattern, line.strip())
if match:
groups = match.groups()
return {
"source_ip": groups[0],
"timestamp": groups[1],
"method": groups[2],
"request_path": groups[3][:MAX_FIELD_LENGTH],
"response_code": int(groups[4]),
"body_bytes_sent": int(groups[5]),
"http_referrer": groups[6][:MAX_FIELD_LENGTH],
"user_agent": groups[7][:MAX_UA_LENGTH]
}
return None

def is_noise(self, log_entry):
log_str = json.dumps(log_entry).lower()
for pattern in NOISE_PATTERNS:
if re.search(pattern, log_str):
self.stats["noise_filtered"] += 1
return True
return False

def add_geoip(self, log_entry):
ip = log_entry.get("source_ip")
log_entry["source"] = {"ip": ip, "geo": {}}
if ip in GEOIP_DB:
log_entry["source"]["geo"] = GEOIP_DB[ip]
else:
log_entry["source"]["geo"] = {"country": "Unknown", "country_code": "XX"}
return log_entry

def add_threat_intel(self, log_entry):
ip = log_entry.get("source_ip")
if ip in THREAT_DB:
log_entry["threat"] = THREAT_DB[ip]
self.stats["threat_detected"] += 1
return log_entry

def add_tags(self, log_entry):
tags = []
status = log_entry.get("response_code", 200)

if status >= 500:
tags.extend(["critical", "http_5xx"])
self.stats["critical_events"] += 1
elif status >= 400:
tags.append("warning")

if "threat" in log_entry:
if log_entry["threat"].get("risk_score", 0) >= 80:
tags.append("critical_threat")
tags.append("security_threat")

log_str = json.dumps(log_entry).lower()
for pattern in ATTACK_PATTERNS:
if re.search(pattern, log_str):
tags.extend(["attack_detected", "security_incident"])
break

if tags:
log_entry["tags"] = tags
return log_entry

def enrich(self, line):
parsed = self.parse_nginx_log(line)
if not parsed or self.is_noise(parsed):
return None

parsed["@timestamp"] = datetime.utcnow().isoformat() + "Z"
parsed = self.add_geoip(parsed)
parsed = self.add_threat_intel(parsed)
parsed = self.add_tags(parsed)

self.stats["total_processed"] += 1
return parsed

def run(self):
print("[INFO] Khởi động Pipeline xử lý log...")
if not os.path.exists(LOG_FILE):
os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)
open(LOG_FILE, 'w').close()

last_pos = 0
if os.path.exists(POSITION_FILE):
with open(POSITION_FILE, 'r') as pf:
last_pos = int(pf.read().strip() or 0)

with open(LOG_FILE, 'r') as f:
f.seek(last_pos)
while True:
line = f.readline()
if not line:
time.sleep(0.5)
continue

with open(POSITION_FILE, 'w') as pf:
pf.write(str(f.tell()))

enriched = self.enrich(line)
if enriched:
with open(OUTPUT_FILE, 'a') as out:
out.write(json.dumps(enriched) + '\n')
if __name__ == "__main__":
enricher = LogEnricher()
try:
enricher.run()
except KeyboardInterrupt:
print("\n[INFO] Dừng hệ thống. Thống kê:", enricher.stats)

Bước 6: Đóng gói hệ thống thành Systemd Service

Để mã nguồn Python tự động khởi chạy cùng hệ điều hành và tự phục hồi khi gặp lỗi, ta cấu hình một đơn vị dịch vụ hệ thống (Systemd Unit):
Bash
# Tạo file cấu hình dịch vụ hệ thống
sudo tee /etc/systemd/system/log-enricher.service << 'EOF'
[Unit]
Description=Log Enricher Service - GeoIP & Threat Intel Pipeline
After=network.target

[Service]
Type=simple
User=ubuntu
Group=ubuntu
WorkingDirectory=/home/ubuntu/log-enricher-project
ExecStart=/usr/bin/python3 /home/ubuntu/log-enricher-project/log_enricher.py
Restart=always
RestartSec=5
# Cơ chế Hardening bảo mật cho Service
NoNewPrivileges=yes
PrivateTmp=yes
ProtectSystem=strict
ReadWritePaths=/var/log/enriched /var/log/nginx

[Install]
WantedBy=multi-user.target
EOF
# Kích hoạt và chạy Service
sudo systemctl daemon-reload
sudo systemctl enable log-enricher.service
sudo systemctl start log-enricher.service

1780054353103.png

Hình 2: Trạng thái service log-enricher sau khi khởi động thành công​

Bước 7: Viết kịch bản kiểm thử (Test Script) và kết quả mô phỏng

Tạo file bash script tự động bơm các chuỗi log độc hại giả lập vào hệ thống để kiểm tra khả năng bóc tách và dán nhãn:
Bash
# Tạo tệp tin: test_pipeline.sh#!/bin/bashecho "=== BƠM DỮ LIỆU LOG THỬ NGHIỆM TẤN CÔNG ==="
# 1. Log từ IP độc hại (C2 Server của Nga) thực hiện hành vi POSTecho '45.142.212.50 - - [29/May/2026:10:15:32 +0700] "POST /login.php HTTP/1.1" 500 1234 "-" "Mozilla/5.0"' | sudo tee -a /var/log/nginx/access.log
# 2. Log chứa hành vi tấn công SQL Injection từ IP dò quétecho '185.130.5.253 - - [29/May/2026:10:16:45 +0700] "GET /?id=1%20UNION%20SELECT%201,2,3 HTTP/1.1" 403 567 "-" "curl/7.68.0"' | sudo tee -a /var/log/nginx/access.log
# 3. Log nhiễu hệ thống (Healthcheck) -> Bộ lọc phải tự động loại bỏecho '127.0.0.1 - - [29/May/2026:10:17:00 +0700] "GET /healthz HTTP/1.1" 200 43 "-" "kube-probe"' | sudo tee -a /var/log/nginx/access.log

sleep 2echo -e "\n=== KẾT QUẢ LOG ĐÃ ĐƯỢC LÀM GIÀU (DÒNG CUỐI) ==="
tail -n 1 /var/log/enriched/output.log | jq .

1780054602510.png

Hình 3: Output JSON sau khi qua pipeline làm giàu

III. KẾT LUẬN & HƯỚNG PHÁT TRIỂN

1. Phân tích kết quả đầu ra (Normalized Output)

Dưới đây là cấu trúc tệp tin log đích thu được sau khi đi qua Pipeline xử lý của bài lab đối với mẫu tấn công thứ nhất:
JSON
{
"source_ip": "45.142.212.50",
"timestamp": "29/May/2026:10:15:32 +0700",
"method": "POST",
"request_path": "/login.php",
"response_code": 500,
"body_bytes_sent": 1234,
"http_referrer": "",
"user_agent": "Mozilla/5.0",
"@timestamp": "2026-05-29T08:15:34.120543Z",
"source": {
"ip": "45.142.212.50",
"geo": {
"country": "Russian Federation",
"country_code": "RU",
"city": "Moscow",
"continent": "Europe",
"latitude": 55.7558,
"longitude": 37.6173,
"timezone": "Europe/Moscow"
}
},
"threat": {
"type": "Malware C2 Server",
"risk_score": 85,
"category": "botnet",
"first_seen": "2024-01-15",
"confidence": "high",
"malware_family": "TrickBot",
"actions_recommended": ["block", "alert", "investigate"]
},
"tags": [
"critical",
"http_5xx",
"critical_threat",
"security_threat",
"attack_detected",
"security_incident"
]
}

1780055068320.png

Hình 4: So sánh LOG Thô vs Log ENRICH

2. So sánh đối chiếu giải pháp giải bài toán thực tế

Tiêu chí vận hànhGiải pháp truyền thống (Logstash)Giải pháp tự phát triển (Python Script)
Tiêu thụ tài nguyênRất nặng (~500MB - 1GB RAM) do chạy nền tảng Java.
Rất nhẹ (~15MB - 25MB RAM), tối ưu tốt cho máy chủ biên (Edge Server).
Tính phụ thuộc mạngThường yêu cầu kết nối API Online để cập nhật thời gian thực.
Hỗ trợ mô hình Offline hoàn toàn nhờ cấu trúc Local Dictionary Database.
Khả năng tùy biếnPhụ thuộc vào cú pháp viết Plugin của bên thứ ba.
Can thiệp sâu vào logic xử lý bằng mã nguồn Python thuần, dễ mở rộng luật.

1780055427703.png

Hình 5: Biểu đồ so sánh hiệu năng giữa 4 giải pháp​

3. Bài học kinh nghiệm vận hành SOC

  • Quản lý trạng thái đọc log (Pointer State): Việc lưu trữ chỉ số byte đã đọc vào tệp position.txt giúp hệ thống không bị đọc trùng lặp hoặc bỏ sót dữ liệu khi dịch vụ log-enricher bị khởi động lại đột ngột.
  • An toàn hóa dữ liệu phòng độc (Data Hardening): Kẻ tấn công có thể cố tình gửi các request có độ dài hàng megabyte (User-Agent giả mạo chứa mã độc) nhằm làm tràn bộ nhớ hệ thống lưu trữ log. Việc áp dụng các hàm cắt chuỗi giới hạn (MAX_FIELD_LENGTH) là bắt buộc để bảo vệ tính toàn vẹn của hệ thống SIEM phía sau.

4. Hướng mở rộng hệ thống trong tương lai

Tích hợp cơ sở dữ liệu động: Chuyển đổi các tệp cấu hình cứng (geoip_db.py) sang định dạng tệp tin nhị phân MaxMind .mmdb để cập nhật tự động hàng tuần mà không cần khởi động lại mã nguồn.
Cơ chế cảnh báo chủ động (Active Alerting): Bổ sung Module Webhook kết nối trực tiếp với API của Telegram/Slack để đẩy thông báo khẩn cấp ngay khi phát hiện log có chứa tag critical_threat.


 
Back
Top