Created
March 23, 2021 13:07
-
-
Save pdxjohnny/41baa1bf9523584a34c17f5cf43122c5 to your computer and use it in GitHub Desktop.
start on dataset_source() for ice cream demo / census data by city
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This file is an example of how one might use the dataset_source() decorator to | |
create a new cached dataset as a source. | |
Whenever you see BEGIN, that's meant to be a new section, which could be a new | |
file. You can split them into their own files if you want, just make sure to | |
import from other files as needed. | |
""" | |
# BEGIN | |
# Source implementation of a cached source for a dataset | |
import pathlib | |
from dffml.source.csv import CSVSource | |
from dffml.source.dataset import dataset_source | |
from dffml.util.net import cached_download, DEFAULT_PROTOCOL_ALLOWLIST | |
# Each record will be uniquely keyed off of a combination of the following | |
# headers. In this case "State-CityName" will be the unique key. | |
US_CENSUS_SOURCE_KEYS = ["STNAME", "NAME"] | |
# TODO We should be dynamically creating each function, similar to how we | |
# script the creation of the scikit models, or the creation of tests cases in | |
# tests/test_docstrings.py. There is an example at the bottom. | |
@dataset_source("us.census.oregon") | |
def us_census_oregon( | |
url: str = "https://www2.census.gov/programs-surveys/popest/datasets/2010-2019/cities/totals/sub-est2019_41.csv", | |
expected_sha384_hash: str = "c11bb1be135eb9f2521e6397339c10c87baae4669aab46198f87716cc652a2f8c85a96c477aabe7e11240d5049b1149a", | |
cache_dir: pathlib.Path = ( | |
pathlib.Path("~", ".cache", "dffml", "datasets", "us", "census") | |
.expanduser() | |
.resolve() | |
), | |
): | |
with cached_download( | |
url, cache_dir / "oregon.csv", expected_sha384_hash, | |
) as filepath: | |
yield CSVSource(filename=str(filepath), key=US_CENSUS_SOURCE_KEYS) | |
# BEGIN | |
# Use the source we created above to print out all the records just to check | |
import sys | |
import json | |
import pathlib | |
import asyncio | |
import unittest | |
from dffml import export, load | |
async def load_records():
    """Load every record from the Oregon census source and print them as JSON."""
    # load() is an async generator; drain it into a list before exporting.
    collected = []
    async for record in load(us_census_oregon.source()):
        collected.append(record)
    # Dump out all the rows
    print(json.dumps(export(collected)))


if __name__ == "__main__":
    asyncio.run(load_records())
# BEGIN | |
# Write an operation that uses the source when it runs. We open the Source when | |
# the operation is instantiated within the DataFlow. Similar to how is done in | |
# the PyPi operation for the shouldi example. | |
# https://intel.github.io/dffml/master/examples/shouldi.html#pypi-operations | |
# This way, the source is open for the lifetime of the DataFlow, it also ensures | |
# that the data in the file isn't loaded every time the operation is run. Only | |
# when the DataFlow is first loaded. | |
import asyncio | |
from typing import Union | |
from dffml import DataFlow, op, config, field, BaseSource, load, Sources | |
@config
class MyLookupOpConfig:
    """Configuration for my_lookup_op: the source(s) consulted on each lookup."""

    # Lookups may run against a single source or several combined ones.
    source: Union[Sources, BaseSource] = field(
        "Sources to use for lookup",
        default_factory=lambda: Sources(
            # US Census data for the state of Oregon; instantiated inline so
            # that it is part of the default set of sources.
            us_census_oregon.source(),
            # The state of Washington could be added the same way:
            # us_census_washington.source(keys=MY_LOOKUP_OP_CONFIG_SOURCES_KEYS),
        ),
    )
@op(
    # Open the configured source for the lifetime of the DataFlow.
    imp_enter={"source": lambda self: self.config.source},
    # Acquire a per-context source context for record lookups.
    ctx_enter={"sctx": lambda self: self.parent.source()},
    config_cls=MyLookupOpConfig,
)
async def my_lookup_op(self, city: str, state: str):
    """Return the record keyed "State-CityName" from the configured sources."""
    record_key = f"{state}-{city}"
    matches = [record async for record in load(self.sctx, record_key)]
    # Raises IndexError when no record matches, same as indexing an empty list.
    return matches[0]


DATAFLOW = DataFlow(operations={"my_lookup_op": my_lookup_op})
async def run_op():
    """
    Instantiate my_lookup_op's implementation directly (outside of a running
    DataFlow) and print the exported record for Portland city, Oregon.
    """
    async with my_lookup_op.imp(my_lookup_op.imp.CONFIG()) as opimp:
        async with opimp(None, None) as ctx:
            print(
                (
                    await ctx.run(dict(city="Portland city", state="Oregon"))
                ).export()
            )
    # NOTE(review): This function previously contained unreachable code after
    # a bare "return" which referenced undefined names (run, Input) and called
    # the async generator load() synchronously with list(). That dead code has
    # been removed; to execute DATAFLOW itself, use dffml's run() helper with
    # Input objects built from my_lookup_op.op.inputs.


if __name__ == "__main__":
    asyncio.run(run_op())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I had messed around with this a bit to make sure that it would work for you @sk-ip. I also did a patch to the CSV source to change
`key`
into `keys`,
which combines column values into the record key, but I wasn't sure if that was needed. This patch still references that change, so if you work from this then you'll need to remove it. You may already be past needing this, but I wanted to send it your way since we didn't get a chance to sync in the meeting today.