@CamDavidsonPilon
Last active June 29, 2017 13:59
from py4j.protocol import Py4JJavaError
from pyspark.sql.functions import monotonically_increasing_id

# Attach a stable "index" column. Very important to cache this: the ids from
# monotonically_increasing_id must stay the same across the repeated test()
# runs, so pin them by caching.
df = df.select(monotonically_increasing_id().alias("index"), "*")\
       .cache()

# 2**35 -- search at most 35 bits of the index.
MAX = 34359738368


def mod_binary_search(round, previous_winner, dataset):
    # round starts at 0, previous_winner starts at 0.
    # previous_winner accumulates the low bits of the offending row's index.
    if 2**round >= MAX:
        return previous_winner

    mod = 2 ** (round + 1)
    if test(dataset, previous_winner, mod):
        # The job still fails with bit `round` set to 0, so keep it at 0.
        return mod_binary_search(round + 1, previous_winner, dataset)
    else:
        # The job succeeds on this subset, so the offending index has bit `round` set.
        return mod_binary_search(round + 1, previous_winner + 2**round, dataset)


def test(dataset, check_for, mod):
    # Keep only the rows whose low bits of "index" equal check_for, then rerun
    # the failing job on that subset. Returns True if the job still crashes
    # (i.e. the offending row is in the subset).
    pruned_dataset = dataset.where(dataset["index"] % mod == check_for)
    try:
        Job().apply(pruned_dataset).count()  # Job is the pipeline that crashes
        return False
    except Py4JJavaError:
        return True


sol = mod_binary_search(0, 0, df)

# show the offending row
df.where(df["index"] == sol).show()
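
Note that Job is never defined in the snippet; it stands in for whatever PySpark computation is actually crashing. As an illustration only, the sketch below shows the interface the search relies on: an object whose apply(dataset) method returns a DataFrame that raises Py4JJavaError when evaluated on the bad row. The fragile UDF and the "raw" column are hypothetical, not part of the original gist.

# Hypothetical stand-in for the failing pipeline -- not part of the original
# gist. Any object whose apply() returns a DataFrame that fails on evaluation
# will work with mod_binary_search/test above.
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Deliberately fragile transformation: it raises inside the Python worker on a
# malformed value, which surfaces on the driver as a Py4JJavaError.
fragile_parse = udf(lambda value: 1.0 / float(value), DoubleType())

class Job(object):
    def apply(self, dataset):
        # Assumes a column named "raw"; substitute the real transformations here.
        return dataset.withColumn("parsed", fragile_parse(dataset["raw"]))

Because the search inspects at most 35 bits of the index, the failing job is rerun on a pruned DataFrame roughly 35 times, which is usually far cheaper than bisecting the input data by hand.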