Credit goes to Igor Kantor (https://medium.com/@devfire), who wrote the original five-part post on Medium:
The goal of this GitHub Gist is to support anyone who wants to implement the described architecture and get it running on AWS. Use the Medium posts and this Gist together, since I will not repeat all of the text here.
On my AWS account I used a prefix (medium_) for all services, to make them easy to find among all the other running services/instances/functions/roles etc. (just a suggestion). It also makes cleaning up your AWS account easier later on.
In this GitHub Gist we will focus on the anomaly detection ("components within the red dashes"):
Just introduction text; contains no code or implementation.
Gives an overview of the architecture. In this Gist we will focus on the anomaly detection ("components within the red dashes").
This is where the fun begins.
Make sure you have the AWS Command Line Interface installed (https://docs.aws.amazon.com/de_de/cli/latest/userguide/cli-install-macos.html).
If you have just installed the AWS CLI, make sure you run aws configure to connect it to your AWS account.
Let's get started (with the article):
aws kinesis create-stream --stream-name "VPCFlowLogs" --shard-count 1
vim allowCloudWatchAccesstoKinesis.json
Paste the content and substitute your region as needed (in the line "Principal": { "Service": "logs.us-east-1.amazonaws.com" }).
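In case you don't have the Medium post at hand: the trust policy in allowCloudWatchAccesstoKinesis.json follows the standard shape from the AWS docs for CloudWatch Logs subscription filters; it typically looks like this (adjust the region):

```json
{
  "Statement": {
    "Effect": "Allow",
    "Principal": { "Service": "logs.us-east-1.amazonaws.com" },
    "Action": "sts:AssumeRole"
  }
}
```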
Run the following command from the same directory where allowCloudWatchAccesstoKinesis.json is located:
aws iam create-role --role-name CloudWatchToKinesisRole --assume-role-policy-document file://./allowCloudWatchAccesstoKinesis.json
vim cloudWatchPermissions.json
Paste the content and substitute the two resource arn strings with yours.
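For reference, cloudWatchPermissions.json grants the role permission to write to the stream and to pass itself; per the AWS docs for CloudWatch Logs subscriptions it looks roughly like this (account ID and region below are placeholders, substitute your own ARNs):

```json
{
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "kinesis:PutRecord",
      "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/VPCFlowLogs"
    },
    {
      "Effect": "Allow",
      "Action": "iam:PassRole",
      "Resource": "arn:aws:iam::123456789012:role/CloudWatchToKinesisRole"
    }
  ]
}
```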
Run the following command from the same directory where cloudWatchPermissions.json is located:
aws iam put-role-policy --role-name CloudWatchToKinesisRole --policy-name Permissions-Policy-For-CWL --policy-document file://./cloudWatchPermissions.json
aws logs put-subscription-filter \
--log-group-name "VPCFlowLogs" \
--filter-name "VPCFlowLogsAllFilter" \
--filter-pattern '[version, account_id, interface_id, srcaddr != "-", dstaddr != "-", srcport != "-", dstport != "-", protocol, packets, bytes, start, end, action, log_status]' \
--destination-arn "arn:aws:kinesis:us-east-1:31415926:stream/VPCFlowLogs" \
--role-arn "arn:aws:iam::31415926:role/CloudWatchToKinesisRole"
brew install jq
aws kinesis get-records --limit 10 --shard-iterator $(aws kinesis get-shard-iterator --stream-name medium_VPCFlowLogs --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON | jq -r '.ShardIterator') | jq -r '.Records[].Data' | base64 -d | zcat
In my case aws kinesis get-records --limit 10 --shard-iterator $(aws kinesis get-shard-iterator --stream-name medium_VPCFlowLogs --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON | jq -r '.ShardIterator')
gives me the following:
{
"Records": [],
"NextShardIterator": "AAAAAAAAAAF3vrrgukNNTm4aHO8iRO5DXESr+ofQJFH57eXltrr1DTJW1DExKKckOxU5ZT2nFoQ2FOJsmfscXDRl9po0Q9Jb1Zvs9aytKcMLldmE6P7o/HZSC5uhlpT3/tFVAAMQAvqZ41MIFJqFik/kShb9q9oEB3WbYTrigihimcr1haixhIdQv2J6TJhJ6dZ1l6ggsVcnjbok1NZzIyzeABkS8fhLKSsJ/qIV+WmNigYX0MSYVA==",
"MillisBehindLatest": 31385000
}
so when appending the | jq -r '.Records[].Data' | base64 -d | zcat
part to that command, I got "no matches found: .Records[].Data". That error comes from zsh glob-expanding the unquoted []; quoting the jq filter (as above) avoids it. With "Records": [] empty, there are simply no log events on the stream yet (probably just need to put traffic on the VPC).
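As an aside, the base64 -d | zcat part of the pipeline can be reproduced in Python, which is handy for scripting once records do arrive. A sketch; the sample payload below is made up, with the envelope structure following the CloudWatch Logs subscription filter format:

```python
import base64
import gzip
import json

def decode_flow_log_record(data_b64: str) -> dict:
    """Reverse the CloudWatch Logs -> Kinesis encoding: base64, then gzip."""
    return json.loads(gzip.decompress(base64.b64decode(data_b64)))

# Simulate what CloudWatch Logs puts on the stream (made-up sample record).
sample = {
    "messageType": "DATA_MESSAGE",
    "logGroup": "VPCFlowLogs",
    "logEvents": [
        {"message": "2 123456789012 eni-0a1b2c3d 10.0.0.1 10.0.0.2 443 49152 6 10 840 1554000000 1554000060 ACCEPT OK"}
    ],
}
encoded = base64.b64encode(gzip.compress(json.dumps(sample).encode())).decode()
print(decode_flow_log_record(encoded)["messageType"])  # DATA_MESSAGE
```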
https://medium.com/@devfire/real-time-anomaly-detection-in-vpc-flow-logs-part-4-kinesis-analytics-c80cc4977e97
Navigate to the Kinesis Analytics page in the AWS console and click on Create Application.
Name it VPCFlowLogsAnalytics.
Hook it up to the VPCFlowLogs Kinesis stream you created earlier.
In case you cannot create the application because you don't have enough data,
see the section Amazon Kinesis Data Generator at the bottom of this file.
Enable Lambda pre-processing and say you want a new Lambda function.
Use blueprint kinesis-analytics-process-compressed-record for the lambda function
Name the lambda function KinesisAnalyticsProcessCompressedRecord
Create a lambda_kinesis_exec_role in IAM and give it AmazonKinesisFullAccess.
Assign lambda_kinesis_exec_role for your lambda function (KinesisAnalyticsProcessCompressedRecord).
Increase the Timeout setting to at least 1 minute
Click on Discover Schema
In my case the schema was not recognized and I had to create all the columns manually:
I also had to rename the columns start and end (see the error message in the screenshot; they clash with reserved SQL keywords). Here are the column names:
version, account_id, interface_id, srcaddr, dstaddr, srcport, dstport, protocol, packets, bytes, start_, end_, action, log_status
I recommend creating them in reverse order (log_status first, version last) so you don't have to sort them manually.
We are still where we left off in part 4 (creating the Kinesis Analytics application):
Click on the blue Go to SQL Editor button.
-- ** Anomaly detection **
-- Compute an anomaly score for each record in the source stream using Random Cut Forest

-- Creates a temporary stream and defines a schema
CREATE OR REPLACE STREAM "TEMP_STREAM" (
    -- "APPROXIMATE_ARRIVAL_TIME" timestamp,
    -- "srcaddr" varchar(16),
    -- "dstaddr" varchar(16),
    "bytes" DOUBLE,
    "ANOMALY_SCORE" DOUBLE,
    "ANOMALY_EXPLANATION" varchar(512));

-- Creates an output stream and defines a schema
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    -- "APPROXIMATE_ARRIVAL_TIME" timestamp,
    -- "srcaddr" varchar(16),
    -- "dstaddr" varchar(16),
    "bytes" DOUBLE,
    "ANOMALY_SCORE" DOUBLE,
    "ANOMALY_EXPLANATION" varchar(512));

-- Compute an anomaly score for each record in the source stream
-- using Random Cut Forest
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "TEMP_STREAM"
    -- SELECT STREAM "APPROXIMATE_ARRIVAL_TIME", "srcaddr", "dstaddr", "bytes", "ANOMALY_SCORE", "ANOMALY_EXPLANATION"
    -- SELECT STREAM "srcaddr", "dstaddr", "bytes", "ANOMALY_SCORE", "ANOMALY_EXPLANATION"
    SELECT STREAM "bytes", "ANOMALY_SCORE", "ANOMALY_EXPLANATION"
    FROM TABLE(RANDOM_CUT_FOREST_WITH_EXPLANATION(
        CURSOR(SELECT STREAM "bytes" FROM "SOURCE_SQL_STREAM_001"), 100, 256, 100000, 1, true
    ));

-- Sort records by descending anomaly score, insert into output stream
CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM * FROM "TEMP_STREAM"
    -- WHERE ANOMALY_SCORE > 3.0
    ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;
Credit for the SQL goes to Curtis Mitchell: https://medium.com/@curtis_mitchell
To understand where the anomaly detection happens see: https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sqlrf-random-cut-forest-with-explanation.html
Create a new Lambda function using the blueprint kinesis-analytics-output.
Name the Lambda function kinesis_analytics_destination.
Open your VPCFlowLogsAnalytics application under Kinesis Analytics applications and click the Connect new destination button.
Choose AWS Lambda function as destination and select kinesis_analytics_destination
As In-application stream name choose DESTINATION_SQL_STREAM
Click Save and continue
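The kinesis-analytics-output blueprint essentially acknowledges each record it receives; a minimal Python sketch of that shape, following the documented contract for Kinesis Analytics output Lambdas (event["records"] in, records with "recordId" and "result" out). The print is a placeholder for your own alerting logic:

```python
import base64

def lambda_handler(event, context):
    """Acknowledge each record delivered by the Kinesis Analytics application."""
    output = []
    for record in event["records"]:
        payload = base64.b64decode(record["data"]).decode("utf-8")
        # A real destination would forward high-score rows to SNS, Slack, etc.
        print(payload)
        output.append({"recordId": record["recordId"], "result": "Ok"})
    return {"records": output}
```

Kinesis Analytics retries records unless each recordId comes back with result "Ok", so make sure every record is acknowledged.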
To supply the Kinesis Stream with data I had to set up an Amazon Kinesis Data Generator first.
Open the page https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html.
Download the CloudFormation template cognito-setup.json (also linked on the page).
Click the blue Create a Cognito User with CloudFormation button (on the page).
You are redirected to the AWS console.
Choose Upload a template to Amazon S3 and select the cognito-setup.json you downloaded before.
Click the Next button (bottom right).
Read the rest of the page to learn how to access and use the Amazon Kinesis Data Generator.
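For the record template in the Data Generator, something resembling a VPC flow log line works. A hedged example using the generator's faker.js-style placeholders (the exact template syntax, and which placeholders exist, is documented on that help page; treat the placeholders below as assumptions):

```
2 123456789012 eni-0a1b2c3d {{internet.ip}} {{internet.ip}} {{random.number(65535)}} 443 6 {{random.number(100)}} {{random.number(100000)}} 1554000000 1554000060 ACCEPT OK
```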