@brandond
Last active March 10, 2018 00:11
Proof-of-concept CLI tool to ingest CloudWatch Logs data archived via Kinesis Firehose from S3 into Splunk via the HTTP Event Collector

from __future__ import print_function, division

import gzip
import io
import logging
import tempfile
import zlib

import boto3
import click
import requests
import ujson as json

try:
    import urlparse
except ImportError:
    import urllib.parse as urlparse

logger = logging.getLogger(__name__)

# Flush a batch to HEC once roughly 1 MB of uncompressed event JSON has been written.
MAX_PAYLOAD_LEN = 1000000


@click.command()
@click.option('--profile', help='Use a specific profile from your credential file.', default=None)
@click.option('--region', help='Region to act upon.', default=None)
@click.option('--s3-uri', help='S3 bucket URI', required=True)
@click.option('--hec-uri', help='Splunk HEC URI', required=True)
def cli(profile, region, s3_uri, hec_uri):
    """List archived CloudWatch Logs objects under the S3 prefix, split each object
    into its CloudWatch Logs subscription records, and post the contained log events
    to the Splunk HEC URI in gzipped batches."""
    s3 = boto3.Session(profile_name=profile).client('s3', region_name=region)
    req = requests.Session()
    parsed_uri = urlparse.urlparse(s3_uri)
    for page in s3.get_paginator('list_objects_v2').paginate(Bucket=parsed_uri.netloc, Prefix=parsed_uri.path[1:], Delimiter=''):
        for s3_object in page.get('Contents', []):
            logger.info('Getting parts from {} bytes of {}'.format(s3_object['Size'], s3_object['Key']))
            for part in get_object_parts(s3, page['Name'], s3_object['Key']):
                for event in get_splunk_events(part):
                    logger.info('Sending {} bytes to HEC'.format(len(event)))
                    response = req.post(hec_uri, data=event, headers=get_auth_headers(page['Name'], s3_object['Key']))
                    logger.info('Got response: {}'.format(response.text))


def get_object_parts(s3, bucket, key):
    """Download an S3 object and yield each concatenated gzip member in it as a
    decoded CloudWatch Logs subscription record."""
    temp = tempfile.SpooledTemporaryFile(1024*1024*5)
    s3.download_fileobj(Bucket=bucket, Key=key, Fileobj=temp)
    temp.seek(0, 0)
    # wbits=32+15 lets zlib auto-detect the gzip header
    decompress = zlib.decompressobj(32+15)
    json_bytes = bytearray()
    while True:
        buf = temp.read(1024 * 2)
        if buf:
            json_bytes.extend(decompress.decompress(buf))
            if decompress.unused_data:
                # End of one gzip member: emit it, rewind past the leftover bytes,
                # and start a fresh decompressor for the next member.
                try:
                    yield json.loads(json_bytes.decode())
                except ValueError:
                    logger.warning('Skipping invalid JSON')
                temp.seek(-len(decompress.unused_data), 1)
                json_bytes = bytearray()
                decompress = zlib.decompressobj(32+15)
        else:
            json_bytes.extend(decompress.flush())
            try:
                yield json.loads(json_bytes.decode())
            except ValueError:
                logger.warning('Skipping invalid JSON')
            break
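
# For reference, a decoded CloudWatch Logs subscription record (what get_object_parts
# yields and get_splunk_events consumes) looks roughly like the illustrative sample
# below; all values here are made up:
#
#   {
#       "messageType": "DATA_MESSAGE",
#       "owner": "123456789012",
#       "logGroup": "/example/hostlogs",
#       "logStream": "i-0abc123",
#       "subscriptionFilters": ["example-filter"],
#       "logEvents": [
#           {"id": "012345", "timestamp": 1519970610000, "message": "<raw log line>"}
#       ]
#   }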


def get_splunk_events(message):
    """Serialize the record's log events into HEC event JSON and yield them as gzipped
    payloads, each holding at most roughly MAX_PAYLOAD_LEN uncompressed bytes."""
    byte_buffer = io.BytesIO()
    gzip_buffer = gzip.GzipFile(fileobj=byte_buffer, mode='wb')
    source = ':'.join((message['owner'], message['logGroup'], message['logStream']))
    sourcetype = get_sourcetype(message['logGroup'])
    logger.info('Using sourcetype {} for logGroup {} with {} events'.format(sourcetype, message['logGroup'], len(message['logEvents'])))
    for event in message['logEvents']:
        json_str = json.dumps(
            escape_forward_slashes=False,
            double_precision=3,
            ensure_ascii=True,
            obj={
                'time': event['timestamp'] / 1000.0,
                'event': event['message'],
                'sourcetype': sourcetype,
                'source': source,
            })
        if gzip_buffer.tell() + len(json_str) >= MAX_PAYLOAD_LEN:
            # Batch is full: finish this gzip stream, yield it, and start a new one.
            gzip_size = gzip_buffer.tell()
            gzip_buffer.close()
            byte_size = byte_buffer.tell()
            logger.info('Compressed {0:d} -> {1:d} bytes - {2:0.2f}%'.format(gzip_size, byte_size, byte_size / gzip_size * 100))
            yield byte_buffer.getvalue()
            byte_buffer.seek(0, 0)
            byte_buffer.truncate()
            gzip_buffer = gzip.GzipFile(fileobj=byte_buffer, mode='wb')
        gzip_buffer.write(json_str.encode())
    if not gzip_buffer.closed:
        # Flush whatever is left in the final (possibly only) batch.
        gzip_size = gzip_buffer.tell()
        gzip_buffer.close()
        byte_size = byte_buffer.tell()
        logger.info('Compressed {0:d} -> {1:d} bytes - {2:0.2f}%'.format(gzip_size, byte_size, byte_size / gzip_size * 100))
        yield byte_buffer.getvalue()
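
# Each item written into a batch is one HEC event object; with the fields used above it
# serializes to something along these lines (illustrative values only):
#
#   {"time": 1519970610.0, "event": "<raw log line>", "sourcetype": "syslog",
#    "source": "123456789012:/example/hostlogs:i-0abc123"}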


def get_sourcetype(loggroup):
    # TODO - check for additional types that require hard-coding sourcetype hints for proper parsing
    loggroup = loggroup.lower()
    if '/vpcflowlogs' in loggroup:
        return 'aws:cloudwatchlogs:vpcflow'
    elif 'squid/access.log' in loggroup:
        return 'squid:access'
    else:
        return 'syslog'
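
# For example, a (hypothetical) log group named '/example/vpcflowlogs/vpc-0abc123' maps to
# the 'aws:cloudwatchlogs:vpcflow' sourcetype; anything unmatched falls back to 'syslog'.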


def get_auth_headers(bucket, key):
    # TODO - use bucket tags to store Splunk token for log type
    uri = 's3://{}/{}'.format(bucket, key).lower()
    if 'kinesyslog' in uri:
        token = 'd3eb43f9-610f-486e-8318-bc9755beab6f'
    elif 'logstream' in uri:
        token = '269bbf98-d366-43ff-b64d-c61df4aa1b71'
    elif 'hostlogs' in uri:
        token = 'f4fe3e87-eabf-408c-a629-03e593cadfd8'
    else:
        raise Exception('No Splunk HEC token for S3 object {}'.format(uri))
    return {'Content-Encoding': 'gzip',
            'Content-Type': 'application/json',
            'Authorization': 'Splunk ' + token}


if __name__ == '__main__':
    logging.basicConfig(level='INFO')
    cli()
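
# Example invocation (script filename, profile, bucket, prefix, and hostname are all
# hypothetical; /services/collector/event is Splunk's standard HEC endpoint path):
#
#   python firehose_to_hec.py --profile logging --region us-west-2 \
#       --s3-uri s3://example-firehose-archive/cloudwatchlogs/ \
#       --hec-uri https://splunk.example.com:8088/services/collector/event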