Last active
October 10, 2020 07:02
-
-
Save FZambia/faab8f811dee95c7b174 to your computer and use it in GitHub Desktop.
Download image from url for Django
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io
import logging
import os
from StringIO import StringIO  # Python 2 only module; io.BytesIO is the portable equivalent

import requests
import requests.exceptions
from PIL import Image
from django.core.files.base import ContentFile
from django.core.files.storage import FileSystemStorage
# Module-level logger: every message in this file is emitted under the
# 'uploads' logger name.
logger = logging.getLogger('uploads')
class DownloadException(Exception):
    """Base error for a failed image download.

    Raised directly for request failures, chunked responses and content
    that cannot be opened as an image; the more specific download errors
    in this module derive from it.
    """
class DownloadTimeout(DownloadException):
    """The HTTP request did not complete within the configured timeout."""
class DownloadMalformedContentType(DownloadException):
    """The response headers were unusable.

    Despite the name, this is what the downloader raises when the
    Content-Length header is missing or is not an integer.
    """
class DownloadTooBig(DownloadException):
    """The remote content is larger than the allowed maximum size."""
class ImageDownloader(object):
    """
    Helper class to download images from URL.

    The download is refused when the response uses chunked transfer
    encoding, when it has no usable Content-Length header, or when the
    content is larger than ``max_content_length``. The body must be
    openable by PIL; it is then stored through Django's
    ``FileSystemStorage`` under ``upload_dir``.

    All failures are reported as ``DownloadException`` (or a subclass).

    NOTE(review): ``generate_filename`` is called below but is neither
    defined nor imported in the visible part of this module - it has to be
    provided elsewhere in the file.
    NOTE(review): if ``url`` can come from untrusted users, the target host
    should be validated by the caller (server-side request forgery).
    """

    UPLOAD_DIR = 'uploads'
    MAX_CONTENT_LENGTH = 10485760  # 10mb
    TIMEOUT = 5  # 5 sec
    CHUNK_SIZE = 65536  # bytes requested from the response per read

    def __init__(self, user_pk=None, upload_dir=None, max_content_length=None, timeout=None):
        """
        Store the download settings.

        Any argument that is omitted (or falsy) falls back to the matching
        class-level default; ``user_pk`` falls back to ``'anonymous'``.
        """
        self.user_pk = user_pk or 'anonymous'
        self.upload_dir = upload_dir or self.UPLOAD_DIR
        self.max_content_length = max_content_length or self.MAX_CONTENT_LENGTH
        self.timeout = timeout or self.TIMEOUT

    def check_content_length(self, response):
        """
        Validate the Content-Length header of ``response``.

        Raises ``DownloadMalformedContentType`` when the header is missing,
        not an integer or negative, and ``DownloadTooBig`` when it exceeds
        ``self.max_content_length``. Returns ``None`` on success.
        """
        content_length = response.headers.get('content-length', None)
        if not content_length:
            logger.error('no content length header')
            raise DownloadMalformedContentType()

        try:
            content_length = int(content_length)
        except (TypeError, ValueError):
            logger.error('malformed content length header')
            raise DownloadMalformedContentType()

        # A negative length can never be valid and would slip past the
        # upper-bound check below.
        if content_length < 0:
            logger.error('malformed content length header')
            raise DownloadMalformedContentType()

        if content_length > self.max_content_length:
            logger.error('content length too big - %s bytes', content_length)
            raise DownloadTooBig()

    def _read_body(self, response):
        """
        Read the response body as bytes, never keeping more than
        ``self.max_content_length`` bytes.

        The declared Content-Length is only a hint from the server, so the
        limit is enforced again on the bytes actually received.
        """
        chunks = []
        received = 0
        try:
            for chunk in response.iter_content(chunk_size=self.CHUNK_SIZE):
                received += len(chunk)
                if received > self.max_content_length:
                    logger.error('content length too big - more than %s bytes',
                                 self.max_content_length)
                    raise DownloadTooBig()
                chunks.append(chunk)
        except requests.exceptions.Timeout:
            logger.error("timed out")
            raise DownloadTimeout()
        except requests.exceptions.RequestException as err:
            # Connection problems can also surface while the body is being
            # streamed, not only when the request is sent.
            logger.error(err)
            raise DownloadException()
        return b"".join(chunks)

    def _run(self, url):
        """
        Download ``url``, validate it and save it.

        Returns a ``(url, path)`` tuple for the stored file.
        """
        logger.info(u"downloading %s", url)
        try:
            response = requests.get(url, timeout=self.timeout, stream=True)
        except requests.exceptions.Timeout:
            logger.error("timed out")
            raise DownloadTimeout()
        except requests.exceptions.RequestException as err:
            logger.error(err)
            raise DownloadException()

        # With stream=True the connection stays open until the body is
        # consumed or the response is closed, so always close it.
        try:
            transfer_encoding = response.headers.get('transfer-encoding', None)
            if transfer_encoding and transfer_encoding.lower() == "chunked":
                logger.error('chunked transfer encoding not supported')
                raise DownloadException()

            self.check_content_length(response)
            payload = self._read_body(response)
        finally:
            response.close()

        # The body is binary data, hence BytesIO.
        try:
            Image.open(io.BytesIO(payload))
        except Exception:
            logger.error("can't open content as image")
            raise DownloadException()

        name = os.path.join(self.upload_dir, generate_filename(self.user_pk, url))
        storage = FileSystemStorage()
        # save() returns the name actually used, which differs from the
        # requested one when a file with that name already exists.
        saved_name = storage.save(name, ContentFile(payload))
        return storage.url(saved_name), storage.path(saved_name)

    def run(self, url):
        """
        Load content from url, save locally, return link and path to saved file
        """
        return self._run(url)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment