I. Yêu cầu bài Lab
1.1. Mô tả sơ bộ bài lab
Bài lab này thực hiện tạo một Ingest Pipeline trên OpenSearch. Pipeline sử dụng 5 processor: Grok, Dissect, JSON, KV và Date.
Mục đích của bài lab là parse các dòng log dạng text thành cấu trúc JSON có tổ chức, phục vụ cho việc tìm kiếm và phân tích sau này.
Thiết bị cần chuẩn bị bao gồm một máy ảo Ubuntu 22.04 với cấu hình tối thiểu 4GB RAM và 2 nhân CPU. OpenSearch phiên bản 2.x phải được cài đặt sẵn và đang chạy. Người thực hiện cần có quyền SSH vào máy ảo để thực thi các câu lệnh.
1.2. Mô tả vấn đề cần giải quyết
Log từ các hệ thống thường xuất ra dưới dạng text. Ví dụ một dòng log điển hình:
text
2025-03-20T15:30:45.111Z WARNING [172.16.0.10] {"path":"/test","method":"DELETE"} 500 id=123&action=remove
Với dạng log text như trên, việc tìm kiếm và lọc dữ liệu gặp nhiều khó khăn. Không thể tìm tất cả log có status bằng 500, không thể lọc theo IP client, cũng không thể truy vấn theo tham số id hay action.
Mục tiêu của bài lab là biến dòng log text đó thành một document JSON có cấu trúc, tách riêng được timestamp, log level, client IP, status code, nội dung request dạng JSON và các tham số dạng key-value.
II. Triển khai
2.1. Giải pháp
Ingest Pipeline của OpenSearch được sử dụng để giải quyết vấn đề trên. Pipeline là một chuỗi các processor xử lý tuần tự.
Processor Grok: Dùng các pattern có sẵn để trích xuất timestamp ISO 8601 và log level.
Processor Dissect: Cắt chuỗi theo cấu trúc cố định để tách IP, request JSON, status code và phần key-value.
Processor JSON: Biến chuỗi JSON thành object.
Processor KV: Parse chuỗi dạng key=value phân tách bằng dấu &.
Processor Date: Chuẩn hóa timestamp thành định dạng ISO 8601 và gán vào trường @timestamp.
Pipeline chạy tuần tự theo thứ tự: Grok → Dissect → JSON → KV → Date.
2.2. Triển khai từng bước
Bước 2.2.1: Kiểm tra OpenSearch
Kiểm tra OpenSearch đã chạy hay chưa:
bash
curl -X GET "localhost:9200"
Kết quả trả về là một JSON chứa cluster_name và version nếu OpenSearch đã chạy thành công.
1.1. Mô tả sơ bộ bài lab
Bài lab này thực hiện tạo một Ingest Pipeline trên OpenSearch. Pipeline sử dụng 5 processor: Grok, Dissect, JSON, KV và Date.
Mục đích của bài lab là parse các dòng log dạng text thành cấu trúc JSON có tổ chức, phục vụ cho việc tìm kiếm và phân tích sau này.
Thiết bị cần chuẩn bị bao gồm một máy ảo Ubuntu 22.04 với cấu hình tối thiểu 4GB RAM và 2 nhân CPU. OpenSearch phiên bản 2.x phải được cài đặt sẵn và đang chạy. Người thực hiện cần có quyền SSH vào máy ảo để thực thi các câu lệnh.
1.2. Mô tả vấn đề cần giải quyết
Log từ các hệ thống thường xuất ra dưới dạng text. Ví dụ một dòng log điển hình:
text
2025-03-20T15:30:45.111Z WARNING [172.16.0.10] {"path":"/test","method":"DELETE"} 500 id=123&action=remove
Với dạng log text như trên, việc tìm kiếm và lọc dữ liệu gặp nhiều khó khăn. Không thể tìm tất cả log có status bằng 500, không thể lọc theo IP client, cũng không thể truy vấn theo tham số id hay action.
Mục tiêu của bài lab là biến dòng log text đó thành một document JSON có cấu trúc, tách riêng được timestamp, log level, client IP, status code, nội dung request dạng JSON và các tham số dạng key-value.
II. Triển khai
2.1. Giải pháp
Ingest Pipeline của OpenSearch được sử dụng để giải quyết vấn đề trên. Pipeline là một chuỗi các processor xử lý tuần tự.
Processor Grok: Dùng các pattern có sẵn để trích xuất timestamp ISO 8601 và log level.
Processor Dissect: Cắt chuỗi theo cấu trúc cố định để tách IP, request JSON, status code và phần key-value.
Processor JSON: Biến chuỗi JSON thành object.
Processor KV: Parse chuỗi dạng key=value phân tách bằng dấu &.
Processor Date: Chuẩn hóa timestamp thành định dạng ISO 8601 và gán vào trường @timestamp.
Pipeline chạy tuần tự theo thứ tự: Grok → Dissect → JSON → KV → Date.
2.2. Triển khai từng bước
Bước 2.2.1: Kiểm tra OpenSearch
Kiểm tra OpenSearch đã chạy hay chưa:
bash
curl -X GET "localhost:9200"
Kết quả trả về là một JSON chứa cluster_name và version nếu OpenSearch đã chạy thành công.
Hình 1: Kết quả kiểm tra OpenSearch thành công
Bước 2.2.2: Tạo Ingest Pipeline
Tạo pipeline tên log-pipeline:
bash
curl -X PUT "localhost:9200/_ingest/pipeline/log-pipeline" -H 'Content-Type: application/json' -d'
{
"description": "Pipeline parse log với Grok, Dissect, JSON, KV, Date",
"processors": [
{
"grok": {
"field": "message",
"patterns": ["%{TIMESTAMP_ISO8601:timestamp_raw} %{LOGLEVEL:loglevel} %{GREEDYDATA:raw_message}"]
}
},
{
"dissect": {
"field": "raw_message",
"pattern": "[%{client_ip}] %{request} %{status:int} %{kv_data}"
}
},
{
"json": {
"field": "request",
"target_field": "request_json"
}
},
{
"kv": {
"field": "kv_data",
"field_split": "&",
"value_split": "=",
"target_field": "query_params"
}
},
{
"date": {
"field": "timestamp_raw",
"formats": ["ISO8601"],
"target_field": "@timestamp",
"timezone": "UTC"
}
}
]
}'
Kết quả trả về nếu thành công:
json
{
"acknowledged": true
}
Tạo pipeline tên log-pipeline:
bash
curl -X PUT "localhost:9200/_ingest/pipeline/log-pipeline" -H 'Content-Type: application/json' -d'
{
"description": "Pipeline parse log với Grok, Dissect, JSON, KV, Date",
"processors": [
{
"grok": {
"field": "message",
"patterns": ["%{TIMESTAMP_ISO8601:timestamp_raw} %{LOGLEVEL:loglevel} %{GREEDYDATA:raw_message}"]
}
},
{
"dissect": {
"field": "raw_message",
"pattern": "[%{client_ip}] %{request} %{status:int} %{kv_data}"
}
},
{
"json": {
"field": "request",
"target_field": "request_json"
}
},
{
"kv": {
"field": "kv_data",
"field_split": "&",
"value_split": "=",
"target_field": "query_params"
}
},
{
"date": {
"field": "timestamp_raw",
"formats": ["ISO8601"],
"target_field": "@timestamp",
"timezone": "UTC"
}
}
]
}'
Kết quả trả về nếu thành công:
json
{
"acknowledged": true
}
Hình 2a: Pipeline đã được tạo thành công
Bước 2.2.3: Lưu ý về cấu hình Dissect
Một điểm quan trọng trong pattern dissect là phần %{kv_data} ở cuối. Phần này dùng để tách riêng dữ liệu key-value ra một trường độc lập. Nếu thiếu, KV processor sẽ parse sai toàn bộ dữ liệu.
Một điểm quan trọng trong pattern dissect là phần %{kv_data} ở cuối. Phần này dùng để tách riêng dữ liệu key-value ra một trường độc lập. Nếu thiếu, KV processor sẽ parse sai toàn bộ dữ liệu.
Hình 2b: Sơ đồ luồng xử lý pipeline
Bước 2.2.4: Test pipeline bằng Simulate API
Tạo file dữ liệu test test-data.json:
bash
cat > test-data.json << 'EOF'
{
"docs": [
{
"_source": {
"message": "2025-03-20T15:30:45.111Z WARNING [172.16.0.10] {\"path\":\"/test\",\"method\":\"DELETE\"} 500 id=123&action=remove"
}
}
]
}
EOF
Chạy simulate:
bash
curl -X POST "localhost:9200/_ingest/pipeline/log-pipeline/_simulate" -H 'Content-Type: application/json' -d @test-data.json
Kết quả trả về là một JSON. Trong phần _source sẽ xuất hiện các trường mới:
Tạo file dữ liệu test test-data.json:
bash
cat > test-data.json << 'EOF'
{
"docs": [
{
"_source": {
"message": "2025-03-20T15:30:45.111Z WARNING [172.16.0.10] {\"path\":\"/test\",\"method\":\"DELETE\"} 500 id=123&action=remove"
}
}
]
}
EOF
Chạy simulate:
bash
curl -X POST "localhost:9200/_ingest/pipeline/log-pipeline/_simulate" -H 'Content-Type: application/json' -d @test-data.json
Kết quả trả về là một JSON. Trong phần _source sẽ xuất hiện các trường mới:
Hình 3: Kết quả simulate pipeline - các trường đã được parse đúng
timestamp_raw, loglevel, raw_message do Grok tạo.
client_ip, request, status, kv_data do Dissect tạo.
request_json do JSON processor tạo.
query_params do KV processor tạo.
@timestamp do Date processor tạo.
Kiểm tra: Trường status phải là số 500 (không phải chuỗi). Trường query_params phải có hai trường con id và action.
Bước 2.2.5: Áp dụng pipeline vào index
Tạo index và gán pipeline làm mặc định:
bash
curl -X PUT "localhost:9200/my-logs-index" -H 'Content-Type: application/json' -d'
{
"settings": {
"default_pipeline": "log-pipeline"
}
}'
Index một document thật:
bash
curl -X POST "localhost:9200/my-logs-index/_doc" -H 'Content-Type: application/json' -d'
{
"message": "2025-03-20T15:30:45.111Z WARNING [172.16.0.10] {\"path\":\"/test\",\"method\":\"DELETE\"} 500 id=123&action=remove"
}'
Tìm kiếm lại để xác nhận:
client_ip, request, status, kv_data do Dissect tạo.
request_json do JSON processor tạo.
query_params do KV processor tạo.
@timestamp do Date processor tạo.
Kiểm tra: Trường status phải là số 500 (không phải chuỗi). Trường query_params phải có hai trường con id và action.
Bước 2.2.5: Áp dụng pipeline vào index
Tạo index và gán pipeline làm mặc định:
bash
curl -X PUT "localhost:9200/my-logs-index" -H 'Content-Type: application/json' -d'
{
"settings": {
"default_pipeline": "log-pipeline"
}
}'
Index một document thật:
bash
curl -X POST "localhost:9200/my-logs-index/_doc" -H 'Content-Type: application/json' -d'
{
"message": "2025-03-20T15:30:45.111Z WARNING [172.16.0.10] {\"path\":\"/test\",\"method\":\"DELETE\"} 500 id=123&action=remove"
}'
Tìm kiếm lại để xác nhận:
Hình 4: Document trong index đã được pipeline parse hoàn chỉnh
bash
curl -X GET "localhost:9200/my-logs-index/_search?pretty"
Kết quả cho thấy document đã được parse thành JSON với đầy đủ các trường.
Bước 2.2.6: Kiểm tra khả năng tìm kiếm
Tìm kiếm log có status bằng 500:
bash
curl -X GET "localhost:9200/my-logs-index/_search?pretty" -H 'Content-Type: application/json' -d'
{
"query": {
"term": {"status": 500}
}
}'
Lệnh này trả về đúng document vừa index, chứng tỏ trường status đã được trích xuất đúng kiểu số và có thể dùng để tìm kiếm.
Hình 5 - Kết quả tìm kiếm log có status=500, chứng tỏ trường status đã được parse đúng kiểu số
III. Kết luận
3.1. Kết quả đạt được
Ingest Pipeline với tên gọi log-pipeline đã được tạo thành công trên OpenSearch. Pipeline này bao gồm đầy đủ 5 processor theo yêu cầu: Grok, Dissect, JSON, KV và Date.
Processor Grok: Hoạt động với pattern %{TIMESTAMP_ISO8601:timestamp_raw} %{LOGLEVEL:loglevel} %{GREEDYDATA:raw_message}. Pattern này đã trích xuất chính xác trường timestamp_raw từ phần đầu của log, loglevel là WARNING từ phần thứ hai, và toàn bộ phần còn lại của log được lưu vào raw_message.
Processor Dissect: Sử dụng pattern [%{client_ip}] %{request} %{status:int} %{kv_data}. Pattern này đã tách thành công raw_message thành bốn thành phần: client_ip là 172.16.0.10, request là chuỗi JSON {"path":"/test","method":"DELETE"}, status là số 500, và kv_data là phần chuỗi key-value còn lại id=123&action=remove.
Processor JSON: Nhận trường request và biến nó thành một object với tên request_json. Sau processor này, có thể truy cập trực tiếp vào request_json.path và request_json.method.
Processor KV: Nhận trường kv_data và parse với cấu hình field_split là dấu & và value_split là dấu =. Kết quả tạo ra trường query_params là object chứa id: 123 và action: remove.
Processor Date: Nhận trường timestamp_raw và chuẩn hóa thành định dạng ISO 8601, ghi vào trường @timestamp - trường thời gian chuẩn mà OpenSearch hiểu ngầm.
Sau khi tạo pipeline, API Simulate được sử dụng để kiểm tra. Kết quả cho thấy tất cả các trường đều được parse đúng. Pipeline đã được gán vào index my-logs-index thông qua cấu hình default_pipeline và hoạt động ổn định.
3.2. Bài học kinh nghiệm
Ba bài học chính được rút ra từ bài lab này:
Thứ tự processor rất quan trọng. Thứ tự đúng là Grok → Dissect → JSON → KV → Date. Đặt Date ở cuối để đảm bảo @timestamp là kết quả cuối cùng.
Pattern của Dissect cần tách riêng phần kv_data thành một trường độc lập. Nếu không, KV processor sẽ parse sai toàn bộ dữ liệu.
Luôn sử dụng Simulate API trước khi áp dụng pipeline vào index thật để test và debug.
3.3. Giá trị mang lại
Log từ dạng text được biến thành JSON có cấu trúc. Có thể tìm kiếm log theo status code, lọc theo client IP, hoặc truy vấn theo bất kỳ tham số nào trong query_params. Pipeline có thể được export ra file JSON và tái sử dụng cho nhiều index khác nhau.
3.1. Kết quả đạt được
Ingest Pipeline với tên gọi log-pipeline đã được tạo thành công trên OpenSearch. Pipeline này bao gồm đầy đủ 5 processor theo yêu cầu: Grok, Dissect, JSON, KV và Date.
Processor Grok: Hoạt động với pattern %{TIMESTAMP_ISO8601:timestamp_raw} %{LOGLEVEL:loglevel} %{GREEDYDATA:raw_message}. Pattern này đã trích xuất chính xác trường timestamp_raw từ phần đầu của log, loglevel là WARNING từ phần thứ hai, và toàn bộ phần còn lại của log được lưu vào raw_message.
Processor Dissect: Sử dụng pattern [%{client_ip}] %{request} %{status:int} %{kv_data}. Pattern này đã tách thành công raw_message thành bốn thành phần: client_ip là 172.16.0.10, request là chuỗi JSON {"path":"/test","method":"DELETE"}, status là số 500, và kv_data là phần chuỗi key-value còn lại id=123&action=remove.
Processor JSON: Nhận trường request và biến nó thành một object với tên request_json. Sau processor này, có thể truy cập trực tiếp vào request_json.path và request_json.method.
Processor KV: Nhận trường kv_data và parse với cấu hình field_split là dấu & và value_split là dấu =. Kết quả tạo ra trường query_params là object chứa id: 123 và action: remove.
Processor Date: Nhận trường timestamp_raw và chuẩn hóa thành định dạng ISO 8601, ghi vào trường @timestamp - trường thời gian chuẩn mà OpenSearch hiểu ngầm.
Sau khi tạo pipeline, API Simulate được sử dụng để kiểm tra. Kết quả cho thấy tất cả các trường đều được parse đúng. Pipeline đã được gán vào index my-logs-index thông qua cấu hình default_pipeline và hoạt động ổn định.
3.2. Bài học kinh nghiệm
Ba bài học chính được rút ra từ bài lab này:
Thứ tự processor rất quan trọng. Thứ tự đúng là Grok → Dissect → JSON → KV → Date. Đặt Date ở cuối để đảm bảo @timestamp là kết quả cuối cùng.
Pattern của Dissect cần tách riêng phần kv_data thành một trường độc lập. Nếu không, KV processor sẽ parse sai toàn bộ dữ liệu.
Luôn sử dụng Simulate API trước khi áp dụng pipeline vào index thật để test và debug.
3.3. Giá trị mang lại
Log từ dạng text được biến thành JSON có cấu trúc. Có thể tìm kiếm log theo status code, lọc theo client IP, hoặc truy vấn theo bất kỳ tham số nào trong query_params. Pipeline có thể được export ra file JSON và tái sử dụng cho nhiều index khác nhau.