Skip to content

Instantly share code, notes, and snippets.

@maratuska
Last active February 21, 2023 09:43
Show Gist options
  • Save maratuska/f044093b4cde4652c6d1a7c59a94d816 to your computer and use it in GitHub Desktop.
Save maratuska/f044093b4cde4652c6d1a7c59a94d816 to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"# SparkSession and the SQL API live in the pyspark.sql submodule; import it explicitly.\n",
"import pyspark.sql"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"# Reuse the running Spark session if there is one, otherwise start a new one.\n",
"ss = pyspark.sql.SparkSession.builder.getOrCreate()\n",
"\n",
"# Codes stored in user_events.activity_id.\n",
"login_activity_type_id, logout_activity_type_id = 0, 1\n",
"\n",
"# Rows: (user_id, user_name)\n",
"users = [\n",
"    (1, \"Doe, John\"),\n",
"    (2, \"Deo, June\"),\n",
"    (3, \"Dow, Johnes\"),\n",
"    (4, \"Gilbert, Olivia\"),\n",
"]\n",
"\n",
"# Rows: (user_id, activity_id, timestamp)\n",
"# NOTE(review): user 4's timestamps have 10 digits (epoch seconds?) while the others\n",
"# have 13 (epoch milliseconds?) -- confirm the intended unit before comparing them.\n",
"user_events = [\n",
"    (1, login_activity_type_id, 1514764800000),\n",
"    (2, login_activity_type_id, 1514808000000),\n",
"    (1, logout_activity_type_id, 1514829600000),\n",
"    (1, login_activity_type_id, 1514894400000),\n",
"    (4, login_activity_type_id, 1596877800),\n",
"    (4, logout_activity_type_id, 1594194643),\n",
"    (4, login_activity_type_id, 1594192830),\n",
"    (4, logout_activity_type_id, 1596877920),\n",
"]\n",
"\n",
"# Expose both datasets to Spark SQL as temp views named 'users' and 'user_events'.\n",
"users_df = ss.sparkContext.parallelize(users).toDF(['user_id', 'user_name'])\n",
"users_df.createOrReplaceTempView('users')\n",
"\n",
"user_events_df = ss.sparkContext.parallelize(user_events).toDF(['user_id', 'activity_id', 'timestamp'])\n",
"user_events_df.createOrReplaceTempView('user_events')"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-------+---------------+-------------+\n",
"|user_id| user_name| first_login|\n",
"+-------+---------------+-------------+\n",
"| 2| Deo, June|1514808000000|\n",
"| 1| Doe, John|1514764800000|\n",
"| 4|Gilbert, Olivia| 1594192830|\n",
"| 3| Dow, Johnes| null|\n",
"+-------+---------------+-------------+\n",
"\n"
]
}
],
"source": [
"# Earliest login timestamp per user; users without any login get first_login = NULL.\n",
"# The login filter belongs in the JOIN condition, not in WHERE: a WHERE filter runs after\n",
"# the outer join and would drop users whose only events are non-login (e.g. logout) rows.\n",
"result = ss.sql(f\"\"\"\n",
"    SELECT\n",
"        u.user_id,\n",
"        u.user_name,\n",
"        MIN(ue.timestamp) AS first_login\n",
"    FROM users u\n",
"    LEFT JOIN user_events ue\n",
"        ON ue.user_id = u.user_id\n",
"        AND ue.activity_id = {login_activity_type_id}\n",
"    GROUP BY u.user_id, u.user_name\n",
"    ORDER BY first_login DESC NULLS LAST\n",
"\"\"\")\n",
"\n",
"result.show()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.3"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment