Create a gist now

Instantly share code, notes, and snippets.

Another simple Bubbles example using two data sources: the customers and orders CSV files (exports from the Volusion platform). The goal is to get a list of customers (name, email address) who placed orders within a certain range of years. There are two versions of the same process: one involves loading the data into SQL tables and performing the operations using SQL, the other…
from bubbles import Pipeline, open_store
# Version 1: load both CSV exports into SQLite tables, then build the
# customer report from those tables.
stores = {
    "source": open_store(
        "csv", "data/source", encoding="utf16", infer_fields=True
    ),
    "target": open_store("sql", "sqlite:///data.sqlite"),
}

p = Pipeline(stores=stores)

# Load customers into a SQL table. The fork is kept in `detail` so it can be
# joined to the orders further down.
detail = p.fork()
detail.source("source", "Customers")
detail.create("target", "customers", replace=True)

# Load orders into a SQL table, converting the ship date into a date type.
p.source("source", "Orders")
# The timestamps carry an AM/PM marker, so the hour must be parsed with %I
# (12-hour clock). Under strptime rules %p is ignored when paired with %H,
# which would shift every PM time 12 hours early.
# NOTE(review): assumes string_to_date follows strptime semantics and that
# the export uses 12-hour times (1-12) - confirm against the data.
p.string_to_date("shipdate", fmt="%m/%d/%Y %I:%M:%S %p")
p.create("target", "orders", replace=True)

# Get only shipped orders.
p.filter_not_empty("shipdate")

# Filter by year; the `shipdate_year` field used here comes from splitting
# the ship date.
# NOTE(review): confirm whether the upper bound (2013) is inclusive.
p.split_date("shipdate")
p.filter_by_range("shipdate_year", 2011, 2013)

# Join customer details (the `detail` fork above) on the customer id.
p.join_details(detail, "customerid", "customerid")

# Naive removal of duplicates.
p.distinct(
    ["customerid", "shipdate_year", "firstname", "lastname", "emailaddress"]
)

# Store result into a `report` table, replace the table if it exists.
p.create("target", "report", replace=True)

p.run()
from bubbles import Pipeline, open_store
# Version 2: the same report built directly from the CSV exports, with the
# result written to a CSV target store.
stores = {
    "source": open_store(
        "csv", "data/source", encoding="utf16", infer_fields=True
    ),
    "target": open_store("csv", "data/output", role="target"),
}  # closing brace was missing, which made the whole script a SyntaxError

p = Pipeline(stores=stores)

# Set source for customers. The fork is kept in `detail` so it can be joined
# to the orders further down.
detail = p.fork()
detail.source("source", "Customers")

# Get orders source.
p.source("source", "Orders")

# Convert ship date to date format and get only shipped orders.
# The timestamps carry an AM/PM marker, so the hour must be parsed with %I
# (12-hour clock). Under strptime rules %p is ignored when paired with %H,
# which would shift every PM time 12 hours early.
# NOTE(review): assumes string_to_date follows strptime semantics and that
# the export uses 12-hour times (1-12) - confirm against the data.
p.string_to_date("shipdate", fmt="%m/%d/%Y %I:%M:%S %p")
p.filter_not_empty("shipdate")

# Filter by year; the `shipdate_year` field used here comes from splitting
# the ship date.
# NOTE(review): confirm whether the upper bound (2013) is inclusive.
p.split_date("shipdate")
p.filter_by_range("shipdate_year", 2011, 2013)

# Join customer details (the `detail` fork above) on the customer id.
p.join_details(detail, "customerid", "customerid")

# Naive removal of duplicates.
p.distinct(
    ["customerid", "shipdate_year", "firstname", "lastname", "emailaddress"]
)

# Store result into a `report.csv` file, replace the file if it exists.
p.create("target", "report", replace=True)

p.run()
DEBUG step 0: evaluate soure Orders in source
DEBUG step 1: evaluate operation string_to_date
DEBUG calling string_to_date(rows)
WARNING operation string_to_date is experimental
INFO called string_to_date(rows)
DEBUG step 2: evaluate soure Customers in source
DEBUG step 3: evaluate operation filter_not_empty
DEBUG calling filter_not_empty(rows)
INFO called filter_not_empty(rows)
DEBUG step 4: evaluate operation split_date
DEBUG calling split_date(rows)
INFO called split_date(rows)
DEBUG step 5: evaluate operation filter_by_range
DEBUG calling filter_by_range(rows)
INFO called filter_by_range(rows)
DEBUG step 6: evaluate operation join_details
DEBUG calling join_details(rows, rows)
INFO called join_details(rows, rows)
DEBUG step 7: evaluate operation distinct
DEBUG calling distinct(rows)
INFO called distinct(rows)
DEBUG step 8: evaluate <bubbles.graph.CreateObjectNode object at 0x10332b6d0>
DEBUG step 0: evaluate soure Orders in source
DEBUG step 1: evaluate soure Customers in source
DEBUG step 2: evaluate operation string_to_date
DEBUG calling string_to_date(rows)
WARNING operation string_to_date is experimental
INFO called string_to_date(rows)
DEBUG step 3: evaluate <bubbles.graph.CreateObjectNode object at 0x1055d0650>
DEBUG append_from: appending rows into customers
DEBUG step 4: evaluate <bubbles.graph.CreateObjectNode object at 0x1059ec150>
DEBUG append_from: appending rows into orders
DEBUG step 5: evaluate operation filter_not_empty
DEBUG calling filter_not_empty(sql)
INFO called filter_not_empty(sql)
DEBUG step 6: evaluate operation split_date
DEBUG calling split_date(sql)
INFO called split_date(sql)
DEBUG step 7: evaluate operation filter_by_range
DEBUG calling filter_by_range(sql)
INFO called filter_by_range(sql)
DEBUG step 8: evaluate operation join_details
DEBUG calling join_details(sql, sql)
INFO called join_details(sql, sql)
DEBUG step 9: evaluate operation distinct
DEBUG calling distinct(sql)
INFO called distinct(sql)
DEBUG step 10: evaluate <bubbles.graph.CreateObjectNode object at 0x1059ec210>
DEBUG append_from: composing into report
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment