Skip to content

Instantly share code, notes, and snippets.

@LLBlumire
Created October 5, 2022 01:45
Show Gist options
  • Save LLBlumire/60d6c8810689ffad746962a6d5269bfc to your computer and use it in GitHub Desktop.
Save LLBlumire/60d6c8810689ffad746962a6d5269bfc to your computer and use it in GitHub Desktop.
//! This is free and unencumbered software released into the public domain.
//!
//! Anyone is free to copy, modify, publish, use, compile, sell, or
//! distribute this software, either in source code form or as a compiled
//! binary, for any purpose, commercial or non-commercial, and by any
//! means.
//!
//! In jurisdictions that recognize copyright laws, the author or authors
//! of this software dedicate any and all copyright interest in the
//! software to the public domain. We make this dedication for the benefit
//! of the public at large and to the detriment of our heirs and
//! successors. We intend this dedication to be an overt act of
//! relinquishment in perpetuity of all present and future rights to this
//! software under copyright law.
//!
//! THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
//! EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
//! MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
//! IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
//! OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
//! ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
//! OTHER DEALINGS IN THE SOFTWARE.
//!
//! For more information, please refer to <https://unlicense.org>
use std::collections::BTreeMap;
use futures::stream::{SplitSink, StreamExt};
use futures::sink::SinkExt;
use serde_json::Value;
use tokio::{
net::TcpStream,
sync::{mpsc, oneshot},
};
use tokio_tungstenite::*;
/// Everything that can go wrong while talking to the database.
///
/// Each variant wraps the error of the layer that produced it.
#[derive(Debug)]
pub enum Error {
    /// The WebSocket layer reported a failure (connecting or sending).
    Tungstenite(tungstenite::Error),
    /// A JSON value could not be serialized or deserialized.
    Json(serde_json::Error),
    /// The one-shot reply channel was closed before a response was delivered.
    Oneshot(oneshot::error::RecvError),
}
impl From<tungstenite::Error> for Error {
fn from(e: tungstenite::Error) -> Self {
Error::Tungstenite(e)
}
}
impl From<serde_json::Error> for Error {
fn from(e: serde_json::Error) -> Self {
Error::Json(e)
}
}
impl From<oneshot::error::RecvError> for Error {
fn from(e: oneshot::error::RecvError) -> Self {
Error::Oneshot(e)
}
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Error::Tungstenite(e) => e.fmt(f),
Error::Json(e) => e.fmt(f),
Error::Oneshot(e) => e.fmt(f),
}
}
}
impl std::error::Error for Error {}
/// Client handle for a database reached over a WebSocket connection.
///
/// Requests are written through `ws_sink`; responses are read by a background
/// task that is told which request ids to watch for via `resp_sink`.
pub struct Surreal {
    /// Id that will be assigned to the next outgoing request.
    next_id: u64,
    /// Write half of the split WebSocket stream.
    ws_sink: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
    /// Registers `(request id, reply channel)` pairs with the background task.
    resp_sink: mpsc::UnboundedSender<(u64, oneshot::Sender<Value>)>,
}
impl Surreal {
pub async fn new(
url: &str,
user: &str,
pass: &str,
namespace: &str,
database: &str,
) -> Result<Surreal, Error> {
let (ws_stream, _) = connect_async(url).await?;
let (ws_sink, mut ws_stream) = ws_stream.split();
let (resp_sink, resp_stream) = mpsc::unbounded_channel();
let mut recv_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(resp_stream);
tokio::spawn(async move {
let mut listeners = BTreeMap::<u64, oneshot::Sender<Value>>::new();
loop {
tokio::select! {
recv = recv_stream.next() => {
if let Some((id, recv)) = recv {
listeners.insert(id, recv);
}
},
ws = ws_stream.next() => {
if let Some(Ok(tungstenite::Message::Text(message))) = ws {
if let Ok(json) = serde_json::from_str::<Value>(&message) {
if let Some(Value::String(id)) = json.get("id") {
if let Ok(id) = id.parse::<u64>() {
if let Some(resp) = listeners.remove(&id) {
resp.send(json).unwrap();
}
}
}
}
}
},
}
}
});
let mut surreal = Surreal {
next_id: 0,
ws_sink,
resp_sink,
};
surreal
.send_raw(
"signin",
serde_json::json!([{ "user": user, "pass": pass }]),
)
.await
.unwrap();
surreal
.send_raw("use", serde_json::json!([namespace, database]))
.await
.unwrap();
Ok(surreal)
}
pub async fn query(&mut self, sql: &str, params: Value) -> Result<Value, Error> {
self.send_raw("query", serde_json::json!([sql, params])).await
}
async fn send_raw(
&mut self,
method: &str,
params: Value,
) -> Result<Value, Error> {
tracing::info!("Sending raw on {method:?} {params:?}");
let id = self.next_id;
let (tx, rx) = oneshot::channel();
self.next_id += 1;
self.resp_sink.send((id, tx)).unwrap();
let message = serde_json::to_string(&serde_json::json!({
"id": id.to_string(),
"method": method,
"params": params
}))?;
tracing::info!("{message}");
self.ws_sink
.send(tungstenite::Message::Text(message))
.await?;
Ok(rx.await?)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment