Skip to content

Instantly share code, notes, and snippets.

@hihebark
Last active January 15, 2018 16:08
Show Gist options
  • Save hihebark/a0a2aadb5cda54eb2c50534ba874ed44 to your computer and use it in GitHub Desktop.
Save hihebark/a0a2aadb5cda54eb2c50534ba874ed44 to your computer and use it in GitHub Desktop.
Python broker that works with Java clients
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
""" LICENSE:
Copyleft 2018 Hihebark
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
( CopyLeft License ) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import json
import asyncio
# Address the broker listens on.
HOST = "localhost"
PORT = 1883

# Registry of every currently connected Broker protocol instance.
clients = list()
class Publisher(object):
    """Holder for the most recently published payload and its topic."""

    def __init__(self, data=None, topic=None):
        """Store the initial payload and topic (both default to ``None``)."""
        self.topic = topic
        self.data = data

    def getData(self):
        """Return the stored payload."""
        return self.data

    def setData(self, data):
        """Replace the stored payload."""
        self.data = data

    def getTopic(self):
        """Return the stored topic."""
        return self.topic

    def setTopic(self, topic):
        """Replace the stored topic."""
        self.topic = topic
class Broker(asyncio.Protocol):
    """JSON-over-TCP broker protocol, used as the server's protocol factory.

    Every inbound chunk is parsed as a single JSON object with the required
    keys ``code``, ``username`` and ``clientType`` and an optional ``topic``.
    Replies are plain-text tokens (``CONNECT``, ``PUBACK``, ``SUBACK``,
    ``UNSUBACK``, ``END``) or the published payload, each terminated by a
    newline.

    The class relies on two module-level names: ``clients`` (the list of
    connected protocol instances) and ``myPublisher`` (the object holding the
    latest published payload).

    NOTE(review): TCP does not preserve message boundaries; this assumes each
    ``data_received`` call carries exactly one complete JSON document --
    confirm against the client implementation.
    """

    def __init__(self):
        self.topic = []           # topics this client has subscribed to
        self.username = None      # set on CONNECT for SUBSCRIBER clients
        self.isReceived = False   # True once the current payload was sent here
        self.transport = None     # assigned in connection_made
        self.peername = None      # assigned in connection_made

    def getisReceived(self):
        """Return whether the current payload was already sent to this client."""
        return self.isReceived

    def setisReceived(self, newValue):
        """Set the delivered flag for this client."""
        self.isReceived = newValue

    def getUsername(self):
        """Return the username recorded for this client (may be ``None``)."""
        return self.username

    def setUsername(self, newValue):
        """Record the username for this client."""
        self.username = newValue

    def getTopic(self):
        """Return the list of topics this client subscribed to."""
        return self.topic

    def setTopic(self, newValue):
        """Append ``newValue`` to this client's topic list."""
        self.topic.append(newValue)

    def connection_made(self, transport):
        """Remember the transport, greet the peer and register the client."""
        self.transport = transport
        self.peername = transport.get_extra_info("peername")
        self.say("CONNECT")
        clients.append(self)

    def broadcast(self):
        """Send the current payload to every client that has not received it.

        Does nothing when no payload has been published yet. Runs a single
        pass and returns, so the event loop is never blocked.
        """
        payload = myPublisher.getData()
        if payload is None:
            return
        print("\033[96m[*] Publishing to {} subscriber(s) the data {}\033[0m".format(len(clients), payload))
        for client in clients:
            print("\033[97m[*] Subscriber name: {} - {} - {}\033[0m".format(client.peername, client.username, client.isReceived))
            if not client.isReceived:
                print("\033[96m[*] Publishing to {}\033[0m".format(client.peername))
                client.say(payload)
                client.setisReceived(True)

    def say(self, data):
        """Write ``data`` followed by a newline to the peer.

        ``data`` is converted with ``str.format`` so numeric payloads (for
        example a temperature) can be sent; UTF-8 yields the same bytes as
        ASCII for ASCII-only text.
        """
        self.transport.write("{}\n".format(data).encode("utf-8"))

    def data_received(self, data):
        """Parse one JSON request and dispatch it by its ``code`` field.

        Malformed input (invalid JSON, non-object document, missing required
        key) is reported and ignored instead of crashing the handler.
        """
        try:
            message = json.loads(data.decode())
            code = message["code"]              # required
            username = message["username"]      # required
            clientType = message["clientType"]  # required
        except (ValueError, KeyError, TypeError) as e:
            # ValueError covers both JSON and Unicode decoding failures;
            # TypeError covers documents that are not JSON objects.
            print('\033[91m[error] Malformed message from {}: {!r}\033[0m'.format(self.peername, e))
            return
        topic = message.get("topic")
        if topic is None:
            topic = set()
        try:
            self._dispatch(code, username, clientType, topic, message)
        except ConnectionResetError as e:
            # Only a peer reset is tolerated; other errors propagate.
            print("[ERROR] Socket error ", e)

    def _dispatch(self, code, username, clientType, topic, message):
        """Run the action for ``code``; unknown codes are reported only."""
        if code == "CONNECT":
            print('\033[92m[*] Got a connection from: {} there is: {} client(s)\033[0m'.format(username, len(clients)))
            if clientType == "SUBSCRIBER":
                self.setUsername(username)
        elif code == "PUBLISH":
            if "temperature" not in message:
                print('\033[91m[error] PUBLISH from {} carries no temperature\033[0m'.format(username))
                return
            payload = message["temperature"]
            print('\033[93m[*] {} want to publish in {} - {}\033[0m'.format(username, topic, payload))
            if clientType == "PUBLISHER":
                myPublisher.setData(payload)
                myPublisher.setTopic(topic)
                # New payload: every client is due a delivery again.
                for client in clients:
                    client.isReceived = False
            self.say("PUBACK")
        elif code == "SUBSCRIBE":
            print('\033[94m[*] {} want to subscribe to {}\033[0m'.format(username, topic))
            self.setTopic(topic)  # record the topic once, not once per client
            self.say("SUBACK")
            self.broadcast()
        elif code == "UNSUBSCRIBE":
            print('\033[91m[*] {} want to unsubscribe in {}\033[0m'.format(username, topic))
            for subscriber in clients:
                if subscriber.username == username:
                    subscriber.topic = []
            self.say("UNSUBACK")
            # broadcast() does not filter by topic, so leaving the registry
            # is what actually stops further deliveries to this client.
            self._forget()
        elif code == "DISCONNECT":
            print('\033[90m[*] {} want to disconnect\033[0m'.format(username))
            self.say("END")
            self._forget()
            # Closing flushes the pending "END"; asyncio then invokes
            # connection_lost() itself.
            self.transport.close()
        elif code == "PINGREQ":
            self.broadcast()
        else:
            print('\033[91m[error] {} not found in the list of the code\033[0m'.format(code))

    def _forget(self):
        """Remove this client from ``clients``; no-op if already removed."""
        try:
            clients.remove(self)
        except ValueError:
            pass

    def connection_lost(self, e):
        """Report the closed connection and unregister the client.

        ``END`` is only written when the transport is still open (i.e. the
        method was invoked manually); a closing transport would drop it.
        """
        print("\033[91m[i] Connection lost: {}\033[0m".format(e))
        transport = self.transport
        if transport is not None and not transport.is_closing():
            self.say("END")
        self._forget()
if __name__ == '__main__':
    # Script entry point: start the broker on HOST:PORT and serve until Ctrl-C.
    print("[!] Starting..")
    # Kept at module level on purpose: Broker reads the latest payload
    # from this global.
    myPublisher = Publisher()
    # get_event_loop() without a running loop is deprecated; create the
    # loop explicitly and install it as the current one.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    server = None
    try:
        coro = loop.create_server(Broker, HOST, PORT)
        server = loop.run_until_complete(coro)
        # Named so it does not bind a module-level name `socket`.
        for listening_socket in server.sockets:
            print("serving on {}".format(listening_socket.getsockname()))
        loop.run_forever()
    except KeyboardInterrupt:
        print('\033[91m[?] Exiting ...\033[0m')
        # Same exit status as before, without relying on site's exit().
        raise SystemExit(1)
    finally:
        # Release the listening sockets and the loop on every exit path.
        if server is not None:
            server.close()
        loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment