Skip to content

Instantly share code, notes, and snippets.

@MathiasKoch
Created February 5, 2022 16:15
Show Gist options
  • Save MathiasKoch/0295eba296d63d1e4f95e0657b9b5e30 to your computer and use it in GitHub Desktop.
Save MathiasKoch/0295eba296d63d1e4f95e0657b9b5e30 to your computer and use it in GitHub Desktop.
ublox_mqtt_example
#![no_main]
#![no_std]
#[rtic::app(device = stm32l4xx_hal::pac, peripherals = true, dispatchers = [UART5, SPI1, SPI2])]
mod app {
// Monotonic timer type registered with RTIC as the default timebase; it is bound
// to the SysTick exception. The const parameter matches the frequency passed to
// `DwtSystick::new` in `init`.
#[monotonic(binds = SysTick, default = true)]
type MyMono = DwtSystick<80_000_000>; // 80 MHz
// Size parameter of the MQTT queue: it dimensions the static `BBBuffer` created in
// `init` and is the const parameter of both the `Client` and `EventLoop` types.
const MQTT_QUEUE_LEN: usize = 6144;
// MQTT client handle parameterised by the queue size above.
type MqttClient = Client<'static, 'static, MQTT_QUEUE_LEN>;
#[shared]
struct SharedResources {
// Resources defined here can be accessed from more than one task.
// `cell_dma` is listed by `atat_spin`, `serial_isr` and `serial_rx_dma`;
// `mqtt_client` is taken by shared reference (`&mqtt_client`) in `publish_mqtt`.
cell_dma: <Board as BoardPackage>::SerialDma,
mqtt_client: MqttClient,
}
#[local]
struct LocalResources {
// Resources defined here belong to a single task; all three are claimed by `idle`.
// Cellular modem client; `idle` calls `data_service` on it every loop iteration.
cell_client: <Board as BoardPackage>::Cellular,
// Crypto client; used in `init` to read the UUID and in `idle` when loading credentials.
crypto_client: <Board as BoardPackage>::CryptoClient,
// MQTT event loop built in `init` from the consumer half of the MQTT queue and
// the board's MQTT timer.
mqtt_event: EventLoop<
'static,
'static,
ublox_cellular::sockets::SocketHandle,
<Board as BoardPackage>::MqttPingTimer,
MQTT_QUEUE_LEN,
>,
}
// Start-up routine, run once by RTIC before any task.
//
// It reserves the static MQTT queue and socket storage (both placed in the
// `.ram2` link section), creates the monotonic timer, brings up the board
// peripherals, and builds the MQTT client and event loop.
//
// Returns the shared resources, the local resources and the monotonic timer, in
// the order RTIC expects: `(SharedResources, LocalResources, init::Monotonics)`.
//
// Panics if the key store cannot be loaded or if the MQTT queue has already
// been split.
#[init(local = [uuid: heapless::String<UUID_LEN> = heapless::String::new()])]
fn init(ctx: init::Context) -> (SharedResources, LocalResources, init::Monotonics) {
    // SAFETY: the static is declared inside this block, so nothing else can name
    // it, and `init` runs exactly once; this is therefore the only reference
    // ever created to `MQTT_QUEUE`.
    let mqtt_queue = unsafe {
        #[link_section = ".ram2"]
        static mut MQTT_QUEUE: bbqueue::BBBuffer<MQTT_QUEUE_LEN> = bbqueue::BBBuffer::new();
        &mut MQTT_QUEUE
    };

    // SAFETY: same reasoning as for `MQTT_QUEUE` above; the static is private
    // to this block and `init` runs once.
    let socket_set = unsafe {
        #[link_section = ".ram2"]
        static mut SOCKET_SET: Option<
            SocketSet<
                <Board as BoardPackage>::MqttPingTimer,
                { MAX_SOCKETS },
                { SOCKET_BUFFER_LEN },
            >,
        > = None;
        &mut SOCKET_SET
    };
    // The static starts out as `None`; fill it before handing it to the modem client.
    socket_set.replace(SocketSet::new());

    // Build the DWT/SysTick monotonic used for RTIC scheduling. The frequency
    // must agree with the const parameter of `MyMono`.
    let mut dcb = ctx.core.DCB;
    let dwt = ctx.core.DWT;
    let systick = ctx.core.SYST;
    let mono = DwtSystick::new(&mut dcb, dwt, systick, 80_000_000);

    let mut board = Board::new(ctx.device);

    // The UUID lives in an `init`-local static so the MQTT client and options
    // below can keep borrowing it after `init` returns.
    let mut crypto_client = board.crypto();
    *ctx.local.uuid = get_uuid(&mut crypto_client);

    // NOTE(review): the meaning of the empty string argument is defined by
    // `Board::cellular`, which is not visible here - confirm.
    let (cell_dma, mut cell_client) = board.cellular("");
    cell_client.set_socket_storage(socket_set.as_mut().unwrap());

    let broker_url = KeyStore::load()
        .unwrap_or_else(|e| defmt::panic!("{:?}", e))
        .endpoint;

    // Producer half goes to the client, consumer half to the event loop.
    let (mqtt_producer, mqtt_consumer) = mqtt_queue.try_split_framed().unwrap();
    let mqtt_client = Client::new(mqtt_producer, ctx.local.uuid.as_str());
    let mqtt_event = EventLoop::new(
        mqtt_consumer,
        board.mqtt_timer(),
        MqttOptions::new(ctx.local.uuid.as_str(), broker_url.into(), 8883)
            .set_clean_session(true),
    );

    (
        SharedResources {
            cell_dma,
            mqtt_client,
        },
        LocalResources {
            crypto_client,
            cell_client,
            mqtt_event,
        },
        init::Monotonics(mono),
    )
}
// Periodic software task: sends one fixed JSON message through the shared MQTT
// client, then re-arms itself to run again ten seconds later. The result of
// either step is deliberately discarded.
#[task(shared = [&mqtt_client])]
fn publish_mqtt(ctx: publish_mqtt::Context) {
    let client = ctx.shared.mqtt_client;
    client
        .publish(
            "some/topic/hello",
            b"{\"Hello\": \"World\"}",
            mqttrust_core::QoS::AtMostOnce,
        )
        .ok();

    // Schedule the next run relative to the current monotonic time.
    let next_run = monotonics::now() + Seconds(10_u32);
    publish_mqtt::spawn_at(next_run).ok();
}
/// Idle thread
#[idle(local = [cell_client, mqtt_event, crypto_client])]
fn idle(ctx: idle::Context) -> ! {
let idle::LocalResources {
mqtt_event,
cell_client,
crypto_client,
} = ctx.local;
atat_spin::spawn().ok();
publish_mqtt::spawn().ok();
let apn_info = APNInfo {
apn: Apn::Given(heapless::String::from("em")),
..APNInfo::default()
};
let mut init = false;
loop {
let e: nb::Result<(), Error> = cell_client
.data_service(&apn_info)
.and_then(|mut data| {
if !init {
// Seed security credentials
let mut buf = heapless::Vec::<u8, 2048>::new();
KeyStore::load_credentials(&mut data, crypto_client, &mut buf)
.map_err(|_| Error::Cell)?;
init = true;
}
// Make sure the Client has an active socket, and is connected
// to the mqtt broker
mqtt_event.connect(&mut data).map_err(|_| Err(Error::Mqtt))?;
// Yield MqttEvent to handle incoming and outgoing packets
match mqtt_event.yield_event(&mut data) {
Ok(Notification::Publish(mut publish)) => {
// Received an incoming publish from a topic we suscribed to
nb::Result::<(), Error>::Ok(())
}
Err(nb::Error::WouldBlock) => Ok(()),
Ok(Notification::Abort(e)) => {
defmt::error!("Abort error, {:?}", defmt::Debug2Format(&e));
Err(Error::Mqtt.into())
}
Ok(_) => Ok(()),
Err(nb::Error::Other(_e)) => {
defmt::error!("NOT POSSIBLE");
Err(Error::Mqtt.into())
}
}?;
Ok(())
});
e.ok();
}
}
// Task declarations only: these signatures have no bodies here, so the
// implementations must be provided outside this module.
extern "Rust" {
// Software task at priority 2 with access to `cell_dma`; first spawned from `idle`.
#[task(shared = [cell_dma], priority = 2)]
fn atat_spin(mut ctx: atat_spin::Context);
// Hardware task bound to the UART4 interrupt, priority 3, with access to `cell_dma`.
#[task(binds = UART4, shared = [cell_dma], priority = 3)]
fn serial_isr(ctx: serial_isr::Context);
// Hardware task bound to the DMA2_CH5 interrupt, priority 3, with access to `cell_dma`.
#[task(binds = DMA2_CH5, shared = [cell_dma], priority = 3)]
fn serial_rx_dma(ctx: serial_rx_dma::Context);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment