@callum-p
Last active August 24, 2021 21:31
Deploys lambda functions to forward cloudwatch logs to logstash
Description: Deploys lambda functions to forward cloudwatch logs to logstash
Parameters:
  coreNetworkingStackName:
    Type: String
Resources:
  lambdaRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: "lambda.amazonaws.com"
            Action: "sts:AssumeRole"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole
  lambdaPolicy:
    Type: "AWS::IAM::Policy"
    Properties:
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action:
              - "logs:PutRetentionPolicy"
              - "logs:describeLogGroups"
              - "logs:putSubscriptionFilter"
              - "logs:deleteSubscriptionFilter"
            Resource: "*"
      PolicyName: !Ref "AWS::StackName"
      Roles:
        - !Ref lambdaRole
  cloudwatchPermission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref processLogGroupFunction
      Action: lambda:InvokeFunction
      Principal: logs.amazonaws.com
      SourceArn: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/*"
      SourceAccount: !Ref "AWS::AccountId"
  lambdaSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: logstash cloudwatch log lambda processor
      VpcId:
        Fn::ImportValue: !Sub ${coreNetworkingStackName}:vpcId
  processLogGroupFunction:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.handler
      Role: !GetAtt lambdaRole.Arn
      # Note: Lambda no longer supports python3.6; use a currently supported Python runtime.
      Runtime: python3.6
      Timeout: '300'
      VpcConfig:
        SecurityGroupIds:
          - !Ref lambdaSecurityGroup
        SubnetIds:
          - Fn::ImportValue: !Sub ${coreNetworkingStackName}:privateSubnet2A
          - Fn::ImportValue: !Sub ${coreNetworkingStackName}:privateSubnet2B
          - Fn::ImportValue: !Sub ${coreNetworkingStackName}:privateSubnet2C
      Code:
        ZipFile: !Sub |
          #!/usr/bin/env python3
          import base64
          import copy
          import gzip
          import json
          import socket


          def transform(data):
              """Return a copy of the log record reshaped for logstash."""
              new_data = copy.deepcopy(data)
              new_data['@metadata'] = {
                  "beat": "lambda",
                  "version": "0.0.1"
              }
              if 'service' in data:
                  if data['service'] == 'blah':
                      new_data['@metadata']['beat'] = 'ecs'
              if 'timestamp' in data:
                  # Rename 'timestamp' to 'lambda_timestamp'.
                  del new_data['timestamp']
                  new_data['lambda_timestamp'] = data['timestamp']
              if 'port' in data:
                  del new_data['port']
              return new_data


          def send_log(data):
              """Send one record to logstash as a newline-terminated JSON line over TCP."""
              with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                  # Point this at your logstash server.
                  s.connect(('logstash.xxx.internal', 5000))
                  # sendall() retries until the whole line is written; the
                  # with-block closes the socket.
                  s.sendall((json.dumps(data) + '\n').encode('utf-8'))


          def handler(event, context):
              # CloudWatch Logs delivers the payload base64-encoded and gzip-compressed.
              decompressed = gzip.decompress(
                  base64.b64decode(event['awslogs']['data'])).decode('utf-8')
              try:
                  data = json.loads(decompressed)
              except ValueError:
                  return
              for log_event in data['logEvents']:
                  try:
                      record = json.loads(log_event['message'])
                  except ValueError:
                      # Skip non-JSON messages instead of dropping the rest of the batch.
                      continue
                  if not isinstance(record, dict):
                      continue
                  if record.get('level') == 'debug':
                      continue
                  send_log(transform(record))
  updateAllLogGroupsFunction:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.handler
      Role: !GetAtt lambdaRole.Arn
      # Note: Lambda no longer supports python3.6; use a currently supported Python runtime.
      Runtime: python3.6
      Timeout: '30'
      Environment:
        Variables:
          SUBSCRIPTION_FUNCTION_ARN: !GetAtt processLogGroupFunction.Arn
      Code:
        ZipFile: !Sub |
          #!/usr/bin/env python3
          import boto3
          import os

          c = boto3.client('logs')
          # Log groups whose name contains any of these substrings are never subscribed.
          bad_groups = [
              'core-networking',
              's3av',
              'healthcheck',
              'updateLogGroupFunction',
              'processLogGroupFunction'
          ]
          # Only log groups whose name starts with one of these prefixes are subscribed.
          good_groups = [
              'adamite',
              '/aws/lambda'
          ]


          def get_log_groups():
              """Return the groups to subscribe; unsubscribe excluded groups on the way."""
              groups = []
              params = {}
              while True:
                  response = c.describe_log_groups(**params)
                  for group in response['logGroups']:
                      name = group['logGroupName']
                      if any(bg in name for bg in bad_groups):
                          # Excluded group: remove any existing logstash subscription.
                          delete_subscription(name)
                      elif name.startswith(tuple(good_groups)):
                          groups.append(name)
                  if 'nextToken' in response:
                      params['nextToken'] = response['nextToken']
                  else:
                      break
              return groups


          def delete_subscription(group):
              try:
                  c.delete_subscription_filter(
                      logGroupName=group,
                      filterName='logstash')
              except Exception as e:
                  # Typically the group simply has no 'logstash' filter.
                  print(e)


          def create_subscription(group):
              sub_arn = os.getenv('SUBSCRIPTION_FUNCTION_ARN')
              c.put_subscription_filter(
                  logGroupName=group,
                  filterName='logstash',
                  filterPattern='',
                  destinationArn=sub_arn
              )


          def handler(event, context):
              groups = get_log_groups()
              for g in groups:
                  create_subscription(g)
                  # delete_subscription(g)


          if __name__ == '__main__':
              handler(None, None)
  updateLogGroupFunction:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.handler
      Role: !GetAtt lambdaRole.Arn
      # Note: Lambda no longer supports python3.6; use a currently supported Python runtime.
      Runtime: python3.6
      Timeout: '30'
      Environment:
        Variables:
          SUBSCRIPTION_FUNCTION_ARN: !GetAtt processLogGroupFunction.Arn
      Code:
        ZipFile: !Sub |
          #!/usr/bin/env python3
          import boto3
          import json
          import os

          logs = boto3.client('logs')
          # Log groups whose name contains any of these substrings are never subscribed.
          bad_groups = [
              'core-networking',
              's3av',
              'healthcheck',
              'updateLogGroupFunction',
              'processLogGroupFunction'
          ]
          # Only log groups whose name starts with one of these prefixes are subscribed.
          good_groups = [
              'adamite',
              '/aws/lambda'
          ]


          def create_subscription(group):
              for g in bad_groups:
                  if g in group:
                      return
              sub_arn = os.getenv('SUBSCRIPTION_FUNCTION_ARN')
              logs.put_subscription_filter(
                  logGroupName=group,
                  filterName='logstash',
                  filterPattern='',
                  destinationArn=sub_arn
              )


          def handler(event, context):
              try:
                  log_group = event['detail']['requestParameters']['logGroupName']
              except Exception as e:
                  print(e)
                  print(json.dumps(event))
                  # Without a log group name there is nothing to subscribe.
                  return
              # Only /aws/lambda groups are handled here, matching the
              # SourceArn of cloudwatchPermission.
              if log_group.startswith('/aws/lambda'):
                  for gg in good_groups:
                      if log_group.startswith(gg):
                          create_subscription(log_group)
                          break


          if __name__ == '__main__':
              handler(None, None)
  logCreateEventPermission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref updateLogGroupFunction
      Action: lambda:InvokeFunction
      Principal: events.amazonaws.com
      SourceArn: !GetAtt logCreateEvent.Arn
  logCreateEvent:
    Type: AWS::Events::Rule
    Properties:
      Description: Triggers logstash subscription on new log groups
      State: ENABLED
      Targets:
        - Arn: !GetAtt updateLogGroupFunction.Arn
          Id: updateLogGroupFunction
      EventPattern: |
        {
          "source": [
            "aws.logs"
          ],
          "detail-type": [
            "AWS API Call via CloudTrail"
          ],
          "detail": {
            "eventSource": [
              "logs.amazonaws.com"
            ],
            "eventName": [
              "CreateLogGroup"
            ]
          }
        }
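
To exercise the decoding step of processLogGroupFunction without deploying anything, a test event can be built by hand. This is a minimal sketch, assuming the usual CloudWatch Logs subscription payload: a JSON document with a logEvents list, gzip-compressed and base64-encoded under event['awslogs']['data']. The helper names make_event and decode_event and the log group name are hypothetical and not part of the template.

```python
import base64
import gzip
import json


def make_event(messages, log_group="/aws/lambda/example"):
    """Build a fake subscription event shaped like the one the Lambda receives."""
    payload = {
        "messageType": "DATA_MESSAGE",
        "logGroup": log_group,
        "logEvents": [
            {"id": str(i), "timestamp": 0, "message": m}
            for i, m in enumerate(messages)
        ],
    }
    raw = gzip.compress(json.dumps(payload).encode("utf-8"))
    return {"awslogs": {"data": base64.b64encode(raw).decode("ascii")}}


def decode_event(event):
    """Mirror the first step of the handler: base64 -> gunzip -> JSON."""
    raw = base64.b64decode(event["awslogs"]["data"])
    return json.loads(gzip.decompress(raw).decode("utf-8"))


# One JSON message and one plain-text message, as a mixed batch.
event = make_event(['{"level": "info", "msg": "hello"}', "not json"])
decoded = decode_event(event)
print([e["message"] for e in decoded["logEvents"]])
```

An event built this way can be passed to the function's handler locally, provided send_log is pointed at a reachable host or stubbed out.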
@kazizi-swe

kazizi-swe commented May 24, 2019

@callum-p, Hi, I don't know what to replace this file with if I want to transfer CloudWatch logs to the Logstash input plugin using a Lambda function. I'd appreciate it if you could help me out here.

@kazizi-swe

@callum-p, I'd really appreciate it if you could let me know. It would save me a lot of time.

@callum-p
Author

Sorry, I don't understand your question.

@kazizi-swe

kazizi-swe commented May 27, 2019

@callum-p, let me rephrase my question. Your code above says, "Description: Deploys lambda functions to forward cloudwatch logs to logstash". Do I need to create a Lambda function using this code, or should I replace the logstash.yml file in my directory with it?

I don't understand where to place this code. I'd appreciate some clarification.

@callum-p
Author

This is a CloudFormation template. You would need to update https://gist.github.com/callum-p/63cff6654d41db3a3051c14081faf3ea#file-logstash-cloudwatch-yml-L104 to point to your Logstash server. You would also need to run the updateAllLogGroupsFunction function manually via the Lambda console to update all your existing log groups (based on the whitelists defined in the code).

To be honest though, if you don't understand it, you probably won't be able to get it working. I'm happy to help, but not to walk you through it step by step.
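
For reference, the selection rule behind those lists can be sketched on its own. This is a minimal illustration, assuming the reading that a log group is subscribed only if its name contains none of the bad_groups substrings and starts with one of the good_groups prefixes. The function name should_subscribe and the sample log group names are hypothetical; the list values are copied from the template.

```python
BAD_GROUPS = [
    'core-networking',
    's3av',
    'healthcheck',
    'updateLogGroupFunction',
    'processLogGroupFunction',
]
GOOD_GROUPS = ['adamite', '/aws/lambda']


def should_subscribe(name):
    """True if the log group name passes both lists."""
    # Exclusions match anywhere in the name.
    if any(bad in name for bad in BAD_GROUPS):
        return False
    # Inclusions must match at the start of the name.
    return name.startswith(tuple(GOOD_GROUPS))


for name in ['/aws/lambda/orders', '/aws/lambda/healthcheck', 'adamite-api', '/ecs/web']:
    print(name, should_subscribe(name))
```

Excluding processLogGroupFunction matters most: subscribing the forwarder to its own log group would feed its output back into itself.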

@kazizi-swe

Thanks for letting me know what to change, but one thing is still unclear to me: where should I place this code? Should I place it in my project directory, or should I create a Lambda function in AWS using this code? If I understand this, I will find a way to use it.

@callum-p
Author

It's a CloudFormation template. You deploy the file via AWS CloudFormation.
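
One possible way to do that from Python is sketched below, assuming boto3 is installed and AWS credentials are configured. The helper names, the stack name, the template file name, and the networking stack name are all hypothetical. The coreNetworkingStackName parameter comes from the template, and CAPABILITY_IAM is passed because the template creates an IAM role and policy.

```python
def build_stack_args(stack_name, template_body, networking_stack):
    """Assemble the keyword arguments for CloudFormation's create_stack call."""
    return {
        "StackName": stack_name,
        "TemplateBody": template_body,
        "Parameters": [
            {
                "ParameterKey": "coreNetworkingStackName",
                "ParameterValue": networking_stack,
            },
        ],
        # Required because the template defines IAM resources.
        "Capabilities": ["CAPABILITY_IAM"],
    }


def deploy(stack_name, template_path, networking_stack):
    """Read the template from disk and create the stack."""
    import boto3  # imported here so build_stack_args works without boto3

    with open(template_path) as f:
        body = f.read()
    cfn = boto3.client("cloudformation")
    return cfn.create_stack(**build_stack_args(stack_name, body, networking_stack))


# Hypothetical usage:
# deploy("logstash-forwarder", "logstash-cloudwatch.yml", "core-networking")
```

The networking stack named in the parameter must already export the vpcId and privateSubnet2A/2B/2C values that the template imports.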

@kazizi-swe

Ok great, I will look into it. Thank you :)
