Last active
November 15, 2023 10:53
-
-
Save encima/8f3c9f76d4262f6b0245900c9a9a9a2b to your computer and use it in GitHub Desktop.
Supabase API to rsyslog
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import datetime | |
import config # create file with necessary vars (or use env) | |
headers = { | |
'Authorization': f'Bearer {config.supabase_token}', | |
'Content-Type': 'application/json' | |
} | |
response = requests.get( | |
f"{config.endpoint}?project={config.project_id}&iso_timestamp_start={config.iso_timestamp_start}&sql={config.query}", | |
headers=headers | |
) | |
if response.status_code == 200: | |
logs = response.json() | |
for log in logs['result']: | |
timestamp = log['timestamp'] | |
event_message = log['event_message'] | |
metadata = log['metadata'][0] | |
hostname = metadata['request'][0]['host'] | |
tag = metadata['request'][0]['path'] | |
priority = "INFO" | |
# Convert to rsyslog format (<prio>timestamp host tag: msg) | |
rsyslog_output = f"<{priority}>{timestamp} {hostname} {tag}: {event_message}" | |
print(rsyslog_output) | |
else: | |
print(f"Failed to retrieve logs: {response.status_code}") | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use reqwest; | |
use serde_json::Value; | |
use tokio; | |
use std::time::Duration; | |
use tokio::sync::Mutex; | |
use std::sync::Arc; | |
use tokio::time; | |
use std::net::TcpStream; | |
use dotenv::dotenv; | |
use std::io::Write; | |
#[tokio::main] | |
async fn main() -> Result<(), Box<dyn std::error::Error>> { | |
dotenv().ok(); | |
let supabase_token = std::env::var("SUPABASE_TOKEN").expect("SUPABASE_TOKEN not set"); | |
let project_id = std::env::var("PROJECT_ID").expect("PROJECT_ID not set"); | |
let iso_timestamp_start = std::env::var("TIME_START").expect("TIME_START not set"); | |
let query = std::env::var("QUERY").expect("QUERY not set"); | |
let rsyslog_server = std::env::var("RSYSLOG_ENDPOINT").expect("RSYSLOG_ENDPOINT not set"); | |
let rsyslog_port = std::env::var("RSYSLOG_PORT").expect("RSYSLOG_PORT not set").parse::<u16>().unwrap_or(514); | |
let batch_size = std::env::var("BATCH_SIZE").expect("BATCH_SIZE not set").parse().unwrap_or(10); | |
let sleep_time = std::env::var("SLEEP_FOR").ok().and_then(|v| v.parse().ok()); | |
let endpoint = format!("https://api.supabase.io/v0/projects/{}/analytics/endpoints/logs.all", project_id); | |
let client = reqwest::Client::new(); | |
let log_batch = Arc::new(Mutex::new(Vec::new())); | |
if let Some(interval) = sleep_time { | |
let log_batch = Arc::clone(&log_batch); | |
let rsyslog_server = rsyslog_server.clone(); | |
let rsyslog_port = rsyslog_port; | |
tokio::spawn(async move { | |
let mut interval = time::interval(Duration::from_secs(interval)); | |
loop { | |
interval.tick().await; | |
let mut batch = log_batch.lock().await; | |
if !batch.is_empty() { | |
let payload = batch.join("\n"); | |
batch.clear(); | |
send_to_rsyslog(&rsyslog_server, rsyslog_port, &payload).await; | |
} | |
} | |
}); | |
} | |
loop { | |
let response = client | |
.get(&format!("{}?project={}&iso_timestamp_start={}&sql={}", endpoint, project_id, iso_timestamp_start, query)) | |
.bearer_auth(&supabase_token) | |
.header("Content-Type", "application/json") | |
.send() | |
.await?; | |
if response.status().is_success() { | |
let logs = response.json::<Value>().await?; | |
if let Some(result) = logs["result"].as_array() { | |
let mut batch = log_batch.lock().await; | |
for log in result { | |
let timestamp = log["timestamp"].as_str().unwrap_or(""); | |
let event_message = log["event_message"].as_str().unwrap_or(""); | |
let metadata = &log["metadata"][0]; | |
let hostname = metadata["request"][0]["host"].as_str().unwrap_or(""); | |
let tag = metadata["request"][0]["path"].as_str().unwrap_or(""); | |
let priority = "INFO"; | |
let rsyslog_output = format!("<{}>{} {} {}: {}", priority, timestamp, hostname, tag, event_message); | |
batch.push(rsyslog_output); | |
if batch.len() >= batch_size && sleep_time.is_none() { | |
let payload = batch.join("\n"); | |
batch.clear(); | |
send_to_rsyslog(&rsyslog_server, rsyslog_port, &payload).await; | |
} | |
} | |
} | |
} else { | |
println!("Failed to retrieve logs: {}", response.status()); | |
} | |
tokio::time::sleep(Duration::from_secs(1)).await; // Prevents tight loop, adjust as needed | |
} | |
} | |
async fn send_to_rsyslog(rsyslog_server: &str, rsyslog_port: u16, payload: &str) { | |
match TcpStream::connect((rsyslog_server, rsyslog_port)) { | |
Ok(mut stream) => { | |
if let Err(e) = stream.write_all(payload.as_bytes()) { | |
println!("Error sending logs: {}", e); | |
} else { | |
println!("Sent {} bytes to rsyslog server", payload.len()); | |
} | |
}, | |
Err(e) => { | |
println!("Failed to connect to rsyslog server: {}", e); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment