Skip to content

Instantly share code, notes, and snippets.

@erikjohnston
Created January 18, 2018 21:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save erikjohnston/a12016bcc6ac88dd4047cb19cff8e79d to your computer and use it in GitHub Desktop.
Save erikjohnston/a12016bcc6ac88dd4047cb19cff8e79d to your computer and use it in GitHub Desktop.
A storage provider that fetches media from an S3 bucket
from twisted.internet import defer, threads, reactor
from twisted.python.failure import Failure
from synapse.rest.media.v1.storage_provider import StorageProvider
from synapse.rest.media.v1._base import Responder
import boto3
import botocore
import logging
import threading
# Module-level logger. Its dotted name makes it a child of the "synapse"
# logger, so records logged here propagate to that logger's handlers.
logger = logging.getLogger("synapse.s3")
class S3StorageProviderBackend(StorageProvider):
    """Storage provider that fetches media from an S3 bucket.

    Only fetching is implemented; `store_file` does nothing.

    Args:
        hs (HomeServer)
        config: The config returned by `parse_config` (the bucket name).
    """

    def __init__(self, hs, config):
        self.cache_directory = hs.config.media_store_path
        self.bucket = config
        # NOTE(review): this client is not used by fetch(), whose download
        # thread creates its own session and client.
        self.s3 = boto3.client('s3')

    def store_file(self, path, file_info):
        """See StorageProvider.store_file

        Uploading is not supported by this provider, so this is a no-op that
        returns an already-fired Deferred (callers may treat the result as a
        Deferred, per the StorageProvider contract).
        """
        return defer.succeed(None)

    def fetch(self, path, file_info):
        """See StorageProvider.fetch

        Starts an S3DownloadThread for the object stored under key `path`.

        Returns:
            Deferred: fires with an S3Responder that streams the object, or
            with None if S3 reports the object as missing.
        """
        d = defer.Deferred()
        S3DownloadThread(self.bucket, path, d).start()
        return d

    @staticmethod
    def parse_config(config):
        """Called on startup to parse config supplied. This should parse
        the config and raise if there is a problem.

        The returned value is passed into the constructor.

        In this case we only care about a single param, the bucket, so let's
        just pull that out.

        Args:
            config (dict): the provider's config section.

        Returns:
            str: the bucket name.

        Raises:
            ValueError: if `bucket` is missing or is not a string.
        """
        # `basestring` only exists on Python 2; fall back to `str` on
        # Python 3 instead of failing with a NameError.
        try:
            string_types = basestring  # noqa: F821 (Python 2 only)
        except NameError:
            string_types = str

        bucket = config.get("bucket")
        # An explicit check rather than `assert`: asserts are stripped when
        # Python runs with -O, which would let a bad config through.
        if not isinstance(bucket, string_types):
            raise ValueError(
                "S3 storage provider config requires 'bucket' to be a "
                "string, got %r" % (bucket,)
            )
        return bucket
class S3Responder(Responder):
    """Reactor-side half of an S3 download.

    The consumer registers this object as a pull producer. The object relays
    the consumer's requests to an S3DownloadThread through two
    threading.Events. The thread then calls back into `_write`, `_error` and
    `_finish` (via the reactor) to deliver data, errors and completion.

    Args:
        wakeup_event (threading.Event): set to ask the download thread for
            more data, or to wake it so it notices `stop_event`.
        stop_event (threading.Event): set to tell the download thread to stop
            reading permanently.
    """

    def __init__(self, wakeup_event, stop_event):
        self.wakeup_event = wakeup_event
        self.stop_event = stop_event

        # The consumer we are registered with. None before write_to_consumer
        # is called and after we have unregistered.
        self.consumer = None

        # Returned by write_to_consumer. Fires with None once all data has
        # been written, or errbacks on a fatal error / consumer stop.
        self.deferred = defer.Deferred()

    def write_to_consumer(self, consumer):
        """See Responder.write_to_consumer"""
        self.consumer = consumer
        # streaming=False: register as a pull producer, so the consumer calls
        # resumeProducing() whenever it wants more data.
        consumer.registerProducer(self, False)
        return self.deferred

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Leaving the context always aborts the download thread. Setting
        # wakeup_event unblocks it so it sees stop_event.
        self.stop_event.set()
        self.wakeup_event.set()

    def resumeProducing(self):
        """The consumer wants more data: wake the download thread."""
        self.wakeup_event.set()

    def stopProducing(self):
        """The consumer wants no more data: abort the download thread."""
        self.stop_event.set()
        self.wakeup_event.set()
        # The deferred may already have fired, e.g. because the download
        # finished or failed first, or stopProducing was called before.
        # Firing it again would raise AlreadyCalledError.
        if not self.deferred.called:
            self.deferred.errback(Exception("Consumer ask to stop producing"))

    def _write(self, chunk):
        """Pass a chunk of data to the consumer, unless we have been stopped.

        Scheduled onto the reactor by the download thread.
        """
        if self.consumer and not self.stop_event.is_set():
            self.consumer.write(chunk)

    def _error(self, failure):
        """Handle a fatal download error.

        Unregisters from the consumer and errbacks the deferred (if it has
        not already fired).
        """
        if self.consumer:
            self.consumer.unregisterProducer()
            self.consumer = None

        if not self.deferred.called:
            self.deferred.errback(failure)

    def _finish(self):
        """Handle the end of the download.

        Unregisters from the consumer and fires the deferred with None (if
        it has not already fired).
        """
        if self.consumer:
            self.consumer.unregisterProducer()
            self.consumer = None

        if not self.deferred.called:
            self.deferred.callback(None)
class S3DownloadThread(threading.Thread):
    """Thread that fetches a single object from S3 and streams it to an
    S3Responder.

    `deferred` is always fired on the reactor thread, with one of three
    outcomes:

    - an S3Responder, once the object has been found;
    - None, if S3 reports the key as missing;
    - an errback, if the request fails for any other reason.

    Args:
        bucket (str): name of the S3 bucket.
        key (str): key of the object within the bucket.
        deferred (Deferred): fired as described above.
    """

    def __init__(self, bucket, key, deferred):
        super(S3DownloadThread, self).__init__(name="s3-download")
        self.bucket = bucket
        self.key = key
        self.deferred = deferred

    def run(self):
        # Deferreds (like most Twisted APIs) are not thread-safe, so every
        # interaction with self.deferred and with the producer is handed to
        # the reactor thread via reactor.callFromThread.
        try:
            # A separate boto3 session per thread, as boto3 recommends for
            # multithreaded use.
            session = boto3.session.Session()
            s3 = session.client('s3')
            resp = s3.get_object(Bucket=self.bucket, Key=self.key)
            body = resp["Body"]
        except botocore.exceptions.ClientError as e:
            code = e.response.get('Error', {}).get('Code')
            # GetObject reports a missing key as "NoSuchKey"; "404" is
            # what HEAD-style requests return.
            if code in ("404", "NoSuchKey"):
                reactor.callFromThread(self.deferred.callback, None)
            else:
                reactor.callFromThread(self.deferred.errback, Failure())
            return
        except Exception:
            # Anything else (credentials/config errors, network errors,
            # malformed response) must still resolve the deferred, otherwise
            # the caller waits forever.
            reactor.callFromThread(self.deferred.errback, Failure())
            return

        wakeup_event = threading.Event()
        stop_event = threading.Event()
        producer = S3Responder(wakeup_event, stop_event)
        reactor.callFromThread(self.deferred.callback, producer)

        self._stream_body(body, producer, wakeup_event, stop_event)

    @staticmethod
    def _stream_body(body, producer, wakeup_event, stop_event):
        """Read `body` in 4096-byte chunks and hand each one to `producer`.

        A chunk is read each time `wakeup_event` is set. Reading stops when
        the body is exhausted or `stop_event` is set. Errors are reported
        via `producer._error`. The producer is always finished and the body
        is always closed.
        """
        try:
            while not stop_event.is_set():
                wakeup_event.wait()
                # Clear immediately after waking, so that a resumeProducing()
                # arriving while this chunk is being read is not lost.
                # Clearing after the read could leave this thread blocked
                # forever.
                wakeup_event.clear()
                if stop_event.is_set():
                    return

                chunk = body.read(4096)
                if not chunk:
                    return

                reactor.callFromThread(producer._write, chunk)
        except Exception:
            reactor.callFromThread(producer._error, Failure())
        finally:
            reactor.callFromThread(producer._finish)
            body.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment