@rjurney
Last active August 29, 2015 14:07
What is wrong with this PySpark JOIN?
from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext("local[*]", "Test App")  # first positional argument is the master URL, not the app name
lines = sc.textFile("devices.tsv")
devices = lines.map(lambda x: x.split("\t"))
dh = sc.textFile("devices.header").collect()[0].split("\t")
devices = devices.map(lambda x: {'deviceid': x[dh.index('id')], 'foo': x[dh.index('foo')], 'bar': x[dh.index('bar')]})
# Try a bigger dataset and join
th = sc.textFile("transactions.header").collect()[0].split("\t")
tx_lines = sc.textFile("transactions.tsv")
transactions = tx_lines.map(lambda x: x.split("\t"))
bytes_in_out = transactions.map(lambda x: {'deviceid': x[th.index('deviceid')],
                                           'foo': x[th.index('foo')],
                                           'bar': x[th.index('bar')]})
joined = bytes_in_out.join(devices, 10)
j = joined.take(1) # fails
joined2 = devices.join(bytes_in_out, 10)
j2 = joined2.take(1) # fails
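The failures above line up with how `RDD.join` is defined: it works on pair RDDs, where every element is a `(key, value)` 2-tuple, and it matches elements by key. The dict records built above carry no such key. A minimal pure-Python sketch of the pair-matching behavior (an illustration of the semantics, not Spark's actual shuffle code):

```python
from itertools import product

def join(left, right):
    # Mimics RDD.join semantics: every element must be a (key, value)
    # 2-tuple; elements with equal keys are paired up.
    out = []
    for (k1, v1), (k2, v2) in product(left, right):
        if k1 == k2:
            out.append((k1, (v1, v2)))
    return out

tx = [('dev1', {'foo': 1}), ('dev2', {'foo': 2})]
devs = [('dev1', {'model': 'a'})]
print(join(tx, devs))  # [('dev1', ({'foo': 1}, {'model': 'a'}))]
```

Because the unpacking `(k1, v1)` is built into the operation, an RDD whose elements are plain dicts has nothing for the join to match on.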
# Try again with (key, value) tuples: RDD.join only matches on pair elements
# NOTE: `devices` was overwritten with a dict RDD above, so rebuild the
# (key, value) pairs from the raw split lines instead of mapping the dicts
devices = lines.map(lambda x: x.split("\t")) \
               .map(lambda x: (x[dh.index('id')], {'deviceid': x[dh.index('id')],
                                                   'manufacturer': x[dh.index('manufacturer')],
                                                   'model': x[dh.index('model')]}))
bytes_in_out = transactions.map(lambda x: (x[th.index('deviceid')], {'deviceid': x[th.index('deviceid')],
                                                                     'foo': x[th.index('foo')],
                                                                     'bar': x[th.index('bar')],
                                                                     'hello': x[th.index('hello')],
                                                                     'world': x[th.index('world')]}))
j3 = bytes_in_out.join(devices, 10)
j3.take(1)
j4 = devices.join(bytes_in_out, 10)
j4.take(1)
# The dict-based joins fail. What's up?
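As a rough illustration of the error itself: during a join, PySpark unpacks each element as a two-item sequence. Iterating a dict yields only its keys, so a dict with three or more keys cannot be unpacked into `(k, v)` and the job dies with an unpacking error. A hedged sketch in plain Python (an analogy, not Spark internals):

```python
# A record shaped like the dict elements built above
record = {'deviceid': 'd1', 'foo': 1, 'bar': 2}

def unpack_like_join(element):
    # Roughly what the join machinery attempts for every element
    k, v = element
    return k, v

# A (key, value) tuple unpacks cleanly...
assert unpack_like_join(('d1', record)) == ('d1', record)

# ...but a bare 3-key dict iterates over its keys and cannot be
# unpacked into two items (a 2-key dict would "work" but silently
# yield its keys, which is just as wrong for a join)
try:
    unpack_like_join(record)
except ValueError:
    print("ValueError: dict elements cannot be joined")
```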