Skip to content

Instantly share code, notes, and snippets.

@spolischook
Created November 3, 2018 06:01
Show Gist options
  • Save spolischook/ec6227b8b36b9c24226562f7c6020b1f to your computer and use it in GitHub Desktop.
Save spolischook/ec6227b8b36b9c24226562f7c6020b1f to your computer and use it in GitHub Desktop.
client server socket chat for raspberry pi
server {
server_name akit.kotoblog.pp.ua;
listen 80;
# WebSocket.
location /chatsocket {
proxy_pass http://localhost:8080;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Origin '';
}
location / {
proxy_pass_header Server;
proxy_set_header Host $http_host;
proxy_redirect off;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Scheme $scheme;
proxy_pass http://localhost:8080;
}
}
import tornado.websocket
import asyncio
import logging
import RPi.GPIO as GPIO
from time import sleep
LedPin = 21
def setup():
GPIO.setwarnings(False)
GPIO.cleanup()
GPIO.setmode(GPIO.BCM)
GPIO.setup(LedPin, GPIO.OUT)
GPIO.output(LedPin, GPIO.HIGH)
def destroy():
GPIO.output(LedPin, GPIO.LOW)
GPIO.cleanup()
async def main():
logging.warn('Starting main')
conn = await tornado.websocket.websocket_connect('ws://akit.kotoblog.pp.ua/chatsocket')
conn.write_message('device')
setup()
while True:
GPIO.output(LedPin, GPIO.HIGH)
await asyncio.sleep(0.3)
GPIO.output(LedPin, GPIO.LOW)
await asyncio.sleep(0.3)
message = await conn.read_message()
try:
value = int(message)
logging.warn(value)
except:
logging.warn('"%s" is not integer. Ignore it', message)
if __name__ == "__main__":
logging.warn('Starting program')
try:
ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(main())
ioloop.close()
except KeyboardInterrupt:
destroy()
import tornado.websocket
import tornado.web
import tornado.ioloop
import logging
import os.path
from tornado.options import define, options
define("port", default=8080, help="run on the given port", type=int)
define("debug", default=True, help="run in debug mode")
class Application(tornado.web.Application):
def __init__(self):
handlers = [
(r"/", MainHandler),
(r"/chatsocket", ChatSocketHandler),
]
settings = dict(
cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__",
template_path=os.path.join(os.path.dirname(__file__), "views"),
static_path=os.path.join(os.path.dirname(__file__), "static"),
xsrf_cookies=False,
debug=options.debug,
)
super(Application, self).__init__(handlers, **settings)
class MainHandler(tornado.web.RequestHandler):
def get(self):
self.render("index.html")
class ChatSocketHandler(tornado.websocket.WebSocketHandler):
controller = None
device = None
def open(self):
logging.info('Client connected')
def on_message(self, message):
logging.info('Message received: %s', message)
if 'device' == message:
ChatSocketHandler.device = self
elif 'controller' == message:
ChatSocketHandler.controller = self
if self == ChatSocketHandler.controller:
sendMessage(ChatSocketHandler.controller, isDeviceOnline())
if ChatSocketHandler.device is not None:
sendMessage(ChatSocketHandler.device, message)
elif self == ChatSocketHandler.device:
if ChatSocketHandler.controller is None:
logging.info('Controller is missing')
else:
sendMessage(ChatSocketHandler.controller, isDeviceOnline())
else:
logging.warn('Urgent! Somebody broke our super security connection!!!')
def data_received(self, chunk):
logging.info("data_received")
def on_close(self):
if ChatSocketHandler.controller == self:
logging.info('Controller leave')
ChatSocketHandler.controller = None
elif ChatSocketHandler.device == self:
logging.info('Device leave')
ChatSocketHandler.device = None
if ChatSocketHandler.controller is not None:
sendMessage(ChatSocketHandler.controller, isDeviceOnline())
def isDeviceOnline():
return 'online' if ChatSocketHandler.device is not None else 'offline'
def sendMessage(client, message):
try:
client.write_message(message)
except:
logging.error("Error sending message", exc_info=True)
def main():
tornado.options.parse_command_line()
app = Application()
app.listen(options.port)
tornado.ioloop.IOLoop.current().start()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment