Skip to content

Instantly share code, notes, and snippets.

@Znack
Created April 30, 2015 12:45
Show Gist options
  • Save Znack/60a5350413a54f6304ad to your computer and use it in GitHub Desktop.
Save Znack/60a5350413a54f6304ad to your computer and use it in GitHub Desktop.
Tornado server
import json
import redis
import time
import tornado
import tornadoredis
class BaseHandler(object):
    """Base class for the per-label websocket handlers.

    An instance is bound to one socket connection (``socketHandler``) and is
    given a redis ``client`` to publish with. Subclasses are expected to
    provide ``self.route``: a dict mapping the ``action`` field of an incoming
    payload to a callable that accepts the payload.
    """

    def __init__(self, client, socketHandler, *args, **kwargs):
        """Remember the redis client and the owning socket connection.

        :param client: redis client used by subclasses to publish messages.
        :param socketHandler: connection object; must offer ``send(data)`` and,
            by the time ``on_open`` is called, a ``user`` attribute.
        """
        self.client = client
        self.socketHandler = socketHandler
        # Defined up front so that on_close() is safe even when on_open() was
        # never called (e.g. the connection was rejected before it opened).
        self.user = None
        super(BaseHandler, self).__init__(*args, **kwargs)

    def on_open(self):
        """Adopt the connection's user and flag it as online."""
        self.user = self.socketHandler.user
        self.user.online = True

    def on_close(self, *args, **kwargs):
        """Flag the user as offline; a no-op when no user was ever attached."""
        user = getattr(self, 'user', None)
        if user is not None:
            user.online = False

    def on_message(self, data):
        """Dispatch ``data`` to the callable registered for ``data['action']``.

        Replies with an "unknown action" error when the action is missing or
        not registered, or when the handler defines no routing table at all.
        """
        # A handler without a routing table simply knows no actions.
        route = getattr(self, 'route', None) or {}
        try:
            # NOTE: a KeyError/TypeError raised *inside* the action is reported
            # to the client the same way; kept so existing clients see the
            # same reply for malformed payloads.
            route[data['action']](data)
        except (KeyError, TypeError):
            self.write_message(json.dumps({"success": False, "error": "unknown action"}))

    def write_message(self, data):
        """Send ``data`` to the client through the owning socket connection."""
        self.socketHandler.send(data)
import logging
import brukva
import datetime
import json
import tornado.web
import tornado.websocket
import tornado.ioloop
import tornado.httpclient
from sockjs.tornado import SockJSConnection, SockJSRouter
from django.db import connection, connections
from django.contrib.auth import get_user_model
from django.utils.importlib import import_module
from django.conf import settings
# Verbose logging only when Django runs in DEBUG mode.
if settings.DEBUG:
    logging.basicConfig(level=logging.DEBUG)
else:
    logging.basicConfig(level=logging.WARNING)

# Session backend module configured for Django; used to resolve the session
# cookie of an incoming socket connection.
session_engine = import_module(settings.SESSION_ENGINE)

# Shared redis client that is handed to every handler instance.
client = brukva.Client()
client.connect()

# Registry: handler label -> handler class, filled by ApplicationLauncher.
handlers = dict()
class ApplicationLauncher:
    """Discovers websocket handlers of installed apps and builds the Tornado app."""

    def __init__(self):
        """Populate the module-level ``handlers`` registry.

        For each entry of ``settings.INSTALLED_APPS`` the module
        ``<app>.websocket_handler`` is imported; apps for which that import
        raises ImportError are skipped. Every item of the module's ``handlers``
        list contributes one ``label -> handler`` registry entry.
        """
        for app_label in settings.INSTALLED_APPS:
            module_path = ".".join((app_label, "websocket_handler"))
            try:
                websocket_module = import_module(module_path)
            except ImportError:
                continue
            for entry in websocket_module.handlers:
                handlers[entry['label']] = entry['handler']

    def create_handler(self):
        """Return a Tornado application serving ``CommonHandler`` under ``/sockets``."""
        sockjs_router = SockJSRouter(CommonHandler, '/sockets')
        application = tornado.web.Application(sockjs_router.urls)
        return application
class CommonHandler(SockJSConnection):
    """SockJS connection that authenticates a Django user and routes messages.

    One instance of every class in the module-level ``handlers`` registry is
    created per connection. Incoming JSON payloads are passed to the handler
    whose registry label equals the payload's ``label`` field, and everything
    received on the user's ``channel_<user id>`` redis channel is forwarded to
    the socket.
    """

    # NOTE: the first positional argument of ``timedelta`` is *days*, so this
    # is roughly 0.864 s between disconnect attempts (value kept as it was).
    _DISCONNECT_RETRY_DELAY = datetime.timedelta(0.00001)

    def __init__(self, *args, **kwargs):
        """Create the per-connection redis client and handler instances."""
        self.handlers = {}
        # Set only after successful authentication in on_open(); initialised
        # here so on_close() can tell an unauthenticated connection apart.
        self.user = None
        self.channel = None
        self.private_client = brukva.Client()
        self.private_client.connect()
        for handler_label, handler in handlers.items():
            self.handlers[handler_label] = handler(client, self)
        super(CommonHandler, self).__init__(*args, **kwargs)

    def check_origin(self, origin):
        """Accept connections from any origin."""
        return True

    def on_open(self, request):
        """Authenticate via the Django session cookie and subscribe to redis.

        The connection is closed when the session cookie is absent, the
        session carries no ``_auth_user_id`` or the user does not exist.
        """
        super(CommonHandler, self).on_open(request)
        # The DB connection frequently dies after some idle time, so make sure
        # a stale one is dropped before the ORM is used below.
        self._check_db_connection()

        session_cookie = request.cookies.get(settings.SESSION_COOKIE_NAME)
        if session_cookie is None:
            logging.debug('No session cookie was passed')
            self.close()
            return
        session = session_engine.SessionStore(session_cookie.value)

        user_model = get_user_model()
        try:
            user_id = str(session["_auth_user_id"])
            self.user = user_model.objects.get(id=user_id)
        except (KeyError, user_model.DoesNotExist):
            logging.debug('Unknown session key was passed')
            self.close()
            return

        logging.debug('User %s opened the connection', self.user.email)
        self.channel = "channel_%s" % user_id
        self.private_client.subscribe(self.channel)
        self.private_client.listen(self.show_new_message)
        for handler in self.handlers.values():
            if hasattr(handler, "on_open") and callable(handler.on_open):
                handler.on_open()

    def show_new_message(self, result):
        """Forward whatever the private redis client received to the socket."""
        self.send(str(result))

    def on_message(self, data):
        """Decode a JSON payload and hand it to the handler named by ``label``.

        Replies with an error object when the payload is not valid JSON or
        when no handler is registered under the requested label.
        """
        if not data:
            return
        logging.debug('New message from socket: %s', data)
        try:
            data = json.loads(data)
        except ValueError:
            logging.warning('Malformed JSON was received from socket')
            return self.send(json.dumps({"success": False, "error": "invalid json"}))
        try:
            message_handler = self.handlers[data['label']]
        except (KeyError, TypeError):
            label = data.get('label') if isinstance(data, dict) else None
            logging.warning('The handlers with label "%s" does not exists', label)
            return self.send(json.dumps({"success": False, "error": "unknown label"}))
        if hasattr(message_handler, 'on_message'):
            message_handler.on_message(data)

    def on_close(self):
        """Notify handlers, unsubscribe and disconnect the private redis client.

        Works for connections that never authenticated as well, so their
        redis client is released too.
        """
        for handler in self.handlers.values():
            if hasattr(handler, "on_close") and callable(handler.on_close):
                try:
                    handler.on_close()
                except Exception:
                    # One broken handler must not prevent the remaining
                    # teardown (other handlers, redis disconnect).
                    logging.exception('Handler %r failed in on_close', handler)

        if self.channel is not None:
            try:
                self.private_client.unsubscribe(self.channel)
            except AttributeError:
                # Kept from the original code, which swallowed AttributeError
                # here. NOTE(review): confirm whether brukva itself can raise
                # it or whether it only covered the missing ``channel``.
                pass

        user_email = self.user.email if self.user is not None else None
        logging.debug('User %s close the connection', user_email)

        io_loop = tornado.ioloop.IOLoop.instance()
        retry_delay = self._DISCONNECT_RETRY_DELAY

        def disconnect_redis():
            # Retry later while the client reports an operation in progress.
            if self.private_client.connection.in_progress:
                io_loop.add_timeout(retry_delay, disconnect_redis)
            else:
                self.private_client.disconnect()
                logging.debug('User %s disconnect the redis client', user_email)

        io_loop.add_timeout(retry_delay, disconnect_redis)

    @staticmethod
    def _check_db_connection():
        """Drop the default Django DB connection when it is no longer usable."""
        # Django connects to the database lazily: ``connection.connection`` is
        # None until the first query, in which case there is nothing to check.
        if connection.connection and not connection.is_usable():
            # Destroy the default connection; the next ORM call makes Django
            # open a fresh one.
            del connections._connections.default
import json
import tornado.web
import tornado.websocket
import tornado.ioloop
import tornado.httpclient
import urllib
import redis
from django.core import serializers
from django.conf import settings
from django.db.models import Q
from im.models import Thread, Message
from project.websockets.base_handler import BaseHandler
class MessagesHandler(BaseHandler):
    """Websocket handler for instant-messaging actions.

    ``self.route`` maps the ``action`` field of an incoming payload to one of
    the methods below. Replies are written to the requesting socket with
    ``write_message``; updates for other users are published on their
    ``channel_<user id>`` redis channel through ``self.client``.

    NOTE(review): none of the actions verifies that ``self.user`` participates
    in the addressed thread/message (see ``initial_messages``,
    ``previous_messages``, ``mark_message_as_read``). Confirm whether that is
    enforced elsewhere before relying on it.
    """

    # Synchronous redis client shared by all instances; only used to read the
    # users' online state in ``online_states``.
    redis_client = redis.StrictRedis()

    def __init__(self, *args, **kwargs):
        """Initialise the base handler and register the action routing table."""
        super(MessagesHandler, self).__init__(*args, **kwargs)
        self.route = {
            "newThread": self.new_thread,
            "initialThreads": self.initial_threads,
            "initialMessages": self.initial_messages,
            "previousMessage": self.previous_messages,
            "newMessage": self.new_message,
            "removeThread": self.remove_thread,
            "removeMessage": self.remove_message,
            "markMessageAsRead": self.mark_message_as_read,
            "onlineStates": self.online_states,
        }

    def _check_if_chunk_is_last(self, messages):
        """Return ``(is_last_chunk, messages)`` for an over-fetched chunk.

        Callers fetch ``settings.MESSAGES_CHUNK_CONTAIN + 1`` items; when more
        than a full chunk came back, the surplus is cut off and the chunk is
        reported as not being the last one.
        """
        chunk_size = settings.MESSAGES_CHUNK_CONTAIN
        if len(messages) > chunk_size:
            return False, messages[:chunk_size]
        return True, messages

    def new_thread(self, data):
        """Create a thread through the HTTP API and announce it.

        Reads ``data['body']['friends']`` (participant pks). The API reply is
        handled asynchronously: an error is reported to the requester only; on
        success the new thread is published to the other participants and then
        written back to the requester.
        """
        friends_pk = data['body']['friends']
        request_id = data.get('__requestId', -1)
        http_client = tornado.httpclient.AsyncHTTPClient()
        request = tornado.httpclient.HTTPRequest(
            settings.CREATE_THREAD_API_URL,
            method="POST",
            body=urllib.urlencode({
                "friends_pk": json.dumps(friends_pk),
                "api_key": settings.API_KEY,
                "sender_id": self.user.id,
            })
        )

        def handle_new_thread_server_response(response):
            response_server_data = json.loads(response.body)
            if response_server_data.get('error'):
                error_data = {
                    "label": 'message',
                    "action": 'newThread',
                    "__requestId": request_id,
                    "body": {
                        "success": False,
                        "error": response_server_data['error']
                    }
                }
                self.write_message(json.dumps(error_data))
                # Nothing was created, so there is no thread to broadcast.
                return

            response_data = {
                "label": 'message',
                "action": 'newThread',
                "initiator": self.user.id,
                "body": {
                    "rawThread": response_server_data['threadData']
                }
            }
            # The serializer always emits a list, hence the first element.
            thread_data = json.loads(response_server_data['threadData'])[0]
            # A real list (not a one-shot iterator) because it is searched
            # once per participant below.
            removed_for_users = [
                user_info[0]
                for user_info in thread_data['fields']['removed_for_users']
            ]
            broadcast_payload = json.dumps(response_data)
            own_pk = int(self.user.id)
            for participant_pk in friends_pk:
                if int(participant_pk) != own_pk and int(participant_pk) not in removed_for_users:
                    self.client.publish("channel_%s" % participant_pk, broadcast_payload)

            response_data["__requestId"] = request_id
            del response_data["initiator"]  # it doesn't matter for current user itself
            self.write_message(json.dumps(response_data))

        http_client.fetch(request, handle_new_thread_server_response)

    def initial_threads(self, data):
        """Send the user's threads that they have not removed for themselves."""
        threads = Thread.objects.filter(~Q(removed_for_users=self.user) & Q(participants=self.user))\
            .select_related('messages').prefetch_related("participants")
        threads_in_json = serializers.serialize(
            'json',
            threads,
            use_natural_keys=True,
        )
        request_id = data.get('__requestId', "-1")
        self.write_message(json.dumps({
            "label": 'message',
            "action": 'initialThreads',
            "__requestId": request_id,
            'body': {
                'threads': threads_in_json,
            }
        }))

    def initial_messages(self, data):
        """Send the newest chunk of a thread's messages plus its participants.

        Reads ``threadPk`` and ``loadedMessages`` (pks the client already has,
        which are excluded) from ``data['body']``.
        """
        thread_id = data['body']['threadPk']
        loaded_messages_pk = data['body']['loadedMessages']
        # NOTE(review): the thread is looked up by id only; there is no check
        # that self.user is one of its participants.
        thread = Thread.objects.select_related('messages').prefetch_related("participants").get(id=thread_id)
        # One extra row is fetched to learn whether older messages remain.
        messages = thread.messages.order_by("-id") \
            .exclude(Q(removed_for_users=self.user) | Q(pk__in=loaded_messages_pk))[:settings.MESSAGES_CHUNK_CONTAIN + 1]
        is_last_chunk, messages = self._check_if_chunk_is_last(messages)
        messages = serializers.serialize(
            'json',
            messages,
            use_natural_keys=True,
        )
        participants = serializers.serialize(
            'json',
            thread.participants.all(),
            fields=('first_name', 'last_name', 'is_online'),
            extras=('avatar_url',)
        )
        request_id = data.get('__requestId', "-1")
        self.write_message(json.dumps({
            "label": 'message',
            "action": 'initialMessages:%s' % thread_id,
            "__requestId": request_id,
            "body": {
                'thread_id': thread_id,
                'messages': messages,
                'is_last_chunk': is_last_chunk,
                'participants': participants,
            },
        }))

    def previous_messages(self, data):
        """Send the chunk of messages older than ``last_message_id``.

        Reads ``thread_id`` and ``last_message_id`` from ``data['body']``.
        """
        thread_id = data['body']['thread_id']
        # NOTE(review): no participant check here either.
        thread = Thread.objects.select_related('messages').prefetch_related("participants").get(id=thread_id)
        last_message_id = data['body']['last_message_id']
        # One extra row is fetched to learn whether older messages remain.
        messages = thread.messages.filter(~Q(removed_for_users=self.user) & Q(id__lt=last_message_id)) \
            .order_by("-id")[:settings.MESSAGES_CHUNK_CONTAIN + 1]
        is_last_chunk, messages = self._check_if_chunk_is_last(messages)
        messages = serializers.serialize(
            'json',
            messages,
            use_natural_keys=True,
        )
        request_id = data.get('__requestId', "-1")
        self.write_message(json.dumps({
            "label": 'message',
            "action": 'previousMessage:%s' % thread_id,
            "__requestId": request_id,
            "body": {
                'thread_id': thread_id,
                'messages': messages,
                'is_last_chunk': is_last_chunk
            },
        }))

    def new_message(self, data):
        """Post a message through the HTTP API and fan the result out.

        Reads ``text``, ``images`` and ``thread_id`` from ``data['body']``.
        When the API answers, the new message is published to the other
        participants and written back to the sender; afterwards a
        ``markMessageAsRead`` event is published to every participant for each
        entry of the reply's ``read_updated_messages``.
        """
        text = data['body']['text']
        request_id = data.get('__requestId', "-1")
        http_client = tornado.httpclient.AsyncHTTPClient()
        request = tornado.httpclient.HTTPRequest(
            "".join([settings.SEND_MESSAGE_API_URL, str(data['body']['thread_id']), '/']),
            method="POST",
            body=urllib.urlencode({
                "message": text.encode("utf-8"),
                "images": json.dumps(data['body']['images']),
                "api_key": settings.API_KEY,
                "sender_id": self.user.id,
            })
        )

        def handle_new_message_server_response(response):
            new_message_data = json.loads(response.body)
            message_in_json = new_message_data['message']
            # we should extract first element, because serializers work only with arrays
            message = json.loads(message_in_json)[0]
            thread_id = message['fields']['thread']
            thread = Thread.objects.prefetch_related("participants").get(id=thread_id)
            participants = thread.participants.all()
            # The sender is already excluded by the query itself.
            other_participants = participants.exclude(id=self.user.id)

            response_data = {
                "label": 'message',
                "action": 'newMessage:%s' % thread_id,
                "initiator": self.user.id,
                "body": message_in_json
            }
            broadcast_payload = json.dumps(response_data)
            for participant in other_participants:
                self.client.publish("channel_%s" % participant.id, broadcast_payload)

            response_data["__requestId"] = request_id
            del response_data["initiator"]  # it doesn't matter for current user itself
            self.write_message(json.dumps(response_data))

            read_updated_messages = json.loads(new_message_data['read_updated_messages'])
            for read_updated_message in read_updated_messages:
                read_payload = json.dumps({
                    "label": 'message',
                    "success": True,
                    "action": 'markMessageAsRead:%s' % read_updated_message['pk'],
                    "body": {
                        "pk": read_updated_message['pk'],
                        "is_read": True  # we can't trust serialized data here because serialization was before updating
                    }
                })
                for participant in participants:
                    self.client.publish("channel_%s" % participant.id, read_payload)

        http_client.fetch(request, handle_new_message_server_response)

    def remove_thread(self, data):
        """Hide thread ``data['body']['threadPk']`` for the current user."""
        thread_pk = data['body']['threadPk']
        response_data = {
            "label": 'message',
            "__requestId": data.get('__requestId', -1)
        }
        try:
            thread = Thread.objects.get(id=thread_pk)
        except Thread.DoesNotExist:
            response_data["success"] = False
            response_data["errorMessage"] = "No such thread"
            return self.write_message(json.dumps(response_data))
        thread.removed_for_users.add(self.user)
        response_data["success"] = True
        response_data["action"] = 'removeThread'
        return self.write_message(json.dumps(response_data))

    def remove_message(self, data):
        """Hide message ``data['body']['messagePk']`` for the current user."""
        message_pk = data['body']['messagePk']
        response_data = {
            "label": 'message',
            "__requestId": data.get('__requestId', -1)
        }
        try:
            message = Message.objects.get(id=message_pk)
        except Message.DoesNotExist:
            response_data["success"] = False
            response_data["errorMessage"] = "No such message"
            return self.write_message(json.dumps(response_data))
        message.removed_for_users.add(self.user)
        response_data["success"] = True
        response_data["action"] = 'removeMessage'
        return self.write_message(json.dumps(response_data))

    def mark_message_as_read(self, data):
        """Mark message ``data['body']['messagePk']`` as read and broadcast it.

        The update is published to the thread's other participants and then
        acknowledged to the requester.
        """
        message_pk = data['body']['messagePk']
        request_id = data.get('__requestId', -1)
        response_data = {
            "label": 'message'
        }
        try:
            message = Message.objects.prefetch_related("thread__participants").get(id=message_pk)
        except Message.DoesNotExist:
            response_data["success"] = False
            response_data["errorMessage"] = "No such message"
            # Lets the client match the failure to its request, like the
            # other actions' error replies do.
            response_data["__requestId"] = request_id
            return self.write_message(json.dumps(response_data))

        # NOTE(review): any authenticated user can mark any message as read;
        # there is no check that self.user belongs to the message's thread.
        message.is_read = True
        message.save()

        response_data["success"] = True
        response_data["action"] = 'markMessageAsRead:%s' % message.id
        # A plain id, consistent with the other actions (a stray trailing
        # comma used to turn this into a one-element tuple).
        response_data["initiator"] = self.user.id
        response_data["body"] = {
            "pk": message.pk,
            "is_read": message.is_read
        }
        broadcast_payload = json.dumps(response_data)
        # The requester is already excluded by the query itself.
        for participant in message.thread.participants.exclude(id=self.user.id):
            self.client.publish("channel_%s" % participant.id, broadcast_payload)

        del response_data["initiator"]  # it doesn't matter for current user itself
        response_data["__requestId"] = request_id
        self.write_message(json.dumps(response_data))

    def online_states(self, data):
        """Report the online state of every user pk in ``data['body']['users']``.

        The state is read from the ``online_state`` field of the redis hash
        ``user_<pk>``; a user counts as online only when it equals ``"True"``.
        """
        users_pk = data['body']['users']
        users_states = []
        for user_pk in users_pk:
            # %-formatting instead of str.join so that integer pks (JSON
            # numbers) work as well as string pks.
            online = self.redis_client.hget("user_%s" % user_pk, "online_state")
            users_states.append({
                "pk": user_pk,
                "online": online == "True"
            })
        request_id = data.get('__requestId', "-1")
        self.write_message(json.dumps({
            "label": 'message',
            "success": True,
            "action": 'onlineStates',
            "__requestId": request_id,
            "body": {
                "onlineStates": users_states,
            }
        }))
# Handler registrations of this module: each entry pairs the ``label`` used in
# socket payloads with the class that serves it.
handlers = [
    dict(label="message", handler=MessagesHandler),
]
from project.websockets.base_handler import BaseHandler
import json
from django.contrib.auth import get_user_model
import tornado
import tornadoredis
from im.models import Thread
class NotificationHandler(BaseHandler):
    """Relays messages from the ``notifications`` redis channel to users.

    Every instance opens its own tornadoredis client, but only one instance at
    a time subscribes to ``notifications`` (guarded by the class-level
    ``redis_connected`` flag). The subscribed instance turns each message into
    a notification and publishes it on the receivers' ``channel_<user id>``
    channels through ``self.client``.

    NOTE(review): ``self.route`` is also what the inherited ``on_message``
    dispatches on, so a socket client talking to the subscribed instance can
    reach the ``*_handler`` methods with a payload of its own. Confirm whether
    that is intended.
    """

    # True while some instance is subscribed to ``notifications``.
    redis_connected = False

    def __init__(self, *args, **kwargs):
        """Connect a redis client and subscribe if nobody is listening yet."""
        super(NotificationHandler, self).__init__(*args, **kwargs)
        # Whether this very instance holds the subscription.
        self._is_listener = False
        self.redis_client = tornadoredis.Client()
        self.redis_client.connect()
        self.connect_to_redis()

    def connect_to_redis(self):
        """Subscribe to ``notifications`` unless another instance already did."""
        if not NotificationHandler.redis_connected:
            NotificationHandler.redis_connected = True
            self._is_listener = True
            self.listen_redis()

    @tornado.gen.engine
    def listen_redis(self):
        """Subscribe to ``notifications`` and start dispatching its messages."""
        yield tornado.gen.Task(self.redis_client.subscribe, ['notifications'])
        # Registered before listen() so the table exists by the time the first
        # message is delivered to redishandler().
        self.route = {
            "friends": self.friends_handler,
            "messages": self.messages_handler,
            "wallpost": self.wallpost_handler
        }
        self.redis_client.listen(self.redishandler)

    def on_close(self):
        """Release the redis client and free the listener role if held."""
        self.redis_client.unsubscribe('notifications')
        if self._is_listener:
            # Without this reset the flag stayed True forever and no later
            # connection would ever subscribe to ``notifications`` again.
            self._is_listener = False
            NotificationHandler.redis_connected = False
        self.redis_client.disconnect()
        return super(NotificationHandler, self).on_close()

    def redishandler(self, data):
        """Dispatch a redis message by the ``notification_label`` of its JSON body.

        Only ``message`` events of the ``notifications`` channel are handled;
        bodies with an unknown label are logged and ignored.
        """
        if data.kind != 'message' or data.channel != 'notifications':
            return
        body = json.loads(data.body)
        label = body.get('notification_label') if isinstance(body, dict) else None
        notification_handler = self.route.get(label)
        if notification_handler is None:
            import logging
            logging.warning('Unknown notification label "%s"', label)
            return
        notification_handler(body)

    def notification(self, data):
        """Publish the notification to its receiver(s).

        The receiver is ``data['receiver_id']`` when present; otherwise every
        participant of thread ``data['thread_id']`` except
        ``data['from_user_id']``. Payloads naming neither are logged and
        dropped.
        """
        receiver_id = data.get('receiver_id')
        thread_id = data.get('thread_id')
        if receiver_id:
            receiver_ids = [receiver_id]
        elif thread_id:
            thread = Thread.objects.get(id=thread_id)
            receivers = thread.participants.exclude(id=data['from_user_id'])
            receiver_ids = [receiver.id for receiver in receivers]
        else:
            import logging
            logging.warning('Notification without receiver_id or thread_id was skipped')
            return
        for target_id in receiver_ids:
            self.client.publish(
                "channel_%s" % target_id,
                json.dumps(self.get_notification(target_id, data))
            )

    def get_notification(self, receiver_id, data):
        """Build the notification payload for one receiver.

        Looks up the sender (``data['from_user_id']``) for its full name and
        avatar url, and the receiver for its ``language``.
        """
        user_model = get_user_model()
        sender = user_model.objects.get(id=data['from_user_id'])
        receiver_lang = user_model.objects.get(id=receiver_id).language
        return {
            "label": 'notification',
            "action": data['action'],
            "body": {
                'msglang': receiver_lang,
                'thread_id': data.get('thread_id'),
                'senderName': sender.get_full_name(),
                'avatarUrl': sender.avatar_url
            },
        }

    def friends_handler(self, data):
        """Handle a ``friends`` notification."""
        self.notification(data)

    def messages_handler(self, data):
        """Handle a ``messages`` notification."""
        self.notification(data)

    def wallpost_handler(self, data):
        """Handle a ``wallpost`` notification."""
        self.notification(data)
# Handler registrations of this module: each entry pairs the ``label`` used in
# socket payloads with the class that serves it.
handlers = [
    dict(label="notifications", handler=NotificationHandler),
]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment