Created
June 19, 2023 16:11
-
-
Save esemeniuc/f2d394040963463b261406fdcb77d076 to your computer and use it in GitHub Desktop.
Redis Pub/Sub Async Reconnect Example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::thread::sleep; | |
use std::time::{Duration, Instant}; | |
use redis::{AsyncCommands, ConnectionLike, RedisResult}; | |
use tokio::time::timeout; | |
use futures::StreamExt; | |
use futures_util::Stream; | |
/* Cargo.toml | |
[dependencies] | |
futures-util = "0.3" | |
deadpool-redis = "0.12.0" | |
deadpool = "0.9.5" | |
futures= "0.3" | |
"redis" = { version = "0.23", features = ["aio", "tokio-comp", "connection-manager"] } | |
"tokio" = { version = "1.14.1", features = ["macros", "rt-multi-thread", "time"] } | |
*/ | |
// Subscribe to keyspace events | |
async fn get_event_stream(client: &redis::Client) -> RedisResult<impl Stream<Item=redis::Msg>> { | |
let mut con = client.get_async_connection().await?; | |
redis::cmd("CONFIG") | |
.arg("SET") | |
.arg("notify-keyspace-events") | |
.arg("KEA") | |
.query_async(&mut con).await?; | |
let mut pubsub = con.into_pubsub(); | |
pubsub.psubscribe("__keyspace@*__:*").await?; // subscribes to any db for | |
Ok(pubsub.into_on_message()) | |
} | |
/// Redis Rust pubsub with reconnection example | |
async fn pubsub() -> redis::RedisResult<()> { | |
let client = redis::Client::open("redis://127.0.0.1/").unwrap(); | |
let mut events = get_event_stream(&client).await?; | |
let start = Instant::now(); | |
loop { | |
let msg: redis::Msg = match timeout(Duration::from_secs(5), events.next()).await { | |
Ok(None) => { | |
println!("Sub Fail"); | |
dbg!(client.is_open()); | |
if client.is_open() { | |
events = get_event_stream(&client).await?; | |
} | |
sleep(Duration::from_secs(1)); | |
continue; | |
} | |
Err(_) => { | |
println!("Sub Timeout"); | |
continue; | |
} | |
Ok(Some(msg)) => msg, | |
}; | |
println!("elapsed {:?}", start.elapsed()); | |
dbg!(&msg); | |
} | |
} | |
async fn benchmark() -> RedisResult<()> { | |
let client = redis::Client::open("redis://127.0.0.1/").unwrap(); | |
let mut con = client.get_multiplexed_tokio_connection().await?; | |
con.set("key1", b"foo").await?; | |
redis::cmd("SET").arg(&["key2", "bar"]).query_async(&mut con).await?; | |
let result = redis::cmd("MGET") | |
.arg(&["key1", "key2"]) | |
.query_async(&mut con) | |
.await; | |
assert_eq!(result, Ok(("foo".to_string(), b"bar".to_vec()))); | |
let start = Instant::now(); | |
for _request_nbr in 0..1_000_000 { | |
// con.get("key1").await? | |
let _ = con.clone(); | |
} | |
println!("clone took {:?}", start.elapsed()); | |
let a: Vec<_> = (0..1_000_000).map(|_i| con.clone()).collect(); | |
let start = Instant::now(); | |
let out: Vec<RedisResult<String>> = futures_util::future::join_all(a.into_iter().map(|mut connection| | |
async move { | |
connection.get("key1").await | |
})).await; | |
println!("{:?}", start.elapsed()); | |
dbg!(out.len()); | |
Ok(()) | |
} | |
#[tokio::main] | |
async fn main() { | |
// benchmark().await.unwrap(); | |
pubsub().await.unwrap(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment