Skip to content

Instantly share code, notes, and snippets.

@ekmartin
Created November 6, 2017 11:47
Show Gist options
  • Save ekmartin/03569ec5f2d8bde35efcac918d71ef44 to your computer and use it in GitHub Desktop.
Save ekmartin/03569ec5f2d8bde35efcac918d71ef44 to your computer and use it in GitHub Desktop.
pub fn retrieve_recovery_packets<'a>(
file: File,
local_addr: LocalNodeIndex,
global_addr: NodeIndex,
is_transactional: bool,
checktable: Rc<checktable::CheckTableClient>,
) -> impl Iterator<Item = Packet> + 'a {
BufReader::new(file)
.lines()
.filter_map(|line| {
let line = line.unwrap();
let entries: Result<Vec<Records>, _> = serde_json::from_str(&line);
entries.ok()
})
.flat_map(|r| r)
// Merge packets into batches of RECOVERY_BATCH_SIZE:
.chunks(RECOVERY_BATCH_SIZE)
.into_iter()
.map(|chunk| chunk.fold(Records::default(), |mut acc, data| {
acc.append(data);
acc
}))
// Then create Packet objects from the data:
.map(move |data| {
let link = Link::new(local_addr, local_addr);
if is_transactional {
let (ts, prevs) = checktable.recover(global_addr).unwrap();
Packet::Transaction {
link,
data,
tracer: None,
state: TransactionState::Committed(ts, global_addr, prevs),
}
} else {
Packet::Message {
link,
data,
tracer: None,
}
}
})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment