Created

Embed URL

HTTPS clone URL

SSH clone URL

You can clone with HTTPS or SSH.

Download Gist
View MergeTimeRx.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
 
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
 
import org.neo4j.cypher.javacompat.ExecutionEngine;
import org.neo4j.cypher.javacompat.ExecutionResult;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.factory.GraphDatabaseFactory;
import org.neo4j.helpers.collection.MapUtil;
import org.neo4j.kernel.impl.util.FileUtils;
 
public class MergeTimeRx
{
public static void main( final String[] args ) throws InterruptedException, IOException
{
String pathToDb = "/tmp/foo";
FileUtils.deleteRecursively( new File( pathToDb ) );
 
GraphDatabaseService db = new GraphDatabaseFactory().newEmbeddedDatabase( pathToDb );
final ExecutionEngine engine = new ExecutionEngine( db );
 
int numberOfThreads = 50;
int numberOfUsers = 10;
int numberOfEvents = 50;
int iterations = 100;
 
Observable<ExecutionResult> events = processEvents( engine, numberOfUsers, numberOfEvents, numberOfThreads, iterations );
 
events.subscribe( new Action1<ExecutionResult>()
{
@Override
public void call( ExecutionResult result )
{
for ( Map<String, Object> row : result )
{
}
}
} );
 
ExecutionResult userResult = engine.execute(
"MATCH (u:User)\n" +
"RETURN u.id as userId, COUNT(u) AS count\n" +
"ORDER BY userId" );
 
System.out.println( userResult.dumpToString() );
}
 
private static List<Integer> generateIds( int amount )
{
List<Integer> ids = new ArrayList<>();
for ( int i = 1; i <= amount; i++ )
{
ids.add( i );
}
return ids;
}
 
 
private static Observable<ExecutionResult> processEvents( final ExecutionEngine engine, final int numberOfUsers,
final int numberOfEvents, final int numberOfThreads,
final int iterations )
{
final Random random = new Random();
final List<Integer> userIds = generateIds( numberOfUsers );
final List<Integer> eventIds = generateIds( numberOfEvents );
 
return Observable.create( new Observable.OnSubscribeFunc<ExecutionResult>()
{
@Override
public Subscription onSubscribe( final Observer<? super ExecutionResult> observer )
{
final ExecutorService executor = Executors.newFixedThreadPool( numberOfThreads );
 
List<Future<ExecutionResult>> jobs = new ArrayList<>();
for ( int i = 0; i < iterations; i++ )
{
Future<ExecutionResult> job = executor.submit( new Callable<ExecutionResult>()
{
@Override
public ExecutionResult call()
{
Integer userId = userIds.get( random.nextInt( numberOfUsers ) );
Integer eventId = eventIds.get( random.nextInt( numberOfEvents ) );
 
return engine.execute(
"MERGE (u:User {id: {userId}})\n" +
"MERGE (e:Event {id: {eventId}})\n" +
"MERGE (u)-[:HAS_EVENT]->(e)\n" +
"RETURN u, e",
MapUtil.map( "userId", userId, "eventId", eventId ) );
}
} );
jobs.add( job );
}
 
for ( Future<ExecutionResult> future : jobs )
{
try
{
observer.onNext( future.get() );
}
catch ( InterruptedException | ExecutionException ignored )
{
}
}
 
observer.onCompleted();
executor.shutdown();
 
return Subscriptions.empty();
}
} );
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.