Skip to content

Instantly share code, notes, and snippets.

@Object905
Created March 21, 2024 13:35
Show Gist options
  • Save Object905/8e1bc379d10227405d099b083b41989a to your computer and use it in GitHub Desktop.
Save Object905/8e1bc379d10227405d099b083b41989a to your computer and use it in GitHub Desktop.
Map tile server downloader
"""
Download tiles from a tile server the "smart way" - fetching the biggest zoom first and then going down only if a tile has some data
"""
from __future__ import annotations
import asyncio
import io
from itertools import batched
import math
import zlib
from collections import defaultdict
from dataclasses import dataclass, field
import logging
from io import BytesIO
from pathlib import Path
from typing import NewType, TypeAlias
import aioboto3
import aiometer
import httpx
from asgiref.sync import sync_to_async
from django.conf import settings
from PIL import Image
from coverage.models import TileServer
from zones.models.models import ThisCountry
# Module logger; the script entry point at the bottom of the file raises it to DEBUG.
logger = logging.getLogger("tdl")
# Distinct integer types for slippy-map tile addressing: zoom level, tile column, tile row.
Z = NewType("Z", int)
X = NewType("X", int)
Y = NewType("Y", int)
# Geographic coordinates in degrees.
Lat = NewType("Lat", float)
Lon = NewType("Lon", float)
# A geographic point, ordered (latitude, longitude).
PointDeg: TypeAlias = tuple[Lat, Lon]
# A tile address at some zoom level, ordered (x, y).
PointTile: TypeAlias = tuple[X, Y]
# Where a tile gets stored, expressed as a string (FileUploadHandler uses a filesystem path).
Dest: TypeAlias = str
# User-Agent sent when the tile server object does not provide its own (see TileScraper.rotate_client).
DEFAULT_USER_AGENT = (
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
)
@dataclass
class UploadHandler:
    """Fetch tiles over HTTP and hand the non-empty ones to a storage backend.

    Subclasses must implement ``get_destination`` and ``store``; the remaining
    hooks have default implementations.
    """

    # HTTP client used by ``fetch``; TileScraper.rotate_client assigns it.
    client: httpx.AsyncClient | None = None
    # (payload length, crc32) -> True when that payload decoded to an empty image.
    _empty_files_cache: dict[tuple[int, int], bool] = field(default_factory=dict)

    def get_destination(self, p: PointTile, zoom: Z) -> Dest:
        """Return the storage destination for tile ``p`` at ``zoom``."""
        raise NotImplementedError(f"{self.__class__.__qualname__}.get_destination not implemented")

    async def destination_exists(self, dest: Dest) -> bool:
        """Tell whether ``dest`` is already stored; the base class always says no."""
        return False

    async def store(self, buf: BytesIO, dest: Dest):
        """Persist the tile bytes in ``buf`` at ``dest``."""
        raise NotImplementedError(f"{self.__class__.__qualname__}.store not implemented")

    async def after_download(self, zoom: Z):
        """Hook called by the scraper after the tiles of ``zoom`` were downloaded."""
        pass

    async def on_miss(self, dest: Dest):
        """Handle a missing or empty tile by deleting the file at ``dest`` if present."""
        Path(dest).unlink(missing_ok=True)

    async def fetch(self, request: httpx.Request, retry=3) -> tuple[BytesIO | None, int]:
        """Download one tile.

        Returns ``(buf, request.idx)`` where ``buf`` is ``None`` when the tile
        is missing (404), the server answered with a non-200 status, or the
        image is empty. Transport errors and HTTP 429 are retried up to
        ``retry`` times; once retries are exhausted a transport error is
        re-raised.
        """
        logger.debug(f"GET {request.url}")
        try:
            resp = await self.client.send(request, stream=True, follow_redirects=True)
        except (httpx.TransportError, httpx.TimeoutException):
            if not retry:
                raise
            await asyncio.sleep(2)
            return await self.fetch(request, retry=retry - 1)

        rate_limited = resp.status_code == 429 and bool(retry)
        buf = None
        try:
            if rate_limited:
                logger.info(f"Rate limited {request.url}")
            elif resp.status_code == 404:
                pass
            elif resp.status_code != 200:
                logger.error(f"Response error code: {resp.status_code}. At {request.url}")
            else:
                logger.debug(f"Download {resp.url}")
                buf = BytesIO()
                async for chunk in resp.aiter_bytes(chunk_size=io.DEFAULT_BUFFER_SIZE):
                    buf.write(chunk)
                buf.seek(0)
        finally:
            # A streamed response holds its connection until it is closed,
            # so release it on every path, including the successful one.
            await resp.aclose()

        if rate_limited:
            await asyncio.sleep(3)
            return await self.fetch(request, retry=retry - 1)
        if buf is None:
            return None, request.idx

        filtered_buf = self.filter_image_buf(buf)
        if filtered_buf is None:
            logger.debug(f"{resp.url} is an empty image, skipping")
        return filtered_buf, request.idx

    def filter_image_buf(self, buf: BytesIO) -> BytesIO | None:
        """Return ``buf`` rewound to the start, or ``None`` if it holds an empty image.

        The verdict is cached per (length, crc32) of the payload so identical
        payloads are decoded only once.
        """
        # Release the memoryview right away: a BytesIO cannot be resized
        # while a buffer export is alive.
        with buf.getbuffer() as view:
            key = len(view), zlib.crc32(view)
        is_empty = self._empty_files_cache.get(key)
        if is_empty is None:
            is_empty = self.is_image_empty(Image.open(buf))
            self._empty_files_cache[key] = is_empty
        # The cache stores a verdict, not a buffer: translate it on hits as well.
        if is_empty:
            return None
        buf.seek(0)
        return buf

    @staticmethod
    def is_image_empty(img: Image.Image) -> bool:
        """Tell whether every band of ``img`` holds one single value."""
        extrema = img.getextrema()
        # Multi-band images yield one (min, max) pair per band,
        # single-band images yield a bare (min, max) pair.
        if extrema and isinstance(extrema[0], tuple):
            return all(low == high for low, high in extrema)
        low, high = extrema
        return low == high
@dataclass
class FileUploadHandler(UploadHandler):
    """Upload handler that keeps tiles as ``<folder>/<zoom>/<x>/<y>.png`` files."""

    # Root directory of the tile tree.
    folder: Path = Path("~/tiles/t2_lte450").expanduser()

    def get_destination(self, p: PointTile, zoom: Z) -> str:
        """Return the PNG file path for tile ``p`` at ``zoom`` as a string."""
        tile_path = (self.folder / str(zoom) / str(p[0]) / str(p[1])).with_suffix(".png")
        return str(tile_path)

    async def destination_exists(self, dest: str) -> bool:
        """Tell whether a file already exists at ``dest``."""
        return Path(dest).exists()

    async def store(self, buf: BytesIO, dest: str):
        """Write the content of ``buf`` to ``dest``, creating parent directories as needed."""
        target = Path(dest)
        target.parent.mkdir(parents=True, exist_ok=True)
        with target.open("wb") as out:
            out.write(buf.getbuffer())
@dataclass
class TileScraper:
upload_handler: UploadHandler
skip_existing: bool = True
fetched: dict[Z, list[PointTile, Dest]] = field(default_factory=lambda: defaultdict(list))
def get_possible_tiles_by_country(self, zoom: Z) -> list[PointTile]:
country_convex_hull = ThisCountry.get_solo().geometry.convex_hull
points = []
for ring in country_convex_hull:
for point in ring:
# reversed because not matches postgis
points.append((point[1], point[0]))
return points_to_tiles(points, zoom)
async def rotate_client(self, tile_server) -> httpx.AsyncClient:
if self.upload_handler.client is not None:
await self.upload_handler.client.__aexit__(None, None, None)
client = httpx.AsyncClient(
http2=True,
timeout=60,
headers={"User-Agent": tile_server.user_agent or DEFAULT_USER_AGENT},
follow_redirects=True,
limits=httpx.Limits(
max_connections=50,
max_keepalive_connections=25,
keepalive_expiry=10,
),
)
await client.__aenter__()
self.upload_handler.client = client
return client
async def run(self, tile_server: TileServer):
min_zoom_tiles = await sync_to_async(self.get_possible_tiles_by_country)(tile_server.min_zoom)
await self.download_tiles(
tiles=min_zoom_tiles,
zoom=tile_server.min_zoom,
tile_server=tile_server,
)
prev_zoom = tile_server.min_zoom
for zoom in range(tile_server.min_zoom + 1, tile_server.max_zoom + 1):
possible_tiles = set()
for prev_zoom_tile, _ in self.fetched[prev_zoom]:
possible_tiles.update(zoom_out_tiles(prev_zoom_tile, prev_zoom, zoom))
prev_zoom = zoom
possible_tiles = sorted(possible_tiles)
if not possible_tiles:
continue
logger.info(f"Downloading {len(possible_tiles)} on zoom {zoom}")
await self.download_tiles(
tiles=possible_tiles,
zoom=zoom,
tile_server=tile_server,
)
async def download_tiles(
self,
tiles: list[PointTile],
zoom: Z,
tile_server: TileServer,
skip_non_existing=False,
):
await self.rotate_client(tile_server)
idx_to_req = {}
idx_to_dest = {}
idx_to_tile = {}
existing_idx = []
for idx, tile in enumerate(tiles):
destination = self.upload_handler.get_destination(tile, zoom)
idx_to_dest[idx] = destination
idx_to_tile[idx] = tile
url = tile_server.url_template.format(z=zoom, x=tile[0], y=tile[1])
request = httpx.Request("GET", url)
request.idx = idx
idx_to_req[idx] = request
if self.skip_existing:
tasks = []
for dest in idx_to_dest.values():
tasks.append(self.upload_handler.destination_exists(dest))
dest_exists = await asyncio.gather(*tasks)
for idx, dest_exists in enumerate(dest_exists):
if not dest_exists:
continue
existing_idx.append(idx)
idx_to_req.pop(idx)
if skip_non_existing:
idx_to_req = {}
if idx_to_req:
for batch in batched(idx_to_req.values(), 50):
tasks = []
async with aiometer.amap(
self.upload_handler.fetch,
args=batch,
max_at_once=tile_server.req_max_at_once,
max_per_second=tile_server.req_max_per_second,
) as results:
async for buf, idx in results:
destination = idx_to_dest[idx]
tile = idx_to_tile[idx]
if buf is None:
tasks.append(asyncio.create_task(self.upload_handler.on_miss(destination)))
continue
tasks.append(asyncio.create_task(self.upload_handler.store(buf, destination)))
self.fetched[zoom].append((tile, destination))
await asyncio.gather(*tasks)
await asyncio.sleep(1)
await self.upload_handler.after_download(zoom)
for idx in existing_idx:
destination = idx_to_dest[idx]
tile = idx_to_tile[idx]
self.fetched[zoom].append((tile, destination))
self.fetched[zoom].sort(key=lambda x: x[0])
# ref https://wiki.openstreetmap.org/wiki/Slippy_map_tilenames
def coords_to_tile(point_deg: PointDeg, zoom: Z) -> PointTile:
    """Transform lat/lon into tile x,y at the given zoom.

    The result is clamped to the valid index range ``[0, 2**zoom - 1]``.
    Without the clamp, longitude 180 maps to ``x == 2**zoom`` and latitudes
    beyond the Web-Mercator limit (about +-85.0511 degrees) map to a ``y``
    outside the grid.
    """
    lat_deg, lon_deg = point_deg
    lat_rad = math.radians(lat_deg)
    n = 1 << zoom
    xtile = int((lon_deg + 180.0) / 360.0 * n)
    ytile = int((1.0 - math.asinh(math.tan(lat_rad)) / math.pi) / 2.0 * n)
    last = n - 1
    return min(max(xtile, 0), last), min(max(ytile, 0), last)
def tile_to_coords(point_tile: PointTile, zoom: Z) -> PointDeg:
    """Return the (lat, lon) in degrees of the tile's north-west corner."""
    tile_x, tile_y = point_tile
    tiles_per_axis = 1 << zoom
    # Inverse of the Web-Mercator projection used by coords_to_tile.
    mercator_y = math.pi * (1 - 2 * tile_y / tiles_per_axis)
    latitude = math.degrees(math.atan(math.sinh(mercator_y)))
    longitude = tile_x / tiles_per_axis * 360.0 - 180.0
    return latitude, longitude
def points_to_tiles(points: list[PointDeg], zoom: Z) -> list[PointTile]:
    """Return every tile x,y of the bounding rectangle that contains the given points.

    Each point is converted to its tile at ``zoom``; the result is the full
    grid between the smallest and largest x and y, ordered by x and then y.
    An empty ``points`` yields an empty list.
    """
    possible_tiles = {coords_to_tile(point, zoom) for point in points}
    if not possible_tiles:
        # min()/max() below would raise ValueError on an empty set.
        return []
    xs = [x for x, _ in possible_tiles]
    ys = [y for _, y in possible_tiles]
    # NOTE(review): the lower bounds are clamped to 1, so column 0 and row 0
    # are never returned although they are valid tile indices - confirm this
    # is intentional.
    min_x = max(min(xs), 1)
    min_y = max(min(ys), 1)
    max_x = max(xs)
    max_y = max(ys)
    return [(x, y) for x in range(min_x, max_x + 1) for y in range(min_y, max_y + 1)]
def tile_to_bb(point_tile: PointTile, zoom: Z) -> tuple[PointDeg, PointDeg, PointDeg, PointDeg]:
    """Return the NW, NE, SE and SW corners (in degrees) of the tile's bounding box."""
    col, row = point_tile[0], point_tile[1]
    # A tile's other corners are the north-west corners of its neighbours.
    corner_tiles = (
        (col, row),
        (col + 1, row),
        (col + 1, row + 1),
        (col, row + 1),
    )
    nw, ne, se, sw = (tile_to_coords(corner, zoom) for corner in corner_tiles)
    return nw, ne, se, sw
def zoom_out_tiles(tile: PointTile, old_zoom: Z, new_zoom: Z) -> list[PointTile]:
    """Return the tiles at ``new_zoom`` spanned by the bounding box of ``tile`` at ``old_zoom``."""
    corners = tile_to_bb(tile, old_zoom)
    tiles_at_new_zoom = points_to_tiles(corners, new_zoom)
    return tiles_at_new_zoom
def tele2_lte():
    """Scrape the Tele2 LTE layer (zooms 4 to 12) into ``~/tiles/t2_lte``.

    The TileServer object is only built in memory here; this function does
    not save it.
    """
    server = TileServer(
        name="Tele2 LTE",
        key="t2_lte",
        url_template="https://msk.tele2.ru/maps/_4g/{z}/{x}/{y}.png",
        min_zoom=4,
        max_zoom=12,
        user_agent="Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
        req_max_at_once=10,
        req_max_per_second=20,
    )
    tiles_root = Path("~/tiles/").expanduser()
    handler = FileUploadHandler(folder=tiles_root / server.key)
    tile_scraper = TileScraper(upload_handler=handler)
    asyncio.run(tile_scraper.run(server), debug=True)
# Entry point: runs as a script; the second module name is presumably what
# __name__ is when the code is executed through Django's `shell` command.
if __name__ in {"__main__", "django.core.management.commands.shell"}:
    logging.basicConfig(level=logging.INFO)
    logger.setLevel(logging.DEBUG)
    tele2_lte()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment