Skip to content

Instantly share code, notes, and snippets.

@harajune
Last active April 8, 2018 02:54
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Embed
What would you like to do?
# bitFlyer API sample
from pubnub.callbacks import SubscribeCallback
from pubnub.enums import PNStatusCategory
from pubnub.pnconfiguration import PNConfiguration
from pubnub.pubnub import PubNub
import sys
import datetime
"""
helper functions
"""
def eprint(*args, **kwargs):
    """Print to standard error, accepting the same arguments as ``print``.

    ``file`` defaults to ``sys.stderr``; an explicit ``file=`` keyword is
    honoured instead of raising ``TypeError`` for a duplicated keyword.
    """
    # sys.stderr is looked up at call time, so a redirected stderr is respected.
    kwargs.setdefault("file", sys.stderr)
    print(*args, **kwargs)
"""
config
"""
# Build the PubNub client configuration used by this script.
pnconfig = PNConfiguration()
# Only a subscribe key is set (no publish key), so this client is configured for reading.
# NOTE(review): presumably bitFlyer's public realtime-feed key — verify against bitFlyer's docs.
pnconfig.subscribe_key = 'sub-c-52a9ab50-291b-11e5-baaa-0619f8945a4f'
# Module-level client; the listener registration and subscription further down use it.
pubnub = PubNub(pnconfig)
class BitflyerSubscribeCallback(SubscribeCallback):
    """PubNub listener that logs connection status and prints received messages.

    Status changes are reported on stderr through ``eprint``; message payloads
    are printed to stdout with a ``"timestamp"`` entry added.
    """

    def presence(self, pubnub, presensce):
        """Ignore presence events; this listener takes no action on them."""
        pass

    def status(self, pubnub, status):
        """Report selected connection-state changes on stderr.

        Only the four categories tested below produce output; any other
        ``status.category`` is ignored.
        """
        if status.category == PNStatusCategory.PNUnexpectedDisconnectCategory:
            eprint("unexpected disconecct")
        elif status.category == PNStatusCategory.PNConnectedCategory:
            eprint("connected")
        elif status.category == PNStatusCategory.PNReconnectedCategory:
            eprint("reconnected")
        elif status.category == PNStatusCategory.PNDecryptionErrorCategory:
            eprint("decryption error")

    def message(self, pubnub, message):
        """Print the message payload to stdout, stamped with the receive time.

        For dict payloads a copy with a ``"timestamp"`` key (naive local time,
        ISO 8601) is printed; the incoming payload object is left unmodified.
        Non-dict payloads cannot carry the extra key, so they are printed
        unchanged and a note is written to stderr instead of raising
        ``TypeError`` inside the listener callback.
        """
        payload = message.message
        if not isinstance(payload, dict):
            eprint("non-dict payload; printing without timestamp")
            print(payload)
            return
        received_at = datetime.datetime.now().isoformat()
        # Build a new dict rather than mutating the object handed to the listener;
        # an existing "timestamp" key is overwritten in place, as before.
        print({**payload, "timestamp": received_at})
# Register the callback so status, presence and message events are delivered to it.
pubnub.add_listener(BitflyerSubscribeCallback())
# Subscribe to the "lightning_board_FX_BTC_JPY" channel.
# NOTE(review): channel name suggests bitFlyer's FX_BTC_JPY order-board feed — confirm.
pubnub.subscribe().channels("lightning_board_FX_BTC_JPY").execute()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment