@vguerra
Last active June 18, 2019 15:29
Ex 1

document = sc.textFile("hdfs:///user/user172/5000-8.txt")
wordCounts = (document.flatMap(lambda line: line.split())
                      .map(lambda word: (word, 1))
                      .reduceByKey(lambda left, right: left + right))
wordCounts.saveAsTextFile("hdfs:///user/user172/tp/wordcount")
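
To sanity-check Ex 1 on a small input without a cluster, the same count can be sketched in plain Python. This is only a local reference, not part of the exercise; the helper name `word_counts` and the sample lines are made up for illustration.

```python
from collections import Counter

def word_counts(lines):
    """Count whitespace-separated words, mirroring flatMap + map + reduceByKey."""
    counts = Counter()
    for line in lines:
        # line.split() plays the role of the flatMap step
        counts.update(line.split())
    return counts

# Hypothetical sample input standing in for the HDFS text file
sample = ["to be or not", "to be"]
counts = word_counts(sample)
```

As in the Spark version, words are split on whitespace only, so punctuation and case are left untouched.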

Ex 2

pets = sc.parallelize([("cat", 1), ("dog", 1), ("cat", 2), ("dog", 3) ])
# Average value per key
pets.groupByKey().mapValues(lambda vals: float(sum(vals)) / len(vals)).collect()
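
The per-key average in Ex 2 can be checked locally with a plain-Python sketch. The helper name `average_by_key` is hypothetical and this is not part of the exercise; it assumes the input is a list of (key, number) pairs.

```python
from collections import defaultdict

def average_by_key(pairs):
    """Return {key: mean of its values}, mirroring groupByKey + sum/len."""
    totals = defaultdict(lambda: [0, 0])  # key -> [running sum, count]
    for key, value in pairs:
        totals[key][0] += value
        totals[key][1] += 1
    return {key: s / n for key, (s, n) in totals.items()}

pets_local = [("cat", 1), ("dog", 1), ("cat", 2), ("dog", 3)]
averages = average_by_key(pets_local)
```

The sketch keeps only a running sum and a count per key, so it never holds a whole group in memory at once.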

Ex 3

visits = sc.parallelize( [("h", "1.2.3.4"), ("a", "3.4.5.6"), ("h", "1.3.3.1")] )
pageNames = sc.parallelize( [("h", "Home"), ("a", "About"), ("o", "Other")])
visits.cogroup(pageNames).map(lambda x: (x[0], ( list(x[1][0]), list(x[1][1]) ) ) ).collect()
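
What `cogroup` produces in Ex 3 can be sketched in plain Python: each key that appears on either side maps to a pair of value lists, one per input. The helper name `cogroup_local` is hypothetical and this is only a local illustration, not Spark's implementation.

```python
from collections import defaultdict

def cogroup_local(left, right):
    """Return {key: (values from left, values from right)} for every key seen."""
    grouped = defaultdict(lambda: ([], []))
    for key, value in left:
        grouped[key][0].append(value)
    for key, value in right:
        grouped[key][1].append(value)
    return dict(grouped)

visits_local = [("h", "1.2.3.4"), ("a", "3.4.5.6"), ("h", "1.3.3.1")]
page_names_local = [("h", "Home"), ("a", "About"), ("o", "Other")]
joined = cogroup_local(visits_local, page_names_local)
```

Key "o" has no visits, so its first list is empty rather than the key being dropped.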

Ex 4

g = sc.parallelize([(1,4), (4,2), (2,1), (3,2), (2,5), (5,3)])
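# Flip each edge (src, dst) to (dst, src) so incoming edges can be counted by key
reversedEdges = g.map(lambda x: (x[1], x[0]))

countOutgoing = g.groupByKey().map(lambda x: (x[0], len(x[1])))
countIncoming = reversedEdges.groupByKey().map(lambda x: (x[0], len(x[1])))

# (node, (out-degree, in-degree)); a side with no edges sums to 0
totalOutIn = countOutgoing.cogroup(countIncoming).map(lambda x: (x[0], ( sum(x[1][0]), sum(x[1][1]))))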
numNodes = totalOutIn.count()

# A sink has no outgoing edges; a universal sink is also reached from every other node
sinks = totalOutIn.filter(lambda x: x[1][0] == 0 and x[1][1] > 0).collect()
universalSinks = totalOutIn.filter(lambda x: x[1][0] == 0 and x[1][1] == numNodes - 1).collect()
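
A plain-Python sketch of the Ex 4 degree check, for verifying results on small graphs. It assumes a universal sink is a node with no outgoing edges and an incoming edge from every other node (in-degree n - 1), and that the edge list has no duplicate edges or self-loops. The helper name `find_sinks` and the toy graph are hypothetical.

```python
from collections import Counter

def find_sinks(edges):
    """Return (sinks, universal_sinks) for a list of (src, dst) edges."""
    out_deg = Counter(src for src, _ in edges)
    in_deg = Counter(dst for _, dst in edges)
    nodes = set(out_deg) | set(in_deg)
    # Counter returns 0 for nodes that never appear on that side
    sinks = sorted(n for n in nodes if out_deg[n] == 0 and in_deg[n] > 0)
    universal = sorted(n for n in sinks if in_deg[n] == len(nodes) - 1)
    return sinks, universal

# Toy graph (assumption): node 3 is reached from both 1 and 2 and has no outgoing edge
toy_sinks, toy_universal = find_sinks([(1, 3), (2, 3), (1, 2)])

# The graph from Ex 4: every node has at least one outgoing edge
gist_sinks, gist_universal = find_sinks([(1, 4), (4, 2), (2, 1), (3, 2), (2, 5), (5, 3)])
```

On the Ex 4 graph both lists come back empty, since every node has an outgoing edge.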