Last active
September 27, 2016 22:38
-
-
Save bneil/4b0ff1db9c268d4fd31f71b966fe8771 to your computer and use it in GitHub Desktop.
Issues figuring out how to integrate streaming to s3 via fs2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package services.processing | |
import java.nio.file.{Paths, StandardOpenOption} | |
import akka.actor.Actor | |
import fs2._ | |
import jp.co.bizreach.s3scala.S3 | |
import services.consumer.EventRecord | |
import utils.Logging | |
import awscala.s3._ | |
import awscala.Region | |
import com.amazonaws.services.s3.model.ObjectMetadata | |
import fs2.util.Async | |
import scala.language.higherKinds | |
/**
 * Mixin for an [[akka.actor.Actor]] that routes incoming records to a target
 * chosen by `SwitchboardHandler.switchboard.intStatus`:
 *
 *  - 1: drop the record (work in progress)
 *  - 2: forward to an API (work in progress)
 *  - 3: publish to `diskTopic`, consumed by the disk pipeline
 *  - 4: publish to `s3topic`, consumed by the S3 pipeline
 *
 * Both consumer pipelines are started in the background when the trait is
 * initialised, so constructing the actor never waits on them.
 *
 * @tparam E concrete record type handled by the actor
 */
trait SwitchboardActor[E <: EventRecord] extends Logging {
  actor: Actor =>

  /** Placeholder used as the initial value of both topics. */
  case class EmptyEventRecord() extends EventRecord

  implicit val strategy: Strategy = Strategy.fromFixedDaemonPool(8)
  implicit val region = Region.US_WEST_2
  // NOTE(review): hard-coded placeholder credentials; load real ones from
  // configuration or the AWS credential chain before deploying.
  implicit val s3 = S3(accessKeyId = "test", secretAccessKey = "test")

  val bucket: Bucket = s3.createBucket("save-bucket")

  val diskTopic = async.topic[Task, EventRecord](EmptyEventRecord()).unsafeRun()
  val s3topic = async.topic[Task, EventRecord](EmptyEventRecord()).unsafeRun()

  /**
   * Routes a single record according to the current switchboard status.
   *
   * @param record the record to drop, forward or publish
   */
  def handleSwitchboardRecord(record: E): Unit =
    SwitchboardHandler.switchboard.intStatus match {
      case 1 =>
        log.info("dropping the records (wip)")
      case 2 =>
        log.info("processing to an api (wip)")
      case 3 =>
        log.info("processing to disk")
        publishTo(diskTopic, record)
      case 4 =>
        log.info("processing to s3")
        publishTo(s3topic, record)
      case _ =>
        log.warn("was told to process records but no target provided")
    }

  /**
   * Publishes one record to `topic` and waits for the publish to complete.
   *
   * An fs2 `Stream` is only a description: building
   * `Stream(record).through(topic.publish)` without running it publishes
   * nothing. `publish1` yields a `Task` that is executed here. It is run
   * synchronously so records keep their arrival order.
   * NOTE(review): this presumably waits while a subscriber's queue is full -
   * confirm that blocking the calling thread is acceptable.
   */
  private def publishTo(topic: async.mutable.Topic[Task, EventRecord], record: E): Unit =
    topic.publish1(record).unsafeRun()

  /**
   * Starts `pipeline` without blocking the caller and logs how it ended.
   *
   * Running the pipelines with a blocking `unsafeRun()` in the trait body
   * would stall construction until the subscription completed, and nothing
   * can be published before construction finishes.
   */
  private def runInBackground(name: String, pipeline: Task[Unit]): Unit =
    pipeline.unsafeRunAsync {
      case Left(err) => log.warn(s"$name pipeline failed: $err")
      case Right(_)  => log.info(s"$name pipeline completed")
    }

  // Encodes each record's `toString` as UTF-8, deflates it and appends it to
  // a file. The subscription ends after 10 elements because of `take(10)`.
  // NOTE(review): the topic's initial EmptyEventRecord may count as one of
  // those elements - confirm.
  // NOTE(review): `deflate(nowrap = true)` writes a raw deflate stream, not
  // the gzip container the ".gz" suffix suggests - confirm the intended format.
  val diskSubscriber: Unit =
    runInBackground(
      "disk",
      diskTopic
        .subscribe(100)
        .take(10)
        .map(_.toString)
        .through(text.utf8Encode)
        .through(compress.deflate(nowrap = true))
        .through(io.file.writeAll(Paths.get("/tmp/marshalled.gz"), Seq(StandardOpenOption.APPEND, StandardOpenOption.CREATE)))
        .run
    )

  // Same encoding and compression as the disk pipeline.
  // TODO: the compressed bytes are currently discarded; nothing here uploads
  // them to `bucket` yet.
  val s3subscriber: Unit =
    runInBackground(
      "s3",
      s3topic
        .subscribe(100)
        .take(10)
        .map(_.toString)
        .through(text.utf8Encode)
        .through(compress.deflate(nowrap = true))
        .run
    )
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment