☁️ BigQuery → Mixpanel

rETL from scratch with dwh-mixpanel

you will need the following roles in GCP:

  • BigQuery Admin
  • Cloud Functions Developer
  • Pub/Sub Editor
  • Cloud Scheduler Admin

Your First Cloud Pipeline

  • Create a new Mixpanel project, bigQuery-Test

go to the project settings and make note of your project id and api secret

  • Download the sample CSV data (or create your own: npx carvis)

  • Go to BigQuery and create a new dataset; give it a clear name like mixpanel_test:



  • Navigate to your dataset and create a new table; give the table a clear name (first-import) and upload the sample CSV file you downloaded... for the schema, choose "auto-detect"
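
(judging by the mappings used later in this guide, the sample CSV's header row looks roughly like the following; this is an inference from the config below, not the actual file:)

uuid,action,timestamp,row_id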



  • Once the table has loaded, confirm you can query your table:
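
for example, a quick sanity check in the BigQuery console (substitute your own GCP project id; the dataset and table names match the ones created above):

SELECT * FROM `your-gcp-project.mixpanel_test.first-import` LIMIT 10;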



  • Next, in GCP navigate to Cloud Functions and click create function



  • For your first cloud function, give it a clear name (cf-mixpanel-test), and use the following settings:




(note: these settings can always be changed later)

  • Press Next to go to the code editor... within the code editor you will make changes to two files:

change package.json to:

{
  "type": "module",
  "dependencies": {
    "@google-cloud/functions-framework": "^3.0.0",
    "ak-tools": "^1.0.32",
    "dwh-mixpanel": "1.0.61"
  }
}

change index.js to:

// DEPS
import functions from '@google-cloud/functions-framework';
import dwhImport from 'dwh-mixpanel';
import { cLog } from 'ak-tools';

// TRIGGER
functions.http('helloWorld', (req, res) => {
	main().then(() => {
		res.send(`DONE!\n`);
	});

});

// CONFIG
// note - you will need to modify your sql query and your mixpanel project details
const config = {
	"dwh": "bigquery",
	"auth": {},
	"sql": "SELECT * FROM `ak-internal-tool-1613096051700.mixpanel_test.first-import`",
	"mappings": {
		"event_name_col": "action",
		"time_col": "timestamp",
		"distinct_id_col": "uuid",
		"insert_id_col": "row_id"
	},
	"options": {
		"strict": false,
		"compress": true,
		"workers": 20,
		"verbose": false
	},
	"mixpanel": {
		"type": "event",
		"project_id": "YOUR-MIXPANEL-PROJECT-ID",
		"region": "US",
		"api_secret": "YOUR-MIXPANEL-API-SECRET"
	},
	"tags": {
		"dwh": "bigquery",
		"dataset": "ak sample data"
	}
};


// MAIN
async function main() {
	try {
		const data = await dwhImport(config);
		cLog(data, "mixpanel sync complete!", "NOTICE");
		return data;
	}

	catch (e) {
		cLog(e, 'mixpanel sync fail!', 'CRITICAL');
	}
}

IMPORTANT: in the config above, you will need to change the sql property so the query references your own GCP project, dataset, and table name... you will also need to put your mixpanel project id and API secret into the mixpanel object

  • press deploy and wait a little while

  • once the deploy is complete, head over to the testing tab and click the "test the function" button




when it says DONE ... that means your sync job is finished! you can now see the sample data in your mixpanel project!


if you click view logs, you can find the mixpanel sync complete message which has details about the job:


🥳 you've now implemented cloud ingestion: BigQuery sending data to Mixpanel through a serverless cloud pipeline!

Your Second Cloud Pipeline

Your next step is to swap out the sample data for something more meaningful.

To do this, go back to your cloud function, and choose edit:



now modify the sql property of the config, which contains the SQL query:

"sql": "SELECT * FROM `ak-internal-tool-1613096051700.mixpanel_test.first-import`",

You'll also need to modify the mappings object to reflect the column headers of your SQL query:

"mappings": {
		"event_name_col": "action",
		"time_col": "timestamp",
		"distinct_id_col": "uuid",
		"insert_id_col": "row_id"
	}
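
for example, if your table's columns were instead named event, event_time, user_id, and id (hypothetical names; use whatever your query actually returns), the mapping would become:

"mappings": {
		"event_name_col": "event",
		"time_col": "event_time",
		"distinct_id_col": "user_id",
		"insert_id_col": "id"
	}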

for more guidance on your SQL query + mappings configuration, consult the docs

after each code change, you will redeploy your function.

as before, you can use the test the function button to test your sync and verify the data in mixpanel!

Automated Pipeline

once you have a manual pipeline that you are happy with, it's time to automate it so that "syncs" to Mixpanel occur at your desired data update frequency.

our data stack is:

Cloud Scheduler → Pub/Sub → Cloud Function → Mixpanel

the first step is to create a Pub/Sub topic which will trigger our job... so:

  • navigate to Pub/Sub in GCP:


  • click create topic


  • give your topic a clear name (test-mixpanel-sync)


  • now, head back to our original cloud function and copy (clone) it


  • we'll give our new cloud function a clear name (cf-mixpanel-sync) and instead of the HTTP trigger, we'll adjust the trigger type to be Cloud Pub/Sub and point it at the topic we just created (test-mixpanel-sync)
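
note: switching from an HTTP trigger to a Pub/Sub trigger also changes what the functions framework expects: a CloudEvent handler rather than an HTTP one. a minimal sketch of the replacement trigger code (the mixpanelSync name is arbitrary; it just needs to match the function's entry point):

// TRIGGER (Pub/Sub version): fires on each message published to the topic
functions.cloudEvent('mixpanelSync', async (cloudEvent) => {
	await main();
});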


  • we'll click save and next and see that all of our prior code is duplicated. at this point, it would be wise to modify our SQL query so that it only grabs new rows in our dataset.

there are many ways to do this; here is a naive approach that relies on the timestamp of the event being within the last hour:

SELECT 
  * 
FROM 
  `myProject.myDataset.myTable` 
WHERE --job will run every hour and only grab rows in the last hour
  TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), timestamp, HOUR) <= 1 

see: BigQuery timestamp functions

recall that Mixpanel de-duplicates events based on $insert_id ... provided you can include an $insert_id in your data, a small overlap in the pipeline's schedule will not create duplicate events.
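
if your table doesn't already have a unique row id column, one illustrative approach (assuming the uuid, action, and timestamp columns from the sample data) is to derive a stable one in the query itself:

SELECT 
  *, 
  -- hash the row's own fields to get a deterministic row id (illustrative)
  TO_HEX(MD5(CONCAT(uuid, action, CAST(timestamp AS STRING)))) AS row_id 
FROM 
  `myProject.myDataset.myTable` 
WHERE 
  TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), timestamp, HOUR) <= 1 

because the hash is deterministic, overlapping runs produce the same $insert_id for the same row, and Mixpanel drops the duplicates.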

  • once you've adjusted your query, deploy your new cloud function! (and wait a little while)


  • finally, head over to cloud scheduler and click "create job"



  • cloud scheduler lets you send Pub/Sub messages to a particular topic at a particular interval... so we'll set up our schedule to trigger our test-mixpanel-sync topic every hour!



the job scheduler uses a cron-like syntax; crontab.guru can help you build a cron string

for example 1 * * * * means once per hour (at minute 1 of every hour)
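
a few more examples of the standard five-field syntax (minute, hour, day of month, month, day of week):

*/15 * * * *   every 15 minutes
0 3 * * *      once per day, at 03:00
0 0 * * 1      once per week, Mondays at midnight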

  • once you've created your scheduled job, "force run" it to test the pipeline:


  • if you check the logs of your new cloud function, you should see the same complete message:


(and of course, you'll have new data in mixpanel... fully automated!)

now take a 🍺 or a ☕️ break ... you've just done some cutting-edge data engineering!
