Xuân Thông
Intern
I. YÊU CẦU BÀI LAB
1.1. Mô tả sơ bộ
Bài lab thực hiện chuẩn hóa dữ liệu log theo Elastic Common Schema (ECS) trên OpenSearch. ECS là tiêu chuẩn mở định nghĩa các tên trường chung cho dữ liệu log, giúp việc phân tích và tìm kiếm trở nên nhất quán giữa các hệ thống.
Các trường ECS cần map:
1.1. Mô tả sơ bộ
Bài lab thực hiện chuẩn hóa dữ liệu log theo Elastic Common Schema (ECS) trên OpenSearch. ECS là tiêu chuẩn mở định nghĩa các tên trường chung cho dữ liệu log, giúp việc phân tích và tìm kiếm trở nên nhất quán giữa các hệ thống.
Các trường ECS cần map:
- @timestamp - Thời gian xảy ra sự kiện
- host.name - Tên máy chủ nguồn
- source.ip - Địa chỉ IP nguồn
- destination.ip - Địa chỉ IP đích
- event.category - Loại sự kiện
- event.outcome - Kết quả sự kiện
- user.name - Tên người dùng
Thiết bị chuẩn bị: Máy ảo Ubuntu 20.04 với OpenSearch đã cài đặt.
1.2. Mô tả vấn đề
Log từ hệ thống thường xuất ra dạng text không có cấu trúc:
2025-03-20T15:30:45.111Z WARNING [172.16.0.10] {"path":"/login","method":"POST"} 200 user=admin
Với dạng log này, việc tìm kiếm và phân tích gặp nhiều khó khăn. Mục tiêu là biến log text thành JSON có cấu trúc theo chuẩn ECS.
II. TRIỂN KHAI
2.1. Giải pháp
Sử dụng Ingest Pipeline của OpenSearch để parse log và map sang trường ECS, kết hợp với Index Template để định nghĩa mapping.
Luồng xử lý: Log đầu vào → Ingest Pipeline → Document theo ECS → Index Template → Index
2.2. Các bước thực hiện
Bước 1: Kiểm tra OpenSearch
Lệnh thực hiện:
curl -X GET "localhost:9200"
Kết quả trả về là JSON chứa cluster_name và version, xác nhận OpenSearch đang chạy trên port 9200.
1.2. Mô tả vấn đề
Log từ hệ thống thường xuất ra dạng text không có cấu trúc:
2025-03-20T15:30:45.111Z WARNING [172.16.0.10] {"path":"/login","method":"POST"} 200 user=admin
Với dạng log này, việc tìm kiếm và phân tích gặp nhiều khó khăn. Mục tiêu là biến log text thành JSON có cấu trúc theo chuẩn ECS.
II. TRIỂN KHAI
2.1. Giải pháp
Sử dụng Ingest Pipeline của OpenSearch để parse log và map sang trường ECS, kết hợp với Index Template để định nghĩa mapping.
Luồng xử lý: Log đầu vào → Ingest Pipeline → Document theo ECS → Index Template → Index
2.2. Các bước thực hiện
Bước 1: Kiểm tra OpenSearch
Lệnh thực hiện:
curl -X GET "localhost:9200"
Kết quả trả về là JSON chứa cluster_name và version, xác nhận OpenSearch đang chạy trên port 9200.
HÌNH 1 - Kết quả kiểm tra OpenSearch
Bước 2: Tạo Index Template với ECS mapping
Lệnh thực hiện:
curl -X PUT "localhost:9200/_index_template/ecs-logs-template" -H 'Content-Type: application/json' -d'
{
"index_patterns": ["ecs-logs-*"],
"priority": 100,
"template": {
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0,
"default_pipeline": "ecs-pipeline"
},
"mappings": {
"properties": {
"@timestamp": { "type": "date" },
"host.name": { "type": "keyword" },
"source.ip": { "type": "ip" },
"destination.ip": { "type": "ip" },
"event.category": { "type": "keyword" },
"event.outcome": { "type": "keyword" },
"user.name": { "type": "keyword" },
"message": { "type": "text" }
}
}
}
}
Kết quả trả về: {"acknowledged":true}
HÌNH 2 - Kết quả tạo Index Template
Bước 3: Tạo Ingest Pipeline map sang ECS
Lệnh thực hiện:
curl -X PUT "localhost:9200/_ingest/pipeline/ecs-pipeline" -H 'Content-Type: application/json' -d'
{
"description": "ECS pipeline - map log to ECS fields",
"processors": [
{
"grok": {
"field": "message",
"patterns": ["%{TIMESTAMP_ISO8601
timestamp} %{LOGLEVEL:event.outcome} \[%{IP:source.ip}\] %{GREEDYDATA:request} %{NUMBER:response_code:int} user=%{DATA:user.name}"]
}
},
{
"set": { "field": "event.category", "value": "web" }
},
{
"set": { "field": "host.name", "value": "web-server-01" }
},
{
"set": { "field": "destination.ip", "value": "10.0.0.1" }
},
{
"date": {
"field": "@timestamp",
"formats": ["ISO8601"],
"timezone": "UTC"
}
},
{
"remove": { "field": ["request", "response_code"], "ignore_missing": true }
}
]
}'
Kết quả trả về: {"acknowledged":true}
Lệnh thực hiện:
curl -X PUT "localhost:9200/_ingest/pipeline/ecs-pipeline" -H 'Content-Type: application/json' -d'
{
"description": "ECS pipeline - map log to ECS fields",
"processors": [
{
"grok": {
"field": "message",
"patterns": ["%{TIMESTAMP_ISO8601
}
},
{
"set": { "field": "event.category", "value": "web" }
},
{
"set": { "field": "host.name", "value": "web-server-01" }
},
{
"set": { "field": "destination.ip", "value": "10.0.0.1" }
},
{
"date": {
"field": "@timestamp",
"formats": ["ISO8601"],
"timezone": "UTC"
}
},
{
"remove": { "field": ["request", "response_code"], "ignore_missing": true }
}
]
}'
Kết quả trả về: {"acknowledged":true}
HÌNH 3 - Kết quả tạo Pipeline
Bước 4: Test pipeline với Simulate API
Lệnh thực hiện:
curl -X POST "localhost:9200/_ingest/pipeline/ecs-pipeline/_simulate" -H 'Content-Type: application/json' -d'
{
"docs": [{
"_source": {
"message": "2025-03-20T15:30:45.111Z WARNING [172.16.0.10] {"path":"/login","method":"POST"} 200 user=admin"
}
}]
}'
Kết quả trả về JSON chứa các trường ECS: @timestamp, source.ip, event.category, event.outcome, user.name, host.name, destination.ip.
Lệnh thực hiện:
curl -X POST "localhost:9200/_ingest/pipeline/ecs-pipeline/_simulate" -H 'Content-Type: application/json' -d'
{
"docs": [{
"_source": {
"message": "2025-03-20T15:30:45.111Z WARNING [172.16.0.10] {"path":"/login","method":"POST"} 200 user=admin"
}
}]
}'
Kết quả trả về JSON chứa các trường ECS: @timestamp, source.ip, event.category, event.outcome, user.name, host.name, destination.ip.
HÌNH 4 - Kết quả simulate
Bước 5: Index document thật
Lệnh thực hiện:
curl -X POST "localhost:9200/ecs-logs-2025.03.20/_doc" -H 'Content-Type: application/json' -d'
{
"message": "2025-03-20T15:30:45.111Z WARNING [172.16.0.10] {"path":"/login","method":"POST"} 200 user=admin"
}'
Kết quả trả về: {"result":"created"}
Lệnh thực hiện:
curl -X POST "localhost:9200/ecs-logs-2025.03.20/_doc" -H 'Content-Type: application/json' -d'
{
"message": "2025-03-20T15:30:45.111Z WARNING [172.16.0.10] {"path":"/login","method":"POST"} 200 user=admin"
}'
Kết quả trả về: {"result":"created"}
HÌNH 5 - Kết quả index document
Bước 6: Tìm kiếm theo trường ECS - source.ip
Lệnh thực hiện:
curl -X GET "localhost:9200/ecs-logs-*/_search?pretty" -H 'Content-Type: application/json' -d'
{
"query": { "term": { "source.ip": "172.16.0.10" } }
}'
Kết quả trả về đúng 1 document có source.ip = 172.16.0.10.
Lệnh thực hiện:
curl -X GET "localhost:9200/ecs-logs-*/_search?pretty" -H 'Content-Type: application/json' -d'
{
"query": { "term": { "source.ip": "172.16.0.10" } }
}'
Kết quả trả về đúng 1 document có source.ip = 172.16.0.10.
HÌNH 6 - Kết quả tìm kiếm theo source.ip
Bước 7: Tìm kiếm theo trường ECS - event.category
Lệnh thực hiện:
curl -X GET "localhost:9200/ecs-logs-*/_search?pretty" -H 'Content-Type: application/json' -d'
{
"query": { "term": { "event.category": "web" } }
}'
Kết quả trả về đúng 1 document có event.category = web.
Lệnh thực hiện:
curl -X GET "localhost:9200/ecs-logs-*/_search?pretty" -H 'Content-Type: application/json' -d'
{
"query": { "term": { "event.category": "web" } }
}'
Kết quả trả về đúng 1 document có event.category = web.
HÌNH 7 - Kết quả tìm kiếm theo event.category
III. KẾT LUẬN
3.1. Kết quả đạt được
Ingest pipeline và index template đã được tạo thành công. Pipeline sử dụng Grok để trích xuất @timestamp, source.ip, event.outcome, user.name từ log gốc, kết hợp với Set processor để thêm event.category, host.name, destination.ip. Date processor chuẩn hóa @timestamp theo ISO 8601.
Simulate API xác nhận pipeline hoạt động đúng. Một document đã được index thành công vào index ecs-logs-2025.03.20. Hai truy vấn tìm kiếm theo source.ip và event.category đều trả về kết quả chính xác.
Như vậy, cả 7 trường ECS yêu cầu đã được map hoàn chỉnh. Log từ dạng text không cấu trúc đã được chuẩn hóa thành JSON theo đúng chuẩn ECS.
3.2. Bài học kinh nghiệm
3.1. Kết quả đạt được
Ingest pipeline và index template đã được tạo thành công. Pipeline sử dụng Grok để trích xuất @timestamp, source.ip, event.outcome, user.name từ log gốc, kết hợp với Set processor để thêm event.category, host.name, destination.ip. Date processor chuẩn hóa @timestamp theo ISO 8601.
Simulate API xác nhận pipeline hoạt động đúng. Một document đã được index thành công vào index ecs-logs-2025.03.20. Hai truy vấn tìm kiếm theo source.ip và event.category đều trả về kết quả chính xác.
Như vậy, cả 7 trường ECS yêu cầu đã được map hoàn chỉnh. Log từ dạng text không cấu trúc đã được chuẩn hóa thành JSON theo đúng chuẩn ECS.
3.2. Bài học kinh nghiệm
- Thứ nhất, Grok pattern rất quan trọng. Pattern TIMESTAMP_ISO8601 và IP giúp trích xuất chính xác timestamp và địa chỉ IP.
- Thứ hai, Set processor hữu ích để thêm các trường tĩnh như event.category hay host.name.
- Thứ ba, Date processor cần đặt cuối cùng để đảm bảo @timestamp được chuẩn hóa sau khi đã có dữ liệu.
- Thứ tư, luôn test bằng Simulate API trước khi áp dụng pipeline vào index thật.
3.3. Giá trị mang lại
Log từ dạng text được chuẩn hóa theo ECS, có thể tìm kiếm nhất quán giữa các hệ thống. Có thể tìm kiếm theo source.ip, event.category, user.name mà không cần biết cấu trúc log gốc. Pipeline và template có thể tái sử dụng cho nhiều index khác nhau.
Bài viết liên quan
Được quan tâm
Bài viết mới