@bszwej
Created August 17, 2017 15:15
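import akka.stream.scaladsl.{Compression, JsonFraming, Sink, Source}
import akka.util.ByteString
import org.bson.Document

// Upper bound, in bytes, on a single framed JSON object (just under MongoDB's 16 MB document limit).
val maximumObjectLength = 16000000

// Assumes s3Client, bucket, fileName, collection and an implicit materializer are in scope.
s3Client
  .download(bucket, fileName)                           // stream the gzipped file as bytes
  .via(Compression.gunzip())                            // decompress on the fly
  .via(JsonFraming.objectScanner(maximumObjectLength))  // emit one ByteString per JSON object
  .map((json: ByteString) ⇒ Document.parse(json.utf8String))
  // Insert documents one at a time; each insert completes before the next one starts.
  .flatMapConcat((doc: Document) ⇒ Source.fromPublisher(collection.insertOne(doc)))
  .runWith(Sink.ignore)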