@abitofhelp
Created July 1, 2019 09:13
Reactive streaming from Azure Blob to an RxJava generator/parser. This snippet shows how to stream content from Azure Blob Storage using the Azure Storage Java SDK v11 and reactive streams. The blob's stream is fed into an RxJava/RxKotlin generator, which parses sample data from the blob stream. Subscribers use the Flowable to work with each sample parsed from …
/**
 * Parses sample data from a JSON stream.
 * @param sampleJsonUrl URL of the sample data in Azure Blob Storage.
 * @return Flowable<Sample>: a cold, synchronous, stateful, and backpressure-aware
 * generator of samples.
 */
fun generator(sampleJsonUrl: String) =
    blobStorage.downloadBlob(sampleJsonUrl)
        // array() assumes an array-backed buffer whose backing array holds exactly the blob's bytes.
        .map { bbuf: ByteBuffer -> JsonFactory().createParser(bbuf.array()) }
        .map { jParser ->
            Flowable.generate<Sample, JsonParser>(
                // Initial state: the parser returned by gobbleJsonToSamples() (helper not shown in this snippet).
                java.util.concurrent.Callable { jParser.gobbleJsonToSamples() },
                // Called once per downstream request: emit one sample or complete (helper not shown).
                io.reactivex.functions.BiConsumer<JsonParser, Emitter<Sample>> {
                    parser: JsonParser, emitter: Emitter<Sample> ->
                    pullOrComplete(parser, emitter)
                },
                // Release the parser when the stream terminates or is cancelled.
                Consumer<JsonParser> { jParser.close() }
            )
        }.flatMapPublisher { it }
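The snippet does not show `gobbleJsonToSamples()` or `pullOrComplete()`, so the following is only a minimal sketch of what such helpers might look like, not the author's implementation. It assumes RxJava 2 and Jackson core on the classpath, a hypothetical `Sample(id, value)` class, and a hypothetical JSON layout of the form `{"samples":[{"id":1,"value":0.5}, ...]}`. The `samplesFrom` function is also an illustrative name; it reads from an in-memory byte array instead of a blob.

```kotlin
import com.fasterxml.jackson.core.JsonFactory
import com.fasterxml.jackson.core.JsonParser
import com.fasterxml.jackson.core.JsonToken
import io.reactivex.Emitter
import io.reactivex.Flowable
import io.reactivex.functions.BiConsumer
import io.reactivex.functions.Consumer
import java.util.concurrent.Callable

// Hypothetical sample shape; the real Sample class is not shown in the gist.
data class Sample(val id: Int, val value: Double)

// Assumed behavior: advance the parser to just inside the "samples" array
// and return it so it can serve as the generator's state.
fun JsonParser.gobbleJsonToSamples(): JsonParser {
    var token = nextToken()
    while (token != null) {
        if (token == JsonToken.FIELD_NAME && text == "samples") {
            check(nextToken() == JsonToken.START_ARRAY) { "\"samples\" is not an array" }
            return this
        }
        token = nextToken()
    }
    throw IllegalStateException("no \"samples\" field found")
}

// Assumed behavior: emit exactly one sample per call, or complete when the
// array (or the input) ends. Flowable.generate permits at most one onNext per call.
fun pullOrComplete(parser: JsonParser, emitter: Emitter<Sample>) {
    try {
        if (parser.nextToken() != JsonToken.START_OBJECT) {
            emitter.onComplete()
            return
        }
        var id = 0
        var value = 0.0
        while (parser.nextToken() == JsonToken.FIELD_NAME) {
            val field = parser.text
            parser.nextToken() // move to the field's value
            when (field) {
                "id" -> id = parser.intValue
                "value" -> value = parser.doubleValue
                else -> parser.skipChildren() // ignore unknown fields
            }
        }
        emitter.onNext(Sample(id, value))
    } catch (e: Exception) {
        emitter.onError(e)
    }
}

// Same generate() wiring as the gist, but fed from an in-memory byte array.
fun samplesFrom(json: ByteArray): Flowable<Sample> =
    Flowable.generate<Sample, JsonParser>(
        Callable<JsonParser> { JsonFactory().createParser(json).gobbleJsonToSamples() },
        BiConsumer<JsonParser, Emitter<Sample>> { parser, emitter -> pullOrComplete(parser, emitter) },
        Consumer<JsonParser> { parser -> parser.close() }
    )

fun main() {
    val json = """{"samples":[{"id":1,"value":0.5},{"id":2,"value":1.5}]}""".toByteArray()
    samplesFrom(json).blockingForEach { println(it) }
}
```

Because the parser is created inside the `Callable`, each subscription gets its own parser, which is what makes the resulting Flowable cold.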
/**
 * Retrieves a blob from storage and collects its content into a single
 * in-memory buffer, so the whole blob is downloaded before parsing starts.
 * @param url URL of the blob.
 * @return Single<ByteBuffer> holding the blob's content.
 */
fun downloadBlob(url: String): Single<ByteBuffer> =
    BlockBlobURL(URL(url), pipeline)
        // Arguments: range, access conditions, rangeGetContentMD5, context.
        .download(null, null, false, null)
        .flatMap {
            FlowableUtil
                .collectBytesInBuffer(
                    // Retry a failed body read up to three times.
                    it.body(ReliableDownloadOptions().withMaxRetryRequests(3))
                )
        }
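To illustrate how a subscriber might consume a backpressure-aware generator like the one above, here is a small sketch assuming RxJava 2. The `numbered` function is a hypothetical stand-in for `generator(url)` that emits integers instead of samples, and `OneAtATime` is an illustrative subscriber that requests a single item at a time.

```kotlin
import io.reactivex.Emitter
import io.reactivex.Flowable
import io.reactivex.functions.BiConsumer
import io.reactivex.subscribers.DisposableSubscriber
import java.util.concurrent.Callable
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for generator(url): emits 1..count, one value per request.
fun numbered(count: Int): Flowable<Int> =
    Flowable.generate<Int, AtomicInteger>(
        Callable<AtomicInteger> { AtomicInteger(0) },
        BiConsumer<AtomicInteger, Emitter<Int>> { state, emitter ->
            val next = state.incrementAndGet()
            if (next > count) emitter.onComplete() else emitter.onNext(next)
        }
    )

// Requests exactly one item up front and one more after each item is handled.
class OneAtATime : DisposableSubscriber<Int>() {
    val seen = mutableListOf<Int>()
    var done = false
    var error: Throwable? = null

    override fun onStart() {
        request(1) // replace the default unbounded request
    }

    override fun onNext(item: Int) {
        seen += item
        request(1) // pull the next item only after this one is processed
    }

    override fun onError(t: Throwable) {
        error = t
    }

    override fun onComplete() {
        done = true
    }
}

fun main() {
    // The generator is synchronous, so the subscriber has finished when subscribeWith returns.
    val subscriber = numbered(3).subscribeWith(OneAtATime())
    println("${subscriber.seen} done=${subscriber.done}")
}
```

The generator callback runs only in response to `request(n)`, so a slow subscriber paces the parsing instead of being flooded with items.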