Skip to content

Instantly share code, notes, and snippets.

@malcolmsparks
Last active December 12, 2017 07:13
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save malcolmsparks/6044878 to your computer and use it in GitHub Desktop.
Save malcolmsparks/6044878 to your computer and use it in GitHub Desktop.
MQTT example for Clojure core.async
(ns mqtt-insertion.core
(:require [clojure.core.async :refer :all])
(:import (org.eclipse.paho.client.mqttv3
MqttCallback
MqttAsyncClient
MqttConnectOptions
MqttDeliveryToken
MqttException
MqttMessage
MqttTopic
)
(org.eclipse.paho.client.mqttv3.persist
MqttDefaultFilePersistence)))
(def tok->chan (atom {}))
(defn- mqtt-callback
"Function called after delivery confirmation"
[]
(reify MqttCallback
(connectionLost [_ cause]
nil)
(messageArrived [_ topic message]
nil)
(deliveryComplete [_ tok]
(when-let [c (@tok->chan tok)]
(go (>! c :arrived))
(swap! tok->chan dissoc tok)))))
(defn mqtt-connect
"Returns a MqttClient."
[broker-url client-id persistence]
(doto (MqttAsyncClient. broker-url client-id persistence)
(.setCallback (mqtt-callback))
(.connect (doto (MqttConnectOptions.)
(.setCleanSession true)
(.setKeepAliveInterval 30)))))
(defn- mqtt-create-message
"Creates a MQTT message."
([{:keys [qos retained] :or {qos 0 retained false}} ^String message]
(doto (MqttMessage. (.getBytes message))
(.setQos qos)
(.setRetained retained)))
([^String message]
(mqtt-create-message {} message)))
(defn- mqtt-publish
"Publishes MESSAGE to TOPIC"
[client topic message]
(.publish client topic message))
(defn test-me []
(let [persistence (new MqttDefaultFilePersistence)
client (mqtt-connect "tcp://localhost:1883" "emacs" persistence)]
(Thread/sleep 400)
(try
(dotimes [n 10]
(go
(loop []
(let [c (chan)
tok (mqtt-publish client "mousetrap" (mqtt-create-message (str "sprung-" n)))]
(swap! tok->chan assoc tok c)
(when-not (alts! c (timeout 500))
(swap! tok->chan dissoc tok)
(recur))
))))
(finally
(Thread/sleep 400)
(.disconnect client)
(.close client)
(.close persistence)))))
;; (test-me)
@bcambel
Copy link

bcambel commented Jul 28, 2016

Thanks for the example @malcolmsparks

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment