-
-
Save andoriyu/3900ad0616c871926fcb91fd70cb7507 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
impl Handler<ExecuteTask> for TaskManager { | |
type Result = ResponseFuture<Result<(), String>>; | |
fn handle(&mut self, msg: ExecuteTask, _ctx: &mut Context<Self>) -> Self::Result { | |
info!(self.logger, "Processing task \"{}\"", msg.0.as_str()); | |
let zfs_addr = self.zfs_manager.clone(); | |
let maybe_task = self.tasks.get(msg.0.as_str()).cloned(); | |
let logger = self.logger.new(o!("task" => msg.0.clone())); | |
Box::pin( | |
async move { | |
(zfs_addr, maybe_task, logger) | |
}.into_actor(self).then(async move |(zfs_addr, maybe_task, logger), this, _ctx| { | |
if let Some(task) = maybe_task { | |
let context = task.full_replication.as_ref().unwrap(); | |
let req = | |
GetDatasetsForTask::new(context.zpool.clone(), context.filter.clone()); | |
let res = zfs_addr.send(req).await.unwrap(); | |
if res.is_empty() { | |
warn!(logger, "Got no datasets to work with") | |
} else { | |
debug!(logger, "Got {} datasets to work with", res.len()); | |
} | |
let mut has_errors = false; | |
let snapshot_name = get_snapshot_name(); | |
let req = MakeSnapshots::new(res.clone(), snapshot_name.clone()); | |
let snap_result = zfs_addr.send(req).await.unwrap(); | |
if let Err(e) = snap_result { | |
return Err(e); | |
} | |
let dst_manager = DestinationManager::from_registry(); | |
for dataset in res { | |
let snapshot = PathBuf::from(format!( | |
"{}@{}", | |
dataset.to_string_lossy(), | |
&snapshot_name | |
)); | |
let pipe = Pipe::new().unwrap(); | |
let dst_req = SaveFromPipe::new( | |
task.destination.clone(), | |
dataset.clone(), | |
snapshot.clone(), | |
task.compression.clone(), | |
pipe.read.try_clone().unwrap(), | |
); | |
let dst_res = dst_manager.send(dst_req); | |
let zfs_req = SendSnapshotToPipe(snapshot.clone(), pipe); | |
let zfs_res = zfs_addr.send(zfs_req); | |
let result_both = futures::join!(dst_res, zfs_res); | |
match result_both.0 { | |
Ok(result) => { | |
match result { | |
Ok(()) => {}, | |
Err(e) => { | |
has_errors = true; | |
error!(logger, "{}", e); | |
} | |
} | |
}, | |
Err(e) => { | |
has_errors = true; | |
error!(logger, "Failed to send a message to destination manager: {}", e); | |
} | |
}; | |
match result_both.1 { | |
Ok(()) => { | |
}, | |
Err(e) => { | |
has_errors = true; | |
error!(logger, "Failed to send a message to zfs manager: {}", e); | |
} | |
} | |
} | |
if has_errors { | |
Err("Completed with errors".to_string()) | |
} else { | |
info!(logger, "Done"); | |
Ok(()) | |
} | |
} else { | |
Err(format!("Task {} not found", msg.0.as_str())) | |
} | |
}) | |
) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment