Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Symantec Endpoint Elasticsearch Ingest Node Pipeline (POC)
{
"description": "Pipeline for parsing Symantec Endpoint logs",
"processors": [
{
"set": {
"field": "event.original",
"value": "{{{message}}}"
}
},
{
"dissect": {
"if": "ctx?.message.contains(\"KAFKA_CONNECT_SYSLOG\")",
"field": "message",
"pattern": "%{} - - - - %{message}"
}
},
{
"set": {
"field": "event.ingested",
"value": "{{_ingest.timestamp}}"
}
},
{
"set": {
"field": "event.timezone",
"value": "-05:00"
}
},
{
"set": {
"field": "observer.vendor",
"value": "Symantec"
}
},
{
"set": {
"field": "observer.product",
"value": "Endpoint Protection"
}
},
{
"set": {
"field": "observer.type",
"value": "edr"
}
},
{
"set": {
"field": "event.module",
"value": "symantec"
}
},
{
"set": {
"field": "event.dataset",
"value": "symantec.endpoint"
}
},
{
"grok": {
"field": "message",
"patterns": [
"<%{NONNEGINT:log.syslog.priority:long}>%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:observer.name}: %{GREEDYDATA:message}"
]
}
},
{
"date": {
"field": "timestamp",
"formats": [
"MMM dd HH:mm:ss",
"MMM d HH:mm:ss",
"MMM d HH:mm:ss"
],
"target_field": "@timestamp",
"timezone": "{{event.timezone}}",
"ignore_failure": true
}
},
{
"remove": {
"field": [
"timestamp"
]
}
},
{
"csv": {
"field": "message",
"target_fields": [
"_csv.server",
"_csv.event_description",
"_csv.local_ip",
"_csv.local_mac",
"_csv.remote_name",
"_csv.remote_ip",
"_csv.remote_mac",
"_csv.network_direction",
"_csv.network_transport",
"_csv.intrusion_id",
"_csv.event_start",
"_csv.event_end",
"_csv.event_count",
"_csv.process_exe",
"_csv.location",
"_csv.user_name",
"_csv.user_domain",
"_csv.local_port",
"_csv.remote_port",
"_csv.signature_id",
"_csv.signature_name",
"_csv.signature_sub_id",
"_csv.intrusion_url",
"_csv.intrusion_payload_url",
"_csv.sha256",
"_csv.md5"
]
}
},
{
"script": {
"description": "split each csv value",
"lang": "painless",
"source": "def out = [:];\nctx._csv.forEach((k, v) -> {\n def idx = v.indexOf(\":\");\n if (idx != -1 && idx+1 <= v.length()) {\n def trimmed = v.substring(v.indexOf(\":\")+1, v.length()).trim();\n if (!trimmed.isEmpty()) {\n out[k] = trimmed;\n }\n } else {\n out[k] = v;\n }\n return true;\n});\nctx._csv = out;\n"
}
},
{
"set": {
"field": "event.code",
"value": "{{_csv.signature_id}}",
"ignore_empty_value": true
}
},
{
"rename": {
"field": "_csv.server",
"target_field": "symantec.endpoint.server",
"ignore_missing": true
}
},
{
"remove": {
"field": "message",
"ignore_missing": true
}
},
{
"rename": {
"field": "_csv.event_description",
"target_field": "message",
"ignore_missing": true
}
},
{
"rename": {
"if": "ctx?._csv?.local_ip != \"0.0.0.0\"",
"field": "_csv.local_ip",
"target_field": "source.ip",
"ignore_missing": true
}
},
{
"rename": {
"if": "ctx?._csv?.local_mac != \"000000000000\"",
"field": "_csv.local_mac",
"target_field": "source.mac",
"ignore_missing": true
}
},
{
"rename": {
"if": "ctx?._csv?.remote_name != \"\"",
"field": "_csv.remote_name",
"target_field": "destination.domain",
"ignore_missing": true
}
},
{
"rename": {
"if": "ctx?._csv?.remote_ip != \"0.0.0.0\"",
"field": "_csv.remote_ip",
"target_field": "destination.ip",
"ignore_missing": true
}
},
{
"rename": {
"if": "ctx?._csv?.remote_mac != \"000000000000\"",
"field": "_csv.remote_mac",
"target_field": "destination.mac",
"ignore_missing": true
}
},
{
"set": {
"if": "ctx?._csv?.network_direction == \"Inbound\"",
"field": "network.direction",
"value": "ingress"
}
},
{
"set": {
"if": "ctx?._csv?.network_direction == \"Outbound\"",
"field": "network.direction",
"value": "egress"
}
},
{
"rename": {
"field": "_csv.network_transport",
"target_field": "network.transport",
"ignore_missing": true
}
},
{
"lowercase": {
"field": "network.transport",
"ignore_missing": true
}
},
{
"rename": {
"field": "_csv.intrusion_id",
"target_field": "rule.id",
"ignore_missing": true
}
},
{
"date": {
"field": "_csv.event_start",
"target_field": "event.start",
"ignore_failure": true,
"formats": [
"yyyy-MM-dd HH:mm:ss"
],
"timezone": "{{event.timezone}}"
}
},
{
"date": {
"field": "_csv.event_end",
"target_field": "event.end",
"ignore_failure": true,
"formats": [
"yyyy-MM-dd HH:mm:ss"
],
"timezone": "{{event.timezone}}"
}
},
{
"convert": {
"field": "_csv.event_count",
"target_field": "event.count",
"type": "long",
"ignore_failure": true
}
},
{
"rename": {
"field": "_csv.process_exe",
"target_field": "process.executable",
"ignore_missing": true
}
},
{
"rename": {
"field": "_csv.location",
"target_field": "source.geo.name",
"ignore_missing": true
}
},
{
"rename": {
"field": "_csv.user_name",
"target_field": "user.name",
"ignore_missing": true
}
},
{
"rename": {
"field": "_csv.user_domain",
"target_field": "user.domain",
"ignore_missing": true
}
},
{
"convert": {
"if": "ctx?._csv?.local_port != \"0\"",
"field": "_csv.local_port",
"target_field": "source.port",
"type": "long",
"ignore_failure": true
}
},
{
"convert": {
"if": "ctx?._csv?.remote_port != \"0\"",
"field": "_csv.remote_port",
"target_field": "destination.port",
"type": "long",
"ignore_failure": true
}
},
{
"rename": {
"field": "_csv.signature_id",
"target_field": "symantec.endpoint.signature.id",
"ignore_missing": true
}
},
{
"rename": {
"field": "_csv.signature_name",
"target_field": "symantec.endpoint.signature.name",
"ignore_missing": true
}
},
{
"rename": {
"field": "_csv.signature_sub_id",
"target_field": "symantec.endpoint.signature.sub_id",
"ignore_missing": true
}
},
{
"rename": {
"field": "_csv.intrusion_url",
"target_field": "url.original",
"ignore_missing": true
}
},
{
"rename": {
"field": "_csv.intrusion_payload_url",
"target_field": "symantec.endpoint.intrusion_payload_url",
"ignore_missing": true
}
},
{
"rename": {
"field": "_csv.sha256",
"target_field": "process.hash.sha256",
"ignore_missing": true
}
},
{
"rename": {
"field": "_csv.md5",
"target_field": "process.hash.md5",
"ignore_missing": true
}
},
{
"script": {
"if": "ctx?.network?.direction == \"ingress\" && ctx?.source != null && ctx?.destination != null",
"lang": "painless",
"source": "def tmp = ctx.source;\nctx.source = ctx.destination;\nctx.destination = tmp;\n"
}
},
{
"script": {
"description": "apply ecs categories",
"lang": "painless",
"ignore_failure": true,
"source": "ctx.event.kind = 'event';\nctx.event.category = ['intrusion_detection'];\nif (ctx.message.contains('not blocked')) {\n ctx.event.type = ['allowed'];\n ctx.event.action = 'detected';\n} else if (!ctx.message.contains('not blocked') && ctx.message.contains('blocked')) {\n ctx.event.type = ['denied'];\n ctx.event.action = 'blocked';\n} else {\n ctx.event.type = ['info'];\n}\n"
}
},
{
"remove": {
"field": [
"_csv"
]
}
},
{
"geoip": {
"if": "ctx.source?.geo == null",
"field": "source.ip",
"target_field": "source.geo",
"ignore_missing": true,
"ignore_failure": true
}
},
{
"geoip": {
"field": "destination.ip",
"target_field": "destination.geo",
"ignore_missing": true,
"if": "ctx.destination?.geo == null",
"ignore_failure": true
}
},
{
"geoip": {
"field": "source.ip",
"target_field": "source.as",
"database_file": "GeoLite2-ASN.mmdb",
"properties": [
"asn",
"organization_name"
],
"ignore_missing": true,
"ignore_failure": true
}
},
{
"geoip": {
"field": "destination.ip",
"target_field": "destination.as",
"database_file": "GeoLite2-ASN.mmdb",
"properties": [
"asn",
"organization_name"
],
"ignore_missing": true,
"ignore_failure": true
}
},
{
"rename": {
"field": "source.as.asn",
"target_field": "source.as.number",
"ignore_missing": true
}
},
{
"rename": {
"field": "source.as.organization_name",
"target_field": "source.as.organization.name",
"ignore_missing": true
}
},
{
"rename": {
"field": "destination.as.asn",
"target_field": "destination.as.number",
"ignore_missing": true
}
},
{
"rename": {
"field": "destination.as.organization_name",
"target_field": "destination.as.organization.name",
"ignore_missing": true
}
}
]
}

Symantec Endpoint Pipeline POC Usage

This is a pipeline to process Symantec Endpoint Protection logs. It expects a syslog header. The format of the events it expects is:

<51>Dec 8 11:08:30 devhost SymantecServer: server02,Event Description: Portscan,Local Host IP: 192.168.50.2,Local Host MAC: 000000000000,Remote Host Name: ,Remote Host IP: 192.168.50.3,Remote Host MAC: 000000000000,Inbound,TCP,Intrusion ID: 0,Begin: 2020-12-08 09:08:01,End Time: 2020-12-08 09:08:01,Occurrences: 1,Application: ,Location: Office,User Name: johndoe,Domain Name: local,Local Port: 0,Remote Port: 0,CIDS Signature ID: 10000,CIDS Signature string: Portscan,CIDS Signature SubID: 0,Intrusion URL: ,Intrusion Payload URL: ,SHA-256: ,MD-5:

Elasticsearch Setup

  1. Install the pipeline definition to Elasticsearch using Kibana Dev Tools Console or use curl.
    PUT _ingest/pipeline/symantec-endpoint
    { /* JSON pipeline content */ }
    
    OR
    curl -XPUT "https://es:9200/_ingest/pipeline/symantec-endpoint" -H 'Content-Type: application/json' -d@symantec-endpoint-pipeline.json
    

Filebeat Setup

  1. Add a log file input to the filebeat.yml. The udp input could be used alternatively.

    filebeat.inputs:
    - type: log
      paths:
        - /var/log/symantec-endpoint*.log
      tags: [symantec-endpoint, forwarded]
      pipeline: symantec-endpoint
    
  2. Restart Filebeat.

@herrBez
Copy link

herrBez commented Dec 22, 2020

Hi there,

I noted that the version I am using of Symantec (Version 14 (14.3 MP1) build 1148 (14.3.1148.0100)) can have different keys for different Logs and that the order is not always guaranteed to be the same. To cope with this fact I took your pipeline and instead of using the csv processor (which relies on the ordering) I used a combination of split and kv. So I can use labels to refer to the different fields, e.g., Occurrencies: 1 becomes ctx._csv.Occurrencies and so on. What is particularly tricky, is the presence of fields without a key. In this case I iterate over the "unparsed" entries and search for the string I need (e.g., Inbound, Outbound for network.direction and TCP, UDP for network.transport. I share my modification, in the hope to be useful for the community.

Kind regards,
Mirko

...
 {
      "split": {
        "field": "message",
        "separator": ","
      }
    },
  {
      "foreach": {
        "field": "message",
        "processor": {
          "kv": {
            "field": "_ingest._value",
            "value_split": ":",
            "field_split": "zzz",
            "trim_value": " ",
            "target_field": "_csv",
            "on_failure": [
              {
                "append": {
                  "value": "{{_ingest._value}}",
                  "field": "unparsed"
                }
              }
            ]
          }
        }
      }
    },
    {
      "script": {
        "source": """
        def unparsed = ctx.unparsed;
        ctx._csv.server = unparsed[0];
        for (x in unparsed) {
          if (["Inbound", "Outbound"].contains(x)) {
            ctx._csv.network_direction = x;
          }
          if (["UDP", "TCP", "ETHERNET"].contains(x)) {
            ctx._csv.network_transport = x;
          }
        }
        """
      }
    },
    {
      "set": {
        "field": "symantec.endpoint.server",
        "value": "{{ctx._csv.server}}"
      }
    },
    {
      "remove": {
        "field": "message",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "_csv.Event Description",
        "target_field": "message",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "if": "ctx._csv['Local Host IP'] != \"0.0.0.0\"",
        "field": "_csv.Local Host IP",
        "target_field": "source.ip",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "if": "ctx?._csv['Local Host MAC'] != \"000000000000\"",
        "field": "_csv.Local Host Mac",
        "target_field": "source.mac",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "if": "ctx._csv['Remote Host Name'] != \"\"",
        "field": "_csv.Remote Host Name",
        "target_field": "destination.domain",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "if": "ctx._csv['Remote Host IP'] != \"0.0.0.0\"",
        "field": "_csv.Remote Host IP",
        "target_field": "destination.ip",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "if": "ctx._csv['Remote Host MAC'] != \"000000000000\"",
        "field": "_csv.Remote Host MAC",
        "target_field": "destination.mac",
        "ignore_missing": true
      }
    },
    {
      "set": {
        "if": "ctx?._csv?.network_direction == \"Inbound\"",
        "field": "network.direction",
        "value": "ingress"
      }
    },
    {
      "set": {
        "if": "ctx?._csv?.network_direction == \"Outbound\"",
        "field": "network.direction",
        "value": "egress"
      }
    },
    {
      "rename": {
        "field": "_csv.network_transport",
        "target_field": "network.transport",
        "ignore_missing": true
      }
    },
    {
      "lowercase": {
        "field": "network.transport",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "_csv.Intrusion ID",
        "target_field": "rule.id",
        "ignore_missing": true
      }
    },
    {
      "date": {
        "field": "_csv.Begin",
        "target_field": "event.start",
        "ignore_failure": true,
        "formats": [
          "yyyy-MM-dd HH:mm:ss"
        ],
        "timezone": "{{event.timezone}}"
      }
    },
    {
      "date": {
        "field": "_csv.End Time",
        "target_field": "event.end",
        "ignore_failure": true,
        "formats": [
          "yyyy-MM-dd HH:mm:ss"
        ],
        "timezone": "{{event.timezone}}"
      }
    },
    {
      "convert": {
        "field": "_csv.Occurrences",
        "target_field": "event.count",
        "type": "long",
        "ignore_failure": true
      }
    },
    {
      "rename": {
        "field": "_csv.Application",
        "target_field": "process.executable",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "_csv.Location",
        "target_field": "source.geo.name",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "_csv.User Name",
        "target_field": "user.name",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "_csv.Domain Name",
        "target_field": "user.domain",
        "ignore_missing": true
      }
    },
     {
      "convert": {
        "if": "ctx._csv['Local Port'] != \"0\"",
        "field": "_csv.Local Port",
        "target_field": "source.port",
        "type": "long",
        "ignore_failure": true
      }
    },
    {
      "convert": {
        "if": "ctx._csv['Remote Port'] != \"0\"",
        "field": "_csv.Remote Port",
        "target_field": "destination.port",
        "type": "long",
        "ignore_failure": true
      }
    },
    {
      "rename": {
        "field": "_csv.CIDS Signature ID",
        "target_field": "symantec.endpoint.signature.id",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "_csv.CIDS Signature string",
        "target_field": "symantec.endpoint.signature.name",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "_csv.CIDS Signature SubID",
        "target_field": "symantec.endpoint.signature.sub_id",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "_csv.Intrusion URL",
        "target_field": "url.original",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "_csv.Intrusion Payload URL",
        "target_field": "symantec.endpoint.intrusion_payload_url",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "_csv.SHA-256",
        "target_field": "process.hash.sha256",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "_csv.MD5",
        "target_field": "process.hash.md5",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "_csv.Action",
        "target_field": "event.action",
        "ignore_missing": true
      }
    },
    {
      "script": {
        "if": "ctx?.network?.direction == \"ingress\" && ctx?.source != null && ctx?.destination != null",
        "lang": "painless",
        "source": "def tmp = ctx.source;\nctx.source = ctx.destination;\nctx.destination = tmp;\n"
      }
    },
    {
      "script": {
        "description": "apply ecs categories",
        "lang": "painless",
        "ignore_failure": true,
        "source": "ctx.event.kind = 'event';\nctx.event.category = ['intrusion_detection'];\nif (ctx.message.contains('not blocked')) {\n    ctx.event.type = ['allowed'];\n    ctx.event.action = 'detected';\n} else if (!ctx.message.contains('not blocked') && ctx.message.contains('blocked')) {\n    ctx.event.type = ['denied'];\n    ctx.event.action = 'blocked';\n} else {\n    ctx.event.type = ['info'];\n}\n"
      }
    },
    {
      "remove": {
        "field": [
          "_csv"
        ]
      }
    }
    ...

@andrewkroh
Copy link
Author

andrewkroh commented Dec 22, 2020

@herrBez I started an issue at elastic/integrations#471 that mentions some of the issues you pointed out.

One problem with using split is that some values can contain commas. I've seen some events that had ,"User Name: Doe,John", for example. So I think a proper CSV decoder will be needed. I'm thinking to ask Elasticsearch to expose a CSV decode function in Painless that returns an array. Then we could run that array through the foreach/kv loop that you have.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment