SIEM/Log Management Chuẩn hóa trường theo ECS (Elastic Common Schema)

I. YÊU CẦU BÀI LAB
1.1. Mô tả sơ bộ

Bài lab thực hiện chuẩn hóa dữ liệu log theo Elastic Common Schema (ECS) trên OpenSearch. ECS là tiêu chuẩn mở định nghĩa các tên trường chung cho dữ liệu log, giúp việc phân tích và tìm kiếm trở nên nhất quán giữa các hệ thống.

Các trường ECS cần map:​
  • @timestamp - Thời gian xảy ra sự kiện​
  • host.name - Tên máy chủ nguồn​
  • source.ip - Địa chỉ IP nguồn​
  • destination.ip - Địa chỉ IP đích​
  • event.category - Loại sự kiện​
  • event.outcome - Kết quả sự kiện​
  • user.name - Tên người dùng​
Thiết bị chuẩn bị: Máy ảo Ubuntu 20.04 với OpenSearch đã cài đặt.


1.2. Mô tả vấn đề
Log từ hệ thống thường xuất ra dạng text không có cấu trúc:
2025-03-20T15:30:45.111Z WARNING [172.16.0.10] {"path":"/login","method":"POST"} 200 user=admin

Với dạng log này, việc tìm kiếm và phân tích gặp nhiều khó khăn. Mục tiêu là biến log text thành JSON có cấu trúc theo chuẩn ECS.


II. TRIỂN KHAI
2.1. Giải pháp

Sử dụng Ingest Pipeline của OpenSearch để parse log và map sang trường ECS, kết hợp với Index Template để định nghĩa mapping.
Luồng xử lý: Log đầu vào → Ingest Pipeline → Document theo ECS → Index Template → Index

2.2. Các bước thực hiện
Bước 1: Kiểm tra OpenSearch


Lệnh thực hiện:
curl -X GET "localhost:9200"

Kết quả trả về là JSON chứa cluster_name và version, xác nhận OpenSearch đang chạy trên port 9200.​
1779863533255.png

HÌNH 1 - Kết quả kiểm tra OpenSearch​

Bước 2: Tạo Index Template với ECS mapping
Lệnh thực hiện:

curl -X PUT "localhost:9200/_index_template/ecs-logs-template" -H 'Content-Type: application/json' -d'

{

"index_patterns": ["ecs-logs-*"],
"priority": 100,
"template": {
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0,
"default_pipeline": "ecs-pipeline"
},

"mappings": {
"properties": {
"@timestamp": { "type": "date" },
"host.name": { "type": "keyword" },
"source.ip": { "type": "ip" },
"destination.ip": { "type": "ip" },
"event.category": { "type": "keyword" },
"event.outcome": { "type": "keyword" },
"user.name": { "type": "keyword" },
"message": { "type": "text" }
}
}
}
}

Kết quả trả về: {"acknowledged":true}​
1779863780559.png

HÌNH 2 - Kết quả tạo Index Template​
Bước 3: Tạo Ingest Pipeline map sang ECS
Lệnh thực hiện:

curl -X PUT "localhost:9200/_ingest/pipeline/ecs-pipeline" -H 'Content-Type: application/json' -d'
{
"description": "ECS pipeline - map log to ECS fields",
"processors": [
{
"grok": {
"field": "message",
"patterns": ["%{TIMESTAMP_ISO8601:@timestamp} %{LOGLEVEL:event.outcome} \[%{IP:source.ip}\] %{GREEDYDATA:request} %{NUMBER:response_code:int} user=%{DATA:user.name}"]
}
},
{
"set": { "field": "event.category", "value": "web" }
},
{

"set": { "field": "host.name", "value": "web-server-01" }
},
{
"set": { "field": "destination.ip", "value": "10.0.0.1" }
},
{
"date": {
"field": "@timestamp",
"formats": ["ISO8601"],
"timezone": "UTC"
}
},
{
"remove": { "field": ["request", "response_code"], "ignore_missing": true }
}
]
}'

Kết quả trả về: {"acknowledged":true}​
1779864083046.png

HÌNH 3 - Kết quả tạo Pipeline​
Bước 4: Test pipeline với Simulate API
Lệnh thực hiện:

curl -X POST "localhost:9200/_ingest/pipeline/ecs-pipeline/_simulate" -H 'Content-Type: application/json' -d'
{
"docs": [{
"_source": {
"message": "2025-03-20T15:30:45.111Z WARNING [172.16.0.10] {"path":"/login","method":"POST"} 200 user=admin"
}
}]
}'

Kết quả trả về JSON chứa các trường ECS: @timestamp, source.ip, event.category, event.outcome, user.name, host.name, destination.ip.​
1779865199618.png

HÌNH 4 - Kết quả simulate​
Bước 5: Index document thật
Lệnh thực hiện:

curl -X POST "localhost:9200/ecs-logs-2025.03.20/_doc" -H 'Content-Type: application/json' -d'
{
"message": "2025-03-20T15:30:45.111Z WARNING [172.16.0.10] {"path":"/login","method":"POST"} 200 user=admin"
}'

Kết quả trả về: {"result":"created"}​
1779865428444.png

HÌNH 5 - Kết quả index document​
Bước 6: Tìm kiếm theo trường ECS - source.ip
Lệnh thực hiện:

curl -X GET "localhost:9200/ecs-logs-*/_search?pretty" -H 'Content-Type: application/json' -d'
{
"query": { "term": { "source.ip": "172.16.0.10" } }
}'

Kết quả trả về đúng 1 document có source.ip = 172.16.0.10.​
1779865817906.png

1779865904994.png

HÌNH 6 - Kết quả tìm kiếm theo source.ip​
Bước 7: Tìm kiếm theo trường ECS - event.category
Lệnh thực hiện:

curl -X GET "localhost:9200/ecs-logs-*/_search?pretty" -H 'Content-Type: application/json' -d'
{
"query": { "term": { "event.category": "web" } }
}'

Kết quả trả về đúng 1 document có event.category = web.​
1779866341388.png

1779866386239.png

HÌNH 7 - Kết quả tìm kiếm theo event.category​
III. KẾT LUẬN
3.1. Kết quả đạt được

Ingest pipeline và index template đã được tạo thành công. Pipeline sử dụng Grok để trích xuất @timestamp, source.ip, event.outcome, user.name từ log gốc, kết hợp với Set processor để thêm event.category, host.name, destination.ip. Date processor chuẩn hóa @timestamp theo ISO 8601.
Simulate API xác nhận pipeline hoạt động đúng. Một document đã được index thành công vào index ecs-logs-2025.03.20. Hai truy vấn tìm kiếm theo source.ip và event.category đều trả về kết quả chính xác.
Như vậy, cả 7 trường ECS yêu cầu đã được map hoàn chỉnh. Log từ dạng text không cấu trúc đã được chuẩn hóa thành JSON theo đúng chuẩn ECS.

3.2. Bài học kinh nghiệm
  • Thứ nhất, Grok pattern rất quan trọng. Pattern TIMESTAMP_ISO8601 và IP giúp trích xuất chính xác timestamp và địa chỉ IP.​
  • Thứ hai, Set processor hữu ích để thêm các trường tĩnh như event.category hay host.name.​
  • Thứ ba, Date processor cần đặt cuối cùng để đảm bảo @timestamp được chuẩn hóa sau khi đã có dữ liệu.​
  • Thứ tư, luôn test bằng Simulate API trước khi áp dụng pipeline vào index thật.​

3.3. Giá trị mang lại
Log từ dạng text được chuẩn hóa theo ECS, có thể tìm kiếm nhất quán giữa các hệ thống. Có thể tìm kiếm theo source.ip, event.category, user.name mà không cần biết cấu trúc log gốc. Pipeline và template có thể tái sử dụng cho nhiều index khác nhau.​
 
Được quan tâm
Back
Top