PIPELINE STEPS
- read the most recent product feed data from GCS (stream A)
- read the second most recent product feed data from GCS IF IT EXISTS (stream B)
- if only stream A exists:
  - write to a new stream with the price drop data -> write to Bigtable
  - write to a new stream with the product item data -> write to Cloud SQL
- flatten stream A and stream B -> stream C
- stream C -> group by SKU
- for each item, get the diff and (sketches of the diff step and the send-time rule follow this list):
  PLATFORMX
  - write to a new stream (stream D) if the item has changed -> write to Cloud SQL
  - write to a new stream (stream E) with the price drop (product change) data -> write to Bigtable
    - KEY: timestamp:website_id
      TTL: 48h
      COLUMN FAMILY: price_drop
      COLUMNS: sku, itemgroupid
  BEHAVIORAL
  - write to a new stream w/ the diff -> write to BigQuery
  - query BigQuery for everyone who has seen that product in the last 180 days (get the query from David)
  - for each BQ result, emit a Trigger Event (product change event)
    - scheduled to send the email at the same time of day as the original view event (i.e. if the item was viewed at 15:00 Monday and this result was fetched at 17:00 Tuesday, the email will send at 15:00 Wednesday)
- for each SKU with only one entry after the group-by, the item is either new or out of stock:
  - it's new if it's in the current feed but NOT the old feed
  - it's out of stock if it's in the old feed but NOT the new feed
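A minimal sketch of the group-by-SKU diff step, assuming a Beam-style Java pipeline (later sections of these notes already use PCollection/KV vocabulary). ProductItem and its fields, the string outputs, and the price comparison are hypothetical placeholders rather than the real feed schema; the row key just follows the timestamp:website_id spec above.

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.join.CoGbkResult;
    import org.apache.beam.sdk.transforms.join.CoGroupByKey;
    import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TupleTag;

    // Hypothetical feed record; the real schema comes from the avro feed.
    // In a real pipeline a coder (e.g. SerializableCoder) would be registered for it.
    class ProductItem implements java.io.Serializable {
      String sku;
      String itemGroupId;
      String websiteId;
      double price;
    }

    public class FeedDiff {
      static final TupleTag<ProductItem> CURRENT = new TupleTag<>();
      static final TupleTag<ProductItem> PREVIOUS = new TupleTag<>();

      // currentFeed = stream A keyed by SKU, previousFeed = stream B keyed by SKU.
      static PCollection<String> diff(PCollection<KV<String, ProductItem>> currentFeed,
                                      PCollection<KV<String, ProductItem>> previousFeed) {
        return KeyedPCollectionTuple.of(CURRENT, currentFeed)
            .and(PREVIOUS, previousFeed)
            .apply(CoGroupByKey.create())
            .apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, String>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                ProductItem cur = c.element().getValue().getOnly(CURRENT, null);
                ProductItem old = c.element().getValue().getOnly(PREVIOUS, null);
                if (old == null) {
                  c.output("NEW," + c.element().getKey());            // only in the current feed
                } else if (cur == null) {
                  c.output("OUT_OF_STOCK," + c.element().getKey());   // only in the old feed
                } else if (cur.price < old.price) {
                  // price drop row for Bigtable: KEY timestamp:website_id, family price_drop, 48h TTL
                  String rowKey = System.currentTimeMillis() + ":" + cur.websiteId;
                  c.output("PRICE_DROP," + rowKey + "," + cur.sku + "," + cur.itemGroupId);
                }
              }
            }));
      }
    }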
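And a tiny worked example of the send-time rule above (keep the original view's time of day, fire at its next occurrence after the fetch); plain java.time, with illustrative names.

    import java.time.ZonedDateTime;

    public class SendTime {
      // e.g. viewedAt = Monday 15:00, fetchedAt = Tuesday 17:00 -> returns Wednesday 15:00
      static ZonedDateTime nextSendTime(ZonedDateTime viewedAt, ZonedDateTime fetchedAt) {
        ZonedDateTime candidate = fetchedAt.with(viewedAt.toLocalTime());
        if (!candidate.isAfter(fetchedAt)) {
          candidate = candidate.plusDays(1);   // same time of day, next day
        }
        return candidate;
      }
    }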
IMAGE PIPELINE STEPS
- final class ProductImage (so it's read-only) with:
  - sku
  - filePath (where it's saved locally)
  - hash (md5)
  - request content type (from the header)
- read the product from avro (same source as the product feed pipeline)
  - just getting the imageurl
- open an HTTP request
  - get the content type
  - open the stream
  - pipe it through an md5 stream
  - save it to disk
  - the idea here is to stream the output from HTTP and pass the bytes (which are immutable) to an md5 hash stream and a local file write stream in parallel (this may or may not be possible); see the download sketch after this list
  - if this is too time-consuming, just write to disk and then get the hash
- assuming there are no errors, return a ProductImage containing the sku, md5, path on local disk, and content type
- get the sku from the ProductImage, go to GCS, and get the md5 of the object (the GCS object metadata includes an md5); see the upload sketch after this list
  - if the object exists and the md5 matches, there's no change and we don't care (don't pass it further down the pipeline)
  - if there is a change, get the localFilePath from the ProductImage and stream it to GCS
    - this can be a branch that runs async / in parallel, but we want to pass along the GCS path used
  - as the image is streaming to GCS, transform the record into KV<Sku, GCSPath> (get the sku from the ProductImage, and the GCSPath from the previous operation's invoker)
- return a PCollection containing these KV<Sku, GCSPath>
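A minimal sketch of the download/hash step, assuming the JDK's DigestInputStream so the md5 is computed while the bytes are copied to disk in a single pass (the fallback above is to write first and hash afterwards). The ProductImage fields mirror the list; everything else (URL handling, error handling) is a placeholder.

    import java.io.InputStream;
    import java.net.URL;
    import java.net.URLConnection;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;
    import java.security.DigestInputStream;
    import java.security.MessageDigest;
    import java.util.Base64;

    // Read-only value type from the list above.
    final class ProductImage {
      final String sku;
      final Path filePath;
      final String md5;          // base64-encoded, to match what GCS reports
      final String contentType;
      ProductImage(String sku, Path filePath, String md5, String contentType) {
        this.sku = sku; this.filePath = filePath; this.md5 = md5; this.contentType = contentType;
      }
    }

    public class ImageFetch {
      // Download the image once, hashing and writing to disk in the same pass.
      static ProductImage download(String sku, String imageUrl, Path localPath) throws Exception {
        URLConnection conn = new URL(imageUrl).openConnection();
        String contentType = conn.getContentType();                        // from the response header
        MessageDigest md5 = MessageDigest.getInstance("MD5");
        try (InputStream in = new DigestInputStream(conn.getInputStream(), md5)) {
          Files.copy(in, localPath, StandardCopyOption.REPLACE_EXISTING);  // saves to disk while hashing
        }
        return new ProductImage(sku, localPath,
            Base64.getEncoder().encodeToString(md5.digest()), contentType);
      }
    }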
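And a sketch of the GCS comparison step, assuming the google-cloud-storage Java client (its object metadata exposes a base64-encoded md5). It reuses the ProductImage sketch above; the bucket/object naming is made up, and the whole-file create() call stands in for a real streaming upload.

    import com.google.cloud.storage.Blob;
    import com.google.cloud.storage.BlobId;
    import com.google.cloud.storage.BlobInfo;
    import com.google.cloud.storage.Storage;
    import com.google.cloud.storage.StorageOptions;
    import java.nio.file.Files;

    public class ImageSync {
      // Returns null when the GCS copy already matches (no change -> drop from the pipeline);
      // otherwise uploads the local file and returns the (sku, gcsPath) pair to pass along.
      static String[] uploadIfChanged(ProductImage image, String bucket) throws Exception {
        Storage storage = StorageOptions.getDefaultInstance().getService();
        String objectName = "images/" + image.sku;                 // placeholder naming scheme
        Blob existing = storage.get(BlobId.of(bucket, objectName));
        if (existing != null && image.md5.equals(existing.getMd5())) {
          return null;                                             // unchanged, don't emit anything
        }
        BlobInfo info = BlobInfo.newBuilder(BlobId.of(bucket, objectName))
            .setContentType(image.contentType)
            .build();
        storage.create(info, Files.readAllBytes(image.filePath));  // sketch: whole-file upload
        return new String[] { image.sku, "gs://" + bucket + "/" + objectName };
      }
    }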
NOTES
- both branches return the diff (the KV<Sku, GCSPath>) while simultaneously uploading the image to GCS (which needs a ProductImage)
DATA SCIENCE STEPS
- get the PCollection from the product feed pipeline (PCollection<KV<Sku, ProductDescription>>)
- get the PCollection from the image pipeline (PCollection<KV<Sku, GCSPath>>)
- CoGroupByKey both PCollections on Sku (see the sketch after this list)
- for each Sku: ProductDescription, GCSPath
  - write to a csv (one csv for the whole feed)
  - COLUMNS: Sku, itemgroupid, description (if changed), gcspath (if image changed)
- write this csv to an object in GCS (/data in the feed bucket, with the date in the file name)
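A minimal sketch of the cogroup + CSV write, again assuming Beam Java. Both inputs are simplified to strings (the real ProductDescription record would also carry itemgroupid for the csv), and the output path, naming, and single-shard write are placeholders.

    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.join.CoGbkResult;
    import org.apache.beam.sdk.transforms.join.CoGroupByKey;
    import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TupleTag;

    public class DataScienceCsv {
      static final TupleTag<String> DESCRIPTION = new TupleTag<>(); // from the product feed pipeline
      static final TupleTag<String> GCS_PATH = new TupleTag<>();    // from the image pipeline

      static void writeCsv(PCollection<KV<String, String>> descriptions,
                           PCollection<KV<String, String>> imagePaths,
                           String date) {
        KeyedPCollectionTuple.of(DESCRIPTION, descriptions)
            .and(GCS_PATH, imagePaths)
            .apply(CoGroupByKey.create())
            .apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, String>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                String sku = c.element().getKey();
                // Empty column when the description or image did not change for this SKU.
                String description = c.element().getValue().getOnly(DESCRIPTION, "");
                String gcsPath = c.element().getValue().getOnly(GCS_PATH, "");
                c.output(sku + "," + description + "," + gcsPath);
              }
            }))
            // One csv for the whole feed, dated, under /data in the feed bucket.
            .apply(TextIO.write()
                .to("gs://FEED_BUCKET/data/feed-" + date)
                .withSuffix(".csv")
                .withoutSharding());
      }
    }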
NOTES
- we did not account for the feed being updated hourly: does the data team want everything consolidated into one csv, or do they want all the csvs generated that day?
  - i.e. do they want one csv for each day, or one csv for each feed upload/change?
  - if they want one per day, there are questions:
    - we would need to keep a temp copy of the data, update it on each feed, and upload it once all of that day's feeds have been processed
REQUIREMENTS
- this data science question (daily feeds or each update)
- I need to push forward an option for eligibility:
  - option 1: focus on behavioral and make it so the current eligibility worker will scale for a while
    - do the follow-up work on getting events from BQ (the stuff I indicated in my other notes)
    - we have to move the final step (finding users and emitting events), and the normalization process (take the client's version of the feed, translate it, and write the avro) to GCP
      - the normalization is supposed to be handled by client engineers, but that's not realistic; we already have it in Go, it's just a matter of moving it
      - are we keeping our focus on this or on pre-emptive stuff?
    - we would need to clean up the current eligibility worker to handle this load
    - we'd need to move portions of the data pipeline in AWS (finding user matches) to GCP
      - this would mean either a total or a minor rewrite
    - we can't use a new eligibility worker to support behavioral
      - there are way too many campaign variations; we would have to rebuild the eligibility worker with feature parity, and there is nowhere near enough time for that
  - option 2: don't focus on behavioral, and use a new eligibility worker
    - here, we would do the original scope of PlatformX