Launch "Quick Dev" then stop all sensors and topologies.
service monit stop
service elasticsearch stop
storm kill bro
storm kill snort
storm kill indexing
storm kill enrichment
Create a script that can repeatedly generate messages to work with. Launch the script.
#!/usr/bin/env bash
TOPIC=values
OCTET=`shuf -i 1-10 -n 1`
while true
do
echo "{ \"ip_src_addr\": \"10.0.0.1\", \"value\": $RANDOM }" | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list node1:6667 --topic $TOPIC
sleep 2
done
Validate that messages are landing.
$ /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic values
{ "value": 6030 }
{ "value": 5937 }
The parser definition will look like the following.
{
"parserClassName":"org.apache.metron.parsers.json.JSONMapParser",
"sensorTopic":"values"
}
Run the following in the Stellar shell: bin/stellar -z node1:2181
and paste the parser definition above into the editor.
conf := SHELL_EDIT()
CONFIG_PUT('PARSER', conf, 'values')
Launch the 'values' parser.
bin/start_parser_topology.sh -z node1:2181 -k node1:6667 -s values
Validate that the parser is working.
$ /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic enrichments
{"original_string":"{ \"value\": 13178 }","value":13178,"timestamp":1478024236619,"source.type":"values"}
{"original_string":"{ \"value\": 6707 }","value":6707,"timestamp":1478024239896,"source.type":"values"}
{"original_string":"{ \"value\": 20651 }","value":20651,"timestamp":1478024243125,"source.type":"values"}
Create the HBase table for the Profiler.
echo "create 'profiler','P'" | hbase shell
Run the Profiler with a 1 minute period by editing config/profiler.properties
. This is only necessary as I am too impatient to wait for the default 15 minute period.
profiler.workers=1
profiler.executors=0
profiler.input.topic=enrichments
profiler.period.duration=1
profiler.period.duration.units=MINUTES
profiler.hbase.salt.divisor=1000
profiler.hbase.table=profiler
profiler.hbase.column.family=P
profiler.hbase.batch=10
profiler.hbase.flush.interval.seconds=30
kafka.zk=node1:2181
kafka.broker=node1:6667
kafka.start=WHERE_I_LEFT_OFF
Create a profile that captures a "sketch" of the values by editing config/zookeeper/profiler.json
.
{
"profiles": [
{
"profile": "values",
"foreach": "ip_src_addr",
"onlyif": "exists(value)",
"update": { "s": "STATS_ADD(s, value)" },
"result": "s"
}
]
}
Launch the profiler.
bin/zk_load_configs.sh -m PUSH -i config/zookeeper/ -z node1:2181
bin/start_profiler_topology.sh
Sync the Profiler Client with the Profiler by editing $METRON/config/zookeeper/global.json
.
{
"es.clustername": "metron",
"es.ip": "node1",
"es.port": "9300",
"es.date.format": "yyyy.MM.dd.HH",
"profiler.client.period.duration": "1",
"profiler.client.period.duration.units": "MINUTES"
}
Open up the Stellar shell to validate the data.
$ bin/stellar -z node1:2181
[Stellar]>>> PROFILE_GET("values","10.0.0.1",2, "DAYS")
[org.apache.metron.common.math.stats.OnlineStatisticsProvider@79fe4ab9, org.apache.metron.common.math.stats.OnlineStatisticsProvider@ea728121, org.apache.metron.common.math.stats.OnlineStatisticsProvider@90a2df37, org.apache.metron.common.math.stats.OnlineStatisticsProvider@85f09c7e, org.apache.metron.common.math.stats.OnlineStatisticsProvider@c9370df0]
[Stellar]>>> STATS_MEAN( GET_FIRST( PROFILE_GET("values","10.0.0.1", 2, "MINUTES")))
15979.0625
[Stellar]>>> STATS_PERCENTILE( GET_LAST( PROFILE_GET("values","10.0.0.1", 2, "MINUTES")), 90)
3031.958
[Stellar]>>> s := PROFILE_GET("values","10.0.0.1", 1, "HOURS")
[Stellar]>>> m := STATS_MERGE( s)
[Stellar]>>> STATS_PERCENTILE(m, 90)
555.992
Create the enrichment definition for the 'values' sensor. During Enrichment a new field called value_threshold
is defined as the 90th percentile of values seen over the past 2 hours. Then during threat triage, if the value
is greater than this threshold, the triage score is set to 100, indicating an abnormality.
Run the following commands in a Stellar shell: bin/stellar -z node1:2181
ip_src_addr := "10.0.0.1"
is_alert := source.type == 'values'
value_threshold := STATS_PERCENTILE( STATS_MERGE( PROFILE_GET('values', ip_src_addr, 2, 'HOURS')), 90)
# define enrichment fields
conf := CONFIG_GET('ENRICHMENT', 'values', true)
conf := ENRICHMENT_STELLAR_TRANSFORM_ADD(conf, 'ENRICHMENT', SHELL_VARS2MAP('is_alert'))
conf := ENRICHMENT_STELLAR_TRANSFORM_ADD(conf, 'ENRICHMENT', SHELL_VARS2MAP( 'value_threshold'))
# define threat triage rules
outlier := value > value_threshold
rules := { SHELL_GET_EXPRESSION('outlier') : 100 }
conf := THREAT_TRIAGE_ADD( conf, rules)
# save the configuration
CONFIG_PUT('ENRICHMENT', conf, 'values')
Launch the Enrichment topology. Ensure MySQL is running other Enrichment will do nothing.
bin/start_enrichment_topology.sh
Validate enrichment and scoring. Notice that the last message in the following example has "threat.triage.level":100.0
as "value":32339
is above the"value_red_level":29119.199999999997
.
$ /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic indexing
{"enrichmentsplitterbolt.splitter.end.ts":"1478096276482","adapter.stellaradapter.end.ts":"1478096281466","enrichmentsplitterbolt.splitter.begin.ts":"1478096276212","source.type":"values","original_string":"{ \"ip_src_addr\": \"10.0.0.1\", \"value\": 23296 }","threatintelsplitterbolt.splitter.end.ts":"1478096281537","value_red_level":29119.199999999997,"threatinteljoinbolt.joiner.ts":"1478096281541","enrichmentjoinbolt.joiner.ts":"1478096281524","threatintelsplitterbolt.splitter.begin.ts":"1478096281537","ip_src_addr":"10.0.0.1","value":23296,"timestamp":1478096272246,"adapter.stellaradapter.begin.ts":"1478096276514"}
{"enrichmentsplitterbolt.splitter.end.ts":"1478096277251","adapter.stellaradapter.end.ts":"1478096281621","enrichmentsplitterbolt.splitter.begin.ts":"1478096277245","source.type":"values","original_string":"{ \"ip_src_addr\": \"10.0.0.1\", \"value\": 16358 }","threatintelsplitterbolt.splitter.end.ts":"1478096281626","value_red_level":29119.199999999997,"threatinteljoinbolt.joiner.ts":"1478096281629","enrichmentjoinbolt.joiner.ts":"1478096281624","threatintelsplitterbolt.splitter.begin.ts":"1478096281626","ip_src_addr":"10.0.0.1","value":16358,"timestamp":1478096277241,"adapter.stellaradapter.begin.ts":"1478096281466"}
{"enrichmentsplitterbolt.splitter.end.ts":"1478100229033","adapter.stellaradapter.end.ts":"1478100229066","enrichmentsplitterbolt.splitter.begin.ts":"1478100229031","is_alert":"true","source.type":"values","original_string":"{ \"ip_src_addr\": \"10.0.0.1\", \"value\": 32339 }","threatintelsplitterbolt.splitter.end.ts":"1478100229071","value_red_level":29121.666666666668,"threat.triage.level":100.0,"threatinteljoinbolt.joiner.ts":"1478100229073","enrichmentjoinbolt.joiner.ts":"1478100229069","threatintelsplitterbolt.splitter.begin.ts":"1478100229071","ip_src_addr":"10.0.0.1","value":32339,"adapter.stellaradapter.begin.ts":"1478100229035","timestamp":1478100229027}