Skip to content

Instantly share code, notes, and snippets.

@wldcordeiro
Created April 28, 2018 17:31
Show Gist options
  • Save wldcordeiro/db399052e7ee62a331c791b81325abb6 to your computer and use it in GitHub Desktop.
Save wldcordeiro/db399052e7ee62a331c791b81325abb6 to your computer and use it in GitHub Desktop.
TypeError?
import { from, Observable } from 'rxjs'
/**
 * Builds the message stream for an MQTT client.
 *
 * This is the observable emitted once the connection/subscription has been
 * made; it must itself be subscribed to in order to receive actual messages.
 * Nothing is attached to `client` until then. Each subscription registers its
 * own 'message' handler and, on unsubscribe, removes only that handler.
 *
 * @param {object} client - Object exposing `on(event, fn)` and
 *   `removeListener(event, fn)` and emitting 'message' events as
 *   `(topic, message)`.
 * @returns {Observable} Emits `JSON.parse(message.toString())` for every
 *   'message' event; errors if a payload cannot be parsed as JSON.
 */
const listener = client =>
  // This used to be Observable.create(observer => ...). `from` is not a
  // replacement for it: `from` only accepts ObservableInput values (arrays,
  // promises, iterables, observables) and throws a TypeError when handed a
  // subscribe function, so the Observable is constructed directly.
  new Observable(observer => {
    const handleMessage = (topic, message) => {
      let payload
      try {
        payload = JSON.parse(message.toString())
      } catch (err) {
        // Report malformed payloads through the stream instead of throwing
        // inside the client's event emitter.
        observer.error(err)
        return
      }
      observer.next(payload)
    }

    client.on('message', handleMessage)

    return () => {
      // Remove only our own handler so other 'message' listeners on the same
      // client keep working.
      client.removeListener('message', handleMessage)
    }
  })
/**
 * Returns an observable that attempts to connect to an MQTT endpoint and
 * subscribe to `topic`.
 *
 * This observable deals only with the connection, not with messages. The only
 * items it emits are other observables (built by `listener`) that can be
 * subscribed to for the actual messages. One is emitted after each 'connect'
 * event whose `client.subscribe` call succeeds.
 *
 * `aws-mqtt` and `aws-sdk/global` are loaded lazily via dynamic import when the
 * observable is subscribed to.
 *
 * @param {string} topic - Topic passed to `client.subscribe` (with qos 1).
 * @param {string} endpoint - Endpoint passed to `AWSMqtt.connect`.
 * @param {object} options
 * @param {string} options.region - Region applied to the AWS config and the connection.
 * @param {object} options.credentials - Credentials applied to the AWS config and the connection.
 * @returns {Observable} Emits message observables; errors if the imports fail,
 *   the client emits 'error', or the topic subscription fails. Unsubscribing
 *   ends the client.
 */
export default (topic, endpoint, { region, credentials }) =>
  // This used to be Observable.create(observer => ...). `from` cannot take a
  // subscribe function (it throws a TypeError), so use the constructor.
  new Observable(observer => {
    let client = null
    // Set on teardown so a late import resolution does not open a connection
    // that nobody would ever close.
    let disposed = false

    Promise.all([import('aws-mqtt'), import('aws-sdk/global')])
      .then(([AWSMqtt, { config }]) => {
        if (disposed) {
          return
        }

        config.update({
          region,
          credentials,
        })

        client = AWSMqtt.connect({
          WebSocket: window.WebSocket,
          region,
          endpoint,
          credentials,
          connectTimeout: 30 * 1000,
        })

        client.on('connect', () => {
          client.subscribe(`${topic}`, { qos: 1 }, err => {
            if (err) {
              observer.error(err)
            } else {
              // Send back another observable to be subscribed to
              observer.next(listener(client))
            }
          })
        })

        client.on('error', err => observer.error(err))
      })
      // Surface failed imports (or a throwing connect call) to the subscriber
      // instead of leaving an unhandled rejection.
      .catch(err => observer.error(err))

    return () => {
      disposed = true
      if (client) {
        client.end()
      }
      // No observer.complete() here: teardown only runs once the subscriber
      // is already closed, so the call could never reach it.
    }
  })
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment