Skip to content

Instantly share code, notes, and snippets.

@d6tdev
Last active September 23, 2018 13:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save d6tdev/a8e80ce454b39a7317af54c4b0063e4f to your computer and use it in GitHub Desktop.
Save d6tdev/a8e80ce454b39a7317af54c4b0063e4f to your computer and use it in GitHub Desktop.
d6tpipe preview
#****************************************
# d6tpipe preview - client
#****************************************
# All credentials are read from the environment instead of being hard-coded.
# NOTE(review): the original gist embedded d6tpipe API keys/secrets and an AWS
# access key pair in plain text; anything published that way should be treated
# as compromised and rotated.
import glob
import os

import d6tpipe.api
import d6tpipe.pipe
import pandas as pd
import dask.dataframe as dd
import fastparquet

client_api = d6tpipe.api.APIClient(key=os.environ['D6TPIPE_CLIENT_KEY'],
                                   secret=os.environ['D6TPIPE_CLIENT_SECRET'])

# consumer pulls files => quickly download vendor files using a single unified command.
# The pipe instance gets its own name: rebinding `d6tpipe` to it would shadow the
# module and break every later `d6tpipe.api` / `d6tpipe.Pipe` call in this script.
client_pipe = d6tpipe.Pipe(client_api, 'vendorX-tutorial')
client_pipe.pull()

# open most recent file without manual parsing/sorting
df = pd.read_csv(client_pipe.files()[-1])
# open last 2 weeks files
df = dd.read_csv(client_pipe.files(date_recent=2))
# quickly access file read settings
fpath = client_pipe.files_one(name='data/vendorX/machinedata-2018-01.csv')
df = pd.read_csv(fpath, sep=client_pipe.config['separator'])
# open all files in dask, eg full history of dataset
df = dd.read_csv(client_pipe.files())
# open files with name filter applied
df = dd.read_csv(client_pipe.files(include='machinedata-2018-*.csv'))

#****************************************
# d6tpipe preview - vendor
#****************************************
# vendor creates pipe
vendor_api = d6tpipe.api.APIClient(key=os.environ['D6TPIPE_VENDOR_KEY'],
                                   secret=os.environ['D6TPIPE_VENDOR_SECRET'])
pipe_settings = {
    'name': 'dev-test',
    'connection-details': {  # create new connection
        'name': 's3-dev-test-private',
        'type': 's3',
        'bucket': 'test-augvest-20180719',
        'key': os.environ['AWS_ACCESS_KEY_ID'],
        'secret': os.environ['AWS_SECRET_ACCESS_KEY'],
    },
    'subdir': 'vendorX/',
    'owner': 'vendorX',
    'filedate': '',
    'fileoptions': {
        'separator': ',',  # vendor provides file processing options
    },
}
# NOTE(review): the original called an undefined `d6tcnxn1`; this assumes it meant
# the vendor's APIClient and that its REST client is exposed as `.client` (as in the
# original `d6tcnxn1.client.pipes.post`) - confirm against the installed d6tpipe.
response, pipe_info = vendor_api.client.pipes.post(request_body=pipe_settings)

# vendor uploads files to the pipe it just created (sorted: glob order is arbitrary)
# NOTE(review): the client section reads 'vendorX-tutorial' while the vendor creates
# 'dev-test' - confirm the intended pipe names.
vendor_pipe = d6tpipe.Pipe(vendor_api, pipe_settings['name'])
vendor_pipe.push(sorted(glob.glob('data/vendorX/machinedata-*.csv')))

# vendor grants access to the pipe created above.
# `post` returns a (response, data) pair (see the create call above), so both are
# unpacked; `response, = ...` would raise ValueError on a 2-tuple.
grant_endpoint = vendor_api.client.pipes._(pipe_settings['name']).permissions.grant
response, _ = grant_endpoint.post(request_body={'email': 'you@client.com'})
# explicit check instead of `assert`, which is stripped under `python -O`
if response.status != 200:
    raise RuntimeError('granting pipe access failed with HTTP status {}'.format(response.status))


# create a new pipe with preprocessed data from another pipe. inherits underlying pipe permissions
def loadfile(dfg, col='date', fmt='%Y-%m-%d'):
    """Return a copy of *dfg* with column *col* parsed to datetimes.

    Args:
        dfg: pandas DataFrame (e.g. one dask partition) containing column *col*.
        col: name of the column to parse; defaults to ``'date'``.
        fmt: ``strftime``-style format passed to ``pd.to_datetime``.

    Returns:
        A new DataFrame; the input frame is not modified, so the function is
        safe to use with ``map_partitions``.
    """
    return dfg.assign(**{col: pd.to_datetime(dfg[col], format=fmt)})


src_files = client_pipe.files(include='machinedata-2018-*.csv')
df = dd.read_csv(src_files)
# pd.to_datetime works on pandas objects, so run the parser once per partition
# rather than handing it a lazy dask Series.
dfout = df.map_partitions(loadfile)

# the output pipe must exist before its local path prefix can be used
pipe_out = d6tpipe.Pipe(vendor_api, 'vendorX-tutorial-parquet')
fpath_out = pipe_out.fpath_prefix_local + 'out.pq'
# fastparquet.write expects a pandas DataFrame, so materialize the dask result.
# NOTE: this loads the full result into memory; for large data consider
# dfout.to_parquet(...) and pushing the resulting part files instead.
fastparquet.write(fpath_out, dfout.compute())
pipe_out.push([fpath_out])

# read the preprocessed parquet output back through its pipe
parquet_pipe = d6tpipe.Pipe(vendor_api, 'vendorX-tutorial-parquet')
df = dd.read_parquet(parquet_pipe.files()[-1])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment