
@callum-p
Last active August 24, 2021 21:31
Deploys lambda functions to forward cloudwatch logs to logstash
Description: Deploys lambda functions to forward cloudwatch logs to logstash
Parameters:
  coreNetworkingStackName:
    Type: String
Resources:
  lambdaRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: "lambda.amazonaws.com"
            Action: "sts:AssumeRole"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole
  lambdaPolicy:
    Type: "AWS::IAM::Policy"
    Properties:
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action:
              - "logs:PutRetentionPolicy"
              - "logs:DescribeLogGroups"
              - "logs:PutSubscriptionFilter"
              - "logs:DeleteSubscriptionFilter"
            Resource: "*"
      PolicyName: !Ref "AWS::StackName"
      Roles:
        - !Ref lambdaRole
  cloudwatchPermission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref processLogGroupFunction
      Action: lambda:InvokeFunction
      Principal: logs.amazonaws.com
      SourceArn: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/*"
      SourceAccount: !Ref "AWS::AccountId"
  lambdaSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: logstash cloudwatch log lambda processor
      VpcId:
        Fn::ImportValue: !Sub ${coreNetworkingStackName}:vpcId
  processLogGroupFunction:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.handler
      Role: !GetAtt lambdaRole.Arn
      Runtime: python3.6
      Timeout: 300
      VpcConfig:
        SecurityGroupIds:
          - !Ref lambdaSecurityGroup
        SubnetIds:
          - Fn::ImportValue: !Sub ${coreNetworkingStackName}:privateSubnet2A
          - Fn::ImportValue: !Sub ${coreNetworkingStackName}:privateSubnet2B
          - Fn::ImportValue: !Sub ${coreNetworkingStackName}:privateSubnet2C
      Code:
        ZipFile: !Sub |
          #!/usr/bin/env python3
          import socket
          import json
          import gzip
          import copy
          import base64

          def transform(data):
              # Wrap the event in beats-style metadata so logstash can route it.
              new_data = copy.deepcopy(data)
              new_data['@metadata'] = {
                  "beat": "lambda",
                  "version": "0.0.1"
              }
              if data.get('service') == 'blah':
                  new_data['@metadata']['beat'] = 'ecs'
              if 'timestamp' in data:
                  # Rename to avoid clashing with logstash's own timestamp field.
                  del new_data['timestamp']
                  new_data['lambda_timestamp'] = data['timestamp']
              if 'port' in data:
                  del new_data['port']
              return new_data

          def send_log(data):
              # Ship one newline-terminated JSON document over a plain TCP socket.
              with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                  s.connect(('logstash.xxx.internal', 5000))
                  s.sendall(json.dumps(data).encode('utf-8'))
                  s.sendall(b'\n')

          def handler(event, context):
              decompressed = gzip.decompress(
                  base64.b64decode(event['awslogs']['data'])).decode('utf-8')
              try:
                  data = json.loads(decompressed)
              except Exception:
                  return
              for str_event in data['logEvents']:
                  try:
                      e = json.loads(str_event['message'])
                  except Exception:
                      continue  # skip messages that aren't JSON
                  if e.get('level') == 'debug':
                      continue  # drop debug-level events, keep processing the rest
                  send_log(transform(e))

          if __name__ == '__main__':
              handler(None, None)
  updateAllLogGroupsFunction:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.handler
      Role: !GetAtt lambdaRole.Arn
      Runtime: python3.6
      Timeout: 30
      Environment:
        Variables:
          SUBSCRIPTION_FUNCTION_ARN: !GetAtt processLogGroupFunction.Arn
      Code:
        ZipFile: !Sub |
          #!/usr/bin/env python3
          import boto3
          import os

          c = boto3.client('logs')
          bad_groups = [
              'core-networking',
              's3av',
              'healthcheck',
              'updateLogGroupFunction',
              'processLogGroupFunction'
          ]
          good_groups = [
              'adamite',
              '/aws/lambda'
          ]

          def get_log_groups():
              groups = []
              params = {}
              while True:
                  response = c.describe_log_groups(**params)
                  for group in response['logGroups']:
                      name = group['logGroupName']
                      if any(bg in name for bg in bad_groups):
                          continue
                      for gg in good_groups:
                          if name.startswith(gg):
                              groups.append(name)
                              break
                      else:
                          # Not a whitelisted prefix: remove any stale filter.
                          delete_subscription(name)
                  if 'nextToken' in response:
                      params['nextToken'] = response['nextToken']
                  else:
                      break
              return groups

          def delete_subscription(group):
              try:
                  c.delete_subscription_filter(
                      logGroupName=group,
                      filterName='logstash')
              except Exception as e:
                  print(e)

          def create_subscription(group):
              sub_arn = os.getenv('SUBSCRIPTION_FUNCTION_ARN')
              c.put_subscription_filter(
                  logGroupName=group,
                  filterName='logstash',
                  filterPattern='',
                  destinationArn=sub_arn
              )

          def handler(event, context):
              for g in get_log_groups():
                  create_subscription(g)
                  # delete_subscription(g)

          if __name__ == '__main__':
              handler(None, None)
  updateLogGroupFunction:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.handler
      Role: !GetAtt lambdaRole.Arn
      Runtime: python3.6
      Timeout: 30
      Environment:
        Variables:
          SUBSCRIPTION_FUNCTION_ARN: !GetAtt processLogGroupFunction.Arn
      Code:
        ZipFile: !Sub |
          #!/usr/bin/env python3
          import boto3
          import json
          import os

          logs = boto3.client('logs')
          bad_groups = [
              'core-networking',
              's3av',
              'healthcheck',
              'updateLogGroupFunction',
              'processLogGroupFunction'
          ]
          good_groups = [
              'adamite',
              '/aws/lambda'
          ]

          def create_subscription(group):
              if any(bg in group for bg in bad_groups):
                  return
              sub_arn = os.getenv('SUBSCRIPTION_FUNCTION_ARN')
              logs.put_subscription_filter(
                  logGroupName=group,
                  filterName='logstash',
                  filterPattern='',
                  destinationArn=sub_arn
              )

          def handler(event, context):
              try:
                  log_group = event['detail']['requestParameters']['logGroupName']
              except Exception as e:
                  print(e)
                  print(json.dumps(event))
                  return  # malformed event: nothing to subscribe
              # good_groups already includes /aws/lambda, so check the
              # whitelist directly rather than hard-coding that prefix.
              for gg in good_groups:
                  if log_group.startswith(gg):
                      create_subscription(log_group)
                      break

          if __name__ == '__main__':
              handler(None, None)
  logCreateEventPermission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref updateLogGroupFunction
      Action: lambda:InvokeFunction
      Principal: events.amazonaws.com
      SourceArn: !GetAtt logCreateEvent.Arn
  logCreateEvent:
    Type: AWS::Events::Rule
    Properties:
      Description: Triggers logstash subscription on new log groups
      State: ENABLED
      Targets:
        - Arn: !GetAtt updateLogGroupFunction.Arn
          Id: updateLogGroupFunction
      EventPattern: |
        {
          "source": ["aws.logs"],
          "detail-type": ["AWS API Call via CloudTrail"],
          "detail": {
            "eventSource": ["logs.amazonaws.com"],
            "eventName": ["CreateLogGroup"]
          }
        }
@kazizi-swe

Thanks for letting me know what to change, but one thing is still unclear to me: where should I put this code? Should I place it in my project directory, or should I create a Lambda function in AWS from it? Once I understand that, I'll find a way to use it.

@callum-p
Author

It's a CloudFormation template. You deploy the whole file via AWS CloudFormation, and it creates the Lambda functions for you.
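To make that concrete, here is a minimal sketch of deploying the template with the AWS CLI. The file name, stack name, and parameter value are placeholders, not from the gist; substitute your own.

```shell
# Save the gist as a local file (name is arbitrary), then deploy it.
# --capabilities CAPABILITY_IAM is required because the template creates IAM resources.
aws cloudformation deploy \
  --template-file cloudwatch-to-logstash.yml \
  --stack-name cloudwatch-to-logstash \
  --capabilities CAPABILITY_IAM \
  --parameter-overrides coreNetworkingStackName=my-networking-stack
```

CloudFormation packages each `ZipFile` body into the Lambda deployment itself, so there is nothing to copy into your project directory.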

@kazizi-swe

Ok great, I will look into it. Thank you :)
