I. Khảo Sát Yêu Cầu & Thực Trạng Log
1. Bối cảnh hệ thống
Trong kiến trúc giám sát an ninh thông tin (SIEM/SOC), khâu xử lý dữ liệu đầu vào luôn là giai đoạn mất nhiều thời gian nhất. Log thu thập từ các thành phần trong hạ tầng (Firewall, Web Server, Database, các dịch vụ nội bộ) đổ về trạm trung chuyển với định dạng và cấu trúc hoàn toàn hỗn loạn.Bài lab này thực hiện việc phân tích, nhận dạng và bóc tách cấu trúc dữ liệu log thô dựa trên 3 công cụ: Logstash, Vector và OpenSearch Ingest Pipeline.
Thông số môi trường thử nghiệm:
- OS: Ubuntu 22.04 LTS
- Cấu hình VM: 2 vCPUs, 4 GB RAM, 40 GB Disk
Hình 1: Kiểm tra phiên bản công cụ
Nếu không qua bộ lọc (Parser), các chuỗi dữ liệu thô dưới đây sẽ làm tê liệt khả năng truy vấn nhanh khi có sự cố:
Loại Log | Thực trạng định dạng | Hệ quả nếu để dạng thô |
JSON Log | {"level":"ERROR","message":"Connection failed","ip":"192.168.1.100"} | Hệ thống coi là chuỗi văn bản phẳng, không thể filter theo trường ip hoặc level. |
Syslog (RFC3164) | <13>Jan 15 10:30:00 webserver sshd[1234]: Failed password... | Mất chỉ số phân loại mức độ nghiêm trọng (priority) hoặc tên máy chủ (hostname). |
CEF Log | CEF:0|Cisco|Firepower|6.0|2|Connection Blocked|... | Cấu trúc đan xen giữa Header và Extension key-value, regex thông thường không bóc tách hết được. |
Hình 2: Cấu trúc thư mục và file log mẫu
Mục tiêu xử lý: Xây dựng luồng tự động nhận diện định dạng log đầu vào, chuyển tiếp qua bộ lọc chuyên biệt để đưa toàn bộ dữ liệu về một cấu trúc JSON đồng nhất trước khi lưu trữ.
II. Phương Án Xử Lý & Triển Khai Chi Tiết
1. Kiến trúc luồng dữ liệu
Luồng xử lý được thiết lập theo mô hình tuyến tính:
Raw Log (Nguồn thô) ➔ Định danh Format ➔ Bộ lọc chuyên biệt (Parser) ➔ Chuẩn hóa dữ liệu đầu ra.
Đặc tính kỹ thuật của 3 giải pháp thử nghiệm:Raw Log (Nguồn thô) ➔ Định danh Format ➔ Bộ lọc chuyên biệt (Parser) ➔ Chuẩn hóa dữ liệu đầu ra.
- Logstash: Hoạt động trên môi trường JVM, tốn tài nguyên phần cứng nhưng sở hữu kho thư viện Grok Pattern đồ sộ để trị các loại log dị biệt.
- Vector (Rust-based): Tối ưu hóa bộ nhớ cực tốt (chỉ chiếm ~50MB RAM), xử lý luồng dữ liệu tốc độ cao.
- OpenSearch Ingest Pipeline: Thực hiện bóc tách trực tiếp tại tầng lưu trữ ngay khi dữ liệu chuẩn bị ghi vào Index, giảm bớt một bước trung chuyển bằng phần mềm bên thứ ba.
2. Các bước cấu hình thực tế
Bước 1: Khởi tạo dữ liệu mẫu
Tạo cấu trúc thư mục làm việc và nạp các dòng log giả lập sự cố:Bash
mkdir -p ~/log-analysis/{samples,parsers,output}cd ~/log-analysis
# Ghi mẫu log JSON, Syslog và CEF vào thư mục samples
cat > samples/01-json.log << 'EOF'
{"timestamp":"2024-01-15T10:30:00Z","level":"ERROR","message":"Connection failed","user":"admin","ip":"192.168.1.100"}
EOF
cat > samples/02-syslog.log << 'EOF'
<13>Jan 15 10:30:00 webserver sshd[1234]: Failed password for root from 192.168.1.100
EOF
cat > samples/03-cef.log << 'EOF'
CEF:0|Cisco|Firepower|6.0|2|Connection Blocked|6|src=8.8.8.8 dst=192.168.1.1
EOF
mkdir -p ~/log-analysis/{samples,parsers,output}cd ~/log-analysis
# Ghi mẫu log JSON, Syslog và CEF vào thư mục samples
cat > samples/01-json.log << 'EOF'
{"timestamp":"2024-01-15T10:30:00Z","level":"ERROR","message":"Connection failed","user":"admin","ip":"192.168.1.100"}
EOF
cat > samples/02-syslog.log << 'EOF'
<13>Jan 15 10:30:00 webserver sshd[1234]: Failed password for root from 192.168.1.100
EOF
cat > samples/03-cef.log << 'EOF'
CEF:0|Cisco|Firepower|6.0|2|Connection Blocked|6|src=8.8.8.8 dst=192.168.1.1
EOF
Hình 3: Nội dung log JSON
Bước 2: Viết script tự động nhận diện định dạng nguồn log
Sử dụng các biểu thức chính quy (Regex) cơ bản quét dòng đầu tiên của tệp tin để phân loại định dạng và đưa ra gợi ý xử lý:Bash
cat > detect-format.sh << 'EOF'#!/bin/bash
GREEN='\033[0;32m'; RED='\033[0;31m'; NC='\033[0m'
for file in ~/log-analysis/samples/*.log; do
filename=$(basename "$file")
line=$(head -1 "$file")
echo -e "\n Kiểm tra tệp: $filename"
if echo "$line" | jq -e . >/dev/null 2>&1; then
echo -e "Kết quả: ${GREEN}Định dạng JSON${NC} -> Sử dụng json filter hoặc Vector json parser."
elif [[ "$line" =~ ^CEF:[0-9]+\| ]]; then
echo -e "Kết quả: ${GREEN}Định dạng CEF${NC} -> Yêu cầu xử lý bằng bộ lọc Grok kết hợp KV."
elif [[ "$line" =~ ^\<[0-9]+\>[A-Za-z]{3}\ +[0-9]{1,2} ]]; then
echo -e "Kết quả: ${GREEN}Định dạng Syslog RFC3164${NC} -> Đề xuất sử dụng cấu trúc remapping của Vector."
else
echo -e "Kết quả: ${RED}Không xác định${NC}"
fidone
EOF
chmod +x detect-format.sh
./detect-format.sh
Bước 3: Triển khai các cấu hình bộ lọc (Parser)
Bộ lọc Logstash cho log định dạng CEF
Sử dụng filter grok để bóc tách phần Header cố định, sau đó dùng filter kv giải mã vùng dữ liệu mở rộng (cef_extension).
Hình 4: Nội dung log Syslog RFC3164 và CEF
# Đường dẫn tệp: parsers/03-cef-parser.conf
input {
file {
path => "/home/ubuntu/log-analysis/samples/03-cef.log"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter {
grok {
match => {
"message" => "CEF:%{NUMBER:cef_version}\|%{DATA:device_vendor}\|%{DATA:device_product}\|%{DATA:device_version}\|%{DATA:signature_id}\|%{DATA:name}\|%{DATA:severity}\|%{GREEDYDATA:cef_extension}"
}
}
kv {
source => "cef_extension"
field_split => " "
value_split => "="
}
}
output {
stdout { codec => rubydebug }
}
Bộ lọc Vector (Xử lý định dạng JSON)
Cấu hình nhẹ thông qua tệp tin định dạng TOML:Ini, TOML
# Đường dẫn tệp: vector-config.toml[sources.json_logs]type = "file"include = ["/home/ubuntu/log-analysis/samples/01-json.log"]
[transforms.parse_json]type = "remap"inputs = ["json_logs"]source = '''
parsed = parse_json!(.message)
. = merge(., parsed)
'''
[sinks.console]type = "console"inputs = ["parse_json"]encoding.codec = "json"
Cấu hình Ingest Pipeline trực tiếp trên OpenSearch
Khởi chạy cụm OpenSearch thông qua Docker để chuẩn bị môi trường thử nghiệm:Bash
sudo docker network create opensearch-net
# Chạy OpenSearch Engine
sudo docker run -d --name opensearch --network opensearch-net -p 9200:9200 -p 9600:9600 -e "discovery.type=single-node" -e "DISABLE_SECURITY_PLUGIN=true" opensearchproject/opensearch:latest
# Chạy giao diện quản trị OpenSearch Dashboards
sudo docker run -d --name opensearch-dashboards --network opensearch-net -p 5601:5601 -e "OPENSEARCH_HOSTS=http://opensearch:9200" opensearchproject/opensearch-dashboards:latest
Truy cập giao diện web OpenSearch Dashboards (http://localhost:5601), vào mục Dev Tools để nạp cấu hình bộ xử lý:
JSON
PUT _ingest/pipeline/log-parser
{
"description": "Pipeline tu dong parse da dinh dang log",
"processors": [
{
"grok": {
"field": "message",
"patterns": [
"^<%{NUMBER riority}>%{SYSLOGTIMESTAMP:timestamp} %{SYSLOGHOST:hostname} %{DATA rogram}(?:\\[%{POSINT id}\\])?: %{GREEDYDATA:message_content}$"
],
"ignore_missing": true
}
},
{
"json": {
"field": "message",
"target_field": "json_log",
"ignore_failure": true
}
}
]
}
Kiểm tra độ chính xác của Pipeline trước khi vận hành thực tế bằng tính năng mô phỏng (_simulate):
JSON
POST _ingest/pipeline/log-parser/_simulate
{
"docs": [
{"_source": {"message": "<13>Jan 15 10:30:00 webserver sshd: Failed password"}}
]
}
Hình 5: Tạo pipeline và test parse log Syslog
III. Đánh Giá Hiệu Năng & Kinh Nghiệm Thực Tế
1. Kết quả thu được trên hệ thống lưu trữ
Sau khi đi qua hệ thống bóc tách dữ liệu, toàn bộ log thô lộn xộn ban đầu đã được cấu trúc hóa rõ ràng, sẵn sàng cho việc phân tích sự cố hay dựng Dashboard giám sát.2. So sánh các giải pháp xử lý dữ liệu đầu vào
Chỉ số đánh giá | Logstash | Vector | OpenSearch Ingest |
Tài nguyên tiêu thụ | Cao (~500 MB RAM) | Thấp (~50 MB RAM) | Trung bình (~300 MB) |
Tốc độ xử lý (Throughput) | 10 - 20 MB/s | 30 - 50 MB/s | 20 - 30 MB/s |
Cú pháp cấu hình | Ruby DSL | TOML (VRL script) | JSON (API) |
Kịch bản áp dụng tối ưu | Khống chế các loại log tùy biến quá phức tạp. | Hệ thống phân tán, giới hạn phần cứng, lưu lượng lớn. | Kiến trúc rút gọn, xử lý trực tiếp trên cụm cơ sở dữ liệu có sẵn. |
3. Kinh nghiệm thực chiến khi xử lý chuỗi log
- Cơ chế Dead Letter Queue (DLQ): Thực tế vận hành không có chuỗi log nào chuẩn 100%. Khi gặp log lỗi cấu trúc do ứng dụng cập nhật hoặc do lỗi dev, parser rất dễ bị treo hoặc crash. Bắt buộc phải cấu hình rẽ nhánh xử lý (fallback) đẩy các log không parse được vào một tệp tin hoặc queue riêng để giữ cho luồng dữ liệu chính luôn thông suốt.
- Đồng bộ hóa mốc thời gian (Timezone): Log đổ về từ các thiết bị phân tán thường bị lệch múi giờ (giữa UTC và múi giờ máy chủ địa phương GMT+7). Cần cấu hình ép toàn bộ trường thời gian sự kiện về một múi giờ chuẩn duy nhất ngay tại tầng parser để đảm bảo tính chính xác khi thực hiện điều tra dòng thời gian sự kiện (Timeline) của các sự cố an ninh.
Đính kèm
Sửa lần cuối: