Skip to content

Instantly share code, notes, and snippets.

@halcat0x15a
Last active December 15, 2015 00:28
  • Star 5 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save halcat0x15a/5172645 to your computer and use it in GitHub Desktop.
Twitter User Streams with scala-machines!
import scala.language.postfixOps
import java.io._
import scalaj.http._
import scalaz._, Scalaz._
import scalaz.effect._, Effect._, IO._
import scalaz.concurrent.Promise
import com.clarifi.machines._, Machine._
import argonaut._, Argonaut._
case class UserStream(reader: InputStreamReader) extends Procedure[IO, String] {
type K = Char => Any
val user = jObjectPL >>> jsonObjectPL("user") >>> jObjectPL
val screenName = user >>> jsonObjectPL("screen_name") >>> jStringPL
val text = jObjectPL >>> jsonObjectPL("text") >>> jStringPL
def status(json: Json) = for {
name <- screenName get json
text <- text get json
} yield s"$name: $text"
lazy val parse = (_: String).parse.fold(_ => None, status).orZero
def line(string: String): Plan[K, String, Unit] =
Plan.await[Char].flatMap(char =>
if (char == '\n')
Plan.emit(string)
else
line(string + char))
val machine = line("") outmap parse repeatedly
val driver =
new Driver[IO, Char => Any] {
val M = Monad[IO]
def apply(k: Char => Any) =
IO(IoExceptionOr(reader.read.toChar).toOption map k)
}
def withDriver[R](k: Driver[IO, K] => IO[R]) = k(driver)
def start = foreach(putStrLn).unsafePerformIO
}
object TwitterMachine extends SafeApp with FileSystem with OAuth {
def connect(token: Token) =
IO(http("https://userstream.twitter.com/1.1/user.json")
.oauth(consumer, token)
.process(identity))
override def runc =
for {
option <- file
token <- option.fold(oauth >>= write)(read)
connection <- connect(token)
_ <- using(IO(connection.getInputStream))(stream =>
using(IO(new InputStreamReader(stream)))(reader =>
IO(Promise(UserStream(reader).start)) >> readLn))
} yield ()
}
trait FileSystem extends Any {
def name = ".twitter-machine"
def using[A <: Closeable, B](io: IO[A])(f: A => IO[B]) =
io.bracket(r => IO(r.close))(f)
def file = IO {
val file = new File(name)
file.exists option file
}
def read(file: File) =
using(IO(new FileInputStream(file)))(stream =>
using(IO(new ObjectInputStream(stream)))(_.readObject match {
case token: Token => IO(token)
}))
def write(token: Token) =
using(IO(new FileOutputStream(name)))(stream =>
using(IO(new ObjectOutputStream(stream)))(stream =>
IO(stream.writeObject(token)) >| token))
}
trait OAuth extends Any {
def key = "RmKQnjIlq6MnBjMnBNb3bQ"
def secret = "ZQ9Jx2y4SIvw0axtPZ2MaFYXJOLgmX1DArkWThM7D4E"
def consumer = Token(key, secret)
def http(url: String) =
Http(url)
.option(HttpOptions.connTimeout(1000))
.option(HttpOptions.readTimeout(Int.MaxValue))
def request =
IO(http("https://api.twitter.com/oauth/request_token")
.param("oauth_callback", "")
.oauth(consumer)
.asToken)
def authorize(token: String) =
putStrLn(s"https://api.twitter.com/oauth/authorize?oauth_token=$token") >>
putStr("Enter verifier: ") >>
readLn
def access(token: Token, verifier: String) =
IO(http("https://api.twitter.com/oauth/access_token")
.oauth(consumer, token, verifier).asToken)
def oauth =
for {
token <- request
verifier <- authorize(token.key)
token <- access(token, verifier)
} yield token
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment