Skip to content

Instantly share code, notes, and snippets.

@encima
Last active November 15, 2023 10:53
Show Gist options
  • Save encima/8f3c9f76d4262f6b0245900c9a9a9a2b to your computer and use it in GitHub Desktop.
Save encima/8f3c9f76d4262f6b0245900c9a9a9a2b to your computer and use it in GitHub Desktop.
Supabase API to rsyslog
import requests
import datetime
import config # create file with necessary vars (or use env)
# Fetch logs from the Supabase analytics endpoint and print each record as an
# rsyslog-style line: <prio>timestamp host tag: msg

# Request headers: bearer-token authentication, JSON content type.
headers = {
    'Authorization': f'Bearer {config.supabase_token}',
    'Content-Type': 'application/json'
}

# Hand the query values to `params` so requests URL-encodes them. Interpolating
# them into the URL breaks as soon as the SQL contains '&', '=', '+' or '#'.
# NOTE(review): assumes the config values are raw (not already percent-encoded).
response = requests.get(
    config.endpoint,
    params={
        'project': config.project_id,
        'iso_timestamp_start': config.iso_timestamp_start,
        'sql': config.query,
    },
    headers=headers
)

if response.status_code == 200:
    logs = response.json()
    # Syslog PRI must be numeric: facility * 8 + severity.
    # user (1) * 8 + informational (6) = 14.
    priority = 14
    for log in logs.get('result') or []:
        timestamp = log.get('timestamp', '')
        event_message = log.get('event_message', '')
        # A record without request metadata should not abort the whole run;
        # fall back to empty host/tag fields instead.
        try:
            request = log['metadata'][0]['request'][0]
        except (KeyError, IndexError, TypeError):
            request = {}
        hostname = request.get('host', '')
        tag = request.get('path', '')
        # Convert to rsyslog format (<prio>timestamp host tag: msg)
        rsyslog_output = f"<{priority}>{timestamp} {hostname} {tag}: {event_message}"
        print(rsyslog_output)
else:
    print(f"Failed to retrieve logs: {response.status_code}")
use reqwest;
use serde_json::Value;
use tokio;
use std::time::Duration;
use tokio::sync::Mutex;
use std::sync::Arc;
use tokio::time;
use std::net::TcpStream;
use dotenv::dotenv;
use std::io::Write;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
dotenv().ok();
let supabase_token = std::env::var("SUPABASE_TOKEN").expect("SUPABASE_TOKEN not set");
let project_id = std::env::var("PROJECT_ID").expect("PROJECT_ID not set");
let iso_timestamp_start = std::env::var("TIME_START").expect("TIME_START not set");
let query = std::env::var("QUERY").expect("QUERY not set");
let rsyslog_server = std::env::var("RSYSLOG_ENDPOINT").expect("RSYSLOG_ENDPOINT not set");
let rsyslog_port = std::env::var("RSYSLOG_PORT").expect("RSYSLOG_PORT not set").parse::<u16>().unwrap_or(514);
let batch_size = std::env::var("BATCH_SIZE").expect("BATCH_SIZE not set").parse().unwrap_or(10);
let sleep_time = std::env::var("SLEEP_FOR").ok().and_then(|v| v.parse().ok());
let endpoint = format!("https://api.supabase.io/v0/projects/{}/analytics/endpoints/logs.all", project_id);
let client = reqwest::Client::new();
let log_batch = Arc::new(Mutex::new(Vec::new()));
if let Some(interval) = sleep_time {
let log_batch = Arc::clone(&log_batch);
let rsyslog_server = rsyslog_server.clone();
let rsyslog_port = rsyslog_port;
tokio::spawn(async move {
let mut interval = time::interval(Duration::from_secs(interval));
loop {
interval.tick().await;
let mut batch = log_batch.lock().await;
if !batch.is_empty() {
let payload = batch.join("\n");
batch.clear();
send_to_rsyslog(&rsyslog_server, rsyslog_port, &payload).await;
}
}
});
}
loop {
let response = client
.get(&format!("{}?project={}&iso_timestamp_start={}&sql={}", endpoint, project_id, iso_timestamp_start, query))
.bearer_auth(&supabase_token)
.header("Content-Type", "application/json")
.send()
.await?;
if response.status().is_success() {
let logs = response.json::<Value>().await?;
if let Some(result) = logs["result"].as_array() {
let mut batch = log_batch.lock().await;
for log in result {
let timestamp = log["timestamp"].as_str().unwrap_or("");
let event_message = log["event_message"].as_str().unwrap_or("");
let metadata = &log["metadata"][0];
let hostname = metadata["request"][0]["host"].as_str().unwrap_or("");
let tag = metadata["request"][0]["path"].as_str().unwrap_or("");
let priority = "INFO";
let rsyslog_output = format!("<{}>{} {} {}: {}", priority, timestamp, hostname, tag, event_message);
batch.push(rsyslog_output);
if batch.len() >= batch_size && sleep_time.is_none() {
let payload = batch.join("\n");
batch.clear();
send_to_rsyslog(&rsyslog_server, rsyslog_port, &payload).await;
}
}
}
} else {
println!("Failed to retrieve logs: {}", response.status());
}
tokio::time::sleep(Duration::from_secs(1)).await; // Prevents tight loop, adjust as needed
}
}
async fn send_to_rsyslog(rsyslog_server: &str, rsyslog_port: u16, payload: &str) {
match TcpStream::connect((rsyslog_server, rsyslog_port)) {
Ok(mut stream) => {
if let Err(e) = stream.write_all(payload.as_bytes()) {
println!("Error sending logs: {}", e);
} else {
println!("Sent {} bytes to rsyslog server", payload.len());
}
},
Err(e) => {
println!("Failed to connect to rsyslog server: {}", e);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment