Võ Đức Chính
Intern
Cách cài đặt và thu thập raw log của Sophos firewall: https://securityzone.vn/t/siem-log-...ch-vector-thu-thap-log-sophos-firewall.12610/
Sau khi thu thập được log firewall sẽ thấy firewall gửi cho mình:
Từ đó, thực hiện chia cặp giá trị key - value để chia thành nhiều trường thay vì raw message:
Sau khi reset lại vector thì log được parsing và chia thành nhiều trường để dễ query log hơn:
Sau khi thu thập được log firewall sẽ thấy firewall gửi cho mình:
"message": "<134>device_name=\"SFW\" timestamp=\"2025-09-25T09:55:19+0700\" device_model=\"SF01V\" device_serial_id=\"NULL\" log_id=\"010102600002\" log_type=\"Firewall\" log_component=\"Firewall Rule\" log_subtype=\"Denied\" log_version=1 severity=\"Information\" fw_rule_id=\"4\" fw_rule_name=\"block 1.2.3.4\" fw_rule_section=\"Local rule\" nat_rule_id=\"0\" fw_rule_type=\"USER\" ether_type=\"IPv4 (0x0800)\" in_interface=\"PortA\" out_interface=\"PortB\" src_mac=\"00:50:56:89:ff:39\" src_ip=\"10.30.194.150\" src_country=\"R1\" dst_ip=\"8.8.8.8\" dst_country=\"USA\" protocol=\"UDP\" src_port=54728 dst_port=53 hb_status=\"No Heartbeat\" app_resolved_by=\"Signature\" app_is_cloud=\"FALSE\" qualifier=\"New\" in_display_interface=\"PortA\" out_display_interface=\"PortB\" log_occurrence=\"1\"" |
Từ đó, thực hiện chia cặp giá trị key - value để chia thành nhiều trường thay vì raw message:
vector.yaml: transforms: parse_sophos_logs: type: remap inputs: ["sophos_syslog"] source: | .raw_message = to_string!(.message) .index_name = "sophos_firewall" .received_at = now() .source_type = "sophos" # Parse key=value (infallible) kv = parse_key_value!(.raw_message, field_delimiter: " ", key_value_delimiter: "=") .parsed = kv # ECS mapping if exists(.parsed.timestamp) { ."@timestamp" = .parsed.timestamp } else { ."@timestamp" = now() } if !exists(.observer) { .observer = {} } .observer.name = .parsed.device_name .observer.product = .parsed.device_model .observer.serial_number = .parsed.device_serial_id if !exists(.event) { .event = {} } .event.code = .parsed.log_id .event.category = .parsed.log_type .event.dataset = .parsed.log_component .event.type = .parsed.log_subtype .event.severity = .parsed.severity .event.original = .raw_message if !exists(.rule) { .rule = {} } .rule.id = .parsed.fw_rule_id .rule.name = .parsed.fw_rule_name .rule.ruleset = .parsed.fw_rule_section if !exists(.source) { .source = {} } .source.ip = .parsed.src_ip if exists(.parsed.src_port) { .source.port = to_int!(.parsed.src_port) } .source.mac = .parsed.src_mac if !exists(.source.geo) { .source.geo = {} } .source.geo.country_iso_code = .parsed.src_country if !exists(.destination) { .destination = {} } .destination.ip = .parsed.dst_ip if exists(.parsed.dst_port) { .destination.port = to_int!(.parsed.dst_port) } if !exists(.network) { .network = {} } .network.transport = downcase!(.parsed.protocol) .network.type = .parsed.ether_type if !exists(.network.ingress) { .network.ingress = {} } if !exists(.network.ingress.interface) { .network.ingress.interface = {} } .network.ingress.interface.name = .parsed.in_interface if !exists(.network.egress) { .network.egress = {} } if !exists(.network.egress.interface) { .network.egress.interface = {} } .network.egress.interface.name = .parsed.out_interface |
Sau khi reset lại vector thì log được parsing và chia thành nhiều trường để dễ query log hơn:

Bài viết liên quan
Bài viết mới