Last active
March 13, 2020 04:03
-
-
Save CMCDragonkai/71a92ce231934b6c461c87cc4baa9944 to your computer and use it in GitHub Desktop.
Load Images and NumPy Arrays from Filesystem URLs and from Standard Input #python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io | |
import sys | |
import fs | |
import contextlib | |
import urllib.parse | |
import numpy as np | |
from PIL import Image | |
from typing import Tuple, Iterator, IO, Optional, List | |
def parse_file_url(url: str) -> Tuple[str, str]:
    """Split a file URL (or plain path) into a filesystem URL and a file path.

    The first element of the result identifies the filesystem to open and the
    second element is the path of the file inside that filesystem.

    * Without a scheme, ``url`` is treated as a local path: the filesystem URL
      is ``osfs://`` (``osfs:///`` for an absolute path) and the file path is
      the path with its leading slashes removed.
    * With a scheme and a path, the filesystem URL is
      ``scheme://netloc`` (plus ``/`` when the path is absolute, plus
      ``?query`` when present) and the file path is the URL path.
    * With a scheme but no path (e.g. ``mem://a.png``), the network location
      is taken as the file path and the filesystem URL is ``scheme://``
      (plus ``?query`` when present).

    ``urlparse`` strips ``;params``, ``?query`` and ``#fragment`` off the
    path.  Whatever is not consumed by the filesystem URL is glued back onto
    the file path so that names containing those characters survive.

    >>> parse_file_url("/tmp/a.png")
    ('osfs:///', 'tmp/a.png')
    >>> parse_file_url("s3://bucket/key/a.png?region=x")
    ('s3://bucket/?region=x', '/key/a.png')
    >>> parse_file_url("mem://a.png")
    ('mem://', 'a.png')
    """
    parts = urllib.parse.urlparse(url)
    is_absolute = parts.path.startswith("/")

    # No scheme: a plain filesystem path handled by the OS filesystem.
    if not parts.scheme:
        # An absolute path means the filesystem must be rooted at "/".
        fs_url = "osfs:///" if is_absolute else "osfs://"
        file_path = parts.path.lstrip("/")
        if parts.params:
            file_path += f";{parts.params}"
        # There is no filesystem URL to carry a query here, so the "?..."
        # text belongs to the file name; previously it was silently dropped.
        if parts.query:
            file_path += f"?{parts.query}"
        if parts.fragment:
            file_path += f"#{parts.fragment}"
        return (fs_url, file_path)

    fs_url = f"{parts.scheme}://"
    if parts.path:
        fs_url += parts.netloc
        # An absolute path means the filesystem URL must start at the root.
        if is_absolute:
            fs_url += "/"
        file_path = parts.path
    else:
        # "scheme://name": the only component present is the file itself.
        file_path = parts.netloc
    # With a scheme, the query configures the filesystem, not the file.
    if parts.query:
        fs_url += f"?{parts.query}"
    if parts.params:
        file_path += f";{parts.params}"
    if parts.fragment:
        file_path += f"#{parts.fragment}"
    return (fs_url, file_path)
@contextlib.contextmanager
def open_file_url(
    url: str, mode: str = "r", buffering=-1, encoding=None, errors=None, newline=""
) -> Iterator[IO]:
    """Open the file that ``url`` points at and yield the open file object.

    ``url`` is split by ``parse_file_url`` into a filesystem URL and a path
    inside that filesystem.  The filesystem is opened with ``fs.open_fs`` and
    the file with the filesystem's ``open`` method; ``mode``, ``buffering``,
    ``encoding``, ``errors`` and ``newline`` are forwarded to that call
    unchanged.  Both the file and the filesystem context managers are exited
    when the caller leaves the ``with`` block.
    """
    location, path_in_fs = parse_file_url(url)
    with fs.open_fs(location) as opened_fs, opened_fs.open(
        path_in_fs, mode, buffering, encoding, errors, newline
    ) as handle:
        yield handle
# Leading bytes of every ``.npy`` file; used to tell NumPy dumps from images.
_NPY_MAGIC = b"\x93NUMPY"


def _decode_image_bytes(data: bytes) -> np.ndarray:
    """Turn one in-memory payload into an array.

    Payloads starting with the NPY magic are loaded with ``np.load``; anything
    else is opened with PIL and converted to a ``uint8`` array.
    """
    if data.startswith(_NPY_MAGIC):
        return np.load(io.BytesIO(data))
    image = Image.open(io.BytesIO(data))
    return np.asarray(image, dtype=np.uint8)


def get_images(
    image_urls: Optional[List[str]] = None, images_urls: Optional[List[str]] = None
) -> Iterator[np.ndarray]:
    """Yield arrays loaded from file URLs, directory URLs or standard input.

    Args:
        image_urls: URLs (or plain paths) of individual files, each opened
            through ``open_file_url``.
        images_urls: Filesystem URLs passed to ``fs.open_fs``; every file
            found by walking each filesystem is loaded.

    Yields:
        One array per input.  ``.npy`` payloads are yielded as stored; image
        payloads are decoded by PIL and yielded as ``uint8`` arrays.

    When both arguments are ``None`` a single payload is read from
    ``sys.stdin.buffer`` instead.  Arrays from ``image_urls`` are yielded
    before those from ``images_urls``.
    """
    # Each payload is read completely before its format is sniffed.  The
    # previous ``peek(6)`` check could see fewer than six bytes (``peek`` may
    # return less than requested) and required a ``peek`` method that not
    # every file object provides.
    if image_urls is None and images_urls is None:
        yield _decode_image_bytes(sys.stdin.buffer.read())
        return

    for image_url in image_urls or ():
        with open_file_url(image_url, mode="rb") as image_file:
            payload = image_file.read()
        # The file is already closed here, so no handle stays open while the
        # consumer works on the yielded array.
        yield _decode_image_bytes(payload)

    for images_url in images_urls or ():
        with fs.open_fs(images_url) as images_fs:
            for image_path in images_fs.walk.files():
                with images_fs.open(image_path, mode="rb") as image_file:
                    payload = image_file.read()
                yield _decode_image_bytes(payload)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment