-
-
Save mironal/22acb387b24b3250ba07 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit

import scala.annotation.tailrec
import scala.collection.JavaConversions._
import scala.util.control.Exception._
import scala.util.control.NonFatal

import org.joda.time.DateTime
import scalax.file.Path
import twitter4j._
/**
 * Periodically searches Twitter for a fixed query and records, for each collection
 * pass, how many distinct users posted matching tweets.
 *
 * Each pass appends one line to `user_count.ltsv` and one to `user_list.ltsv`. The
 * newest status id seen is persisted to `latest_status_id.archive` so that the next
 * pass, and the next program start, only request newer tweets.
 */
object FilterFeather {

  // Every scalax.io read/write below uses UTF-8.
  private implicit val codec: scalax.io.Codec = scalax.io.Codec.UTF8

  /** Where the newest status id is saved between passes and across restarts. */
  private val latestStatusIdFilepath = Path("latest_status_id.archive")

  /** Period between collection passes (15 minutes), in milliseconds. */
  private val IntervalMillis: Long = TimeUnit.MINUTES.toMillis(15)

  def main(args: Array[String]): Unit = {
    // Restore the newest status id saved when the program last ran.
    var latestStatusId: Option[Long] = loadLatestStatusId()
    println(s"latest status id: ${latestStatusId}")

    // Wait for the next quarter-hour boundary so passes run at hh:00, :15, :30 and :45.
    val now = DateTime.now
    val initialDelay = initialDelayMillis(now)
    println(s"Start-up time: ${now}")
    println(s"Initial delay: ${initialDelay} [ms]")

    // Start collecting.
    val service = Executors.newSingleThreadScheduledExecutor
    service.scheduleAtFixedRate(new Runnable {
      def run(): Unit = {
        // If run() throws, scheduleAtFixedRate suppresses every later execution. A single
        // failed request (e.g. a TwitterException) must therefore not escape, or
        // collection would stop for good.
        try {
          for (status <- search(latestStatusId)) {
            // Persist first so the saved id never lags behind the in-memory one.
            latestStatusIdFilepath.write(s"${status.getId()}")
            latestStatusId = Some(status.getId())
          }
        } catch {
          case NonFatal(e) =>
            Console.err.println(s"collection pass failed; retrying at the next interval: ${e}")
        }
      }
    }, initialDelay, IntervalMillis, TimeUnit.MILLISECONDS)

    // shutdown() is never called, so this only blocks main for up to a day. The executor's
    // worker thread comes from the default (non-daemon) thread factory, so it keeps the
    // JVM running afterwards.
    service.awaitTermination(1, TimeUnit.DAYS)
  }

  /**
   * Reads the status id saved by a previous run.
   *
   * @return the id on the file's first line, or `None` if the file is missing or empty,
   *         or if its first line is not a number
   */
  private def loadLatestStatusId(): Option[Long] =
    if (latestStatusIdFilepath.isFile) {
      println(s"${latestStatusIdFilepath} found")
      latestStatusIdFilepath.lines().headOption.flatMap { line =>
        catching(classOf[NumberFormatException]) opt line.trim.toLong
      }
    } else {
      None
    }

  /**
   * Milliseconds from `now` to the next local quarter-hour boundary (hh:00, :15, :30 or
   * :45), or 0 when `now` lies exactly on one. Seconds and milliseconds already past the
   * minute are included, so passes start on the boundary itself.
   */
  private def initialDelayMillis(now: DateTime): Long = {
    val sinceBoundary = now.getMillisOfDay % IntervalMillis
    (IntervalMillis - sinceBoundary) % IntervalMillis
  }

  /**
   * Runs one collection pass and records its results.
   *
   * Fetches every tweet that matches the query and is newer than `sinceId`, following
   * result pages until no next page is reported. It then appends the number of distinct
   * authors to `user_count.ltsv` and the author list to `user_list.ltsv`.
   *
   * @param sinceId when defined, only tweets with a greater status id are requested
   * @return the fetched status with the greatest id, or `None` if nothing matched
   */
  private def search(sinceId: Option[Long]): Option[Status] = {
    val twitter = new TwitterFactory().getInstance
    val query = new Query("hogehoge")
    query.setCount(100)
    sinceId.foreach(id => query.setSinceId(id))

    // Timestamp taken just before the first request; it labels this pass in the logs.
    val now = DateTime.now
    println(s"${"-" * 10} ${now} ${"-" * 10}")

    // Fetch as many pages as the API offers.
    val tweets = fetchAllPages(twitter, twitter.search(query))
    tweets.foreach { status =>
      println(s"status_id: ${status.getId()}, screen_name: ${status.getUser().getScreenName()}, text: ${status.getText()}")
    }

    // Distinct authors, in order of first appearance.
    val users = tweets.map(_.getUser).distinct

    // Record the user count. The latest_status_id field is the since_id this pass searched from.
    val userCount = s"timestamp:${now}\tuser_count:${users.size}\tlatest_status_id:${sinceId.getOrElse(-1L)}"
    Path("user_count.ltsv").append(userCount + "\n")
    println(userCount)

    // Record the user list as tab-separated index:id,screen_name entries.
    val userList = users.zipWithIndex
      .map { case (user, i) => s"${i}:${user.getId()},${user.getScreenName()}" }
      .mkString("\t")
    Path("user_list.ltsv").append(userList + "\n")
    println(userList)

    // The returned id becomes the next since_id. Take the greatest id explicitly
    // rather than relying on the order of the search results.
    if (tweets.isEmpty) None else Some(tweets.maxBy(_.getId))
  }

  /**
   * Collects the tweets of `page` and of every page after it, in the order returned.
   * Tail-recursive, so a long chain of pages cannot overflow the stack.
   */
  @tailrec
  private def fetchAllPages(
      twitter: Twitter,
      page: QueryResult,
      acc: Vector[Status] = Vector.empty): List[Status] = {
    val collected = acc ++ page.getTweets().toList
    Option(page.nextQuery()) match {
      case Some(next) => fetchAllPages(twitter, twitter.search(next), collected)
      case None       => collected.toList
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment