Created
March 10, 2017 19:36
-
-
Save paulgoetze/6429e28cbf925820cf6d2b30094668ae to your computer and use it in GitHub Desktop.
sqlalchemy_media GoogleCloudStore
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from os import path | |
from io import BytesIO | |
from sqlalchemy_media import Store | |
from sqlalchemy_media.typing_ import FileLike | |
from sqlalchemy_media.constants import KB | |
from google.cloud import storage | |
class GoogleCloudStore(Store):
    """Store for dealing with Google Cloud Storage.

    Files are kept as blobs in a single bucket. The storage client and the
    bucket handle are created on first use and then cached on the instance.
    """

    BASE_URL_FORMAT = 'https://{0}.storage.googleapis.com'
    DEFAULT_MAX_AGE = 60 * 60 * 24 * 365

    # The chunk size must be a multiple of 256 KB,
    # see https://google-cloud-python.readthedocs.io/en/latest/storage-blobs.html#google.cloud.storage.blob.Blob
    BLOB_CHUNK_SIZE = 256 * KB

    def __init__(self, project, bucket):
        """Remember the target project and bucket; no connection is made yet.

        :param project: Project identifier passed to ``storage.Client``.
        :param bucket: Name of the bucket that holds the stored files. It is
                       also used to build the public base URL.
        """
        self.project = project
        self.bucket_name = bucket
        self.base_url = self.BASE_URL_FORMAT.format(bucket)
        self._client = None
        self._bucket = None

    @property
    def client(self):
        """Return the Google Cloud Storage client for the configured project.

        The client is created on first access and reused afterwards.
        """
        if self._client is None:
            self._client = storage.Client(project=self.project)
        return self._client

    @property
    def bucket(self):
        """Return the Google Cloud Storage bucket from the configured client.

        The bucket is looked up via ``client.get_bucket`` on first access and
        reused afterwards.
        """
        if self._bucket is None:
            self._bucket = self.client.get_bucket(self.bucket_name)
        return self._bucket

    def put(self, filename: str, stream: FileLike) -> int:
        """Upload the content of ``stream`` as the blob named ``filename``.

        The whole stream is read into memory before it is uploaded.

        :param filename: Name of the blob inside the bucket.
        :param stream: Readable object that also exposes ``content_type``.
        :return: The number of bytes read from ``stream``.
        """
        data = stream.read()
        blob = self.bucket.blob(filename, chunk_size=self.BLOB_CHUNK_SIZE)
        blob.upload_from_string(data, content_type=stream.content_type)
        return len(data)

    def delete(self, filename: str):
        """Delete the blob named ``filename`` from the bucket."""
        self.bucket.blob(filename).delete()

    def open(self, filename: str, mode: str = '') -> FileLike:
        """Download the blob named ``filename`` into an in-memory stream.

        :param filename: Name of the blob inside the bucket.
        :param mode: Accepted for interface compatibility; it is not used,
                     the content is always returned as bytes.
        :return: A ``BytesIO`` holding the complete blob content.
        """
        blob = self.bucket.blob(filename)
        content = blob.download_as_string()
        return BytesIO(content)

    def locate(self, attachment) -> str:
        """Return the URL of ``attachment`` inside the bucket.

        The URL is built with an explicit ``/`` rather than ``os.path.join``:
        the latter uses the operating system's path separator and drops the
        base URL when the attachment path starts with a slash.
        """
        return '{0}/{1}'.format(self.base_url, attachment.path.lstrip('/'))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment