Skip to content

Instantly share code, notes, and snippets.

@deeperunderstanding
Created December 15, 2019 14:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save deeperunderstanding/68492b468bf17bae6717af26fe836095 to your computer and use it in GitHub Desktop.
Save deeperunderstanding/68492b468bf17bae6717af26fe836095 to your computer and use it in GitHub Desktop.
@Component
class CoinbaseWebsocket @Autowired constructor(
@Value("\${cbpro.websocket.baseurl}") val websocketUrl: String
) {
private final val mapper = ObjectMapper()
private final val logger = LoggerFactory.getLogger(javaClass)
private final val client = ReactorNettyWebSocketClient()
init { client.maxFramePayloadLength = Int.MAX_VALUE }
fun connect(subscription: SubscriptionMessage): Flux<Message> {
val emitter = EmitterProcessor.create<Message>()
val subscriptionMessage = mapper.writeValueAsString(subscription)
logger.info("Coinbase Endpoint will connect to $websocketUrl")
val session = client.execute(URI(websocketUrl)) { session ->
logger.info("Coinbase Session started ${session.handshakeInfo}")
logger.info("Sending Subscription Message $subscriptionMessage")
session.send(Mono.just(session.textMessage(subscriptionMessage)))
.thenMany(session.receive()
.map { it.payloadAsText }
.doOnNext { logger.trace(it) }
.map { JsonMessageReader.deserialize(it) }
.subscribeWith(emitter)).then()
}
return emitter.doOnSubscribe { session.subscribe() }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment