Skip to content

Instantly share code, notes, and snippets.

@andrewkroh
Last active April 21, 2021 16:15
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save andrewkroh/c189cf23348faced244f5c8d29eb3f9d to your computer and use it in GitHub Desktop.
Save andrewkroh/c189cf23348faced244f5c8d29eb3f9d to your computer and use it in GitHub Desktop.
Symantec Endpoint Elasticsearch Ingest Node Pipeline (POC)
{
"description": "Pipeline for parsing Symantec Endpoint logs",
"processors": [
{
"set": {
"field": "event.original",
"value": "{{{message}}}"
}
},
{
  "dissect": {
    "description": "For events containing KAFKA_CONNECT_SYSLOG, discard everything up to the ' - - - - ' separator and keep the remainder as message. The condition checks message for null first so events without a message are skipped instead of failing.",
    "if": "ctx.message != null && ctx.message.contains(\"KAFKA_CONNECT_SYSLOG\")",
    "field": "message",
    "pattern": "%{} - - - - %{message}"
  }
},
{
"set": {
"field": "event.ingested",
"value": "{{_ingest.timestamp}}"
}
},
{
"set": {
"field": "event.timezone",
"value": "-05:00"
}
},
{
"set": {
"field": "observer.vendor",
"value": "Symantec"
}
},
{
"set": {
"field": "observer.product",
"value": "Endpoint Protection"
}
},
{
"set": {
"field": "observer.type",
"value": "edr"
}
},
{
"set": {
"field": "event.module",
"value": "symantec"
}
},
{
"set": {
"field": "event.dataset",
"value": "symantec.endpoint"
}
},
{
"grok": {
"field": "message",
"patterns": [
"<%{NONNEGINT:log.syslog.priority:long}>%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:observer.name}: %{GREEDYDATA:message}"
]
}
},
{
  "date": {
    "description": "Parse the syslog header timestamp into @timestamp using event.timezone. Formats cover two-digit days, space-padded single-digit days (two spaces after the month) and unpadded single-digit days.",
    "field": "timestamp",
    "formats": [
      "MMM dd HH:mm:ss",
      "MMM  d HH:mm:ss",
      "MMM d HH:mm:ss"
    ],
    "target_field": "@timestamp",
    "timezone": "{{event.timezone}}",
    "ignore_failure": true
  }
},
{
"remove": {
"field": [
"timestamp"
]
}
},
{
"csv": {
"field": "message",
"target_fields": [
"_csv.server",
"_csv.event_description",
"_csv.local_ip",
"_csv.local_mac",
"_csv.remote_name",
"_csv.remote_ip",
"_csv.remote_mac",
"_csv.network_direction",
"_csv.network_transport",
"_csv.intrusion_id",
"_csv.event_start",
"_csv.event_end",
"_csv.event_count",
"_csv.process_exe",
"_csv.location",
"_csv.user_name",
"_csv.user_domain",
"_csv.local_port",
"_csv.remote_port",
"_csv.signature_id",
"_csv.signature_name",
"_csv.signature_sub_id",
"_csv.intrusion_url",
"_csv.intrusion_payload_url",
"_csv.sha256",
"_csv.md5"
]
}
},
{
  "script": {
    "description": "split each csv value",
    "lang": "painless",
    "source": "def cleaned = [:];\nfor (def entry : ctx._csv.entrySet()) {\n  def raw = entry.getValue();\n  def sep = raw.indexOf(\":\");\n  if (sep == -1) {\n    cleaned[entry.getKey()] = raw;\n    continue;\n  }\n  def stripped = raw.substring(sep + 1).trim();\n  if (!stripped.isEmpty()) {\n    cleaned[entry.getKey()] = stripped;\n  }\n}\nctx._csv = cleaned;\n"
  }
},
{
"set": {
"field": "event.code",
"value": "{{_csv.signature_id}}",
"ignore_empty_value": true
}
},
{
"rename": {
"field": "_csv.server",
"target_field": "symantec.endpoint.server",
"ignore_missing": true
}
},
{
"remove": {
"field": "message",
"ignore_missing": true
}
},
{
"rename": {
"field": "_csv.event_description",
"target_field": "message",
"ignore_missing": true
}
},
{
"rename": {
"if": "ctx?._csv?.local_ip != \"0.0.0.0\"",
"field": "_csv.local_ip",
"target_field": "source.ip",
"ignore_missing": true
}
},
{
"rename": {
"if": "ctx?._csv?.local_mac != \"000000000000\"",
"field": "_csv.local_mac",
"target_field": "source.mac",
"ignore_missing": true
}
},
{
"rename": {
"if": "ctx?._csv?.remote_name != \"\"",
"field": "_csv.remote_name",
"target_field": "destination.domain",
"ignore_missing": true
}
},
{
"rename": {
"if": "ctx?._csv?.remote_ip != \"0.0.0.0\"",
"field": "_csv.remote_ip",
"target_field": "destination.ip",
"ignore_missing": true
}
},
{
"rename": {
"if": "ctx?._csv?.remote_mac != \"000000000000\"",
"field": "_csv.remote_mac",
"target_field": "destination.mac",
"ignore_missing": true
}
},
{
"set": {
"if": "ctx?._csv?.network_direction == \"Inbound\"",
"field": "network.direction",
"value": "ingress"
}
},
{
"set": {
"if": "ctx?._csv?.network_direction == \"Outbound\"",
"field": "network.direction",
"value": "egress"
}
},
{
"rename": {
"field": "_csv.network_transport",
"target_field": "network.transport",
"ignore_missing": true
}
},
{
"lowercase": {
"field": "network.transport",
"ignore_missing": true
}
},
{
"rename": {
"field": "_csv.intrusion_id",
"target_field": "rule.id",
"ignore_missing": true
}
},
{
"date": {
"field": "_csv.event_start",
"target_field": "event.start",
"ignore_failure": true,
"formats": [
"yyyy-MM-dd HH:mm:ss"
],
"timezone": "{{event.timezone}}"
}
},
{
"date": {
"field": "_csv.event_end",
"target_field": "event.end",
"ignore_failure": true,
"formats": [
"yyyy-MM-dd HH:mm:ss"
],
"timezone": "{{event.timezone}}"
}
},
{
"convert": {
"field": "_csv.event_count",
"target_field": "event.count",
"type": "long",
"ignore_failure": true
}
},
{
"rename": {
"field": "_csv.process_exe",
"target_field": "process.executable",
"ignore_missing": true
}
},
{
"rename": {
"field": "_csv.location",
"target_field": "source.geo.name",
"ignore_missing": true
}
},
{
"rename": {
"field": "_csv.user_name",
"target_field": "user.name",
"ignore_missing": true
}
},
{
"rename": {
"field": "_csv.user_domain",
"target_field": "user.domain",
"ignore_missing": true
}
},
{
"convert": {
"if": "ctx?._csv?.local_port != \"0\"",
"field": "_csv.local_port",
"target_field": "source.port",
"type": "long",
"ignore_failure": true
}
},
{
"convert": {
"if": "ctx?._csv?.remote_port != \"0\"",
"field": "_csv.remote_port",
"target_field": "destination.port",
"type": "long",
"ignore_failure": true
}
},
{
"rename": {
"field": "_csv.signature_id",
"target_field": "symantec.endpoint.signature.id",
"ignore_missing": true
}
},
{
"rename": {
"field": "_csv.signature_name",
"target_field": "symantec.endpoint.signature.name",
"ignore_missing": true
}
},
{
"rename": {
"field": "_csv.signature_sub_id",
"target_field": "symantec.endpoint.signature.sub_id",
"ignore_missing": true
}
},
{
"rename": {
"field": "_csv.intrusion_url",
"target_field": "url.original",
"ignore_missing": true
}
},
{
"rename": {
"field": "_csv.intrusion_payload_url",
"target_field": "symantec.endpoint.intrusion_payload_url",
"ignore_missing": true
}
},
{
"rename": {
"field": "_csv.sha256",
"target_field": "process.hash.sha256",
"ignore_missing": true
}
},
{
"rename": {
"field": "_csv.md5",
"target_field": "process.hash.md5",
"ignore_missing": true
}
},
{
  "script": {
    "description": "source is filled from the Local Host columns and destination from the Remote Host columns. For ingress traffic, exchange the two so the remote host becomes the source. Works when only one side is present, in which case that side simply moves across.",
    "if": "ctx?.network?.direction == \"ingress\"",
    "lang": "painless",
    "source": "def localSide = ctx.remove('source');\ndef remoteSide = ctx.remove('destination');\nif (remoteSide != null) {\n  ctx.source = remoteSide;\n}\nif (localSide != null) {\n  ctx.destination = localSide;\n}\n"
  }
},
{
  "script": {
    "description": "Apply ECS categorization: kind=event, category=intrusion_detection. A message containing 'not blocked' yields type=allowed/action=detected; otherwise a message containing 'blocked' yields type=denied/action=blocked; anything else, including a missing message, yields type=info.",
    "lang": "painless",
    "ignore_failure": true,
    "source": "ctx.event.kind = 'event';\nctx.event.category = ['intrusion_detection'];\ndef msg = ctx.message;\nif (msg != null && msg.contains('not blocked')) {\n  ctx.event.type = ['allowed'];\n  ctx.event.action = 'detected';\n} else if (msg != null && msg.contains('blocked')) {\n  ctx.event.type = ['denied'];\n  ctx.event.action = 'blocked';\n} else {\n  ctx.event.type = ['info'];\n}\n"
  }
},
{
"remove": {
"field": [
"_csv"
]
}
},
{
"geoip": {
"if": "ctx.source?.geo == null",
"field": "source.ip",
"target_field": "source.geo",
"ignore_missing": true,
"ignore_failure": true
}
},
{
"geoip": {
"field": "destination.ip",
"target_field": "destination.geo",
"ignore_missing": true,
"if": "ctx.destination?.geo == null",
"ignore_failure": true
}
},
{
"geoip": {
"field": "source.ip",
"target_field": "source.as",
"database_file": "GeoLite2-ASN.mmdb",
"properties": [
"asn",
"organization_name"
],
"ignore_missing": true,
"ignore_failure": true
}
},
{
"geoip": {
"field": "destination.ip",
"target_field": "destination.as",
"database_file": "GeoLite2-ASN.mmdb",
"properties": [
"asn",
"organization_name"
],
"ignore_missing": true,
"ignore_failure": true
}
},
{
"rename": {
"field": "source.as.asn",
"target_field": "source.as.number",
"ignore_missing": true
}
},
{
"rename": {
"field": "source.as.organization_name",
"target_field": "source.as.organization.name",
"ignore_missing": true
}
},
{
"rename": {
"field": "destination.as.asn",
"target_field": "destination.as.number",
"ignore_missing": true
}
},
{
"rename": {
"field": "destination.as.organization_name",
"target_field": "destination.as.organization.name",
"ignore_missing": true
}
}
]
}

Symantec Endpoint Pipeline POC Usage

This is a pipeline to process Symantec Endpoint Protection logs. It expects a syslog header. The format of the events it expects is:

<51>Dec 8 11:08:30 devhost SymantecServer: server02,Event Description: Portscan,Local Host IP: 192.168.50.2,Local Host MAC: 000000000000,Remote Host Name: ,Remote Host IP: 192.168.50.3,Remote Host MAC: 000000000000,Inbound,TCP,Intrusion ID: 0,Begin: 2020-12-08 09:08:01,End Time: 2020-12-08 09:08:01,Occurrences: 1,Application: ,Location: Office,User Name: johndoe,Domain Name: local,Local Port: 0,Remote Port: 0,CIDS Signature ID: 10000,CIDS Signature string: Portscan,CIDS Signature SubID: 0,Intrusion URL: ,Intrusion Payload URL: ,SHA-256: ,MD-5:

Elasticsearch Setup

  1. Install the pipeline definition in Elasticsearch using either the Kibana Dev Tools Console or curl.
    PUT _ingest/pipeline/symantec-endpoint
    { /* JSON pipeline content */ }
    
    OR
    curl -XPUT "https://es:9200/_ingest/pipeline/symantec-endpoint" -H 'Content-Type: application/json' -d@symantec-endpoint-pipeline.json
    

Filebeat Setup

  1. Add a log input to filebeat.yml. Alternatively, the udp input can be used.

    filebeat.inputs:
    - type: log
      paths:
        - /var/log/symantec-endpoint*.log
      tags: [symantec-endpoint, forwarded]
      pipeline: symantec-endpoint
    
  2. Restart Filebeat.

@andrewkroh
Copy link
Author

@herrBez I started an issue at elastic/integrations#471 that mentions some of the issues you pointed out.

One problem with using split is that some values can contain commas. I've seen some events that had ,"User Name: Doe,John", for example. So I think a proper CSV decoder will be needed. I'm thinking of asking Elasticsearch to expose a CSV decode function in Painless that returns an array. Then we could run that array through the foreach/kv loop that you have.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment