Created
May 24, 2023 10:08
-
-
Save jainsahab/08bff4a8f3792781673a8efad582be17 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.google.cloud.datastore; | |
import com.google.cloud.datastore.StructuredQuery.PropertyFilter; | |
import com.google.cloud.datastore.aggregation.Aggregation; | |
import com.google.common.collect.Iterables; | |
import java.util.concurrent.BrokenBarrierException; | |
import java.util.concurrent.CyclicBarrier; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.TimeUnit; | |
public class GithubIssue295 { | |
static Datastore datastore = DatastoreOptions.getDefaultInstance().getService(); | |
public static void main(String[] args) throws BrokenBarrierException, InterruptedException { | |
cleanup(); | |
while (true) { | |
createEntities(); | |
verify(); | |
cleanup(); | |
} | |
} | |
private static void createEntities() throws BrokenBarrierException, InterruptedException { | |
ExecutorService executorService = Executors.newFixedThreadPool(4); | |
Key parentKey = getParentKey(); | |
Entity parent = Entity.newBuilder(parentKey).set("name", "parent").build(); | |
datastore.put(parent); | |
KeyFactory childKeyFactory = datastore.newKeyFactory().addAncestor(PathElement.of("Parent", 1)); | |
Key childOneKey = childKeyFactory.setKind("Child1").newKey("one"); | |
Key childTwoKey = childKeyFactory.setKind("Child2").newKey("two"); | |
Key childThreeKey = childKeyFactory.setKind("Child3").newKey("three"); | |
Key childFourKey = childKeyFactory.setKind("Child4").newKey("four"); | |
Entity childOne = Entity.newBuilder(childOneKey).set("name", "childOne").build(); | |
Entity childTwo = Entity.newBuilder(childTwoKey).set("name", "childTwo").build(); | |
Entity childThree = Entity.newBuilder(childThreeKey).set("name", "childThree").build(); | |
Entity childFour = Entity.newBuilder(childFourKey).set("name", "childFour").build(); | |
CyclicBarrier barrier = new CyclicBarrier(5); | |
Transaction transaction = datastore.newTransaction(); | |
executorService.submit(new MyThread(barrier, childOne, transaction)); | |
executorService.submit(new MyThread(barrier, childTwo, transaction)); | |
executorService.submit(new MyThread(barrier, childThree, transaction)); | |
executorService.submit(new MyThread(barrier, childFour, transaction)); | |
while (barrier.getNumberWaiting() < 4) {} // waiting until all threads are ready | |
System.out.println("-----------------------------------------------"); | |
System.out.println(" Now open the barrier:"); | |
System.out.println("-----------------------------------------------"); | |
barrier.await(); | |
executorService.shutdown(); | |
executorService.awaitTermination(1, TimeUnit.MINUTES); | |
transaction.commit(); | |
} | |
private static void verify() { | |
EntityQuery baseQuery = | |
Query.newEntityQueryBuilder().setFilter(PropertyFilter.hasAncestor(getParentKey())).build(); | |
AggregationQuery aggregationQuery = | |
Query.newAggregationQueryBuilder() | |
.over(baseQuery) | |
.addAggregation(Aggregation.count().as("count")) | |
.build(); | |
AggregationResult aggregationResult = | |
Iterables.getOnlyElement(datastore.runAggregation(aggregationQuery)); | |
System.out.println("Total count: " + aggregationResult.get("count")); // should be 5 (1 parent and 4 children entities) | |
} | |
private static void cleanup() { | |
KeyQuery baseQuery = | |
Query.newKeyQueryBuilder().setFilter(PropertyFilter.hasAncestor(getParentKey())).build(); | |
QueryResults<Key> results = datastore.run(baseQuery); | |
Key[] keys = Iterables.toArray(() -> results, Key.class); | |
// if length is not 5, then printing all keys to infer what is missing | |
if (keys.length < 5) { | |
for (Key key : keys) { | |
System.out.println(key); | |
} | |
} | |
datastore.delete(keys); | |
} | |
private static Key getParentKey() { | |
KeyFactory keyFactory = datastore.newKeyFactory().setKind("Parent"); | |
return keyFactory.newKey(1); | |
} | |
static class MyThread extends Thread { | |
private final CyclicBarrier cyclicBarrier; | |
private final Entity entity; | |
private final Transaction transaction; | |
MyThread(CyclicBarrier cyclicBarrier, Entity entity, Transaction transaction) { | |
super(entity.getKey().getName()); | |
this.cyclicBarrier = cyclicBarrier; | |
this.entity = entity; | |
this.transaction = transaction; | |
} | |
@Override | |
public void run() { | |
try { | |
cyclicBarrier.await(); // wait until all threads are ready. | |
this.transaction.add(this.entity); | |
} catch (InterruptedException | BrokenBarrierException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment