Skip to content

Instantly share code, notes, and snippets.

@kemurphy
Last active August 15, 2021 02:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kemurphy/f85bc772b32817d209aee13da527c9fe to your computer and use it in GitHub Desktop.
Save kemurphy/f85bc772b32817d209aee13da527c9fe to your computer and use it in GitHub Desktop.
diff --git a/cmd/zfs_object_agent/zettaobject/Cargo.toml b/cmd/zfs_object_agent/zettaobject/Cargo.toml
index 5f5984f1e..8ff15d8e6 100644
--- a/cmd/zfs_object_agent/zettaobject/Cargo.toml
+++ b/cmd/zfs_object_agent/zettaobject/Cargo.toml
@@ -34,6 +34,6 @@ serde_bytes = "0.11"
serde_json = "1.0.64"
stream-reduce = "0.1.0"
tokio = { version = "1.4", features = ["full"] }
-tokio-stream = "0.1.5"
+tokio-stream = { version = "0.1.5", features = ["time"] }
uuid = {version = "0.8.2", features = ["v4", "serde"]}
zettacache = { path = "../zettacache" }
diff --git a/cmd/zfs_object_agent/zettaobject/src/heartbeat.rs b/cmd/zfs_object_agent/zettaobject/src/heartbeat.rs
index a093d27ee..a0d4f0857 100644
--- a/cmd/zfs_object_agent/zettaobject/src/heartbeat.rs
+++ b/cmd/zfs_object_agent/zettaobject/src/heartbeat.rs
@@ -1,10 +1,12 @@
+use futures::prelude::*;
+
use crate::object_access::{OAError, ObjectAccess};
use anyhow::Context;
use lazy_static::lazy_static;
use log::{debug, info, warn};
use serde::{Deserialize, Serialize};
use std::{
- collections::HashMap,
+ collections::hash_map::{Entry, HashMap},
sync::{Arc, Weak},
time::{Duration, SystemTime},
};
@@ -145,33 +147,24 @@ pub async fn start_heartbeat(object_access: ObjectAccess, id: Uuid) -> Heartbeat
tokio::spawn(async move {
let mut last_heartbeat = None;
info!("Starting heartbeat with id {}", id);
- let mut interval = tokio::time::interval(*HEARTBEAT_INTERVAL);
- loop {
- interval.tick().await;
- {
- let fut_opt = {
- let mut heartbeats = HEARTBEAT.lock().unwrap();
- // We can almost use or_else here, but that doesn't let us change types.
- match heartbeats.get(&key).unwrap().upgrade() {
- None => {
- heartbeats.remove(&key);
- info!("Stopping heartbeat with id {}", id);
- Some(HeartbeatPhys::delete(&object_access, id))
- }
- Some(_) => None,
- }
- };
- if let Some(fut) = fut_opt {
- fut.await;
- return;
- }
- }
- let heartbeat = HeartbeatPhys {
+ let interval = tokio::time::interval(*HEARTBEAT_INTERVAL);
+ let interval_stream = tokio_stream::wrappers::IntervalStream::new(interval);
+
+ let mut beat = interval_stream
+ .take_while(|_| future::ready(match HEARTBEAT.lock().unwrap().entry(key.clone()) {
+ Entry::Vacant(_) => panic!("Heartbeat {} not found in map", id),
+ Entry::Occupied(e) => e.get().upgrade()
+ .ok_or_else(|| e.remove())
+ .is_ok(),
+ }))
+ .map(|_| HeartbeatPhys {
timestamp: SystemTime::now(),
hostname: hostname::get().unwrap().into_string().unwrap(),
lease_duration: *LEASE_DURATION,
id,
- };
+ });
+
+ while let Some(heartbeat) = beat.next().await {
let result = heartbeat
.put_timeout(&object_access, Some(*WRITE_TIMEOUT))
.await;
@@ -185,6 +178,9 @@ pub async fn start_heartbeat(object_access: ObjectAccess, id: Uuid) -> Heartbeat
last_heartbeat = Some(heartbeat);
}
}
+
+ info!("Stopping heartbeat with id {}", id);
+ HeartbeatPhys::delete(&object_access, id).await;
});
let result = rx.changed().await;
assert!(result.is_err() || *rx.borrow());
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment