@remram44
Created November 18, 2021 02:13
import aiohttp
import asyncio
import glob
import itertools
import logging
import os
import re

import sodapy


logger = logging.getLogger(__name__)

re_non_id_safe = re.compile(r'[^a-z0-9-]+')


def encode_domain(url):
    # Turn a Socrata domain into a string safe to use in file names
    domain = re_non_id_safe.sub('-', url.lower())
    return domain


async def get_socrata_datasets(domains, parallel):
    # Clean up partial downloads left over from a previous run
    for name in glob.glob('*.tmp'):
        os.remove(name)

    async with aiohttp.ClientSession() as http:
        for domain in domains:
            try:
                await process_domain(domain, parallel, http)
            except Exception:
                logger.exception("Error processing %s", domain)


async def process_domain(domain, parallel, http):
    logger.info("Processing %s...", domain)
    socrata = sodapy.Socrata(domain, app_token=None)
    datasets = socrata.datasets()
    logger.info("Found %d datasets", len(datasets))
    if not datasets:
        return
    seen = set()

    # Start N tasks
    it_datasets = iter(datasets)
    tasks = {
        asyncio.ensure_future(process_dataset(domain, dataset, http)): dataset
        for dataset in itertools.islice(it_datasets, parallel)
    }

    while tasks:
        # Wait for any task to complete
        done, pending = await asyncio.wait(
            list(tasks),
            return_when=asyncio.FIRST_COMPLETED,
        )

        # Poll them
        for fut in done:
            dataset = tasks.pop(fut)
            try:
                valid = fut.result()
            except Exception:
                logger.exception("Error processing dataset %s",
                                 dataset['resource']['id'])
            else:
                assert isinstance(valid, bool)
                if valid:
                    seen.add(dataset['resource']['id'])

        # Schedule new tasks, keeping `parallel` downloads in flight
        for dataset in itertools.islice(it_datasets, parallel - len(tasks)):
            tasks[asyncio.ensure_future(process_dataset(domain, dataset, http))] = dataset

    logger.info("Discovered %d/%d datasets", len(seen), len(datasets))


async def process_dataset(domain, dataset, http):
    # Get metadata
    resource = dataset['resource']
    id = resource['id']
    encoded_domain = encode_domain(domain)
    dataset_id = '{}.{}'.format(encoded_domain, id)

    # Check type
    # api, calendar, chart, datalens, dataset, federated_href, file,
    # filter, form, href, link, map, measure, story, visualization
    if resource['type'] != 'dataset':
        logger.info("Skipping %s, type %s", id, resource['type'])
        return False

    filename = dataset_id + '.csv'
    tempname = dataset_id + '.csv.tmp'

    # Check if dataset already exists
    if os.path.exists(filename):
        logger.info("%s exists, skipping", filename)
        return True

    direct_url = (
        'https://{domain}/api/views/{dataset_id}/rows.csv'
        '?accessType=DOWNLOAD'.format(domain=domain, dataset_id=id)
    )

    # Download this dataset to a temporary file, then rename it into place,
    # so only complete downloads ever get the final name
    logger.info("Downloading %s", dataset_id)
    async with http.get(direct_url) as response:
        assert response.status == 200
        file = open(tempname, 'wb')
        try:
            with file:
                while True:
                    chunk = await response.content.read(4096)
                    if not chunk:
                        break
                    file.write(chunk)
        except BaseException:
            # Don't leave a partial file behind
            os.remove(tempname)
            raise
        else:
            os.rename(tempname, filename)
    logger.info("Download %s completed", dataset_id)
    return True


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    asyncio.get_event_loop().run_until_complete(
        get_socrata_datasets(
            # Socrata domains to download
            ['data.cityofnewyork.us'],
            # How many datasets to download in parallel
            parallel=4,
        )
    )
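
For reference, each entry returned by socrata.datasets() is a record from Socrata's discovery API; the script above only reads the resource.id and resource.type fields. A minimal sketch of the shape it expects, with illustrative (not real) values:

    # Abridged sketch of one socrata.datasets() entry; only the fields the
    # script reads are shown, and the values here are made up for illustration.
    example_dataset = {
        'resource': {
            'id': 'abcd-1234',   # dataset identifier, used in the download URL
            'type': 'dataset',   # any other type is skipped by process_dataset()
        },
    }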
# Variant of the script above: uploads the downloaded CSVs to HDFS instead
# of keeping them on the local filesystem.
import aiohttp
import asyncio
import glob
import itertools
import logging
import os
import re
import subprocess

import sodapy


logger = logging.getLogger(__name__)

re_non_id_safe = re.compile(r'[^a-z0-9-]+')


def encode_domain(url):
    # Turn a Socrata domain into a string safe to use in file names
    domain = re_non_id_safe.sub('-', url.lower())
    return domain


async def get_socrata_datasets(domains, target, parallel):
    # Clean up partial downloads left over from a previous run
    for name in glob.glob('*.tmp'):
        os.remove(name)

    async with aiohttp.ClientSession() as http:
        for domain in domains:
            try:
                await process_domain(domain, parallel, target, http)
            except Exception:
                logger.exception("Error processing %s", domain)


async def process_domain(domain, parallel, target, http):
    logger.info("Processing %s...", domain)
    socrata = sodapy.Socrata(domain, app_token=None)
    datasets = socrata.datasets()
    logger.info("Found %d datasets", len(datasets))
    if not datasets:
        return
    seen = set()

    # Start N tasks
    it_datasets = iter(datasets)
    tasks = {
        asyncio.ensure_future(process_dataset(domain, dataset, target, http)): dataset
        for dataset in itertools.islice(it_datasets, parallel)
    }

    while tasks:
        # Wait for any task to complete
        done, pending = await asyncio.wait(
            list(tasks),
            return_when=asyncio.FIRST_COMPLETED,
        )

        # Poll them
        for fut in done:
            dataset = tasks.pop(fut)
            try:
                valid = fut.result()
            except Exception:
                logger.exception("Error processing dataset %s",
                                 dataset['resource']['id'])
            else:
                assert isinstance(valid, bool)
                if valid:
                    seen.add(dataset['resource']['id'])

        # Schedule new tasks, keeping `parallel` downloads in flight
        for dataset in itertools.islice(it_datasets, parallel - len(tasks)):
            tasks[asyncio.ensure_future(process_dataset(domain, dataset, target, http))] = dataset

    logger.info("Discovered %d/%d datasets", len(seen), len(datasets))


async def process_dataset(domain, dataset, target, http):
    # Get metadata
    resource = dataset['resource']
    id = resource['id']
    encoded_domain = encode_domain(domain)
    dataset_id = '{}.{}'.format(encoded_domain, id)

    # Check type
    # api, calendar, chart, datalens, dataset, federated_href, file,
    # filter, form, href, link, map, measure, story, visualization
    if resource['type'] != 'dataset':
        logger.info("Skipping %s, type %s", id, resource['type'])
        return False

    filename = target.rstrip('/') + '/' + dataset_id + '.csv'
    tempname = dataset_id + '.csv.tmp'

    # Check if dataset already exists in HDFS
    cmd = ['hadoop', 'fs', '-ls', filename]
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    # Drain the pipes before checking the exit status, so the child can't
    # block on a full pipe buffer
    await proc.communicate()
    if proc.returncode == 0:
        logger.info("%s exists, skipping", filename)
        return True

    direct_url = (
        'https://{domain}/api/views/{dataset_id}/rows.csv'
        '?accessType=DOWNLOAD'.format(domain=domain, dataset_id=id)
    )

    # Download this dataset to a local temporary file
    logger.info("Downloading %s", dataset_id)
    async with http.get(direct_url) as response:
        assert response.status == 200
        file = open(tempname, 'wb')
        try:
            with file:
                while True:
                    chunk = await response.content.read(4096)
                    if not chunk:
                        break
                    file.write(chunk)
        except BaseException:
            # Don't leave a partial file behind
            os.remove(tempname)
            raise
        else:
            # Upload the complete file to HDFS, then remove the local copy
            cmd = ['hadoop', 'fs', '-put', tempname, filename]
            proc = await asyncio.create_subprocess_exec(*cmd)
            ret = await proc.wait()
            os.remove(tempname)
            if ret != 0:
                raise subprocess.CalledProcessError(ret, cmd)
    logger.info("Download %s completed", dataset_id)
    return True


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    asyncio.get_event_loop().run_until_complete(
        get_socrata_datasets(
            # Socrata domains to download
            ['data.cityofnewyork.us'],
            # Where to put it in HDFS
            '/user/rr2369',
            # How many datasets to download in parallel
            parallel=4,
        )
    )
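
A note on the existence check: the HDFS variant shells out to hadoop fs -ls and treats exit status 0 as "file exists". hadoop fs -test -e is built for exactly this check (it exits 0 if the path exists and prints nothing), so an alternative sketch could skip the pipes entirely:

    import asyncio

    async def hdfs_exists(path):
        # `hadoop fs -test -e` exits 0 if the path exists and 1 otherwise,
        # without producing output, so no stdout/stderr capture is needed.
        proc = await asyncio.create_subprocess_exec(
            'hadoop', 'fs', '-test', '-e', path,
        )
        return await proc.wait() == 0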