
Real-time Anomaly Detection in VPC Flow Logs (in AWS)

Introduction

Credit goes to Igor Kantor (https://medium.com/@devfire), who wrote the original five-part series on Medium (linked in the sections below).

The goal of this GitHub Gist is to support anyone who wants to implement the described architecture and get it running on AWS. Use both the Medium posts and this Gist for the implementation, since I will not repeat all of the article text here.

On my AWS account I used a prefix (medium_) for all services, to easily find them amongst all the other running services/instances/functions/roles etc. (just a suggestion). It will also make cleaning up your AWS account easier later on.

In this GitHub Gist we will focus on the anomaly detection ("components within the red dashes"):

[Architecture diagram: anomaly detection]

Part 1

https://medium.com/@devfire/real-time-anomaly-detection-in-vpc-flow-logs-part-1-introduction-55ed000e039b

Introduction only; this part does not contain any code or implementation steps.

Part 2

https://medium.com/@devfire/real-time-anomaly-detection-in-vpc-flow-logs-part-2-proposed-architecture-32683755abf7

Gives an overview of the architecture.

In this GitHub Gist we will focus on the anomaly detection ("components within the red dashes").

Part 3

https://medium.com/@devfire/real-time-anomaly-detection-in-vpc-flow-logs-part-3-kinesis-stream-1bdd8a9426f1

This is where the fun begins.

Make sure you have the AWS Command Line Interface installed (https://docs.aws.amazon.com/de_de/cli/latest/userguide/cli-install-macos.html).

If you have just installed the AWS CLI, make sure you run aws configure to connect it to your AWS account.
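For reference, aws configure asks for four values; the prompts look roughly like this (the region should match the one you use in the commands below, us-east-1 in this Gist):

aws configure
AWS Access Key ID [None]: <your access key id>
AWS Secret Access Key [None]: <your secret access key>
Default region name [None]: us-east-1
Default output format [None]: json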

Let's get started (with the article):

aws kinesis create-stream --stream-name "VPCFlowLogs" --shard-count 1
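As an optional check, you can confirm the stream exists and is ACTIVE before wiring anything up to it:

aws kinesis describe-stream --stream-name VPCFlowLogs --query 'StreamDescription.StreamStatus'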

vim allowCloudWatchAccesstoKinesis.json

Paste the content (the JSON is included at the bottom of this Gist) and substitute your region as needed in the line "Principal": { "Service": "logs.us-east-1.amazonaws.com" }.

Run the following command from the same directory where allowCloudWatchAccesstoKinesis.json is located:

aws iam create-role --role-name CloudWatchToKinesisRole --assume-role-policy-document file://./allowCloudWatchAccesstoKinesis.json

vim cloudWatchPermissions.json

Paste the content (also included at the bottom of this Gist) and substitute the two resource ARN strings with yours.

Run the following command from the same directory where cloudWatchPermissions.json is located:

aws iam put-role-policy --role-name CloudWatchToKinesisRole --policy-name Permissions-Policy-For-CWL --policy-document file://./cloudWatchPermissions.json
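Optional sanity check that the role and its inline policy are in place:

aws iam get-role --role-name CloudWatchToKinesisRole
aws iam get-role-policy --role-name CloudWatchToKinesisRole --policy-name Permissions-Policy-For-CWL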

aws logs put-subscription-filter \
    --log-group-name "VPCFlowLogs" \
    --filter-name "VPCFlowLogsAllFilter" \
    --filter-pattern '[version, account_id, interface_id, srcaddr != "-", dstaddr != "-", srcport != "-", dstport != "-", protocol, packets, bytes, start, end, action, log_status]' \
    --destination-arn "arn:aws:kinesis:us-east-1:31415926:stream/VPCFlowLogs" \
    --role-arn "arn:aws:iam::31415926:role/CloudWatchToKinesisRole"
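This assumes a CloudWatch Logs group named VPCFlowLogs that your VPC Flow Logs already deliver to (set up in the earlier parts of the series). To verify the subscription filter was attached:

aws logs describe-subscription-filters --log-group-name "VPCFlowLogs"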

brew install jq

aws kinesis get-records --limit 10 --shard-iterator $(aws kinesis get-shard-iterator --stream-name medium_VPCFlowLogs --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON | jq -r '.ShardIterator') | jq -r '.Records[].Data' | base64 -d | zcat

In my case aws kinesis get-records --limit 10 --shard-iterator $(aws kinesis get-shard-iterator --stream-name medium_VPCFlowLogs --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON | jq -r ."ShardIterator") gives me the following:

{
    "Records": [],
    "NextShardIterator": "AAAAAAAAAAF3vrrgukNNTm4aHO8iRO5DXESr+ofQJFH57eXltrr1DTJW1DExKKckOxU5ZT2nFoQ2FOJsmfscXDRl9po0Q9Jb1Zvs9aytKcMLldmE6P7o/HZSC5uhlpT3/tFVAAMQAvqZ41MIFJqFik/kShb9q9oEB3WbYTrigihimcr1haixhIdQv2J6TJhJ6dZ1l6ggsVcnjbok1NZzIyzeABkS8fhLKSsJ/qIV+WmNigYX0MSYVA==",
    "MillisBehindLatest": 31385000
}

When appending the | jq -r .Records[].Data | base64 -d | zcat part to that command, I got "no matches found: .Records[].Data" (this is zsh expanding the unquoted .Records[].Data as a glob pattern, which is why the jq filter is quoted in the command above). Even with the quoting fixed there is nothing to decode yet, because Records is empty. So I still have to figure out how to get records in here (probably just need to put traffic on the VPC).
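One way to check whether any records are arriving at the stream at all is the IncomingRecords metric (substitute your stream name; the date syntax below is the macOS/BSD one):

aws cloudwatch get-metric-statistics \
    --namespace AWS/Kinesis \
    --metric-name IncomingRecords \
    --dimensions Name=StreamName,Value=VPCFlowLogs \
    --start-time $(date -u -v-1H +%Y-%m-%dT%H:%M:%S) \
    --end-time $(date -u +%Y-%m-%dT%H:%M:%S) \
    --period 300 \
    --statistics Sum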

Part 4

https://medium.com/@devfire/real-time-anomaly-detection-in-vpc-flow-logs-part-4-kinesis-analytics-c80cc4977e97

[Architecture diagram: anomaly detection]

Navigate to the Kinesis Analytics page in the AWS console and click on Create Application.

- Name it VPCFlowLogsAnalytics.
- Hook it up to the VPCFlowLogs Kinesis stream you created earlier. (In case you cannot create the application because you don't have enough data yet, see the section Amazon Kinesis Data Generator at the bottom of this file.)
- Enable Lambda pre-processing and say you want a new Lambda function.
  - Use the blueprint kinesis-analytics-process-compressed-record for the Lambda function.
  - Name the Lambda function KinesisAnalyticsProcessCompressedRecord.
  - In IAM, create a lambda_kinesis_exec_role and give it AmazonKinesisFullAccess (see the CLI sketch after this list).
  - Assign lambda_kinesis_exec_role to your Lambda function (KinesisAnalyticsProcessCompressedRecord).
  - Increase the Timeout setting to at least 1 minute.
- Click on Discover Schema.
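If you prefer the CLI over the IAM console for the execution role, here is a minimal sketch (the inline trust policy just lets Lambda assume the role; the timeout command assumes the function name used above and that the function already exists):

aws iam create-role \
    --role-name lambda_kinesis_exec_role \
    --assume-role-policy-document '{"Version": "2012-10-17", "Statement": [{"Effect": "Allow", "Principal": {"Service": "lambda.amazonaws.com"}, "Action": "sts:AssumeRole"}]}'

aws iam attach-role-policy \
    --role-name lambda_kinesis_exec_role \
    --policy-arn arn:aws:iam::aws:policy/AmazonKinesisFullAccess

aws lambda update-function-configuration \
    --function-name KinesisAnalyticsProcessCompressedRecord \
    --timeout 60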

In my case the schema was not recognized and I had to create all the columns manually (see the screenshot).

I had to rename the columns start and end (see the error message in the screenshot; presumably because start and end are reserved keywords in the Kinesis Analytics SQL dialect). Here are the column names:

version, account_id, interface_id, srcaddr, dstaddr, srcport, dstport, protocol, packets, bytes, start_, end_, action, log_status

I recommend creating them in reverse order (log_status first, version last) so you don't have to sort them manually.

Part 5

https://medium.com/@devfire/real-time-anomaly-detection-in-vpc-flow-logs-part-4-kinesis-analytics-c80cc4977e97

We are still where we left off in Part 4 (creating the Kinesis Analytics application):

Click on the blue Go to SQL Editor button and paste the following SQL:

-- ** Anomaly detection **
-- Compute an anomaly score for each record in the source stream using Random Cut Forest
-- Creates a temporary stream and defines a schema
CREATE OR REPLACE STREAM "TEMP_STREAM" (
--   "APPROXIMATE_ARRIVAL_TIME"     timestamp,
--   "srcaddr"     varchar(16),
--   "dstaddr"   varchar(16),
   "bytes"        DOUBLE,
   "ANOMALY_SCORE"  DOUBLE,
   "ANOMALY_EXPLANATION"  varchar(512));
   
-- Creates an output stream and defines a schema
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
--   "APPROXIMATE_ARRIVAL_TIME"     timestamp,
 --  "srcaddr"     varchar(16),
--   "dstaddr"   varchar(16),
   "bytes"        DOUBLE,
   "ANOMALY_SCORE"  DOUBLE,
   "ANOMALY_EXPLANATION"  varchar(512));
 
-- Compute an anomaly score for each record in the source stream
-- using Random Cut Forest
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "TEMP_STREAM"
--  SELECT STREAM "APPROXIMATE_ARRIVAL_TIME", "srcaddr", "dstaddr", "bytes", "ANOMALY_SCORE", "ANOMALY_EXPLANATION"
-- SELECT STREAM "srcaddr", "dstaddr", "bytes", "ANOMALY_SCORE", "ANOMALY_EXPLANATION"
  SELECT STREAM "bytes", "ANOMALY_SCORE", "ANOMALY_EXPLANATION" 
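  -- The arguments to RANDOM_CUT_FOREST_WITH_EXPLANATION below are, in order
  -- (see the AWS docs linked after this block): the input cursor, numberOfTrees (100),
  -- subSampleSize (256), timeDecay (100000), shingleSize (1), withDirectionality (true)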
  FROM TABLE(RANDOM_CUT_FOREST_WITH_EXPLANATION(
    CURSOR(SELECT STREAM "bytes" FROM "SOURCE_SQL_STREAM_001"), 100, 256, 100000, 1, true
  )
);
-- Sort records by descending anomaly score, insert into output stream
CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM * FROM "TEMP_STREAM"
--WHERE ANOMALY_SCORE > 3.0
ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;

(As mentioned in the article, credit for the SQL goes to Curtis Mitchell: https://medium.com/@curtis_mitchell.)

To understand where the anomaly detection happens see: https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sqlrf-random-cut-forest-with-explanation.html

The following part is not from the Medium series; it was added here.

Part 5 continued

- Create a new Lambda function using the blueprint kinesis-analytics-output.
- Name the Lambda function kinesis_analytics_destination.
- Open your VPCFlowLogsAnalytics application under Kinesis Analytics applications and click the Connect new destination button.
- Choose AWS Lambda function as the destination and select kinesis_analytics_destination.
- As the in-application stream name, choose DESTINATION_SQL_STREAM.
- Click Save and continue.
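To double-check the application's input and destination configuration from the CLI (assuming the application name VPCFlowLogsAnalytics used above):

aws kinesisanalytics describe-application --application-name VPCFlowLogsAnalytics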

Amazon Kinesis Data Generator (I needed this for part 4)

To supply the Kinesis stream with data I had to set up the Amazon Kinesis Data Generator first.

- Open the page https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html.
- Download the CloudFormation template cognito-setup.json (also linked on the page).
- Click the blue Create a Cognito User with CloudFormation button (on the page). You are redirected to the AWS console.
- Choose Upload a template to Amazon S3 and select the cognito-setup.json file you downloaded before.
- Click the Next button (bottom right).
- Read the rest of the page to learn how to access and use the Amazon Kinesis Data Generator.
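If you would rather create the Cognito stack from the CLI than through the console, something like the following should work; the stack name is arbitrary and the Username/Password parameter keys are assumptions here, so check cognito-setup.json for the actual parameter names:

aws cloudformation create-stack \
    --stack-name KinesisDataGeneratorCognito \
    --template-body file://cognito-setup.json \
    --capabilities CAPABILITY_IAM \
    --parameters ParameterKey=Username,ParameterValue=<your username> ParameterKey=Password,ParameterValue=<your password>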

allowCloudWatchAccesstoKinesis.json:

{
    "Statement": {
        "Effect": "Allow",
        "Principal": { "Service": "logs.us-east-1.amazonaws.com" },
        "Action": "sts:AssumeRole"
    }
}

cloudWatchPermissions.json:

{
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "kinesis:PutRecord",
            "Resource": "arn:aws:kinesis:us-east-1:31415926:stream/VPCFlowLogs"
        },
        {
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": "arn:aws:iam::31415926:role/CloudWatchToKinesisRole"
        }
    ]
}