Skip to content

Instantly share code, notes, and snippets.

@yuyasugano
Created August 22, 2019 09:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yuyasugano/dbfa338db15c846644cdd55ccbe699a2 to your computer and use it in GitHub Desktop.
Save yuyasugano/dbfa338db15c846644cdd55ccbe699a2 to your computer and use it in GitHub Desktop.
Pubnub for bitbank.cc Realtime API
from pubnub.pubnub import PubNub
from pubnub.enums import PNStatusCategory
from pubnub.callbacks import SubscribeCallback
from pubnub.pnconfiguration import PNConfiguration
SUBSCRIBE_KEY = 'sub-c-e12e9174-dd60-11e6-806b-02ee2ddab7fe'
TICKER_CHANNEL = 'ticker_btc_jpy'
class BitbankSubscriberCallback(SubscribeCallback):
def presence(self, pubnub, presence):
pass # handle incoming presence data
def status(self, pubnub, status):
if status.category == PNStatusCategory.PNUnexpectedDisconnectCategory:
# This event happens when radio / connectivity is lost
pubnub.reconnect()
elif status.category == PNStatusCategory.PNTimeoutCategory:
# do some magic and call reconnect when ready
pubnub.reconnect()
elif status.category == PNStatusCategory.PNConnectedCategory:
# Connect event. You can do stuff like publish, and know you'll get it.
# Or just use the connected event to confirm you are subscribed for
# UI / internal notifications, etc
print("Successfully connected.")
elif status.category == PNStatusCategory.PNReconnectedCategory:
# Happens as part of our regular operation. This event happens when
# radio / connectivity is lost, then regained.
print("Successfully re-connected.")
def message(self, pubnub, message):
# handle new message stored in message.message
print("channel: {0}\nmessage: {1}".format(message.channel, message.message))
def main():
pnconfig = PNConfiguration()
pnconfig.subscribe_key = SUBSCRIBE_KEY
pubnub = PubNub(pnconfig)
# inherits SubscribeCallBack class
my_listener = BitbankSubscriberCallback()
pubnub.add_listener(my_listener)
pubnub.subscribe().channels(TICKER_CHANNEL).execute()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment