Võ Đức Chính
Intern
Sau khi đã thu thập được log và thực hiện parsing cơ bản (xem bài viết trước: https://securityzone.vn/t/siem-log-management-parsing-log-sophos-firewall-tren-vector.12619/), bước tiếp theo là chuẩn hóa các trường log theo ECS.
Thực hiện parsing theo ECS schema (Elastic Common Schema): https://www.elastic.co/docs/reference/ecs
File config vector:
Kết quả sau khi xử lý log:
Thu được các trường đã được chuẩn hóa theo ECS schema.
Thực hiện parsing theo ECS schema (Elastic Common Schema): https://www.elastic.co/docs/reference/ecs
File config vector:
# Vector pipeline: ingests Wazuh archive logs (file) and Sophos Firewall syslog
# (UDP/514), maps the Sophos key=value payload onto ECS-style fields, and ships
# both streams to OpenSearch. The target index is taken from `.index_name`.
data_dir: "/vector-data-dir"

api:
  enabled: true
  address: "0.0.0.0:8686"

sources:
  wazuh_logs:
    type: file
    include:
      - "/var/log/wazuh/archives.json"
    ignore_older: 0

  sophos_syslog:
    type: socket
    mode: udp
    address: "0.0.0.0:514"

transforms:
  # Parse Wazuh JSON logs: replace the whole event with the decoded JSON line.
  parse_json_message:
    type: remap
    inputs: ["wazuh_logs"]
    source: |
      . = parse_json!(.message)

  # Choose the Wazuh index from the agent group label and stamp the event.
  ensure_index_and_tags:
    type: remap
    inputs: ["parse_json_message"]
    source: |
      if exists(.agent.labels.group) && is_string(.agent.labels.group) {
        .index_name = "wazuh-" + downcase!(.agent.labels.group)
      } else {
        .index_name = "wazuh-ungrouped"
      }
      .received_at = now()
      .source_type = "wazuh"

  # Parse Sophos Firewall logs -> ECS mapping (fields common to every log_type).
  # Raw key=value pairs are kept under `.parsed`; each pair that gets mapped is
  # removed from `.parsed`, so whatever remains there is still unmapped.
  parse_sophos_logs:
    type: remap
    inputs: ["sophos_syslog"]
    source: |
      .event.created = .timestamp
      del(.timestamp)
      .ecs.version = "9.1.0"
      .index_name = "sophos_firewall"
      .source_type = "sophos"

      # Split the syslog "<PRI>" prefix from the key=value body.
      syslog_parts = parse_regex(.message, r'^<(?P<pri>[0-9]+)>(?P<rest>.*)') ?? {}
      if exists(syslog_parts.pri) {
        .log.syslog.priority = to_int!(syslog_parts.pri)
      }
      if exists(syslog_parts.rest) {
        kv = parse_key_value!(syslog_parts.rest, field_delimiter: " ", key_value_delimiter: "=")
        .parsed = kv
      }

      ."@timestamp" = parse_timestamp!(.parsed.timestamp, format: "%+")
      del(.parsed.timestamp)
      .tags = ["sophos"]

      # Observer (the firewall itself)
      if exists(.parsed.device_name) {
        .observer.name = .parsed.device_name
        del(.parsed.device_name)
      }
      if exists(.parsed.device_model) {
        .observer.product = .parsed.device_model
        del(.parsed.device_model)
      }
      if exists(.parsed.device_serial_id) {
        .observer.serial_number = .parsed.device_serial_id
        del(.parsed.device_serial_id)
      }
      .observer.vendor = "Sophos"
      .observer.type = "firewall"

      # Event classification
      if exists(.parsed.log_id) {
        .event.code = .parsed.log_id
        del(.parsed.log_id)
      }
      if exists(.parsed.log_type) {
        # Not deleted here: the per-log_type transforms downstream branch on it.
        .event.category = .parsed.log_type
      }
      if exists(.parsed.log_component) {
        .event.dataset = .parsed.log_component
        del(.parsed.log_component)
      }
      if exists(.parsed.log_subtype) {
        .event.action = .parsed.log_subtype
        del(.parsed.log_subtype)
      }
      if exists(.parsed.log_version) {
        .observer.version = .parsed.log_version
        del(.parsed.log_version)
      }
      if exists(.host) {
        .observer.ip = .host
        del(.host)
      }
      if exists(.parsed.status) {
        .event.outcome = downcase!(.parsed.status)
        del(.parsed.status)
      }
      if exists(.parsed.message) {
        .event.message = .parsed.message
        del(.parsed.message)
      }
      if exists(.message) {
        .event.original = .message
        del(.message)
      }

      # Severity: keep the vendor label and derive the numeric syslog level.
      if exists(.parsed.severity) {
        .log.syslog.severity.name = .parsed.severity
        sev = downcase!(.parsed.severity)
        .log.level = sev
        if sev == "emergency" {
          .event.severity = 0
        } else if sev == "alert" {
          .event.severity = 1
        } else if sev == "critical" {
          .event.severity = 2
        } else if sev == "error" {
          .event.severity = 3
        } else if sev == "warning" {
          .event.severity = 4
        } else if sev == "notice" {
          .event.severity = 5
        } else if sev == "information" || sev == "info" {
          .event.severity = 6
        } else if sev == "debug" {
          .event.severity = 7
        } else {
          # Unknown labels are treated as informational.
          .event.severity = 6
        }
        del(.parsed.severity)
      }

      if exists(.parsed.app_resolved_by) {
        .app.resolved_by = .parsed.app_resolved_by
        del(.parsed.app_resolved_by)
      }
      if exists(.parsed.app_is_cloud) {
        .app.is_cloud = .parsed.app_is_cloud == "TRUE"
        del(.parsed.app_is_cloud)
      }
      if exists(.parsed.hb_status) {
        .hb.status = .parsed.hb_status
        del(.parsed.hb_status)
      }
      if exists(.parsed.qualifier) {
        .event.qualifier = .parsed.qualifier
        del(.parsed.qualifier)
      }
      if exists(.parsed.log_occurrence) {
        .event.sequence = .parsed.log_occurrence
        del(.parsed.log_occurrence)
      }

      # User fields
      # "System Health" records are skipped: parse_systemhealth_logs reads
      # `.parsed.user` as a CPU percentage, so it must not be consumed here.
      if exists(.parsed.user) && .parsed.log_type != "System Health" {
        .user.name = .parsed.user
        del(.parsed.user)
      }
      if exists(.parsed.user_group) {
        .user.group = .parsed.user_group
        del(.parsed.user_group)
      }
      if exists(.parsed.protocol) {
        .network.transport.protocol = downcase!(.parsed.protocol)
        del(.parsed.protocol)
      }

      # Source fields
      if exists(.parsed.src_ip) {
        .source.ip = .parsed.src_ip
        del(.parsed.src_ip)
      }
      if exists(.parsed.src_country) {
        .source.country = .parsed.src_country
        del(.parsed.src_country)
      }
      if exists(.parsed.src_port) {
        .source.port = to_int!(.parsed.src_port)
        del(.parsed.src_port)
      }
      if exists(.parsed.src_trans_ip) {
        .source.translated.ip = .parsed.src_trans_ip
        del(.parsed.src_trans_ip)
      }
      if exists(.parsed.src_trans_port) {
        .source.translated.port = to_int!(.parsed.src_trans_port)
        del(.parsed.src_trans_port)
      }
      # The zone name lives in `.source.zone.name` so that it can coexist with
      # `.source.zone.type`; writing a string to `.source.zone` itself would be
      # replaced by the object created for `.type`.
      if exists(.parsed.src_zone) {
        .source.zone.name = .parsed.src_zone
        del(.parsed.src_zone)
      }
      if exists(.parsed.src_zone_type) {
        .source.zone.type = .parsed.src_zone_type
        del(.parsed.src_zone_type)
      }
      if exists(.parsed.src_mac) {
        .source.mac = .parsed.src_mac
        del(.parsed.src_mac)
      }

      # Destination fields
      if exists(.parsed.dst_ip) {
        .destination.ip = .parsed.dst_ip
        del(.parsed.dst_ip)
      }
      if exists(.parsed.dst_country) {
        .destination.geo.country_iso_code = .parsed.dst_country
        del(.parsed.dst_country)
      }
      if exists(.parsed.dst_port) {
        .destination.port = to_int!(.parsed.dst_port)
        del(.parsed.dst_port)
      }
      if exists(.parsed.dst_mac) {
        .destination.mac = .parsed.dst_mac
        del(.parsed.dst_mac)
      }
      if exists(.parsed.dst_trans_ip) {
        .destination.translated.ip = .parsed.dst_trans_ip
        del(.parsed.dst_trans_ip)
      }
      if exists(.parsed.dst_trans_port) {
        .destination.translated.port = to_int!(.parsed.dst_trans_port)
        del(.parsed.dst_trans_port)
      }
      # Same layout as the source zone: name and type are siblings.
      if exists(.parsed.dst_zone) {
        .destination.zone.name = .parsed.dst_zone
        del(.parsed.dst_zone)
      }
      if exists(.parsed.dst_zone_type) {
        .destination.zone.type = .parsed.dst_zone_type
        del(.parsed.dst_zone_type)
      }

  # Extra mapping applied only to log_type == "Firewall".
  parse_firewall_logs:
    type: remap
    inputs: ["parse_sophos_logs"]
    source: |
      if exists(.parsed.log_type) {
        lc = .parsed.log_type
        if lc == "Firewall" {
          del(.parsed.log_type)
          .tags = append(array!(.tags), ["Firewall"])

          # Rule fields
          if exists(.parsed.fw_rule_id) {
            .rule.id = .parsed.fw_rule_id
            del(.parsed.fw_rule_id)
          }
          if exists(.parsed.fw_rule_name) {
            .rule.name = .parsed.fw_rule_name
            del(.parsed.fw_rule_name)
          }
          if exists(.parsed.fw_rule_section) {
            .rule.section = .parsed.fw_rule_section
            del(.parsed.fw_rule_section)
          }
          if exists(.parsed.fw_rule_type) {
            .rule.type = .parsed.fw_rule_type
            del(.parsed.fw_rule_type)
          }
          if exists(.parsed.nat_rule_id) {
            .rule.nat.id = .parsed.nat_rule_id
            del(.parsed.nat_rule_id)
          }
          if exists(.parsed.nat_rule_name) {
            .rule.nat.name = .parsed.nat_rule_name
            del(.parsed.nat_rule_name)
          }

          # Network fields
          if exists(.parsed.ether_type) {
            .network.type = downcase!(.parsed.ether_type)
            del(.parsed.ether_type)
          }
          if exists(.parsed.bridge_name) {
            .network.bridge_name = .parsed.bridge_name
            del(.parsed.bridge_name)
          }
          if exists(.parsed.bridge_display_name) {
            .network.bridge.display_name = downcase!(.parsed.bridge_display_name)
            del(.parsed.bridge_display_name)
          }
          if exists(.parsed.in_interface) {
            .observer.ingress.interface.name = .parsed.in_interface
            del(.parsed.in_interface)
          }
          if exists(.parsed.out_interface) {
            .observer.egress.interface.name = .parsed.out_interface
            del(.parsed.out_interface)
          }
          if exists(.parsed.in_display_interface) {
            .observer.ingress.interface.alias = .parsed.in_display_interface
            del(.parsed.in_display_interface)
          }
          if exists(.parsed.out_display_interface) {
            .observer.egress.interface.alias = .parsed.out_display_interface
            del(.parsed.out_display_interface)
          }
          if exists(.parsed.packets_sent) {
            .network.transport.packets.sent = .parsed.packets_sent
            del(.parsed.packets_sent)
          }
          if exists(.parsed.packets_received) {
            .network.transport.packets.received = .parsed.packets_received
            del(.parsed.packets_received)
          }
          if exists(.parsed.bytes_sent) {
            .network.transport.bytes.sent = .parsed.bytes_sent
            del(.parsed.bytes_sent)
          }
          if exists(.parsed.bytes_received) {
            .network.transport.bytes.received = .parsed.bytes_received
            del(.parsed.bytes_received)
          }
          if exists(.parsed.con_duration) {
            .network.connection.duration = .parsed.con_duration
            del(.parsed.con_duration)
          }

          # Policy references
          if exists(.parsed.web_policy_id) {
            .policy.web.id = .parsed.web_policy_id
            del(.parsed.web_policy_id)
          }
          if exists(.parsed.ips_policy_id) {
            .policy.ips.id = .parsed.ips_policy_id
            del(.parsed.ips_policy_id)
          }
          if exists(.parsed.appfilter_policy_id) {
            .policy.appfilter.id = .parsed.appfilter_policy_id
            del(.parsed.appfilter_policy_id)
          }

          # SDWAN
          if exists(.parsed.sdwan_profile_id_request) {
            .network.sdn.profile.request.id = .parsed.sdwan_profile_id_request
            del(.parsed.sdwan_profile_id_request)
          }
          if exists(.parsed.sdwan_profile_name_request) {
            .network.sdn.profile.request.name = .parsed.sdwan_profile_name_request
            del(.parsed.sdwan_profile_name_request)
          }
          if exists(.parsed.sdwan_profile_id_reply) {
            .network.sdn.profile.reply.id = .parsed.sdwan_profile_id_reply
            del(.parsed.sdwan_profile_id_reply)
          }
          if exists(.parsed.sdwan_profile_name_reply) {
            .network.sdn.profile.reply.name = .parsed.sdwan_profile_name_reply
            del(.parsed.sdwan_profile_name_reply)
          }
          if exists(.parsed.sdwan_route_id_request) {
            .network.sdn.route.request.id = .parsed.sdwan_route_id_request
            del(.parsed.sdwan_route_id_request)
          }
          if exists(.parsed.sdwan_route_name_request) {
            .network.sdn.route.request.name = .parsed.sdwan_route_name_request
            del(.parsed.sdwan_route_name_request)
          }
          if exists(.parsed.sdwan_route_id_reply) {
            .network.sdn.route.reply.id = .parsed.sdwan_route_id_reply
            del(.parsed.sdwan_route_id_reply)
          }
          if exists(.parsed.sdwan_route_name_reply) {
            .network.sdn.route.reply.name = .parsed.sdwan_route_name_reply
            del(.parsed.sdwan_route_name_reply)
          }

          # Connection
          if exists(.parsed.con_direction) {
            .network.direction = .parsed.con_direction
            del(.parsed.con_direction)
          }
          if exists(.parsed.con_id) {
            .network.connection.id = .parsed.con_id
            del(.parsed.con_id)
          }
          if exists(.parsed.virt_con_id) {
            .network.virtual_connection.id = .parsed.virt_con_id
            del(.parsed.virt_con_id)
          }

          # Gateway
          if exists(.parsed.gw_id_request) {
            .network.gateway.request.id = .parsed.gw_id_request
            del(.parsed.gw_id_request)
          }
          if exists(.parsed.gw_name_request) {
            .network.gateway.request.name = .parsed.gw_name_request
            del(.parsed.gw_name_request)
          }
          if exists(.parsed.gw_id_reply) {
            .network.gateway.reply.id = .parsed.gw_id_reply
            del(.parsed.gw_id_reply)
          }
          if exists(.parsed.gw_name_reply) {
            .network.gateway.reply.name = .parsed.gw_name_reply
            del(.parsed.gw_name_reply)
          }

          # Application
          if exists(.parsed.app_name) {
            .application.name = .parsed.app_name
            del(.parsed.app_name)
          }
          if exists(.parsed.app_risk) {
            .application.risk = .parsed.app_risk
            del(.parsed.app_risk)
          }
          if exists(.parsed.app_technology) {
            .application.technology = .parsed.app_technology
            del(.parsed.app_technology)
          }
          if exists(.parsed.app_category) {
            .application.category = .parsed.app_category
            del(.parsed.app_category)
          }
        }
      }

  # Extra mapping applied only to log_type == "Event".
  parse_event_logs:
    type: remap
    inputs: ["parse_firewall_logs"]
    source: |
      if exists(.parsed.log_type) {
        lc = .parsed.log_type
        if lc == "Event" {
          del(.parsed.log_type)
          .tags = append(array!(.tags), ["Event"])
          if exists(.parsed.additional_information) {
            .event.additional_info = .parsed.additional_information
            del(.parsed.additional_information)
          }
        }
      }

  # Extra mapping applied only to log_type == "System Health"
  # (interface counters, disk, CPU, SSL counters, memory, live users).
  parse_systemhealth_logs:
    type: remap
    inputs: ["parse_event_logs"]
    source: |
      if exists(.parsed.log_type) {
        lc = .parsed.log_type
        if lc == "System Health" {
          del(.parsed.log_type)
          .tags = append(array!(.tags), ["System Health"])

          # Interface counters
          if exists(.parsed.display_interface) {
            .observer.interface.name = .parsed.display_interface
            del(.parsed.display_interface)
          }
          if exists(.parsed.interface) {
            .observer.interface.id = .parsed.interface
            del(.parsed.interface)
          }
          # kbit -> bytes: 1000 bits / 8 = 125 bytes per kbit.
          if exists(.parsed.receivedkbits) {
            .network.in.bytes = to_float!(.parsed.receivedkbits) * 125
            del(.parsed.receivedkbits)
          }
          if exists(.parsed.transmittedkbits) {
            .network.out.bytes = to_float!(.parsed.transmittedkbits) * 125
            del(.parsed.transmittedkbits)
          }
          # The raw kbit fields are already deleted at this point, so the total
          # is guarded on the derived byte counters instead.
          if exists(.network.in.bytes) && exists(.network.out.bytes) {
            .network.bytes = to_float!(.network.in.bytes) + to_float!(.network.out.bytes)
          }
          if exists(.parsed.receivederrors) {
            .network.in.errors = to_float!(.parsed.receivederrors)
            del(.parsed.receivederrors)
          }
          if exists(.parsed.transmittederrors) {
            .network.out.errors = to_float!(.parsed.transmittederrors)
            del(.parsed.transmittederrors)
          }
          if exists(.parsed.receiveddrops) {
            .network.in.dropped = to_float!(.parsed.receiveddrops)
            del(.parsed.receiveddrops)
          }
          if exists(.parsed.transmitteddrops) {
            .network.out.dropped = to_float!(.parsed.transmitteddrops)
            del(.parsed.transmitteddrops)
          }
          if exists(.parsed.collisions) {
            .network.collisions = to_float!(.parsed.collisions)
            del(.parsed.collisions)
          }

          # Disk usage: values arrive as "NN%"; the sign is stripped and the
          # number is stored as-is (not divided by 100).
          if exists(.parsed.Configuration) {
            .host.disk.configuration.pct = to_float!(replace!(.parsed.Configuration, "%", ""))
            del(.parsed.Configuration)
          }
          if exists(.parsed.Reports) {
            .host.disk.reports.pct = to_float!(replace!(.parsed.Reports, "%", ""))
            del(.parsed.Reports)
          }
          if exists(.parsed.Signature) {
            .host.disk.signature.pct = to_float!(replace!(.parsed.Signature, "%", ""))
            del(.parsed.Signature)
          }
          if exists(.parsed.Temp) {
            .host.disk.temp.pct = to_float!(replace!(.parsed.Temp, "%", ""))
            del(.parsed.Temp)
          }

          # CPU usage: "NN%" -> fraction in the 0..1 range.
          if exists(.parsed.system) {
            .host.cpu.system = to_float!(replace!(.parsed.system, "%", "")) / 100
            del(.parsed.system)
          }
          if exists(.parsed.user) {
            .host.cpu.user = to_float!(replace!(.parsed.user, "%", "")) / 100
            del(.parsed.user)
          }
          if exists(.parsed.idle) {
            .host.cpu.usage = 1 - to_float!(replace!(.parsed.idle, "%", "")) / 100
            del(.parsed.idle)
          }

          # SSL/TLS connection counters
          if exists(.parsed.non_ssl_count) {
            .ssl.non_ssl_count = to_int!(.parsed.non_ssl_count)
            del(.parsed.non_ssl_count)
          }
          if exists(.parsed.ssl_count) {
            .ssl.ssl_count = to_int!(.parsed.ssl_count)
            del(.parsed.ssl_count)
          }
          if exists(.parsed.decrypted_count) {
            .ssl.decrypted_count = to_int!(.parsed.decrypted_count)
            del(.parsed.decrypted_count)
          }

          # Memory: ratios fall back to 0.0 when the division fails.
          del(.parsed.unit)
          if exists(.parsed.total_memory) {
            .host.memory.total.bytes = to_int!(.parsed.total_memory)
          }
          if exists(.parsed.total_memory) && exists(.parsed.used) {
            used = to_float!(.parsed.used)
            total = to_float!(.parsed.total_memory)
            .host.memory.usage = (used / total) ?? 0.0
          }
          if exists(.parsed.total_memory) && exists(.parsed.free) {
            free = to_float!(.parsed.free)
            total = to_float!(.parsed.total_memory)
            .host.memory.free = (free / total) ?? 0.0
          }
          del(.parsed.total_memory)
          del(.parsed.used)
          del(.parsed.free)

          # Live users
          if exists(.parsed.users) {
            .user.active_count = to_int!(.parsed.users)
            del(.parsed.users)
          }
        }
      }

  # Tagging only for log_type == "Content Filter"; no field mapping yet.
  parse_contentfilter_logs:
    type: remap
    inputs: ["parse_systemhealth_logs"]
    source: |
      if exists(.parsed.log_type) {
        lc = .parsed.log_type
        if lc == "Content Filter" {
          del(.parsed.log_type)
          .tags = append(array!(.tags), ["Content Filter"])
        }
      }

  # Record when the pipeline finished processing the Sophos event.
  add_event_ingested:
    type: remap
    inputs: ["parse_contentfilter_logs"]
    source: |
      .event.ingested = now()

sinks:
  # Sink for both Wazuh & Sophos. The Wazuh branch ends at
  # ensure_index_and_tags, so it is listed here next to the Sophos branch.
  opensearch_sink:
    type: elasticsearch
    inputs: ["add_event_ingested", "ensure_index_and_tags"]
    endpoints:
      - "https://opensearch-node1:9200"
      - "https://opensearch-node2:9200"
    api_version: "v7"
    bulk:
      index: "{{ index_name }}"
    request:
      concurrency: 2
    auth:
      strategy: basic
      user: admin
      # NOTE(review): credential is hard-coded in the config; prefer an
      # environment variable reference and rotate this password.
      password: "Chinh123@"
    tls:
      # NOTE(review): certificate and hostname verification are disabled;
      # acceptable for a lab only.
      verify_certificate: false
      verify_hostname: false

  # Debug output of the processed Sophos events.
  console_out:
    type: console
    inputs: ["add_event_ingested"]
    encoding:
      codec: json
Kết quả sau khi xử lý log:

Thu được các trường đã được chuẩn hóa theo ECS schema.
Bài viết liên quan
Bài viết mới