@soulmachine
Last active September 15, 2017 12:52
Compute mutual-follow relationships on Sina Weibo with Spark. The input is a text file in which each line has the form userid1, userid2, userid3, ..., useridN, meaning that user userid1 follows userid2, userid3, ..., useridN. The output is also a text file, in which each line has the form userid1,userid2, meaning that these two users follow each other.
package com.yanjiuyanjiu.weibo
import org.apache.spark.SparkContext._
import org.apache.spark.{SparkContext, Logging}
/**
 * Find mutual-follow relationships.
 *
 * Input: a text file in which each line has the form userId1, userId2, userId3, ..., userIdN
 * (fields separated by a tab, see SEPERATOR below), meaning that userId1 follows userId2, userId3, ..., userIdN.
 * Output: a text file in which each line has the form userId1,userId2, meaning that the two users follow each other.
 *
 * Algorithm: turn every line into a sequence of ((userIdA, userIdB), 1) key-value pairs, with the smaller
 * userId placed first, so both directions of a follow map to the same key (userIdA, userIdB). Then count the
 * occurrences of each key; if a key's count is at least 2, the two users follow each other.
 */
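// Illustrative example (not part of the original gist): given the two input lines
//   1<TAB>2<TAB>3
//   2<TAB>1
// the key (1,2) is emitted twice (once per direction) while (1,3) is emitted only once,
// so the output contains the single line "1,2".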
object Mutual extends Logging {
  val SEPERATOR = "\t"

  def main(args: Array[String]) {
    if (args.length != 3) {
      System.err.println("Usage: Mutual <master> <input> <output>")
      System.exit(-1)
    }

    val start = System.currentTimeMillis()
    val sc = new SparkContext(args(0), "Mutual", System.getenv("SPARK_HOME"),
      SparkContext.jarOfClass(this.getClass))

    val lines = sc.textFile(args(1))
    // Turn each line "user \t f1 \t f2 ..." into one ((smallerId, largerId), 1) pair per followed user,
    // so that A follows B and B follows A map to the same key.
    val pairs = lines.flatMap { line =>
      val (Array(user), followings) = line.split(SEPERATOR).map(_.toLong).splitAt(1)
      followings.map((following: Long) => (user.min(following), user.max(following)) -> 1)
    }
    // A key that appears at least twice means both directions exist, i.e. a mutual follow.
    val result = pairs.reduceByKey(_ + _).filter(_._2 > 1)
    result.keys.map(_.productIterator.mkString(",")).saveAsTextFile(args(2))

    val end = System.currentTimeMillis()
    logInfo("Time elapsed: " + (end - start) + "ms")

    sc.stop()
    System.exit(0)
  }
}
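For reference, below is a minimal local sketch of the same pairing-and-counting idea that runs without a Spark cluster; the names MutualLocal and pairsOf are mine, not from the gist. It uses plain Scala collections in place of RDD operations and can be pasted into a Scala REPL to check the logic on two sample lines.

object MutualLocal {
  val SEPERATOR = "\t"

  // Same transformation as the flatMap above: one ((smallerId, largerId), 1) pair per followed user.
  def pairsOf(line: String): Seq[((Long, Long), Int)] = {
    val ids = line.split(SEPERATOR).map(_.toLong)
    val user = ids.head
    ids.tail.map(f => (user.min(f), user.max(f)) -> 1)
  }

  def main(args: Array[String]): Unit = {
    // User 1 follows 2 and 3; user 2 follows 1, so only (1,2) is mutual.
    val lines = Seq("1\t2\t3", "2\t1")
    val mutual = lines.flatMap(pairsOf)
      .groupBy(_._1)
      .mapValues(_.map(_._2).sum)   // plain-Scala stand-in for reduceByKey(_ + _)
      .filter(_._2 > 1)
      .keys
    mutual.foreach { case (a, b) => println(s"$a,$b") } // prints "1,2"
  }
}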