rETL from scratch with `dwh-mixpanel`
you will need the following roles in GCP:
- BigQuery Admin
- Cloud Functions Developer
- Pub/Sub Editor
- Cloud Scheduler Admin
- Create a new Mixpanel project (`bigQuery-Test`); go to the project settings and make note of your project id and API secret
- Download the sample data set https://aktunes.neocities.org/dwh-mixpanel/sampleData.csv (or create your own: `npx carvis`)
- Go to BigQuery and create a new dataset; give it a clear name like `mixpanel_test`:
- Navigate to your dataset and create a new table; give the table a clear name (`first-import`) and upload the sample CSV file you downloaded... for schema, choose "auto-detect"
- Once the table has loaded, confirm you can query your table:
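If you're unsure what to run, BigQuery expects the fully-qualified `project.dataset.table` form in backticks. Here's a small sketch of building that first test query (the project id below is a placeholder — substitute your own):

```javascript
// build the fully-qualified table reference BigQuery expects: `project.dataset.table`
// "my-gcp-project" is a placeholder — use your own GCP project id
function tableRef(project, dataset, table) {
  return "`" + project + "." + dataset + "." + table + "`";
}

const sql = "SELECT * FROM " + tableRef("my-gcp-project", "mixpanel_test", "first-import") + " LIMIT 10";
console.log(sql);
```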
- Next, in GCP navigate to Cloud Functions and click "create function"
- For your first cloud function, give it a clear name (`cf-mixpanel-test`), and use the following settings (note: these settings can always be changed later):
- Press "Next" to go to the code editor... within the code editor you will make changes to two files:
change `package.json` to:
{
  "type": "module",
  "dependencies": {
    "@google-cloud/functions-framework": "^3.0.0",
    "ak-tools": "^1.0.32",
    "dwh-mixpanel": "1.0.61"
  }
}
change `index.js` to:
// DEPS
import functions from '@google-cloud/functions-framework';
import dwhImport from 'dwh-mixpanel';
import { cLog } from 'ak-tools';

// TRIGGER
functions.http('helloWorld', (req, res) => {
  main().then(() => {
    res.send(`DONE!\n`);
  });
});

// CONFIG
// note - you will need to modify your sql query and your mixpanel project details
const config = {
  "dwh": "bigquery",
  "auth": {},
  "sql": "SELECT * FROM `ak-internal-tool-1613096051700.mixpanel_test.first-import`",
  "mappings": {
    "event_name_col": "action",
    "time_col": "timestamp",
    "distinct_id_col": "uuid",
    "insert_id_col": "row_id"
  },
  "options": {
    "strict": false,
    "compress": true,
    "workers": 20,
    "verbose": false
  },
  "mixpanel": {
    "type": "event",
    "project_id": "YOUR-MIXPANEL-PROJECT-ID",
    "region": "US",
    "api_secret": "YOUR-MIXPANEL-API-SECRET"
  },
  "tags": {
    "dwh": "bigquery",
    "dataset": "ak sample data"
  }
};

// MAIN
async function main() {
  try {
    const data = await dwhImport(config);
    cLog(data, "mixpanel sync complete!", "NOTICE");
    return data;
  }
  catch (e) {
    cLog(e, 'mixpanel sync fail!', 'CRITICAL');
  }
}
IMPORTANT: in the `sql` property you will need to change the SQL query to include your GCP project, dataset, and table name... and in the `mixpanel` object you will need to put in your mixpanel project id and API secret
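Since forgotten placeholders are the most common failure here, one option is a small pre-flight check at the top of `main()`. This helper is a sketch of my own (it is not part of `dwh-mixpanel`), assuming the same `config` shape as above:

```javascript
// hypothetical pre-deploy sanity check: throws if the config still contains placeholders
// assumes the same config shape as the example above
function checkConfig(config) {
  const { project_id, api_secret } = config.mixpanel;
  if (project_id.startsWith("YOUR-") || api_secret.startsWith("YOUR-")) {
    throw new Error("replace the mixpanel placeholders before deploying");
  }
  if (!config.sql.includes(".")) {
    throw new Error("sql should reference a fully-qualified `project.dataset.table`");
  }
  return true;
}
```

calling `checkConfig(config)` before `dwhImport(config)` turns a silent mis-sync into a loud error in the function logs.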
- press deploy and wait a little while
- once the deploy is complete, head over to the "testing" tab and click the "test the function" button
when it says `DONE!`... that means your sync job is finished! you can now see the sample data in your mixpanel project!
if you click "view logs", you can find the `mixpanel sync complete!` message which has details about the job:
🥳 you now have implemented cloud ingestion with BigQuery sending data to Mixpanel in a serverless cloud pipeline!
Your next step is to swap out the sample data for something more meaningful.
To do this, go back to your cloud function, and choose edit:
now modify the `sql` property, which contains the SQL query:
"sql": "SELECT * FROM `ak-internal-tool-1613096051700.mixpanel_test.first-import`",
You'll also need to modify the `mappings` object to reflect the column headers of your SQL query:
"mappings": {
"event_name_col": "action",
"time_col": "timestamp",
"distinct_id_col": "uuid",
"insert_id_col": "row_id"
}
for more guidance on your SQL query + mappings configuration consult the docs
after each code change, you will redeploy your function.
as before, you can use the "test the function" button to test your sync and verify the data in mixpanel!
once you have a manual pipeline that you are happy with, it's time to automate it so that "syncs" to Mixpanel occur at your desired data update frequency.
our data stack is:
Cloud Scheduler → Pub/Sub → Cloud Function → Mixpanel
the first step is to create a Pub/Sub topic which will trigger our job... so:
- navigate to Pub/Sub in GCP:
- click "create topic"
- give your topic a clear name (`test-mixpanel-sync`)
- now, head back to our original cloud function and copy (clone) it
- we'll give our new cloud function a clear name (`cf-mixpanel-sync`) and instead of the HTTP trigger, we'll adjust the trigger type to be Cloud Pub/Sub and point it at the topic we just created (`test-mixpanel-sync`)
- we'll click "save" and "next" and see that all of our prior code is duplicated. at this point, it would be wise to modify our SQL query so that it only grabs new rows in our dataset.
there are many ways to do this; here is a naive approach that relies on the timestamp of the event being in the last hour:
SELECT
  *
FROM
  `myProject.myDataset.myTable`
WHERE -- job will run every hour and only grab rows in the last hour
  TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), timestamp, HOUR) <= 1
recall that Mixpanel does de-duplication based on `$insert_id`... provided you can include an `$insert_id` in your data, a small overlap in the pipeline's schedule will not create duplicate events.
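To see why the overlap is harmless, here's a sketch of de-duplication in the spirit of what Mixpanel does with `$insert_id` (an illustration of the idea, not Mixpanel's actual implementation):

```javascript
// illustrative: events sharing an $insert_id collapse to one,
// so two hourly batches that overlap by a few rows are safe
function dedupe(events) {
  const seen = new Map();
  for (const e of events) {
    if (!seen.has(e.$insert_id)) seen.set(e.$insert_id, e);
  }
  return [...seen.values()];
}

// two consecutive hourly batches that overlap by one row
const batchOne = [{ $insert_id: "a1" }, { $insert_id: "a2" }];
const batchTwo = [{ $insert_id: "a2" }, { $insert_id: "a3" }];
const merged = dedupe([...batchOne, ...batchTwo]);
```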
- once you've adjusted your query, deploy your new cloud function! (and wait a little while)
- finally, head over to cloud scheduler and click "create job"
- cloud scheduler lets you send pub/sub messages to a particular topic at a particular interval... so we'll set up our schedule to trigger our `test-mixpanel-sync` topic every hour!
the format of the job scheduler uses a CRON-like syntax; cron guru can help you build a cron string... for example `1 * * * *` means once per hour (at minute 1 of each hour)
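For quick reference, a cron string has five whitespace-separated fields; this small sketch breaks down the example:

```javascript
// the five cron fields, left to right; "*" means "every"
const fields = ["minute", "hour", "day of month", "month", "day of week"];
const everyHour = "1 * * * *".split(" "); // minute=1, everything else "every" → fires at minute 1 of each hour
```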
- once you've created your scheduled job, "force run" it to test the pipeline:
- if you check the logs of your new cloud function, you should see the same complete message:
(and of course, you'll have new data in mixpanel... fully automated!)
now take a 🍺 or a ☕️ break ... you've just done some cutting edge data engineering!