Skip to content

Instantly share code, notes, and snippets.

@danizen
Last active January 17, 2018 21:45
Show Gist options
  • Save danizen/b5691e9900a751447c81a85a930e7fd2 to your computer and use it in GitHub Desktop.
Save danizen/b5691e9900a751447c81a85a930e7fd2 to your computer and use it in GitHub Desktop.
MongoJobStatusStore and unit tests
package gov.nih.nlm.occs.norconex;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import javax.xml.stream.XMLStreamException;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.FindOneAndUpdateOptions;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.Indexes;
import com.mongodb.client.result.DeleteResult;
import static com.mongodb.client.model.Filters.and;
import static com.mongodb.client.model.Filters.eq;
import static com.mongodb.client.model.Updates.set;
import static com.mongodb.client.model.Updates.setOnInsert;
import static com.mongodb.client.model.Updates.combine;
import org.bson.conversions.Bson;
import com.norconex.commons.lang.config.XMLConfigurationUtil;
import com.norconex.commons.lang.encrypt.EncryptionKey;
import com.norconex.commons.lang.xml.EnhancedXMLStreamWriter;
import com.norconex.jef4.status.IJobStatus;
import com.norconex.jef4.status.IJobStatusStore;
import com.norconex.jef4.status.JobDuration;
import com.norconex.jef4.status.MutableJobStatus;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.commons.lang3.StringUtils;
import org.bson.Document;
public class MongoJobStatusStore implements IJobStatusStore {
/* Default Database Name */
public static final String DEFAULT_DATABASE_NAME = "jobstatus";
/* Default collection Name */
public static final String DEFAULT_COLLECTION_NAME = "jobstatus";
/* Mongo fields */
public class Field {
public static final String SUITE = "suite";
public static final String JOB_ID = "job_id";
public static final String BACKUP_DATE = "backup_date";
public static final String LAST_ACTIVITY = "last_activity";
public static final String STATUS_TYPE = "status_type";
public static final String PROGRESS = "progress";
public static final String NOTE = "note";
public static final String RESUME_ATTEMPTS = "resume_attempts";
public static final String RESUMED_START_TIME = "resumed_start_time";
public static final String RESUMED_LAST_ACTIVITY = "resumed_last_activity";
public static final String START_TIME = "start_time";
public static final String END_TIME = "end_time";
public static final String STOP_REQUESTED = "stop_requested";
public static final String PROPERTIES = "properties";
}
public class StatusType {
public static final String LATEST = "latest";
public static final String BACKUP = "backup";
};
/* Configurable attributes */
private ActiveMongoConnectionDetails connDetails;
private String collectionName;
/* Dynamic attributes */
private MongoClient lazyClient;
private MongoDatabase lazyDatabase;
private MongoCollection<Document> lazyCollection;
public MongoJobStatusStore() {
connDetails = new ActiveMongoConnectionDetails();
connDetails.setDatabaseName(DEFAULT_DATABASE_NAME);
collectionName = DEFAULT_COLLECTION_NAME;
}
@Override
public void loadFromXML(Reader reader) throws IOException {
XMLConfiguration xml = XMLConfigurationUtil.newXMLConfiguration(reader);
connDetails.setHost(xml.getString("host", connDetails.getHost()));
connDetails.setPort(xml.getInt("port", connDetails.getPort()));
connDetails.setDatabaseName(
xml.getString("dbname", connDetails.getDatabaseName()));
connDetails.setUsername(
xml.getString("username", connDetails.getUsername()));
connDetails.setPassword(
xml.getString("password", connDetails.getPassword()));
connDetails.setMechanism(
xml.getString("mechanism", connDetails.getMechanism()));
setCollectionName(xml.getString("collection", getCollectionName()));
// encrypted password:
String xmlKey = xml.getString("passwordKey", null);
String xmlSource = xml.getString("passwordKeySource", null);
if (StringUtils.isNotBlank(xmlKey)) {
EncryptionKey.Source source = null;
if (StringUtils.isNotBlank(xmlSource)) {
source = EncryptionKey.Source.valueOf(xmlSource.toUpperCase());
}
connDetails.setPasswordKey(new EncryptionKey(xmlKey, source));
}
}
@Override
public void saveToXML(Writer writer) throws IOException {
try {
EnhancedXMLStreamWriter xmlwriter = new EnhancedXMLStreamWriter(writer);
xmlwriter.writeStartElement("statusStore");
xmlwriter.writeAttribute("class", getClass().getCanonicalName());
xmlwriter.writeElementString("host", connDetails.getHost());
xmlwriter.writeElementInteger("port", connDetails.getPort());
xmlwriter.writeElementString("dbname", connDetails.getDatabaseName());
xmlwriter.writeElementString("collection", getCollectionName());
if (connDetails.getUsername() != null) {
xmlwriter.writeElementString("username", connDetails.getUsername());
xmlwriter.writeElementString("password", connDetails.getPassword());
xmlwriter.writeElementString("mechanism", connDetails.getMechanism());
}
// Encrypted password:
EncryptionKey key = connDetails.getPasswordKey();
if (key != null) {
xmlwriter.writeElementString("passwordKey", key.getValue());
if (key.getSource() != null) {
xmlwriter.writeElementString("passwordKeySource",
key.getSource().name().toLowerCase());
}
}
xmlwriter.writeEndElement();
xmlwriter.flush();
xmlwriter.close();
} catch (XMLStreamException e) {
throw new IOException("Cannot save as XML.", e);
}
}
@Override
public void backup(String suiteName, String jobId, Date backupDate) throws IOException {
MongoCollection<Document> collection = getCollection();
Bson updateBackupDate = combine(
set(Field.BACKUP_DATE, backupDate),
set(Field.STATUS_TYPE, StatusType.BACKUP));
collection.findOneAndUpdate(latestStatusFilter(suiteName, jobId), updateBackupDate);
}
@Override
public IJobStatus read(String suiteName, String jobId) throws IOException {
MutableJobStatus jobStatus = new MutableJobStatus(jobId);
MongoCollection<Document> collection = getCollection();
Document document = collection.find(latestStatusFilter(suiteName, jobId)).first();
if (document == null) {
return jobStatus;
}
fromDocument(document, jobStatus);
return jobStatus;
}
@Override
public void remove(String suiteName, String jobId) throws IOException {
MongoCollection<Document> collection = getCollection();
DeleteResult result = collection.deleteMany(latestStatusFilter(suiteName, jobId));
if (!result.wasAcknowledged()) {
// TODO throw exception
} else if (result.getDeletedCount() > 1) {
// TODO throw otherexception
}
}
@Override
public long touch(String suiteName, String jobId) throws IOException {
MongoCollection<Document> collection = getCollection();
Bson onInsertOps = updateOnlyOnInsert(suiteName, jobId);
// Should we let Mongo set the timestamp and do update options to return
// the document afterwards and get the FIELD_LAST_TOUCHED from that?
Date now = new Date();
Bson touchOp = set(Field.LAST_ACTIVITY, now);
collection.findOneAndUpdate(
latestStatusFilter(suiteName, jobId),
combine(onInsertOps, touchOp),
new FindOneAndUpdateOptions().upsert(true));
return now.getTime();
}
@Override
public void write(String suiteName, final IJobStatus jobStatus) throws IOException {
MongoCollection<Document> collection = getCollection();
Bson onInsertOps = updateOnlyOnInsert(suiteName, jobStatus.getJobId());
Bson alwaysOps = updateAlways(jobStatus);
Bson touchOp = set(Field.LAST_ACTIVITY, new Date());
collection.findOneAndUpdate(
latestStatusFilter(suiteName, jobStatus.getJobId()),
combine(onInsertOps, alwaysOps, touchOp),
new FindOneAndUpdateOptions().upsert(true));
}
protected MongoClient getClient() {
if (lazyClient == null) {
lazyClient = connDetails.buildMongoClient("jobstatus");
}
return lazyClient;
}
protected MongoDatabase getDatabase() {
if (lazyDatabase == null) {
MongoClient client = getClient();
lazyDatabase = client.getDatabase(connDetails.getSafeDatabaseName(DEFAULT_DATABASE_NAME));
}
return lazyDatabase;
}
protected MongoCollection<Document> getCollection() {
if (lazyCollection == null) {
MongoDatabase db = getDatabase();
lazyCollection = db.getCollection(getCollectionName());
// ensure an index on suite, job_id, and status_type
lazyCollection.createIndex(
Indexes.ascending(Field.SUITE, Field.JOB_ID, Field.STATUS_TYPE),
new IndexOptions().background(true).name("suite_job_status_1"));
}
return lazyCollection;
}
public static void fromDocument(final Document document, MutableJobStatus status) throws IOException {
JobDuration duration = new JobDuration();
for (String field: document.keySet()) {
if (field.equals(Field.LAST_ACTIVITY)) {
status.setLastActivity(document.getDate(field));
} else if (field.equals(Field.PROGRESS)) {
status.setProgress(document.getDouble(field));
} else if (field.equals(Field.NOTE)) {
status.setNote(document.getString(field));
} else if (field.equals(Field.RESUME_ATTEMPTS)) {
status.setResumeAttempts(document.getInteger(field));
} else if (field.equals(Field.RESUMED_START_TIME)) {
duration.setResumedStartTime(document.getDate(field));
} else if (field.equals(Field.RESUMED_LAST_ACTIVITY)) {
duration.setResumedLastActivity(document.getDate(field));
} else if (field.equals(Field.START_TIME)) {
duration.setStartTime(document.getDate(field));
} else if (field.equals(Field.END_TIME)) {
duration.setEndTime(document.getDate(field));
} else if (field.equals(Field.PROPERTIES)) {
// Both org.bson.Document and com.norconex.commons.lang.map.Properties speak JSON,
// but they don't use the same conventions to do so.
Document subdoc = document.get(field, Document.class);
StringReader sr = new StringReader(subdoc.toJson());
status.getProperties().loadFromJSON(sr);
}
}
status.setDuration(duration);
}
public static Bson updateOnlyOnInsert(String suiteName, String jobId) {
/* these are only set if the atomic operation will be an insert */
return combine(
setOnInsert(Field.SUITE, suiteName),
setOnInsert(Field.JOB_ID, jobId),
setOnInsert(Field.STATUS_TYPE, StatusType.LATEST));
}
public static Bson updateAlways(IJobStatus status) throws IOException {
List<Bson> oplist = new ArrayList<Bson>();
/* always update last activety on write */
oplist.add(set(Field.LAST_ACTIVITY, new Date()));
oplist.add(set(Field.PROGRESS, status.getProgress()));
oplist.add(set(Field.NOTE, status.getNote()));
oplist.add(set(Field.RESUME_ATTEMPTS, status.getResumeAttempts()));
JobDuration duration = status.getDuration();
if (status.getResumeAttempts() > 0) {
oplist.add(set(Field.RESUMED_START_TIME, duration.getResumedStartTime()));
oplist.add(set(Field.RESUMED_LAST_ACTIVITY, duration.getResumedLastActivity()));
}
if (duration.getStartTime() != null) {
oplist.add(set(Field.START_TIME, duration.getStartTime()));
}
if (duration.getEndTime() != null) {
oplist.add(set(Field.END_TIME, duration.getEndTime()));
}
if (status.isStopping() || status.isStopped()) {
oplist.add(set(Field.STOP_REQUESTED, true));
}
if (!status.getProperties().isEmpty()) {
// com.norconex.commons.lang.map.Properties can store as JSON,
// but let us not do that work unless we need to.
StringWriter sw = new StringWriter();
status.getProperties().storeToJSON(sw);
oplist.add(set(Field.PROPERTIES, Document.parse(sw.toString())));
}
return combine(oplist);
}
private Bson latestStatusFilter(String suiteName, String jobId) {
return and(
eq(Field.SUITE, suiteName),
eq(Field.JOB_ID, jobId),
eq(Field.STATUS_TYPE, StatusType.LATEST));
}
public ActiveMongoConnectionDetails getConnectionDetails() {
return connDetails;
}
public void setConnectionDetails(ActiveMongoConnectionDetails connDetails) {
this.connDetails = connDetails;
}
public String getCollectionName() {
return collectionName;
}
public void setCollectionName(String collectionName) {
this.collectionName = collectionName;
}
@Override
public boolean equals(final Object rawother) {
if (!(rawother instanceof MongoJobStatusStore)) {
return false;
}
MongoJobStatusStore other = (MongoJobStatusStore) rawother;
return new EqualsBuilder()
.append(connDetails, other.connDetails)
.append(collectionName, other.collectionName)
.isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder()
.append(connDetails)
.append(collectionName)
.toHashCode();
}
@Override
public String toString() {
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
.append(connDetails)
.append(collectionName)
.toString();
}
}
<?xml version="1.0" encoding="UTF-8"?>
<statusStore class="gov.nih.nlm.occs.norconex.MongoJobStatusStore">
<host>10.1.0.77</host>
<port>5117</port>
<dbname>foodb</dbname>
<username>foouser</username>
<password>f00Mat1cal</password>
<collection>jobs</collection>
</statusStore>
<?xml version="1.0" encoding="UTF-8"?>
<statusStore class="gov.nih.nlm.occs.norconex.MongoJobStatusStore">
<host>10.1.0.77</host>
</statusStore>
package gov.nih.nlm.occs.norconex;
import java.io.StringWriter;
import java.io.IOException;
import java.io.Reader;
import java.io.InputStreamReader;
import java.util.Date;
import java.util.Calendar;
import java.util.Random;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import org.apache.log4j.Level;
import org.junit.Test;
import org.junit.Assert;
import static org.junit.Assume.assumeNotNull;
import static org.junit.Assert.assertThat;
import org.bson.Document;
import static com.mongodb.client.model.Filters.and;
import static com.mongodb.client.model.Filters.eq;
import static com.mongodb.client.model.Sorts.descending;
import com.norconex.commons.lang.config.XMLConfigurationUtil;
import com.norconex.commons.lang.log.CountingConsoleAppender;
import com.norconex.jef4.status.IJobStatus;
import com.norconex.jef4.status.MutableJobStatus;
import com.norconex.jef4.status.JobDuration;
public class MongoJobStatusStoreTest {
public Random random = new Random();
@Test
public void testWriteRead() throws IOException {
ActiveMongoConnectionDetails connDetails = new ActiveMongoConnectionDetails();
connDetails.setHost("10.0.1.77");
connDetails.setPort(527);
connDetails.setDatabaseName("foodb");
connDetails.setUsername("foouser");
connDetails.setPassword("f00Mat1cal");
connDetails.setMechanism("SCRAM-SHA-1");
MongoJobStatusStore store = new MongoJobStatusStore();
store.setConnectionDetails(connDetails);
store.setCollectionName("jobs");
System.out.println("Writing/Reading this: "+ store);
StringWriter writer = new StringWriter();
store.saveToXML(writer);
writer.close();
System.out.println("Wrote this: " + writer.toString());
XMLConfigurationUtil.assertWriteRead(store);
}
@Test
public void testParseMinimal() throws IOException {
CountingConsoleAppender appender = new CountingConsoleAppender();
appender.startCountingFor(XMLConfigurationUtil.class, Level.WARN);
try {
String resourcePath = "/validate/MongoJobStatusStoreMinimal.xml";
Reader r = new InputStreamReader(getClass().getResourceAsStream(resourcePath));
Object obj = XMLConfigurationUtil.newInstance(r);
assertThat(obj, instanceOf(MongoJobStatusStore.class));
MongoJobStatusStore store = (MongoJobStatusStore) obj;
ActiveMongoConnectionDetails conn = store.getConnectionDetails();
assertThat(conn.getHost(), equalTo("10.1.0.77"));
assertThat(conn.getPort(), equalTo(0));
assertThat(conn.getUsername(), nullValue());
} finally {
appender.stopCountingFor(XMLConfigurationUtil.class);
}
Assert.assertEquals("Validation warnings/errors were found.", 0, appender.getCount());
}
@Test
public void testParseMaximal() throws IOException {
CountingConsoleAppender appender = new CountingConsoleAppender();
appender.startCountingFor(XMLConfigurationUtil.class, Level.WARN);
try {
String resourcePath = "/validate/MongoJobStatusStoreMaximal.xml";
Reader r = new InputStreamReader(getClass().getResourceAsStream(resourcePath));
Object obj = XMLConfigurationUtil.newInstance(r);
assertThat(obj, instanceOf(MongoJobStatusStore.class));
MongoJobStatusStore store = (MongoJobStatusStore) obj;
ActiveMongoConnectionDetails conn = store.getConnectionDetails();
assertThat(conn.getHost(), equalTo("10.1.0.77"));
assertThat(conn.getPort(), equalTo(5117));
assertThat(conn.getDatabaseName(), equalTo("foodb"));
assertThat(conn.getUsername(), equalTo("foouser"));
assertThat(conn.getPassword(), equalTo("f00Mat1cal"));
assertThat(store.getCollectionName(), equalTo("jobs"));
} finally {
appender.stopCountingFor(XMLConfigurationUtil.class);
}
Assert.assertEquals("Validation warnings/errors were found.", 0, appender.getCount());
}
/**
* This method is called by tests that aneed a
* live status store object, e.g. connected to a
* real MongoDB. These tests will not be able
* to run all the time, and so we use environment
* variables to pass the correct values to the test
* for MongoDB.
*
* If the environment variables are not present,
* the unit tests using a live Mongo environment are skipped.
*/
public MongoJobStatusStore buildLiveStatusStore() {
String host = System.getenv("MONGO_HOST");
String dbname = System.getenv("MONGO_DB");
assumeNotNull(host, dbname);
ActiveMongoConnectionDetails connDetails = new ActiveMongoConnectionDetails();
connDetails.setHost(host);
connDetails.setDatabaseName(dbname);
String username = System.getenv("MONGO_USER");
String password = System.getenv("MONGO_PASSWORD");
if (username != null) {
assumeNotNull(password);
connDetails.setUsername(username);
connDetails.setPassword(password);
connDetails.setMechanism("SCRAM-SHA1");
}
MongoJobStatusStore store = new MongoJobStatusStore();
store.setConnectionDetails(connDetails);
store.setCollectionName("unittest");
return store;
}
/**
* Computes a random date in 2018, the year
* in which this code was written.
*/
public Date randomStartTime() {
int month = random.nextInt(12);
int day = random.nextInt(28);
int hour = random.nextInt(24);
int minute = random.nextInt(58);
int second = random.nextInt(58);
Calendar cal = Calendar.getInstance();
cal.set(2018, month, day, hour, minute, second);
return cal.getTime();
}
/**
* Create a job status with a random start time and
* all fields set to test round trip and give us a
* status to backup.
*/
public IJobStatus createJobStatus(String jobId) {
MutableJobStatus status = new MutableJobStatus(jobId);
JobDuration duration = new JobDuration();
Date startTime = randomStartTime();
duration.setStartTime(startTime);
status.setProgress(0.23);
status.setNote("This is a unit test");
status.setResumeAttempts(1);
long resumedAt = startTime.getTime() + 3600000;
long lastActivity = resumedAt + 27*60*1000;
long endedAt = lastActivity + 12*60*1000;
duration.setResumedStartTime(new Date(resumedAt));
duration.setResumedLastActivity(new Date(lastActivity));
duration.setEndTime(new Date(endedAt));
status.setDuration(duration);
status.getProperties().addDouble("real", random.nextDouble());
return status;
}
@Test
public void testWriteReadStatus() throws IOException {
final String jobId = "testWriteRead";
Date testStartTime = new Date();
MongoJobStatusStore store = buildLiveStatusStore();
// create status
IJobStatus expectedStatus = createJobStatus(jobId);
JobDuration expectedDuration = expectedStatus.getDuration();
// write job status
store.write("unittests", expectedStatus);
// read job status
IJobStatus actualStatus = store.read("unittests", jobId);
JobDuration actualDuration = actualStatus.getDuration();
// assert various things about the status
assertThat(actualStatus.getNote(),
equalTo(expectedStatus.getNote()));
assertThat(actualStatus.getProgress(),
equalTo(expectedStatus.getProgress()));
assertThat(actualStatus.getResumeAttempts(),
equalTo(expectedStatus.getResumeAttempts()));
// assert various things about the duration
assertThat(actualDuration.getStartTime(),
equalTo(expectedDuration.getStartTime()));
assertThat(actualDuration.getEndTime(),
equalTo(expectedDuration.getEndTime()));
assertThat(actualDuration.getResumedStartTime(),
equalTo(expectedDuration.getResumedStartTime()));
assertThat(actualDuration.getResumedLastActivity(),
equalTo(expectedDuration.getResumedLastActivity()));
// last activity is computed by write
assertThat(actualStatus.getLastActivity(),
greaterThanOrEqualTo(testStartTime));
// properties is correct
assertThat(actualStatus.getProperties().getDouble("real"),
equalTo(expectedStatus.getProperties().getDouble("real")));
}
@Test
public void testRemoveReadStatus() throws Exception {
final String jobId = "testRemoveRead";
MongoJobStatusStore store = buildLiveStatusStore();
IJobStatus oldStatus = createJobStatus(jobId);
store.write("unittests", oldStatus);
store.remove("unittests", jobId);
IJobStatus status = store.read("unittests", jobId);
assertThat(status.getNote(), nullValue());
assertThat(status.getDuration().getStartTime(), nullValue());
}
@Test
public void testTouchStatus() throws Exception {
final String jobId = "testTouchStatus";
MongoJobStatusStore store = buildLiveStatusStore();
IJobStatus oldStatus = createJobStatus(jobId);
store.write("unittests", oldStatus);
// sleep for a time
Thread.sleep(2*1000);
Date wakedAt = new Date();
// touch that status
long expectedLastActivity = store.touch("unittests", jobId);
// read the status directly and verify
IJobStatus status = store.read("unittests", jobId);
assertThat(status.getLastActivity(),
equalTo(new Date(expectedLastActivity)));
assertThat(status.getLastActivity(),
greaterThanOrEqualTo(wakedAt));
}
@Test
public void testBackupStatus() throws IOException {
final String jobId = "testTouchStatus";
MongoJobStatusStore store = buildLiveStatusStore();
// write a s tatus
IJobStatus oldStatus = createJobStatus(jobId);
store.write("unittests", oldStatus);
// backup that status
Date expectedBackupDate = new Date();
store.backup("unittests", jobId, expectedBackupDate);
// read current status and see that it is gone
IJobStatus status = store.read("unittests", jobId);
assertThat(status.getNote(), nullValue());
assertThat(status.getDuration().getStartTime(), nullValue());
// using the collection directly,
// retrieve the backup document
Document document = store.getCollection()
.find(and(
eq("suite", "unittests"),
eq("job_id", jobId),
eq("status_type", "backup")))
.sort(descending("backup_date"))
.first();
assertThat(document, notNullValue());
assertThat(document.getDate("backup_date"),
equalTo(expectedBackupDate));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment