@tim-tang, created May 31, 2012
[java] - File sync tool
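NSync reads its settings from an nsync.config properties file on the classpath (see loadConfig below). A minimal sketch of that file, with placeholder hosts and credentials you would substitute for your own:

authHost=https://auth.example.com
syncHost=https://sync.example.com
email=user@example.com
password=changeme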
package com.sz.zhenhe;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.security.InvalidKeyException;
import java.security.KeyManagementException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.Date;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;
import javax.xml.stream.XMLStreamWriter;
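// NSync: one-shot command line client that mirrors a local directory into the Cloudsync service.
// Overall flow in main(): login, acquire a write lock, MD5 the local files and fetch the remote
// manifest, diff the two, upload new/changed files to S3, upload a fresh manifest, then release
// the lock with a commit (or roll it back if anything fails).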
public class NSync {
static String authHost;
static String syncHost;
static String email;
static String password;
static SSLSocketFactory sslfactory;
static XMLInputFactory xmlInFactory = XMLInputFactory.newInstance();
static XMLOutputFactory xmlOutFactory = XMLOutputFactory.newInstance();
static PrintWriter file;
static CLIArguments args;
static File syncRoot;
static int requestId;
static final String HD_AUTHTOKEN = "x-origin-authtoken";
static final String HD_LOCK = "x-origin-sync-lock";
static final String HD_LOCK_TTL = "x-origin-sync-lock-ttl";
static final String ENC_UTF_8 = "UTF-8";
static final String HD_CONTENT_TYPE = "Content-Type";
static final String HTTP_PUT = "PUT";
static final String HTTP_GET = "GET";
static final String HTTP_DELETE = "DELETE";
static final String HTTP_HEAD = "HEAD";
static final String STATUS_COMMIT = "commit";
static final String STATUS_ROLLBACK = "rollback";
static final String MANIFEST = "manifest.xml";
static final String MANIFEST_XMLNS = "http://origin.com/cloudsaves/manifest";
private static final String ELEM_LOCALNAME = "localName".intern();
private static final String ATTR_MD5 = "md5".intern();
private static final String ATTR_SIZE = "size".intern();
private static final String ATTR_HREF = "href".intern();
private static final String ELEM_FILE = "file".intern();
static interface ResponseHandler<R, T> {
boolean consumeXml();
void parseHeaders(Map<String,List<String>> headers, T userdata);
void consume(R reader, T userdata);
}
static interface RequestHandler {
void writeBody(OutputStream os) throws IOException;
}
static class User {
public String Id;
public String AuthToken;
}
static class SyncSession {
public String lockId;
public String manifest;
public int lockttl;
public String host;
public String root;
public String baseURL;
}
static class CLIArguments {
public String path = null;
public String contentid = null;
public boolean recurse = false;
public boolean verbose = false;
}
static class FileInfo {
public String path;
public long size;
public long lastModified;
public String md5;
//hack: also reused when parsing the S3 manifest, to avoid introducing a separate class
//for remote entries
public String s3Href;
}
static class S3FileRequest {
public S3FileRequest(int id) {
this.id = id;
}
public final int id;
public FileInfo srcFile;
public String verb;
public String href; // the relative path from the base url
public String url; // the full path of the S3 Request
public Map<String, String> headers;
public byte[] data; //only if the verb is GET
public boolean completed = false;
public boolean compressed = false;
}
public static void main(String[] argv)
throws NoSuchAlgorithmException, KeyManagementException,InvalidKeyException,
UnsupportedEncodingException, InterruptedException, IOException, XMLStreamException, Exception {
User user = null;
SyncSession session = null;
ScheduledExecutorService extendLockScheduler = null;
ThreadPoolExecutor workerPool = null;
try {
file = new PrintWriter("nsync.log",ENC_UTF_8);
log("NSync v1.0 - Your friendly gateway to Cloudsync");
log("");
if (argv.length < 2) {
printUsage();
return;
}
args = parseCLIArguments(argv);
if (args.path == null) {
printUsage();
return;
}
if (!initSyncRoot()) {
return;
}
initSSLFactory();
loadConfig("nsync.config");
printCurrentConfig();
user = new User();
if (!login(email, password, user)) {
return;
}
session = new SyncSession();
if (!beginSync(user,args.contentid, session)) {
return;
}
extendLockScheduler = Executors.newSingleThreadScheduledExecutor();
ExtendLockWorker elw = new ExtendLockWorker(user.Id, user.AuthToken, session.lockId);
int interval = session.lockttl - 10;
log("ExtendLock Worker will run in %ds intervals", interval);
log("");
extendLockScheduler.scheduleWithFixedDelay(elw, interval, interval, TimeUnit.SECONDS);
long start = System.currentTimeMillis();
log("Begin sync files @ %s", DateFormat.getDateTimeInstance(DateFormat.FULL, DateFormat.FULL).format(new Date(start)));
log("");
int numThreads = Runtime.getRuntime().availableProcessors() * 2;
workerPool = (ThreadPoolExecutor)Executors.newFixedThreadPool(numThreads * 2);
ExecutorCompletionService<FileInfo> calcMD5Service = new ExecutorCompletionService<FileInfo>(workerPool);
ExecutorCompletionService<S3FileRequest> s3FileRequestService = new ExecutorCompletionService<S3FileRequest>(workerPool);
Map<String, FileInfo> localFileMap = new HashMap<String, FileInfo>();
getFiles(syncRoot, calcMD5Service, localFileMap);
//now let's fetch the manifest from S3 and build a file map
// Map key = fullpath
Map<String,FileInfo> manifestFileMap = new HashMap<String,FileInfo>();
boolean hasManifest = getFilesInManifest(session, manifestFileMap);
completeMD5Calc(localFileMap.size(), calcMD5Service);
if(hasManifest) {
getLastModifiedTimeFromS3(user, session, manifestFileMap.values(),s3FileRequestService);
}
List<String> filesToDelete = new ArrayList<String>();
List<S3FileRequest> filesToUpload = new ArrayList<S3FileRequest>();
createS3Requests(manifestFileMap, localFileMap, filesToDelete, filesToUpload);
List<S3FileRequest> outstandingRequests = filesToUpload;
int retries = 3;
while(outstandingRequests.size() > 0 && retries-- > 0) {
outstandingRequests = syncFiles(user, session, outstandingRequests, s3FileRequestService, numThreads);
}
if (outstandingRequests.size() > 0) {
log("Too many failed attempts uploading files to S3, giving up...");
return;
}
S3FileRequest putManifestRequest = createManifest(session, syncRoot, localFileMap.values());
outstandingRequests = new ArrayList<S3FileRequest>();
outstandingRequests.add(putManifestRequest);
retries = 3;
while(outstandingRequests.size() > 0 && retries-- > 0) {
outstandingRequests = syncFiles(user, session, outstandingRequests, s3FileRequestService, numThreads);
}
if (outstandingRequests.size() > 0) {
log("Too many failed attempts uploading manifest to S3, giving up...");
return;
}
deleteLock(user.Id, user.AuthToken, session.lockId, STATUS_COMMIT, filesToDelete);
long end = System.currentTimeMillis();
log("End sync files @ %s", DateFormat.getDateTimeInstance(DateFormat.FULL, DateFormat.FULL).format(new Date(end)));
log("");
log("Total Files uploaded: %d", filesToUpload.size());
log("Orphan files found: %d", filesToDelete.size());
log("Total sync time: %dms", (end - start));
user = null;
session = null;
}
finally {
try {
if (workerPool != null) {
workerPool.shutdownNow();
workerPool.awaitTermination(30, TimeUnit.SECONDS);
}
} catch(InterruptedException iex) {
Thread.currentThread().interrupt();
}
try {
if (extendLockScheduler != null) {
extendLockScheduler.shutdownNow();
extendLockScheduler.awaitTermination(30, TimeUnit.SECONDS);
}
}
catch (InterruptedException iex) {
Thread.currentThread().interrupt();
}
if (user != null && session != null) // an exception occurred before the commit; roll the lock back
deleteLock(user.Id, user.AuthToken, session.lockId, STATUS_ROLLBACK, null);
if (file!=null) {
file.close();
}
}
}
static void printUsage() {
prn("Usage: java NSync [-options]");
prn("options:");
prn(" -d : the directory to sync");
prn(" -c : contentid of the data to sync (defaults to directory name in -d option)");
prn(" -r : recurses into child directories (default false)");
prn(" -v : verbose logging with http connection details (default false)");
prn("");
prn("Examples:");
prn("");
prn("> java NSync -d c:\\mydata\\somedocs");
prn("Syncs files at c:\\mydata\\somedocs, no recurse into subdirs, no verbose logging, contentid defaults to somedocs");
prn("");
prn("> java NSync -d c:\\mygame\\savegame -c bf3_na -r");
prn("Syncs files at c:\\mygame\\savegame, recurses into subdirs, no verbose logging, contentid bf3_na");
}
static CLIArguments parseCLIArguments(String[] argv) {
CLIArguments args = new CLIArguments();
int count = argv.length;
for (int i=0; i < count;++i) {
String option = argv[i].toLowerCase().intern();
if (option == "-d".intern()) {
args.path = argv[++i];
}
else if (option == "-r".intern()) {
args.recurse = true;
}
else if (option == "-v".intern()) {
args.verbose = true;
}
else if (option == "-c".intern()) {
args.contentid = argv[++i];
}
}
return args;
}
static boolean initSyncRoot() {
syncRoot = new File(args.path);
try {
if (syncRoot.exists() && syncRoot.isDirectory()) {
if (args.contentid == null) {
args.contentid = syncRoot.getName();
}
return true;
}
}
catch (SecurityException sex) {
log("error while trying to read path: %s", sex);
return false;
}
log("Invalid directory specified: %s", args.path);
return false;
}
static void printCurrentConfig() {
log("NSync current settings");
log(" Auth Service: %s",authHost);
log(" Sync Service: %s", syncHost);
log(" Email: %s",email);
log(" Password: %s",password);
log(" Sync Root Dir: %s", syncRoot.getAbsolutePath());
log(" Content-ID: %s", args.contentid);
log(" Recurse to childs: %s", args.recurse);
log(" Verbose logs: %s", args.verbose);
log("");
}
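// Periodic task that extends the server-side sync lock so it does not expire while a long sync runs.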
static private class ExtendLockWorker implements Runnable {
String userId;
String authToken;
String lockId;
public ExtendLockWorker(String userId, String authToken, String lockId) {
this.userId = userId;
this.authToken = authToken;
this.lockId = lockId;
}
public void run() {
log("Extending Lock for userId: %s, lockId: %s", userId, lockId);
String url = String.format("%s/cloudsync/extendlock/%s",
syncHost,userId);
HashMap<String,String> headers = new HashMap<String, String>();
headers.put(HD_AUTHTOKEN, authToken);
headers.put(HD_LOCK, lockId);
int result = connect(HTTP_PUT,url,headers.entrySet(), null, null, null);
if (result != HttpURLConnection.HTTP_OK) {
log("Extend Lock failed! HTTP %d", result);
}
}
}
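// Builds an SSLSocketFactory backed by a trust-all X509TrustManager, so any server certificate
// (including self-signed) is accepted. Convenient for a test tool, not safe for production use.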
static void initSSLFactory() throws NoSuchAlgorithmException, KeyManagementException {
SSLContext sc = SSLContext.getInstance("SSL");
sc.init(null,
new TrustManager[]{
new X509TrustManager() {
public java.security.cert.X509Certificate[] getAcceptedIssuers() {
return null;
}
public void checkClientTrusted(java.security.cert.X509Certificate[] certs, String authType) {
}
public void checkServerTrusted(java.security.cert.X509Certificate[] certs, String authType) {
}
}
} , new SecureRandom());
sslfactory = sc.getSocketFactory();
}
static boolean login(String email, String password, User user) {
log("Login user with email: %s, password: %s", email, password);
String url = String.format("%s/loginregistration/login?ea_email=%s&ea_password=%s&remember_me=false",
authHost,email,password);
int statuscode = connect(HTTP_GET,url, null, null, new ResponseHandler<XMLStreamReader, User>() {
public boolean consumeXml() { return true; }
public void parseHeaders(Map<String, List<String>> headers, User userdata ) {}
public void consume(XMLStreamReader reader, User userdata) {
try {
while(reader.hasNext()) {
int event = reader.next();
if (event == XMLStreamReader.START_ELEMENT) {
String element = reader.getLocalName().toLowerCase().intern();
if (element == "userid".intern()) {
userdata.Id = reader.getElementText();
}
else if (element == "genericauthtoken".intern()) {
userdata.AuthToken = reader.getElementText();
}
}
}
}
catch(XMLStreamException xsex) {
log("[login.consume] error reading user response: " + xsex);
}
}
}, user);
boolean success = statuscode == HttpURLConnection.HTTP_OK;
if (success) {
log(" User Id: %s", user.Id);
log(" Authtoken: %s", user.AuthToken);
}
else {
log("Login user failed! HTTP %d", statuscode);
}
log("");
return success;
}
static boolean beginSync(User user, String contentId, SyncSession session) {
log("Begin Sync, userId: %s, contentId: %s", user.Id, contentId);
String url = String.format("%s/cloudsync/writelock/%s/%s",
syncHost,user.Id,contentId);
Map<String,String> headers = new HashMap<String, String>();
headers.put(HD_AUTHTOKEN, user.AuthToken);
int statuscode = connect(HTTP_PUT,url, headers.entrySet(), null, new ResponseHandler<XMLStreamReader, SyncSession>() {
public boolean consumeXml() { return true; }
public void parseHeaders(Map<String, List<String>> headers, SyncSession userdata ) {
List<String> values = headers.get(HD_LOCK);
userdata.lockId = values.get(0);
values = headers.get(HD_LOCK_TTL);
userdata.lockttl = Integer.parseInt(values.get(0));
}
public void consume(XMLStreamReader reader, SyncSession userdata) {
try {
while(reader.hasNext()) {
int event = reader.next();
if (event == XMLStreamReader.START_ELEMENT) {
String element = reader.getLocalName().toLowerCase().intern();
if (element == "host".intern()) {
userdata.host = reader.getElementText();
}
else if (element == "root".intern()) {
userdata.root = reader.getElementText();
}
else if (element == "manifest".intern()) {
userdata.manifest = reader.getElementText();
}
}
}
}
catch(XMLStreamException xsex) {
log("[beginSync.consume] error reading user response: " + xsex);
}
}
}, session);
boolean success = statuscode == HttpURLConnection.HTTP_OK;
if (success) {
String host = session.host;
host = (host.endsWith("/")) ? host.substring(0, host.length()-1) : host;
String root = session.root;
root = (root.startsWith("/")) ? root.substring(1) : root;
root = (root.endsWith("/")) ? root : root + "/";
session.baseURL = host + "/" + root;
log(" S3 host: %s", session.host);
log(" Root: %s", session.root);
log(" Base URL: %s", session.baseURL);
log(" Lock Id: %s", session.lockId);
log(" Lock TTL: %d",session.lockttl);
log(" Manifest: %s",session.manifest);
}
else {
log("Begin Sync failed! HTTP %d", statuscode);
}
log("");
return success;
}
static boolean getFilesInManifest(SyncSession session, Map<String, FileInfo> manifestFileMap) {
log("Retrieving Manifest from %s", session.manifest);
int status = connect(HTTP_GET, session.manifest, null, null,
new ResponseHandler<XMLStreamReader, Map<String,FileInfo>>() {
public boolean consumeXml() { return true; }
public void parseHeaders(Map<String, List<String>> headers, Map<String, FileInfo> userdata) {}
public void consume(XMLStreamReader reader, Map<String, FileInfo> userdata) {
try {
FileInfo fi = null;
while(reader.hasNext()) {
int event = reader.next();
if (event == XMLStreamReader.START_ELEMENT) {
String element = reader.getLocalName().intern();
if (element == ELEM_FILE) {
fi = new FileInfo();
fi.s3Href = reader.getAttributeValue(null, ATTR_HREF);
fi.size = Long.parseLong(reader.getAttributeValue(null, ATTR_SIZE));
fi.md5 = reader.getAttributeValue(null, ATTR_MD5);
}
else if (element == ELEM_LOCALNAME) {
fi.path = reader.getElementText();
userdata.put(fi.path, fi);
}
}
}
}
catch(XMLStreamException xsex) {
log("[getFilesInManifest.consume] error reading user response: " + xsex);
}
}
}, manifestFileMap);
boolean success = status == HttpURLConnection.HTTP_OK;
if (!success) {
if (status == HttpURLConnection.HTTP_NOT_FOUND) {
log("No Manifest found... first time performing NSync...");
}
else {
log("Failed to retrieve manifest! HTTP %d", status);
}
}
log("");
return success;
}
static S3FileRequest createManifest(SyncSession session, File syncRoot, Collection<FileInfo> files) throws Exception {
XMLStreamWriter writer = null;
String manifestPath = syncRoot.getAbsolutePath() + File.separatorChar + MANIFEST;
GZIPOutputStream os = null;
try {
log("Creating manifest at %s", manifestPath);
os = new GZIPOutputStream( new FileOutputStream(manifestPath) );
writer = xmlOutFactory.createXMLStreamWriter(os,ENC_UTF_8);
writer.writeStartElement("manifest");
writer.writeAttribute("xmlns", MANIFEST_XMLNS);
for(FileInfo fi: files) {
writer.writeStartElement(ELEM_FILE);
writer.writeAttribute(ATTR_HREF, generateHref(fi.size, fi.md5));
writer.writeAttribute(ATTR_SIZE, fi.size+"");
writer.writeAttribute(ATTR_MD5, fi.md5);
writer.writeStartElement(ELEM_LOCALNAME);
writer.writeCharacters(fi.path);
writer.writeEndElement();
writer.writeEndElement();
}
writer.writeEndElement();
writer.flush();
}
finally {
try {
if (writer != null) writer.close();
} finally {
if (os != null) {
os.flush();
os.close();
}
}
}
S3FileRequest manifest = new S3FileRequest(nextId());
manifest.verb = HTTP_PUT;
manifest.href = MANIFEST;
FileInfo fi = new FileInfo();
fi.path = manifestPath;
File f = new File(fi.path);
fi.lastModified = f.lastModified();
fi.size = f.length();
fi.md5 = calcMD5(fi.path);
manifest.srcFile = fi;
manifest.compressed = true;
return manifest;
}
static class ProcessMD5 implements Callable<FileInfo> {
FileInfo fi;
public ProcessMD5(FileInfo fi) {
this.fi = fi;
}
@Override
public FileInfo call() throws Exception {
fi.md5 = calcMD5(fi.path);
return fi;
}
}
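// Streams the file through an MD5 digest and returns the hash Base64-encoded (see encodeToByte below).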
static String calcMD5(String filePath) throws NoSuchAlgorithmException, FileNotFoundException, IOException {
vlog("<%s> Calculating MD5 for %s", Thread.currentThread().getName(), filePath);
MessageDigest digest = MessageDigest.getInstance("MD5");
byte[] buff = new byte[8192];
InputStream fs = null;
try {
fs = new FileInputStream(filePath);
int read = -1;
while( (read = fs.read(buff)) != -1 )
digest.update(buff,0,read);
byte[] hash = encodeToByte(digest.digest(), false);
return new String(hash);
}
finally {
if (fs != null) fs.close();
}
}
static void getLastModifiedTimeFromS3(User user, SyncSession session,
Collection<FileInfo> files, CompletionService<S3FileRequest> s3Service) throws XMLStreamException {
int totalFiles = files.size();
log("Preparing to get HEAD for %d files on S3...", totalFiles);
List<S3FileRequest> requests = new ArrayList<S3FileRequest>(totalFiles);
for (FileInfo fi : files) {
S3FileRequest r = new S3FileRequest(nextId());
r.srcFile = fi;
r.verb = HTTP_HEAD;
r.href = fi.s3Href;
requests.add(r);
}
int retries = 3;
while (requests.size() > 0 && retries-- > 0) {
requests = headFiles(user, session, requests, s3Service);
}
if (requests.size() > 0) {
log("Too many failed attempts to get HEAD, try next time...");
throw new RuntimeException("Too many failed attempts to connect to S3 for HEAD requests.");
}
}
static List<S3FileRequest> headFiles(User user, SyncSession session, List<S3FileRequest> requests,
CompletionService<S3FileRequest> s3Service) throws XMLStreamException {
int size = requests.size();
processFilesBatch(user, session, requests, s3Service, 0, size);
ArrayList<S3FileRequest> failedRequests = new ArrayList<S3FileRequest>();
for(int i=0; i<size; ++i) {
try {
Future<S3FileRequest> f = s3Service.take();
S3FileRequest request = f.get();
if (!request.completed)
failedRequests.add(request);
}
catch (InterruptedException iex) {
log("[syncFiles] interrupted: %s", iex);
}
catch (ExecutionException eex) {
log("[syncFiles] execution exception: ", eex);
}
}
return failedRequests;
}
static String generateHref(long size, String md5) {
return "content/" + size + "-" + md5;
}
static int nextId() {
return ++requestId;
}
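// Executes a single authorized S3 request on a worker thread: PUT streams the local file as the
// request body (with gzip Content-Encoding when compressed), HEAD parses the Last-Modified header
// into the associated FileInfo. Marks the request completed on success so failures can be retried.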
static class S3Task implements Callable<S3FileRequest> {
private static final String HD_LAST_MODIFIED = "Last-Modified";
S3FileRequest request;
public S3Task( S3FileRequest request) {
this.request = request;
}
public S3FileRequest call() throws Exception {
vlog("[S3Task] <%s> Request[%d] [%s] %s",Thread.currentThread().getName(), request.id, request.verb, request.url);
String verb = request.verb;
if (request.compressed)
request.headers.put("Content-Encoding", "gzip");
if (verb == HTTP_PUT) {
int status = connect(verb, request.url, request.headers.entrySet(), new RequestHandler() {
public void writeBody(OutputStream os) throws IOException {
byte[] buff = new byte[8192];
InputStream fs = null;
try {
fs = new FileInputStream(request.srcFile.path);
int read = -1;
while( (read = fs.read(buff)) != -1 ) {
os.write(buff, 0, read);
}
}
finally {
if (fs != null) fs.close();
}
request.completed = true;
}
}, null, null);
if (status == HttpURLConnection.HTTP_OK) {
log("Completed upload for Request[%d].", request.id);
}
else {
log("Failed to upload Request[%d], Reason: HTTP %d", request.id, status);
}
}
else if (verb == HTTP_HEAD ) {
int status = connect(verb, request.url, request.headers.entrySet(), null, new ResponseHandler<InputStream, S3FileRequest>() {
public boolean consumeXml() {
return false;
}
@Override
public void parseHeaders(Map<String, List<String>> headers, S3FileRequest userdata) {
String lastModified = headers.get(HD_LAST_MODIFIED).get(0);
try {
DateFormat f = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss zzz");
Date date = f.parse(lastModified);
userdata.srcFile.lastModified = date.getTime();
userdata.completed = true;
}catch (ParseException pex) {
log("error parsing lastModified time: %s", pex);
}
}
@Override
public void consume(InputStream reader, S3FileRequest userdata) {}
}, request);
if (status == HttpURLConnection.HTTP_OK) {
log("Completed HEAD for Request[%d].", request.id);
}
else {
log("Failed to retrieve Request[%d], Reason: HTTP %d", request.id, status);
}
}
return request;
}
}
static void getFiles(File dir, CompletionService<FileInfo> md5Service, Map<String, FileInfo> fileMap) {
File[] childs = dir.listFiles();
if (childs == null) return; // not a directory, or an I/O error while listing
for (File child : childs) {
if (child.isDirectory()) {
if (args.recurse) getFiles(child, md5Service, fileMap); // only descend when -r was given
}
else if (child.isFile() && !child.getName().equalsIgnoreCase(MANIFEST)) {
FileInfo fi = new FileInfo();
fi.path = child.getAbsolutePath();
fi.size = child.length();
fi.lastModified = child.lastModified();
fileMap.put(fi.path, fi);
md5Service.submit(new ProcessMD5(fi));
}
}
}
static void completeMD5Calc(int files, CompletionService<FileInfo> calcMD5Service) {
for (int i=files; i>0; --i) {
Future<FileInfo> f;
try {
f = calcMD5Service.take();
FileInfo fi = f.get();
vlog("%s MD5: %s", fi.path, fi.md5);
}
catch (InterruptedException iex) {
log("[completeMD5Calc] interrupted: %s", iex);
}
catch (ExecutionException eex) {
log("[completeMD5Calc] execution exception: ", eex);
}
}
log("");
}
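// Diffs the remote manifest against the local file map: entries only present in the manifest become
// orphans to delete; local files that are new, or whose MD5 differs and that are newer than the
// remote copy, are queued for upload (the stale remote entry is also marked for deletion).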
static void createS3Requests(Map<String,FileInfo> manifestFileMap, Map<String, FileInfo> localFileMap,
List<String> filesToDelete, List<S3FileRequest> filesToUpload) {
for (String file : manifestFileMap.keySet()) {
if (!localFileMap.containsKey(file)) { // means these files are no longer needed on the cloud.
filesToDelete.add(file);
}
}
for (Map.Entry<String, FileInfo> kvp : localFileMap.entrySet()) {
String filename = kvp.getKey();
FileInfo localFile = kvp.getValue();
FileInfo s3File = manifestFileMap.get(filename);
if (s3File != null) {
if (!localFile.md5.equals(s3File.md5) && localFile.lastModified > s3File.lastModified) { // compare MD5 by value, not reference
filesToDelete.add(filename);
filesToUpload.add(createS3PUTRequest(localFile));
}
}
else {
filesToUpload.add(createS3PUTRequest(localFile));
}
}
}
static S3FileRequest createS3PUTRequest(FileInfo fi) {
S3FileRequest r = new S3FileRequest(nextId());
r.srcFile = fi;
r.verb = HTTP_PUT;
r.href = generateHref(fi.size, fi.md5);
return r;
}
static List<S3FileRequest> syncFiles(User user, SyncSession session, List<S3FileRequest> requests,
CompletionService<S3FileRequest> s3Service, int batchSize) throws XMLStreamException {
int idx = 0;
int totalFiles = requests.size();
log("Prepare to sync %d files...", totalFiles);
int toBatch = 0;
while (idx < totalFiles) {
toBatch = Math.min((totalFiles - idx), batchSize);
processFilesBatch(user, session, requests, s3Service, idx, toBatch);
idx += toBatch;
}
List<S3FileRequest> failedRequests = new ArrayList<S3FileRequest>();
for(int i=0; i<totalFiles; ++i) {
try {
Future<S3FileRequest> f = s3Service.take();
S3FileRequest request = f.get();
if (!request.completed)
failedRequests.add(request);
}
catch (InterruptedException iex) {
log("[syncFiles] interrupted: %s", iex);
}
catch (ExecutionException eex) {
log("[syncFiles] execution exception: ", eex);
}
}
return failedRequests;
}
static class AuthorizeBuilder implements RequestHandler {
final List<S3FileRequest> requests;
int idx;
int end;
public AuthorizeBuilder(List<S3FileRequest> requests, int startIdx, int end) {
this.requests = requests;
this.idx = startIdx;
this.end = end;
}
@Override
public void writeBody(OutputStream os) throws IOException {
XMLStreamWriter writer = null;
try {
writer = xmlOutFactory.createXMLStreamWriter(os,ENC_UTF_8);
writer.writeStartElement("requests");
do {
S3FileRequest request = requests.get(idx);
writeFileRequest(writer, idx, request);
} while ( ++idx < end );
writer.writeEndElement();
writer.flush();
}
catch(XMLStreamException xsex) {
throw new IOException("Failing fast because this is obviously a bug.. and java doesn't like me to allow the XMLStreamException to pass, fucker", xsex);
}
finally {
try {
writer.close();
}
catch(XMLStreamException xsex) {
log("[AuthorizeBuilder.writeBody] error while closing xmlstreamwriter: %s", xsex);
//don't do anything else; it's not recoverable.
}
}
}
}
static void writeFileRequest(XMLStreamWriter writer, int id, S3FileRequest request) throws XMLStreamException {
writer.writeStartElement("request");
writer.writeAttribute("id", ""+id);
writer.writeStartElement("verb");
writer.writeCharacters(request.verb);
writer.writeEndElement();
writer.writeStartElement("resource");
writer.writeCharacters(request.href);
writer.writeEndElement();
if (request.verb == HTTP_PUT) {
writer.writeStartElement(ATTR_MD5);
writer.writeCharacters(request.srcFile.md5);
writer.writeEndElement();
writer.writeStartElement("content-type");
writer.writeCharacters("application/octet-stream");
writer.writeEndElement();
}
writer.writeEndElement();
}
static void processFilesBatch(User user, SyncSession session, final List<S3FileRequest> requests,
CompletionService<S3FileRequest> s3FileRequestService, int startIdx, int len) throws XMLStreamException {
final int idx = startIdx;
final int end = startIdx + len;
if(!authorizeRequests(user.Id, user.AuthToken, session.lockId, new AuthorizeBuilder(requests, idx, end),requests, startIdx, len)) {
throw new RuntimeException("Failed to authorize request with cloudsync server, failing fast...");
}
log("Accessing S3 for the following files: ");
for (int i=startIdx, size=startIdx+len; i<size; ++i) {
S3FileRequest request = requests.get(i);
log(" Request[%d], [%s] local: %s, remote: %s", request.id, request.verb, request.srcFile.path, request.url);
s3FileRequestService.submit(new S3Task(request));
}
}
static boolean authorizeRequests(String userId, String authToken, String lockId, RequestHandler reqHandler, List<S3FileRequest> requests, int startIdx, int len) {
log("Authorizing S3 requests for %d files...", len);
String url = String.format("%s/cloudsync/authorize/%s",
syncHost,userId);
HashMap<String,String> headers = new HashMap<String, String>();
headers.put(HD_AUTHTOKEN, authToken);
headers.put(HD_LOCK, lockId);
headers.put(HD_CONTENT_TYPE, "text/xml");
int statuscode = connect(HTTP_PUT, url, headers.entrySet(), reqHandler,
new ResponseHandler<XMLStreamReader, List<S3FileRequest>>() {
public boolean consumeXml() { return true; }
public void parseHeaders(Map<String, List<String>> headers, List<S3FileRequest> userdata ) {}
public void consume(XMLStreamReader reader, List<S3FileRequest> userdata) {
try {
S3FileRequest currRequest = null;
while(reader.hasNext()) {
int event = reader.next();
if (event == XMLStreamReader.START_ELEMENT) {
String element = reader.getLocalName().toLowerCase().intern();
if (element == "request".intern()) {
int idx = Integer.parseInt(reader.getAttributeValue(null, "id"));
currRequest = userdata.get(idx);
}
else if (element == "url".intern()) {
currRequest.url = reader.getElementText();
}
else if (element == "header".intern()) {
if (currRequest.headers == null) {
currRequest.headers = new HashMap<String,String>();
}
String key = reader.getAttributeValue(null,"key");
String val = reader.getAttributeValue(null, "value");
currRequest.headers.put(key, val);
}
}
}
}
catch(XMLStreamException xsex) {
log("[authorizeRequests.consume] error reading user response: " + xsex);
}
}
}, requests);
boolean success = statuscode == HttpURLConnection.HTTP_OK;
if (!success) {
log("Failed to authorize requests! HTTP %d", statuscode);
}
return success;
}
static boolean deleteLock(String userId, String authToken, String lockId, String status, final List<String> filesToDelete) {
log("Deleting Lock for userId: %s, lockId: %s", userId, lockId);
String url = String.format("%s/cloudsync/lock/%s?status=%s",
syncHost, userId, status);
HashMap<String,String> headers = new HashMap<String, String>();
headers.put(HD_AUTHTOKEN, authToken);
headers.put(HD_LOCK, lockId);
RequestHandler handler = null;
if (filesToDelete != null && filesToDelete.size() > 0) {
handler = new RequestHandler() {
public void writeBody(OutputStream os) throws IOException {
XMLStreamWriter writer = null;
try {
writer = xmlOutFactory.createXMLStreamWriter(os,ENC_UTF_8);
writer.writeStartElement("orphans");
for(String filename: filesToDelete) {
writer.writeStartElement("orphan");
writer.writeCharacters(filename);
writer.writeEndElement();
}
writer.writeEndElement();
writer.flush();
}
catch (XMLStreamException xsex) {
log("[deleteLock] error while writing orphans to outputstream: %s",xsex);
}
finally {
if (writer != null) {
try {
writer.close();
}
catch (XMLStreamException xsex) {
log("[deleteLock] error while closing xmlwriter");
}
}
}
}
};
}
int statuscode = connect("DELETE",url,headers.entrySet(), handler, null, null);
boolean success = statuscode == HttpURLConnection.HTTP_OK;
if (!success) log("Delete Lock failed! HTTP %d", statuscode);
return success;
}
static String cleanupPath(String path) {
if (path.endsWith("/")) {
return path.substring(0, path.length()-1);
}
return path;
}
static void loadConfig(String name) {
Properties props = new Properties();
InputStream is = null;
try {
is = NSync.class.getResourceAsStream(name);
props.load(is);
authHost = cleanupPath(props.getProperty("authHost"));
syncHost = cleanupPath(props.getProperty("syncHost"));
email = props.getProperty("email");
password = props.getProperty("password");
}
catch(IOException ex) {
log("[loadConfig] error: " + ex);
}
finally {
try {
if (is != null) is.close();
} catch (IOException ioex) {log("[loadConfig] close properties file error: " + ioex);}
}
}
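// Generic HTTPS helper: sends the given verb to the given uri with the supplied headers, writes an
// optional request body via the RequestHandler, and on HTTP 200 hands the response headers plus the
// (gzip-decoded when necessary) body to the ResponseHandler, either as an XMLStreamReader or a raw
// InputStream. Returns the HTTP status code, or HTTP 500 if the request failed with an exception.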
public static <R,T> int connect(String verb, String uri, Set<Map.Entry<String,String>> headers, RequestHandler reqHandler, ResponseHandler<R, T> handler, T userdata) {
HttpsURLConnection conn = null;
InputStream is = null;
XMLStreamReader reader = null;
GZIPInputStream gis = null;
try {
vlog("[connect] %s %s", verb, uri );
URL url = new URL(uri);
conn = (HttpsURLConnection)url.openConnection();
conn.setRequestMethod(verb);
if (headers != null) {
for( Map.Entry<String,String> header : headers ) {
conn.setRequestProperty(header.getKey(), header.getValue());
}
}
conn.setSSLSocketFactory(sslfactory);
if (reqHandler != null) {
conn.setDoOutput(true);
OutputStream os = null;
try {
os = conn.getOutputStream();
reqHandler.writeBody(os);
os.flush();
}
finally {
if (os != null) os.close();
}
}
conn.connect();
int status = conn.getResponseCode();
int contentLength = conn.getContentLength();
if (args.verbose) {
log("\tHTTP " + status + " " + conn.getResponseMessage());
log("\tcontent-length: " + contentLength);
log("\tcontent-encoding: " + conn.getContentEncoding());
}
if (handler!=null && status == HttpURLConnection.HTTP_OK) {
handler.parseHeaders(conn.getHeaderFields(), userdata);
boolean responseCompressed = false;
String enc = conn.getHeaderField("Content-Encoding");
if (enc != null && enc.toLowerCase().equals("gzip")) {
responseCompressed = true;
}
InputStream in;
is = conn.getInputStream();
in = is;
if (responseCompressed) {
gis = new GZIPInputStream(is);
in = gis;
}
if (contentLength != 0) {
if (handler.consumeXml()) {
reader = xmlInFactory.createXMLStreamReader(in);
handler.consume((R)reader, userdata);
}
else {
handler.consume((R)is, userdata);
}
}
}
return status;
}catch (IOException e) { // request failed: bad URL, connection error, or non-2xx response
InputStream es = null;
try {
log("IOException: "+ e);
if (conn != null) {
int respCode = conn.getResponseCode();
log("[connect] Error while connecting to server - HTTP %d", respCode);
es = conn.getErrorStream();
if (es != null) {
int ret = 0;
byte[] buf = new byte[4096];
// read and log the error response body
while ((ret = es.read(buf)) > 0) {
log(new String(buf, 0, ret));
}
}
}
log("");
} catch(IOException ex) {
log("[connect] unexpected exception: " + ex);
}
finally {
if (es != null) {
try {
es.close();
}
catch(IOException ioex) {
log("[connect] error while closing error stream: %s", ioex);
}
}
}
}
catch(XMLStreamException xsex) {
log("[connect] error reading user response: " + xsex);
}
finally {
try {
if (reader != null) reader.close();
}
catch(XMLStreamException xsex) { log("[connect] error closing xmlreader "+ xsex); }
try {
if (gis != null)
gis.close();
if (is != null)
is.close();
}
catch (IOException ioex) {
log("[connect] error closing input stream: " + ioex );
}
}
return HttpURLConnection.HTTP_INTERNAL_ERROR;
}
public static synchronized void vlog(String msg) {
if (args.verbose) {
System.out.println(msg);
file.println(msg);
}
}
public static synchronized void vlog(String msg, Object... strings ) {
if (args.verbose) {
String s = String.format(msg, strings);
System.out.println(s);
file.println(s);
}
}
public static synchronized void log(String msg) {
System.out.println(msg);
file.println(msg);
}
public static synchronized void log(String msg, Object... strings ) {
String s = String.format(msg, strings);
System.out.println(s);
file.println(s);
}
public static void prn(String msg) {
System.out.println(msg);
}
public static void prn(String msg, Object... strings ) {
System.out.println(String.format(msg, strings));
}
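// Minimal Base64 encoder over the standard alphabet; when lineSep is true a CRLF pair is inserted
// after every 76 output characters. Used by calcMD5 to encode the digest bytes.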
private static final char[] CA = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/".toCharArray();
public final static byte[] encodeToByte(byte[] sArr, boolean lineSep)
{
// Check special case
int sLen = sArr != null ? sArr.length : 0;
if (sLen == 0)
return new byte[0];
int eLen = (sLen / 3) * 3; // Length of even 24-bits.
int cCnt = ((sLen - 1) / 3 + 1) << 2; // Returned character count
int dLen = cCnt + (lineSep ? (cCnt - 1) / 76 << 1 : 0); // Length of returned array
byte[] dArr = new byte[dLen];
// Encode even 24-bits
for (int s = 0, d = 0, cc = 0; s < eLen;) {
// Copy next three bytes into lower 24 bits of int, paying attention to sign.
int i = (sArr[s++] & 0xff) << 16 | (sArr[s++] & 0xff) << 8 | (sArr[s++] & 0xff);
// Encode the int into four chars
dArr[d++] = (byte) CA[(i >>> 18) & 0x3f];
dArr[d++] = (byte) CA[(i >>> 12) & 0x3f];
dArr[d++] = (byte) CA[(i >>> 6) & 0x3f];
dArr[d++] = (byte) CA[i & 0x3f];
// Add optional line separator
if (lineSep && ++cc == 19 && d < dLen - 2) {
dArr[d++] = '\r';
dArr[d++] = '\n';
cc = 0;
}
}
// Pad and encode last bits if source isn't an even 24 bits.
int left = sLen - eLen; // 0 - 2.
if (left > 0) {
// Prepare the int
int i = ((sArr[eLen] & 0xff) << 10) | (left == 2 ? ((sArr[sLen - 1] & 0xff) << 2) : 0);
// Set last four chars
dArr[dLen - 4] = (byte) CA[i >> 12];
dArr[dLen - 3] = (byte) CA[(i >>> 6) & 0x3f];
dArr[dLen - 2] = left == 2 ? (byte) CA[i & 0x3f] : (byte) '=';
dArr[dLen - 1] = '=';
}
return dArr;
}
}
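A quick sketch of building and running the tool from the command line (assuming the source is saved as NSync.java and nsync.config is placed at com/sz/zhenhe/nsync.config on the classpath, next to the compiled class):

javac -d . NSync.java
java -cp . com.sz.zhenhe.NSync -d /path/to/savegames -r -v

The log file nsync.log is written to the current working directory.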