Skip to content

Instantly share code, notes, and snippets.

@mohs8421
Created November 8, 2022 07:53
Show Gist options
  • Save mohs8421/6188b30fd78e0fa05a892df9427dd018 to your computer and use it in GitHub Desktop.
Save mohs8421/6188b30fd78e0fa05a892df9427dd018 to your computer and use it in GitHub Desktop.
Influx Client for ESP-Devices
use crate::storage::StoreableItem;
#[cfg(debug_assertions)]
use crate::wrapper::time;
use anyhow::bail;
use embedded_svc::http::client::{Client, Request, RequestWrite, Response};
use embedded_svc::http::{SendHeaders, Status};
use embedded_svc::io;
use esp_idf_svc::http::client::{EspHttpClient, EspHttpClientConfiguration};
use log::{debug, error};
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::sync::mpsc::Receiver;
use std::thread;
use std::thread::JoinHandle;
use std::time::Duration;
const INFLUX_STACK_SIZE: usize = 12 * 1024;
const BUFFER_SIZE: usize = 20;
const POINT_PER_MESSAGE_LIMIT: usize = 20;
pub struct InfluxClient {
client: EspHttpClient,
pub config: InfluxConfiguration,
point_buffer: Vec<Point>,
}
#[derive(Clone, Serialize, Deserialize)]
pub struct InfluxConfiguration {
pub url: String,
pub token: String,
pub org: String,
pub bucket: String,
#[serde(skip)]
pub precision: InfluxPrecision,
}
#[derive(Clone, Serialize, Deserialize)]
pub enum InfluxPrecision {
Seconds,
MilliSeconds,
}
impl Default for InfluxPrecision {
fn default() -> Self {
Self::Seconds
}
}
impl Display for InfluxPrecision {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
InfluxPrecision::Seconds => f.write_str("s"),
InfluxPrecision::MilliSeconds => f.write_str("ms"),
}
}
}
impl StoreableItem for InfluxConfiguration {
fn storage_field() -> &'static str {
"influx_config"
}
}
impl InfluxClient {
pub fn new(config: InfluxConfiguration) -> anyhow::Result<Self> {
let client = EspHttpClient::new(&EspHttpClientConfiguration {
crt_bundle_attach: Some(esp_idf_sys::esp_crt_bundle_attach),
..Default::default()
})?;
Ok(Self {
client,
config,
point_buffer: Vec::with_capacity(BUFFER_SIZE),
})
}
pub fn spawn(
config: InfluxConfiguration,
point_receiver: Receiver<Point>,
) -> anyhow::Result<JoinHandle<()>> {
Ok(thread::Builder::new()
.name("Influx_Thread".to_string())
.stack_size(INFLUX_STACK_SIZE)
.spawn(move || {
let mut client = Self::new(config).expect("Setting up Influx client failed");
#[cfg(debug_assertions)]
let mut current_sec = time();
loop {
#[cfg(debug_assertions)]
{
let now = time();
if current_sec < now {
debug!(
"Second passed for Influx Thread {}, current_buffer_size: {}",
current_sec,
client.point_buffer.len()
);
current_sec = now;
}
}
for point in point_receiver.try_iter() {
client.add(point);
}
client
.flush_if_needed()
.unwrap_or_else(|e| error!("Flushing failed {}", e));
thread::sleep(Duration::from_millis(40));
}
})?)
}
pub fn add(&mut self, point: Point) {
if point.is_empty() {
return;
}
#[cfg(debug_assertions)]
debug!(
"Adding point {}, buffer size: {}",
point.to_line_format(),
self.point_buffer.len()
);
self.point_buffer.push(point);
}
pub fn flush_if_needed(&mut self) -> anyhow::Result<()> {
if self.point_buffer.len() >= BUFFER_SIZE {
self.flush()
} else {
Ok(())
}
}
pub fn flush(&mut self) -> anyhow::Result<()> {
let mut points = self.point_buffer.split_off(0);
while !points.is_empty() {
let mut message: String = String::new();
let mut points_in_message = 0;
while !points.is_empty() && points_in_message < POINT_PER_MESSAGE_LIMIT {
if let Some(point) = points.pop() {
message.push_str(point.to_line_format().as_str());
points_in_message += 1;
}
}
if message.is_empty() {
bail!(
"Writing Data to influx failed, message is empty, while points had been available"
);
}
self.send_message(message, true)?;
}
Ok(())
}
fn send_message(&mut self, message: String, retry: bool) -> anyhow::Result<()> {
let mut request = self
.client
.post(&self.get_post_to_url())
.map_err(|e| anyhow::Error::from(e).context("Opening Post request failed"))?;
request.set_header(
"Authorization",
format!("Token {}", self.config.token).as_str(),
);
let request_write = request
.send_str(message.as_str())
.map_err(|e| anyhow::Error::from(e).context("Writing points to request failed"))?;
match request_write.submit() {
Ok(mut response) => match response.status() {
204 => {}
s => {
let mut body = [0_u8; 3048];
let (body, _) = io::read_max(response.reader(), &mut body)?;
bail!(
"Sending data to influx failed. Server responded with {}\nResponse:\n{:?}",
s,
String::from_utf8_lossy(body).into_owned()
);
}
},
Err(e) => {
if retry {
debug!("Connection was broken, reestablishing. {}", e);
let client = EspHttpClient::new(&EspHttpClientConfiguration {
crt_bundle_attach: Some(esp_idf_sys::esp_crt_bundle_attach),
..Default::default()
})?;
self.client = client;
self.send_message(message, false)?;
} else {
return Err(anyhow::Error::from(e).context("Submitting request failed"));
}
}
}
Ok(())
}
fn get_post_to_url(&self) -> String {
format!(
"{}/api/v2/write?org={}&bucket={}&precision={}",
self.config.url, self.config.org, self.config.bucket, self.config.precision
)
}
}
pub struct Point {
measurement: String,
keys: HashMap<String, Value>,
values: HashMap<String, Value>,
timestamp: Option<i64>,
}
pub enum Value {
String(String),
Decimal(Decimal),
Int(i32),
Unsigned(u32),
}
/// One datapoint that is used to be send to an influx database
///
/// Example:
/// ```
/// let mut point = Point::new("sample_measurement");
/// point
/// .add_key("id", 5)
/// .add_value("voltage", 4.56)
/// .add_value("temperature", 16.7);
/// assert_eq!(point.to_line_format(), "sample_measurement,id=5 voltage=4.56,temperature=16.7");
/// ```
impl Point {
pub fn new(measurement: &str) -> Self {
Self {
measurement: measurement.to_string(),
keys: Default::default(),
values: Default::default(),
timestamp: None,
}
}
pub fn add_key<V: Into<Value>>(&mut self, name: &str, value: V) -> &mut Self {
self.keys.insert(name.to_string(), value.into());
self
}
pub fn add_value<V: Into<Value>>(&mut self, name: &str, value: V) -> &mut Self {
self.values.insert(name.to_string(), value.into());
self
}
pub fn add_option<V: Into<Value>>(&mut self, name: &str, option: Option<V>) -> &mut Self {
if let Some(value) = option {
self.values.insert(name.to_string(), value.into());
}
self
}
pub fn set_timestamp(&mut self, timestamp: i64) -> &mut Self {
self.timestamp = Some(timestamp);
self
}
pub fn is_empty(&self) -> bool {
self.values.is_empty()
}
pub fn to_line_format(&self) -> String {
let mut line = self.measurement.clone();
if !self.keys.is_empty() {
line.push(',');
line.push_str(Self::join_key_values(&self.keys).as_str());
}
line.push(' ');
line.push_str(Self::join_key_values(&self.values).as_str());
if let Some(timestamp) = self.timestamp.as_ref() {
line.push(' ');
line.push_str(timestamp.to_string().as_str());
}
line.push_str("\n");
line
}
fn join_key_values(source: &HashMap<String, Value>) -> String {
source
.iter()
.map(|(name, value)| format!("{}={}", name, value))
.intersperse(",".to_string())
.collect::<String>()
}
}
impl Display for Point {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str(self.to_line_format().as_str())
}
}
impl Display for Value {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Value::String(s) => {
f.write_str("\"")?;
s.fmt(f)?;
f.write_str("\"")?;
Ok(())
}
Value::Decimal(d) => d.fmt(f),
Value::Int(i) => i.fmt(f),
Value::Unsigned(i) => i.fmt(f),
}
}
}
impl From<String> for Value {
fn from(source: String) -> Self {
Self::String(source)
}
}
impl From<&str> for Value {
fn from(source: &str) -> Self {
Self::String(source.to_string())
}
}
impl From<Decimal> for Value {
fn from(source: Decimal) -> Self {
Self::Decimal(source)
}
}
impl From<&Decimal> for Value {
fn from(source: &Decimal) -> Self {
Self::Decimal(*source)
}
}
impl From<f32> for Value {
fn from(source: f32) -> Self {
Self::Decimal(Decimal::from_f32_retain(source).unwrap())
}
}
impl From<i32> for Value {
fn from(source: i32) -> Self {
Self::Int(source)
}
}
impl From<u32> for Value {
fn from(source: u32) -> Self {
Self::Unsigned(source)
}
}
impl From<bool> for Value {
fn from(source: bool) -> Self {
if source {
Self::Unsigned(1)
} else {
Self::Unsigned(0)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment