Created
May 31, 2012 01:35
-
-
Save tim-tang/2840207 to your computer and use it in GitHub Desktop.
[java] - File sync tool
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.sz.zhenhe; | |
import java.io.File; | |
import java.io.FileInputStream; | |
import java.io.FileNotFoundException; | |
import java.io.FileOutputStream; | |
import java.io.IOException; | |
import java.io.InputStream; | |
import java.io.OutputStream; | |
import java.io.PrintWriter; | |
import java.io.UnsupportedEncodingException; | |
import java.net.HttpURLConnection; | |
import java.net.URL; | |
import java.security.InvalidKeyException; | |
import java.security.KeyManagementException; | |
import java.security.MessageDigest; | |
import java.security.NoSuchAlgorithmException; | |
import java.security.SecureRandom; | |
import java.util.Date; | |
import java.text.DateFormat; | |
import java.text.ParseException; | |
import java.text.SimpleDateFormat; | |
import java.util.ArrayList; | |
import java.util.Collection; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Properties; | |
import java.util.Set; | |
import java.util.concurrent.Callable; | |
import java.util.concurrent.CompletionService; | |
import java.util.concurrent.ExecutionException; | |
import java.util.concurrent.ExecutorCompletionService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.Future; | |
import java.util.concurrent.ScheduledExecutorService; | |
import java.util.concurrent.ThreadPoolExecutor; | |
import java.util.concurrent.TimeUnit; | |
import java.util.zip.GZIPInputStream; | |
import java.util.zip.GZIPOutputStream; | |
import javax.net.ssl.HttpsURLConnection; | |
import javax.net.ssl.SSLContext; | |
import javax.net.ssl.SSLSocketFactory; | |
import javax.net.ssl.TrustManager; | |
import javax.net.ssl.X509TrustManager; | |
import javax.xml.stream.XMLInputFactory; | |
import javax.xml.stream.XMLOutputFactory; | |
import javax.xml.stream.XMLStreamException; | |
import javax.xml.stream.XMLStreamReader; | |
import javax.xml.stream.XMLStreamWriter; | |
public class NSync { | |
static String authHost; | |
static String syncHost; | |
static String email; | |
static String password; | |
static SSLSocketFactory sslfactory; | |
static XMLInputFactory xmlInFactory = XMLInputFactory.newInstance(); | |
static XMLOutputFactory xmlOutFactory = XMLOutputFactory.newInstance(); | |
static PrintWriter file; | |
static CLIArguments args; | |
static File syncRoot; | |
static int requestId; | |
static final String HD_AUTHTOKEN = "x-origin-authtoken"; | |
static final String HD_LOCK = "x-origin-sync-lock"; | |
static final String HD_LOCK_TTL = "x-origin-sync-lock-ttl"; | |
static final String ENC_UTF_8 = "UTF-8"; | |
static final String HD_CONTENT_TYPE = "Content-Type"; | |
static final String HTTP_PUT = "PUT"; | |
static final String HTTP_GET = "GET"; | |
static final String HTTP_DELETE = "DELETE"; | |
static final String HTTP_HEAD = "HEAD"; | |
static final String STATUS_COMMIT = "commit"; | |
static final String STATUS_ROLLBACK = "rollback"; | |
static final String MANIFEST = "manifest.xml"; | |
static final String MANIFEST_XMLNS = "http://origin.com/cloudsaves/manifest"; | |
private static final String ELEM_LOCALNAME = "localName".intern(); | |
private static final String ATTR_MD5 = "md5".intern(); | |
private static final String ATTR_SIZE = "size".intern(); | |
private static final String ATTR_HREF = "href".intern(); | |
private static final String ELEM_FILE = "file".intern(); | |
/**
 * Callbacks used to process an HTTP response.
 *
 * @param <R> reader type passed to {@link #consume}; implementations in this file use
 *            XMLStreamReader when consumeXml() is true and InputStream when it is false
 * @param <T> caller-supplied state object handed back to every callback
 */
interface ResponseHandler<R, T> {

    /** @return true if the handler wants the body as XML (presumably selects the reader type - confirm in connect). */
    boolean consumeXml();

    /** Receives the response headers together with the caller's state object. */
    void parseHeaders(Map<String, List<String>> headers, T userdata);

    /** Receives the response body reader together with the caller's state object. */
    void consume(R reader, T userdata);
}
/** Callback that writes the body of an outgoing HTTP request. */
interface RequestHandler {

    /**
     * Writes the request body.
     *
     * @param os stream to write the body to
     * @throws IOException if writing fails
     */
    void writeBody(OutputStream os) throws IOException;
}
static class User { | |
public String Id; | |
public String AuthToken; | |
} | |
static class SyncSession { | |
public String lockId; | |
public String manifest; | |
public int lockttl; | |
public String host; | |
public String root; | |
public String baseURL; | |
} | |
static class CLIArguments { | |
public String path = null; | |
public String contentid = null; | |
public boolean recurse = false; | |
public boolean verbose = false; | |
} | |
static class FileInfo { | |
public String path; | |
public long size; | |
public long lastModified; | |
public String md5; | |
//fucking hack, but i really don't want to create a new class | |
//used when parsing the S3 manifest | |
public String s3Href; | |
} | |
static class S3FileRequest { | |
public S3FileRequest(int id) { | |
this.id = id; | |
} | |
public final int id; | |
public FileInfo srcFile; | |
public String verb; | |
public String href; // the relative path from the base url | |
public String url; // the full path of the S3 Request | |
public Map<String, String> headers; | |
public byte[] data; //only if the verb is GET | |
public boolean completed = false; | |
public boolean compressed = false; | |
} | |
public static void main(String[] argv) | |
throws NoSuchAlgorithmException, KeyManagementException,InvalidKeyException, | |
UnsupportedEncodingException, InterruptedException, IOException, XMLStreamException, Exception { | |
User user = null; | |
SyncSession session = null; | |
ScheduledExecutorService extendLockScheduler = null; | |
ThreadPoolExecutor workerPool = null; | |
try { | |
file = new PrintWriter("nsync.log",ENC_UTF_8); | |
log("NSync v1.0 - Your friendly gateway to Cloudsync"); | |
log(""); | |
if (argv.length < 2) { | |
printUsage(); | |
return; | |
} | |
args = parseCLIArguments(argv); | |
if (args.path == null) { | |
printUsage(); | |
return; | |
} | |
if (!initSyncRoot()) { | |
return; | |
} | |
initSSLFactory(); | |
loadConfig("nsync.config"); | |
printCurrentConfig(); | |
user = new User(); | |
if (!login(email, password, user)) { | |
return; | |
} | |
session = new SyncSession(); | |
if (!beginSync(user,args.contentid, session)) { | |
return; | |
} | |
extendLockScheduler = Executors.newSingleThreadScheduledExecutor(); | |
ExtendLockWorker elw = new ExtendLockWorker(user.Id, user.AuthToken, session.lockId); | |
int interval = session.lockttl - 10; | |
log("ExtendLock Worker will run in %ds intervals", interval); | |
log(""); | |
extendLockScheduler.scheduleWithFixedDelay(elw, interval, interval, TimeUnit.SECONDS); | |
long start = System.currentTimeMillis(); | |
log("Begin sync files @ %s", DateFormat.getDateTimeInstance(DateFormat.FULL, DateFormat.FULL).format(new Date(start))); | |
log(""); | |
int numThreads = Runtime.getRuntime().availableProcessors() * 2; | |
workerPool = (ThreadPoolExecutor)Executors.newFixedThreadPool(numThreads * 2); | |
ExecutorCompletionService<FileInfo> calcMD5Service = new ExecutorCompletionService<FileInfo>(workerPool); | |
ExecutorCompletionService<S3FileRequest> s3FileRequestService = new ExecutorCompletionService<S3FileRequest>(workerPool); | |
Map<String, FileInfo> localFileMap = new HashMap<String, FileInfo>(); | |
getFiles(syncRoot, calcMD5Service, localFileMap); | |
//now lets get the manifest on S3 and build a file map | |
// Map key = fullpath | |
Map<String,FileInfo> manifestFileMap = new HashMap<String,FileInfo>(); | |
boolean hasManifest = getFilesInManifest(session, manifestFileMap); | |
completeMD5Calc(localFileMap.size(), calcMD5Service); | |
if(hasManifest) { | |
getLastModifiedTimeFromS3(user, session, manifestFileMap.values(),s3FileRequestService); | |
} | |
List<String> filesToDelete = new ArrayList<String>(); | |
List<S3FileRequest> filesToUpload = new ArrayList<S3FileRequest>(); | |
createS3Requests(manifestFileMap, localFileMap, filesToDelete, filesToUpload); | |
List<S3FileRequest> outstandingRequests = filesToUpload; | |
int retries = 3; | |
while(outstandingRequests.size() > 0 && retries-- > 0) { | |
outstandingRequests = syncFiles(user, session, outstandingRequests, s3FileRequestService, numThreads); | |
} | |
if (outstandingRequests.size() > 0) { | |
log("Too many failed attempts uploading files to S3, giving up..."); | |
return; | |
} | |
S3FileRequest putManifestRequest = createManifest(session, syncRoot, localFileMap.values()); | |
outstandingRequests = new ArrayList<S3FileRequest>(); | |
outstandingRequests.add(putManifestRequest); | |
retries = 3; | |
while(outstandingRequests.size() > 0 && retries-- > 0) { | |
outstandingRequests = syncFiles(user, session, outstandingRequests, s3FileRequestService, numThreads); | |
} | |
if (outstandingRequests.size() > 0) { | |
log("Too many failed attempts uploading manifest to S3, giving up..."); | |
return; | |
} | |
deleteLock(user.Id, user.AuthToken, session.lockId, STATUS_COMMIT, filesToDelete); | |
long end = System.currentTimeMillis(); | |
log("End sync files @ %s", DateFormat.getDateTimeInstance(DateFormat.FULL, DateFormat.FULL).format(new Date(end))); | |
log(""); | |
log("Total Files uploaded: %d", filesToUpload.size()); | |
log("Orphan files found: %d", filesToDelete.size()); | |
log("Total sync time: %dms", (end - start)); | |
user = null; | |
session = null; | |
} | |
finally { | |
try { | |
if (workerPool != null) { | |
workerPool.shutdownNow(); | |
workerPool.awaitTermination(30, TimeUnit.SECONDS); | |
} | |
} catch(InterruptedException iex) { | |
Thread.currentThread().interrupt(); | |
} | |
try { | |
if (extendLockScheduler != null) { | |
extendLockScheduler.shutdownNow(); | |
extendLockScheduler.awaitTermination(30, TimeUnit.SECONDS); | |
} | |
} | |
catch (InterruptedException iex) { | |
Thread.currentThread().interrupt(); | |
} | |
if (user != null && session != null) // means that some freaking exception occurred, lets cleanup here... | |
deleteLock(user.Id, user.AuthToken, session.lockId, STATUS_ROLLBACK, null); | |
if (file!=null) { | |
file.close(); | |
} | |
} | |
} | |
static void printUsage() { | |
prn("Usage: java NSync [-options]"); | |
prn("options:"); | |
prn(" -d : the directory to sync"); | |
prn(" -c : contentid of the data to sync (defaults to directory name in -d option)"); | |
prn(" -r : recurses into child directories (default false)"); | |
prn(" -v : verbose logging with http connection details (default false)"); | |
prn(""); | |
prn("Examples:"); | |
prn(""); | |
prn("> java NSync -d c:\\mydata\\somedocs"); | |
prn("Syncs files at c:\\mydata\\somedocs, no recurse into subdirs, no verbose logging, contentid defaults to somedocs"); | |
prn(""); | |
prn("> java NSync -d c:\\mygame\\savegame -c bf3_na -r"); | |
prn("Syncs files at c:\\mygame\\savegame, recurses into subdirs, no verbose logging, contentid bf3_na"); | |
} | |
static CLIArguments parseCLIArguments(String[] argv) { | |
CLIArguments args = new CLIArguments(); | |
int count = argv.length; | |
for (int i=0; i < count;++i) { | |
String option = argv[i].toLowerCase().intern(); | |
if (option == "-d".intern()) { | |
args.path = argv[++i]; | |
} | |
else if (option == "-r".intern()) { | |
args.recurse = true; | |
} | |
else if (option == "-v".intern()) { | |
args.verbose = true; | |
} | |
else if (option == "-c".intern()) { | |
args.contentid = argv[++i]; | |
} | |
} | |
return args; | |
} | |
static boolean initSyncRoot() { | |
syncRoot = new File(args.path); | |
try { | |
if (syncRoot.exists() && syncRoot.isDirectory()) { | |
if (args.contentid == null) { | |
args.contentid = syncRoot.getName(); | |
} | |
return true; | |
} | |
} | |
catch (SecurityException sex) { | |
log("error while trying to read path: %s", sex); | |
return false; | |
} | |
log("Invalid directory specified: %s", args.path); | |
return false; | |
} | |
static void printCurrentConfig() { | |
log("NSync current settings"); | |
log(" Auth Service: %s",authHost); | |
log(" Sync Service: %s", syncHost); | |
log(" Email: %s",email); | |
log(" Password: %s",password); | |
log(" Sync Root Dir: %s", syncRoot.getAbsolutePath()); | |
log(" Content-ID: %s", args.contentid); | |
log(" Recurse to childs: %s", args.recurse); | |
log(" Verbose logs: %s", args.verbose); | |
log(""); | |
} | |
static private class ExtendLockWorker implements Runnable { | |
String userId; | |
String authToken; | |
String lockId; | |
public ExtendLockWorker(String userId, String authToken, String lockId) { | |
this.userId = userId; | |
this.authToken = authToken; | |
this.lockId = lockId; | |
} | |
public void run() { | |
log("Extending Lock for userId: %s, lockId: %s", userId, lockId); | |
String url = String.format("%s/cloudsync/extendlock/%s", | |
syncHost,userId); | |
HashMap<String,String> headers = new HashMap<String, String>(); | |
headers.put(HD_AUTHTOKEN, authToken); | |
headers.put(HD_LOCK, lockId); | |
int result = connect(HTTP_PUT,url,headers.entrySet(), null, null, null); | |
if (result != HttpURLConnection.HTTP_OK) { | |
log("Extend Lock failed! HTTP %d", result); | |
} | |
} | |
} | |
static void initSSLFactory() throws NoSuchAlgorithmException, KeyManagementException { | |
SSLContext sc = SSLContext.getInstance("SSL"); | |
sc.init(null, | |
new TrustManager[]{ | |
new X509TrustManager() { | |
public java.security.cert.X509Certificate[] getAcceptedIssuers() { | |
return null; | |
} | |
public void checkClientTrusted(java.security.cert.X509Certificate[] certs, String authType) { | |
} | |
public void checkServerTrusted(java.security.cert.X509Certificate[] certs, String authType) { | |
} | |
} | |
} , new SecureRandom()); | |
sslfactory = sc.getSocketFactory(); | |
} | |
static boolean login(String email, String password, User user) { | |
log("Login user with email: %s, password: %s", email, password); | |
String url = String.format("%s/loginregistration/login?ea_email=%s&ea_password=%s&remember_me=false", | |
authHost,email,password); | |
int statuscode = connect(HTTP_GET,url, null, null, new ResponseHandler<XMLStreamReader, User>() { | |
public boolean consumeXml() { return true; } | |
public void parseHeaders(Map<String, List<String>> headers, User userdata ) {} | |
public void consume(XMLStreamReader reader, User userdata) { | |
try { | |
while(reader.hasNext()) { | |
int event = reader.next(); | |
if (event == XMLStreamReader.START_ELEMENT) { | |
String element = reader.getLocalName().toLowerCase().intern(); | |
if (element == "userid".intern()) { | |
userdata.Id = reader.getElementText(); | |
} | |
else if (element == "genericauthtoken".intern()) { | |
userdata.AuthToken = reader.getElementText(); | |
} | |
} | |
} | |
} | |
catch(XMLStreamException xsex) { | |
log("[login.consume] error reading user response: " + xsex); | |
} | |
} | |
}, user); | |
boolean success = statuscode == HttpURLConnection.HTTP_OK; | |
if (success) { | |
log(" User Id: %s", user.Id); | |
log(" Authtoken: %s", user.AuthToken); | |
} | |
else { | |
log("Login user failed! HTTP %d", statuscode); | |
} | |
log(""); | |
return success; | |
} | |
static boolean beginSync(User user, String contentId, SyncSession session) { | |
log("Begin Sync, userId: %s, contentId: %s", user.Id, contentId); | |
String url = String.format("%s/cloudsync/writelock/%s/%s", | |
syncHost,user.Id,contentId); | |
Map<String,String> headers = new HashMap<String, String>(); | |
headers.put(HD_AUTHTOKEN, user.AuthToken); | |
int statuscode = connect(HTTP_PUT,url, headers.entrySet(), null, new ResponseHandler<XMLStreamReader, SyncSession>() { | |
public boolean consumeXml() { return true; } | |
public void parseHeaders(Map<String, List<String>> headers, SyncSession userdata ) { | |
List<String> values = headers.get(HD_LOCK); | |
userdata.lockId = values.get(0); | |
values = headers.get(HD_LOCK_TTL); | |
userdata.lockttl = Integer.parseInt(values.get(0)); | |
} | |
public void consume(XMLStreamReader reader, SyncSession userdata) { | |
try { | |
while(reader.hasNext()) { | |
int event = reader.next(); | |
if (event == XMLStreamReader.START_ELEMENT) { | |
String element = reader.getLocalName().toLowerCase().intern(); | |
if (element == "host".intern()) { | |
userdata.host = reader.getElementText(); | |
} | |
else if (element == "root".intern()) { | |
userdata.root = reader.getElementText(); | |
} | |
else if (element == "manifest".intern()) { | |
userdata.manifest = reader.getElementText(); | |
} | |
} | |
} | |
} | |
catch(XMLStreamException xsex) { | |
log("[beginSync.consume] error reading user response: " + xsex); | |
} | |
} | |
}, session); | |
boolean success = statuscode == HttpURLConnection.HTTP_OK; | |
if (success) { | |
String host = session.host; | |
host = (host.endsWith("/")) ? host.substring(0, host.length()-1) : host; | |
String root = session.root; | |
root = (root.startsWith("/")) ? root.substring(1) : root; | |
root = (root.endsWith("/")) ? root : root + "/"; | |
session.baseURL = host + "/" + root; | |
log(" S3 host: %s", session.host); | |
log(" Root: %s", session.root); | |
log(" Base URL: %s", session.baseURL); | |
log(" Lock Id: %s", session.lockId); | |
log(" Lock TTL: %d",session.lockttl); | |
log(" Manifest: %s",session.manifest); | |
} | |
else { | |
log("Begin Sync failed! HTTP %d", statuscode); | |
} | |
log(""); | |
return success; | |
} | |
static boolean getFilesInManifest(SyncSession session, Map<String, FileInfo> manifestFileMap) { | |
log("Retrieving Manifest from %s", session.manifest); | |
int status = connect(HTTP_GET, session.manifest, null, null, | |
new ResponseHandler<XMLStreamReader, Map<String,FileInfo>>() { | |
public boolean consumeXml() { return true; } | |
public void parseHeaders(Map<String, List<String>> headers, Map<String, FileInfo> userdata) {} | |
public void consume(XMLStreamReader reader, Map<String, FileInfo> userdata) { | |
try { | |
FileInfo fi = null; | |
while(reader.hasNext()) { | |
int event = reader.next(); | |
if (event == XMLStreamReader.START_ELEMENT) { | |
String element = reader.getLocalName().intern(); | |
if (element == ELEM_FILE) { | |
fi = new FileInfo(); | |
fi.s3Href = reader.getAttributeValue(null, ATTR_HREF); | |
fi.size = Long.parseLong(reader.getAttributeValue(null, ATTR_SIZE)); | |
fi.md5 = reader.getAttributeValue(null, ATTR_MD5); | |
} | |
else if (element == ELEM_LOCALNAME) { | |
fi.path = reader.getElementText(); | |
userdata.put(fi.path, fi); | |
} | |
} | |
} | |
} | |
catch(XMLStreamException xsex) { | |
log("[getFilesInManifest.consume] error reading user response: " + xsex); | |
} | |
} | |
}, manifestFileMap); | |
boolean success = status == HttpURLConnection.HTTP_OK; | |
if (!success) { | |
if (status == HttpURLConnection.HTTP_NOT_FOUND) { | |
log("No Manifest found... first time performing NSync..."); | |
} | |
else { | |
log("Failed to retrieve manifest! HTTP %d", status); | |
} | |
} | |
log(""); | |
return success; | |
} | |
static S3FileRequest createManifest(SyncSession session, File syncRoot, Collection<FileInfo> files) throws Exception { | |
XMLStreamWriter writer = null; | |
String manifestPath = syncRoot.getAbsolutePath() + File.separatorChar + MANIFEST; | |
GZIPOutputStream os = null; | |
try { | |
log("Creating manifest at %s", manifestPath); | |
os = new GZIPOutputStream( new FileOutputStream(manifestPath) ); | |
writer = xmlOutFactory.createXMLStreamWriter(os,ENC_UTF_8); | |
writer.writeStartElement("manifest"); | |
writer.writeAttribute("xmlns", MANIFEST_XMLNS); | |
for(FileInfo fi: files) { | |
writer.writeStartElement(ELEM_FILE); | |
writer.writeAttribute(ATTR_HREF, generateHref(fi.size, fi.md5)); | |
writer.writeAttribute(ATTR_SIZE, fi.size+""); | |
writer.writeAttribute(ATTR_MD5, fi.md5); | |
writer.writeStartElement(ELEM_LOCALNAME); | |
writer.writeCharacters(fi.path); | |
writer.writeEndElement(); | |
writer.writeEndElement(); | |
} | |
writer.writeEndElement(); | |
writer.flush(); | |
} | |
finally { | |
try { | |
writer.close(); | |
} finally { | |
os.flush(); | |
os.close(); | |
} | |
} | |
S3FileRequest manifest = new S3FileRequest(nextId()); | |
manifest.verb = HTTP_PUT; | |
manifest.href = MANIFEST; | |
FileInfo fi = new FileInfo(); | |
fi.path = manifestPath; | |
File f = new File(fi.path); | |
fi.lastModified = f.lastModified(); | |
fi.size = f.length(); | |
fi.md5 = calcMD5(fi.path); | |
manifest.srcFile = fi; | |
manifest.compressed = true; | |
return manifest; | |
} | |
static class ProcessMD5 implements Callable<FileInfo> { | |
FileInfo fi; | |
public ProcessMD5(FileInfo fi) { | |
this.fi = fi; | |
} | |
@Override | |
public FileInfo call() throws Exception { | |
fi.md5 = calcMD5(fi.path); | |
return fi; | |
} | |
} | |
static String calcMD5(String filePath) throws NoSuchAlgorithmException, FileNotFoundException, IOException { | |
vlog("<%s> Calculating MD5 for %s", Thread.currentThread().getName(), filePath); | |
MessageDigest digest = MessageDigest.getInstance("MD5"); | |
byte[] buff = new byte[8192]; | |
InputStream fs = null; | |
try { | |
fs = new FileInputStream(filePath); | |
int read = -1; | |
while( (read = fs.read(buff)) != -1 ) | |
digest.update(buff,0,read); | |
byte[] hash = encodeToByte(digest.digest(), false); | |
return new String(hash); | |
} | |
finally { | |
fs.close(); | |
} | |
} | |
static void getLastModifiedTimeFromS3(User user, SyncSession session, | |
Collection<FileInfo> files, CompletionService<S3FileRequest> s3Service) throws XMLStreamException { | |
int totalFiles = files.size(); | |
log("Preparing to get HEAD for %d files on S3...", totalFiles); | |
List<S3FileRequest> requests = new ArrayList<S3FileRequest>(totalFiles); | |
for (FileInfo fi : files) { | |
S3FileRequest r = new S3FileRequest(nextId()); | |
r.srcFile = fi; | |
r.verb = HTTP_HEAD; | |
r.href = fi.s3Href; | |
requests.add(r); | |
} | |
int retries = 3; | |
while (requests.size() > 0 && retries-- > 0) { | |
requests = headFiles(user, session, requests, s3Service); | |
} | |
if (requests.size() > 0) { | |
log("Too many failed attempts to get HEAD, try next time..."); | |
throw new RuntimeException("Too many failed attempts to connect to S3 for HEAD requests."); | |
} | |
} | |
static List<S3FileRequest> headFiles(User user, SyncSession session, List<S3FileRequest> requests, | |
CompletionService<S3FileRequest> s3Service) throws XMLStreamException { | |
int size = requests.size(); | |
processFilesBatch(user, session, requests, s3Service, 0, size); | |
ArrayList<S3FileRequest> failedRequests = new ArrayList<S3FileRequest>(); | |
for(int i=0; i<size; ++i) { | |
try { | |
Future<S3FileRequest> f = s3Service.take(); | |
S3FileRequest request = f.get(); | |
if (!request.completed) | |
failedRequests.add(request); | |
} | |
catch (InterruptedException iex) { | |
log("[syncFiles] interrupted: %s", iex); | |
} | |
catch (ExecutionException eex) { | |
log("[syncFiles] execution exception: ", eex); | |
} | |
} | |
return failedRequests; | |
} | |
static String generateHref(long size, String md5) { | |
return "content/" + size + "-" + md5; | |
} | |
static int nextId() { | |
return ++requestId; | |
} | |
static class S3Task implements Callable<S3FileRequest> { | |
private static final String HD_LAST_MODIFIED = "Last-Modified"; | |
S3FileRequest request; | |
public S3Task( S3FileRequest request) { | |
this.request = request; | |
} | |
public S3FileRequest call() throws Exception { | |
vlog("[S3Task] <%s> Request[%d] [%s] %s",Thread.currentThread().getName(), request.id, request.verb, request.url); | |
String verb = request.verb; | |
if (request.compressed) | |
request.headers.put("Content-Encoding", "gzip"); | |
if (verb == HTTP_PUT) { | |
int status = connect(verb, request.url, request.headers.entrySet(), new RequestHandler() { | |
public void writeBody(OutputStream os) throws IOException { | |
byte[] buff = new byte[8192]; | |
InputStream fs = null; | |
try { | |
fs = new FileInputStream(request.srcFile.path); | |
int read = -1; | |
while( (read = fs.read(buff)) != -1 ) { | |
os.write(buff, 0, read); | |
} | |
} | |
finally { | |
fs.close(); | |
} | |
request.completed = true; | |
} | |
}, null, null); | |
if (status == HttpURLConnection.HTTP_OK) { | |
log("Completed upload for Request[%d].", request.id); | |
} | |
else { | |
log("Failed to upload Request[%d], Reason: HTTP %d", request.id, status); | |
} | |
} | |
else if (verb == HTTP_HEAD ) { | |
int status = connect(verb, request.url, request.headers.entrySet(), null, new ResponseHandler<InputStream, S3FileRequest>() { | |
public boolean consumeXml() { | |
return false; | |
} | |
@Override | |
public void parseHeaders(Map<String, List<String>> headers, S3FileRequest userdata) { | |
String lastModified = headers.get(HD_LAST_MODIFIED).get(0); | |
try { | |
DateFormat f = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss zzz"); | |
Date date = f.parse(lastModified); | |
userdata.srcFile.lastModified = date.getTime(); | |
userdata.completed = true; | |
}catch (ParseException pex) { | |
log("error parsing lastModified time: %s", pex); | |
} | |
} | |
@Override | |
public void consume(InputStream reader, S3FileRequest userdata) {} | |
}, request); | |
if (status == HttpURLConnection.HTTP_OK) { | |
log("Completed HEAD for Request[%d].", request.id); | |
} | |
else { | |
log("Failed to retrieve Request[%d], Reason: HTTP %d", request.id, status); | |
} | |
} | |
return request; | |
} | |
} | |
static void getFiles(File dir, CompletionService<FileInfo> md5Service, Map<String, FileInfo> fileMap) { | |
File[] childs = dir.listFiles(); | |
for (File child : childs) { | |
if (child.isDirectory()) { | |
getFiles(child, md5Service, fileMap); | |
} | |
else if (child.isFile() && child.getName().toLowerCase().intern() != MANIFEST) { | |
FileInfo fi = new FileInfo(); | |
fi.path = child.getAbsolutePath(); | |
fi.size = child.length(); | |
fi.lastModified = child.lastModified(); | |
fileMap.put(fi.path, fi); | |
md5Service.submit(new ProcessMD5(fi)); | |
} | |
} | |
} | |
static void completeMD5Calc(int files, CompletionService<FileInfo> calcMD5Service) { | |
for (int i=files; i>0; --i) { | |
Future<FileInfo> f; | |
try { | |
f = calcMD5Service.take(); | |
FileInfo fi = f.get(); | |
vlog("%s MD5: %s", fi.path, fi.md5); | |
} | |
catch (InterruptedException iex) { | |
log("[completeMD5Calc] interrupted: %s", iex); | |
} | |
catch (ExecutionException eex) { | |
log("[completeMD5Calc] execution exception: ", eex); | |
} | |
} | |
log(""); | |
} | |
static void createS3Requests(Map<String,FileInfo> manifestFileMap, Map<String, FileInfo> localFileMap, | |
List<String> filesToDelete, List<S3FileRequest> filesToUpload) { | |
for (String file : manifestFileMap.keySet()) { | |
if (!localFileMap.containsKey(file)) { // means these files are no longer needed on the cloud. | |
filesToDelete.add(file); | |
} | |
} | |
for (Map.Entry<String, FileInfo> kvp : localFileMap.entrySet()) { | |
String filename = kvp.getKey(); | |
FileInfo localFile = kvp.getValue(); | |
FileInfo s3File = manifestFileMap.get(filename); | |
if (s3File != null) { | |
if (localFile.md5 != s3File.md5 && localFile.lastModified > s3File.lastModified) { | |
filesToDelete.add(filename); | |
filesToUpload.add(createS3PUTRequest(localFile)); | |
} | |
} | |
else { | |
filesToUpload.add(createS3PUTRequest(localFile)); | |
} | |
} | |
} | |
static S3FileRequest createS3PUTRequest(FileInfo fi) { | |
S3FileRequest r = new S3FileRequest(nextId()); | |
r.srcFile = fi; | |
r.verb = HTTP_PUT; | |
r.href = generateHref(fi.size, fi.md5); | |
return r; | |
} | |
static List<S3FileRequest> syncFiles(User user, SyncSession session, List<S3FileRequest> requests, | |
CompletionService<S3FileRequest> s3Service, int batchSize) throws XMLStreamException { | |
int idx = 0; | |
int totalFiles = requests.size(); | |
log("Prepare to sync %d files...", totalFiles); | |
int toBatch = 0; | |
while (idx < totalFiles) { | |
toBatch = Math.min((totalFiles - idx), batchSize); | |
processFilesBatch(user, session, requests, s3Service, idx, toBatch); | |
idx += toBatch; | |
} | |
List<S3FileRequest> failedRequests = new ArrayList<S3FileRequest>(); | |
for(int i=0; i<totalFiles; ++i) { | |
try { | |
Future<S3FileRequest> f = s3Service.take(); | |
S3FileRequest request = f.get(); | |
if (!request.completed) | |
failedRequests.add(request); | |
} | |
catch (InterruptedException iex) { | |
log("[syncFiles] interrupted: %s", iex); | |
} | |
catch (ExecutionException eex) { | |
log("[syncFiles] execution exception: ", eex); | |
} | |
} | |
return failedRequests; | |
} | |
static class AuthorizeBuilder implements RequestHandler { | |
final List<S3FileRequest> requests; | |
int idx; | |
int end; | |
public AuthorizeBuilder(List<S3FileRequest> requests, int startIdx, int end) { | |
this.requests = requests; | |
this.idx = startIdx; | |
this.end = end; | |
} | |
@Override | |
public void writeBody(OutputStream os) throws IOException { | |
XMLStreamWriter writer = null; | |
try { | |
writer = xmlOutFactory.createXMLStreamWriter(os,ENC_UTF_8); | |
writer.writeStartElement("requests"); | |
do { | |
S3FileRequest request = requests.get(idx); | |
writeFileRequest(writer, idx, request); | |
} while ( ++idx < end ); | |
writer.writeEndElement(); | |
writer.flush(); | |
} | |
catch(XMLStreamException xsex) { | |
throw new IOException("Failing fast because this is obviously a bug.. and java doesn't like me to allow the XMLStreamException to pass, fucker", xsex); | |
} | |
finally { | |
try { | |
writer.close(); | |
} | |
catch(XMLStreamException xsex) { | |
log("[AuthorizeBuilder.writeBody] error while closing xmlstreamwriter: %s", xsex); | |
//dont do anything since its not recoverable. | |
} | |
} | |
} | |
} | |
static void writeFileRequest(XMLStreamWriter writer, int id, S3FileRequest request) throws XMLStreamException { | |
writer.writeStartElement("request"); | |
writer.writeAttribute("id", ""+id); | |
writer.writeStartElement("verb"); | |
writer.writeCharacters(request.verb); | |
writer.writeEndElement(); | |
writer.writeStartElement("resource"); | |
writer.writeCharacters(request.href); | |
writer.writeEndElement(); | |
if (request.verb == HTTP_PUT) { | |
writer.writeStartElement(ATTR_MD5); | |
writer.writeCharacters(request.srcFile.md5); | |
writer.writeEndElement(); | |
writer.writeStartElement("content-type"); | |
writer.writeCharacters("application/octet-stream"); | |
writer.writeEndElement(); | |
} | |
writer.writeEndElement(); | |
} | |
static void processFilesBatch(User user, SyncSession session, final List<S3FileRequest> requests, | |
CompletionService<S3FileRequest> s3FileRequestService, int startIdx, int len) throws XMLStreamException { | |
final int idx = startIdx; | |
final int end = startIdx + len; | |
if(!authorizeRequests(user.Id, user.AuthToken, session.lockId, new AuthorizeBuilder(requests, idx, end),requests, startIdx, len)) { | |
throw new RuntimeException("Failed to authorize request with cloudsync server, failing fast..."); | |
} | |
log("Accessing S3 for the following files: "); | |
for (int i=startIdx, size=startIdx+len; i<size; ++i) { | |
S3FileRequest request = requests.get(i); | |
log(" Request[%d], [%s] local: %s, remote: %s", request.id, request.verb, request.srcFile.path, request.url); | |
s3FileRequestService.submit(new S3Task(request)); | |
} | |
} | |
static boolean authorizeRequests(String userId, String authToken, String lockId, RequestHandler reqHandler, List<S3FileRequest> requests, int startIdx, int len) { | |
log("Authorizing S3 requests for %d files...", len); | |
String url = String.format("%s/cloudsync/authorize/%s", | |
syncHost,userId); | |
HashMap<String,String> headers = new HashMap<String, String>(); | |
headers.put(HD_AUTHTOKEN, authToken); | |
headers.put(HD_LOCK, lockId); | |
headers.put(HD_CONTENT_TYPE, "text/xml"); | |
int statuscode = connect(HTTP_PUT, url, headers.entrySet(), reqHandler, | |
new ResponseHandler<XMLStreamReader, List<S3FileRequest>>() { | |
public boolean consumeXml() { return true; } | |
public void parseHeaders(Map<String, List<String>> headers, List<S3FileRequest> userdata ) {} | |
public void consume(XMLStreamReader reader, List<S3FileRequest> userdata) { | |
try { | |
S3FileRequest currRequest = null; | |
while(reader.hasNext()) { | |
int event = reader.next(); | |
if (event == XMLStreamReader.START_ELEMENT) { | |
String element = reader.getLocalName().toLowerCase().intern(); | |
if (element == "request".intern()) { | |
int idx = Integer.parseInt(reader.getAttributeValue(null, "id")); | |
currRequest = userdata.get(idx); | |
} | |
else if (element == "url".intern()) { | |
currRequest.url = reader.getElementText(); | |
} | |
else if (element == "header".intern()) { | |
if (currRequest.headers == null) { | |
currRequest.headers = new HashMap<String,String>(); | |
} | |
String key = reader.getAttributeValue(null,"key"); | |
String val = reader.getAttributeValue(null, "value"); | |
currRequest.headers.put(key, val); | |
} | |
} | |
} | |
} | |
catch(XMLStreamException xsex) { | |
log("[authorizeRequests.consume] error reading user response: " + xsex); | |
} | |
} | |
}, requests); | |
boolean success = statuscode == HttpURLConnection.HTTP_OK; | |
if (!success) { | |
log("Failed to authorize requests! HTTP %d", statuscode); | |
} | |
return success; | |
} | |
static boolean deleteLock(String userId, String authToken, String lockId, String status, final List<String> filesToDelete) { | |
log("Deleting Lock for userId: %s, lockId: %s", userId, lockId); | |
String url = String.format("%s/cloudsync/lock/%s?status=%s", | |
syncHost, userId, status); | |
HashMap<String,String> headers = new HashMap<String, String>(); | |
headers.put(HD_AUTHTOKEN, authToken); | |
headers.put(HD_LOCK, lockId); | |
RequestHandler handler = null; | |
if (filesToDelete != null && filesToDelete.size() > 0) { | |
handler = new RequestHandler() { | |
public void writeBody(OutputStream os) throws IOException { | |
XMLStreamWriter writer = null; | |
try { | |
writer = xmlOutFactory.createXMLStreamWriter(os,ENC_UTF_8); | |
writer.writeStartElement("orphans"); | |
for(String filename: filesToDelete) { | |
writer.writeStartElement("orphan"); | |
writer.writeCharacters(filename); | |
writer.writeEndElement(); | |
} | |
writer.writeEndElement(); | |
writer.flush(); | |
} | |
catch (XMLStreamException xsex) { | |
log("[deleteLock] error while writing orphans to outputstream: %s",xsex); | |
} | |
finally { | |
if (writer != null) { | |
try { | |
writer.close(); | |
} | |
catch (XMLStreamException xsex) { | |
log("[deleteLock] error while closing xmlwriter"); | |
} | |
} | |
} | |
} | |
}; | |
} | |
int statuscode = connect("DELETE",url,headers.entrySet(), handler, null, null); | |
boolean success = statuscode == HttpURLConnection.HTTP_OK; | |
if (!success) log("Delete Lock failed! HTTP %d", statuscode); | |
return success; | |
} | |
static String cleanupPath(String path) { | |
if (path.endsWith("/")) { | |
return path.substring(0, path.length()-1); | |
} | |
return path; | |
} | |
static void loadConfig(String name) { | |
Properties props = new Properties(); | |
InputStream is = null; | |
try { | |
is = NSync.class.getResourceAsStream(name); | |
props.load(is); | |
authHost = cleanupPath(props.getProperty("authHost")); | |
syncHost = cleanupPath(props.getProperty("syncHost")); | |
email = props.getProperty("email"); | |
password = props.getProperty("password"); | |
} | |
catch(IOException ex) { | |
log("[loadConfig] error: " + ex); | |
} | |
finally { | |
try { | |
if (is != null) is.close(); | |
} catch (IOException ioex) {log("[loadConfig] close properties file error: " + ioex);} | |
} | |
} | |
public static <R,T> int connect(String verb, String uri, Set<Map.Entry<String,String>> headers, RequestHandler reqHandler, ResponseHandler<R, T> handler, T userdata) { | |
HttpsURLConnection conn = null; | |
InputStream is = null; | |
XMLStreamReader reader = null; | |
GZIPInputStream gis = null; | |
try { | |
vlog("[connect] %s %s", verb, uri ); | |
URL url = new URL(uri); | |
conn = (HttpsURLConnection)url.openConnection(); | |
conn.setRequestMethod(verb); | |
if (headers != null) { | |
for( Map.Entry<String,String> header : headers ) { | |
conn.setRequestProperty(header.getKey(), header.getValue()); | |
} | |
} | |
conn.setSSLSocketFactory(sslfactory); | |
if (reqHandler != null) { | |
conn.setDoOutput(true); | |
OutputStream os = null; | |
try { | |
os = conn.getOutputStream(); | |
reqHandler.writeBody(os); | |
os.flush(); | |
} | |
finally { | |
os.close(); | |
} | |
} | |
conn.connect(); | |
int status = conn.getResponseCode(); | |
int contentLength = conn.getContentLength(); | |
if (args.verbose) { | |
log("\tHTTP " + status + " " + conn.getResponseMessage()); | |
log("\tcontent-length: " + contentLength); | |
log("\tcontent-encoding: " + conn.getContentEncoding()); | |
} | |
if (handler!=null && status == HttpURLConnection.HTTP_OK) { | |
handler.parseHeaders(conn.getHeaderFields(), userdata); | |
boolean responseCompressed = false; | |
String enc = conn.getHeaderField("Content-Encoding"); | |
if (enc != null && enc.toLowerCase().equals("gzip")) { | |
responseCompressed = true; | |
} | |
InputStream in; | |
is = conn.getInputStream(); | |
in = is; | |
if (responseCompressed) { | |
gis = new GZIPInputStream(is); | |
in = gis; | |
} | |
if (contentLength != 0) { | |
if (handler.consumeXml()) { | |
reader = xmlInFactory.createXMLStreamReader(in); | |
handler.consume((R)reader, userdata); | |
} | |
else { | |
handler.consume((R)is, userdata); | |
} | |
} | |
} | |
return status; | |
}catch (IOException e) { // openConnection() failed | |
InputStream es = null; | |
try { | |
int respCode = conn.getResponseCode(); | |
es = conn.getErrorStream(); | |
int ret = 0; | |
byte[] buf = new byte[4096]; | |
// read the response body | |
log("[connect] Error while connecting to server - HTTP %d", respCode); | |
log("IOException: "+ e); | |
while ((ret = es.read(buf)) > 0) { | |
log(new String(buf, 0, ret)); | |
} | |
log(""); | |
// close the errorstream | |
es.close(); | |
} catch(IOException ex) { | |
log("[connect] unexpected exception: " + ex); | |
} | |
finally { | |
if (es != null) { | |
try { | |
es.close(); | |
} | |
catch(IOException ioex) { | |
log("[connect] error while closing error stream: %s", ioex); | |
} | |
} | |
} | |
} | |
catch(XMLStreamException xsex) { | |
log("[connect] error reading user response: " + xsex); | |
} | |
finally { | |
try { | |
if (reader != null) reader.close(); | |
} | |
catch(XMLStreamException xsex) { log("[connect] error closing xmlreader "+ xsex); } | |
try { | |
if (gis != null) | |
gis.close(); | |
if (is != null) | |
is.close(); | |
} | |
catch (IOException ioex) { | |
log("[connect] error closing input stream: " + ioex ); | |
} | |
} | |
return HttpURLConnection.HTTP_INTERNAL_ERROR; | |
} | |
public static synchronized void vlog(String msg) { | |
if (args.verbose) { | |
System.out.println(msg); | |
file.println(msg); | |
} | |
} | |
public static synchronized void vlog(String msg, Object... strings ) { | |
if (args.verbose) { | |
String s = String.format(msg, strings); | |
System.out.println(s); | |
file.println(s); | |
} | |
} | |
public static synchronized void log(String msg) { | |
System.out.println(msg); | |
file.println(msg); | |
} | |
public static synchronized void log(String msg, Object... strings ) { | |
String s = String.format(msg, strings); | |
System.out.println(s); | |
file.println(s); | |
} | |
public static void prn(String msg) { | |
System.out.println(msg); | |
} | |
public static void prn(String msg, Object... strings ) { | |
System.out.println(String.format(msg, strings)); | |
} | |
private static final char[] CA = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/".toCharArray(); | |
public final static byte[] encodeToByte(byte[] sArr, boolean lineSep) | |
{ | |
// Check special case | |
int sLen = sArr != null ? sArr.length : 0; | |
if (sLen == 0) | |
return new byte[0]; | |
int eLen = (sLen / 3) * 3; // Length of even 24-bits. | |
int cCnt = ((sLen - 1) / 3 + 1) << 2; // Returned character count | |
int dLen = cCnt + (lineSep ? (cCnt - 1) / 76 << 1 : 0); // Length of returned array | |
byte[] dArr = new byte[dLen]; | |
// Encode even 24-bits | |
for (int s = 0, d = 0, cc = 0; s < eLen;) { | |
// Copy next three bytes into lower 24 bits of int, paying attension to sign. | |
int i = (sArr[s++] & 0xff) << 16 | (sArr[s++] & 0xff) << 8 | (sArr[s++] & 0xff); | |
// Encode the int into four chars | |
dArr[d++] = (byte) CA[(i >>> 18) & 0x3f]; | |
dArr[d++] = (byte) CA[(i >>> 12) & 0x3f]; | |
dArr[d++] = (byte) CA[(i >>> 6) & 0x3f]; | |
dArr[d++] = (byte) CA[i & 0x3f]; | |
// Add optional line separator | |
if (lineSep && ++cc == 19 && d < dLen - 2) { | |
dArr[d++] = '\r'; | |
dArr[d++] = '\n'; | |
cc = 0; | |
} | |
} | |
// Pad and encode last bits if source isn't an even 24 bits. | |
int left = sLen - eLen; // 0 - 2. | |
if (left > 0) { | |
// Prepare the int | |
int i = ((sArr[eLen] & 0xff) << 10) | (left == 2 ? ((sArr[sLen - 1] & 0xff) << 2) : 0); | |
// Set last four chars | |
dArr[dLen - 4] = (byte) CA[i >> 12]; | |
dArr[dLen - 3] = (byte) CA[(i >>> 6) & 0x3f]; | |
dArr[dLen - 2] = left == 2 ? (byte) CA[i & 0x3f] : (byte) '='; | |
dArr[dLen - 1] = '='; | |
} | |
return dArr; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment