Skip to content

Instantly share code, notes, and snippets.

@stephenlb
Last active September 18, 2017 19:39
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save stephenlb/e7ef1e97e3523d9ee9e2 to your computer and use it in GitHub Desktop.
Save stephenlb/e7ef1e97e3523d9ee9e2 to your computer and use it in GitHub Desktop.
Python Echo Server - Example of a Publish/Subscribe echo server in Python that needs no IP address binding.
## www.pubnub.com - PubNub - Data Stream Network
# coding=utf8
## PubNub Real-time Push APIs and Notifications Framework
## Copyright (c) 2010 Stephen Blum
## http://www.pubnub.com/
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
## Import Libs
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
from Pubnub import PubnubTwisted as Pubnub
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
## Configuration
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
# NOTE(review): credentials are hard-coded in the script; presumably demo
# keys -- confirm before reuse and load real keys from the environment.
publish_key = 'pub-c-f6a20151-db8d-45af-ba42-def0edaa459f'
subscribe_key = 'sub-c-b5ff3208-7f64-11e4-b601-02ee2ddab7fe'
secret_key = 'demo'
# Channel that request() publishes to.
server_channel = 'echo-server'
# Channel this client subscribes to; it is also sent as the 'response'
# field of every request.
client_channel = 'echo-channel'
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
## Create PubNub Instance
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
# Single PubNub client (the Twisted flavour imported above) shared by all
# handlers in this script.
pubnub = Pubnub(
publish_key=publish_key,
subscribe_key=subscribe_key,
secret_key=secret_key,
# NOTE(review): presumably switches the origin to https:// -- confirm.
ssl_on=True
)
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
## Error Log
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
def error_log(data):
    """Error callback: write the reported error payload to stdout."""
    print(data)
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
## Access Log
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
def access_log(data):
    """Success callback: write the publish result to stdout."""
    print(data)
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
## Respond
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
def request():
    """Publish one "Hello" request on the server channel.

    The payload names ``client_channel`` in its 'response' field and carries
    the text to send in 'body'. Publish results go to ``access_log`` and
    failures to ``error_log``.
    """
    payload = {'response': client_channel, 'body': "Hello"}
    pubnub.publish(
        server_channel,
        payload,
        callback=access_log,
        error=error_log,
    )
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
## Request Received
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
def onResponse(data, channel):
    """Subscribe callback: log the received message, then send a new request.

    Because every received message triggers another ``request()``, the
    client keeps publishing for as long as messages keep arriving.
    """
    line = "Channel: %s | Req: %s" % (channel, data)
    print(line)
    request()
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
## Ready to Receive Requests
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
def onReady(message):
    """Connect callback: announce readiness and send the first request.

    NOTE(review): the message names ``server_channel`` although this script
    subscribes to ``client_channel`` -- confirm which one is intended.
    """
    banner = "Ready to Receive Requests on '%s'" % server_channel
    print(banner)
    request()
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
## Network Recovered
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
def onReconnect(message):
    """Reconnect callback: report that the connection is back."""
    print("RECONNECTED")
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
## Network Failed
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
def onDisconnect(message):
    """Disconnect callback: report that the connection was lost."""
    print("DISCONNECTED")
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
## Start Echo Server
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
# Listen on the client's own channel. Messages go to onResponse, errors to
# error_log, and connection state changes to the three on* handlers.
pubnub.subscribe(
client_channel,
callback=onResponse,
error=error_log,
connect=onReady,
reconnect=onReconnect,
disconnect=onDisconnect
)
# NOTE(review): presumably runs the Twisted reactor and blocks -- confirm
# against PubnubTwisted.start().
pubnub.start()
## www.pubnub.com - PubNub - Data Stream Network
# coding=utf8
## PubNub Real-time Push APIs and Notifications Framework
## Copyright (c) 2010 Stephen Blum
## http://www.pubnub.com/
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
## Import Libs
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
from Pubnub import PubnubTwisted as Pubnub
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
## Configuration
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
# NOTE(review): credentials are hard-coded in the script; presumably demo
# keys -- confirm before reuse and load real keys from the environment.
publish_key = 'pub-c-f6a20151-db8d-45af-ba42-def0edaa459f'
subscribe_key = 'sub-c-b5ff3208-7f64-11e4-b601-02ee2ddab7fe'
secret_key = 'demo'
# Channel the echo server subscribes to for incoming requests.
server_channel = 'echo-server'
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
## Create PubNub Instance
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
# Single PubNub client (the Twisted flavour imported above) used both to
# receive requests and to publish the echoed replies.
pubnub = Pubnub(
publish_key=publish_key,
subscribe_key=subscribe_key,
secret_key=secret_key,
# NOTE(review): presumably switches the origin to https:// -- confirm.
ssl_on=True
)
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
## Error Log
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
def error_log(data):
    """Error callback: write the reported error payload to stdout."""
    print(data)
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
## Access Log
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
def access_log(data):
    """Success callback: write the publish result to stdout."""
    print(data)
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
## Respond
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
def respond(channel, body):
    """Publish ``body`` on ``channel``.

    Publish results are reported through ``access_log`` and failures
    through ``error_log``.
    """
    handlers = {'callback': access_log, 'error': error_log}
    pubnub.publish(channel, body, **handlers)
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
## Request Received
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
def onRequest(request, channel):
    """Subscribe callback: echo a request's body back on its reply channel.

    A well-formed request is a mapping with two keys:
        'response': name of the channel to publish the reply on
        'body':     payload to echo back

    Anything published on the server channel reaches this callback, so a
    malformed message (not a mapping, or missing a key) is reported and
    dropped instead of raising inside the subscribe loop.

    Args:
        request: Decoded message received on the server channel.
        channel: Name of the channel the message arrived on.
    """
    try:
        response_channel = request['response']
        response_body = request['body']
    except (KeyError, TypeError, IndexError):
        print("Channel: %s | Malformed Req: %s" % (channel, request))
        return

    print("Channel: %s | Req: %s" % (channel, request))
    respond(
        channel=response_channel,
        body=response_body
    )
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
## Ready to Receive Requests
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
def onReady(message):
    """Connect callback: announce that the server channel is subscribed."""
    banner = "Ready to Receive Requests on '%s'" % server_channel
    print(banner)
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
## Network Recovered
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
def onReconnect(message):
    """Reconnect callback: report that the connection is back."""
    print("RECONNECTED")
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
## Network Failed
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
def onDisconnect(message):
    """Disconnect callback: report that the connection was lost."""
    print("DISCONNECTED")
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
## Start Echo Server
## =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
# Listen for requests on the server channel. Each message goes to onRequest,
# errors to error_log, and connection state changes to the on* handlers.
pubnub.subscribe(
server_channel,
callback=onRequest,
error=error_log,
connect=onReady,
reconnect=onReconnect,
disconnect=onDisconnect
)
# NOTE(review): presumably runs the Twisted reactor and blocks -- confirm
# against PubnubTwisted.start().
pubnub.start()
## www.pubnub.com - PubNub Real-time push service in the cloud.
# coding=utf8
## PubNub Real-time Push APIs and Notifications Framework
## Copyright (c) 2014-15 Stephen Blum
## http://www.pubnub.com/
## -----------------------------------
## PubNub 3.5.3 Real-time Push Cloud API
## -----------------------------------
try:
import json
except ImportError:
import simplejson as json
import time
import hashlib
import uuid as uuid_lib
import random
import sys
from base64 import urlsafe_b64encode
from base64 import encodestring, decodestring
import hmac
from Crypto.Cipher import AES
from Crypto.Hash import MD5
try:
from hashlib import sha256
digestmod = sha256
except ImportError:
import Crypto.Hash.SHA256 as digestmod
sha256 = digestmod.new
##### vanilla python imports #####
try:
from urllib.parse import quote
except ImportError:
from urllib2 import quote
try:
import urllib.request
except ImportError:
import urllib2
try:
import requests
from requests.adapters import HTTPAdapter
except ImportError:
pass
import socket
import sys
import threading
from threading import current_thread
# Build the socket options used for keepalive-enabled HTTP connections.
#
# Start from urllib3's own defaults when urllib3 is installed.
# HTTPConnection lives in ``urllib3.connection``; ``urllib3.HTTPConnection``
# is not an importable module, so importing it could never succeed.
try:
    from urllib3.connection import HTTPConnection as _Urllib3HTTPConnection
    # Copy the list: the "+=" below works in place and must not modify
    # urllib3's class-level defaults for every other user of the library.
    default_socket_options = list(
        _Urllib3HTTPConnection.default_socket_options)
except (ImportError, AttributeError):
    default_socket_options = []

default_socket_options += [
    # Enable TCP keepalive
    (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
]

if sys.platform.startswith("linux"):
    default_socket_options += [
        # Send first keepalive packet 200 seconds after last data packet
        (socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 200),
        # Resend keepalive packets every second, when unanswered
        (socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 1),
        # Close the socket after 5 unanswered keepalive packets
        (socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5)
    ]
elif sys.platform.startswith("darwin"):
    # From /usr/include/netinet/tcp.h: idle time used when SO_KEEPALIVE is
    # enabled. Only define the constant when this Python build lacks it, so
    # a value supplied by the socket module is never overwritten.
    if not hasattr(socket, "TCP_KEEPALIVE"):
        socket.TCP_KEEPALIVE = 0x10
    default_socket_options += [
        # Send first keepalive packet 200 seconds after last data packet
        (socket.IPPROTO_TCP, socket.TCP_KEEPALIVE, 200),
        # Resend keepalive packets every second, when unanswered
        (socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 1),
        # Close the socket after 5 unanswered keepalive packets
        (socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5)
    ]
"""
# The Windows code is currently untested
elif sys.platform.startswith("win"):
import struct
from urllib3.connectionpool import HTTPConnectionPool, HTTPSConnectionPool
def patch_socket_keepalive(conn):
conn.sock.ioctl(socket.SIO_KEEPALIVE_VALS, (
# Enable TCP keepalive
1,
# Send first keepalive packet 200 seconds after last data packet
200,
# Resend keepalive packets every second, when unanswered
1
))
class PubnubHTTPConnectionPool(HTTPConnectionPool):
def _validate_conn(self, conn):
super(PubnubHTTPConnectionPool, self)._validate_conn(conn)
class PubnubHTTPSConnectionPool(HTTPSConnectionPool):
def _validate_conn(self, conn):
super(PubnubHTTPSConnectionPool, self)._validate_conn(conn)
import urllib3.poolmanager
urllib3.poolmanager.pool_classes_by_scheme = {
'http' : PubnubHTTPConnectionPool,
'https' : PubnubHTTPSConnectionPool
}
"""
##################################
##### Tornado imports and globals #####
try:
import tornado.httpclient
import tornado.ioloop
from tornado.stack_context import ExceptionStackContext
ioloop = tornado.ioloop.IOLoop.instance()
except ImportError:
pass
#######################################
##### Twisted imports and globals #####
try:
from twisted.web.client import getPage
from twisted.internet import reactor
from twisted.internet.defer import Deferred
from twisted.internet.protocol import Protocol
from twisted.web.client import Agent, ContentDecoderAgent
from twisted.web.client import RedirectAgent, GzipDecoder
from twisted.web.client import HTTPConnectionPool
from twisted.web.http_headers import Headers
from twisted.internet.ssl import ClientContextFactory
from twisted.internet.task import LoopingCall
import twisted
from twisted.python.compat import (
_PY3, unicode, intToBytes, networkString, nativeString)
# One module-wide Twisted HTTP connection pool, created on the reactor with
# persistent connections enabled.
pnconn_pool = HTTPConnectionPool(reactor, persistent=True)
# NOTE(review): per Twisted's HTTPConnectionPool docs these are the
# per-host cap on cached connections, the idle timeout in seconds for a
# cached connection, and automatic retry of failed requests -- confirm
# against the installed Twisted version.
pnconn_pool.maxPersistentPerHost = 100000
pnconn_pool.cachedConnectionTimeout = 15
pnconn_pool.retryAutomatically = True
class WebClientContextFactory(ClientContextFactory):
    """Context factory whose ``getContext`` accepts a hostname and port.

    Both arguments are ignored; the context is whatever the base
    ``ClientContextFactory`` produces.
    """

    def getContext(self, hostname, port):
        """Return the base class's context, ignoring ``hostname``/``port``."""
        base_context = ClientContextFactory.getContext(self)
        return base_context
class PubNubPamResponse(Protocol):
    """Body receiver that fires ``finished`` once with the complete body.

    A response body may be delivered in several ``dataReceived`` calls.
    Firing the Deferred on the first chunk truncated longer bodies, and a
    second chunk then raised ``AlreadyCalledError``. Chunks are buffered
    and delivered together when the body ends.
    """

    def __init__(self, finished):
        # Deferred to fire with the full response body.
        self.finished = finished
        self._chunks = []

    def dataReceived(self, bytes):
        """Buffer one chunk of the response body."""
        self._chunks.append(bytes)

    def connectionLost(self, reason):
        """Deliver the buffered body once no more data will arrive."""
        if not self.finished.called:
            self.finished.callback(b"".join(self._chunks))
class PubNubResponse(Protocol):
    """Body receiver that fires ``finished`` once with the complete body.

    A response body may be delivered in several ``dataReceived`` calls.
    Firing the Deferred on the first chunk truncated longer bodies, and a
    second chunk then raised ``AlreadyCalledError``. Chunks are buffered
    and delivered together when the body ends.
    """

    def __init__(self, finished):
        # Deferred to fire with the full response body.
        self.finished = finished
        self._chunks = []

    def dataReceived(self, bytes):
        """Buffer one chunk of the response body."""
        self._chunks.append(bytes)

    def connectionLost(self, reason):
        """Deliver the buffered body once no more data will arrive."""
        if not self.finished.called:
            self.finished.callback(b"".join(self._chunks))
except ImportError:
pass
#######################################
def get_data_for_user(data):
    """Reduce a response to its 'message' and 'payload' entries.

    When ``data`` contains both keys, a new dict holding only those two
    entries is returned. In every other case, including values that do not
    support the lookups, ``data`` is returned unchanged.
    """
    wanted = ('message', 'payload')
    try:
        if all(key in data for key in wanted):
            return dict((key, data[key]) for key in wanted)
    except TypeError:
        # Not a container, or not indexable by string key.
        pass
    return data
class PubnubCrypto2():
    """AES-CBC message encryption helper used on Python 2.

    The AES key is the first 32 characters of the hex SHA-256 digest of the
    cipher key, and the IV is the fixed string '0123456789012345'.
    NOTE(review): a constant IV is weak; it is presumably required for
    compatibility with other PubNub SDKs -- confirm before changing.
    """

    def pad(self, msg, block_size=16):
        """Append N characters of value N so len(msg) is a block multiple."""
        padding = block_size - (len(msg) % block_size)
        return msg + chr(padding) * padding

    def depad(self, msg):
        """Strip the padding added by ``pad``."""
        return msg[0:-ord(msg[-1])]

    def getSecret(self, key):
        """Return the hex SHA-256 digest of ``key``."""
        return hashlib.sha256(key).hexdigest()

    def encrypt(self, key, msg):
        """Return ``msg`` padded, AES-CBC encrypted and base64 encoded."""
        secret = self.getSecret(key)
        Initial16bytes = '0123456789012345'
        cipher = AES.new(secret[0:32], AES.MODE_CBC, Initial16bytes)
        return encodestring(cipher.encrypt(self.pad(msg)))

    def decrypt(self, key, msg):
        """Decrypt ``msg``; return it unchanged when it cannot be decrypted.

        Returns the evaluated plaintext when it parses as a Python literal,
        otherwise the plaintext string itself.
        """
        try:
            secret = self.getSecret(key)
            Initial16bytes = '0123456789012345'
            cipher = AES.new(secret[0:32], AES.MODE_CBC, Initial16bytes)
            plain = self.depad(cipher.decrypt(decodestring(msg)))
        except Exception:
            # Best effort: not encrypted, or not encrypted with this key.
            # Unlike a bare except, KeyboardInterrupt/SystemExit propagate.
            return msg
        # SECURITY(review): eval() runs decrypted network data as Python
        # code, so anyone able to publish with the cipher key can execute
        # code in this process. Replace with json.loads once callers are
        # confirmed to accept its return types.
        try:
            return eval(plain)
        except (SyntaxError, NameError):
            # NameError covers JSON literals such as true/false/null.
            return plain
class PubnubCrypto3():
    """AES-CBC message encryption helper used on Python 3.

    The AES key is the first 32 characters of the hex SHA-256 digest of the
    cipher key, and the IV is the fixed string '0123456789012345'.
    NOTE(review): a constant IV is weak; it is presumably required for
    compatibility with other PubNub SDKs -- confirm before changing.
    NOTE(review): base64.encodestring/decodestring were removed in Python
    3.9; the file-level import needs encodebytes/decodebytes there.
    """

    def pad(self, msg, block_size=16):
        """Append N bytes of value N so len(msg) is a block multiple."""
        padding = block_size - (len(msg) % block_size)
        return msg + (chr(padding) * padding).encode('utf-8')

    def depad(self, msg):
        """Strip the padding added by ``pad`` from bytes or str.

        Indexing bytes yields an int on Python 3, so calling ``ord`` on it
        raised TypeError; both element types are handled here. Empty input
        is returned unchanged.
        """
        if not msg:
            return msg
        last = msg[-1]
        pad_len = last if isinstance(last, int) else ord(last)
        return msg[0:-pad_len]

    def getSecret(self, key):
        """Return the hex SHA-256 digest of the UTF-8 encoded ``key``."""
        return hashlib.sha256(key.encode("utf-8")).hexdigest()

    def encrypt(self, key, msg):
        """Return ``msg`` padded, AES-CBC encrypted and base64 encoded."""
        secret = self.getSecret(key)
        Initial16bytes = '0123456789012345'
        cipher = AES.new(secret[0:32], AES.MODE_CBC, Initial16bytes)
        return encodestring(
            cipher.encrypt(self.pad(msg.encode('utf-8')))).decode('utf-8')

    def decrypt(self, key, msg):
        """Return the decrypted text of base64 ciphertext ``msg``.

        The padding added by ``encrypt`` is removed before decoding;
        previously it was left attached to the returned string.
        """
        secret = self.getSecret(key)
        Initial16bytes = '0123456789012345'
        cipher = AES.new(secret[0:32], AES.MODE_CBC, Initial16bytes)
        plain = cipher.decrypt(decodestring(msg.encode('utf-8')))
        return self.depad(plain).decode('utf-8')
class PubnubBase(object):
def __init__(
    self,
    publish_key,
    subscribe_key,
    secret_key=False,
    cipher_key=False,
    auth_key=None,
    ssl_on=False,
    origin='pubsub.pubnub.com',
    uuid=None
):
    """Store the connection settings and pick the crypto helper.

    Args:
        publish_key: Publish key.
        subscribe_key: Subscribe key.
        secret_key: Secret key (used to sign PAM requests).
        cipher_key: Cipher key; when set, messages are encrypted.
        auth_key: Auth key sent with requests (PubNub Access Manager).
        ssl_on: When true the origin is prefixed with https://, otherwise
            with http://.
        origin: Host name of the PubNub origin.
        uuid: Client identifier; a random UUID4 string when omitted.

    Raises:
        AttributeError: If the resulting uuid is not a ``str``.
    """
    scheme = 'https://' if ssl_on else 'http://'
    self.origin = scheme + origin
    self.version = '3.5.2'
    self.limit = 1800
    self.publish_key = publish_key
    self.subscribe_key = subscribe_key
    self.secret_key = secret_key
    self.cipher_key = cipher_key
    self.ssl = ssl_on
    self.auth_key = auth_key
    self.uuid = uuid or str(uuid_lib.uuid4())

    # Index 0 is the major version whether version_info is a plain tuple
    # (old interpreters) or the named structure.
    if sys.version_info[0] == 2:
        self.python_version = 2
        self.pc = PubnubCrypto2()
    else:
        self.python_version = 3
        self.pc = PubnubCrypto3()

    if not isinstance(self.uuid, str):
        raise AttributeError("uuid must be a string")
def _pam_sign(self, msg):
    """Return the URL-safe base64 HMAC-SHA256 of ``msg``.

    The HMAC key is ``self.secret_key``; both key and message are UTF-8
    encoded before signing.
    """
    key_bytes = self.secret_key.encode("utf-8")
    msg_bytes = msg.encode("utf-8")
    digest = hmac.new(key_bytes, msg_bytes, sha256).digest()
    return urlsafe_b64encode(digest)
def set_u(self, u=False):
    """Set the ``u`` flag.

    ``getUrl`` adds a random 'u' query parameter only when this flag is
    exactly ``True``.
    """
    setattr(self, 'u', u)
def _pam_auth(self, query, apicode=0, callback=None, error=None):
    """Sign and send a PAM grant or audit request.

    ``query`` is modified in place: a 'timestamp' is added when missing,
    empty 'auth', 'channel' and 'channel-group' entries are removed, and
    the computed 'signature' is stored.

    Args:
        query: Dict of request parameters.
        apicode: Truthy for an "audit" request, falsy for "grant".
        callback: Optional success callback.
        error: Optional error callback.

    Returns:
        Whatever ``self._request`` returns.
    """
    api_type = "audit" if (apicode) else "grant"

    if 'timestamp' not in query:
        query['timestamp'] = int(time.time())

    # An empty value means "not restricted by this field", so the field is
    # left out of the request entirely.
    for optional in ('auth', 'channel', 'channel-group'):
        if optional in query and not query[optional]:
            del query[optional]

    # The signature covers the parameters sorted by name and fully quoted.
    params = "&".join(
        name + "=" + quote(str(query[name]), safe="")
        for name in sorted(query)
    )
    sign_input = "{subkey}\n{pubkey}\n{apitype}\n{params}".format(
        subkey=self.subscribe_key,
        pubkey=self.publish_key,
        apitype=api_type,
        params=params
    )
    query['signature'] = self._pam_sign(sign_input)

    request = {
        "urlcomponents": [
            'v1', 'auth', api_type, 'sub-key', self.subscribe_key
        ],
        'urlparams': query,
    }
    return self._request(
        request,
        self._return_wrapped_callback(callback),
        self._return_wrapped_callback(error))
def get_origin(self):
    """Return the origin URL, including its http:// or https:// prefix."""
    origin = self.origin
    return origin
def set_auth_key(self, auth_key):
    """Replace the auth key sent with subsequent requests."""
    setattr(self, 'auth_key', auth_key)
def get_auth_key(self):
    """Return the auth key currently configured on this instance.

    The previous body returned the bare name ``auth_key``, which is not
    defined in this scope, so every call raised NameError.
    """
    return self.auth_key
def grant(self, channel=None, channel_group=None, auth_key=False, read=False,
          write=False, manage=False, ttl=5, callback=None, error=None):
    """Grant (or revoke) PubNub Access Manager permissions.

    Permissions are set through the read, write and manage flags. A grant
    with read or write set to false (or not included) revokes any previous
    grant that had it set to true.

    Permissions can be applied at three levels:
        1. Application level: based on subscribe_key, applies to all
           associated channels.
        2. Channel level: based on subscribe_key and channel name.
        3. User level: based on subscribe_key, channel and auth_key.

    Args:
        channel: (string) (optional)
            Channel to grant permissions on. When omitted the grant
            applies to all channels of the subscribe_key. Without an
            auth_key, several channels may be given as a comma separated
            list.
        channel_group: (string) (optional)
            Channel group to grant permissions on.
        auth_key: (string) (optional)
            Auth key to grant permissions to. Several auth keys may be
            given as a comma separated list in combination with a single
            channel. For each special-case value "null" (also inside a
            list, e.g. "null,null,abc") a new auth_key is generated and
            returned.
        read: (boolean) (default: False)
            True grants read permission, False removes it.
        write: (boolean) (default: False)
            True grants write permission, False removes it.
        manage: (boolean) (default: False)
            True grants manage permission, False removes it.
        ttl: (int) (default: 5)
            Minutes the permissions stay valid. Max is 525600, min is 1;
            0 applies the grant indefinitely.
        callback: (function) (optional)
            When set, the api works in async mode. Required with twisted
            or tornado.
        error: (function) (optional)
            Error callback. Required with twisted or tornado.

    Returns:
        In sync mode (no callback) a dict with keys 'message' and
        'payload'.

        Sample Response:
        {
            "message": "Success",
            "payload": {
                "ttl": 5,
                "auths": {
                    "my_ro_authkey": {"r": 1, "w": 0}
                },
                "subscribe_key": "my_subkey",
                "level": "user",
                "channel": "my_channel"
            }
        }
    """
    return self._pam_auth({
        'channel': channel,
        'channel-group': channel_group,
        'auth': auth_key,
        'r': 1 if read else 0,
        'w': 1 if write else 0,
        'm': 1 if manage else 0,
        'ttl': ttl,
        'pnsdk': self.pnsdk
    }, callback=callback, error=error)
def revoke(self, channel=None, channel_group=None, auth_key=None, ttl=1, callback=None, error=None):
    """Revoke read and write permissions.

    Sends a PAM grant request with both 'r' and 'w' set to 0.

    Args:
        channel: (string) (optional)
            Channel to revoke permissions on. When omitted the revoke
            applies to all channels of the subscribe_key. Without an
            auth_key, several channels may be given as a comma separated
            list.
        channel_group: (string) (optional)
            Channel group to revoke permissions on.
        auth_key: (string) (optional)
            Auth key to revoke permissions from. Several auth keys may be
            given as a comma separated list in combination with a single
            channel.
        ttl: (int) (default: 1)
            Minutes the setting stays valid. Max is 525600, min is 1;
            0 applies it indefinitely.
        callback: (function) (optional)
            When set, the api works in async mode. Required with twisted
            or tornado.
        error: (function) (optional)
            Error callback. Required with twisted or tornado.

    Returns:
        In sync mode (no callback) a dict with keys 'message' and
        'payload'.

        Sample Response:
        {
            "message": "Success",
            "payload": {
                "ttl": 5,
                "auths": {
                    "my_authkey": {"r": 0, "w": 0}
                },
                "subscribe_key": "my_subkey",
                "level": "user",
                "channel": "my_channel"
            }
        }
    """
    query = {
        'channel': channel,
        'channel-group': channel_group,
        'auth': auth_key,
        'r': 0,
        'w': 0,
        'ttl': ttl,
        'pnsdk': self.pnsdk,
    }
    return self._pam_auth(query, callback=callback, error=error)
def audit(self, channel=None, channel_group=None, auth_key=None, callback=None, error=None):
    """Fetch the current PubNub Access Manager permissions.

    Reveals existing PAM attributes for any combination of subscribe_key,
    channel and auth_key.

    Args:
        channel: (string) (optional)
            Channel to return PAM attributes for, optionally combined with
            auth_key. When omitted, results for all channels of the
            subscribe_key are returned. Without an auth_key, a comma
            separated list of channels may be given.
        channel_group: (string) (optional)
            Channel group to return PAM attributes for.
        auth_key: (string) (optional)
            Auth key to return PAM attributes for. With a single channel,
            a comma separated list of auth keys may be given.
        callback: (function) (optional)
            When set, the api works in async mode. Required with twisted
            or tornado.
        error: (function) (optional)
            Error callback. Required with twisted or tornado.

    Returns:
        In sync mode (no callback) a dict with keys 'message' and
        'payload'.

        Sample Response:
        {
            "message": "Success",
            "payload": {
                "channels": {
                    "my_channel": {
                        "auths": {
                            "my_ro_authkey": {"r": 1, "w": 0},
                            "my_rw_authkey": {"r": 0, "w": 1},
                            "my_admin_authkey": {"r": 1, "w": 1}
                        }
                    }
                }
            }
        }

    Usage:
        pubnub.audit('my_channel')  # Sync Mode
    """
    query = {
        'channel': channel,
        'channel-group': channel_group,
        'auth': auth_key,
        'pnsdk': self.pnsdk,
    }
    # The second positional argument (1) selects the audit endpoint.
    return self._pam_auth(query, 1, callback=callback, error=error)
def encrypt(self, message):
    """Serialize ``message`` to JSON, encrypting it when a cipher key is set.

    This need not be called directly: publish() calls it for every
    message.

    Args:
        message: Message to be serialized (and encrypted).

    Returns:
        Without a cipher key, the JSON text of ``message``. With one, the
        JSON text of the encrypted message with newlines removed.
    """
    plaintext = json.dumps(message)
    if not self.cipher_key:
        return plaintext
    ciphertext = self.pc.encrypt(self.cipher_key, plaintext)
    return json.dumps(ciphertext.replace('\n', ''))
def decrypt(self, message):
    """Decrypt ``message`` when a cipher key is set.

    Args:
        message: Message to be decrypted.

    Returns:
        The decrypted message when a cipher key is set, otherwise
        ``message`` unchanged.
    """
    if not self.cipher_key:
        return message
    return self.pc.decrypt(self.cipher_key, message)
def _return_wrapped_callback(self, callback=None):
    """Wrap ``callback`` so it receives only 'payload' and 'message'.

    Responses containing a 'payload' entry are reduced to a dict with that
    entry, plus 'message' when present. Any other response is passed
    through unchanged.

    Returns:
        The wrapping function, or None when ``callback`` is None.
    """
    if callback is None:
        return None

    def _new_format_callback(response):
        if 'payload' not in response:
            callback(response)
            return
        callback_data = {'payload': response['payload']}
        if 'message' in response:
            callback_data['message'] = response['message']
        callback(callback_data)

    return _new_format_callback
def publish(self, channel, message, callback=None, error=None):
    """Publish ``message`` on ``channel``.

    publish() sends a message to all subscribers of a channel. A valid
    publish_key must have been given at initialization. Messages in
    transit can be secured with SSL/TLS by setting ssl to True at
    initialization, and are encrypted with AES-256 when a cipher_key was
    given at initialization.

    Args:
        channel: (string)
            Channel name to publish the message to.
        message: (string/int/double/dict/list)
            Message to be published.
        callback: (optional)
            When set, the api works in async mode. Required with twisted
            or tornado.
        error: (optional)
            Error callback. Required with twisted or tornado.

    Returns:
        Sync Mode  : list
        Async Mode : None

        The response has the form [Number, "Status", "Time Token"], for
        example [1,"Sent","13769558699541401"].
    """
    payload = self.encrypt(message)
    request = {
        "urlcomponents": [
            'publish',
            self.publish_key,
            self.subscribe_key,
            '0',
            channel,
            '0',
            payload,
        ],
        'urlparams': {'auth': self.auth_key, 'pnsdk': self.pnsdk},
    }
    return self._request(
        request,
        callback=self._return_wrapped_callback(callback),
        error=self._return_wrapped_callback(error))
def presence(self, channel, callback, error=None):
    """Subscribe to presence events of a channel.

    Only works in async mode. Presence events are delivered on the
    companion channel named ``channel + '-pnpres'``.

    Args:
        channel: (string) Channel whose presence events are wanted.
        callback: Function called with each presence event.
        error: (optional) Function called on errors. It is now forwarded
            to subscribe(); previously it was accepted but dropped.

    Returns:
        Whatever ``self.subscribe`` returns.
    """
    return self.subscribe(channel + '-pnpres', callback=callback, error=error)
def here_now(self, channel, callback=None, error=None):
    """Get here-now (occupancy) data.

    Returns the current state of a channel: the unique user ids currently
    subscribed and the total occupancy count.

    Args:
        channel: (string)
            Channel to return occupancy results for. Pass None or an empty
            string to get data for all channels.
        callback: (optional)
            When set, the api works in async mode. Required with twisted
            or tornado.
        error: (optional)
            Error callback. Required with twisted or tornado.

    Returns:
        Sync Mode: list
        Async Mode: None

        Response Format:
            uuids: ["String", "String", ...] - UUIDs currently subscribed
                to the channel.
            occupancy: Number - total current occupancy of the channel.

        Example Response:
        {
            occupancy: 4,
            uuids: [
                '123123234t234f34fq3dq',
                '143r34f34t34fq34q34q3',
                '23f34d3f4rq34r34rq23q',
                'w34tcw45t45tcw435tww3',
            ]
        }
    """
    path = ['v2', 'presence', 'sub_key', self.subscribe_key]
    if channel is not None and len(channel) > 0:
        path.extend(['channel', channel])

    request = {
        "urlcomponents": path,
        'urlparams': {'auth': self.auth_key, 'pnsdk': self.pnsdk},
    }
    return self._request(
        request,
        callback=self._return_wrapped_callback(callback),
        error=self._return_wrapped_callback(error))
def history(self, channel, count=100, reverse=False,
            start=None, end=None, callback=None, error=None):
    """Fetch historical messages of a channel.

    PubNub Storage/Playback Service provides access to the history of all
    messages published to PubNub. Stored messages can be encrypted with
    AES-256 message encryption so they are not readable while stored on
    PubNub's network.

    Messages can be returned newest to oldest (default) or oldest to
    newest (reverse=True), paged with a start or end time token, sliced by
    giving both, and limited in number with count.

    The auth key is sent as the 'auth' query parameter, the same name
    every other request in this class uses; it was previously sent as
    'auth_key'.

    Args:
        channel: (string)
            Channel to return history messages from.
        count: (int) (default: 100)
            Number of historical messages to return.
        reverse: (boolean) (default: False)
            True returns messages oldest to newest.
        start: (optional)
            Time token to start the slice at.
        end: (optional)
            Time token to end the slice at.
        callback: (optional)
            When set, the api works in async mode. Required with twisted
            or tornado.
        error: (optional)
            Error callback. Required with twisted or tornado.

    Returns:
        In sync mode (no callback) a list.

        Sample Response:
        [["Pub1","Pub2","Pub3","Pub4","Pub5"],13406746729185766,13406746845892666]
    """
    params = {
        'count': count,
        'reverse': reverse,
        'start': start,
        'end': end,
        'auth': self.auth_key,
        'pnsdk': self.pnsdk,
    }

    ## Get History
    return self._request({'urlcomponents': [
        'v2',
        'history',
        'sub-key',
        self.subscribe_key,
        'channel',
        channel,
    ], 'urlparams': params},
        callback=self._return_wrapped_callback(callback),
        error=self._return_wrapped_callback(error))
def time(self, callback=None):
    """Return the PubNub server time token.

    Args:
        callback: (optional)
            When set, the api works in async mode. Required with twisted
            or tornado.

    Returns:
        The first element of the 'time' response, a 17 digit number such
        as 13769501243685161, or None when the request returned None.
    """
    response = self._request({'urlcomponents': ['time', '0']}, callback)
    if response is None:
        return None
    return response[0]
def _encode(self, request):
    """Percent-encode reserved characters in each element of ``request``.

    Args:
        request: Iterable of strings. A plain string is iterated character
            by character, giving one list entry per character.

    Returns:
        A list with one encoded string per element of ``request``.
    """
    reserved = ' ~`!@#$%^&*()+=[]\\{}|;\':",./<>?'
    encoded = []
    for bit in request:
        pieces = []
        for ch in list(bit):
            if reserved.find(ch) > -1:
                # e.g. '/' -> '0x2f' -> '%2f' -> '%2F'
                pieces.append(hex(ord(ch)).replace('0x', '%').upper())
            else:
                pieces.append(ch)
        encoded.append("".join(pieces))
    return encoded
def getUrl(self, request):
    """Build the full request URL.

    Args:
        request: Dict with 'urlcomponents' (path segments) and optionally
            'urlparams' (query parameters). When ``self.u`` is exactly
            True and 'urlparams' is present, a random 'u' parameter is
            added to it in place.

    Returns:
        The origin followed by the percent-encoded path segments joined
        with '/'. When 'urlparams' is present, '?' and the parameters whose
        value is not None follow; parameter values are not encoded.
    """
    has_params = "urlparams" in request
    if self.u is True and has_params:
        request['urlparams']['u'] = str(random.randint(1, 100000000000))

    reserved = ' ~`!@#$%^&*()+=[]\\{}|;\':",./<>?'

    def escape(bit):
        pieces = []
        for ch in list(bit):
            if reserved.find(ch) > -1:
                pieces.append(hex(ord(ch)).replace('0x', '%').upper())
            else:
                pieces.append(ch)
        return "".join(pieces)

    ## Build URL
    path = "/".join([escape(bit) for bit in request["urlcomponents"]])
    url = self.origin + '/' + path
    if has_params:
        pairs = [
            name + "=" + str(value)
            for name, value in request["urlparams"].items()
            if value is not None
        ]
        url = url + '?' + "&".join(pairs)
    return url
def _channel_registry(self, url=None, params=None, callback=None, error=None):
    """Send a channel-registration request.

    Args:
        url: Optional list of extra path segments appended after
            'v1/channel-registration/sub-key/<subscribe_key>'.
        params: Optional dict of query parameters. 'auth' and 'pnsdk' are
            added to it in place.
        callback: Optional success callback.
        error: Optional error callback.

    Returns:
        Whatever ``self._request`` returns.
    """
    query = dict() if params is None else params
    path = ['v1', 'channel-registration', 'sub-key', self.subscribe_key]
    if url is not None:
        path += url

    query['auth'] = self.auth_key
    query['pnsdk'] = self.pnsdk

    ## Channel Registry Request
    request = {'urlcomponents': path, 'urlparams': query}
    return self._request(
        request,
        callback=self._return_wrapped_callback(callback),
        error=self._return_wrapped_callback(error))
def _channel_group(self, channel_group=None, channels=None, cloak=None,mode='add', callback=None, error=None):
    """Build and send a channel-group registry request.

    Args:
        channel_group: Group name, optionally prefixed with a namespace as
            'namespace:group'. '*' as namespace means no namespace; '*' as
            group means no specific group.
        channels: Channel name, comma separated channel names, or a
            list/tuple of channel names. A list used to fail with
            AttributeError because ``join`` was called on the list
            instead of on the separator.
        cloak: Currently unused.
        mode: 'add' or 'remove'. With channels it is the query parameter
            that carries them; without channels, 'remove' appends a
            'remove' path segment.
        callback: Optional success callback.
        error: Optional error callback.

    Returns:
        Whatever ``self._channel_registry`` returns.
    """
    params = dict()
    url = []
    namespace = None

    if (channel_group is not None and len(channel_group) > 0):
        ns_ch_a = channel_group.split(':')
        if len(ns_ch_a) > 1:
            namespace = None if ns_ch_a[0] == '*' else ns_ch_a[0]
            channel_group = ns_ch_a[1]
        else:
            channel_group = ns_ch_a[0]

    if (namespace is not None):
        url.append('namespace')
        # _encode() returns a list of encoded pieces; it is appended as one
        # path element, exactly as before.
        url.append(self._encode(namespace))

    url.append('channel-group')

    if channel_group is not None and channel_group != '*':
        url.append(channel_group)

    if (channels is not None):
        if isinstance(channels, (list, tuple)):
            channels = ','.join(channels)
        params[mode] = channels
        #params['cloak'] = 'true' if CLOAK is True else 'false'
    else:
        if mode == 'remove':
            url.append('remove')

    return self._channel_registry(url=url, params=params, callback=callback, error=error)
def channel_group_list_namespaces(self, callback=None, error=None):
    """List the channel-group namespaces.

    Args:
        callback: Optional success callback.
        error: Optional error callback.

    Both callbacks are now forwarded to the request; previously they were
    accepted but never passed on.

    Returns:
        Whatever ``self._channel_registry`` returns.
    """
    url = ['namespace']
    return self._channel_registry(url=url, callback=callback, error=error)
def channel_group_remove_namespace(self, namespace, callback=None, error=None):
    """Remove a channel-group namespace.

    Args:
        namespace: Name of the namespace to remove.
        callback: Optional success callback.
        error: Optional error callback.

    Returns:
        Whatever ``self._channel_registry`` returns.
    """
    path = ['namespace']
    path.append(self._encode(namespace))
    path.append('remove')
    return self._channel_registry(url=path, callback=callback, error=error)
def channel_group_list_groups(self, namespace=None, channel_group=None, callback=None, error=None):
    """Issue a channel-group request for a namespace or a given group.

    Args:
        namespace: (string) (optional)
            When non-empty, the request targets "<namespace>:*" and
            channel_group is ignored.
        channel_group: (string) (optional)
            Used as-is when no namespace is given.
        callback: (function) (optional) Forwarded to self._channel_group.
        error: (function) (optional) Forwarded to self._channel_group.

    Returns:
        The result of self._channel_group.
    """
    has_namespace = namespace is not None and len(namespace) > 0
    target = namespace + ':*' if has_namespace else channel_group
    return self._channel_group(
        channel_group=target,
        callback=callback,
        error=error)
def channel_group_list_channels(self, channel_group, callback=None, error=None):
    """Issue a channel-group request for channel_group with no channels.

    Args:
        channel_group: (string) "group" or "namespace:group".
        callback: (function) (optional) Forwarded to self._channel_group.
        error: (function) (optional) Forwarded to self._channel_group.

    Returns:
        The result of self._channel_group.
    """
    return self._channel_group(
        channel_group=channel_group,
        callback=callback,
        error=error)
def channel_group_add_channel(self, channel_group, channel, callback=None, error=None):
    """Issue a channel-group request in 'add' mode for the given channel(s).

    Args:
        channel_group: (string) "group" or "namespace:group".
        channel: Channel name(s), passed on as ``channels``.
        callback: (function) (optional) Forwarded to self._channel_group.
        error: (function) (optional) Forwarded to self._channel_group.

    Returns:
        The result of self._channel_group.
    """
    return self._channel_group(
        channel_group=channel_group,
        channels=channel,
        mode='add',
        callback=callback,
        error=error)
def channel_group_remove_channel(self, channel_group, channel, callback=None, error=None):
    """Issue a channel-group request in 'remove' mode for the given channel(s).

    Args:
        channel_group: (string) "group" or "namespace:group".
        channel: Channel name(s), passed on as ``channels``.
        callback: (function) (optional) Forwarded to self._channel_group.
        error: (function) (optional) Forwarded to self._channel_group.

    Returns:
        The result of self._channel_group.
    """
    return self._channel_group(
        channel_group=channel_group,
        channels=channel,
        mode='remove',
        callback=callback,
        error=error)
def channel_group_remove_group(self, channel_group, callback=None, error=None):
    """Issue a channel-group request in 'remove' mode without channels.

    Args:
        channel_group: (string) "group" or "namespace:group".
        callback: (function) (optional) Forwarded to self._channel_group.
        error: (function) (optional) Forwarded to self._channel_group.

    Returns:
        The result of self._channel_group.
    """
    return self._channel_group(
        channel_group=channel_group,
        mode='remove',
        callback=callback,
        error=error)
class EmptyLock():
    """Context manager that does nothing on entry or exit.

    Stands in for a real lock wherever the code uses ``with <lock>:``
    but no locking is wanted.
    """

    def __enter__(self):
        return None

    def __exit__(self, exc_type, exc_value, exc_traceback):
        ## Returning None lets any exception from the with-body propagate
        return None


## Shared instance used as the default lock argument
empty_lock = EmptyLock()
class PubnubCoreAsync(PubnubBase):
    """Callback-driven PubNub client core.

    Keeps the table of channel subscriptions and runs the subscribe
    long-poll loop. The transport (``self._request``) and the scheduler
    (``self.timeout``) are not defined here; the subclasses in this file
    provide them.
    """

    def start(self):
        """Hook for starting an event loop; does nothing here."""
        pass

    def stop(self):
        """Hook for stopping an event loop; does nothing here."""
        pass

    def __init__(
        self,
        publish_key,
        subscribe_key,
        secret_key=None,
        cipher_key=None,
        auth_key=None,
        ssl_on=False,
        origin='pubsub.pubnub.com',
        uuid=None,
        _tt_lock=empty_lock,
        _channel_list_lock=empty_lock
    ):
        """Initialise subscription state on top of PubnubBase.

        Args:
            publish_key, subscribe_key, secret_key, cipher_key, auth_key,
            ssl_on, origin, uuid:
                Forwarded unchanged to PubnubBase.__init__.
            _tt_lock: Context manager guarding the timetoken fields.
            _channel_list_lock: Context manager guarding self.subscriptions.
                Passing None for either lock selects the no-op lock.
        """
        super(PubnubCoreAsync, self).__init__(
            publish_key=publish_key,
            subscribe_key=subscribe_key,
            secret_key=secret_key,
            cipher_key=cipher_key,
            auth_key=auth_key,
            ssl_on=ssl_on,
            origin=origin,
            uuid=uuid
        )
        self.subscriptions = {}
        self.timetoken = 0
        self.last_timetoken = 0
        self.accept_encoding = 'gzip'
        self.SUB_RECEIVER = None
        ## None cannot be used in a with-statement; fall back to the no-op lock
        self._tt_lock = _tt_lock if _tt_lock is not None else empty_lock
        self._channel_list_lock = _channel_list_lock \
            if _channel_list_lock is not None else empty_lock
        ## Replaced by subscribe() with the real reconnect function
        self._connect = lambda: None
        self.u = None

    def get_channel_list(self, channels):
        """Return the subscribed channel names as a comma separated string.

        Args:
            channels: (dict) Maps channel name to a dict that has a
                'subscribed' entry (the shape of self.subscriptions).

        Returns:
            Names whose 'subscribed' entry is truthy, joined with ','.
            Empty string when there are none.
        """
        with self._channel_list_lock:
            return ','.join(
                [ch for ch in channels if channels[ch]['subscribed']])

    def get_channel_array(self):
        """Get List of currently subscribed channels

        Returns:
            Returns a list containing names of channels subscribed

            Sample return value:
                ["a","b","c"]
        """
        channels = self.subscriptions
        with self._channel_list_lock:
            return [ch for ch in channels if channels[ch]['subscribed']]

    @staticmethod
    def each(l, func):
        """Call func on every item of l; do nothing when func is None.

        Declared static because the signature carries no ``self``.
        """
        if func is None:
            return
        for i in l:
            func(i)

    def subscribe(self, channels, callback, error=None,
                  connect=None, disconnect=None, reconnect=None, sync=False):
        """Subscribe to data on a channel.

        This function causes the client to create an open TCP socket to the
        PubNub Real-Time Network and begin listening for messages on a specified channel.
        To subscribe to a channel the client must send the appropriate subscribe_key at
        initialization.

        Only works in async mode

        Args:
            channels: (string/list)
                Specifies the channel to subscribe to. It is possible to specify
                multiple channels as a comma separated list or an array.
            callback: (function)
                This callback is called on receiving a message from the channel.
            error: (function) (optional)
                This callback is called on an error event
            connect: (function) (optional)
                This callback is called on a successful connection to the PubNub cloud
            disconnect: (function) (optional)
                This callback is called on client disconnect from the PubNub cloud
            reconnect: (function) (optional)
                This callback is called on successfully re-connecting to the PubNub cloud
            sync: (bool) (optional)
                When True and the instance provides subscribe_sync, delegate
                to that blocking call instead of starting the async loop.

        Returns:
            None
        """
        with self._tt_lock:
            ## Keep the last non-zero timetoken so the next poll can resume
            if self.timetoken != 0:
                self.last_timetoken = self.timetoken
            self.timetoken = 0

        ## subscribe_sync only exists on some subclasses
        subscribe_sync = getattr(self, 'subscribe_sync', None)
        if sync is True and subscribe_sync is not None:
            subscribe_sync(
                ','.join(channels) if isinstance(channels, list) else channels,
                callback)
            return

        def _invoke(func, msg=None, channel=None):
            if func is None:
                return
            if msg is not None and channel is not None:
                func(get_data_for_user(msg), channel)
            elif msg is not None:
                func(get_data_for_user(msg))
            else:
                func()

        def _invoke_connect():
            with self._channel_list_lock:
                ## Snapshot: a callback may add subscriptions while we iterate
                for chobj in list(self.subscriptions.values()):
                    if chobj['connected'] is False:
                        chobj['connected'] = True
                        chobj['disconnected'] = False
                        _invoke(chobj['connect'], chobj['name'])
                    elif chobj['disconnected'] is True:
                        chobj['disconnected'] = False
                        _invoke(chobj['reconnect'], chobj['name'])

        def _invoke_disconnect():
            with self._channel_list_lock:
                for chobj in list(self.subscriptions.values()):
                    if chobj['connected'] is True and \
                            chobj['disconnected'] is False:
                        chobj['disconnected'] = True
                        _invoke(chobj['disconnect'], chobj['name'])

        def _invoke_error(channel_list=None, err=None):
            if channel_list is None:
                targets = list(self.subscriptions.values())
            else:
                ## Ignore channel names we hold no subscription for
                targets = [self.subscriptions[ch] for ch in channel_list
                           if ch in self.subscriptions]
            for chobj in targets:
                _invoke(chobj['error'], err)

        def _get_channel():
            for chobj in self.subscriptions.values():
                if chobj['subscribed'] is True:
                    return chobj

        def _is_forbidden(response):
            return isinstance(response, dict) and \
                response.get('message') == 'Forbidden'

        def _forbidden_channels(response):
            ## None makes _invoke_error notify every channel
            return (response.get('payload') or {}).get('channels')

        channels = channels if isinstance(
            channels, list) else channels.split(",")

        for channel in channels:
            ## New Channel?
            if len(channel) > 0 and (
                    channel not in self.subscriptions or
                    self.subscriptions[channel]['subscribed'] is False):
                with self._channel_list_lock:
                    self.subscriptions[channel] = {
                        'name': channel,
                        'first': False,
                        'connected': False,
                        'disconnected': True,
                        'subscribed': True,
                        'callback': callback,
                        'connect': connect,
                        'disconnect': disconnect,
                        'reconnect': reconnect,
                        'error': error
                    }

        ## SUBSCRIPTION RECURSION
        def _connect():
            self._reset_offline()

            def error_callback(response):
                ## An empty response has nothing to index into; handle it
                ## as a dropped connection and retry after a pause
                if not response:
                    _invoke_disconnect()
                    self.timetoken = 0
                    self.timeout(1, _connect)
                    return

                if _is_forbidden(response):
                    _invoke_error(_forbidden_channels(response),
                                  response['message'])
                    self.timeout(1, _connect)
                    return

                if isinstance(response, dict) and 'message' in response:
                    _invoke_error(err=response['message'])
                else:
                    _invoke_disconnect()
                    self.timetoken = 0
                    self.timeout(1, _connect)

            def sub_callback(response):
                ## Nothing usable came back: poll again after a pause
                if not response:
                    self.timeout(1, _connect)
                    return

                if _is_forbidden(response):
                    _invoke_error(_forbidden_channels(response),
                                  response['message'])
                    _connect()
                    return

                _invoke_connect()

                with self._tt_lock:
                    ## After a (re)subscribe resume from the remembered
                    ## timetoken, otherwise advance to the server's one
                    if self.timetoken == 0 and self.last_timetoken != 0:
                        self.timetoken = self.last_timetoken
                    else:
                        self.timetoken = response[1]

                    if len(response) > 2:
                        ## response[2] names the channel of each message
                        channel_list = response[2].split(',')
                        for name, message in zip(channel_list, response[0]):
                            if name in self.subscriptions:
                                chobj = self.subscriptions[name]
                                _invoke(chobj['callback'],
                                        self.decrypt(message),
                                        chobj['name'])
                    else:
                        chobj = _get_channel()
                        for message in response[0]:
                            if chobj:
                                _invoke(chobj['callback'],
                                        self.decrypt(message),
                                        chobj['name'])

                    _connect()

            channel_list = self.get_channel_list(self.subscriptions)
            if len(channel_list) <= 0:
                return

            ## CONNECT TO PUBNUB SUBSCRIBE SERVERS
            self.SUB_RECEIVER = self._request({"urlcomponents": [
                'subscribe',
                self.subscribe_key,
                channel_list,
                '0',
                str(self.timetoken)
            ], "urlparams": {"uuid": self.uuid, "auth": self.auth_key,
                             'pnsdk': self.pnsdk}},
                sub_callback,
                error_callback,
                single=True, timeout=320)

        self._connect = _connect

        ## BEGIN SUBSCRIPTION (LISTEN FOR MESSAGES)
        _connect()

    def _reset_offline(self):
        """Call the stored SUB_RECEIVER handle, if any, and clear it."""
        if self.SUB_RECEIVER is not None:
            self.SUB_RECEIVER()
        self.SUB_RECEIVER = None

    def CONNECT(self):
        """Drop the current subscribe request and start a new one."""
        self._reset_offline()
        self._connect()

    def unsubscribe(self, channel):
        """Unsubscribe from a channel.

        Marks the channel as not subscribed and restarts the subscribe
        request so that it no longer includes the channel.

        Only works in async mode

        Args:
            channel: Channel name ( string ) to stop listening on.

        Returns:
            False when there is no subscription entry for the channel,
            otherwise None.
        """
        ## "x in d is False" is a chained comparison that never holds
        if channel not in self.subscriptions:
            return False

        ## DISCONNECT
        with self._channel_list_lock:
            if channel in self.subscriptions:
                self.subscriptions[channel]['connected'] = 0
                self.subscriptions[channel]['subscribed'] = False
                self.subscriptions[channel]['timetoken'] = 0
                self.subscriptions[channel]['first'] = False
        self.CONNECT()
class PubnubCore(PubnubCoreAsync):
    """PubnubCoreAsync plus a blocking subscribe loop (subscribe_sync)."""

    def __init__(
        self,
        publish_key,
        subscribe_key,
        secret_key=None,
        cipher_key=None,
        auth_key=None,
        ssl_on=False,
        origin='pubsub.pubnub.com',
        uuid=None,
        _tt_lock=None,
        _channel_list_lock=None
    ):
        """Initialise the client.

        Args:
            publish_key, subscribe_key, secret_key, cipher_key, auth_key,
            ssl_on, origin, uuid:
                Forwarded unchanged to PubnubCoreAsync.__init__.
            _tt_lock, _channel_list_lock:
                Context managers used as locks. None selects the no-op
                lock, because the parent class uses them in with-statements.
        """
        super(PubnubCore, self).__init__(
            publish_key=publish_key,
            subscribe_key=subscribe_key,
            secret_key=secret_key,
            cipher_key=cipher_key,
            auth_key=auth_key,
            ssl_on=ssl_on,
            origin=origin,
            uuid=uuid,
            _tt_lock=_tt_lock if _tt_lock is not None else empty_lock,
            _channel_list_lock=_channel_list_lock
            if _channel_list_lock is not None else empty_lock
        )
        self.subscriptions = {}
        self.timetoken = 0
        self.accept_encoding = 'gzip'

    def subscribe_sync(self, channel, callback, timetoken=0):
        """Listen for messages on a channel. This call is BLOCKING.

        Polls the subscribe endpoint in a loop and hands every received
        message (after self.decrypt) to callback.

        Args:
            channel: (string) Channel name placed in the subscribe URL.
            callback: (function)
                Called with each message. Return a truthy value to keep
                listening; a falsy value ends the loop.
            timetoken: (optional) Timetoken to start polling from.

        Returns:
            None, once callback returns a falsy value.

        Example:
            def receive(message):
                print(message)
                return True

            pubnub.subscribe_sync('hello_world', receive)
        """
        subscribe_key = self.subscribe_key

        ## Begin Subscribe
        while True:
            try:
                ## Wait for Message
                response = self._request({"urlcomponents": [
                    'subscribe',
                    subscribe_key,
                    channel,
                    '0',
                    str(timetoken)
                ], "urlparams": {"uuid": self.uuid, "auth": self.auth_key,
                                 'pnsdk': self.pnsdk}})

                messages = response[0]
                ## len() raises for replies whose first item is not a
                ## sequence (e.g. [0, reason]); checking it first keeps the
                ## current timetoken from being overwritten by such a reply
                received = len(messages)
                timetoken = response[1]

                ## If it was a timeout
                if not received:
                    continue

                ## Run user Callback and Reconnect if user permits.
                for message in messages:
                    if not callback(self.decrypt(message)):
                        return
            except Exception:
                ## Best effort: pause briefly, then poll again
                time.sleep(1)
class HTTPClient:
    """One HTTP fetch whose JSON result is routed to a callback.

    When no callback is given the result goes to the handlers stored in
    ``pubnub.latest_sub_callback``, but only if this client's id is still
    the one recorded there.
    """

    def __init__(self, pubnub, url, urllib_func=None,
                 callback=None, error=None, id=None, timeout=5):
        self.pubnub = pubnub
        self.url = url
        self._urllib_func = urllib_func
        self.callback = callback
        self.error = error
        self.id = id
        self.timeout = timeout
        self.stop = False

    def cancel(self):
        """Flag the client as stopped and forget both handlers."""
        self.stop = True
        self.callback = None
        self.error = None

    def run(self):
        """Fetch self.url and deliver the decoded response.

        Does nothing when no fetch function was supplied, or when the
        client was cancelled by the time the fetch returns. A status code
        of 200 goes to the success handler, anything else (and undecodable
        JSON) to the error handler.
        """
        fetch = self._urllib_func
        if fetch is None:
            return

        def _deliver(handler, payload):
            if handler is not None:
                handler(get_data_for_user(payload))

        def _dispatch(raw, status, lookup):
            ## lookup(name) resolves 'callback' / 'error' only when needed
            try:
                parsed = json.loads(raw)
            except ValueError:
                _deliver(lookup('error'), {'error': 'json decoding error'})
                return
            if status != 200:
                _deliver(lookup('error'), parsed)
            else:
                _deliver(lookup('callback'), parsed)

        result = fetch(self.url, timeout=self.timeout)
        data = result[0]
        code = result[1]

        if self.stop is True:
            return

        if self.callback is not None:
            _dispatch(data, code, lambda name: getattr(self, name))
            return

        with self.pubnub.latest_sub_callback_lock:
            latest = self.pubnub.latest_sub_callback
            ## A different id means another request has taken over
            if latest['id'] != self.id:
                return
            if latest['callback'] is None:
                return
            latest['id'] = 0
            _dispatch(data, code, latest.__getitem__)
def _urllib_request_2(url, timeout=5):
    """Fetch url with urllib2.

    Args:
        url: Address to open.
        timeout: Passed to urllib2.urlopen.

    Returns:
        Tuple (body, status code). When urlopen raises URLError the tuple
        is (JSON text of {"message": <reason>}, 0).
    """
    try:
        response = urllib2.urlopen(url, timeout=timeout)
    except urllib2.HTTPError as failure:
        ## The HTTPError is read below like a normal response
        response = failure
    except urllib2.URLError as failure:
        return (json.dumps({"message": str(failure.reason)}), 0)
    body = response.read()
    return (body, response.code)
class PubnubHTTPAdapter(HTTPAdapter):
    """HTTPAdapter that supplies default socket options to its pool manager."""

    def init_poolmanager(self, *args, **kwargs):
        ## Only fill in socket_options when the caller did not pass any
        if 'socket_options' not in kwargs:
            kwargs['socket_options'] = default_socket_options
        super(PubnubHTTPAdapter, self).init_poolmanager(*args, **kwargs)
## Module-wide requests session: _requests_request issues its GETs through
## it and Pubnub.__init__ mounts the HTTP adapters on it.
s = requests.Session()
#s.mount('http://', PubnubHTTPAdapter(max_retries=1))
#s.mount('https://', PubnubHTTPAdapter(max_retries=1))
#s.mount('http://pubsub.pubnub.com', HTTPAdapter(max_retries=1))
#s.mount('https://pubsub.pubnub.com', HTTPAdapter(max_retries=1))
def _requests_request(url, timeout=5):
    """Fetch url through the shared requests session ``s``.

    Args:
        url: Address to GET.
        timeout: Passed to the session's get().

    Returns:
        Tuple (response text, status code). On a connection error or a
        timeout the tuple is (JSON-encoded error text, 0).
    """
    try:
        resp = s.get(url, timeout=timeout)
    except requests.exceptions.HTTPError as http_error:
        ## The exception has no text/status_code of its own; the failed
        ## response, when one exists, is carried on its .response attribute
        resp = http_error.response
        if resp is None:
            return (json.dumps(str(http_error)), 0)
    except (requests.exceptions.ConnectionError,
            requests.exceptions.Timeout) as error:
        return (json.dumps(str(error)), 0)
    return (resp.text, resp.status_code)
def _urllib_request_3(url, timeout=5):
try:
resp = urllib.request.urlopen(url, timeout=timeout)
except (urllib.request.HTTPError, urllib.request.URLError) as http_error:
resp = http_error
r = resp.read().decode("utf-8")
return (r, resp.code)
## HTTP fetch function used by the Pubnub class; Pubnub.__init__ rebinds it
## to one of the _urllib_request_* / _requests_request functions.
_urllib_request = None
# Pubnub
class Pubnub(PubnubCore):
    """Thread based PubNub client.

    Synchronous calls run in the calling thread; calls made with a
    callback run in a new thread each.
    """

    def __init__(
        self,
        publish_key,
        subscribe_key,
        secret_key=None,
        cipher_key=None,
        auth_key=None,
        ssl_on=False,
        origin='pubsub.pubnub.com',
        uuid=None,
        pooling=True,
        daemon=False,
        pres_uuid=None,
        azure=False
    ):
        """Initialise the client.

        Args:
            publish_key, subscribe_key, secret_key, cipher_key, auth_key,
            ssl_on, origin:
                Forwarded unchanged to PubnubCore.__init__.
            uuid: Forwarded to PubnubCore.__init__; pres_uuid is used when
                uuid is falsy.
            pooling: (bool) When True, requests go through the shared
                requests session instead of urllib.
            daemon: (bool) Value given to the ``daemon`` flag of every
                thread this client starts.
            pres_uuid: Fallback for uuid.
            azure: (bool) When False, plain HTTPAdapter instances are
                mounted for pubsub.pubnub.com; otherwise PubnubHTTPAdapter
                instances are mounted for all http/https URLs.
        """
        global _urllib_request
        super(Pubnub, self).__init__(
            publish_key=publish_key,
            subscribe_key=subscribe_key,
            secret_key=secret_key,
            cipher_key=cipher_key,
            auth_key=auth_key,
            ssl_on=ssl_on,
            origin=origin,
            uuid=uuid or pres_uuid,
            _tt_lock=threading.RLock(),
            _channel_list_lock=threading.RLock()
        )
        ## NOTE: the fetch function is module-wide, so the most recently
        ## created instance decides it for all instances
        if self.python_version == 2:
            _urllib_request = _urllib_request_2
        else:
            _urllib_request = _urllib_request_3
        if pooling is True:
            _urllib_request = _requests_request

        self.latest_sub_callback_lock = threading.RLock()
        ## 'error' is present from the start so readers never hit KeyError
        self.latest_sub_callback = {'id': None, 'callback': None,
                                    'error': None}
        self.pnsdk = 'PubNub-Python' + '/' + self.version
        self.daemon = daemon

        if azure is False:
            s.mount('http://pubsub.pubnub.com', HTTPAdapter(max_retries=1))
            s.mount('https://pubsub.pubnub.com', HTTPAdapter(max_retries=1))
        else:
            s.mount('http://', PubnubHTTPAdapter(max_retries=1))
            s.mount('https://', PubnubHTTPAdapter(max_retries=1))

    def timeout(self, interval, func):
        """Call func after interval seconds on a new thread."""
        def cb():
            time.sleep(interval)
            func()
        thread = threading.Thread(target=cb)
        thread.daemon = self.daemon
        thread.start()

    def _request_async(self, request, callback=None, error=None, single=False, timeout=5):
        """Run a request on a new thread and report through callbacks.

        Args:
            request: (dict) Request description passed to self.getUrl.
            callback: (function) (optional) Success handler.
            error: (function) (optional) Error handler.
            single: (bool) When True the handlers are stored in
                self.latest_sub_callback, replacing those of any earlier
                single request, and the client is tagged with a fresh id.
            timeout: Timeout handed to the fetch function.

        Returns:
            A function that cancels delivery of this request's result.
        """
        global _urllib_request

        ## Build URL
        url = self.getUrl(request)
        if single is True:
            request_id = time.time()
            client = HTTPClient(self, url=url, urllib_func=_urllib_request,
                                callback=None, error=None, id=request_id,
                                timeout=timeout)
            with self.latest_sub_callback_lock:
                self.latest_sub_callback['id'] = request_id
                self.latest_sub_callback['callback'] = callback
                self.latest_sub_callback['error'] = error
        else:
            client = HTTPClient(self, url=url, urllib_func=_urllib_request,
                                callback=callback, error=error,
                                timeout=timeout)

        thread = threading.Thread(target=client.run)
        thread.daemon = self.daemon
        thread.start()

        def abort():
            client.cancel()
        return abort

    def _request_sync(self, request, timeout=5):
        """Run a request in the calling thread.

        Args:
            request: (dict) Request description passed to self.getUrl.
            timeout: Timeout handed to the fetch function.

        Returns:
            The decoded JSON response. Special cases:
            [0, "JSON Error"] when the body is not valid JSON;
            {'message': ..., 'payload': ...} for a non-200 reply that
            carries both keys; [0, <decoded body>] when the fetch function
            reported status 0.
        """
        global _urllib_request

        ## Build URL
        url = self.getUrl(request)

        ## Send Request Expecting JSONP Response
        response = _urllib_request(url, timeout=timeout)
        try:
            resp_json = json.loads(response[0])
        except ValueError:
            return [0, "JSON Error"]

        ## Only a dict can be indexed by key; other JSON values fall through
        if response[1] != 200 and isinstance(resp_json, dict) and \
                'message' in resp_json and 'payload' in resp_json:
            return {'message': resp_json['message'],
                    'payload': resp_json['payload']}

        if response[1] == 0:
            return [0, resp_json]

        return resp_json

    def _request(self, request, callback=None, error=None, single=False, timeout=5):
        """Dispatch a request synchronously or asynchronously.

        Without a callback the request runs in the calling thread and its
        result is returned. With a callback it runs on a new thread and
        the cancel function from _request_async is returned, so callers
        such as the subscribe loop can keep it as an abort handle.
        """
        if callback is None:
            return get_data_for_user(self._request_sync(request, timeout=timeout))
        return self._request_async(request, callback, error,
                                   single=single, timeout=timeout)
# Pubnub Twisted
class PubnubTwisted(PubnubCoreAsync):
    """PubNub client that performs its requests through the Twisted reactor."""

    def start(self):
        """Run the Twisted reactor."""
        reactor.run()

    def stop(self):
        """Stop the Twisted reactor."""
        reactor.stop()

    def timeout(self, delay, callback):
        """Schedule callback on the reactor after delay seconds."""
        reactor.callLater(delay, callback)

    def __init__(
        self,
        publish_key,
        subscribe_key,
        secret_key=None,
        cipher_key=None,
        auth_key=None,
        ssl_on=False,
        origin='pubsub.pubnub.com'
    ):
        """Initialise the client; all arguments go to PubnubCoreAsync.__init__."""
        super(PubnubTwisted, self).__init__(
            publish_key=publish_key,
            subscribe_key=subscribe_key,
            secret_key=secret_key,
            cipher_key=cipher_key,
            auth_key=auth_key,
            ssl_on=ssl_on,
            origin=origin,
        )
        self.headers = {}
        self.headers['User-Agent'] = ['Python-Twisted']
        self.headers['V'] = [self.version]
        self.pnsdk = 'PubNub-Python-' + 'Twisted' + '/' + self.version

    def _request(self, request, callback=None, error=None, single=False, timeout=5):
        """Send a GET request and deliver the decoded JSON via callbacks.

        Args:
            request: (dict) Request description passed to self.getUrl.
            callback: (function) (optional) Receives the decoded response.
            error: (function) (optional) Receives error payloads.
            single: (bool) When True, the result is dropped if a newer
                single request has been started in the meantime.
            timeout: Accepted for interface parity; not used here.

        Returns:
            A no-op abort function.
        """
        global pnconn_pool

        def _invoke(func, data):
            if func is not None:
                func(get_data_for_user(data))

        ## Build URL
        url = self.getUrl(request)

        ## NOTE(review): "self.ssl and None or pnconn_pool" always yields
        ## pnconn_pool because None is falsy; kept as is to preserve the
        ## current pooling behaviour - confirm whether SSL was meant to
        ## bypass the pool.
        agent = ContentDecoderAgent(RedirectAgent(Agent(
            reactor,
            contextFactory=WebClientContextFactory(),
            pool=self.ssl and None or pnconn_pool
        )), [('gzip', GzipDecoder)])

        try:
            deferred = agent.request(
                'GET', url, Headers(self.headers), None)
        except TypeError:
            ## Retry with a byte-string URL
            deferred = agent.request(
                'GET', url.encode(), Headers(self.headers), None)

        request_id = None
        if single is True:
            request_id = time.time()
            self.id = request_id

        def received(response):
            if not isinstance(response, twisted.web._newclient.Response):
                _invoke(error, {"message": "Not Found"})
                return

            finished = Deferred()
            if response.code in [401, 403]:
                response.deliverBody(PubNubPamResponse(finished))
            else:
                response.deliverBody(PubNubResponse(finished))

            return finished

        def complete(data):
            if single is True and request_id != self.id:
                return None

            ## received() yields None after it has reported a bad response
            if data is None:
                return None

            try:
                data = json.loads(data)
            except (ValueError, TypeError):
                try:
                    data = json.loads(data.decode("utf-8"))
                except (ValueError, AttributeError):
                    _invoke(error, {'error': 'json decode error'})
                    ## Nothing decoded, so there is nothing to dispatch
                    return None

            ## Compare the actual status value, not the literal 'status'
            if isinstance(data, dict) and 'error' in data and \
                    'status' in data and data['status'] != 200:
                _invoke(error, data)
            else:
                _invoke(callback, data)

        def abort():
            pass

        deferred.addCallback(received)
        deferred.addCallback(complete)

        return abort
# PubnubTornado
class PubnubTornado(PubnubCoreAsync):
    """PubNub client that performs its requests through Tornado's IOLoop."""

    def stop(self):
        """Stop the IOLoop."""
        ioloop.stop()

    def start(self):
        """Start the IOLoop."""
        ioloop.start()

    def timeout(self, delay, callback):
        """Schedule callback on the IOLoop after delay seconds."""
        ioloop.add_timeout(time.time() + float(delay), callback)

    def __init__(
        self,
        publish_key,
        subscribe_key,
        secret_key=False,
        cipher_key=False,
        auth_key=False,
        ssl_on=False,
        origin='pubsub.pubnub.com'
    ):
        """Initialise the client.

        secret_key, cipher_key and auth_key default to False here; falsy
        values are passed to the parent as None, the default the other
        client classes in this file use. Otherwise the subscribe request,
        which drops only None-valued query parameters, would presumably
        send "auth=False" - TODO confirm against PubnubBase.
        """
        super(PubnubTornado, self).__init__(
            publish_key=publish_key,
            subscribe_key=subscribe_key,
            secret_key=secret_key or None,
            cipher_key=cipher_key or None,
            auth_key=auth_key or None,
            ssl_on=ssl_on,
            origin=origin,
        )
        self.headers = {}
        self.headers['User-Agent'] = 'Python-Tornado'
        self.headers['Accept-Encoding'] = self.accept_encoding
        self.headers['V'] = self.version
        self.http = tornado.httpclient.AsyncHTTPClient(max_clients=1000)
        self.id = None
        self.pnsdk = 'PubNub-Python-' + 'Tornado' + '/' + self.version

    def _request(self, request, callback=None, error=None,
                 single=False, timeout=5, connect_timeout=5):
        """Send a GET request and deliver the decoded JSON via callbacks.

        Args:
            request: (dict) Request description passed to self.getUrl.
            callback: (function) (optional) Receives the decoded response.
            error: (function) (optional) Receives error payloads.
            single: (bool) When True, the result is dropped if a newer
                single request has been started in the meantime.
            timeout: Request timeout in seconds.
            connect_timeout: Connect timeout in seconds.

        Returns:
            A no-op abort function.
        """

        def _invoke(func, data):
            if func is not None:
                func(get_data_for_user(data))

        url = self.getUrl(request)
        http_request = tornado.httpclient.HTTPRequest(
            url, 'GET',
            self.headers,
            connect_timeout=connect_timeout,
            request_timeout=timeout)

        request_id = None
        if single is True:
            request_id = time.time()
            self.id = request_id

        def responseCallback(response):
            if single is True and request_id != self.id:
                return None

            body = response._get_body()

            ## NOTE(review): a response without a body is dropped silently,
            ## even when response.error is set - confirm this is intended.
            if body is None:
                return

            def handle_exc(*args):
                return True

            if response.error is not None:
                with ExceptionStackContext(handle_exc):
                    if response.code in [403, 401]:
                        response.rethrow()
                    else:
                        _invoke(error, {"message": response.reason})
                        return

            try:
                data = json.loads(body)
            except (TypeError, ValueError):
                try:
                    data = json.loads(body.decode("utf-8"))
                except (AttributeError, ValueError):
                    _invoke(error, {'error': 'json decode error'})
                    ## Nothing decoded, so there is nothing to dispatch
                    return

            ## Compare the actual status value, not the literal 'status'
            if isinstance(data, dict) and 'error' in data and \
                    'status' in data and data['status'] != 200:
                _invoke(error, data)
            else:
                _invoke(callback, data)

        self.http.fetch(
            request=http_request,
            callback=responseCallback
        )

        def abort():
            pass

        return abort
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment