Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Example of trying to use Twisted's trial as an asynchronous unit test framework with paho-mqtt in python
#!/usr/bin/python
from __future__ import print_function
from threading import Thread
from twisted.internet.defer import Deferred
from twisted.trial.unittest import TestCase
from paho.mqtt.client import Client
class MqttTestCase(TestCase):
def setUp(self):
messaged = Deferred()
receiver = Client()
sender = Client()
subscription = "button/1"
def handle_receiver_messaged(client, userdata, msg):
print("receiver_messaged...",end='')
messaged.callback(msg)
def handle_sender_connected(client, userdata, flags, rc):
print("sender_connected...",end='')
receiver.on_message = handle_receiver_messaged
sender.publish(subscription,"Hello World")
def handle_receiver_subscribed(client, userdata, mid, granted_qos):
print("receiver_subscribed...",end='')
sender.on_connect = handle_sender_connected
sender.connect("localhost", 1883, 5)
sender.loop_start()
def handle_receiver_connected(client, userdata, flags, rc):
print("receiver_connected...",end='')
receiver.on_subscribe = handle_receiver_subscribed
receiver.subscribe(subscription)
def connect_receiver():
receiver.on_connect = handle_receiver_connected
receiver.connect("localhost", 1883, 5)
receiver.loop_start()
connect_receiver()
self.sender = sender
self.receiver = receiver
self.messaged = messaged
def test_messaged(self):
print("Running test_messaged successfully")
return self.messaged
# actually run within the receiver loop
# (triggered by a message receive)
def tearDown(self):
print("Running tearDown")
self.sender.disconnect()
self.receiver.disconnect()
self.sender.loop_stop()
Thread(target=self.receiver.loop_stop).start() # join is otherwise called from the receiver's own thread
@cefn

This comment has been minimized.

Copy link
Owner Author

@cefn cefn commented May 18, 2015

This runtest.py file is run by calling

trial runtest.py

...and assumes there is a Mosquitto server running on the system on its default port (this can be achieved by installing mosquitto via apt-get on debian or ubuntu).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment