Skip to content

Instantly share code, notes, and snippets.

@FZambia
Last active October 10, 2020 07:02
Show Gist options
  • Save FZambia/faab8f811dee95c7b174 to your computer and use it in GitHub Desktop.
Save FZambia/faab8f811dee95c7b174 to your computer and use it in GitHub Desktop.
Download an image from a URL for Django
import logging
import os
from StringIO import StringIO

import requests
import requests.exceptions
from django.core.files.base import ContentFile
from django.core.files.storage import FileSystemStorage
from PIL import Image
# Module-level logger; every message emitted by this module goes to the
# 'uploads' logger.
logger = logging.getLogger('uploads')
class DownloadException(Exception):
    """Base error for a failed image download; also raised for generic failures."""
class DownloadTimeout(DownloadException):
    """Raised when the HTTP request for the image times out."""
class DownloadMalformedContentType(DownloadException):
    """Raised when the response's Content-Length header is missing or malformed."""
class DownloadTooBig(DownloadException):
    """Raised when the remote content is larger than the configured maximum."""
class ImageDownloader(object):
    """
    Helper class to download images from URL.

    A download is refused when the server answers with chunked transfer
    encoding, when the Content-Length header is missing, malformed or larger
    than ``max_content_length``, when the body actually received exceeds that
    limit, or when PIL cannot open the body as an image. Accepted content is
    saved through a default-constructed Django ``FileSystemStorage``.
    """
    UPLOAD_DIR = 'uploads'
    MAX_CONTENT_LENGTH = 10485760  # 10mb
    TIMEOUT = 5  # 5 sec
    CHUNK_SIZE = 8192  # bytes requested per read while streaming the body

    def __init__(self, user_pk=None, upload_dir=None, max_content_length=None, timeout=None):
        """
        Configure the downloader.

        Every argument falls back to its default when it is falsy
        (``None``, ``0``, ``''``).

        :param user_pk: identifier passed to ``generate_filename``;
            defaults to ``'anonymous'``.
        :param upload_dir: directory prefix of the stored file name;
            defaults to ``UPLOAD_DIR``.
        :param max_content_length: maximum accepted body size in bytes;
            defaults to ``MAX_CONTENT_LENGTH``.
        :param timeout: timeout handed to ``requests.get``;
            defaults to ``TIMEOUT``.
        """
        self.user_pk = user_pk or 'anonymous'
        self.upload_dir = upload_dir or self.UPLOAD_DIR
        self.max_content_length = max_content_length or self.MAX_CONTENT_LENGTH
        self.timeout = timeout or self.TIMEOUT

    def check_content_length(self, response):
        """
        Validate the Content-Length header declared by ``response``.

        :param response: object exposing a ``headers`` mapping.
        :raises DownloadMalformedContentType: the header is missing, empty,
            not an integer, or negative.
        :raises DownloadTooBig: the declared length exceeds
            ``self.max_content_length``.
        """
        declared = response.headers.get('content-length', None)
        if not declared:
            logger.error('no content length header')
            raise DownloadMalformedContentType('no content length header')
        try:
            declared = int(declared)
        except ValueError:
            logger.error('malformed content length header')
            raise DownloadMalformedContentType('malformed content length header')
        if declared < 0:
            logger.error('malformed content length header')
            raise DownloadMalformedContentType('malformed content length header')
        if declared > self.max_content_length:
            logger.error('content length too big - %s bytes', declared)
            raise DownloadTooBig(
                'content length too big - {0} bytes'.format(declared))

    def _read_body(self, response):
        """
        Read the streamed body of ``response`` and return it as a byte string.

        The Content-Length header is only a claim made by the server, so the
        limit is enforced again on the bytes actually received.

        :raises DownloadTooBig: more than ``self.max_content_length`` bytes
            were received.
        """
        chunks = []
        received = 0
        for chunk in response.iter_content(self.CHUNK_SIZE):
            received += len(chunk)
            if received > self.max_content_length:
                logger.error('content too big - more than %s bytes received',
                             self.max_content_length)
                raise DownloadTooBig(
                    'content too big - more than {0} bytes received'.format(
                        self.max_content_length))
            chunks.append(chunk)
        return b''.join(chunks)

    def _fetch(self, url):
        """
        Request ``url`` and return the validated response body as bytes.

        :raises DownloadTimeout: ``requests`` reported a timeout.
        :raises DownloadException: any other ``requests`` error occurred while
            connecting or reading, or the response is chunked.
        :raises DownloadMalformedContentType: see ``check_content_length``.
        :raises DownloadTooBig: declared or received size is over the limit.
        """
        try:
            response = requests.get(url, timeout=self.timeout, stream=True)
            try:
                transfer_encoding = response.headers.get('transfer-encoding', None)
                if transfer_encoding and transfer_encoding.lower() == "chunked":
                    logger.error('chunked transfer encoding not supported')
                    raise DownloadException('chunked transfer encoding not supported')
                self.check_content_length(response)
                return self._read_body(response)
            finally:
                # With stream=True the connection stays checked out until the
                # body is consumed or the response is closed, so always close.
                response.close()
        except requests.exceptions.Timeout:
            logger.error("timed out")
            raise DownloadTimeout("timed out")
        except requests.exceptions.RequestException as err:
            logger.error(err)
            raise DownloadException(str(err))

    def _run(self, url):
        """
        Download ``url``, check it opens as an image and store it.

        :returns: ``(url, path)`` of the stored file as reported by the storage.
        :raises DownloadException: (or a subclass) when the download fails or
            the content cannot be opened as an image.
        """
        logger.info(u"downloading %s", url)
        data = self._fetch(url)
        try:
            # Only used as a check that PIL recognises the content as an image.
            Image.open(StringIO(data))
        except Exception:
            logger.error("can't open content as image")
            raise DownloadException("can't open content as image")
        # NOTE(review): generate_filename is not defined in the visible part of
        # this module; it must be provided elsewhere - confirm.
        name = os.path.join(self.upload_dir, generate_filename(self.user_pk, url))
        storage = FileSystemStorage()
        # save() returns the name actually used, which may differ from the
        # requested one when a file with that name already exists.
        saved_name = storage.save(name, ContentFile(data))
        return storage.url(saved_name), storage.path(saved_name)

    def run(self, url):
        """
        Load content from url, save locally, return link and path to saved file
        """
        return self._run(url)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment