Last active
September 23, 2018 13:24
-
-
Save d6tdev/a8e80ce454b39a7317af54c4b0063e4f to your computer and use it in GitHub Desktop.
d6tpipe preview
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#**************************************** | |
# d6tpipe preview - client | |
#**************************************** | |
import glob
import os

import dask.dataframe as dd
import pandas as pd

import d6tpipe.api
import d6tpipe.pipe
# Credentials are read from the environment instead of being hard-coded in
# source; API keys/secrets must never be committed.
d6tapi = d6tpipe.api.APIClient(
    key=os.environ['D6TPIPE_CLIENT_KEY'],
    secret=os.environ['D6TPIPE_CLIENT_SECRET'],
)
# consumer pulls files => quickly download vendor files using a single unified command
# The handle is kept in `pipe`: assigning it to `d6tpipe` would shadow the
# module and break every later `d6tpipe.api` / `d6tpipe.Pipe` call.
pipe = d6tpipe.Pipe(d6tapi, 'vendorX-tutorial')
pipe.pull()
# open most recent file without manual parsing/sorting
df = pd.read_csv(pipe.files()[-1])
# open last 2 weeks files
df = dd.read_csv(pipe.files(date_recent=2))
# quickly access file read settings
fpath = pipe.files_one(name='data/vendorX/machinedata-2018-01.csv')
df = pd.read_csv(fpath, sep=pipe.config['separator'])
# open all files in dask, eg full history of dataset
df = dd.read_csv(pipe.files())
# open files with name filter applied
df = dd.read_csv(pipe.files(include='machinedata-2018-*.csv'))
#****************************************
# d6tpipe preview - vendor
#****************************************
# vendor creates pipe
# Credentials come from the environment rather than source code.
# NOTE: the d6tpipe and S3 key pairs previously embedded here were published
# and should be treated as compromised (rotate them).
d6tapi = d6tpipe.api.APIClient(
    key=os.environ['D6TPIPE_VENDOR_KEY'],
    secret=os.environ['D6TPIPE_VENDOR_SECRET'],
)
pipe_config = {
    'name': 'dev-test',
    'connection-details': {  # create new connection
        'name': 's3-dev-test-private',
        'type': 's3',
        'bucket': 'test-augvest-20180719',
        'key': os.environ['AWS_ACCESS_KEY_ID'],
        'secret': os.environ['AWS_SECRET_ACCESS_KEY'],
    },
    'subdir': 'vendorX/',
    'owner': 'vendorX',
    'filedate': '',
    # vendor provides file processing options
    'fileoptions': {'separator': ','},
}
# The original called an undefined `d6tcnxn1`; the API client created above
# is the only connection object in scope.
# NOTE(review): assumes the client exposes its REST wrapper as `.client` -- confirm.
response, created_pipe = d6tapi.client.pipes.post(request_body=pipe_config)

# NOTE(review): the pipe created above is named 'dev-test', but files are
# pushed to (and access is granted on) 'vendorX-tutorial', the pipe the
# client side pulls -- confirm which name is intended.
pipe_name = 'vendorX-tutorial'

# vendor uploads files
vendor_pipe = d6tpipe.Pipe(d6tapi, pipe_name)
vendor_pipe.push(glob.glob('data/vendorX/machinedata-*.csv'))

# vendor grants access
grant_request = {'email': 'you@client.com'}
# post() returns a (response, body) pair, as unpacked above, so a
# single-target unpack (`response, = ...`) would raise ValueError.
response, _ = d6tapi.client.pipes._(pipe_name).permissions.grant.post(
    request_body=grant_request)
# Explicit check instead of `assert`, which is stripped under `python -O`.
if response.status != 200:
    raise RuntimeError(
        'granting access failed with HTTP status {}'.format(response.status))
# create a new pipe with preprocessed data from another pipe. inherits underlying pipe permissions
import fastparquet
def loadfile(dfg):
    """Return a copy of *dfg* with its ``date`` column parsed as datetimes.

    Works on either a pandas DataFrame or a dask DataFrame (e.g. the result
    of ``dd.read_csv``). The input frame is not modified.

    Parameters
    ----------
    dfg : pandas.DataFrame or dask.dataframe.DataFrame
        Frame with a ``date`` column of ``YYYY-MM-DD`` strings.

    Returns
    -------
    A frame of the same kind as *dfg* with ``date`` converted to datetime64.

    Raises
    ------
    KeyError
        (pandas input) if *dfg* has no ``date`` column.
    ValueError
        (pandas input) if a value does not match ``%Y-%m-%d``.
        For dask input, errors surface during dask's metadata inference or at
        compute time.
    """
    if hasattr(dfg, 'map_partitions'):
        # dask DataFrame: pd.to_datetime cannot take a lazy dask Series, so
        # apply the pandas conversion to each in-memory partition instead.
        return dfg.map_partitions(loadfile)
    # assign() returns a new frame, leaving the caller's frame untouched.
    return dfg.assign(date=pd.to_datetime(dfg['date'], format='%Y-%m-%d'))
# Source pipe holding the raw vendor CSVs. Bound to its own name so this
# section never depends on (or shadows) the `d6tpipe` module.
source_pipe = d6tpipe.Pipe(d6tapi, 'vendorX-tutorial')
files = source_pipe.files(include='machinedata-2018-*.csv')
df = dd.read_csv(files)
dfout = loadfile(df)

# The output pipe must exist before its local path is used; the original
# read `d6tpipe_out` one line before assigning it (NameError).
d6tpipe_out = d6tpipe.Pipe(d6tapi, 'vendorX-tutorial-parquet')
fpath_out = d6tpipe_out.fpath_prefix_local + 'out.pq'
# fastparquet.write expects an in-memory pandas DataFrame, so materialise
# the dask result first (the whole filtered dataset is loaded into memory).
fastparquet.write(fpath_out, dfout.compute())
d6tpipe_out.push([fpath_out])

# read back the preprocessed parquet data through its own pipe handle
parquet_pipe = d6tpipe.Pipe(d6tapi, 'vendorX-tutorial-parquet')
df = dd.read_parquet(parquet_pipe.files()[-1])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment