Skip to content

Instantly share code, notes, and snippets.

@andoriyu
Created March 30, 2020 23:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save andoriyu/3900ad0616c871926fcb91fd70cb7507 to your computer and use it in GitHub Desktop.
Save andoriyu/3900ad0616c871926fcb91fd70cb7507 to your computer and use it in GitHub Desktop.
impl Handler<ExecuteTask> for TaskManager {
type Result = ResponseFuture<Result<(), String>>;
fn handle(&mut self, msg: ExecuteTask, _ctx: &mut Context<Self>) -> Self::Result {
info!(self.logger, "Processing task \"{}\"", msg.0.as_str());
let zfs_addr = self.zfs_manager.clone();
let maybe_task = self.tasks.get(msg.0.as_str()).cloned();
let logger = self.logger.new(o!("task" => msg.0.clone()));
Box::pin(
async move {
(zfs_addr, maybe_task, logger)
}.into_actor(self).then(async move |(zfs_addr, maybe_task, logger), this, _ctx| {
if let Some(task) = maybe_task {
let context = task.full_replication.as_ref().unwrap();
let req =
GetDatasetsForTask::new(context.zpool.clone(), context.filter.clone());
let res = zfs_addr.send(req).await.unwrap();
if res.is_empty() {
warn!(logger, "Got no datasets to work with")
} else {
debug!(logger, "Got {} datasets to work with", res.len());
}
let mut has_errors = false;
let snapshot_name = get_snapshot_name();
let req = MakeSnapshots::new(res.clone(), snapshot_name.clone());
let snap_result = zfs_addr.send(req).await.unwrap();
if let Err(e) = snap_result {
return Err(e);
}
let dst_manager = DestinationManager::from_registry();
for dataset in res {
let snapshot = PathBuf::from(format!(
"{}@{}",
dataset.to_string_lossy(),
&snapshot_name
));
let pipe = Pipe::new().unwrap();
let dst_req = SaveFromPipe::new(
task.destination.clone(),
dataset.clone(),
snapshot.clone(),
task.compression.clone(),
pipe.read.try_clone().unwrap(),
);
let dst_res = dst_manager.send(dst_req);
let zfs_req = SendSnapshotToPipe(snapshot.clone(), pipe);
let zfs_res = zfs_addr.send(zfs_req);
let result_both = futures::join!(dst_res, zfs_res);
match result_both.0 {
Ok(result) => {
match result {
Ok(()) => {},
Err(e) => {
has_errors = true;
error!(logger, "{}", e);
}
}
},
Err(e) => {
has_errors = true;
error!(logger, "Failed to send a message to destination manager: {}", e);
}
};
match result_both.1 {
Ok(()) => {
},
Err(e) => {
has_errors = true;
error!(logger, "Failed to send a message to zfs manager: {}", e);
}
}
}
if has_errors {
Err("Completed with errors".to_string())
} else {
info!(logger, "Done");
Ok(())
}
} else {
Err(format!("Task {} not found", msg.0.as_str()))
}
})
)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment