Skip to content

Instantly share code, notes, and snippets.

@hugobcar
Last active March 13, 2019 06:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save hugobcar/0801ba922fe413d7a3ea9b221ac80415 to your computer and use it in GitHub Desktop.
Save hugobcar/0801ba922fe413d7a3ea9b221ac80415 to your computer and use it in GitHub Desktop.
New line end of file
import boto3
import math
import time
import datetime
import ipaddress
from boto3.dynamodb.conditions import Key, Attr
from os import environ
# Get the service resource.
dynamodb = boto3.resource('dynamodb', region_name=environ['DYNAMODB_REGION'])
table = dynamodb.Table(environ['DYNAMODB_TABLE'])
table_blocked = dynamodb.Table(environ['DYNAMODB_TABLE_BLOCKED_IPS'])
# WAF
waf = boto3.client('waf')
API_CALL_NUM_RETRIES = 3
def valid_IPV4(ip):
if ipaddress.ip_address(ip).version == 4:
return True
else:
return False
def waf_get_ip_set(ip_set_id):
response = None
for attempt in range(API_CALL_NUM_RETRIES):
try:
response = waf.get_ip_set(IPSetId=ip_set_id)
except Exception as e:
print(e)
delay = math.pow(2, attempt)
print("[waf_get_ip_set] Retrying in %d seconds..." % (delay))
time.sleep(delay)
else:
break
else:
print("[waf_get_ip_set] Failed ALL attempts to call API")
return response
def waf_update_ip_set(ip_set_id, updates_list):
response = None
if updates_list != []:
for attempt in range(API_CALL_NUM_RETRIES):
try:
response = waf.update_ip_set(IPSetId=ip_set_id,
ChangeToken=waf.get_change_token()['ChangeToken'],
Updates=updates_list)
except Exception as e:
print(e)
delay = math.pow(2, attempt)
print("[waf_update_ip_set] Retrying in %d seconds..." % (delay))
time.sleep(delay)
else:
break
else:
print("[waf_update_ip_set] Failed ALL attempts to call API")
return response
def lambda_handler(event, context):
IPS_REMOVE_WAF = []
try:
print('------ LIMPANDO TABELA DYNAMODB -------------')
timestamp_removed_dynamodb=int(datetime.datetime.now().timestamp())-int(environ['TIME_HISTORY_DYNAMODB'])
response = table.scan(
IndexName='TIMESTAMP',
FilterExpression=Attr('TIMESTAMP').lte(timestamp_removed_dynamodb)
)
count_removed_logs=len(response['Items'])
print ('Excluindo %s logs antigos com mais de %s segundos do DYNAMODB.'%(count_removed_logs, str(environ['TIME_HISTORY_DYNAMODB'])))
if count_removed_logs > 0:
for k in response['Items']:
table.delete_item(
Key={'ID': k['ID']}
)
print('------ LIMPANDO IPs DO WAF -------------')
timestamp_removed_dynamodb=int(datetime.datetime.now().timestamp())-int(environ['TIME_IP_BLOCKED_WAF'])
response = table_blocked.scan(
IndexName='TIMESTAMP',
FilterExpression=Attr('TIMESTAMP').lte(timestamp_removed_dynamodb)
)
count_removed_waf=len(response['Items'])
if environ['IP_SET_ID_AUTO_BLOCK'] != None:
response_waf = waf_get_ip_set(environ['IP_SET_ID_AUTO_BLOCK'])
if response_waf != None:
for k in response_waf['IPSet']['IPSetDescriptors']:
IPS_REMOVE_WAF.append(k['Value'].split('/')[0])
if count_removed_waf > 0:
print ('Excluindo %s IPs antigos com mais de %s segundos do WAF.'%(count_removed_waf, str(environ['TIME_IP_BLOCKED_WAF'])))
for k in response['Items']:
IP = k['IP']
if IP in IPS_REMOVE_WAF:
if valid_IPV4(IP):
ip_type = 'IPV4'
mask = '32'
else:
ip_type = 'IPV6'
mask = '128'
print('Deletando IP (%s/%s) do WAF!'%(IP,mask))
JSON_IPS_REMOVE_WAF = [{
'Action': 'DELETE',
'IPSetDescriptor': {
'Type': ip_type,
'Value': "%s/%s"%(IP,mask)
}
}]
try:
response = waf_update_ip_set(environ['IP_SET_ID_AUTO_BLOCK'], JSON_IPS_REMOVE_WAF)
except Exception as e:
print('Erro ao executar funcao de removocao de IP no WAF')
print(e)
print('Deletando IP (%s) do DYNAMODB_BLOCK!'%(IP))
table_blocked.delete_item(
Key={'IP': IP}
)
except Exception as e:
print(e)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment