I. MÔ TẢ YÊU CẦU CỦA BÀI LAB
1.1. Mô tả sơ bộ bài lab
Bài lab tập trung vào việc điều tra một sự cố an ninh mạng bằng cách truy vết toàn bộ hoạt động của một địa chỉ IP cụ thể. Mục tiêu là sử dụng OpenSearch để truy vấn log, xây dựng timeline chi tiết các hành động của kẻ tấn công, sau đó xuất báo cáo và evidence log phục vụ công tác điều tra.
Thiết bị và công nghệ sử dụng:
1.1. Mô tả sơ bộ bài lab
Bài lab tập trung vào việc điều tra một sự cố an ninh mạng bằng cách truy vết toàn bộ hoạt động của một địa chỉ IP cụ thể. Mục tiêu là sử dụng OpenSearch để truy vấn log, xây dựng timeline chi tiết các hành động của kẻ tấn công, sau đó xuất báo cáo và evidence log phục vụ công tác điều tra.
Thiết bị và công nghệ sử dụng:
- Ubuntu 20.04 LTS
- OpenSearch và OpenSearch Dashboards (Docker)
- Python 3
- PPL (Piped Processing Language) và DSL (Domain Specific Language)
- Thư viện ReportLab để tạo PDF
1.2. Mô tả vấn đề cần giải quyết
Khi phát hiện một cuộc tấn công, điều tra viên cần trả lời các câu hỏi: Kẻ tấn công đã làm gì? Vào thời gian nào? Từ đâu? Bài lab này giải quyết các vấn đề sau:
Khi phát hiện một cuộc tấn công, điều tra viên cần trả lời các câu hỏi: Kẻ tấn công đã làm gì? Vào thời gian nào? Từ đâu? Bài lab này giải quyết các vấn đề sau:
- Truy vết toàn bộ hoạt động của một IP trong hệ thống.
- Xây dựng timeline sự cố từ log.
- Trích xuất evidence log để lưu trữ.
- Tạo báo cáo PDF phục vụ điều tra.
II. GIẢI PHÁP
2.1. Kiến trúc tổng thể
Hệ thống sử dụng OpenSearch làm kho lưu trữ log. Dữ liệu log được truy vấn bằng PPL và DSL trên Dev Tools. Script Python thực hiện xuất file CSV và báo cáo PDF.
2.1. Kiến trúc tổng thể
Hệ thống sử dụng OpenSearch làm kho lưu trữ log. Dữ liệu log được truy vấn bằng PPL và DSL trên Dev Tools. Script Python thực hiện xuất file CSV và báo cáo PDF.
Hình 1: Kiến trúc hệ thống điều tra sự cố.
2.2. Triển khai chi tiết
2.2.1. Chuẩn bị dữ liệu
Trước khi điều tra, cần có dữ liệu log trong OpenSearch. Sử dụng dữ liệu từ bài Web Attack:
bash
cd /opt/web-detection
python3 scripts/generate_web_logs.py
python3 scripts/detect_web_attacks.py
Kiểm tra dữ liệu:
bash
curl -s "http://localhost:9200/web-attacks-*/_count"
Kết quả: 1533 web attacks đã được ghi nhận.
2.2.2. Truy vấn DSL trên Dev Tools
Mở OpenSearch Dashboards → Dev Tools → Console, chạy các câu truy vấn sau:
Đếm số lượng log của IP cần điều tra:
json
GET /web-attacks-2026.06.18/_count
{
"query": {
"term": {
"src_ip": "192.168.1.100"
}
}
}
Kết quả: 492 logs
Lấy toàn bộ timeline của IP:
json
GET /web-attacks-2026.06.18/_search
{
"query": {
"term": {
"src_ip": "192.168.1.100"
}
},
"sort": [{"timestamp": "asc"}],
"size": 50
}
Thống kê các loại tấn công:
json
GET /web-attacks-2026.06.18/_search
{
"size": 0,
"query": {"term": {"src_ip": "192.168.1.100"}},
"aggs": {
"by_attack_type": {
"terms": {"field": "attack_type.keyword"}
}
}
}
Thống kê theo mức độ nghiêm trọng:
json
GET /web-attacks-2026.06.18/_search
{
"size": 0,
"query": {"term": {"src_ip": "192.168.1.100"}},
"aggs": {
"by_severity": {
"terms": {"field": "severity.keyword"}
}
}
}
Lọc log trong khoảng thời gian cụ thể:
json
GET /web-attacks-2026.06.18/_search
{
"query": {
"bool": {
"must": [
{"term": {"src_ip": "192.168.1.100"}},
{
"range": {
"timestamp": {
"gte": "2026-06-18T17:39:00",
"lte": "2026-06-18T17:40:00"
}
}
}
]
}
},
"sort": [{"timestamp": "asc"}],
"size": 50
}
Hình 2: Kết quả truy vấn DSL trên Dev Tools.
2.2.3. Truy vấn PPL trên Dev Tools
Chuyển từ Console sang PPL (dropdown góc trái trên):
Timeline của IP:
sql
source = web-attacks-2026.06.18
| where src_ip = "192.168.1.100"
| sort timestamp asc
| fields timestamp, src_ip, attack_type, severity
Thống kê loại tấn công:
sql
source = web-attacks-2026.06.18
| where src_ip = "192.168.1.100"
| stats count() by attack_type
Top 10 IP tấn công nhiều nhất:
sql
source = web-attacks-2026.06.18
| stats count() by src_ip
| sort count() desc
| head 10
Hình 3: Kết quả truy vấn PPL.
2.2.4. Xuất Evidence Log
Tạo script Python để xuất log ra file CSV:
python
import requests
import csv
import os
from datetime import datetime
IP_TARGET = "192.168.1.100"
OPENSEARCH = "http://localhost:9200"
OUTPUT_DIR = "/opt/incident-response"
os.makedirs(OUTPUT_DIR, exist_ok=True)
query = {
"query": {"term": {"src_ip": IP_TARGET}},
"sort": [{"timestamp": "asc"}],
"size": 1000
}
resp = requests.post(f"{OPENSEARCH}/web-attacks-2026.06.18/_search", json=query)
logs = [hit['_source'] for hit in resp.json()['hits']['hits']]
csv_file = f"{OUTPUT_DIR}/evidence_{IP_TARGET}.csv"
with open(csv_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=logs[0].keys())
writer.writeheader()
writer.writerows(logs)
print(f"Exported {len(logs)} logs to {csv_file}")
Chạy script:
bash
python3 /opt/incident-response/export_evidence.py
Kết quả: File evidence_192.168.1.100.csv với 492 dòng log.
Hình 4: Evidence log xuất ra file CSV.
2.2.5. Tạo Report PDF
Sử dụng ReportLab để tạo báo cáo PDF:
python
from reportlab.lib.pagesizes import A4
from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import inch
from reportlab.lib import colors
pdf_file = f"{OUTPUT_DIR}/Incident_Report_{IP_TARGET}.pdf"
doc = SimpleDocTemplate(pdf_file, pagesize=A4)
story = []
# Tiêu đề
title = ParagraphStyle('Title', parent=styles['Title'], fontSize=16, alignment=1)
story.append(Paragraph("INCIDENT INVESTIGATION REPORT", title))
story.append(Spacer(1, 0.2*inch))
# Thông tin
story.append(Paragraph(f"<b>Investigated IP:</b> {IP_TARGET}", styles['Normal']))
story.append(Paragraph(f"<b>Report Date:</b> {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", styles['Normal']))
story.append(Paragraph(f"<b>Total Evidences:</b> {len(logs)}", styles['Normal']))
story.append(Spacer(1, 0.3*inch))
# Bảng dữ liệu
data = [["STT", "Time", "Attack", "Severity", "Method", "URI"]]
for i, log in enumerate(logs[:20]):
data.append([
str(i+1),
log.get('timestamp', '')[:16],
log.get('attack_type', 'N/A'),
log.get('severity', 'N/A'),
log.get('method', 'N/A'),
log.get('uri', 'N/A')[:40] + "..."
])
table = Table(data, colWidths=[0.6*inch, 1.5*inch, 1.8*inch, 1.2*inch, 0.8*inch, 2*inch])
table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.grey),
('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
('ALIGN', (0, 0), (-1, -1), 'CENTER'),
('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
('FONTSIZE', (0, 0), (-1, -1), 9),
('GRID', (0, 0), (-1, -1), 1, colors.black)
]))
story.append(table)
story.append(Spacer(1, 0.3*inch))
# Kết luận
high_count = len([l for l in logs if l.get('severity') == 'HIGH'])
critical_count = len([l for l in logs if l.get('severity') == 'CRITICAL'])
summary = f"IP {IP_TARGET} performed {len(logs)} attacks. "
summary += f"HIGH: {high_count}, CRITICAL: {critical_count}. "
summary += f"Attacks include SQL Injection, XSS, and Log4Shell (RCE). Action: Blocked."
story.append(Paragraph(summary, styles['Normal']))
doc.build(story)
print(f"Report generated: {pdf_file}")
Kết quả: File Incident_Report_192.168.1.100.pdf.
Hình 5: Báo cáo PDF đã tạo thành công.
2.2.6. Dashboard Timeline
Tạo Dashboard hiển thị timeline sự cố:
Menu ☰ → Dashboard → Create dashboard
Add → Create visualization → Line
Index: web-attacks-*
X-axis: timestamp (Date Histogram)
Y-axis: Count
Add filter: src_ip = "192.168.1.100"
Save: Timeline - 192.168.1.100
Save Dashboard: Incident Timeline Dashboard
Hình 6: Dashboard timeline điều tra sự cố.
III. KẾT LUẬN
Sau khi triển khai, hệ thống đã đáp ứng được các yêu cầu đề ra:
2.2. Triển khai chi tiết
2.2.1. Chuẩn bị dữ liệu
Trước khi điều tra, cần có dữ liệu log trong OpenSearch. Sử dụng dữ liệu từ bài Web Attack:
bash
cd /opt/web-detection
python3 scripts/generate_web_logs.py
python3 scripts/detect_web_attacks.py
Kiểm tra dữ liệu:
bash
curl -s "http://localhost:9200/web-attacks-*/_count"
Kết quả: 1533 web attacks đã được ghi nhận.
2.2.2. Truy vấn DSL trên Dev Tools
Mở OpenSearch Dashboards → Dev Tools → Console, chạy các câu truy vấn sau:
Đếm số lượng log của IP cần điều tra:
json
GET /web-attacks-2026.06.18/_count
{
"query": {
"term": {
"src_ip": "192.168.1.100"
}
}
}
Kết quả: 492 logs
Lấy toàn bộ timeline của IP:
json
GET /web-attacks-2026.06.18/_search
{
"query": {
"term": {
"src_ip": "192.168.1.100"
}
},
"sort": [{"timestamp": "asc"}],
"size": 50
}
Thống kê các loại tấn công:
json
GET /web-attacks-2026.06.18/_search
{
"size": 0,
"query": {"term": {"src_ip": "192.168.1.100"}},
"aggs": {
"by_attack_type": {
"terms": {"field": "attack_type.keyword"}
}
}
}
Thống kê theo mức độ nghiêm trọng:
json
GET /web-attacks-2026.06.18/_search
{
"size": 0,
"query": {"term": {"src_ip": "192.168.1.100"}},
"aggs": {
"by_severity": {
"terms": {"field": "severity.keyword"}
}
}
}
Lọc log trong khoảng thời gian cụ thể:
json
GET /web-attacks-2026.06.18/_search
{
"query": {
"bool": {
"must": [
{"term": {"src_ip": "192.168.1.100"}},
{
"range": {
"timestamp": {
"gte": "2026-06-18T17:39:00",
"lte": "2026-06-18T17:40:00"
}
}
}
]
}
},
"sort": [{"timestamp": "asc"}],
"size": 50
}
Hình 2: Kết quả truy vấn DSL trên Dev Tools.
2.2.3. Truy vấn PPL trên Dev Tools
Chuyển từ Console sang PPL (dropdown góc trái trên):
Timeline của IP:
sql
source = web-attacks-2026.06.18
| where src_ip = "192.168.1.100"
| sort timestamp asc
| fields timestamp, src_ip, attack_type, severity
Thống kê loại tấn công:
sql
source = web-attacks-2026.06.18
| where src_ip = "192.168.1.100"
| stats count() by attack_type
Top 10 IP tấn công nhiều nhất:
sql
source = web-attacks-2026.06.18
| stats count() by src_ip
| sort count() desc
| head 10
Hình 3: Kết quả truy vấn PPL.
2.2.4. Xuất Evidence Log
Tạo script Python để xuất log ra file CSV:
python
import requests
import csv
import os
from datetime import datetime
IP_TARGET = "192.168.1.100"
OPENSEARCH = "http://localhost:9200"
OUTPUT_DIR = "/opt/incident-response"
os.makedirs(OUTPUT_DIR, exist_ok=True)
query = {
"query": {"term": {"src_ip": IP_TARGET}},
"sort": [{"timestamp": "asc"}],
"size": 1000
}
resp = requests.post(f"{OPENSEARCH}/web-attacks-2026.06.18/_search", json=query)
logs = [hit['_source'] for hit in resp.json()['hits']['hits']]
csv_file = f"{OUTPUT_DIR}/evidence_{IP_TARGET}.csv"
with open(csv_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=logs[0].keys())
writer.writeheader()
writer.writerows(logs)
print(f"Exported {len(logs)} logs to {csv_file}")
Chạy script:
bash
python3 /opt/incident-response/export_evidence.py
Kết quả: File evidence_192.168.1.100.csv với 492 dòng log.
Hình 4: Evidence log xuất ra file CSV.
2.2.5. Tạo Report PDF
Sử dụng ReportLab để tạo báo cáo PDF:
python
from reportlab.lib.pagesizes import A4
from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import inch
from reportlab.lib import colors
pdf_file = f"{OUTPUT_DIR}/Incident_Report_{IP_TARGET}.pdf"
doc = SimpleDocTemplate(pdf_file, pagesize=A4)
story = []
# Tiêu đề
title = ParagraphStyle('Title', parent=styles['Title'], fontSize=16, alignment=1)
story.append(Paragraph("INCIDENT INVESTIGATION REPORT", title))
story.append(Spacer(1, 0.2*inch))
# Thông tin
story.append(Paragraph(f"<b>Investigated IP:</b> {IP_TARGET}", styles['Normal']))
story.append(Paragraph(f"<b>Report Date:</b> {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", styles['Normal']))
story.append(Paragraph(f"<b>Total Evidences:</b> {len(logs)}", styles['Normal']))
story.append(Spacer(1, 0.3*inch))
# Bảng dữ liệu
data = [["STT", "Time", "Attack", "Severity", "Method", "URI"]]
for i, log in enumerate(logs[:20]):
data.append([
str(i+1),
log.get('timestamp', '')[:16],
log.get('attack_type', 'N/A'),
log.get('severity', 'N/A'),
log.get('method', 'N/A'),
log.get('uri', 'N/A')[:40] + "..."
])
table = Table(data, colWidths=[0.6*inch, 1.5*inch, 1.8*inch, 1.2*inch, 0.8*inch, 2*inch])
table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.grey),
('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
('ALIGN', (0, 0), (-1, -1), 'CENTER'),
('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
('FONTSIZE', (0, 0), (-1, -1), 9),
('GRID', (0, 0), (-1, -1), 1, colors.black)
]))
story.append(table)
story.append(Spacer(1, 0.3*inch))
# Kết luận
high_count = len([l for l in logs if l.get('severity') == 'HIGH'])
critical_count = len([l for l in logs if l.get('severity') == 'CRITICAL'])
summary = f"IP {IP_TARGET} performed {len(logs)} attacks. "
summary += f"HIGH: {high_count}, CRITICAL: {critical_count}. "
summary += f"Attacks include SQL Injection, XSS, and Log4Shell (RCE). Action: Blocked."
story.append(Paragraph(summary, styles['Normal']))
doc.build(story)
print(f"Report generated: {pdf_file}")
Kết quả: File Incident_Report_192.168.1.100.pdf.
Hình 5: Báo cáo PDF đã tạo thành công.
2.2.6. Dashboard Timeline
Tạo Dashboard hiển thị timeline sự cố:
Menu ☰ → Dashboard → Create dashboard
Add → Create visualization → Line
Index: web-attacks-*
X-axis: timestamp (Date Histogram)
Y-axis: Count
Add filter: src_ip = "192.168.1.100"
Save: Timeline - 192.168.1.100
Save Dashboard: Incident Timeline Dashboard
Hình 6: Dashboard timeline điều tra sự cố.
III. KẾT LUẬN
Sau khi triển khai, hệ thống đã đáp ứng được các yêu cầu đề ra:
- Truy vết timeline của IP 192.168.1.100 trong khoảng thời gian từ 2026-06-18 17:39:13 đến 17:40:xx, thu được 492 log.
- Sử dụng PPL và DSL để lọc và sắp xếp log theo thời gian, giúp xác định chuỗi sự kiện của kẻ tấn công.
- Xuất evidence log ra file CSV với 492 dòng, sẵn sàng cho điều tra chuyên sâu.
- Tạo báo cáo PDF với đầy đủ thông tin IP, thời gian, số lượng log và bảng tóm tắt các hoạt động.
- Kết quả điều tra cho thấy IP 192.168.1.100 đã thực hiện 492 cuộc tấn công, bao gồm Log4Shell (RCE), SQL Injection và XSS, với mức độ HIGH và CRITICAL. IP này đã bị block tự động bởi hệ thống.
- Hệ thống cho thấy hiệu quả của việc sử dụng OpenSearch kết hợp với PPL/DSL trong điều tra sự cố, giúp kỹ sư SOC nhanh chóng xác định timeline và trích xuất evidence phục vụ báo cáo.