Skip to content

Instantly share code, notes, and snippets.

@Crim
Last active November 21, 2016 19:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Crim/61537958df65a5e13b3844b2d5e28cde to your computer and use it in GitHub Desktop.
Save Crim/61537958df65a5e13b3844b2d5e28cde to your computer and use it in GitHub Desktop.
package com.pardot.storm.grouping;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.grouping.CustomStreamGrouping;
import org.apache.storm.task.WorkerTopologyContext;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Everybody do the Pardot Shuffle. A slightly different take/implementation on the "ShuffleGrouping" grouping.
 *
 * <p>Target tasks are handed out in strict round-robin order, in the order they were passed to
 * {@link #prepare}. The position in the rotation is driven by a single {@link AtomicInteger}, so
 * every call to {@link #chooseTasks} claims a distinct counter value even when several threads
 * share one instance.
 */
public class ThePardotShuffle implements CustomStreamGrouping, Serializable {
    /** One pre-built single-element list per target task, indexed by rotation position. */
    private ArrayList<List<Integer>> choices;
    /** Monotonically increasing ticket counter; allowed to wrap around. */
    private AtomicInteger counter;
    /** Cached number of entries in {@link #choices}. */
    private int totalSize;

    /**
     * Captures the target tasks and resets the rotation to its first entry.
     *
     * @param context     worker topology context (not used by this grouping)
     * @param stream      stream being grouped (not used by this grouping)
     * @param targetTasks ids of the tasks tuples may be routed to
     */
    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        // Pre-wrap every task id in its own list so chooseTasks never allocates.
        // The wrappers are immutable because the same instance is returned to every caller.
        choices = new ArrayList<List<Integer>>(targetTasks.size());
        for (Integer targetTask : targetTasks) {
            choices.add(Collections.singletonList(targetTask));
        }
        counter = new AtomicInteger(0);
        totalSize = choices.size();
    }

    /**
     * Picks the next target task in the rotation.
     *
     * @param taskId id of the emitting task (ignored)
     * @param values tuple values (ignored)
     * @return a single-element list holding the chosen task id, or an empty list when
     *         {@link #prepare} was given no target tasks
     */
    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        if (totalSize == 0) {
            // Nothing to route to; avoids an ArithmeticException from "% 0".
            return Collections.emptyList();
        }
        // Clear the sign bit instead of taking Math.abs of the remainder: once the counter wraps
        // past Integer.MAX_VALUE, abs(negative % size) walks the rotation backwards, whereas the
        // masked value restarts at 0 and keeps moving forward.
        final int ticket = counter.getAndIncrement() & Integer.MAX_VALUE;
        return choices.get(ticket % totalSize);
    }
}
package com.pardot.storm.grouping;
import com.google.common.collect.Lists;
import org.apache.storm.grouping.ShuffleGrouping;
import org.apache.storm.task.WorkerTopologyContext;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
/**
 * Exercises {@link ThePardotShuffle} and Storm's stock {@link ShuffleGrouping} side by side,
 * both from a single thread and with one grouper instance shared by several threads, recording
 * how evenly each spreads tuples over the target tasks.
 */
public class ThePardotShuffleTest {
    private static final Logger logger = LoggerFactory.getLogger(ThePardotShuffleTest.class);

    /** Task Id of the emitter is not used by either grouper, so just pick a static value. */
    private static final int INPUT_TASK_ID = 100;
    /** Number of target tasks; the task ids are 0 .. TASK_COUNT - 1. */
    private static final int TASK_COUNT = 6;
    /** Number of chooseTasks calls made by each run (and by each thread in the threaded tests). */
    private static final int CALLS_PER_RUN = 30000;

    /**
     * Tests that we round robin correctly using ThePardotShuffle implementation.
     *
     * Output: Total Distribution [5000, 5000, 5000, 5000, 5000, 5000]
     */
    @Test
    public void testThePardotShuffle() {
        ThePardotShuffle grouper = new ThePardotShuffle();
        WorkerTopologyContext context = mock(WorkerTopologyContext.class);
        grouper.prepare(context, null, newTaskIds());

        // Keep track of how many times we see each taskId
        int[] taskCounts = new int[TASK_COUNT];
        for (int i = 0; i < CALLS_PER_RUN; i++) {
            recordChoice(grouper.chooseTasks(INPUT_TASK_ID, Lists.<Object>newArrayList()), taskCounts);
        }

        logger.info("Total Distribution {}", taskCounts);
        assertEvenDistribution(taskCounts, CALLS_PER_RUN / TASK_COUNT);
    }

    /**
     * Tests that we round robin correctly with multiple threads using ThePardotShuffle implementation.
     * Each individual thread sees a roughly even spread; the aggregate over all threads is exactly even
     * because every call claims a distinct value of the shared atomic counter.
     *
     * Output:
     * Result: [4865, 4966, 5120, 5023, 5070, 4956]
     * Result: [4942, 4964, 5102, 5071, 4924, 4997]
     * Result: [5123, 4889, 5034, 5005, 5134, 4815]
     * Result: [5003, 5057, 4880, 5196, 4836, 5028]
     * Result: [5068, 5018, 4915, 4786, 5045, 5168]
     * Result: [4999, 5106, 4949, 4919, 4991, 5036]
     */
    @Test
    public void testThePardotShuffleMultiThreaded() throws InterruptedException, ExecutionException {
        final ThePardotShuffle grouper = new ThePardotShuffle();
        final WorkerTopologyContext context = mock(WorkerTopologyContext.class);
        // Call prepare with our available taskIds
        grouper.prepare(context, null, newTaskIds());

        // One worker per task id, all hammering the same grouper instance.
        List<Callable<int[]>> threadTasks = Lists.newArrayList();
        for (int x = 0; x < TASK_COUNT; x++) {
            threadTasks.add(new Callable<int[]>() {
                @Override
                public int[] call() {
                    int[] taskCounts = new int[TASK_COUNT];
                    for (int i = 0; i < CALLS_PER_RUN; i++) {
                        recordChoice(grouper.chooseTasks(INPUT_TASK_ID, Lists.<Object>newArrayList()), taskCounts);
                    }
                    return taskCounts;
                }
            });
        }

        // Sum the per-thread counts; TASK_COUNT threads x CALLS_PER_RUN calls spread over
        // TASK_COUNT tasks must come out at exactly CALLS_PER_RUN per task.
        int[] totals = new int[TASK_COUNT];
        for (int[] perThread : runAndLog(threadTasks)) {
            for (int task = 0; task < TASK_COUNT; task++) {
                totals[task] += perThread[task];
            }
        }
        assertEvenDistribution(totals, CALLS_PER_RUN);
    }

    /**
     * Tests that we round robin correctly using ShuffleGrouping implementation.
     *
     * OUTPUT: "total Distribution [5000, 5000, 5000, 4999, 5000, 5001]"
     *
     * NOTE(review): the recorded output above is not perfectly even, so the exact-equality
     * assertions below would have failed on that run - confirm whether that is the intended
     * demonstration.
     */
    @Test
    public void testShuffleGrouping() {
        ShuffleGrouping grouper = new ShuffleGrouping();
        WorkerTopologyContext context = mock(WorkerTopologyContext.class);
        grouper.prepare(context, null, newTaskIds());

        // Keep track of how many times we see each taskId
        int[] taskCounts = new int[TASK_COUNT];
        for (int i = 0; i < CALLS_PER_RUN; i++) {
            recordChoice(grouper.chooseTasks(INPUT_TASK_ID, Lists.<Object>newArrayList()), taskCounts);
        }

        logger.info("Total Distribution {}", taskCounts);
        assertEvenDistribution(taskCounts, CALLS_PER_RUN / TASK_COUNT);
    }

    /**
     * Tests that we round robin correctly with multiple threads using ShuffleGrouping implementation.
     * Only logs the per-thread distribution; no evenness is asserted.
     *
     * Output:
     * Result: [27312, 34, 2226, 252, 93, 83]
     * Result: [26244, 55, 2918, 398, 248, 137]
     * Result: [27190, 44, 2357, 232, 88, 89]
     * Result: [27220, 26, 2424, 209, 63, 58]
     * Result: [27226, 0, 2313, 178, 191, 92]
     * Result: [27288, 0, 2285, 254, 134, 39]
     */
    @Test
    public void testShuffleGroupMultiThreaded() throws InterruptedException, ExecutionException {
        final ShuffleGrouping grouper = new ShuffleGrouping();
        final WorkerTopologyContext context = mock(WorkerTopologyContext.class);
        // Call prepare with our available taskIds
        grouper.prepare(context, null, newTaskIds());

        List<Callable<int[]>> threadTasks = Lists.newArrayList();
        for (int x = 0; x < TASK_COUNT; x++) {
            threadTasks.add(new Callable<int[]>() {
                @Override
                public int[] call() {
                    int[] taskCounts = new int[TASK_COUNT];
                    for (int i = 0; i < CALLS_PER_RUN; i++) {
                        recordChoice(grouper.chooseTasks(INPUT_TASK_ID, Lists.<Object>newArrayList()), taskCounts);
                    }
                    return taskCounts;
                }
            });
        }

        runAndLog(threadTasks);
    }

    /** Builds a fresh list of the target task ids 0 .. 5 handed to each grouper. */
    private static List<Integer> newTaskIds() {
        return Lists.newArrayList(0, 1, 2, 3, 4, 5);
    }

    /** Validates that exactly one known task id was chosen and bumps its counter. */
    private static void recordChoice(List<Integer> taskIds, int[] taskCounts) {
        assertNotNull("Not null taskId list returned", taskIds);
        assertEquals("Single task Id returned", 1, taskIds.size());
        int taskId = taskIds.get(0);
        assertTrue("TaskId should exist", taskId >= 0 && taskId < taskCounts.length);
        taskCounts[taskId]++;
    }

    /** Asserts that every task was chosen exactly {@code expected} times. */
    private static void assertEvenDistribution(int[] taskCounts, int expected) {
        for (int count : taskCounts) {
            assertEquals("Distribution should be even for all nodes", expected, count);
        }
    }

    /**
     * Runs every task on its own pooled thread, logs each result and returns them in task order.
     * The pool is always shut down so the test does not leak threads.
     */
    private static List<int[]> runAndLog(List<Callable<int[]>> threadTasks)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(threadTasks.size());
        try {
            List<int[]> results = Lists.newArrayList();
            // invokeAll only returns once every task has completed, so no polling is needed.
            for (Future<int[]> taskResult : executor.invokeAll(threadTasks)) {
                int[] counts = taskResult.get();
                logger.info("Result: {}", counts);
                results.add(counts);
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment