Delivery Guarantee

Because of batching, messages reside inside the converter for some time before they are uploaded. The MQTT protocol does not allow for end-to-end acknowledgments, meaning that once the messages arrive at the converter, they are considered “delivered”. Therefore, if the converter fails, the messages that were not yet uploaded into an S3 object are lost. To guarantee delivery, we need a mechanism that makes sure messages are not lost if the converter crashes while waiting for a batch to fill.

One option for that would be to write every message to persistent media (e.g. disk) as it arrives. If the process restarts, it would read that file and send the data in it. However, this would have two main drawbacks:

  • there will be a significant performance cost
  • since messages are automatically acked when received, anything still in the disk write buffer, or not yet written at all, will be lost on a crash

Instead, we would have a “backup consumer” that listens to the same topic and stores the messages. There will be an additional control topic between the converter and the backup consumer, on which the converter would send “delete” messages indicating that a message was successfully uploaded to S3 as part of a batch. Periodically, based on some preconfigured timeout, the backup consumer would batch the messages that were not deleted and send them to S3. Note that this gives an "at least once" delivery guarantee, which means that the consumer of the data should be able to handle duplicates.
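
The bookkeeping of the backup consumer could look roughly like the sketch below. It is written in Python for brevity and leaves out the MQTT and S3 parts entirely. The class name BackupStore, its methods, and the idea that every message carries a unique ID shared by the converter and the backup consumer are assumptions made for illustration, not part of the design above.

```python
import time


class BackupStore:
    """Hypothetical in-memory model of the backup consumer's bookkeeping."""

    def __init__(self, timeout_seconds, clock=time.monotonic):
        self.timeout_seconds = timeout_seconds
        self.clock = clock
        self.pending = {}  # message id -> (arrival time, payload)

    def on_message(self, msg_id, payload):
        # Called for every message seen on the data topic.
        self.pending[msg_id] = (self.clock(), payload)

    def on_delete(self, msg_id):
        # Called for every "delete" seen on the control topic.
        self.pending.pop(msg_id, None)

    def collect_expired(self):
        # Return (and forget) messages that were not deleted within the timeout.
        # The caller would batch these and upload them to S3.
        now = self.clock()
        expired = [(mid, payload)
                   for mid, (arrived, payload) in self.pending.items()
                   if now - arrived >= self.timeout_seconds]
        for mid, _ in expired:
            del self.pending[mid]
        return expired
```

If the converter uploads a batch but crashes before sending the "delete" messages, the same messages are uploaded again by the backup consumer, which is where the duplicates come from.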

Processing-Friendly MQTT to S3 Converter

Problem Statement

As demonstrated in AWS IoT and GCP Cloud IoT Core, small field devices deployed at the edge increasingly use message queues for cloud data ingestion and real-time processing. MQTT is a low-overhead protocol geared towards small messages and low-resource environments, while S3 is most efficient with large objects.

Solution

So, to efficiently upload MQTT messages to S3, the messages have to be batched together before being uploaded as S3 objects. Batching should be based on time, number of messages, size, or special application indications sent explicitly from the producer. The problem with this mechanism is that it creates overhead for the function processing the S3 object, as the original context and separation of the messages are lost once they are batched inside the S3 objects.

  • The converter would be an MQTT subscriber batching messages in S3 objects and uploading them to Ceph via its object interface
  • To be “processing friendly”, the converter adds a map of offsets as metadata to the uploaded object, indicating the location of the messages inside the object together with their timestamps and topic IDs
  • Converter should be implemented in golang (Java is also an option if we run into issues with the different SDKs)

Project

Phase 0

set up an environment for development and testing.

Linux

The first step is to have a Linux-based development environment. As a minimum, you would need a machine with 8 CPUs, 16GB RAM and 50GB disk.

Note that using a machine with a lower spec is also possible, but Ceph build time might take several hours

Unless you already have a Linux distro you like, I would recommend choosing from:

  • Fedora - my favorite (34 or higher)
  • Ubuntu (20.04 and up)
  • openSUSE (Leap 15.2 or Tumbleweed)

Using WSL on your Windows machine is also possible, but build times would be longer than running native Linux

Git

Once you have that up and running, you should clone the Ceph repo from GitHub (https://github.com/ceph/ceph). If you don’t know what GitHub and git are, this is the right time to close these gaps :-) And yes, you should have a GitHub account, so you can later share your work on the project.

Build

The repo has a README file with instructions on how to build Ceph. Just follow these instructions and build it (depending on the number of CPUs you have, this may take a while). Our build system is based on cmake, so it is probably a good idea to know a little bit about that. Assuming the build completed successfully, you can run the unit tests (see: https://github.com/ceph/ceph#running-unit-tests).

Try Ceph/RGW

Now you are ready to run the Ceph processes, as explained here: https://github.com/ceph/ceph#running-a-test-cluster. You would probably also like to check the developer guide (https://docs.ceph.com/docs/master/dev/developer_guide/) and learn more about how to build Ceph and run it locally (https://docs.ceph.com/docs/master/dev/quick_guide/). I would recommend running vstart as follows (from inside the build directory):

MON=1 OSD=1 MDS=0 MGR=0 RGW=1 ../src/vstart.sh -n -d

Assuming you have everything up and running, you can create a bucket in Ceph and upload an object to it. The best way to do that is with the s3cmd Python command-line tool: https://github.com/s3tools/s3cmd. Note that the tool is mainly geared towards AWS S3, so make sure to specify the location of the RGW as the endpoint, along with the RGW credentials (as printed to the screen after running vstart.sh).

For example:

$ s3cmd --no-ssl --host=localhost:8000 --host-bucket="localhost:8000/%(bucket)" \
--access_key=0555b35654ad1656d804 \
--secret_key=h7GhxuBLTrlhVUyxSPUKUV8r/2EI4ngqJxD7iBdBYLhwluN30JaT3Q== \
mb s3://mybucket

Would create a bucket called mybucket in Ceph. And:

$ s3cmd --no-ssl --host=localhost:8000 --host-bucket="localhost:8000/%(bucket)" \
--access_key=0555b35654ad1656d804 \
--secret_key=h7GhxuBLTrlhVUyxSPUKUV8r/2EI4ngqJxD7iBdBYLhwluN30JaT3Q== \
put myimage.jpg s3://mybucket

Would put myimage.jpg into that bucket.

Try MQTT

Note that there could be other options for brokers (e.g. HiveMQ). However, the broker is not part of the solution itself, just of the test setup.

Phase 1

create the first version of the converter without batching:

  • the converter will have one side of an MQTT subscriber based on the MQTT golang SDK
  • one side of an S3 client, based on the AWS golang SDK
  • code that glues them together
  • configuration regarding the list of topics to subscribe to and the S3 endpoint to connect to
  • configuration for mapping between MQTT topics and S3 buckets (default behavior could be to assume the same name)
  • configuration on how to build the object name based on parameters from the topic and message
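
As an illustration of the last two configuration items, here is a minimal sketch in Python (used for brevity; the converter itself is planned in golang). The function names resolve_bucket and object_name, the dictionary-based mapping, and the template syntax are all assumptions made for this example.

```python
def resolve_bucket(topic, topic_to_bucket):
    # Look up the bucket configured for a topic.
    # Assumed default: fall back to the topic name itself.
    return topic_to_bucket.get(topic, topic)


def object_name(template, topic, fields):
    # Build an object name from a configured template.
    # "fields" holds parameters taken from the message, and the topic
    # is made available to the template under the name "topic".
    params = dict(fields)
    params["topic"] = topic
    return template.format(**params)


# Example: one explicit mapping, one topic using the default.
mapping = {"factory/line1": "plant-1"}
bucket_a = resolve_bucket("factory/line1", mapping)
bucket_b = resolve_bucket("sensors", mapping)
name = object_name("{topic}-{Timestamp}", "sensors",
                   {"Timestamp": "20181001 171035"})
```

S3 bucket naming rules are stricter than MQTT topic naming rules (for example, a bucket name cannot contain "/"), so the same-name default only works for simple, single-level topic names.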

Phase 2

add batching to the converter:

  • add the glue code to do the batching
  • add batching configuration: time, size and number of messages
  • (optional) allow time range to be added to the generated object name
  • (optional) add special messages over MQTT with which the publisher tells the converter to flush a batch
  • (optional) allow for parallel execution when subscribed to multiple topics
  • (optional) allow for "persistent batching" with messages being written to a temporary file from which we can recover if the process crashed mid-batch
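
The batching glue code with the three thresholds (time, size and number of messages) could be sketched as follows. This is a Python sketch for brevity, not the golang implementation; the class name Batcher and its parameters are hypothetical. Note that in this simplified version the age of a batch is only checked when a new message arrives; a real converter would also need a timer so that a quiet topic still gets flushed.

```python
import time


class Batcher:
    """Hypothetical batcher that closes a batch on count, size or age."""

    def __init__(self, max_messages, max_bytes, max_age_seconds,
                 clock=time.monotonic):
        self.max_messages = max_messages
        self.max_bytes = max_bytes
        self.max_age_seconds = max_age_seconds
        self.clock = clock
        self.messages = []
        self.size = 0
        self.started = None  # arrival time of the first message in the batch

    def add(self, payload):
        # Add one payload; return the finished batch, or None if still open.
        if not self.messages:
            self.started = self.clock()
        self.messages.append(payload)
        self.size += len(payload)
        if (len(self.messages) >= self.max_messages
                or self.size >= self.max_bytes
                or self.clock() - self.started >= self.max_age_seconds):
            return self.flush()
        return None

    def flush(self):
        # Close the current batch. This could also be called when the
        # publisher sends an explicit "flush" message.
        batch = self.messages
        self.messages, self.size, self.started = [], 0, None
        return batch
```

Each batch returned by add() or flush() would then be concatenated and uploaded as one S3 object.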

Phase 3

make the converter "processing friendly":

  • add attributes to the object describing the offsets of the different MQTT messages inside the object
  • the fields from the message payload used as the key for the offset mapping should be configurable
  • demonstrate code of an S3 client reading an object in pieces:
      • client should first get the attributes using the HEAD command
      • then use range-based GET object based on the attributes
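
The offset map and the range-based read can be illustrated without any S3 calls. The sketch below (Python, for brevity) assumes the map is stored as a JSON string under a single metadata entry called "offsets", holding [start, length] pairs per key. That encoding and the function names are assumptions for the example, not a decided format.

```python
import json


def build_object(messages):
    # messages: list of (key, payload bytes).
    # Returns the concatenated body and the metadata to attach to the object.
    body = bytearray()
    offsets = {}
    for key, payload in messages:
        offsets[key] = [len(body), len(payload)]  # [start, length]
        body += payload
    return bytes(body), {"offsets": json.dumps(offsets)}


def range_header(metadata, key):
    # Value for the HTTP Range header of a GET; both ends are inclusive.
    start, length = json.loads(metadata["offsets"])[key]
    return "bytes={}-{}".format(start, start + length - 1)


def read_message(body, metadata, key):
    # Local equivalent of the range GET, used here to check the offsets.
    start, length = json.loads(metadata["offsets"])[key]
    return body[start:start + length]


body, meta = build_object([("k1", b"hello"), ("k2", b"world!")])
header = range_header(meta, "k2")
```

A client would obtain the metadata with a HEAD request and then pass the computed range in a GET request. Object stores typically cap the size of user metadata, so a map covering many messages may need a more compact encoding; that is not addressed here.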

End2End Usecase

Please look at the following usecase.

Data Generation

  • In our case we don't need the NoSQL server, and Ceph with an object store frontend (RGW) will be used as the data lake.
  • We should use the same topic name convention ROOT/FORMAT/COMPRESSED/PLANT-ID/DEVICE-ID/MSG-TYPE/MSG-VERSION. Buckets should include objects holding information per "PLANT-ID".
  • objects will contain information on multiple devices, and therefore their names should reflect the time range between the first message and the last
  • We should use the same message format:
Topic: COMPANY-X/JSON/0/1/WELDING-STATION-12/WELD/1
JSON Payload:
“weldspot” = { Timestamp=”20181001 171035”
Current=”10”,
Voltage=”230”,
MainPowerSwitch=”0”,
MainPowerSwitch_str=”OFF”,
}
  • Offset mapping key should contain the rest of the identification of the topic together with a timestamp: DEVICE-ID/MSG-TYPE/MSG-VERSION/Timestamp. For example: WELDING-STATION-12/WELD/1/20181001 171035
  • Message aggregation should be based on message count (as their sizes are similar). E.g. aggregate 10K messages to one S3 object
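
The topic convention and the offset mapping key above can be sketched as follows (in Python, for brevity). The field names in FIELDS and the function names are chosen for this example. The object name format, first and last timestamp joined by "_" with the space replaced by "T", is purely an assumption, since the text only says that the name should reflect the time range.

```python
FIELDS = ["root", "format", "compressed", "plant_id",
          "device_id", "msg_type", "msg_version"]


def parse_topic(topic):
    # Split a topic following ROOT/FORMAT/COMPRESSED/PLANT-ID/DEVICE-ID/
    # MSG-TYPE/MSG-VERSION into named fields.
    parts = topic.split("/")
    if len(parts) != len(FIELDS):
        raise ValueError("unexpected topic format: " + topic)
    return dict(zip(FIELDS, parts))


def offset_key(topic, timestamp):
    # DEVICE-ID/MSG-TYPE/MSG-VERSION/Timestamp
    t = parse_topic(topic)
    return "/".join([t["device_id"], t["msg_type"],
                     t["msg_version"], timestamp])


def batch_object_name(first_timestamp, last_timestamp):
    # Assumed naming scheme: "<first>_<last>" with spaces replaced by "T".
    return (first_timestamp.replace(" ", "T") + "_"
            + last_timestamp.replace(" ", "T"))


topic = "COMPANY-X/JSON/0/1/WELDING-STATION-12/WELD/1"
key = offset_key(topic, "20181001 171035")
name = batch_object_name("20181001 171035", "20181001 171536")
```

With the example topic, the plant ID field ("1") would select the bucket and the remaining fields end up in the offset mapping key.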

Data Consumption

  • Simple Python Application: write a Python app that receives a time range and a "Plant ID", fetches all relevant S3 objects that belong to that plant and that time range (use boto3), and plots (use pyplot) a time-based "voltage" graph for each device
  • (optional) Use the same application but with grafana as the backend for plotting the data
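
The part of the application that decides which objects are relevant for a time range can be sketched without boto3 or pyplot. The sketch assumes that object names encode the time range as "<first>_<last>" with timestamps like 20181001T171035. This naming scheme and the function names are assumptions for illustration only.

```python
from datetime import datetime

FMT = "%Y%m%dT%H%M%S"  # assumed timestamp format inside object names


def key_range(key):
    # Parse "<first>_<last>" into two datetime values.
    first, last = key.split("_")
    return datetime.strptime(first, FMT), datetime.strptime(last, FMT)


def keys_in_range(keys, start, end):
    # Keep the objects whose time range overlaps [start, end].
    selected = []
    for key in keys:
        first, last = key_range(key)
        if first <= end and last >= start:
            selected.append(key)
    return selected


keys = [
    "20181001T170000_20181001T171000",
    "20181001T171001_20181001T172000",
    "20181001T180000_20181001T181000",
]
wanted = keys_in_range(keys,
                       datetime(2018, 10, 1, 17, 5),
                       datetime(2018, 10, 1, 17, 15))
```

In the real application, the list of keys would come from listing the bucket of the requested plant, and the selected objects would then be fetched and their "Voltage" values plotted per device.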

yuvalif commented May 1, 2022:
[images: architecture, attr-map]

yuvalif commented Jun 1, 2022:
[image]
