
@maasg
Created January 20, 2017 12:41
Calculate the count of unique matching elements between two DataFrames
{
  "metadata": {
    "name": "df-filecount",
    "user_save_timestamp": "1970-01-01T01:00:00.000Z",
    "auto_save_timestamp": "1970-01-01T01:00:00.000Z",
    "language_info": {
      "name": "scala",
      "file_extension": "scala",
      "codemirror_mode": "text/x-scala"
    },
    "trusted": true,
    "customLocalRepo": null,
    "customRepos": null,
    "customDeps": null,
    "customImports": null,
    "customArgs": null,
    "customSparkConf": null
  },
  "cells": [
    {
      "metadata": {
        "trusted": true,
        "input_collapsed": false,
        "collapsed": false,
        "id": "9C54FEFAE4C5481997D37ECCF3112C8B"
      },
      "cell_type": "code",
      "source": "// build two overlapping random datasets: each of the 100000 filenames is kept with probability 0.8 in data1 and 0.7 in data2\nval data1 = (1 to 100000).filter(_ => scala.util.Random.nextDouble < 0.8).map(i => (s\"file$i\", i, \"rubbish\"))\nval data2 = (1 to 100000).filter(_ => scala.util.Random.nextDouble < 0.7).map(i => (s\"file$i\", i, \"crap\"))",
      "outputs": [
        {
          "name": "stdout",
          "output_type": "stream",
          "text": "data1: scala.collection.immutable.IndexedSeq[(String, Int, String)] = Vector((file1,1,rubbish), (file3,3,rubbish), (file5,5,rubbish), (file6,6,rubbish), (file7,7,rubbish), (file8,8,rubbish), (file9,9,rubbish), (file10,10,rubbish), (file12,12,rubbish), (file13,13,rubbish), (file14,14,rubbish), (file16,16,rubbish), (file17,17,rubbish), (file19,19,rubbish), (file21,21,rubbish), (file22,22,rubbish), (file23,23,rubbish), (file24,24,rubbish), (file25,25,rubbish), (file27,27,rubbish), (file28,28,rubbish), (file30,30,rubbish), (file31,31,rubbish), (file33,33,rubbish), (file34,34,rubbish), (file36,36,rubbish), (file37,37,rubbish), (file39,39,rubbish), (file40,40,rubbish), (file41,41,rubbish), (file42,42,rubbish), (file43,43,rubbish), (file44,44,rubbish), (file45,45,rubbish), (file46,46,rubbish),..."
        },
        {
          "metadata": {},
          "data": {
            "text/html": ""
          },
          "output_type": "execute_result",
          "execution_count": 14,
          "time": "Took: 648 milliseconds, at 2017-1-20 12:55"
        }
      ]
    },
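    {
      "metadata": {
        "trusted": true,
        "input_collapsed": false,
        "collapsed": false
      },
      "cell_type": "markdown",
      "source": "Each filename survives the first filter with probability 0.8 and the second with probability 0.7, independently, so it appears in *both* datasets with probability 0.8 * 0.7 = 0.56. Roughly 56000 of the 100000 filenames should therefore match, which the final count below confirms."
    },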
    {
      "metadata": {
        "trusted": true,
        "input_collapsed": false,
        "collapsed": false,
        "id": "DAF7498680A342A19D17E5A17D927710"
      },
      "cell_type": "code",
      "source": "// lift the local collections into DataFrames\nval df1 = sparkSession.createDataFrame(data1).toDF(\"filename\", \"index\", \"data\")\nval df2 = sparkSession.createDataFrame(data2).toDF(\"filename\", \"index\", \"data\")",
      "outputs": [
        {
          "name": "stdout",
          "output_type": "stream",
          "text": "df1: org.apache.spark.sql.DataFrame = [filename: string, index: int ... 1 more field]\ndf2: org.apache.spark.sql.DataFrame = [filename: string, index: int ... 1 more field]\n"
        },
        {
          "metadata": {},
          "data": {
            "text/html": ""
          },
          "output_type": "execute_result",
          "execution_count": 16,
          "time": "Took: 2 seconds 360 milliseconds, at 2017-1-20 12:56"
        }
      ]
    },
    {
      "metadata": {
        "trusted": true,
        "input_collapsed": false,
        "collapsed": false,
        "id": "907FB8AAB75F43E18B05282640ED20F3"
      },
      "cell_type": "code",
      "source": "import org.apache.spark.sql.functions.lit\n// tag each distinct filename with the DataFrame it came from\nval df1Filenames = df1.select(\"filename\").withColumn(\"df\", lit(\"df1\")).distinct\nval df2Filenames = df2.select(\"filename\").withColumn(\"df\", lit(\"df2\")).distinct",
      "outputs": [
        {
          "name": "stdout",
          "output_type": "stream",
          "text": "df1Filenames: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [filename: string, df: string]\ndf2Filenames: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [filename: string, df: string]\n"
        },
        {
          "metadata": {},
          "data": {
            "text/html": ""
          },
          "output_type": "execute_result",
          "execution_count": 30,
          "time": "Took: 1 second 576 milliseconds, at 2017-1-20 13:4"
        }
      ]
    },
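    {
      "metadata": {
        "trusted": true,
        "input_collapsed": false,
        "collapsed": false
      },
      "cell_type": "markdown",
      "source": "After `distinct`, a filename occurs at most once per source. In the union of the two tagged sets its row count is therefore 1 when it appears in only one DataFrame and 2 when it appears in both, which is exactly what the `count === 2` filter at the end exploits."
    },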
    {
      "metadata": {
        "trusted": true,
        "input_collapsed": false,
        "collapsed": false,
        "id": "5625A56367504A9B951A04C59A271086"
      },
      "cell_type": "code",
      "source": "val union = df1Filenames.union(df2Filenames).toDF(\"filename\", \"source\")",
      "outputs": [
        {
          "name": "stdout",
          "output_type": "stream",
          "text": "union: org.apache.spark.sql.DataFrame = [filename: string, source: string]\n"
        },
        {
          "metadata": {},
          "data": {
            "text/html": ""
          },
          "output_type": "execute_result",
          "execution_count": 22,
          "time": "Took: 873 milliseconds, at 2017-1-20 12:58"
        }
      ]
    },
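    {
      "metadata": {
        "trusted": true,
        "input_collapsed": false,
        "collapsed": false
      },
      "cell_type": "markdown",
      "source": "Note that `Dataset.union` keeps duplicate rows (SQL `UNION ALL` semantics), which this technique relies on, and resolves columns by position rather than by name; the `toDF(\"filename\", \"source\")` call merely relabels the second column."
    },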
    {
      "metadata": {
        "trusted": true,
        "input_collapsed": false,
        "collapsed": false,
        "id": "64C4AD22507A48D8AC134DBCBB13577B"
      },
      "cell_type": "code",
      "source": "// count how many sources each filename appears in (1 or 2)\nval occurrenceCount = union.groupBy(\"filename\").count",
      "outputs": [
        {
          "name": "stdout",
          "output_type": "stream",
          "text": "occurrenceCount: org.apache.spark.sql.DataFrame = [filename: string, count: bigint]\n"
        },
        {
          "metadata": {},
          "data": {
            "text/html": ""
          },
          "output_type": "execute_result",
          "execution_count": 29,
          "time": "Took: 705 milliseconds, at 2017-1-20 13:3"
        }
      ]
    },
    {
      "metadata": {
        "trusted": true,
        "input_collapsed": false,
        "collapsed": false,
        "id": "D4C46FF1DE5240F2836347F5EB145DAF"
      },
      "cell_type": "code",
      "source": "import sparkSession.implicits._\n// filenames with a count of 2 are present in both DataFrames\noccurrenceCount.filter($\"count\" === 2).count",
      "outputs": [
        {
          "name": "stdout",
          "output_type": "stream",
          "text": "res31: Long = 55844\n"
        },
        {
          "metadata": {},
          "data": {
            "text/html": "55844"
          },
          "output_type": "execute_result",
          "execution_count": 27,
          "time": "Took: 13 seconds 124 milliseconds, at 2017-1-20 13:1"
        }
      ]
    },
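    {
      "metadata": {
        "trusted": true,
        "input_collapsed": false,
        "collapsed": false
      },
      "cell_type": "markdown",
      "source": "The 55844 matches are in line with the expected ~56000 overlap. As a cross-check, the same intersection size can be computed more directly; the cell below is only a sketch of two equivalent one-liners on the `df1`/`df2` above: an inner join on the distinct filename sets, and `intersect`, which compares distinct rows."
    },
    {
      "metadata": {
        "trusted": true,
        "input_collapsed": false,
        "collapsed": false
      },
      "cell_type": "code",
      "source": "// sketch: the same intersection size via an inner join on the distinct filename sets\nval joinCount = df1.select(\"filename\").distinct.join(df2.select(\"filename\").distinct, \"filename\").count\n// intersect returns the distinct rows present in both inputs, so it counts the same set\nval intersectCount = df1.select(\"filename\").intersect(df2.select(\"filename\")).count",
      "outputs": []
    },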
    {
      "metadata": {
        "trusted": true,
        "input_collapsed": false,
        "collapsed": true,
        "id": "6419803B9A014DE0B2DCAD36C2EB0AB3"
      },
      "cell_type": "code",
      "source": "",
      "outputs": []
    }
  ],
  "nbformat": 4
}