Skip to content

Instantly share code, notes, and snippets.

@jjmalina
Last active August 29, 2015 14:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jjmalina/0fac65d8d7f48b310fe3 to your computer and use it in GitHub Desktop.
Save jjmalina/0fac65d8d7f48b310fe3 to your computer and use it in GitHub Desktop.
Coinbase Orders into Concord
# -*- coding: utf-8 -*-
"""
orders_source
~~~~~~~~~~~~~
Puts coinbase orders into Concord
"""
import time
import json
from concord.computation import (
Computation,
Metadata,
serve_computation
)
import logging
from Queue import Queue
from twisted.internet import reactor
from threading import Thread
from autobahn.twisted.websocket import (WebSocketClientFactory,
WebSocketClientProtocol,
connectWS)
# Install the default root handler, then log everything from this module
# at DEBUG level and above.
logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# Hand-off point between the websocket protocol (which puts raw payloads)
# and the computation (which gets them in drain_queue).
orders_queue = Queue()
def time_milli():
    """Return the current wall-clock time in integer milliseconds.

    The value of ``time.time()`` is scaled to milliseconds *before* it is
    rounded, so the sub-second part is preserved instead of being rounded
    away to a whole second.
    """
    return int(round(time.time() * 1000))
class ClientProtocol(WebSocketClientProtocol):
    """Websocket protocol that subscribes to BTC-USD and queues every message.

    Raw payloads are placed on the module-level ``orders_queue`` untouched;
    decoding is left to whoever consumes the queue.
    """

    def onOpen(self):
        """Send the subscription request once the websocket is open."""
        logger.info("websocket opened")
        self.sendMessage(json.dumps({
            "type": "subscribe",
            "product_id": "BTC-USD"
        }))

    def onMessage(self, payload, *args, **kwargs):
        """Queue the raw ``payload``; any extra arguments are ignored."""
        orders_queue.put(payload)

    def onClose(self, wasClean, code, reason):
        """Log why the websocket closed and stop the reactor."""
        # The reason must go through a %s placeholder: passing it as a bare
        # extra argument makes the logging module report a formatting error
        # and the reason is never written.
        logger.info("websocket closed because %s", reason)
        reactor.stop()
class OrdersSource(Computation):
    """Computation that republishes queued websocket payloads as records."""

    def __init__(self):
        logger.info("__init__")

    def drain_queue(self, ctx):
        """Forward every payload from ``orders_queue`` to ``ctx``, forever.

        Blocks on the queue and never returns. Each item taken from the
        queue is acknowledged with exactly one ``task_done()`` call, whether
        it was published, skipped, or publishing raised.

        :param ctx: object exposing ``produce_record(stream, key, value)``.
        """
        while True:
            data = orders_queue.get()
            try:
                self._publish(ctx, data)
            finally:
                # One task_done() per get(); calling it more often than
                # items were fetched raises ValueError.
                orders_queue.task_done()

    def _publish(self, ctx, data):
        """Decode one raw payload and publish it keyed by its order id."""
        if data is None:
            return
        try:
            order = json.loads(data)
        except (TypeError, ValueError):
            logger.info("error json decoding %s", data)
            return
        if not order:
            return
        # Messages that are not JSON objects, or that carry no 'order_id',
        # are skipped instead of raising and ending the drain loop.
        order_id = order.get('order_id') if isinstance(order, dict) else None
        if order_id is None:
            logger.debug("skipping message without order_id %s", data)
            return
        # NOTE(review): records go to stream 'orders' while metadata()
        # declares the output stream 'coinbase-orders' -- confirm which
        # name consumers expect.
        ctx.produce_record('orders', order_id, data)
        logger.info("produced order %s", data)

    def process_timer(self, ctx, key, timer):
        """Timer callback: start draining the queue (does not return)."""
        self.drain_queue(ctx)

    def init(self, ctx):
        """Set the 'consume-orders' timer for 100 ms from now."""
        ctx.set_timer('consume-orders', time_milli() + 100)
        logger.info("Source initialized")

    def metadata(self):
        """Describe this computation: no input streams, one output stream."""
        return Metadata(
            name='coinbase-orders-source',
            istreams=[],
            ostreams=['coinbase-orders'])
def main():
    """Connect the websocket feed, run the reactor in a thread, then serve.

    The reactor is started on a separate thread so that the call to
    ``serve_computation`` can occupy the main thread.
    """
    ws_factory = WebSocketClientFactory("wss://ws-feed.exchange.coinbase.com")
    ws_factory.protocol = ClientProtocol
    connectWS(ws_factory)

    # reactor.run is called with False as its only positional argument
    # (Twisted's installSignalHandlers flag).
    reactor_thread = Thread(target=reactor.run, args=(False,))
    reactor_thread.start()

    logger.info("Main")
    source = OrdersSource()
    serve_computation(source)


main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment