Skip to content

Instantly share code, notes, and snippets.

@gjb2048
Last active December 19, 2015 17:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gjb2048/5990478 to your computer and use it in GitHub Desktop.
Save gjb2048/5990478 to your computer and use it in GitHub Desktop.
Program to demonstrate how threads can be employed to generate and consume data.
/*
* Thread Pool.
* © G J Barnard 2012
* License: Creative Commons Attribution-NonCommercial-ShareAlike 3.0 Unported (CC BY-NC-SA 3.0).
* See: http://creativecommons.org/licenses/by-nc-sa/3.0/
*/
package threadpool;
import java.util.Date;
import java.util.LinkedList;
import java.util.Random;
import static java.lang.System.out; // Shorten System.out
/**
* Class to demonstrate thread pools in a producer consumer setting.
*
* @author Gareth J Barnard 2012
*/
public class ThreadPool extends Thread
{
private static short dataGeneratorInstanceNoCount = 0;
private static short threadProcessorInstanceNoCount = 0;
private final static int ONE_SECOND = 1000;
private final static byte DATA_DETERMINATION_FALSE = 1;
private final static byte DATA_DETERMINATION_TRUE = 2;
private Random rand = new Random();
private final LinkedList<Data> dataStore = new LinkedList<Data>();
private long dataGenerated = 0;
private long dataProcessed = 0;
private Date startTime = null;
private short noOfGenerators = 0;
private short noOfProcessors = 0;
private short generatorThreadSleepTime = 0;
private short processorThreadSleepTime = 0;
private boolean dataTerminationOutput = true;
/**
* Create the Thread Pool Example and set the start time we were created.
*/
public ThreadPool()
{
startTime = new Date();
}
/**
* Setup the Thread Pool Example.
*
* @param noOfGenerators Number of data generators to create.
* @param noOfProcessors Number of data processors to create.
* @param generatorThreadSleepTime The number of seconds (not including random extra milliseconds) that the generator threads will sleep for.
* @param processorThreadSleepTime The number of seconds (not including random extra milliseconds) that the processor threads will sleep for.
* @param dataTerminationOutput Display (ThreadPool.DATA_DETERMINATION_TRUE) or not display (ThreadPool.DATA_DETERMINATION_FALSE) the individual data details when they are processed. This can help
* with output when using lots of generators.
*/
public void setup(short noOfGenerators, short noOfProcessors, short generatorThreadSleepTime, short processorThreadSleepTime, byte dataTerminationOutput)
{
this.noOfGenerators = noOfGenerators;
this.noOfProcessors = noOfProcessors;
this.generatorThreadSleepTime = generatorThreadSleepTime;
this.processorThreadSleepTime = processorThreadSleepTime;
if (dataTerminationOutput == DATA_DETERMINATION_FALSE)
{
this.dataTerminationOutput = false;
}
Thread t;
// Create Data Processors in a 'pool'.
for (short pool = 0; pool < noOfProcessors; pool++)
{
t = new Thread(new DataProcessor(this));
t.start();
}
// Create Data Generators in a 'pool'.
for (short pool = 0; pool < noOfGenerators; pool++)
{
t = new Thread(new DataGenerator(this));
t.start();
}
}
/**
* @param args DataTerminationOutput(yes/no) or 'help' RunDurationInSeconds DataGenerators DataProcessors GeneratorThreadSleepSeconds ProcessorThreadSleepSeconds
*/
public static void main(String[] args)
{
short runDuration = 0;
short noOfGenerators = 0;
short noOfProcessors = 0;
short generatorThreadSleepTime = 0;
short processorThreadSleepTime = 0;
byte dataTerminationOutput = 0;
out.println("Thread Pool Example");
out.println("-------------------");
if (args.length > 0)
{
if (args[0].equalsIgnoreCase("no"))
{
dataTerminationOutput = DATA_DETERMINATION_FALSE;
}
else if (args[0].equalsIgnoreCase("yes"))
{
dataTerminationOutput = DATA_DETERMINATION_TRUE;
}
else if (args[0].equalsIgnoreCase("help"))
{
out.println("Usage: java ThreadPool DataTerminationOutput(yes/no) or 'help' RunDurationInSeconds(max " + Short.MAX_VALUE + ") DataGenerators(max " + Short.MAX_VALUE + ") DataProcessors(max " + Short.MAX_VALUE + ") GeneratorThreadSleepSeconds(max " + Short.MAX_VALUE + ") ProcessorThreadSleepSeconds(max " + Short.MAX_VALUE + ").");
out.println("");
out.println("Program to demonstrate how threads can be employed to generate and consume data.");
out.println("In this case a small data class that contains information about when it was");
out.println("created and by which thread. Then one of the threads in the 'thread pool' is");
out.println("allocated to 'consume' it by printing out its details. Results should be more");
out.println("varied on multiple CPU machines. Information printed at the start and statistics");
out.println("at the end showing how many 'data' were not consumed.");
out.println("");
out.println("The parameters are as follows (brackets indicate the default):");
out.println("DataTerminationOutput - (yes) or no - state to print out the 'data' details.");
out.println("RunDurationInSeconds - (20) - runtime of the program.");
out.println("DataGenerators - (5) - number of generating threads.");
out.println("DataProcessors - (5) - number of generating threads.");
out.println("GeneratorThreadSleepSeconds - (2) - number of seconds(+random element) a thread will sleep for after performing its task.");
out.println("ProcessorThreadSleepSeconds - (2) - number of seconds(+random element) a thread will sleep for after performing its task.");
System.exit(0);
}
else
{
out.println("Invalid Data Termination Output: " + args[0]);
}
if (args.length > 1)
{
try
{
runDuration = Short.parseShort(args[1]);
}
catch (NumberFormatException ex)
{
out.println("Invalid duration: " + args[1]);
}
}
if (args.length > 2)
{
try
{
noOfGenerators = Short.parseShort(args[2]);
}
catch (NumberFormatException ex)
{
out.println("Invalid number of generators: " + args[2]);
}
}
if (args.length > 3)
{
try
{
noOfProcessors = Short.parseShort(args[3]);
}
catch (NumberFormatException ex)
{
out.println("Invalid number of generators: " + args[3]);
}
}
if (args.length > 4)
{
try
{
generatorThreadSleepTime = Short.parseShort(args[4]);
}
catch (NumberFormatException ex)
{
out.println("Invalid generator sleep time: " + args[4]);
}
}
if (args.length > 5)
{
try
{
processorThreadSleepTime = Short.parseShort(args[5]);
}
catch (NumberFormatException ex)
{
out.println("Invalid processor sleep time: " + args[5]);
}
}
}
else
{
out.println("Usage: java -jar ThreadPool DataTerminationOutput(yes/no) RunDurationInSeconds(max " + Short.MAX_VALUE + ") DataGenerators(max " + Short.MAX_VALUE + ") DataProcessors(max " + Short.MAX_VALUE + ") GeneratorThreadSleepSeconds(max " + Short.MAX_VALUE + ") ProcessorThreadSleepSeconds(max " + Short.MAX_VALUE + ").");
}
if (dataTerminationOutput == 0)
{
out.println("Usinf default Data Termination Output of yes.");
dataTerminationOutput = DATA_DETERMINATION_TRUE;
}
if (runDuration == 0)
{
out.println("Using runtime duration default of twenty seconds.");
runDuration = 20;
}
if (noOfGenerators == 0)
{
out.println("Using defualt of five generators.");
noOfGenerators = 5;
}
if (noOfProcessors == 0)
{
out.println("Using default of five processors.");
noOfProcessors = 5;
}
if (generatorThreadSleepTime == 0)
{
out.println("Using generator sleep time default of two seconds.");
generatorThreadSleepTime = 2;
}
if (processorThreadSleepTime == 0)
{
out.println("Using processor sleep time default of two seconds.");
processorThreadSleepTime = 2;
}
// Create us.
ThreadPool us = new ThreadPool();
// Prepare for shutdown...
Runtime rt = Runtime.getRuntime();
rt.addShutdownHook(us);
// Setup.
us.setup(noOfGenerators, noOfProcessors, generatorThreadSleepTime, processorThreadSleepTime, dataTerminationOutput);
// Output some information.
out.println();
out.println("Thread Pool Information");
out.println("-----------------------");
out.println("Available CPU's : " + rt.availableProcessors());
out.println("Available memory : " + rt.maxMemory() + " bytes.");
out.println("Generators : " + noOfGenerators);
out.println("Processors : " + noOfProcessors);
out.println("Generator Sleep Time : " + generatorThreadSleepTime + " seconds.");
out.println("Processor Sleep Time : " + processorThreadSleepTime + " seconds.");
out.println("Runtime : " + runDuration + " seconds.");
out.println("Output Individual Data: " + getBooleanState(us.dataTerminationOutput));
out.println("Start time : " + us.startTime.toString());
out.println();
// Run for runDuration seconds.
try
{
ThreadPool.sleep(ONE_SECOND * runDuration);
}
catch (InterruptedException ex)
{
}
System.exit(0);
}
/**
* Called at shutdown in a response to a Ctrl-C or system exit.
*/
@Override
public void run()
{
Date endTime = new Date();
out.println();
out.println("Exiting...");
// Output stats.
out.println("Data generated : " + dataGenerated);
out.println("Data processed : " + dataProcessed);
out.println("Unprocessed data : " + dataStore.size());
out.println("Generators : " + noOfGenerators);
out.println("Processors : " + noOfProcessors);
out.println("Generator Sleep Time : " + generatorThreadSleepTime + " seconds.");
out.println("Processor Sleep Time : " + processorThreadSleepTime + " seconds.");
out.println("Output Individual Data: " + getBooleanState(dataTerminationOutput));
out.println("Start time : " + startTime.toString());
out.println("End time : " + endTime.toString());
out.println("Run duration : " + (endTime.getTime() - startTime.getTime()) / ONE_SECOND + " seconds.");
out.println("Exited");
}
/**
* Converts a boolean value into 'yes' or 'no'.
*
* @param state The value to convert.
* @return 'yes' or 'no'.
*/
private static String getBooleanState(boolean state)
{
if (state)
{
return "yes";
}
else
{
return "no";
}
}
/**
* Adds to the count of data that has been generated.
*/
private synchronized void addDataGenerated()
{
dataGenerated++;
}
/**
* Adds to the count of data that has been processed.
*/
private synchronized void addDataProcessed()
{
dataProcessed++;
}
/**
* Class that will have many instances in the pool to process the data placed in 'dataStore'.
*/
private class DataProcessor implements Runnable
{
private short instanceNo;
private ThreadPool owner = null;
public DataProcessor(ThreadPool owner)
{
instanceNo = ++threadProcessorInstanceNoCount;
this.owner = owner;
}
/**
* Process the data every 'ProcessorThreadSleepSeconds' + (Instance Number * Random(Max value of 'DataProcessors') milliseconds).
*/
@Override
public void run()
{
Data currentData;
while (true)
{
synchronized (dataStore)
{
while (dataStore.isEmpty())
{
try
{
dataStore.wait();
}
catch (InterruptedException ex)
{
}
}
currentData = dataStore.remove(0);
}
if (owner.dataTerminationOutput)
{
out.println(currentData.creationDetails() + " and processed by Processor Thread " + instanceNo + " in " + (System.currentTimeMillis() - currentData.timeAdded.getTime()) + " milliseconds");
}
owner.addDataProcessed(); // Interesting to note that an inner class object can call a private method on the parent class which is another object.
try
{
Thread.sleep((processorThreadSleepTime * ONE_SECOND) + (instanceNo * rand.nextInt(noOfProcessors)));
}
catch (InterruptedException ex)
{
}
}
}
}
/**
* Class to contain data to be processed.
*/
private class Data
{
private Date timeAdded;
private short byThread;
public Data(short byThread)
{
this.timeAdded = new Date();
this.byThread = byThread;
}
public String creationDetails()
{
return "Data was created by Data Generator Thread " + byThread + " on " + timeAdded.toString();
}
}
/**
* Class to generate data at a pseudo-random interval.
*/
private class DataGenerator implements Runnable
{
private short instanceNo;
private ThreadPool owner = null;
public DataGenerator(ThreadPool owner)
{
instanceNo = ++dataGeneratorInstanceNoCount;
this.owner = owner;
}
/**
* Create the data to be processed every 'GeneratorThreadSleepSeconds' + (Instance Number * Random(Max value of 'DataGenerators') milliseconds).
*/
@Override
public void run()
{
while (true)
{
synchronized (dataStore)
{
dataStore.add(new Data(instanceNo));
// Experiment with...
//dataStore.notifyAll();
// or
dataStore.notify();
}
owner.addDataGenerated(); // Interesting to note that an inner class object can call a private method on the parent class which is another object.
try
{
Thread.sleep((generatorThreadSleepTime * ONE_SECOND) + (instanceNo * rand.nextInt(noOfGenerators)));
}
catch (InterruptedException ex)
{
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment