Skip to content

Instantly share code, notes, and snippets.

Last active December 15, 2015 00:28
Show Gist options
  • Save halcat0x15a/5172645 to your computer and use it in GitHub Desktop.
Save halcat0x15a/5172645 to your computer and use it in GitHub Desktop.
Twitter User Streams with scala-machines!
import scala.language.postfixOps
import scalaj.http._
import scalaz._, Scalaz._
import scalaz.effect._, Effect._, IO._
import scalaz.concurrent.Promise
import com.clarifi.machines._, Machine._
import argonaut._, Argonaut._
case class UserStream(reader: InputStreamReader) extends Procedure[IO, String] {
type K = Char => Any
val user = jObjectPL >>> jsonObjectPL("user") >>> jObjectPL
val screenName = user >>> jsonObjectPL("screen_name") >>> jStringPL
val text = jObjectPL >>> jsonObjectPL("text") >>> jStringPL
def status(json: Json) = for {
name <- screenName get json
text <- text get json
} yield s"$name: $text"
lazy val parse = (_: String).parse.fold(_ => None, status).orZero
def line(string: String): Plan[K, String, Unit] =
Plan.await[Char].flatMap(char =>
if (char == '\n')
line(string + char))
val machine = line("") outmap parse repeatedly
val driver =
new Driver[IO, Char => Any] {
val M = Monad[IO]
def apply(k: Char => Any) =
IO(IoExceptionOr( map k)
def withDriver[R](k: Driver[IO, K] => IO[R]) = k(driver)
def start = foreach(putStrLn).unsafePerformIO
object TwitterMachine extends SafeApp with FileSystem with OAuth {
def connect(token: Token) =
.oauth(consumer, token)
override def runc =
for {
option <- file
token <- option.fold(oauth >>= write)(read)
connection <- connect(token)
_ <- using(IO(connection.getInputStream))(stream =>
using(IO(new InputStreamReader(stream)))(reader =>
IO(Promise(UserStream(reader).start)) >> readLn))
} yield ()
trait FileSystem extends Any {
def name = ".twitter-machine"
def using[A <: Closeable, B](io: IO[A])(f: A => IO[B]) =
io.bracket(r => IO(r.close))(f)
def file = IO {
val file = new File(name)
file.exists option file
def read(file: File) =
using(IO(new FileInputStream(file)))(stream =>
using(IO(new ObjectInputStream(stream)))(_.readObject match {
case token: Token => IO(token)
def write(token: Token) =
using(IO(new FileOutputStream(name)))(stream =>
using(IO(new ObjectOutputStream(stream)))(stream =>
IO(stream.writeObject(token)) >| token))
trait OAuth extends Any {
def key = "RmKQnjIlq6MnBjMnBNb3bQ"
def secret = "ZQ9Jx2y4SIvw0axtPZ2MaFYXJOLgmX1DArkWThM7D4E"
def consumer = Token(key, secret)
def http(url: String) =
def request =
.param("oauth_callback", "")
def authorize(token: String) =
putStrLn(s"$token") >>
putStr("Enter verifier: ") >>
def access(token: Token, verifier: String) =
.oauth(consumer, token, verifier).asToken)
def oauth =
for {
token <- request
verifier <- authorize(token.key)
token <- access(token, verifier)
} yield token
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment