@alexeygrigorev
Created July 18, 2018 12:55
Capturing stdout (and stderr) and sending it to CloudWatch Logs
import sys
import time
import threading
from io import StringIO
from multiprocessing import Process

import boto3
from botocore.exceptions import ClientError


def time_now_utc():
    # CloudWatch expects event timestamps in milliseconds since the epoch
    return int(time.time() * 1000)


class CloudWatchLogger:
    def __init__(self, logs, log_group, log_stream, create_stream=True):
        self.logs = logs
        self.log_group = log_group
        self.log_stream = log_stream
        print('writing to %s/%s' % (log_group, log_stream))
        if create_stream:
            self.create_log_stream()
        self._next_token = self.get_next_token()

    def create_log_stream(self):
        try:
            self.logs.create_log_stream(
                logGroupName=self.log_group,
                logStreamName=self.log_stream
            )
        except ClientError as e:
            # the stream may already exist from a previous run
            code = e.response.get("Error", {}).get("Code")
            if code != "ResourceAlreadyExistsException":
                raise

    def get_next_token(self):
        response = self.logs.describe_log_streams(
            logGroupName=self.log_group,
            logStreamNamePrefix=self.log_stream
        )
        for stream in response['logStreams']:
            if stream['logStreamName'] == self.log_stream:
                return stream.get('uploadSequenceToken')
        raise Exception('log stream %s/%s does not exist' % (self.log_group, self.log_stream))

    def log(self, messages):
        log_records = []
        for ts, message, num in messages:
            record = {'timestamp': ts, 'message': message}
            log_records.append(record)
        # the very first put_log_events call on a stream takes no sequence token
        if self._next_token is None:
            response = self.logs.put_log_events(
                logGroupName=self.log_group,
                logStreamName=self.log_stream,
                logEvents=log_records
            )
        else:
            response = self.logs.put_log_events(
                logGroupName=self.log_group,
                logStreamName=self.log_stream,
                logEvents=log_records,
                sequenceToken=self._next_token
            )
        self._next_token = response['nextSequenceToken']


class ItemsStore:
    def __init__(self):
        self.lock = threading.Lock()
        self.items = []

    def add(self, item):
        with self.lock:
            self.items.append(item)

    def get_all(self):
        with self.lock:
            items, self.items = self.items, []
        return items


class AccumulatingTeeOut(StringIO):
    def __init__(self, items, std=sys.__stdout__):
        super().__init__()
        self.std = std
        self.items = items
        self.cnt = 0

    def write(self, s):
        # keep non-empty lines, timestamp them, and echo them to the real stdout
        s = s.strip()
        if s == '':
            return
        now = time_now_utc()
        self.items.add((now, s, self.cnt))
        self.std.write('>> %d > %s <\n' % (self.cnt, s))
        self.cnt = self.cnt + 1


class LogPublishingThread(threading.Thread):
    def __init__(self, cw, items, stop_event):
        threading.Thread.__init__(self)
        self.cw = cw
        self.items = items
        self.stopped = stop_event

    def run(self):
        # flush the accumulated lines every 3 seconds until the stop event is set
        while not self.stopped.wait(3.0):
            self.logs_to_cloudwatch()

    def logs_to_cloudwatch(self):
        all_logs = self.items.get_all()
        if len(all_logs) > 0:
            self.cw.log(all_logs)


def execute(target, args, region, group, stream):
    logs = boto3.client('logs', region_name=region)
    cw = CloudWatchLogger(logs, group, stream)

    items = ItemsStore()
    tee = AccumulatingTeeOut(items, std=sys.__stdout__)
    sys.stdout = tee
    sys.stderr = tee

    stop_event = threading.Event()
    log_thread = LogPublishingThread(cw, items, stop_event)
    log_thread.start()

    try:
        target(*args)
    finally:
        # publish whatever is left and restore the original streams
        stop_event.set()
        log_thread.logs_to_cloudwatch()
        sys.stdout = sys.__stdout__
        sys.stderr = sys.__stderr__


def start_capturing_process(target, args, region, group, stream):
    process = Process(target=execute, args=(target, args, region, group, stream))
    process.start()
    process.join()
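

A minimal usage sketch appended to the same file. Everything specific here is hypothetical: the worker function, region, log group, and stream names are placeholders, the log group is assumed to already exist, and AWS credentials are taken from the usual boto3 configuration.

def train_model(n_epochs):
    # hypothetical worker whose print output should end up in CloudWatch
    for epoch in range(n_epochs):
        print('epoch %d done' % epoch)
        time.sleep(1)


if __name__ == '__main__':
    start_capturing_process(
        target=train_model,
        args=(5,),
        region='us-east-1',              # assumed region
        group='/experiments/training',   # assumed pre-existing log group
        stream='run-001',                # the stream is created if missing
    )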