// Skip to content
//
// Instantly share code, notes, and snippets.
//
// Show Gist options
//   • Save jainsahab/08bff4a8f3792781673a8efad582be17 to your computer and use it in GitHub Desktop.
package com.google.cloud.datastore;
import com.google.cloud.datastore.StructuredQuery.PropertyFilter;
import com.google.cloud.datastore.aggregation.Aggregation;
import com.google.common.collect.Iterables;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
 * Stress harness that repeatedly writes one parent entity plus four child entities, where the four
 * children are added to a single shared {@link Transaction} from four threads released at the
 * same instant by a {@link CyclicBarrier}. After each round it prints the number of entities found
 * under the parent (expected: 5) and then deletes them again.
 *
 * <p>NOTE(review): judging by the class name this reproduces a GitHub issue (#295) about calling
 * {@code Transaction.add} concurrently — confirm against the issue tracker.
 *
 * <p>The program loops forever; stop it manually.
 */
public class GithubIssue295 {
  /** Kind of the single ancestor entity all children hang off. */
  private static final String PARENT_KIND = "Parent";

  /** Numeric id of the ancestor entity. */
  private static final long PARENT_ID = 1;

  /** Number of child entities, and therefore of worker tasks, per round. */
  private static final int CHILD_COUNT = 4;

  /** Entities expected under the ancestor after a round: the parent itself plus its children. */
  private static final int EXPECTED_ENTITY_COUNT = CHILD_COUNT + 1;

  static Datastore datastore = DatastoreOptions.getDefaultInstance().getService();

  /**
   * Clears leftovers from a previous run, then loops forever: create entities, print the count,
   * clean up.
   *
   * @param args ignored
   * @throws BrokenBarrierException if the start barrier is broken while the main thread waits on it
   * @throws InterruptedException if the main thread is interrupted while waiting
   */
  public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
    cleanup();
    while (true) {
      createEntities();
      verify();
      cleanup();
    }
  }

  /**
   * Writes the parent entity directly, then adds the four children to one shared transaction from
   * four pool threads that are all released together, and finally commits that transaction.
   *
   * <p>Worker failures are reported on {@code System.err} instead of being silently dropped; the
   * commit is still attempted afterwards so that {@link #verify()} shows what actually landed.
   *
   * @throws BrokenBarrierException if the start barrier is broken while the main thread waits on it
   * @throws InterruptedException if the main thread is interrupted while waiting
   * @throws IllegalStateException if the workers have not finished within one minute
   */
  private static void createEntities() throws BrokenBarrierException, InterruptedException {
    Key parentKey = getParentKey();
    Entity parent = Entity.newBuilder(parentKey).set("name", "parent").build();
    datastore.put(parent);

    KeyFactory childKeyFactory =
        datastore.newKeyFactory().addAncestor(PathElement.of(PARENT_KIND, PARENT_ID));
    Entity[] children = {
      newChild(childKeyFactory, "Child1", "one", "childOne"),
      newChild(childKeyFactory, "Child2", "two", "childTwo"),
      newChild(childKeyFactory, "Child3", "three", "childThree"),
      newChild(childKeyFactory, "Child4", "four", "childFour"),
    };

    // One party per worker plus the main thread, which trips the barrier.
    CyclicBarrier barrier = new CyclicBarrier(CHILD_COUNT + 1);
    Transaction transaction = datastore.newTransaction();
    ExecutorService executorService = Executors.newFixedThreadPool(CHILD_COUNT);
    try {
      List<Future<?>> pending = new ArrayList<>(CHILD_COUNT);
      for (Entity child : children) {
        pending.add(executorService.submit(new MyThread(barrier, child, transaction)));
      }

      // Wait until every worker is parked on the barrier; sleep between polls instead of
      // spinning hot on a CPU core.
      while (barrier.getNumberWaiting() < CHILD_COUNT) {
        Thread.sleep(1);
      }
      System.out.println("-----------------------------------------------");
      System.out.println(" Now open the barrier:");
      System.out.println("-----------------------------------------------");
      barrier.await();

      executorService.shutdown();
      if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) {
        // Committing while workers may still be adding would race with them.
        throw new IllegalStateException("Workers did not finish within 1 minute");
      }

      // submit() captures a task's exception in its Future; surface it here.
      for (Future<?> result : pending) {
        try {
          result.get();
        } catch (ExecutionException e) {
          System.err.println("Worker failed while adding its entity: " + e.getCause());
        }
      }
      transaction.commit();
    } finally {
      // Interrupts workers still parked on the barrier so the non-daemon pool threads cannot
      // keep the JVM alive after a failure; harmless once the pool has already terminated.
      executorService.shutdownNow();
      if (transaction.isActive()) {
        transaction.rollback();
      }
    }
  }

  /**
   * Builds a child entity whose key has the given kind and name and whose {@code "name"} property
   * is {@code nameValue}.
   *
   * @param factory key factory already configured with the ancestor; its kind is overwritten
   * @param kind entity kind for the child key
   * @param keyName name component of the child key
   * @param nameValue value stored in the entity's {@code "name"} property
   * @return the built entity
   */
  private static Entity newChild(
      KeyFactory factory, String kind, String keyName, String nameValue) {
    Key key = factory.setKind(kind).newKey(keyName);
    return Entity.newBuilder(key).set("name", nameValue).build();
  }

  /** Counts all entities that have the parent key as ancestor and prints the total. */
  private static void verify() {
    EntityQuery baseQuery =
        Query.newEntityQueryBuilder().setFilter(PropertyFilter.hasAncestor(getParentKey())).build();
    AggregationQuery aggregationQuery =
        Query.newAggregationQueryBuilder()
            .over(baseQuery)
            .addAggregation(Aggregation.count().as("count"))
            .build();
    AggregationResult aggregationResult =
        Iterables.getOnlyElement(datastore.runAggregation(aggregationQuery));
    // Should be 5 (1 parent and 4 children entities).
    System.out.println("Total count: " + aggregationResult.get("count"));
  }

  /**
   * Deletes every entity that has the parent key as ancestor. When fewer keys than expected are
   * found, the keys that do exist are printed first so the missing ones can be inferred.
   */
  private static void cleanup() {
    KeyQuery baseQuery =
        Query.newKeyQueryBuilder().setFilter(PropertyFilter.hasAncestor(getParentKey())).build();
    QueryResults<Key> results = datastore.run(baseQuery);
    // QueryResults is consumed as an Iterator; the lambda adapts it to the Iterable toArray needs.
    Key[] keys = Iterables.toArray(() -> results, Key.class);
    if (keys.length < EXPECTED_ENTITY_COUNT) {
      for (Key key : keys) {
        System.out.println(key);
      }
    }
    datastore.delete(keys);
  }

  /** Returns the key of the ancestor entity (kind {@code "Parent"}, id {@code 1}). */
  private static Key getParentKey() {
    KeyFactory keyFactory = datastore.newKeyFactory().setKind(PARENT_KIND);
    return keyFactory.newKey(PARENT_ID);
  }

  /**
   * Task that waits on the shared barrier and then adds its entity to the shared transaction.
   *
   * <p>Although this extends {@link Thread}, {@code createEntities} never starts it as a thread:
   * it is handed to an {@link ExecutorService} and executed as a plain {@link Runnable} on a pool
   * thread.
   */
  static class MyThread extends Thread {
    private final CyclicBarrier cyclicBarrier;
    private final Entity entity;
    private final Transaction transaction;

    /**
     * @param cyclicBarrier barrier to wait on before touching the transaction
     * @param entity entity to add; its key name is used as the thread name
     * @param transaction transaction shared by all workers
     */
    MyThread(CyclicBarrier cyclicBarrier, Entity entity, Transaction transaction) {
      super(entity.getKey().getName());
      this.cyclicBarrier = cyclicBarrier;
      this.entity = entity;
      this.transaction = transaction;
    }

    /**
     * Blocks until the barrier trips, then adds the entity to the transaction.
     *
     * @throws RuntimeException wrapping the cause if the wait is interrupted or the barrier breaks
     */
    @Override
    public void run() {
      try {
        cyclicBarrier.await(); // wait until all threads are ready.
        this.transaction.add(this.entity);
      } catch (InterruptedException e) {
        // Restore the flag so code further up the pool thread's stack can still see it.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      } catch (BrokenBarrierException e) {
        throw new RuntimeException(e);
      }
    }
  }
}
// Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment