Trong quản trị hệ thống và bảo mật (SOC), việc thu thập log là chưa đủ. Dữ liệu thô cần được phân tích định dạng và bóc tách thành các trường dữ liệu có cấu trúc để có thể thực hiện truy vấn và vẽ biểu đồ giám sát.
PUT _ingest/pipeline/ssh-parser-pipeline{ "description": "Pipeline bóc tách log SSH", "processors": [ { "grok": { "field": "message", "patterns": ["%{SYSLOGTIMESTAMP:time} %{SYSLOGHOST:host} %{WORD
rogram}: %{WORD:action} password for %{USER:user} from %{IP:ip}"] } } ]}
1. Phân tích và Nhận dạng định dạng Log (Log Format Analysis)
Dựa trên dữ liệu thực tế thu thập được trong mục Discover, chúng ta tiến hành phân tích các nguồn log hiện có:- Nguồn log từ Windows (Winlogbeat): Đã được agent tự động chuyển đổi sang định dạng có cấu trúc (ECS - Elastic Common Schema).
- Nguồn log từ Linux (SSH/Syslog): Đây là dữ liệu không cấu trúc dưới dạng văn bản thuần túy (Plain text).
- Xác định Format: Qua quan sát, log SSH thuộc định dạng Syslog RFC3164.
- Dấu hiệu nhận biết: Chứa mốc thời gian (Timestamp), tên máy chủ (ubuntu) và tên ứng dụng (sshd).
- Nội dung: Chứa các thông tin quan trọng như hành động (Failed password), tên người dùng (root) và địa chỉ IP nguồn (192.168.1.1).
2. Lựa chọn Parser phù hợp
Sau khi đánh giá các công cụ (Logstash, Vector, Ingest Pipeline), tôi quyết định lựa chọn: Ingest Pipeline của OpenSearch.- Lý do: Tích hợp sẵn, cấu hình trực tiếp qua giao diện web, nhẹ và dễ dàng quản lý thông qua Dev Tools.
- Công nghệ sử dụng: Bộ lọc Grok để khớp các mẫu văn bản và tách chúng thành các trường dữ liệu riêng biệt.
3. Thực hiện bóc tách dữ liệu bằng Grok
Để đảm bảo bộ giải mã hoạt động chính xác trước khi áp dụng chính thức, tôi thực hiện qua các bước sau trong Management > Dev Tools:Bước 1: Truy cập công cụ Dev Tools
Chúng ta sử dụng giao diện console để viết các mã lệnh bóc tách.Bước 2: Thử nghiệm giải mã (Simulation)
Sử dụng API _simulate để kiểm tra mẫu Grok. Tại đây, tôi đã viết một mẫu (pattern) bao gồm các biến như %{SYSLOGTIMESTAMP:time}, %{USER:user}, và %{IP:ip}.- Lưu ý kỹ thuật: Cần đặt con trỏ chuột bên trong khối lệnh trước khi nhấn nút Play để tránh lỗi "No request selected".
Bước 3: Kết quả thực thi
Hệ thống trả về kết quả thành công (Status 200). Dòng log thô đã được tách thành các trường dữ liệu sạch:- ip: 192.168.1.1
- user: root
- action: Failed
4. Lưu cấu hình Pipeline chính thức
Sau khi thử nghiệm thành công, tôi sử dụng lệnh PUT để lưu Pipeline với tên ssh-parser-pipeline.PUT _ingest/pipeline/ssh-parser-pipeline{ "description": "Pipeline bóc tách log SSH", "processors": [ { "grok": { "field": "message", "patterns": ["%{SYSLOGTIMESTAMP:time} %{SYSLOGHOST:host} %{WORD
5. Kết luận
Việc hoàn thành task Phân tích và nhận dạng định dạng log mang lại ý nghĩa cực kỳ quan trọng:- Tối ưu hóa tìm kiếm: Giờ đây có thể tìm kiếm nhanh theo IP hoặc Username thay vì phải tìm kiếm văn bản tự do trong trường message.
- Trực quan hóa: Các trường dữ liệu ip, user mới được tách ra chính là nguyên liệu để xây dựng các biểu đồ "Top IP tấn công" trên Dashboard.
- Hệ thống hóa: Quy trình này có thể áp dụng tương tự cho các nguồn log khác như JSON, CEF hoặc LEEF bằng cách thay đổi mẫu Grok tương ứng.