@pdxjohnny
Created March 23, 2021 13:07
start on dataset_source() for ice cream demo / census data by city
"""
This file is an example of how one might use the dataset_source() decorator to
create a new cached dataset as a source.
Whenever you see BEGIN, that's meant to be a new section, which could be a new
file. You can split them into their own files if you want, just make sure to
import from other files as needed.
"""
# BEGIN
# Source implementation of a cached source for a dataset
import pathlib
from dffml.source.csv import CSVSource
from dffml.source.dataset import dataset_source
from dffml.util.net import cached_download, DEFAULT_PROTOCOL_ALLOWLIST
# Each record will be uniquely keyed off of a combination of the following
# headers. In this case "State-CityName" will be the unique key.
US_CENSUS_SOURCE_KEYS = ["STNAME", "NAME"]
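# For example, a row with STNAME "Oregon" and NAME "Portland city" will be
# stored under the record key "Oregon-Portland city".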
# TODO We should be dynamically creating each function, similar to how we
# script the creation of the scikit models, or the creation of test cases in
# tests/test_docstrings.py. A rough sketch of that approach follows the
# hand-written function below.
@dataset_source("us.census.oregon")
def us_census_oregon(
    url: str = "https://www2.census.gov/programs-surveys/popest/datasets/2010-2019/cities/totals/sub-est2019_41.csv",
    expected_sha384_hash: str = "c11bb1be135eb9f2521e6397339c10c87baae4669aab46198f87716cc652a2f8c85a96c477aabe7e11240d5049b1149a",
    cache_dir: pathlib.Path = (
        pathlib.Path("~", ".cache", "dffml", "datasets", "us", "census")
        .expanduser()
        .resolve()
    ),
):
    with cached_download(
        url, cache_dir / "oregon.csv", expected_sha384_hash,
    ) as filepath:
        yield CSVSource(filename=str(filepath), key=US_CENSUS_SOURCE_KEYS)
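

# Rough, hypothetical sketch of the dynamic creation mentioned in the TODO
# above (not part of the original gist). The file suffix for Washington and
# the placeholder hash are assumptions; a real SHA384 hash has to be filled
# in before cached_download can verify the file.
def make_us_census_source(state: str, file_suffix: str, sha384: str):
    @dataset_source(f"us.census.{state}")
    def state_source(
        url: str = (
            "https://www2.census.gov/programs-surveys/popest/datasets"
            f"/2010-2019/cities/totals/sub-est2019_{file_suffix}.csv"
        ),
        expected_sha384_hash: str = sha384,
        cache_dir: pathlib.Path = (
            pathlib.Path("~", ".cache", "dffml", "datasets", "us", "census")
            .expanduser()
            .resolve()
        ),
    ):
        with cached_download(
            url, cache_dir / f"{state}.csv", expected_sha384_hash,
        ) as filepath:
            yield CSVSource(filename=str(filepath), key=US_CENSUS_SOURCE_KEYS)

    return state_source


# Example of how per-state sources could then be generated:
# us_census_washington = make_us_census_source("washington", "53", "<sha384 here>")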
# BEGIN
# Use the source we created above to print out all the records just to check
import sys
import json
import pathlib
import asyncio
import unittest
from dffml import export, load


async def load_records():
    # Load all the records and convert from a generator to a list
    records = [record async for record in load(us_census_oregon.source())]
    # Dump out all the rows
    print(json.dumps(export(records)))


if __name__ == "__main__":
    asyncio.run(load_records())
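

# A minimal sketch (not part of the original gist) of loading one record by
# its composite key instead of loading everything. Assumption: load() accepts
# record keys as extra positional arguments, the same way the lookup operation
# further below passes a key to load() against a source context.
async def load_single_record(key: str = "Oregon-Portland city"):
    async for record in load(us_census_oregon.source(), key):
        print(json.dumps(export(record)))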
# BEGIN
# Write an operation that uses the source when it runs. We open the Source when
# the operation is instantiated within the DataFlow, similar to how it's done
# in the PyPi operations for the shouldi example.
# https://intel.github.io/dffml/master/examples/shouldi.html#pypi-operations
# This way the source is open for the lifetime of the DataFlow. It also ensures
# that the data in the file isn't loaded every time the operation is run, only
# when the DataFlow is first loaded.
import json
import asyncio
from typing import Union
from dffml import DataFlow, Input, op, config, field, BaseSource, load, run, export, Sources
@config
class MyLookupOpConfig:
    # Do lookup on multiple sources
    source: Union[Sources, BaseSource] = field(
        "Sources to use for lookup",
        default_factory=lambda: Sources(
            # One of them is the US Census data for the state of Oregon.
            # We instantiate the source within the list of sources to include it
            us_census_oregon.source(),
            # Another is the state of Washington.
            # We instantiate the source within the list of sources to include it
            # us_census_washington.source(keys=MY_LOOKUP_OP_CONFIG_SOURCES_KEYS),
        ),
    )
@op(
imp_enter={"source": lambda self: self.config.source},
ctx_enter={"sctx": lambda self: self.parent.source()},
config_cls=MyLookupOpConfig,
)
async def my_lookup_op(self, city: str, state: str):
return [record async for record in load(self.sctx, f"{state}-{city}")][0]
DATAFLOW = DataFlow(operations={"my_lookup_op": my_lookup_op})
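# Hypothetical sketch (not part of the original gist): the operation instance
# in the DataFlow can also be given an explicit config rather than relying on
# the default_factory above. Assumption: DataFlow takes a configs mapping of
# operation instance name to config, as the exported dataflow format suggests.
# DATAFLOW_OREGON_ONLY = DataFlow(
#     operations={"my_lookup_op": my_lookup_op},
#     configs={
#         "my_lookup_op": MyLookupOpConfig(
#             source=Sources(us_census_oregon.source()),
#         ),
#     },
# )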


async def run_op():
    async with my_lookup_op.imp(my_lookup_op.imp.CONFIG()) as opimp:
        async with opimp(None, None) as ctx:
            print(
                (
                    await ctx.run(dict(city="Portland city", state="Oregon"))
                ).export()
            )
    return
    # Unreachable scratch code below (kept for reference): the same lookup run
    # through the DataFlow instead of calling the operation directly.
    # Load all the records and convert from a generator to a list
    records = [record async for record in load(us_census_oregon.source())]
    # Dump out all the rows
    print(json.dumps(export(records)))
    async for ctx, results in run(
        DATAFLOW,
        [
            Input(
                value="Portland city",
                definition=my_lookup_op.op.inputs["city"],
            ),
            Input(value="Oregon", definition=my_lookup_op.op.inputs["state"]),
        ],
    ):
        print(ctx, results)


if __name__ == "__main__":
    asyncio.run(run_op())
    # run_op()
@pdxjohnny (Author)
I had messed around with this a bit to make sure that it would work for you @sk-ip. I also did a patch to the CSV source that turns key into keys, which combines multiple column values into the record key, but I wasn't sure if that was needed. This gist still references that patch, so if you work from this you'll need to remove that usage. You may already be past needing this, but I wanted to send it your way since we didn't get a chance to sync in the meeting today.
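For reference, a minimal sketch of what the unpatched usage might look like, assuming upstream CSVSource only takes a single key column name (the column choice here is an assumption):

    # Without the key -> keys patch, the record key can only come from one column
    yield CSVSource(filename=str(filepath), key="NAME")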
