PIPELINE STEPS
- read most recent product feed data from GCS (stream A)
- read second most recent product feed data from GCS IF IT EXISTS (stream B)
- if only stream A exists (no previous feed):
- write to new stream with the price drop data -> write to Bigtable
- write to new stream with product item data -> write to cloudSQL
- flatten stream A and stream B -> stream C
- stream C -> groupby SKU
- for each item, get the diff and (a Beam sketch of the grouping/diff step follows at the end of this section):
PLATFORMX
- write to new stream (stream D) if the item has changed -> write to cloud SQL
- write to new stream (stream E) with the price drop (product change) data -> write to BigTable
- KEY: timestamp:website_id
- TTL: 48h
- COLUMN FAMILY: price_drop
- COLUMNS: sku, itemgroupid
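Sketch of this Bigtable write (assumptions: PriceDrop is a hypothetical record type for stream E; the 48h TTL is not set per write but configured as a garbage-collection policy on the price_drop column family):

```java
import com.google.bigtable.v2.Mutation;
import com.google.protobuf.ByteString;
import java.util.Arrays;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// PriceDrop is a hypothetical record for the product-change data in stream E.
interface PriceDrop {
  long timestampMillis();
  String websiteId();
  String sku();
  String itemGroupId();
}

// Converts a price drop into the Bigtable row described above. The 48h TTL is not
// set here; it lives as a garbage-collection policy on the price_drop column family.
class ToPriceDropMutations extends DoFn<PriceDrop, KV<ByteString, Iterable<Mutation>>> {
  private static Mutation setCell(String column, String value, long tsMicros) {
    return Mutation.newBuilder().setSetCell(Mutation.SetCell.newBuilder()
        .setFamilyName("price_drop")
        .setColumnQualifier(ByteString.copyFromUtf8(column))
        .setValue(ByteString.copyFromUtf8(value))
        .setTimestampMicros(tsMicros)).build();
  }

  @ProcessElement
  public void process(@Element PriceDrop drop,
                      OutputReceiver<KV<ByteString, Iterable<Mutation>>> out) {
    // KEY: timestamp:website_id
    ByteString rowKey = ByteString.copyFromUtf8(drop.timestampMillis() + ":" + drop.websiteId());
    long tsMicros = drop.timestampMillis() * 1000L;
    Iterable<Mutation> mutations = Arrays.asList(
        setCell("sku", drop.sku(), tsMicros),
        setCell("itemgroupid", drop.itemGroupId(), tsMicros));
    out.output(KV.of(rowKey, mutations));
  }
}
// stream E: priceDrops.apply(ParDo.of(new ToPriceDropMutations()))
//     .apply(BigtableIO.write().withProjectId(...).withInstanceId(...).withTableId(...));
```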
BEHAVIORAL
- write to new stream w/ diff -> write to BigQuery
- query BigQuery for everyone who has seen that product in the last 180 days (get query from David)
- for each BQ result, emit a Trigger Event (product change event)
- scheduled to send the email at the same time of day as the original view event (e.g. the item was viewed at 15:00 Monday and this result was fetched at 17:00 Tuesday, so the email will send at 15:00 Wednesday)
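Sketch of that send-time rule (class, method, and variable names are illustrative):

```java
import java.time.LocalTime;
import java.time.ZonedDateTime;

class SendTimeRule {
  // Schedule the email for the next occurrence of the original view event's
  // time of day after the moment the BigQuery result was fetched.
  static ZonedDateTime scheduleSendTime(ZonedDateTime viewedAt, ZonedDateTime fetchedAt) {
    ZonedDateTime candidate = fetchedAt.with(LocalTime.from(viewedAt));
    return candidate.isAfter(fetchedAt) ? candidate : candidate.plusDays(1);
  }
  // viewed Mon 15:00, fetched Tue 17:00 -> Tue 15:00 has already passed -> send Wed 15:00
}
```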
- for each SKU group that contains only one item, the product is either new or out of stock
- it's new if it's in the current feed but NOT the old feed
- it's out of stock if it's in the old feed but NOT the new feed
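Sketch of the flatten -> group-by-SKU -> diff shape above (FeedItem and ProductChange are hypothetical stand-ins for the real feed record and diff types; the comparison logic itself is left as comments):

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

class FeedItem { /* hypothetical normalized feed record; needs a feed date/tag field */ }
class ProductChange { /* hypothetical diff record fanned out to PLATFORMX and BEHAVIORAL */ }

class FeedDiff {
  static PCollection<ProductChange> diffFeeds(
      PCollection<KV<String, FeedItem>> streamA,   // most recent feed, keyed by SKU
      PCollection<KV<String, FeedItem>> streamB) { // previous feed, keyed by SKU
    return PCollectionList.of(streamA).and(streamB)
        .apply(Flatten.<KV<String, FeedItem>>pCollections())   // stream C
        .apply(GroupByKey.<String, FeedItem>create())          // one group per SKU
        .apply(ParDo.of(new DoFn<KV<String, Iterable<FeedItem>>, ProductChange>() {
          @ProcessElement
          public void process(ProcessContext c) {
            List<FeedItem> items = new ArrayList<>();
            c.element().getValue().forEach(items::add);
            if (items.size() == 2) {
              // SKU in both feeds: diff old vs new (telling old from new needs a feed
              // date/tag on FeedItem, since group order is not guaranteed) and emit a
              // ProductChange only if something actually changed.
            } else {
              // SKU in exactly one feed:
              //   only in the current feed -> new item
              //   only in the old feed     -> out of stock
            }
            // c.output(...) with the resulting ProductChange
          }
        }));
  }
}
```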
IMAGE PIPELINE STEPS
- final class ProductImage (so it's read-only)
- sku
- filePath (where it's saved locally)
- hash (md5)
- request content type (from header)
- read the product from avro (same source as productfeed pipeline)
- just getting the imageurl
- open http request
- get content type
- open stream
- goes through md5 stream
- saves to disk
- the idea here is to stream the output from HTTP and pass the bytes (which are immutable) to an md5 hash stream and a local file write stream in parallel (this may or may not be possible; see the sketch after this section)
- if this is too time-consuming, just write to disk then get the hash
- assuming there are no errors, return ProductImage containing Sku, md5, path on local disk, and content-type
- get the sku from the ProductImage, go to GCS, and get the md5 of the object (a GCS object exposes its md5 as part of its metadata)
- if the object exists and the md5 matches, then there's no change and we don't care (don't pass it further down the pipeline)
- if there is a change, get localfilepath from ProductImage and stream to GCS
- this can be a branch that runs async / in parallel, but we want to pass along the GCS path used
- as image is streaming to GCS, transform record into KV<Sku, GCSPath> (get sku from ProductImage, and GCSPath from the previous operation's invoker)
- return PCollection containing these KV<SKU, GCSPath>
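Sketch of the ProductImage class and the single-pass download described above. Wrapping the HTTP stream in a DigestInputStream hashes the bytes while they are copied to disk, which gets the hash-while-writing behavior without two literal parallel output streams; names and error handling are simplified assumptions, and the md5 is kept base64-encoded so it can later be compared directly against GCS object metadata.

```java
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.util.Base64;

// Hypothetical value class matching the fields listed above; final so instances
// are effectively read-only once constructed.
final class ProductImage {
  final String sku;
  final String filePath;     // local path the image was saved to
  final String md5;          // base64 MD5 of the downloaded bytes (GCS reports MD5s base64-encoded)
  final String contentType;  // from the HTTP response header
  ProductImage(String sku, String filePath, String md5, String contentType) {
    this.sku = sku; this.filePath = filePath; this.md5 = md5; this.contentType = contentType;
  }
}

class ImageDownloader {
  // Single-pass download: the DigestInputStream updates the MD5 as the bytes are
  // copied to disk, so there is no second read of the HTTP body.
  static ProductImage download(String sku, String imageUrl, Path localDir) throws Exception {
    HttpURLConnection conn = (HttpURLConnection) new URL(imageUrl).openConnection();
    String contentType = conn.getContentType();
    MessageDigest digest = MessageDigest.getInstance("MD5");
    Path target = localDir.resolve(sku);
    try (InputStream in = new DigestInputStream(conn.getInputStream(), digest);
         OutputStream out = Files.newOutputStream(target)) {
      in.transferTo(out); // bytes are hashed as they flow to disk
    }
    String md5 = Base64.getEncoder().encodeToString(digest.digest());
    return new ProductImage(sku, target.toString(), md5, contentType);
  }
}
```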
NOTES
- both return the diff (this KV<SKU, GCSPath>) while simultaneously uploading the image to GCS (which needs a ProductImage)
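Sketch of the compare-and-upload step as a Beam DoFn (the bucket and object naming scheme are assumptions; ProductImage is the class sketched above, with its md5 kept base64-encoded so it compares directly against the GCS object metadata):

```java
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Emits KV<Sku, GCSPath> only when the image changed; unchanged images are dropped here.
class UploadChangedImages extends DoFn<ProductImage, KV<String, String>> {
  private final String bucket;
  private transient Storage storage; // not serializable, so created per worker in @Setup

  UploadChangedImages(String bucket) { this.bucket = bucket; }

  @Setup
  public void setup() { storage = StorageOptions.getDefaultInstance().getService(); }

  @ProcessElement
  public void process(@Element ProductImage img, OutputReceiver<KV<String, String>> out)
      throws Exception {
    String objectName = "images/" + img.sku;             // assumed naming scheme
    Blob existing = storage.get(BlobId.of(bucket, objectName));
    if (existing != null && img.md5.equals(existing.getMd5())) {
      return; // object exists with the same md5: no change, don't pass it downstream
    }
    BlobInfo info = BlobInfo.newBuilder(bucket, objectName)
        .setContentType(img.contentType)
        .build();
    // Buffers the file in memory for simplicity; a write channel via storage.writer(info)
    // could be used to stream the local file to GCS instead.
    storage.create(info, Files.readAllBytes(Paths.get(img.filePath)));
    out.output(KV.of(img.sku, "gs://" + bucket + "/" + objectName));
  }
}
```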
DATA SCIENCE STEPS
- get pcollection from product feed pipeline (PCollection<Sku, ProductDescription>)
- get pcollection from image pipeline (PCollection<Sku, GCSPath>)
- cogroupbykey both pcollections on Sku
- for each Sku: ProductDescription, GCSPath
- write to a csv (one csv for the whole feed)
- COLUMNS: Sku, itemgroupid, description (if changed), gcspath (if image changed)
- write this csv to an object in GCS (/data in the feed bucket, with the date in the file name)
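Sketch of the CoGroupByKey join and CSV row construction (ProductDescription and its fields are hypothetical stand-ins; the actual file write, one dated CSV object under /data in the feed bucket, would be a separate TextIO.write() with a single shard, not shown):

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

// Hypothetical stand-in for whatever the product feed pipeline emits per SKU.
class ProductDescription {
  String itemGroupId;
  String description;
}

class DataScienceJoin {
  static PCollection<String> toCsvRows(
      PCollection<KV<String, ProductDescription>> descriptions, // from the product feed pipeline
      PCollection<KV<String, String>> imagePaths) {             // KV<Sku, GCSPath> from the image pipeline
    final TupleTag<ProductDescription> descTag = new TupleTag<>();
    final TupleTag<String> pathTag = new TupleTag<>();
    return KeyedPCollectionTuple.of(descTag, descriptions)
        .and(pathTag, imagePaths)
        .apply(CoGroupByKey.<String>create())
        .apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, String>() {
          @ProcessElement
          public void process(ProcessContext c) {
            String sku = c.element().getKey();
            // Either side may be absent: no description change, or no image change.
            ProductDescription desc = c.element().getValue().getOnly(descTag, null);
            String gcsPath = c.element().getValue().getOnly(pathTag, null);
            // COLUMNS: Sku, itemgroupid, description (if changed), gcspath (if image changed).
            // Naive CSV row; real code would quote/escape commas in the description.
            c.output(String.join(",",
                sku,
                desc != null ? desc.itemGroupId : "",
                desc != null ? desc.description : "",
                gcsPath != null ? gcsPath : ""));
          }
        }));
  }
}
```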
NOTES
- we did not account for the case where the feed is updated hourly: does the data team want everything consolidated into one csv, or do they want all csvs generated that day?
- i.e. do they want one csv per day, or one csv per feed upload/change?
- if they want one consolidated csv per day, there are questions:
- we would need to keep a temp copy of the data, update it on each feed, and upload it once all of that day's feeds have been processed
REQUIREMENTS
- This data science question (daily feeds or each update)
- I need to push forward an option with eligibility
- focus on behavioral and make it so the current eligibility will scale for a while
- do follow-up stuff w/ getting events from BQ (the stuff I indicated in my other notes)
- we have to move the final step (finding users and emitting events) and the normalization process (take the client's version of the feed, translate it, and write the avro) to GCP
- the normalization is supposed to be owned by client engineers, but that's not realistic; we already have it in Go, so it's just a matter of moving it
- are we keeping our focus on this or on pre-emptive stuff?
- we would need to clean up the current eligibility worker to handle this load
- we'd need to move portions of the data pipeline in aws (finding user matches) to gcp
- this would mean either a total or a minor rewrite
- we can't use a new eligibility worker to support behavioral
- there are way too many campaign variations; we would have to rebuild the eligibility worker with feature parity, and there is nowhere near enough time for that
- don't focus on behavioral, and use a new eligibility worker
- here, we would do the original scope of platformx