Created
April 30, 2015 12:45
-
-
Save Znack/60a5350413a54f6304ad to your computer and use it in GitHub Desktop.
Tornado server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import redis | |
import time | |
import tornado | |
import tornadoredis | |
class BaseHandler(object):
    """Base class for the per-connection websocket sub-handlers.

    A sub-handler is created with the shared publishing ``client`` and the
    owning socket connection (``socketHandler``). Subclasses provide a
    ``route`` dict mapping an ``action`` name to a callable taking the
    decoded message.
    """

    # Fallback dispatch table: a subclass that has not assigned ``route`` yet
    # answers "unknown action" instead of raising AttributeError.
    # Subclasses must assign their own dict, never mutate this shared one.
    route = {}

    def __init__(self, client, socketHandler, *args, **kwargs):
        """Remember the publishing client and the owning socket connection."""
        self.client = client
        self.socketHandler = socketHandler
        # Filled in by on_open(); stays None if the connection is closed
        # before it was ever opened (e.g. failed authentication).
        self.user = None
        super(BaseHandler, self).__init__(*args, **kwargs)

    def on_open(self):
        """Adopt the user of the socket connection and mark it online."""
        self.user = self.socketHandler.user
        self.user.online = True

    def on_close(self, *args, **kwargs):
        """Mark the user offline; a no-op when on_open() never ran."""
        if self.user is None:
            return
        self.user.online = False

    def on_message(self, data):
        """Dispatch ``data`` to the callable registered for ``data['action']``.

        A KeyError or TypeError (raised by the lookup or by the callable
        itself) is reported to the client as an "unknown action" failure.
        """
        try:
            self.route[data['action']](data)
        except (KeyError, TypeError):
            self.write_message(json.dumps({"success": False, "error": "unknown action"}))

    def write_message(self, data):
        """Send ``data`` to the client through the owning socket connection."""
        self.socketHandler.send(data)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import brukva | |
import datetime | |
import json | |
import tornado.web | |
import tornado.websocket | |
import tornado.ioloop | |
import tornado.httpclient | |
from sockjs.tornado import SockJSConnection, SockJSRouter | |
from django.db import connection, connections | |
from django.contrib.auth import get_user_model | |
from django.utils.importlib import import_module | |
from django.conf import settings | |
# Log at DEBUG level only when the Django project runs with DEBUG enabled.
logging.basicConfig(level=logging.WARNING if not settings.DEBUG else logging.DEBUG)

# Session backend module configured for the Django project.
session_engine = import_module(settings.SESSION_ENGINE)

# Redis client shared by all connections; it is handed to every sub-handler.
client = brukva.Client()
client.connect()

# Registry of sub-handler classes keyed by label.
handlers = dict()
class ApplicationLauncher(object):
    """Collect the websocket handlers of all installed apps and build the app."""

    def __init__(self):
        """Register every handler exported by ``<app>.websocket_handler``.

        Each installed app may provide a ``websocket_handler`` module whose
        ``handlers`` list holds dicts with the keys ``label`` and ``handler``.
        Apps whose module cannot be imported are skipped.
        """
        for app_label in settings.INSTALLED_APPS:
            module_name = ".".join([app_label, "websocket_handler"])
            # Only the import itself is guarded, so errors raised while
            # registering handlers are not mistaken for a missing module.
            try:
                websocket_module = import_module(module_name)
            except ImportError:
                # Usually the app simply has no such module, but a broken
                # import inside it ends up here too - keep it diagnosable.
                logging.debug('Skipping %s', module_name, exc_info=True)
                continue
            for app_handler in websocket_module.handlers:
                handlers[app_handler['label']] = app_handler['handler']

    def create_handler(self):
        """Return a Tornado application serving CommonHandler under /sockets."""
        router = SockJSRouter(CommonHandler, '/sockets')
        return tornado.web.Application(router.urls)
class CommonHandler(SockJSConnection):
    """SockJS connection that authenticates the user and routes messages.

    On open the Django session cookie is resolved to a user and a private
    redis client is subscribed to ``channel_<user id>``; everything published
    there is forwarded to the browser. Incoming JSON messages are handed to
    the sub-handler registered under their ``label`` key.
    """

    def __init__(self, *args, **kwargs):
        """Create the private redis client and one sub-handler per label."""
        # Defaults keep on_close() safe when on_open() bails out early.
        self.user = None
        self.channel = None
        self.handlers = {}
        self.private_client = brukva.Client()
        self.private_client.connect()
        for handler_label, handler in handlers.items():
            self.handlers[handler_label] = handler(client, self)
        super(CommonHandler, self).__init__(*args, **kwargs)

    def check_origin(self, origin):
        """Accept connections from every origin."""
        return True

    def on_open(self, request):
        """Authenticate via the session cookie and start listening on redis.

        The connection is closed when the cookie is missing, the session has
        no authenticated user, or that user no longer exists.
        """
        super(CommonHandler, self).on_open(request)
        # frequently the db connection is destroyed after some time,
        # so we must recreate it after a page refresh
        self._check_db_connection()

        session_cookie = request.cookies.get(settings.SESSION_COOKIE_NAME)
        if session_cookie is None:
            # Previously this crashed with AttributeError on ``None.value``.
            logging.debug('Unknown session key was passed')
            self.close()
            return

        session = session_engine.SessionStore(session_cookie.value)
        try:
            user_id = str(session["_auth_user_id"])
            self.user = get_user_model().objects.get(id=user_id)
        except (KeyError, get_user_model().DoesNotExist):
            # This message used to be logged on the success path instead.
            logging.debug('Unknown session key was passed')
            self.close()
            return

        logging.debug('User %s opened the connection' % self.user.email)
        self.channel = "channel_%s" % user_id
        self.private_client.subscribe(self.channel)
        self.private_client.listen(self.show_new_message)
        for handler in self.handlers.values():
            if hasattr(handler, "on_open") and callable(handler.on_open):
                handler.on_open()

    def show_new_message(self, result):
        """Forward a message received on the private redis channel."""
        self.send(str(result))

    def on_message(self, data):
        """Decode a JSON message and pass it to the handler for its label."""
        if not data:
            return
        logging.debug('New message from socket: %s' % data)
        try:
            data = json.loads(data)
        except ValueError:
            # Invalid JSON used to escape as an unhandled exception.
            logging.warning('Malformed JSON received from socket')
            return self.send(json.dumps({"success": False, "error": "invalid json"}))
        try:
            message_handler = self.handlers[data['label']]
        except (KeyError, TypeError):
            label = data.get('label') if isinstance(data, dict) else None
            # The format string previously had no argument for its %s.
            logging.warning('The handlers with label "%s" does not exists', label)
            return self.send(json.dumps({"success": False, "error": "unknown label"}))
        if hasattr(message_handler, 'on_message'):
            message_handler.on_message(data)

    def on_close(self):
        """Notify the sub-handlers and release the private redis client.

        Also runs for connections that never authenticated, so nothing here
        may assume that ``self.user`` or ``self.channel`` is set.
        """
        for handler in self.handlers.values():
            if hasattr(handler, "on_close") and callable(handler.on_close):
                try:
                    handler.on_close()
                except Exception:
                    # One failing sub-handler must not prevent the redis
                    # client below from being disconnected.
                    logging.exception('Sub-handler failed while closing')

        if self.channel is not None:
            try:
                self.private_client.unsubscribe(self.channel)
            except AttributeError:
                logging.debug('Could not unsubscribe from %s', self.channel)

        user_label = self.user.email if self.user is not None else None
        logging.debug('User %s close the connection' % user_label)

        def disconnect_redis():
            # Re-schedule until the client reports no operation in progress,
            # then disconnect it.
            if self.private_client.connection.in_progress:
                tornado.ioloop.IOLoop.instance().add_timeout(
                    datetime.timedelta(0.00001),
                    disconnect_redis
                )
            else:
                self.private_client.disconnect()
                logging.debug('User %s disconnect the redis client' % user_label)

        # timedelta's first positional argument is days: 0.00001 days = 0.864 s.
        tornado.ioloop.IOLoop.instance().add_timeout(
            datetime.timedelta(0.00001),
            disconnect_redis
        )

    @staticmethod
    def _check_db_connection():
        """Drop the default Django DB connection when it is no longer usable."""
        # mysql is lazily connected to in django.
        # connection.connection is None means
        # you have not connected to mysql before
        if connection.connection and not connection.is_usable():
            # destroy the default mysql connection
            # after this line, when you use ORM methods
            # django will reconnect to the default mysql
            del connections._connections.default
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import tornado.web | |
import tornado.websocket | |
import tornado.ioloop | |
import tornado.httpclient | |
import urllib | |
import redis | |
from django.core import serializers | |
from django.conf import settings | |
from django.db.models import Q | |
from im.models import Thread, Message | |
from project.websockets.base_handler import BaseHandler | |
class MessagesHandler(BaseHandler):
    """Sub-handler for the instant-messaging actions of the ``message`` label.

    Every action receives the decoded client message, reads its arguments
    from ``data['body']`` and answers through ``write_message``; updates for
    other users are published on their ``channel_<user id>`` redis channel.
    """

    redis_client = redis.StrictRedis()

    def __init__(self, *args, **kwargs):
        """Register the action name -> method dispatch table."""
        super(MessagesHandler, self).__init__(*args, **kwargs)
        self.route = {
            "newThread": self.new_thread,
            "initialThreads": self.initial_threads,
            "initialMessages": self.initial_messages,
            "previousMessage": self.previous_messages,
            "newMessage": self.new_message,
            "removeThread": self.remove_thread,
            "removeMessage": self.remove_message,
            "markMessageAsRead": self.mark_message_as_read,
            "onlineStates": self.online_states
        }

    def _check_if_chunk_is_last(self, messages):
        """Return ``(is_last_chunk, messages)`` trimmed to one chunk.

        Callers fetch one message more than a chunk holds; receiving that
        extra message means further chunks exist.
        """
        if len(messages) > settings.MESSAGES_CHUNK_CONTAIN:
            return False, messages[:settings.MESSAGES_CHUNK_CONTAIN]
        else:
            return True, messages

    def _write_error(self, request_id, error_message):
        """Send a failure reply in the shape used by the remove actions."""
        return self.write_message(json.dumps({
            "label": 'message',
            "__requestId": request_id,
            "success": False,
            "errorMessage": error_message,
        }))

    def _get_thread_of_user(self, thread_id):
        """Return the thread if the current user participates in it, else None."""
        try:
            return Thread.objects.select_related('messages') \
                .prefetch_related("participants") \
                .filter(participants=self.user) \
                .get(id=thread_id)
        except Thread.DoesNotExist:
            return None

    def new_thread(self, data):
        """Create a thread through the HTTP API and announce it to the friends."""
        friends_pk = data['body']['friends']
        request_id = data.get('__requestId', -1)
        request = tornado.httpclient.HTTPRequest(
            settings.CREATE_THREAD_API_URL,
            method="POST",
            body=urllib.urlencode({
                "friends_pk": json.dumps(friends_pk),
                "api_key": settings.API_KEY,
                "sender_id": self.user.id,
            })
        )

        def handle_new_thread_server_response(response):
            response_server_data = json.loads(response.body)
            if response_server_data.get('error'):
                self.write_message(json.dumps({
                    "label": 'message',
                    "action": 'newThread',
                    "__requestId": request_id,
                    "body": {
                        "success": False,
                        "error": response_server_data['error']
                    }
                }))
                # Stop here: falling through used to raise KeyError on the
                # missing 'threadData' right after the error was reported.
                return

            response_data = {
                "label": 'message',
                "action": 'newThread',
                "initiator": self.user.id,
                "body": {
                    "rawThread": response_server_data['threadData']
                }
            }
            thread_data = json.loads(response_server_data['threadData'])[0]
            # A real list (not a lazy map object) so that it can be searched
            # once per participant.
            removed_for_users = [
                user_info[0]
                for user_info in thread_data['fields']['removed_for_users']
            ]
            broadcast = json.dumps(response_data)
            own_id = int(self.user.id)
            for participant_pk in friends_pk:
                participant_id = int(participant_pk)
                if participant_id != own_id and participant_id not in removed_for_users:
                    self.client.publish("channel_%s" % participant_pk, broadcast)

            response_data["__requestId"] = request_id
            del response_data["initiator"]  # it doesn't matter for current user itself
            self.write_message(json.dumps(response_data))

        http_client = tornado.httpclient.AsyncHTTPClient()
        http_client.fetch(request, handle_new_thread_server_response)

    def initial_threads(self, data):
        """Send all threads of the current user that it has not removed."""
        threads = Thread.objects.filter(~Q(removed_for_users=self.user) & Q(participants=self.user))\
            .select_related('messages').prefetch_related("participants")
        threads_in_json = serializers.serialize(
            'json',
            threads,
            use_natural_keys=True,
        )
        request_id = data.get('__requestId', "-1")
        data = json.dumps({
            "label": 'message',
            "action": 'initialThreads',
            "__requestId": request_id,
            'body': {
                'threads': threads_in_json,
            }
        })
        self.write_message(data)

    def initial_messages(self, data):
        """Send the newest chunk of a thread's messages plus its participants.

        Messages listed in ``loadedMessages`` or removed by the user are left
        out. Threads the user does not participate in are reported as missing.
        """
        thread_id = data['body']['threadPk']
        loaded_messages_pk = data['body']['loadedMessages']
        request_id = data.get('__requestId', "-1")

        # Security: the thread id comes from the client, so only threads the
        # user takes part in may be read.
        thread = self._get_thread_of_user(thread_id)
        if thread is None:
            return self._write_error(request_id, "No such thread")

        messages = thread.messages.order_by("-id") \
            .exclude(Q(removed_for_users=self.user) | Q(pk__in=loaded_messages_pk))[:settings.MESSAGES_CHUNK_CONTAIN + 1]
        is_last_chunk, messages = self._check_if_chunk_is_last(messages)
        messages = serializers.serialize(
            'json',
            messages,
            use_natural_keys=True,
        )
        participants = serializers.serialize(
            'json',
            thread.participants.all(),
            fields=('first_name', 'last_name', 'is_online'),
            extras=('avatar_url',)
        )
        data = json.dumps({
            "label": 'message',
            "action": 'initialMessages:%s' % thread_id,
            "__requestId": request_id,
            "body": {
                'thread_id': thread_id,
                'messages': messages,
                'is_last_chunk': is_last_chunk,
                'participants': participants,
            },
        })
        self.write_message(data)

    def previous_messages(self, data):
        """Send the chunk of messages older than ``last_message_id``.

        Threads the user does not participate in are reported as missing.
        """
        thread_id = data['body']['thread_id']
        last_message_id = data['body']['last_message_id']
        request_id = data.get('__requestId', "-1")

        # Security: same participant check as in initial_messages.
        thread = self._get_thread_of_user(thread_id)
        if thread is None:
            return self._write_error(request_id, "No such thread")

        messages = thread.messages.filter(~Q(removed_for_users=self.user) & Q(id__lt=last_message_id)) \
            .order_by("-id")[:settings.MESSAGES_CHUNK_CONTAIN + 1]
        is_last_chunk, messages = self._check_if_chunk_is_last(messages)
        messages = serializers.serialize(
            'json',
            messages,
            use_natural_keys=True,
        )
        data = json.dumps({
            "label": 'message',
            "action": 'previousMessage:%s' % thread_id,
            "__requestId": request_id,
            "body": {
                'thread_id': thread_id,
                'messages': messages,
                'is_last_chunk': is_last_chunk
            },
        })
        self.write_message(data)

    def new_message(self, data):
        """Post a message through the HTTP API and fan the result out."""
        text = data['body']['text']
        request = tornado.httpclient.HTTPRequest(
            "".join([settings.SEND_MESSAGE_API_URL, str(data['body']['thread_id']), '/']),
            method="POST",
            body=urllib.urlencode({
                "message": text.encode("utf-8"),
                "images": json.dumps(data['body']['images']),
                "api_key": settings.API_KEY,
                "sender_id": self.user.id,
            })
        )
        request_id = data.get('__requestId', "-1")

        def handle_new_message_server_response(response):
            new_message_data = json.loads(response.body)
            message_in_json = new_message_data['message']
            # we should extract first element, because serializers work only with arrays
            message = json.loads(message_in_json)[0]
            thread_id = message['fields']['thread']
            thread = Thread.objects.prefetch_related("participants").get(id=thread_id)
            participants = thread.participants.all()
            other_participants = participants.exclude(id=self.user.id)

            response_data = {
                "label": 'message',
                "action": 'newMessage:%s' % thread_id,
                "initiator": self.user.id,
                "body": message_in_json
            }
            # The sender is already excluded by the queryset above.
            broadcast = json.dumps(response_data)
            for participant in other_participants:
                self.client.publish("channel_%s" % participant.id, broadcast)

            response_data["__requestId"] = request_id
            del response_data["initiator"]  # it doesn't matter for current user itself
            self.write_message(json.dumps(response_data))

            read_updated_messages = json.loads(new_message_data['read_updated_messages'])
            for read_updated_message in read_updated_messages:
                read_notice = json.dumps({
                    "label": 'message',
                    "success": True,
                    "action": 'markMessageAsRead:%s' % read_updated_message['pk'],
                    "body": {
                        "pk": read_updated_message['pk'],
                        "is_read": True  # we can't trust serialized data here because serialization was before updating
                    }
                })
                for participant in participants:
                    self.client.publish("channel_%s" % participant.id, read_notice)

        http_client = tornado.httpclient.AsyncHTTPClient()
        http_client.fetch(request, handle_new_message_server_response)

    def remove_thread(self, data):
        """Hide a thread for the current user."""
        thread_pk = data['body']['threadPk']
        request_id = data.get('__requestId', -1)
        try:
            thread = Thread.objects.get(id=thread_pk)
        except Thread.DoesNotExist:
            return self._write_error(request_id, "No such thread")
        thread.removed_for_users.add(self.user)
        return self.write_message(json.dumps({
            "label": 'message',
            "__requestId": request_id,
            "success": True,
            "action": 'removeThread',
        }))

    def remove_message(self, data):
        """Hide a message for the current user."""
        message_pk = data['body']['messagePk']
        request_id = data.get('__requestId', -1)
        try:
            message = Message.objects.get(id=message_pk)
        except Message.DoesNotExist:
            return self._write_error(request_id, "No such message")
        message.removed_for_users.add(self.user)
        return self.write_message(json.dumps({
            "label": 'message',
            "__requestId": request_id,
            "success": True,
            "action": 'removeMessage',
        }))

    def mark_message_as_read(self, data):
        """Flag a message as read and tell the other thread participants.

        Messages of threads the user does not participate in are reported as
        missing.
        """
        message_pk = data['body']['messagePk']
        request_id = data.get('__requestId', -1)
        try:
            # Security: only participants of the thread may change the flag.
            message = Message.objects.prefetch_related("thread__participants") \
                .filter(thread__participants=self.user) \
                .get(id=message_pk)
        except Message.DoesNotExist:
            # The request id used to be missing from this reply, so the
            # client could not match the failure to its request.
            return self._write_error(request_id, "No such message")

        message.is_read = True
        message.save()

        response_data = {
            "label": 'message',
            "success": True,
            "action": 'markMessageAsRead:%s' % message.id,
            # A stray trailing comma used to turn this value into a tuple.
            "initiator": self.user.id,
            "body": {
                "pk": message.pk,
                "is_read": message.is_read
            },
        }
        broadcast = json.dumps(response_data)
        # The current user is already excluded by the queryset.
        for participant in message.thread.participants.exclude(id=self.user.id):
            self.client.publish("channel_%s" % participant.id, broadcast)

        del response_data["initiator"]  # it doesn't matter for current user itself
        response_data["__requestId"] = request_id
        self.write_message(json.dumps(response_data))

    def online_states(self, data):
        """Report the online flag stored in redis for each requested user."""
        users_pk = data['body']['users']
        users_states = []
        for user_pk in users_pk:
            # %-formatting accepts integer pks too; joining strings raised
            # TypeError for them.
            online = self.redis_client.hget("user_%s" % user_pk, "online_state")
            users_states.append({
                "pk": user_pk,
                # Accept the value both as text and as bytes.
                "online": online in ("True", b"True")
            })
        request_id = data.get('__requestId', "-1")
        response_data = json.dumps({
            "label": 'message',
            "success": True,
            "action": 'onlineStates',
            "__requestId": request_id,
            "body": {
                "onlineStates": users_states,
            }
        })
        self.write_message(response_data)
# Handlers exported by this module: one entry per label.
handlers = [
    dict(label="message", handler=MessagesHandler),
]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from project.websockets.base_handler import BaseHandler | |
import json | |
from django.contrib.auth import get_user_model | |
import tornado | |
import tornadoredis | |
from im.models import Thread | |
class NotificationHandler(BaseHandler):
    """Relay messages from the redis ``notifications`` channel to users.

    Only one instance per process subscribes to the channel (tracked by the
    class-level ``redis_connected`` flag). It republishes every notification
    on the ``channel_<user id>`` channel of each receiver.
    """

    redis_connected = False

    def __init__(self, *args, **kwargs):
        """Set up the handler and subscribe if nobody has done so yet."""
        super(NotificationHandler, self).__init__(*args, **kwargs)
        # Only the subscribing instance gets a redis client; opening one per
        # websocket connection was wasted work.
        self.redis_client = None
        self.connect_to_redis()

    def connect_to_redis(self):
        """Subscribe to redis unless another instance already did."""
        if NotificationHandler.redis_connected:
            return
        NotificationHandler.redis_connected = True
        self.redis_client = tornadoredis.Client()
        self.redis_client.connect()
        self.listen_redis()

    @tornado.gen.engine
    def listen_redis(self):
        """Subscribe to ``notifications`` and dispatch incoming messages."""
        yield tornado.gen.Task(self.redis_client.subscribe, ['notifications'])
        # NOTE(review): ``route`` is also the table BaseHandler.on_message
        # uses for client messages, so on this instance a websocket client
        # can reach these callables too - confirm that this is intended.
        self.route = {
            "friends": self.friends_handler,
            "messages": self.messages_handler,
            "wallpost": self.wallpost_handler
        }
        self.redis_client.listen(self.redishandler)

    def on_close(self):
        """Close the handler but keep the shared subscription running.

        The subscription serves every connection. Tearing it down when the
        subscribing user disconnected (while ``redis_connected`` stayed True)
        silenced notifications for everybody else.
        """
        return super(NotificationHandler, self).on_close()

    def redishandler(self, data):
        """Route a message of the ``notifications`` channel by its label."""
        if data.kind == 'message' and data.channel == 'notifications':
            body = json.loads(data.body)
            self.route[body['notification_label']](body)

    def notification(self, data):
        """Publish the notification to its receiver(s).

        The receiver is ``data['receiver_id']`` if present; otherwise every
        participant of ``data['thread_id']`` except ``data['from_user_id']``.
        """
        receiver_id = data.get('receiver_id')
        if receiver_id:
            receiver_ids = [receiver_id]
        elif data.get('thread_id'):
            thread = Thread.objects.get(id=data.get('thread_id'))
            receivers = thread.participants.exclude(id=data['from_user_id'])
            receiver_ids = [receiver.id for receiver in receivers]
        else:
            # Neither key is present. The old control flow, built on
            # catching UnboundLocalError, re-raised that error here.
            import logging
            logging.warning('Notification without receiver_id or thread_id was dropped')
            return

        for target_id in receiver_ids:
            self.client.publish(
                "channel_%s" % target_id,
                json.dumps(self.get_notification(target_id, data))
            )

    def get_notification(self, receiver_id, data):
        """Build the notification payload sent to ``receiver_id``."""
        sender = get_user_model().objects.get(id=data['from_user_id'])
        receiver = get_user_model().objects.get(id=receiver_id)
        return {
            "label": 'notification',
            "action": data['action'],
            "body": {
                'msglang': receiver.language,
                'thread_id': data.get('thread_id'),
                'senderName': sender.get_full_name(),
                'avatarUrl': sender.avatar_url
            },
        }

    def friends_handler(self, data):
        """Deliver a ``friends`` notification."""
        self.notification(data)

    def messages_handler(self, data):
        """Deliver a ``messages`` notification."""
        self.notification(data)

    def wallpost_handler(self, data):
        """Deliver a ``wallpost`` notification."""
        self.notification(data)
# Handlers exported by this module: one entry per label.
handlers = [
    dict(label="notifications", handler=NotificationHandler),
]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment