Skip to content

Instantly share code, notes, and snippets.

@ldoguin
Last active April 5, 2018 15:57
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ldoguin/7f98924b22bd61c46fce to your computer and use it in GitHub Desktop.
Save ldoguin/7f98924b22bd61c46fce to your computer and use it in GitHub Desktop.
package org.couchbase.couchswitching;
import com.couchbase.client.core.BackpressureException;
import com.couchbase.client.core.RequestCancelledException;
import com.couchbase.client.deps.com.fasterxml.jackson.databind.JsonNode;
import com.couchbase.client.deps.com.fasterxml.jackson.databind.ObjectMapper;
import com.couchbase.client.java.AsyncBucket;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.document.RawJsonDocument;
import com.couchbase.client.java.error.TemporaryFailureException;
import com.couchbase.client.java.util.retry.RetryBuilder;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.util.Assert;
import rx.Observable;
import java.io.BufferedReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import static com.couchbase.client.core.time.Delay.fixed;
/**
 * Spring Boot command-line application that streams the response of a CouchDB
 * {@code _all_docs?include_docs=true} request line by line and upserts every row's document
 * into the Couchbase bucket opened with {@code openBucket()}.
 *
 * <p>Keys of imported documents are appended to the success log file, keys of documents that
 * could not be imported are appended to the error log file.
 */
@SpringBootApplication
public class CouchdbToCouchbaseApplication {

    public static Logger log = Logger.getLogger(CouchdbToCouchbaseApplication.class);

    /** Name of the header property holding the total number of rows in the feed. */
    public static final String TOTAL_ROWS_PROPERTY = "total_rows";

    /** Name of the header property holding the offset the feed starts at. */
    public static final String OFFSET_PROPERTY = "offset";

    /** URL of the CouchDB request whose response is imported. */
    @Value("${couchdb.downloadURL:http://127.0.0.1:5984/database_export/_all_docs?include_docs=true}")
    String couchDBRequest;

    /** File that receives the key of every document that could not be imported. */
    @Value("${import.error.log:errors.out}")
    String errorLogFilename;

    /** File that receives progress messages and the key of every imported document. */
    @Value("${import.success.log:success.out}")
    String successLogFilename;

    public static void main(String[] args) {
        SpringApplication.run(CouchdbToCouchbaseApplication.class, args);
    }

    /**
     * Exposes the import as a {@link CommandLineRunner} bean.
     *
     * @return a runner that performs the whole import when invoked
     */
    @Bean
    public CommandLineRunner runner() {
        return args -> runImport();
    }

    /**
     * Downloads the CouchDB feed, upserts each row into Couchbase and finally checks that the
     * number of processed rows equals the {@code total_rows} value announced by the feed.
     *
     * @throws Exception if the HTTP request fails, the feed cannot be read or parsed, or the
     *                   processed row count differs from the announced total
     */
    private void runImport() throws Exception {
        long start = System.currentTimeMillis();
        CouchbaseCluster cluster = CouchbaseCluster.create();
        try {
            AsyncBucket asyncBucket = cluster.openBucket().async();
            HttpURLConnection conn = (HttpURLConnection) new URL(couchDBRequest).openConnection();
            try {
                conn.setRequestMethod("GET");
                conn.setRequestProperty("Accept", "application/json");
                // Assume this is going to be a big file: disable the read timeout.
                conn.setReadTimeout(0);
                int status = conn.getResponseCode();
                if (status != 200) {
                    throw new RuntimeException("Failed : HTTP error code : " + status);
                }

                ObjectMapper mapper = new ObjectMapper();
                // totals[0] = total_rows, totals[1] = offset; filled in when the header line is seen.
                final long[] totals = new long[2];
                int count;
                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                try (BufferedReader reader = new BufferedReader(
                        new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
                    Iterable<String> lines = reader.lines()::iterator;
                    count = Observable.from(lines)
                            .flatMap(line -> parseLine(mapper, line, totals))
                            .flatMap(row -> importRow(asyncBucket, row))
                            .count()
                            .toBlocking()
                            .single();
                }

                long stop = System.currentTimeMillis();
                writeToSuccessLog(String.format("End of the import in %dms.", stop - start));
                Assert.isTrue(count == totals[0], String.format("Something went wrong during the import, expected"
                        + " %d but got %d", totals[0], count));
            } finally {
                conn.disconnect();
            }
        } finally {
            // Release the SDK's connections even when the import fails.
            cluster.disconnect();
        }
    }

    /**
     * Turns one line of the feed into zero or one row nodes.
     *
     * <ul>
     *   <li>A line ending with {@code "rows":[} is the header: it is closed with {@code ]}},
     *       parsed, its totals are stored in {@code totals} and logged, and nothing is emitted.</li>
     *   <li>A two-character line is treated as the closing {@code ]}} and emits nothing.</li>
     *   <li>A blank line emits nothing.</li>
     *   <li>Any other line is parsed as a row, after stripping a trailing comma.</li>
     * </ul>
     *
     * @param mapper JSON parser to use
     * @param line   raw line read from the feed
     * @param totals two-element array receiving {@code total_rows} at index 0 and
     *               {@code offset} at index 1
     * @return an observable emitting the parsed row, an empty observable for non-row lines, or
     *         an error observable when a row cannot be parsed
     */
    private Observable<JsonNode> parseLine(ObjectMapper mapper, String line, long[] totals) {
        if (line.endsWith("\"rows\":[")) {
            JsonNode header;
            try {
                header = mapper.readTree(line.concat("]}"));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            totals[0] = header.get(TOTAL_ROWS_PROPERTY).asLong();
            totals[1] = header.get(OFFSET_PROPERTY).asLong();
            String message = String.format("Query starting at offset %d for a total of %d rows.",
                    totals[1], totals[0]);
            log.info(message);
            writeToSuccessLog(message);
            return Observable.empty();
        }
        if (line.length() == 2) {
            // Last line, nothing to import.
            writeToSuccessLog("end of the feed.");
            return Observable.empty();
        }
        if (line.trim().isEmpty()) {
            // A blank line carries no row; parsing it would not yield an object with a "key".
            return Observable.empty();
        }
        String json = line.endsWith(",") ? line.substring(0, line.length() - 1) : line;
        try {
            return Observable.just(mapper.readTree(json));
        } catch (IOException e) {
            return Observable.error(e);
        }
    }

    /**
     * Upserts the {@code doc} of a feed row under the row's {@code key}.
     *
     * <p>Each attempt is limited to 500 ms. {@link RequestCancelledException} is retried with a
     * fixed 31 s delay, {@link TemporaryFailureException} and {@link BackpressureException} with
     * a fixed 100 ms delay, each up to 100 times. The key is appended to the success or error
     * log depending on the outcome.
     *
     * @param asyncBucket bucket to write to
     * @param row         feed row containing {@code key} and {@code doc} properties
     * @return an observable emitting exactly one item: the stored document, or {@code null} when
     *         the upsert ultimately failed
     */
    private Observable<RawJsonDocument> importRow(AsyncBucket asyncBucket, JsonNode row) {
        String key = row.get("key").asText();
        String jsonDoc = row.get("doc").toString();
        RawJsonDocument document = RawJsonDocument.create(key, jsonDoc);
        log.debug("Importing " + key);
        return asyncBucket.upsert(document)
                .timeout(500, TimeUnit.MILLISECONDS)
                .retryWhen(RetryBuilder
                        .anyOf(RequestCancelledException.class)
                        .delay(fixed(31, TimeUnit.SECONDS))
                        .max(100)
                        .build())
                .retryWhen(RetryBuilder
                        .anyOf(TemporaryFailureException.class, BackpressureException.class)
                        .delay(fixed(100, TimeUnit.MILLISECONDS))
                        .max(100)
                        .build())
                .doOnError(t -> writeToErrorLog(key))
                .doOnNext(stored -> writeToSuccessLog(key))
                .onErrorResumeNext(throwable -> {
                    // Include the key in the message and keep the full stack trace.
                    log.error(String.format("Could not import document %s", key), throwable);
                    // NOTE(review): emitting null keeps failed documents in the final count, so the
                    // row-count check in runImport cannot detect failed upserts - confirm intent.
                    return Observable.just(null);
                });
    }

    /**
     * Appends a line to the success log file.
     *
     * @param text line to append
     */
    public void writeToSuccessLog(String text) {
        writeToFile(successLogFilename, text);
    }

    /**
     * Appends a line to the error log file.
     *
     * @param text line to append
     */
    public void writeToErrorLog(String text) {
        writeToFile(errorLogFilename, text);
    }

    /**
     * Appends {@code text} followed by the platform line separator to the given file.
     *
     * @param filename path of the file to append to
     * @param text     content to append
     * @throws RuntimeException wrapping the {@link IOException} if the file cannot be written
     */
    public void writeToFile(String filename, String text) {
        // try-with-resources closes (and thereby flushes) the writer; no explicit close needed.
        try (FileWriter fw = new FileWriter(filename, true)) {
            fw.write(text + System.getProperty("line.separator"));
        } catch (IOException ioe) {
            throw new RuntimeException(ioe);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment