Skip to content

Instantly share code, notes, and snippets.

@messa
Last active October 17, 2017 14:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save messa/a0a7699775ce66c584fdc99cd08853d2 to your computer and use it in GitHub Desktop.
Save messa/a0a7699775ce66c584fdc99cd08853d2 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import boto3
from datetime import datetime
import logging
import socket
from time import time, sleep
logger = logging.getLogger(__name__)
log_format = '%(asctime)s %(name)s %(levelname)5s: %(message)s'
def main():
from logging import DEBUG, INFO, Formatter
logging.getLogger('').setLevel(DEBUG)
logging.getLogger('botocore').setLevel(INFO)
sh = logging.StreamHandler()
sh.setFormatter(Formatter(log_format))
sh.setLevel(DEBUG)
logging.getLogger('').addHandler(sh)
ah = AWSHandler()
ah.setFormatter(Formatter(log_format))
ah.setLevel(DEBUG)
logging.getLogger('').addHandler(ah)
try:
for i in range(1000):
logger.debug('debug %s', i)
logger.info('info %s', i)
if i % 20 == 0:
ah.flush()
sleep(.3)
finally:
ah.flush()
class AWSHandler (logging.Handler):
def __init__(self, session=None, logs_region=None, s3_region=None):
super().__init__()
session = session or boto3
self.logs_client = session.client('logs', region_name=logs_region or 'eu-central-1')
self.s3_client = session.client('s3', region_name=s3_region or 'eu-west-1')
self.stream_name = '{date}-{host}'.format(
date=datetime.utcnow().strftime('%Y%m%dT%H%M%SZ'),
host=socket.getfqdn())
self.logs_client.create_log_stream(
logGroupName='demo',
logStreamName=self.stream_name)
self.seq_token = None
self.recursion_guard = False
self.archive_content = b''
self.messages_to_flush = []
def emit(self, record):
if self.recursion_guard:
return
self.recursion_guard = True
try:
msg = self.format(record)
self.messages_to_flush.append(msg)
r = self.logs_client.put_log_events(
logGroupName='demo',
logStreamName=self.stream_name,
logEvents=[
{
'timestamp': int(time() * 1000),
'message': str(msg),
},
],
# sequenceToken cannot be None, it must be really missing for the first message...
**({'sequenceToken': self.seq_token} if self.seq_token else {}))
if r.get('rejectedLogEventsInfo'):
raise Exception('Rejected: {!r}'.format(r))
self.seq_token = r['nextSequenceToken']
finally:
self.recursion_guard = False
def flush(self):
if not self.messages_to_flush:
return
self.recursion_guard = True
try:
data = self.archive_content
data += ''.join(msg + '\n' for msg in self.messages_to_flush).encode()
self.s3_client.put_object(
#ACL='private',
ACL='public-read',
Body=data,
Bucket='example-bucket',
ContentType='text/plain',
Key='test/{name}.log'.format(name=self.stream_name),
StorageClass='STANDARD_IA',
)
self.archive_content = data
self.messages_to_flush = []
finally:
self.recursion_guard = False
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment