NGINX Plus for the IoT: Load Balancing MQTT
# Image that runs a single HiveMQ MQTT broker node from a locally supplied
# distribution archive. hivemq.zip must be present in the build context.

# Pull base image. The `java` repository on Docker Hub is deprecated, so a
# maintained OpenJDK 8 build is used instead.
FROM eclipse-temurin:8-jdk

# Define HIVEMQ_HOME variable
ENV HIVEMQ_HOME=/opt/hivemq

# Copy HiveMQ to container
COPY hivemq.zip /tmp/

# Install unzip and unpack HiveMQ into /opt/hivemq.
# - `apt-get update` runs in the same layer as `install` so the package index
#   is never stale or missing, and the index is removed again afterwards.
# - wget is not installed: the archive is copied in, nothing is downloaded.
# - A dedicated system user owns the installation so the broker does not
#   have to run as root.
RUN apt-get update \
 && apt-get install -y --no-install-recommends unzip \
 && rm -rf /var/lib/apt/lists/* \
 && unzip /tmp/hivemq.zip -d /opt/ \
 && mv /opt/hivemq-* /opt/hivemq \
 && groupadd --system hivemq \
 && useradd --system --gid hivemq --home-dir /opt/hivemq hivemq \
 && chown -R hivemq:hivemq /opt/hivemq

# Define working directory.
WORKDIR /opt/hivemq

# Drop root privileges for the running broker.
USER hivemq

# Expose MQTT port
EXPOSE 1883

# Define default command. Here we use HiveMQ's run script.
CMD ["/opt/hivemq/bin/run.sh"]
// State shared between getClientId() and setClientId().
// NOTE(review): presumably each client session gets its own copy of these
// globals from the njs runtime - confirm for the njs version in use.
var client_messages = 1;  // 1-based count of non-empty upload callbacks seen
var client_id_str = "-";  // Placeholder until a ClientId has been parsed

/**
 * js_preread handler: parses the first non-empty chunk of client data as an
 * MQTT CONNECT packet and stores its ClientId in client_id_str.
 *
 * The position of the ClientId is derived from the protocol-name length
 * carried in the packet, so both MQTT 3.1 ("MQIsdp") and MQTT 3.1.1 ("MQTT")
 * are handled. NOTE: MQTT 5 inserts a properties section before the payload;
 * that section is not skipped here.
 *
 * @param {Object} s - njs stream session; on(), log() and allow() are used.
 */
function getClientId(s) {
    s.on('upload', function (data, flags) {
        if ( data.length == 0 ) {  // Initial calls may contain no data, so
            s.log("No buffer yet"); // return without a verdict and wait to be called again
            return;
        }

        if ( client_messages == 1 ) { // CONNECT is first packet from the client
            // CONNECT packet is type 1, using upper 4 bits (00010000 to 00011111)
            var packet_type_flags_byte = data.charCodeAt(0);
            s.log("MQTT packet type+flags = " + packet_type_flags_byte.toString());

            if ( packet_type_flags_byte >= 16 && packet_type_flags_byte < 32 ) {
                // Skip the variable-length "remaining length" field (1-4 bytes).
                // Bit 7 of each byte signals that another length byte follows.
                var remaining_len_pos = 1;
                var remaining_len_byte;
                do {
                    remaining_len_byte = data.charCodeAt(remaining_len_pos);
                    remaining_len_pos++;
                } while ( (remaining_len_byte & 128) != 0 && remaining_len_pos < 5 );

                // Variable header = 2-byte protocol-name length + protocol name
                //                 + protocol level (1) + connect flags (1) + keep alive (2)
                var protocol_name_len = (data.charCodeAt(remaining_len_pos) << 8) |
                                         data.charCodeAt(remaining_len_pos + 1);
                var payload_offset = remaining_len_pos + 2 + protocol_name_len + 4;

                if ( payload_offset + 2 > data.length ) {
                    // Not enough bytes for the ClientId length prefix; keep the placeholder
                    s.log("CONNECT packet too short to contain a ClientId");
                } else {
                    // Extract ClientId based on length defined by 2-byte big-endian prefix
                    var client_id_len_int = (data.charCodeAt(payload_offset) << 8) |
                                             data.charCodeAt(payload_offset + 1);
                    client_id_str = data.substr(payload_offset + 2, client_id_len_int);
                    s.log("ClientId value = " + client_id_str);
                }
            } else {
                s.log("Received unexpected MQTT packet type+flags: " + packet_type_flags_byte.toString());
            }
        }

        client_messages++;
        s.allow();
    });
}
/**
 * js_set handler: supplies the value of the NGINX variable bound to it,
 * i.e. the most recently stored MQTT ClientId (or the initial placeholder).
 *
 * @param {Object} s - njs stream session (unused).
 * @returns {string} Current value of client_id_str.
 */
function setClientId(s) {
    return client_id_str;
}
# Stream (TCP/UDP) context: load every configuration file matching
# stream_conf.d/*.conf.
stream {
    include stream_conf.d/*.conf;
}
# Access log format "mqtt": client address, local time, protocol, session
# status, bytes received from and sent to the client, and the upstream
# server address.
log_format mqtt '$remote_addr [$time_local] $protocol $status $bytes_received '
                '$bytes_sent $upstream_addr';
# Upstream group of three HiveMQ nodes listening on local ports.
upstream hive_mq {
    server 127.0.0.1:18831; #node1
    server 127.0.0.1:18832; #node2
    server 127.0.0.1:18833; #node3
    zone tcp_mem 64k;       # Shared memory zone for this upstream group
}
# Health check test: open an MQTT session and require that it is accepted.
match mqtt_conn {
    # Send CONNECT packet with client ID "nginx health check".
    # Byte layout:
    #   \x10 \x20                packet type CONNECT, remaining length 32
    #   \x00\x06 "MQIsdp" \x03   protocol name (length 6) and protocol version 3
    #   \x02                     connect flags
    #   \x00\x3c                 keep alive 60
    #   \x00\x12 + 18 bytes      ClientId length and ClientId text
    send   \x10\x20\x00\x06\x4d\x51\x49\x73\x64\x70\x03\x02\x00\x3c\x00\x12\x6e\x67\x69\x6e\x78\x20\x68\x65\x61\x6c\x74\x68\x20\x63\x68\x65\x63\x6b;
    expect \x20\x02\x00\x00; # Entire payload of CONNACK packet
}
# MQTT listener: proxies client connections on port 1883 to the hive_mq
# group and probes the group members with the mqtt_conn health check.
server {
    listen 1883;
    proxy_pass hive_mq;
    proxy_connect_timeout 1s;
    health_check match=mqtt_conn;

    access_log /var/log/nginx/mqtt_access.log mqtt;
    error_log  /var/log/nginx/mqtt_error.log; # Health check notifications
}
# vim: syntax=nginx |
# Load the JavaScript code and bind $mqtt_client_id to the value returned by
# its setClientId function.
# NOTE(review): newer njs releases replace js_include with js_import, which
# also requires the script to export its functions - confirm against the
# njs version deployed before changing.
js_include mqtt.js;
js_set     $mqtt_client_id setClientId;

# Access log format "mqtt", extended with the parsed MQTT ClientId.
log_format mqtt '$remote_addr [$time_local] $protocol $status $bytes_received '
                '$bytes_sent $upstream_addr $mqtt_client_id'; # Include MQTT ClientId
# Upstream group of three HiveMQ nodes; the node for a session is chosen by
# hashing the MQTT ClientId.
upstream hive_mq {
    server 127.0.0.1:18831; #node1
    server 127.0.0.1:18832; #node2
    server 127.0.0.1:18833; #node3
    zone tcp_mem 64k;       # Shared memory zone for this upstream group
    hash $mqtt_client_id consistent; # Session persistence keyed against ClientId
}
# MQTT listener: reads the start of each connection with getClientId before
# proxying it to the hive_mq group.
server {
    listen 1883;
    preread_buffer_size 1k; # Big enough to read CONNECT packet header
    js_preread getClientId; # Parse CONNECT packet for ClientId
    proxy_pass hive_mq;
    proxy_connect_timeout 1s;

    access_log /var/log/nginx/mqtt_access.log mqtt;
    error_log  /var/log/nginx/mqtt_error.log info; # NGINX JavaScript debug logging
}
# vim: syntax=nginx |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This comment has been minimized.
For a discussion of these files, see the article "NGINX Plus for the IoT: Load Balancing MQTT".