@nickwallen
Created November 15, 2016 18:55
1.

Launch "Quick Dev" then stop all sensors and topologies.

service monit stop
service elasticsearch stop
storm kill bro
storm kill snort
storm kill indexing
storm kill enrichment
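To confirm everything is down before continuing, the Storm CLI can list what is still running (a sanity check, assuming the storm command is on the PATH as in the kill commands above):

```shell
# None of bro, snort, indexing, or enrichment should appear after the kills above.
storm list
```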

Setup Working Data Set

2.

Create a script that can repeatedly generate messages to work with. Launch the script.

#!/usr/bin/env bash
# Continuously produce random 'value' messages to the Kafka topic.
TOPIC=values
while true
do
  echo "{ \"ip_src_addr\": \"10.0.0.1\", \"value\": $RANDOM }" | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list node1:6667 --topic $TOPIC
  sleep 2
done
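Before wiring Kafka in, the message format can be sanity-checked locally. This sketch builds a single message the same way the loop does and verifies that both fields are present:

```shell
# Build one message exactly as the generator script does.
msg="{ \"ip_src_addr\": \"10.0.0.1\", \"value\": $RANDOM }"
echo "$msg"

# Quick structural check: both expected fields are present.
echo "$msg" | grep -q '"ip_src_addr"' && echo "$msg" | grep -q '"value"' && echo "format ok"
```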

3.

Validate that messages are landing.

$ /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic values

  { "value": 6030 }
  { "value": 5937 }

Create a Parser

4.

The parser definition will look like the following.

{
  "parserClassName":"org.apache.metron.parsers.json.JSONMapParser",
  "sensorTopic":"values"
}

Open the Stellar shell with bin/stellar -z node1:2181 and run the following. SHELL_EDIT() opens an editor; paste the parser definition above into it.

conf := SHELL_EDIT()
CONFIG_PUT('PARSER', conf, 'values')

5.

Launch the 'values' parser.

bin/start_parser_topology.sh -z node1:2181 -k node1:6667 -s values

6.

Validate that the parser is working.

$ /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic enrichments

  {"original_string":"{ \"value\": 13178 }","value":13178,"timestamp":1478024236619,"source.type":"values"}
  {"original_string":"{ \"value\": 6707 }","value":6707,"timestamp":1478024239896,"source.type":"values"}
  {"original_string":"{ \"value\": 20651 }","value":20651,"timestamp":1478024243125,"source.type":"values"}

Create a Profile

7.

Create the HBase table for the Profiler.

echo "create 'profiler','P'" | hbase shell

8.

Configure the Profiler to use a 1 minute period by editing config/profiler.properties. This is only necessary because I am too impatient to wait for the default 15 minute period.

profiler.workers=1
profiler.executors=0
profiler.input.topic=enrichments
profiler.period.duration=1
profiler.period.duration.units=MINUTES
profiler.hbase.salt.divisor=1000
profiler.hbase.table=profiler
profiler.hbase.column.family=P
profiler.hbase.batch=10
profiler.hbase.flush.interval.seconds=30

kafka.zk=node1:2181
kafka.broker=node1:6667
kafka.start=WHERE_I_LEFT_OFF

9.

Create a profile that captures a "sketch" of the values by editing config/zookeeper/profiler.json.

{
  "profiles": [
    {
      "profile": "values",
      "foreach": "ip_src_addr",
      "onlyif":  "exists(value)",
      "update":  { "s": "STATS_ADD(s, value)" },
      "result":  "s"
    }
  ]
}
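For intuition about what this profile does: within each period, and for each distinct ip_src_addr, STATS_ADD folds every incoming value into a statistical sketch, which is flushed to HBase as that period's result. A rough local sketch of the running count and mean such a summary would carry, using the sample values seen earlier (the real sketch also tracks variance, percentiles, and more):

```shell
# Mimic the per-period accumulation: fold each value into a running summary.
printf '%s\n' 6030 5937 13178 6707 20651 |
  awk '{ n++; sum += $1 } END { printf "count=%d mean=%.1f\n", n, sum / n }'
# -> count=5 mean=10500.6
```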

10.

Launch the profiler.

bin/zk_load_configs.sh -m PUSH -i config/zookeeper/ -z node1:2181
bin/start_profiler_topology.sh

11.

Sync the Profiler Client with the Profiler by editing $METRON/config/zookeeper/global.json.

{
  "es.clustername": "metron",
  "es.ip": "node1",
  "es.port": "9300",
  "es.date.format": "yyyy.MM.dd.HH",

  "profiler.client.period.duration": "1",
  "profiler.client.period.duration.units": "MINUTES"
}
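Editing the file locally is not enough on its own; the change has to be pushed to ZooKeeper before the Profiler Client picks it up. The same zk_load_configs.sh invocation used to deploy the profile works here too:

```shell
# Push the updated global.json (along with the rest of config/zookeeper/) to ZooKeeper.
bin/zk_load_configs.sh -m PUSH -i config/zookeeper/ -z node1:2181
```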

12.

Open up the Stellar shell to validate the data.

$ bin/stellar -z node1:2181

[Stellar]>>> PROFILE_GET("values","10.0.0.1",2, "DAYS")
[org.apache.metron.common.math.stats.OnlineStatisticsProvider@79fe4ab9, org.apache.metron.common.math.stats.OnlineStatisticsProvider@ea728121, org.apache.metron.common.math.stats.OnlineStatisticsProvider@90a2df37, org.apache.metron.common.math.stats.OnlineStatisticsProvider@85f09c7e, org.apache.metron.common.math.stats.OnlineStatisticsProvider@c9370df0]

[Stellar]>>> STATS_MEAN( GET_FIRST( PROFILE_GET("values","10.0.0.1", 2, "MINUTES")))
15979.0625

[Stellar]>>> STATS_PERCENTILE( GET_LAST( PROFILE_GET("values","10.0.0.1", 2, "MINUTES")), 90)
3031.958

[Stellar]>>> s := PROFILE_GET("values","10.0.0.1", 1, "HOURS")
[Stellar]>>> m := STATS_MERGE( s)
[Stellar]>>> STATS_PERCENTILE(m, 90)
555.992

Enrich and Score Messages

13.

Create the enrichment definition for the 'values' sensor. During Enrichment a new field called value_threshold is defined as the 90th percentile of values seen over the past 2 hours. Then during threat triage, if the value is greater than this threshold, the triage score is set to 100, indicating an abnormality.

Run the following commands in a Stellar shell: bin/stellar -z node1:2181

ip_src_addr := "10.0.0.1"
is_alert := source.type == 'values'
value_threshold := STATS_PERCENTILE( STATS_MERGE( PROFILE_GET('values', ip_src_addr, 2, 'HOURS')), 90)

# define enrichment fields
conf := CONFIG_GET('ENRICHMENT', 'values', true)
conf := ENRICHMENT_STELLAR_TRANSFORM_ADD(conf, 'ENRICHMENT', SHELL_VARS2MAP('is_alert'))
conf := ENRICHMENT_STELLAR_TRANSFORM_ADD(conf, 'ENRICHMENT', SHELL_VARS2MAP( 'value_threshold'))

# define threat triage rules
outlier := value > value_threshold
rules := { SHELL_GET_EXPRESSION('outlier') : 100 }
conf := THREAT_TRIAGE_ADD( conf, rules)

# save the configuration
CONFIG_PUT('ENRICHMENT', conf, 'values')

14.

Launch the Enrichment topology. Ensure MySQL is running, otherwise Enrichment will do nothing.

bin/start_enrichment_topology.sh

15.

Validate enrichment and scoring. Notice that the last message in the following example has "threat.triage.level":100.0 because its "value":32339 is above its "value_red_level":29121.666666666668.

$ /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic indexing

{"enrichmentsplitterbolt.splitter.end.ts":"1478096276482","adapter.stellaradapter.end.ts":"1478096281466","enrichmentsplitterbolt.splitter.begin.ts":"1478096276212","source.type":"values","original_string":"{ \"ip_src_addr\": \"10.0.0.1\", \"value\": 23296 }","threatintelsplitterbolt.splitter.end.ts":"1478096281537","value_red_level":29119.199999999997,"threatinteljoinbolt.joiner.ts":"1478096281541","enrichmentjoinbolt.joiner.ts":"1478096281524","threatintelsplitterbolt.splitter.begin.ts":"1478096281537","ip_src_addr":"10.0.0.1","value":23296,"timestamp":1478096272246,"adapter.stellaradapter.begin.ts":"1478096276514"}
{"enrichmentsplitterbolt.splitter.end.ts":"1478096277251","adapter.stellaradapter.end.ts":"1478096281621","enrichmentsplitterbolt.splitter.begin.ts":"1478096277245","source.type":"values","original_string":"{ \"ip_src_addr\": \"10.0.0.1\", \"value\": 16358 }","threatintelsplitterbolt.splitter.end.ts":"1478096281626","value_red_level":29119.199999999997,"threatinteljoinbolt.joiner.ts":"1478096281629","enrichmentjoinbolt.joiner.ts":"1478096281624","threatintelsplitterbolt.splitter.begin.ts":"1478096281626","ip_src_addr":"10.0.0.1","value":16358,"timestamp":1478096277241,"adapter.stellaradapter.begin.ts":"1478096281466"}
{"enrichmentsplitterbolt.splitter.end.ts":"1478100229033","adapter.stellaradapter.end.ts":"1478100229066","enrichmentsplitterbolt.splitter.begin.ts":"1478100229031","is_alert":"true","source.type":"values","original_string":"{ \"ip_src_addr\": \"10.0.0.1\", \"value\": 32339 }","threatintelsplitterbolt.splitter.end.ts":"1478100229071","value_red_level":29121.666666666668,"threat.triage.level":100.0,"threatinteljoinbolt.joiner.ts":"1478100229073","enrichmentjoinbolt.joiner.ts":"1478100229069","threatintelsplitterbolt.splitter.begin.ts":"1478100229071","ip_src_addr":"10.0.0.1","value":32339,"adapter.stellaradapter.begin.ts":"1478100229035","timestamp":1478100229027}