Skip to content

@mneedham /MergeTimeRx.java
Created

Embed URL

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import org.neo4j.cypher.javacompat.ExecutionEngine;
import org.neo4j.cypher.javacompat.ExecutionResult;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.factory.GraphDatabaseFactory;
import org.neo4j.helpers.collection.MapUtil;
import org.neo4j.kernel.impl.util.FileUtils;
public class MergeTimeRx
{
public static void main( final String[] args ) throws InterruptedException, IOException
{
String pathToDb = "/tmp/foo";
FileUtils.deleteRecursively( new File( pathToDb ) );
GraphDatabaseService db = new GraphDatabaseFactory().newEmbeddedDatabase( pathToDb );
final ExecutionEngine engine = new ExecutionEngine( db );
int numberOfThreads = 50;
int numberOfUsers = 10;
int numberOfEvents = 50;
int iterations = 100;
Observable<ExecutionResult> events = processEvents( engine, numberOfUsers, numberOfEvents, numberOfThreads, iterations );
events.subscribe( new Action1<ExecutionResult>()
{
@Override
public void call( ExecutionResult result )
{
for ( Map<String, Object> row : result )
{
}
}
} );
ExecutionResult userResult = engine.execute(
"MATCH (u:User)\n" +
"RETURN u.id as userId, COUNT(u) AS count\n" +
"ORDER BY userId" );
System.out.println( userResult.dumpToString() );
}
private static List<Integer> generateIds( int amount )
{
List<Integer> ids = new ArrayList<>();
for ( int i = 1; i <= amount; i++ )
{
ids.add( i );
}
return ids;
}
private static Observable<ExecutionResult> processEvents( final ExecutionEngine engine, final int numberOfUsers,
final int numberOfEvents, final int numberOfThreads,
final int iterations )
{
final Random random = new Random();
final List<Integer> userIds = generateIds( numberOfUsers );
final List<Integer> eventIds = generateIds( numberOfEvents );
return Observable.create( new Observable.OnSubscribeFunc<ExecutionResult>()
{
@Override
public Subscription onSubscribe( final Observer<? super ExecutionResult> observer )
{
final ExecutorService executor = Executors.newFixedThreadPool( numberOfThreads );
List<Future<ExecutionResult>> jobs = new ArrayList<>();
for ( int i = 0; i < iterations; i++ )
{
Future<ExecutionResult> job = executor.submit( new Callable<ExecutionResult>()
{
@Override
public ExecutionResult call()
{
Integer userId = userIds.get( random.nextInt( numberOfUsers ) );
Integer eventId = eventIds.get( random.nextInt( numberOfEvents ) );
return engine.execute(
"MERGE (u:User {id: {userId}})\n" +
"MERGE (e:Event {id: {eventId}})\n" +
"MERGE (u)-[:HAS_EVENT]->(e)\n" +
"RETURN u, e",
MapUtil.map( "userId", userId, "eventId", eventId ) );
}
} );
jobs.add( job );
}
for ( Future<ExecutionResult> future : jobs )
{
try
{
observer.onNext( future.get() );
}
catch ( InterruptedException | ExecutionException ignored )
{
}
}
observer.onCompleted();
executor.shutdown();
return Subscriptions.empty();
}
} );
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.