SIEM/Log Management [SIEM/Log Management] Parsing log Sophos Firewall trên Vector theo ECS

Sau khi đã thu thập được log và parsing cơ bản: https://securityzone.vn/t/siem-log-management-parsing-log-sophos-firewall-tren-vector.12619/
Thực hiện parsing theo ECS scheme: https://www.elastic.co/docs/reference/ecs
File config vector:
data_dir: "/vector-data-dir"
api:
enabled: true
address: "0.0.0.0:8686"
sources:
wazuh_logs:
type: file
include:
- "/var/log/wazuh/archives.json"
ignore_older: 0
sophos_syslog:
type: socket
mode: udp
address: "0.0.0.0:514"
transforms:
# Parse Wazuh JSON logs
parse_json_message:
type: remap
inputs: ["wazuh_logs"]
source: |
. = parse_json!(.message)
ensure_index_and_tags:
type: remap
inputs: ["parse_json_message"]
source: |
if exists(.agent.labels.group) && is_string(.agent.labels.group) {
.index_name = "wazuh-" + downcase!(.agent.labels.group)
} else {
.index_name = "wazuh-ungrouped"
}
.received_at = now()
.source_type = "wazuh"

# Parse Sophos Firewall logs → ECS mapping
parse_sophos_logs:
type: remap
inputs: ["sophos_syslog"]
source: |
.event.created = .timestamp
del(.timestamp)
.ecs.version = "9.1.0"
.index_name = "sophos_firewall"
.source_type = "sophos"
match = parse_regex(.message, r'^<(?P<pri>[0-9]+)>(?P<rest>.*)') ?? {}
if exists(match.pri) {
.log.syslog.priority = to_int!(match.pri)
}
if exists(match.rest) {
kv = parse_key_value!(match.rest, field_delimiter: " ", key_value_delimiter: "=")
.parsed = kv
}
."@timestamp" = parse_timestamp!(.parsed.timestamp, format: "%+")
del(.parsed.timestamp)
.tags = ["sophos"]
if exists(.parsed.device_name) {
.observer.name = .parsed.device_name
del(.parsed.device_name)
}
if exists(.parsed.device_model) {
.observer.product = .parsed.device_model
del(.parsed.device_model)
}
if exists(.parsed.device_serial_id) {
.observer.serial_number = .parsed.device_serial_id
del(.parsed.device_serial_id)
}
.observer.vendor = "Sophos"
.observer.type = "firewall"

if exists(.parsed.log_id) {
.event.code = .parsed.log_id
del(.parsed.log_id)
}
if exists(.parsed.log_type) {
.event.category = .parsed.log_type
}
if exists(.parsed.log_component) {
.event.dataset = .parsed.log_component
del(.parsed.log_component)
}
if exists(.parsed.log_subtype) {
.event.action = .parsed.log_subtype
del(.parsed.log_subtype)
}
if exists(.parsed.log_version) {
.observer.version = .parsed.log_version
del(.parsed.log_version)
}
if exists(.host) {
.observer.ip = .host
del(.host)
}
if exists(.parsed.status) {
.event.outcome = downcase!(.parsed.status)
del(.parsed.status)
}
if exists(.parsed.message) {
.event.message = .parsed.message
del(.parsed.message)
}
if exists(.message) {
.event.original = .message
del(.message)
}
if exists(.parsed.severity) {
.log.syslog.severity.name = .parsed.severity
sev = downcase!(.parsed.severity)
.log.level = sev
if sev == "emergency" {
.event.severity = 0
} else if sev == "alert" {
.event.severity = 1
} else if sev == "critical" {
.event.severity = 2
} else if sev == "error" {
.event.severity = 3
} else if sev == "warning" {
.event.severity = 4
} else if sev == "notice" {
.event.severity = 5
} else if sev == "information" || sev == "info" {
.event.severity = 6
} else if sev == "debug" {
.event.severity = 7
} else {
.event.severity = 6
}
del(.parsed.severity)
}
if exists(.parsed.app_resolved_by) {
.app.resolved_by = .parsed.app_resolved_by
del(.parsed.app_resolved_by)
}
if exists(.parsed.app_is_cloud) {
.app.is_cloud = if .parsed.app_is_cloud == "TRUE" { true } else { false }
del(.parsed.app_is_cloud)
}
if exists(.parsed.hb_status) {
.hb.status = .parsed.hb_status
del(.parsed.hb_status)
}
if exists(.parsed.qualifier) {
.event.qualifier = .parsed.qualifier
del(.parsed.qualifier)
}
if exists(.parsed.log_occurrence) {
.event.sequence = .parsed.log_occurrence
del(.parsed.log_occurrence)
}
# User fields
if exists(.parsed.user) {
.user.name = .parsed.user
del(.parsed.user)
}
if exists(.parsed.user_group) {
.user.group = .parsed.user_group
del(.parsed.user_group)
}
if exists(.parsed.protocol) {
.network.transport.protocol = downcase!(.parsed.protocol)
del(.parsed.protocol)
}
# Source fields
if exists(.parsed.src_ip) {
.source.ip = .parsed.src_ip
del(.parsed.src_ip)
}
if exists(.parsed.src_country) {
.source.country = .parsed.src_country
del(.parsed.src_country)
}
if exists(.parsed.src_port) {
.source.port = to_int!(.parsed.src_port)
del(.parsed.src_port)
}
if exists(.parsed.src_trans_ip) {
.source.translated.ip = .parsed.src_trans_ip
del(.parsed.src_trans_ip)
}
if exists(.parsed.src_trans_port) {
.source.translated.port = to_int!(.parsed.src_trans_port)
del(.parsed.src_trans_port)
}
if exists(.parsed.src_zone) {
.source.zone = .parsed.src_zone
del(.parsed.src_zone)
}
if exists(.parsed.src_zone_type) {
.source.zone.type = .parsed.src_zone_type
del(.parsed.src_zone_type)
}
if exists(.parsed.src_mac) {
.source.mac = .parsed.src_mac
del(.parsed.src_mac)
}
# Destination fields
if exists(.parsed.dst_ip) {
.destination.ip = .parsed.dst_ip
del(.parsed.dst_ip)
}
if exists(.parsed.dst_country) {
.destination.geo.country_iso_code = .parsed.dst_country
del(.parsed.dst_country)
}
if exists(.parsed.dst_port) {
.destination.port = to_int!(.parsed.dst_port)
del(.parsed.dst_port)
}
if exists(.parsed.dst_mac) {
.destination.mac = .parsed.dst_mac
del(.parsed.dst_mac)
}
if exists(.parsed.dst_trans_ip) {
.destination.translated.ip = .parsed.dst_trans_ip
del(.parsed.dst_trans_ip)
}
if exists(.parsed.dst_trans_port) {
.destination.translated.port = to_int!(.parsed.dst_trans_port)
del(.parsed.dst_trans_port)
}
if exists(.parsed.dst_zone) {
.destination.zone = .parsed.dst_zone
del(.parsed.dst_zone)
}
if exists(.parsed.dst_zone_type) {
.destination.zone.type = .parsed.dst_zone_type
del(.parsed.dst_zone_type)
}
parse_firewall_logs:
type: remap
inputs: ["parse_sophos_logs"]
source: |
if exists(.parsed.log_type) {
lc = .parsed.log_type
if lc == "Firewall" {
del(.parsed.log_type)
.tags = append(array!(.tags), ["Firewall"])
# Rule field
if exists(.parsed.fw_rule_id) {
.rule.id = .parsed.fw_rule_id
del(.parsed.fw_rule_id)
}
if exists(.parsed.fw_rule_name) {
.rule.name = .parsed.fw_rule_name
del(.parsed.fw_rule_name)
}
if exists(.parsed.fw_rule_section) {
.rule.section = .parsed.fw_rule_section
del(.parsed.fw_rule_section)
}
if exists(.parsed.fw_rule_type) {
.rule.type = .parsed.fw_rule_type
del(.parsed.fw_rule_type)
}
if exists(.parsed.nat_rule_id) {
.rule.nat.id = .parsed.nat_rule_id
del(.parsed.nat_rule_id)
}
if exists(.parsed.nat_rule_name) {
.rule.nat.name = .parsed.nat_rule_name
del(.parsed.nat_rule_name)
}
# Network fields
if exists(.parsed.ether_type) {
.network.type = downcase!(.parsed.ether_type)
del(.parsed.ether_type)
}
if exists(.parsed.bridge_name) {
.network.bridge_name = .parsed.bridge_name
del(.parsed.bridge_name)
}
if exists(.parsed.bridge_display_name) {
.network.bridge.display_name = downcase!(.parsed.bridge_display_name)
del(.parsed.bridge_display_name)
}

if exists(.parsed.in_interface) {
.observer.ingress.interface.name = .parsed.in_interface
del(.parsed.in_interface)
}
if exists(.parsed.out_interface) {
.observer.egress.interface.name = .parsed.out_interface
del(.parsed.out_interface)
}
if exists(.parsed.in_display_interface) {
.observer.ingress.interface.alias = .parsed.in_display_interface
del(.parsed.in_display_interface)
}
if exists(.parsed.out_display_interface) {
.observer.egress.interface.alias = .parsed.out_display_interface
del(.parsed.out_display_interface)
}
if exists(.parsed.packets_sent) {
.network.transport.packets.sent = .parsed.packets_sent
del(.parsed.packets_sent)
}
if exists(.parsed.packets_received) {
.network.transport.packets.received = .parsed.packets_received
del(.parsed.packets_received)
}
if exists(.parsed.bytes_sent) {
.network.transport.bytes.sent = .parsed.bytes_sent
del(.parsed.bytes_sent)
}
if exists(.parsed.bytes_received) {
.network.transport.bytes.received = .parsed.bytes_received
del(.parsed.bytes_received)
}

if exists(.parsed.con_duration) {
.network.connection.duration = .parsed.con_duration
del(.parsed.con_duration)
}
if exists(.parsed.web_policy_id) {
.policy.web.id = .parsed.web_policy_id
del(.parsed.web_policy_id)
}
if exists(.parsed.ips_policy_id) {
.policy.ips.id = .parsed.ips_policy_id
del(.parsed.ips_policy_id)
}
if exists(.parsed.appfilter_policy_id) {
.policy.appfilter.id = .parsed.appfilter_policy_id
del(.parsed.appfilter_policy_id)
}
# SDWAN
if exists(.parsed.sdwan_profile_id_request) {
.network.sdn.profile.request.id = .parsed.sdwan_profile_id_request
del(.parsed.sdwan_profile_id_request)
}
if exists(.parsed.sdwan_profile_name_request) {
.network.sdn.profile.request.name = .parsed.sdwan_profile_name_request
del(.parsed.sdwan_profile_name_request)
}
if exists(.parsed.sdwan_profile_id_reply) {
.network.sdn.profile.reply.id = .parsed.sdwan_profile_id_reply
del(.parsed.sdwan_profile_id_reply)
}
if exists(.parsed.sdwan_profile_name_reply) {
.network.sdn.profile.reply.name = .parsed.sdwan_profile_name_reply
del(.parsed.sdwan_profile_name_reply)
}
if exists(.parsed.sdwan_route_id_request) {
.network.sdn.route.request.id = .parsed.sdwan_route_id_request
del(.parsed.sdwan_route_id_request)
}
if exists(.parsed.sdwan_route_name_request) {
.network.sdn.route.request.name = .parsed.sdwan_route_name_request
del(.parsed.sdwan_route_name_request)
}
if exists(.parsed.sdwan_route_id_reply) {
.network.sdn.route.reply.id = .parsed.sdwan_route_id_reply
del(.parsed.sdwan_route_id_reply)
}
if exists(.parsed.sdwan_route_name_reply) {
.network.sdn.route.reply.name = .parsed.sdwan_route_name_reply
del(.parsed.sdwan_route_name_reply)
}
if exists(.parsed.con_direction) {
.network.direction = .parsed.con_direction
del(.parsed.con_direction)
}
if exists(.parsed.con_id) {
.network.connection.id = .parsed.con_id
del(.parsed.con_id)
}
if exists(.parsed.virt_con_id) {
.network.virtual_connection.id = .parsed.virt_con_id
del(.parsed.virt_con_id)
}
# Gateway
if exists(.parsed.gw_id_request) {
.network.gateway.request.id = .parsed.gw_id_request
del(.parsed.gw_id_request)
}
if exists(.parsed.gw_name_request) {
.network.gateway.request.name = .parsed.gw_name_request
del(.parsed.gw_name_request)
}
if exists(.parsed.gw_id_reply) {
.network.gateway.reply.id = .parsed.gw_id_reply
del(.parsed.gw_id_reply)
}
if exists(.parsed.gw_name_reply) {
.network.gateway.reply.name = .parsed.gw_name_reply
del(.parsed.gw_name_reply)
}
# Application
if exists(.parsed.app_name) {
.application.name = .parsed.app_name
del(.parsed.app_name)
}
if exists(.parsed.app_risk) {
.application.risk = .parsed.app_risk
del(.parsed.app_risk)
}
if exists(.parsed.app_technology) {
.application.technology = .parsed.app_technology
del(.parsed.app_technology)
}
if exists(.parsed.app_category) {
.application.category = .parsed.app_category
del(.parsed.app_category)
}
}
}
parse_event_logs:
type: remap
inputs: ["parse_firewall_logs"]
source: |
if exists(.parsed.log_type) {
lc = .parsed.log_type
if lc == "Event" {
del(.parsed.log_type)
.tags = append(array!(.tags), ["Event"])
if exists(.parsed.additional_information) {
.event.additional_info = .parsed.additional_information
del(.parsed.additional_information)
}
}
}
parse_systemhealth_logs:
type: remap
inputs: ["parse_event_logs"]
source: |
if exists(.parsed.log_type) {
lc = .parsed.log_type
if lc == "System Health" {
del(.parsed.log_type)
.tags = append(array!(.tags), ["System Health"])
if exists(.parsed.display_interface) {
.observer.interface.name = .parsed.display_interface
del(.parsed.display_interface)
}
if exists(.parsed.interface) {
.observer.interface.id = .parsed.interface
del(.parsed.interface)
}
if exists(.parsed.receivedkbits) {
.network.in.bytes = to_float!(.parsed.receivedkbits) * 125
del(.parsed.receivedkbits)
}
if exists(.parsed.transmittedkbits) {
.network.out.bytes = to_float!(.parsed.transmittedkbits) * 125
del(.parsed.transmittedkbits)
}
if exists(.parsed.receivedkbits) && exists(.parsed.transmittedkbits) {
.network.bytes = to_float!(.network.in.bytes) + to_float!(.network.out.bytes)
}
if exists(.parsed.receivederrors) {
.network.in.errors = to_float!(.parsed.receivederrors)
del(.parsed.receivederrors)
}
if exists(.parsed.transmittederrors) {
.network.out.errors = to_float!(.parsed.transmittederrors)
del(.parsed.transmittederrors)
}
if exists(.parsed.receiveddrops) {
.network.in.dropped = to_float!(.parsed.receiveddrops)
del(.parsed.receiveddrops)
}
if exists(.parsed.transmitteddrops) {
.network.out.dropped = to_float!(.parsed.transmitteddrops)
del(.parsed.transmitteddrops)
}
if exists(.parsed.collisions) {
.network.collisions = to_float!(.parsed.collisions)
del(.parsed.collisions)
}
if exists(.parsed.Configuration) {
.host.disk.configuration.pct = to_float!(replace!(.parsed.Configuration, "%", ""))
del(.parsed.Configuration)
}
if exists(.parsed.Reports) {
.host.disk.reports.pct = to_float!(replace!(.parsed.Reports, "%", ""))
del(.parsed.Reports)
}
if exists(.parsed.Signature) {
.host.disk.signature.pct = to_float!(replace!(.parsed.Signature, "%", ""))
del(.parsed.Signature)
}
if exists(.parsed.Temp) {
.host.disk.temp.pct = to_float!(replace!(.parsed.Temp, "%", ""))
del(.parsed.Temp)
}
if exists(.parsed.Configuration) {
.host.disk.configuration.pct = to_float!(replace!(.parsed.Configuration, "%", ""))
del(.parsed.Configuration)
}
if exists(.parsed.Reports) {
.host.disk.reports.pct = to_float!(replace!(.parsed.Reports, "%", ""))
del(.parsed.Reports)
}
if exists(.parsed.Signature) {
.host.disk.signature.pct = to_float!(replace!(.parsed.Signature, "%", ""))
del(.parsed.Signature)
}
if exists(.parsed.Temp) {
.host.disk.temp.pct = to_float!(replace!(.parsed.Temp, "%", ""))
del(.parsed.Temp)
}
if exists(.parsed.system) {
.host.cpu.system = to_float!(replace!(.parsed.system, "%", "")) / 100
del(.parsed.system)
}
if exists(.parsed.user) {
.host.cpu.user = to_float!(replace!(.parsed.user, "%", "")) / 100
del(.parsed.user)
}
if exists(.parsed.idle) {
.host.cpu.usage = 1 - to_float!(replace!(.parsed.idle, "%", "")) / 100
del(.parsed.idle)
}
if exists(.parsed.non_ssl_count) {
.ssl.non_ssl_count = to_int!(.parsed.non_ssl_count)
del(.parsed.non_ssl_count)
}
if exists(.parsed.ssl_count) {
.ssl.ssl_count = to_int!(.parsed.ssl_count)
del(.parsed.ssl_count)
}
if exists(.parsed.decrypted_count) {
.ssl.decrypted_count = to_int!(.parsed.decrypted_count)
del(.parsed.decrypted_count)
}
del(.parsed.unit)
if exists(.parsed.total_memory) {
.host.memory.total.bytes = to_int!(.parsed.total_memory)
}
if exists(.parsed.total_memory) && exists(.parsed.used) {
used = to_float!(.parsed.used)
total = to_float!(.parsed.total_memory)
.host.memory.usage = (used / total) ?? 0.0
}
if exists(.parsed.total_memory) && exists(.parsed.free) {
free = to_float!(.parsed.free)
total = to_float!(.parsed.total_memory)
.host.memory.free = (free / total) ?? 0.0
}
del(.parsed.total_memory)
del(.parsed.used)
del(.parsed.free)
if exists(.parsed.users) {
.user.active_count = to_int!(.parsed.users)
del(.parsed.users)
}
}
}
parse_contentfilter_logs:
type: remap
inputs: ["parse_systemhealth_logs"]
source: |
if exists(.parsed.log_type) {
lc = .parsed.log_type
if lc == "Content Filter" {
del(.parsed.log_type)
.tags = append(array!(.tags), ["Content Filter"])
}
}
add_event_ingested:
type: remap
inputs: ["parse_contentfilter_logs"]
source: |
.event.ingested = now()
sinks:
# Sink cho cả Wazuh & Sophos
opensearch_sink:
type: elasticsearch
inputs: ["add_event_ingested"]
endpoints:
- "https://opensearch-node1:9200"
- "https://opensearch-node2:9200"
api_version: "v7"
bulk:
index: "{{ index_name }}"
request:
concurrency: 2
auth:
strategy: basic
user: admin
password: "Chinh123@"
tls:
verify_certificate: false
verify_hostname: false
console_out:
type: console
inputs: ["add_event_ingested"]
encoding:
codec: json

Kết quả sau khi xử lý log:
1759605800085.png

Thu được các trường theo ECS scheme.
 
Back
Top