Skip to content

Instantly share code, notes, and snippets.

@bneil
Last active September 27, 2016 22:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bneil/4b0ff1db9c268d4fd31f71b966fe8771 to your computer and use it in GitHub Desktop.
Save bneil/4b0ff1db9c268d4fd31f71b966fe8771 to your computer and use it in GitHub Desktop.
Issues figuring out how to integrate streaming to s3 via fs2
package services.processing
import java.nio.file.{Paths, StandardOpenOption}
import akka.actor.Actor
import fs2._
import jp.co.bizreach.s3scala.S3
import services.consumer.EventRecord
import utils.Logging
import awscala.s3._
import awscala.Region
import com.amazonaws.services.s3.model.ObjectMetadata
import fs2.util.Async
import scala.language.higherKinds
/**
 * Mixin for actors that forward incoming event records to a target chosen by
 * `SwitchboardHandler.switchboard.intStatus`.
 *
 * Records routed to disk or S3 are published onto an fs2 topic; a subscriber
 * stream per topic is started in the background when the trait is initialised.
 *
 * @tparam E concrete record type handled by the actor
 */
trait SwitchboardActor[E <: EventRecord] extends Logging {
  actor: Actor =>

  /** Placeholder record, used only as the initial value each topic requires. */
  case class EmptyEventRecord() extends EventRecord

  implicit val strategy = Strategy.fromFixedDaemonPool(8)
  implicit val region = Region.US_WEST_2
  // NOTE(review): placeholder credentials are hard-coded here; presumably these
  // should come from configuration before this is used outside of tests.
  implicit val s3 = S3(accessKeyId = "test", secretAccessKey = "test")
  val bucket: Bucket = s3.createBucket("save-bucket")

  val diskTopic = async.topic[Task, EventRecord](EmptyEventRecord()).unsafeRun()
  val s3topic = async.topic[Task, EventRecord](EmptyEventRecord()).unsafeRun()

  /**
   * Routes a single record according to the current switchboard status.
   *
   * Status 3 publishes to `diskTopic`, status 4 publishes to `s3topic`;
   * statuses 1 and 2 only log, and any other value logs a warning.
   *
   * @param record the record to route
   */
  def handleSwitchboardRecord(record: E): Unit = {
    SwitchboardHandler.switchboard.intStatus match {
      case 1 =>
        log.info("dropping the records (wip)")
      case 2 =>
        log.info("processing to an api (wip)")
      case 3 =>
        log.info("processing to disk")
        // A Stream is only a description: building `Stream(record).through(publish)`
        // without running it publishes nothing. Run the publish task explicitly.
        diskTopic.publish1(record).unsafeRun()
      case 4 =>
        log.info("processing to s3")
        s3topic.publish1(record).unsafeRun()
      case _ =>
        log.warn("was told to process records but no target provided")
    }
  }

  /** Builds a completion callback that logs how a background task named `what` ended. */
  private def reportOutcome(what: String): Either[Throwable, Unit] => Unit = {
    case Left(cause) => log.warn(s"$what failed: $cause")
    case Right(_)    => log.info(s"$what finished")
  }

  // Started asynchronously: a blocking `unsafeRun()` here would stall trait
  // initialisation until 10 elements had arrived, which cannot happen while the
  // actor is still being constructed.
  // NOTE(review): the stream stops after 10 elements; presumably the first one is
  // the topic's initial EmptyEventRecord — confirm against the fs2 Topic docs.
  // NOTE(review): deflate(nowrap = true) looks like it emits raw deflate data
  // rather than gzip framing, so the ".gz" suffix may be misleading — confirm.
  val diskSubscriber =
    diskTopic
      .subscribe(100)
      .take(10)
      .map(_.toString)
      .through(text.utf8Encode)
      .through(compress.deflate(nowrap = true))
      .through(io.file.writeAll(Paths.get("/tmp/marshalled.gz"), Seq(StandardOpenOption.APPEND, StandardOpenOption.CREATE)))
      .run
      .unsafeRunAsync(reportOutcome("disk subscriber"))

  // Also started asynchronously, for the same reason as the disk subscriber.
  // TODO: no sink is attached after compression, so the compressed bytes are
  // discarded by `run`; nothing is uploaded to `bucket` yet.
  val s3subscriber =
    s3topic
      .subscribe(100)
      .take(10)
      .map(_.toString)
      .through(text.utf8Encode)
      .through(compress.deflate(nowrap = true))
      .run
      .unsafeRunAsync(reportOutcome("s3 subscriber"))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment