Gist by @jsgf, created September 14, 2017.
// Copyright (c) 2004-present, Facebook, Inc.
// All Rights Reserved.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
#![feature(conservative_impl_trait)]
extern crate clap;
#[macro_use]
extern crate error_chain;
extern crate futures;
#[macro_use]
extern crate slog;
extern crate slog_term;
extern crate tokio_core;
extern crate blobrepo;
extern crate blobstore;
extern crate bytes;
extern crate fileblob;
extern crate fileheads;
extern crate futures_cpupool;
extern crate futures_ext;
extern crate heads;
extern crate manifoldblob;
extern crate mercurial;
extern crate mercurial_types;
extern crate rocksblob;
extern crate serde;
#[macro_use]
extern crate serde_derive;
extern crate bincode;
use std::error;
use std::io;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use bytes::Bytes;
use futures::{Future, IntoFuture, Stream};
use futures::future::BoxFuture;
use futures_ext::StreamExt;
use tokio_core::reactor::{Core, Remote};
use futures_cpupool::CpuPool;
use clap::{App, Arg};
use slog::{Drain, Level, LevelFilter, Logger};
use blobstore::Blobstore;
use fileblob::Fileblob;
use manifoldblob::ManifoldBlob;
use rocksblob::Rocksblob;
use fileheads::FileHeads;
use heads::Heads;
use blobrepo::BlobChangeset;
use mercurial::{RevlogManifest, RevlogRepo};
use mercurial::revlog::RevIdx;
use mercurial_types::{hash, Blob, Changeset, NodeHash, Parents, Type};
use mercurial_types::manifest::{Entry, Manifest};
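/// Per-node metadata blob: the node's parents plus the SHA-1 of its raw
/// content, which is stored separately under a `sha1-<hash>` key.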
#[derive(Debug, Copy, Clone)]
#[derive(Serialize, Deserialize)]
pub struct NodeBlob {
    parents: Parents,
    blob: hash::Sha1,
}
#[derive(Debug, Eq, PartialEq)]
enum BlobstoreType {
    Files,
    Rocksdb,
    Manifold,
}
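/// Alias for the boxed blobstore trait object shared across the importer.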
type BBlobstore = Arc<
    Blobstore<
        Key = String,
        ValueIn = Bytes,
        ValueOut = Vec<u8>,
        Error = Error,
        GetBlob = BoxFuture<Option<Vec<u8>>, Error>,
        PutBlob = BoxFuture<(), Error>,
    >
        + Sync,
>;
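// Compile-time assertions that the blobstore handle has the bounds we rely on;
// invoked from `open_blobstore`.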
fn _assert_clone<T: Clone>(_: &T) {}
fn _assert_send<T: Send>(_: &T) {}
fn _assert_static<T: 'static>(_: &T) {}
fn _assert_blobstore<T: Blobstore>(_: &T) {}
error_chain! {
    links {
        Blobrepo(::blobrepo::Error, ::blobrepo::ErrorKind);
        Mercurial(::mercurial::Error, ::mercurial::ErrorKind);
        Rocksblob(::rocksblob::Error, ::rocksblob::ErrorKind);
        FileHeads(::fileheads::Error, ::fileheads::ErrorKind);
        Fileblob(::fileblob::Error, ::fileblob::ErrorKind);
        Manifold(::manifoldblob::Error, ::manifoldblob::ErrorKind);
    }

    foreign_links {
        Io(::std::io::Error);
    }
}
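/// Store one manifest entry as a pair of blobs: a bincode-serialized `NodeBlob`
/// under `node-<entry hash>.bincode`, and the raw content under
/// `sha1-<content hash>`.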
fn put_manifest_entry(
    blobstore: BBlobstore,
    entry_hash: NodeHash,
    blob: Blob<Vec<u8>>,
    parents: Parents,
) -> impl Future<Item = (), Error = Error> + Send + 'static {
    let bytes = blob.into_inner()
        .ok_or("missing blob data".into())
        .map(Bytes::from)
        .into_future();

    bytes
        .and_then(move |bytes| {
            let nodeblob = NodeBlob {
                parents: parents,
                blob: hash::Sha1::from(bytes.as_ref()),
            };
            // TODO: (jsgf) T21597565 Convert blobimport to use blobrepo methods to name and create blobs.
            let nodekey = format!("node-{}.bincode", entry_hash);
            let blobkey = format!("sha1-{}", nodeblob.blob);
            let nodeblob = bincode::serialize(&nodeblob, bincode::Bounded(4096))
                .expect("bincode serialize failed");

            // TODO: blobstore.putv?
            let node = blobstore
                .put(nodekey, Bytes::from(nodeblob))
                .map_err(Into::into);
            let blob = blobstore.put(blobkey, bytes).map_err(Into::into);

            node.join(blob).map(|_| ())
        })
}
// Copy a single manifest entry into the blobstore
// TODO: recast as `impl Future<...>` - remove most of these type constraints (which are mostly
// for BoxFuture)
// TODO: #[async]
fn copy_manifest_entry<E>(
    entry: Box<Entry<Error = E>>,
    blobstore: BBlobstore,
) -> impl Future<Item = (), Error = Error> + Send + 'static
where
    Error: From<E>,
    E: error::Error + Send + 'static,
{
    let hash = *entry.get_hash();
    let blobfuture = entry.get_raw_content().map_err(Error::from);

    blobfuture
        .join(entry.get_parents().map_err(Error::from))
        .and_then(move |(blob, parents)| {
            put_manifest_entry(blobstore, hash, blob, parents)
        })
}
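/// Recursively expand a manifest entry into a stream of the entries introduced
/// by the changeset at `cs_rev`: entries whose linkrev doesn't match are
/// skipped, files yield themselves, and trees yield their children (expanded
/// recursively) followed by the tree entry itself.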
fn get_stream_of_manifest_entries(
    entry: Box<Entry<Error = mercurial::Error>>,
    revlog_repo: RevlogRepo,
    cs_rev: RevIdx,
) -> Box<Stream<Item = Box<Entry<Error = mercurial::Error>>, Error = Error> + Send> {
    let revlog = match entry.get_type() {
        Type::File | Type::Executable | Type::Symlink => {
            revlog_repo.get_file_revlog(entry.get_path())
        }
        Type::Tree => revlog_repo.get_tree_revlog(entry.get_path()),
    };

    let linkrev = revlog
        .and_then(|file_revlog| {
            file_revlog.get_entry_by_nodeid(entry.get_hash())
        })
        .map(|e| e.linkrev)
        .map_err(|e| {
            Error::with_chain(e, format!("cannot get linkrev of {}", entry.get_hash()))
        });

    match linkrev {
        Ok(linkrev) => if linkrev != cs_rev {
            return futures::stream::empty().boxed();
        },
        Err(e) => {
            return futures::stream::once(Err(e)).boxed();
        }
    }

    match entry.get_type() {
        Type::File | Type::Executable | Type::Symlink => futures::stream::once(Ok(entry)).boxed(),
        Type::Tree => entry
            .get_content()
            .and_then(|content| match content {
                mercurial_types::manifest::Content::Tree(manifest) => Ok(manifest.list()),
                _ => panic!("should not happen"),
            })
            .flatten_stream()
            .map(move |entry| {
                get_stream_of_manifest_entries(entry, revlog_repo.clone(), cs_rev.clone())
            })
            .map_err(Error::from)
            .flatten()
            .chain(futures::stream::once(Ok(entry)))
            .boxed(),
    }
}
/// Copy a changeset and its manifest into the blobstore
///
/// The changeset and the manifest are straightforward - we just make literal copies of the
/// blobs into the blobstore.
///
/// The files are more complex. For each manifest, we generate a stream of entries, then flatten
/// the entry streams from all changesets into a single stream. Each entry is filtered by its
/// linkrev so that it is only copied by the changeset that introduced it, avoiding duplicates.
fn copy_changeset(
    revlog_repo: RevlogRepo,
    blobstore: BBlobstore,
    csid: NodeHash,
) -> impl Future<Item = (), Error = Error> + Send + 'static {
    let put = {
        let blobstore = blobstore.clone();
        let csid = csid;

        revlog_repo
            .get_changeset_by_nodeid(&csid)
            .from_err()
            .and_then(move |cs| {
                let bcs = BlobChangeset::new(&csid, cs);
                bcs.save(blobstore).map_err(Into::into)
            })
    };

    let manifest = {
        revlog_repo
            .get_changeset_by_nodeid(&csid)
            .join(revlog_repo.get_changelog_revlog_entry_by_nodeid(&csid))
            .from_err()
            .and_then(move |(cs, entry)| {
                let mfid = *cs.manifestid();
                let linkrev = entry.linkrev;

                // fetch the blob for the (root) manifest
                revlog_repo
                    .get_manifest_blob_by_nodeid(&mfid)
                    .from_err()
                    .and_then(move |blob| {
                        let putmf = put_manifest_entry(
                            blobstore.clone(),
                            mfid,
                            blob.as_blob().clone(),
                            blob.parents().clone(),
                        );

                        // Get the listing of entries and fetch each of those
                        let files = RevlogManifest::new(revlog_repo.clone(), blob)
                            .map_err(|err| {
                                Error::with_chain(Error::from(err), "Parsing manifest to get list")
                            })
                            .map(|mf| mf.list().map_err(Error::from))
                            .map(|entry_stream| {
                                entry_stream
                                    .map({
                                        let revlog_repo = revlog_repo.clone();
                                        move |entry| {
                                            get_stream_of_manifest_entries(
                                                entry,
                                                revlog_repo.clone(),
                                                linkrev.clone(),
                                            )
                                        }
                                    })
                                    .flatten()
                                    .for_each(
                                        move |entry| copy_manifest_entry(entry, blobstore.clone()),
                                    )
                            })
                            .into_future()
                            .flatten();

                        putmf.join(files)
                    })
            })
            .map_err(move |err| {
                Error::with_chain(err, format!("Can't copy manifest for cs {}", csid))
            })
    };

    put.join(manifest).map(|_| ())
}
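/// Drive the conversion on the reactor: copy every changeset (spawned on the
/// CpuPool, up to 100 in flight) while also recording the repo's heads in the
/// headstore, blocking until both streams are drained.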
fn convert<H>(
    revlog: RevlogRepo,
    blobstore: BBlobstore,
    headstore: H,
    core: Core,
    cpupool: Arc<CpuPool>,
    logger: &Logger,
) -> Result<()>
where
    H: Heads<Key = String>,
    H::Error: Into<Error>,
{
    let mut core = core;

    // Generate stream of changesets. For each changeset, save the cs blob, and the manifest blob,
    // and the files.
    let changesets = revlog.changesets()
        .map_err(Error::from)
        .enumerate()
        .map({
            let blobstore = blobstore.clone();
            let revlog = revlog.clone();
            move |(seq, csid)| {
                info!(logger, "{}: changeset {}", seq, csid);
                copy_changeset(revlog.clone(), blobstore.clone(), csid)
            }
        }) // Stream<Future<()>>
        .map(|copy| cpupool.spawn(copy))
        .buffer_unordered(100);

    let heads = revlog
        .get_heads()
        .map_err(Error::from)
        .map_err(|err| Error::with_chain(err, "Failed to get heads"))
        .map(|h| {
            info!(logger, "head {}", h);
            headstore
                .add(&format!("{}", h))
                .map_err(Into::into)
                .map_err({
                    move |err| Error::with_chain(err, format!("Failed to create head {}", h))
                })
        })
        .buffer_unordered(100);

    let convert = changesets.merge(heads).for_each(|_| Ok(()));

    core.run(convert)?;

    Ok(())
}
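/// Open the source repo and the target blob/head stores, then run the
/// conversion to completion.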
fn run<In, Out>(input: In, output: Out, blobtype: BlobstoreType, logger: &Logger) -> Result<()>
where
    In: AsRef<Path>,
    Out: AsRef<Path>,
{
    let core = tokio_core::reactor::Core::new()?;
    let cpupool = Arc::new(CpuPool::new_num_cpus());

    let repo = open_repo(&input)?;
    let blobstore = open_blobstore(&output, blobtype, &core.remote())?;
    let headstore = open_headstore(&output, &cpupool)?;

    convert(repo, blobstore, headstore, core, cpupool, logger)
}
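/// Open `<input>/.hg` as a revlog repo, after checking that the input path
/// exists and is a directory.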
fn open_repo<P: AsRef<Path>>(input: P) -> Result<RevlogRepo> {
    let mut input = PathBuf::from(input.as_ref());
    if !input.exists() || !input.is_dir() {
        bail!("input {:?} doesn't exist or isn't a dir", input);
    }
    input.push(".hg");

    let revlog = RevlogRepo::open(input)?;

    Ok(revlog)
}
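/// Create a file-backed headstore under `<heads>/heads`.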
fn open_headstore<P: AsRef<Path>>(heads: P, pool: &Arc<CpuPool>) -> Result<FileHeads<String>> {
    let mut heads = PathBuf::from(heads.as_ref());
    heads.push("heads");

    let headstore = fileheads::FileHeads::create_with_pool(heads, pool.clone())?;

    Ok(headstore)
}
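/// Create the requested blobstore under `<output>/blobs`. The Manifold variant
/// ignores the local path and connects to Manifold as "mononoke" via the
/// reactor remote.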
fn open_blobstore<P: AsRef<Path>>(
    output: P,
    ty: BlobstoreType,
    remote: &Remote,
) -> Result<BBlobstore> {
    let mut output = PathBuf::from(output.as_ref());
    output.push("blobs");

    let blobstore = match ty {
        BlobstoreType::Files => Fileblob::<_, Bytes>::create(output)
            .map_err(Error::from)
            .chain_err::<_, Error>(|| "Failed to open file blob store".into())?
            .arced(),
        BlobstoreType::Rocksdb => Rocksblob::create(output)
            .map_err(Error::from)
            .chain_err::<_, Error>(|| "Failed to open rocksdb blob store".into())?
            .arced(),
        BlobstoreType::Manifold => {
            let mb: ManifoldBlob<String, Bytes> = ManifoldBlob::new_may_panic("mononoke", remote);
            mb.arced()
        }
    };

    _assert_clone(&blobstore);
    _assert_send(&blobstore);
    _assert_static(&blobstore);
    _assert_blobstore(&blobstore);

    Ok(blobstore)
}
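/// Command-line entry point. A typical invocation (binary name hypothetical):
///
///     blobimport --blobstore rocksdb <INPUT> <OUTPUT>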
fn main() {
    let matches = App::new("revlog to blob importer")
        .version("0.0.0")
        .about("make blobs")
        .args_from_usage("[debug] -d, --debug 'print debug level output'")
        .arg(
            Arg::with_name("blobstore")
                .long("blobstore")
                .short("B")
                .takes_value(true)
                .possible_values(&["files", "rocksdb", "manifold"])
                .required(true)
                .help("blobstore type"),
        )
        .args_from_usage("<INPUT> 'input revlog repo'")
        .args_from_usage("<OUTPUT> 'output blobstore RepoCtx'")
        .get_matches();

    let level = if matches.is_present("debug") {
        Level::Debug
    } else {
        Level::Info
    };

    // TODO: switch to TermDecorator, which supports color
    let drain = slog_term::PlainSyncDecorator::new(io::stdout());
    let drain = slog_term::FullFormat::new(drain).build();
    let drain = LevelFilter::new(drain, level).fuse();
    let root_log = slog::Logger::root(drain, o![]);

    let input = matches.value_of("INPUT").unwrap();
    let output = matches.value_of("OUTPUT").unwrap();

    let blobtype = match matches.value_of("blobstore").unwrap() {
        "files" => BlobstoreType::Files,
        "rocksdb" => BlobstoreType::Rocksdb,
        "manifold" => BlobstoreType::Manifold,
        bad => panic!("unexpected blobstore type {}", bad),
    };

    if let Err(ref e) = run(input, output, blobtype, &root_log) {
        error!(root_log, "Failed: {}", e);
        for e in e.iter().skip(1) {
            error!(root_log, "caused by: {}", e);
        }
        std::process::exit(1);
    }
}