Skip to content

Instantly share code, notes, and snippets.

@amihalik
Created October 5, 2017 18:49
Show Gist options
  • Save amihalik/078ba751248ef0d5caf084f9488bad73 to your computer and use it in GitHub Desktop.
Save amihalik/078ba751248ef0d5caf084f9488bad73 to your computer and use it in GitHub Desktop.
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.indexing.mongo;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.BasicConfigurator;
import org.apache.rya.indexing.GeoRyaSailFactory;
import org.apache.rya.indexing.accumulo.ConfigUtils;
import org.apache.rya.indexing.accumulo.geo.OptionalConfigUtils;
import org.apache.rya.indexing.geotemporal.mongo.MongoITBase;
import org.apache.rya.indexing.mongodb.MongoIndexingConfiguration;
import org.apache.rya.mongodb.batch.MongoDbBatchWriter;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.openrdf.model.Statement;
import org.openrdf.model.URI;
import org.openrdf.model.ValueFactory;
import org.openrdf.model.impl.ValueFactoryImpl;
import org.openrdf.repository.sail.SailRepository;
import org.openrdf.repository.sail.SailRepositoryConnection;
import org.openrdf.sail.Sail;
import com.google.common.collect.Lists;
import com.mongodb.MongoClient;
/**
 * Integration tests for the {@link MongoDbBatchWriter}.
 * <p>
 * Each test writes statements through a {@link Sail} obtained from
 * {@link GeoRyaSailFactory} with per-update flushing disabled, then checks
 * how many documents are present in the Rya collection.
 */
public class MongoDBRyaBatchWriterIT extends MongoITBase {
    /** Shared factory used to build the URIs and statements written by the tests. */
    private static final ValueFactory VALUE_FACTORY = new ValueFactoryImpl();

    /**
     * Pause, in milliseconds, inserted after writes before the next step.
     * NOTE(review): presumably this gives the batch writer (configured in
     * {@link #setUp()} with a 100 ms flush time) a chance to flush - confirm.
     */
    private static final long SETTLE_MILLIS = 1_000L;

    private Sail sail;
    private SailRepositoryConnection conn;

    /** Installs the basic log4j configuration. */
    private static void setupLogging() {
        BasicConfigurator.configure();
    }

    /**
     * Configures logging once for the whole test class.
     *
     * @throws Exception if the logging setup fails
     */
    @BeforeClass
    public static void setUpClass() throws Exception {
        setupLogging();
    }

    /**
     * Adjusts the inherited configuration for batched writes, enables the geo
     * index, and opens a fresh repository connection for the test.
     *
     * @throws Exception if the sail or the connection cannot be created
     */
    @Before
    public void setUp() throws Exception {
        // Batch writer settings: no flush per update, large batch, short flush time.
        conf.setBoolean("rya.mongodb.dao.flusheachupdate", false);
        conf.setInt("rya.mongodb.dao.batchwriter.size", 50_000);
        conf.setLong("rya.mongodb.dao.batchwriter.flushtime", 100L);

        final MongoClient client = super.getMongoClient();
        conf.setBoolean(OptionalConfigUtils.USE_GEO, true);
        conf.set(ConfigUtils.GEO_PREDICATES_LIST, "http://www.opengis.net/ont/geosparql#asWKT");
        conf.setBoolean(ConfigUtils.USE_MONGO, true);
        conf.setMongoClient(client);

        sail = GeoRyaSailFactory.getInstance(conf);
        conn = new SailRepository(sail).getConnection();
    }

    /**
     * Writes a single statement and expects exactly one document in the Rya
     * collection.
     *
     * @throws Exception if writing, closing, or counting fails
     */
    @Test
    public void testSingleWrite() throws Exception {
        try {
            conn.add(Lists.newArrayList(
                    statement(0)
            ));
        } finally {
            // Always release the connection, even when the write fails.
            conn.close();
        }
        Thread.sleep(SETTLE_MILLIS);
        Assert.assertEquals(1, getRyaCollection().count());
    }

    /**
     * Writes two batches that contain duplicates, both within a batch and
     * across batches, and expects one document per distinct statement
     * (statements 0 through 3, i.e. four documents).
     *
     * @throws Exception if writing, closing, or counting fails
     */
    @Test
    public void testDuplicateKeys() throws Exception {
        try {
            conn.add(Lists.newArrayList(
                    statement(0),
                    statement(1),
                    statement(1),
                    statement(2),
                    statement(2)
            ));
            Thread.sleep(SETTLE_MILLIS);
            conn.add(Lists.newArrayList(
                    statement(0),
                    statement(1),
                    statement(2),
                    statement(2),
                    statement(3)
            ));
        } finally {
            // Always release the connection, even when a write fails.
            conn.close();
        }
        Thread.sleep(SETTLE_MILLIS);
        Assert.assertEquals(4, getRyaCollection().count());
    }

    /**
     * Builds the URI {@code "u:" + v}.
     *
     * @param v number appended to the {@code u:} prefix
     * @return the created URI
     */
    private static URI ryaURI(final int v) {
        return VALUE_FACTORY.createURI("u:" + v);
    }

    /**
     * Builds a statement whose subject, predicate, and object are all the URI
     * for {@code v}, so equal values of {@code v} yield equal statements.
     *
     * @param v number identifying the statement
     * @return the created statement
     */
    private static Statement statement(final int v) {
        return VALUE_FACTORY.createStatement(ryaURI(v), ryaURI(v), ryaURI(v));
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment