@deeperunderstanding
Created December 15, 2019 16:18
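import java.time.Duration
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
import reactor.core.publisher.Flux

// CoinbaseWebsocket, TickRepository, SubscriptionMessage, Message and Tick are project types not shown here.
@Component
class TickRecordingService
@Autowired constructor(val socket: CoinbaseWebsocket,
                       val tickRepository: TickRepository) {

    private val logger = LoggerFactory.getLogger(javaClass)

    fun recordProducts(products: Array<String>): Flux<List<Message>> {
        // Interpolating an Array directly prints its identity, so join the elements instead.
        logger.info("Start Recording Tick Stream for Products: ${products.joinToString()}")
        val stream = socket.connect(SubscriptionMessage(products, arrayOf("ticker")))
        return stream.filter { it is Tick }
            // Collect ticks into 3-second batches before storing them.
            .buffer(Duration.ofSeconds(3))
            // filterIsInstance avoids an unchecked cast to List<Tick>.
            .doOnNext { ticks -> tickRepository.store(ticks.filterIsInstance<Tick>()) }
            // Passing the throwable lets SLF4J log the message and the full stack trace.
            .doOnError { logger.error("Tick stream failed", it) }
    }
}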