Skip to content

Instantly share code, notes, and snippets.

@esemeniuc
Created June 19, 2023 16:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save esemeniuc/f2d394040963463b261406fdcb77d076 to your computer and use it in GitHub Desktop.
Save esemeniuc/f2d394040963463b261406fdcb77d076 to your computer and use it in GitHub Desktop.
Redis Pub/Sub Async Reconnect Example
use std::thread::sleep;
use std::time::{Duration, Instant};
use redis::{AsyncCommands, ConnectionLike, RedisResult};
use tokio::time::timeout;
use futures::StreamExt;
use futures_util::Stream;
/* Cargo.toml
[dependencies]
futures-util = "0.3"
deadpool-redis = "0.12.0"
deadpool = "0.9.5"
futures= "0.3"
"redis" = { version = "0.23", features = ["aio", "tokio-comp", "connection-manager"] }
"tokio" = { version = "1.14.1", features = ["macros", "rt-multi-thread", "time"] }
*/
// Subscribe to keyspace events
async fn get_event_stream(client: &redis::Client) -> RedisResult<impl Stream<Item=redis::Msg>> {
let mut con = client.get_async_connection().await?;
redis::cmd("CONFIG")
.arg("SET")
.arg("notify-keyspace-events")
.arg("KEA")
.query_async(&mut con).await?;
let mut pubsub = con.into_pubsub();
pubsub.psubscribe("__keyspace@*__:*").await?; // subscribes to any db for
Ok(pubsub.into_on_message())
}
/// Redis Rust pubsub with reconnection example
async fn pubsub() -> redis::RedisResult<()> {
let client = redis::Client::open("redis://127.0.0.1/").unwrap();
let mut events = get_event_stream(&client).await?;
let start = Instant::now();
loop {
let msg: redis::Msg = match timeout(Duration::from_secs(5), events.next()).await {
Ok(None) => {
println!("Sub Fail");
dbg!(client.is_open());
if client.is_open() {
events = get_event_stream(&client).await?;
}
sleep(Duration::from_secs(1));
continue;
}
Err(_) => {
println!("Sub Timeout");
continue;
}
Ok(Some(msg)) => msg,
};
println!("elapsed {:?}", start.elapsed());
dbg!(&msg);
}
}
async fn benchmark() -> RedisResult<()> {
let client = redis::Client::open("redis://127.0.0.1/").unwrap();
let mut con = client.get_multiplexed_tokio_connection().await?;
con.set("key1", b"foo").await?;
redis::cmd("SET").arg(&["key2", "bar"]).query_async(&mut con).await?;
let result = redis::cmd("MGET")
.arg(&["key1", "key2"])
.query_async(&mut con)
.await;
assert_eq!(result, Ok(("foo".to_string(), b"bar".to_vec())));
let start = Instant::now();
for _request_nbr in 0..1_000_000 {
// con.get("key1").await?
let _ = con.clone();
}
println!("clone took {:?}", start.elapsed());
let a: Vec<_> = (0..1_000_000).map(|_i| con.clone()).collect();
let start = Instant::now();
let out: Vec<RedisResult<String>> = futures_util::future::join_all(a.into_iter().map(|mut connection|
async move {
connection.get("key1").await
})).await;
println!("{:?}", start.elapsed());
dbg!(out.len());
Ok(())
}
#[tokio::main]
async fn main() {
// benchmark().await.unwrap();
pubsub().await.unwrap();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment