Last active
December 19, 2015 17:19
-
-
Save gjb2048/5990478 to your computer and use it in GitHub Desktop.
Program to demonstrate how threads can be employed to generate and consume data.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Thread Pool. | |
* © G J Barnard 2012 | |
* License: Creative Commons Attribution-NonCommercial-ShareAlike 3.0 Unported (CC BY-NC-SA 3.0). | |
* See: http://creativecommons.org/licenses/by-nc-sa/3.0/ | |
*/ | |
package threadpool; | |
import java.util.Date; | |
import java.util.LinkedList; | |
import java.util.Random; | |
import static java.lang.System.out; // Shorten System.out | |
/** | |
* Class to demonstrate thread pools in a producer consumer setting. | |
* | |
* @author Gareth J Barnard 2012 | |
*/ | |
public class ThreadPool extends Thread | |
{ | |
private static short dataGeneratorInstanceNoCount = 0; | |
private static short threadProcessorInstanceNoCount = 0; | |
private final static int ONE_SECOND = 1000; | |
private final static byte DATA_DETERMINATION_FALSE = 1; | |
private final static byte DATA_DETERMINATION_TRUE = 2; | |
private Random rand = new Random(); | |
private final LinkedList<Data> dataStore = new LinkedList<Data>(); | |
private long dataGenerated = 0; | |
private long dataProcessed = 0; | |
private Date startTime = null; | |
private short noOfGenerators = 0; | |
private short noOfProcessors = 0; | |
private short generatorThreadSleepTime = 0; | |
private short processorThreadSleepTime = 0; | |
private boolean dataTerminationOutput = true; | |
/** | |
* Create the Thread Pool Example and set the start time we were created. | |
*/ | |
public ThreadPool() | |
{ | |
startTime = new Date(); | |
} | |
/** | |
* Setup the Thread Pool Example. | |
* | |
* @param noOfGenerators Number of data generators to create. | |
* @param noOfProcessors Number of data processors to create. | |
* @param generatorThreadSleepTime The number of seconds (not including random extra milliseconds) that the generator threads will sleep for. | |
* @param processorThreadSleepTime The number of seconds (not including random extra milliseconds) that the processor threads will sleep for. | |
* @param dataTerminationOutput Display (ThreadPool.DATA_DETERMINATION_TRUE) or not display (ThreadPool.DATA_DETERMINATION_FALSE) the individual data details when they are processed. This can help | |
* with output when using lots of generators. | |
*/ | |
public void setup(short noOfGenerators, short noOfProcessors, short generatorThreadSleepTime, short processorThreadSleepTime, byte dataTerminationOutput) | |
{ | |
this.noOfGenerators = noOfGenerators; | |
this.noOfProcessors = noOfProcessors; | |
this.generatorThreadSleepTime = generatorThreadSleepTime; | |
this.processorThreadSleepTime = processorThreadSleepTime; | |
if (dataTerminationOutput == DATA_DETERMINATION_FALSE) | |
{ | |
this.dataTerminationOutput = false; | |
} | |
Thread t; | |
// Create Data Processors in a 'pool'. | |
for (short pool = 0; pool < noOfProcessors; pool++) | |
{ | |
t = new Thread(new DataProcessor(this)); | |
t.start(); | |
} | |
// Create Data Generators in a 'pool'. | |
for (short pool = 0; pool < noOfGenerators; pool++) | |
{ | |
t = new Thread(new DataGenerator(this)); | |
t.start(); | |
} | |
} | |
/** | |
* @param args DataTerminationOutput(yes/no) or 'help' RunDurationInSeconds DataGenerators DataProcessors GeneratorThreadSleepSeconds ProcessorThreadSleepSeconds | |
*/ | |
public static void main(String[] args) | |
{ | |
short runDuration = 0; | |
short noOfGenerators = 0; | |
short noOfProcessors = 0; | |
short generatorThreadSleepTime = 0; | |
short processorThreadSleepTime = 0; | |
byte dataTerminationOutput = 0; | |
out.println("Thread Pool Example"); | |
out.println("-------------------"); | |
if (args.length > 0) | |
{ | |
if (args[0].equalsIgnoreCase("no")) | |
{ | |
dataTerminationOutput = DATA_DETERMINATION_FALSE; | |
} | |
else if (args[0].equalsIgnoreCase("yes")) | |
{ | |
dataTerminationOutput = DATA_DETERMINATION_TRUE; | |
} | |
else if (args[0].equalsIgnoreCase("help")) | |
{ | |
out.println("Usage: java ThreadPool DataTerminationOutput(yes/no) or 'help' RunDurationInSeconds(max " + Short.MAX_VALUE + ") DataGenerators(max " + Short.MAX_VALUE + ") DataProcessors(max " + Short.MAX_VALUE + ") GeneratorThreadSleepSeconds(max " + Short.MAX_VALUE + ") ProcessorThreadSleepSeconds(max " + Short.MAX_VALUE + ")."); | |
out.println(""); | |
out.println("Program to demonstrate how threads can be employed to generate and consume data."); | |
out.println("In this case a small data class that contains information about when it was"); | |
out.println("created and by which thread. Then one of the threads in the 'thread pool' is"); | |
out.println("allocated to 'consume' it by printing out its details. Results should be more"); | |
out.println("varied on multiple CPU machines. Information printed at the start and statistics"); | |
out.println("at the end showing how many 'data' were not consumed."); | |
out.println(""); | |
out.println("The parameters are as follows (brackets indicate the default):"); | |
out.println("DataTerminationOutput - (yes) or no - state to print out the 'data' details."); | |
out.println("RunDurationInSeconds - (20) - runtime of the program."); | |
out.println("DataGenerators - (5) - number of generating threads."); | |
out.println("DataProcessors - (5) - number of generating threads."); | |
out.println("GeneratorThreadSleepSeconds - (2) - number of seconds(+random element) a thread will sleep for after performing its task."); | |
out.println("ProcessorThreadSleepSeconds - (2) - number of seconds(+random element) a thread will sleep for after performing its task."); | |
System.exit(0); | |
} | |
else | |
{ | |
out.println("Invalid Data Termination Output: " + args[0]); | |
} | |
if (args.length > 1) | |
{ | |
try | |
{ | |
runDuration = Short.parseShort(args[1]); | |
} | |
catch (NumberFormatException ex) | |
{ | |
out.println("Invalid duration: " + args[1]); | |
} | |
} | |
if (args.length > 2) | |
{ | |
try | |
{ | |
noOfGenerators = Short.parseShort(args[2]); | |
} | |
catch (NumberFormatException ex) | |
{ | |
out.println("Invalid number of generators: " + args[2]); | |
} | |
} | |
if (args.length > 3) | |
{ | |
try | |
{ | |
noOfProcessors = Short.parseShort(args[3]); | |
} | |
catch (NumberFormatException ex) | |
{ | |
out.println("Invalid number of generators: " + args[3]); | |
} | |
} | |
if (args.length > 4) | |
{ | |
try | |
{ | |
generatorThreadSleepTime = Short.parseShort(args[4]); | |
} | |
catch (NumberFormatException ex) | |
{ | |
out.println("Invalid generator sleep time: " + args[4]); | |
} | |
} | |
if (args.length > 5) | |
{ | |
try | |
{ | |
processorThreadSleepTime = Short.parseShort(args[5]); | |
} | |
catch (NumberFormatException ex) | |
{ | |
out.println("Invalid processor sleep time: " + args[5]); | |
} | |
} | |
} | |
else | |
{ | |
out.println("Usage: java -jar ThreadPool DataTerminationOutput(yes/no) RunDurationInSeconds(max " + Short.MAX_VALUE + ") DataGenerators(max " + Short.MAX_VALUE + ") DataProcessors(max " + Short.MAX_VALUE + ") GeneratorThreadSleepSeconds(max " + Short.MAX_VALUE + ") ProcessorThreadSleepSeconds(max " + Short.MAX_VALUE + ")."); | |
} | |
if (dataTerminationOutput == 0) | |
{ | |
out.println("Usinf default Data Termination Output of yes."); | |
dataTerminationOutput = DATA_DETERMINATION_TRUE; | |
} | |
if (runDuration == 0) | |
{ | |
out.println("Using runtime duration default of twenty seconds."); | |
runDuration = 20; | |
} | |
if (noOfGenerators == 0) | |
{ | |
out.println("Using defualt of five generators."); | |
noOfGenerators = 5; | |
} | |
if (noOfProcessors == 0) | |
{ | |
out.println("Using default of five processors."); | |
noOfProcessors = 5; | |
} | |
if (generatorThreadSleepTime == 0) | |
{ | |
out.println("Using generator sleep time default of two seconds."); | |
generatorThreadSleepTime = 2; | |
} | |
if (processorThreadSleepTime == 0) | |
{ | |
out.println("Using processor sleep time default of two seconds."); | |
processorThreadSleepTime = 2; | |
} | |
// Create us. | |
ThreadPool us = new ThreadPool(); | |
// Prepare for shutdown... | |
Runtime rt = Runtime.getRuntime(); | |
rt.addShutdownHook(us); | |
// Setup. | |
us.setup(noOfGenerators, noOfProcessors, generatorThreadSleepTime, processorThreadSleepTime, dataTerminationOutput); | |
// Output some information. | |
out.println(); | |
out.println("Thread Pool Information"); | |
out.println("-----------------------"); | |
out.println("Available CPU's : " + rt.availableProcessors()); | |
out.println("Available memory : " + rt.maxMemory() + " bytes."); | |
out.println("Generators : " + noOfGenerators); | |
out.println("Processors : " + noOfProcessors); | |
out.println("Generator Sleep Time : " + generatorThreadSleepTime + " seconds."); | |
out.println("Processor Sleep Time : " + processorThreadSleepTime + " seconds."); | |
out.println("Runtime : " + runDuration + " seconds."); | |
out.println("Output Individual Data: " + getBooleanState(us.dataTerminationOutput)); | |
out.println("Start time : " + us.startTime.toString()); | |
out.println(); | |
// Run for runDuration seconds. | |
try | |
{ | |
ThreadPool.sleep(ONE_SECOND * runDuration); | |
} | |
catch (InterruptedException ex) | |
{ | |
} | |
System.exit(0); | |
} | |
/** | |
* Called at shutdown in a response to a Ctrl-C or system exit. | |
*/ | |
@Override | |
public void run() | |
{ | |
Date endTime = new Date(); | |
out.println(); | |
out.println("Exiting..."); | |
// Output stats. | |
out.println("Data generated : " + dataGenerated); | |
out.println("Data processed : " + dataProcessed); | |
out.println("Unprocessed data : " + dataStore.size()); | |
out.println("Generators : " + noOfGenerators); | |
out.println("Processors : " + noOfProcessors); | |
out.println("Generator Sleep Time : " + generatorThreadSleepTime + " seconds."); | |
out.println("Processor Sleep Time : " + processorThreadSleepTime + " seconds."); | |
out.println("Output Individual Data: " + getBooleanState(dataTerminationOutput)); | |
out.println("Start time : " + startTime.toString()); | |
out.println("End time : " + endTime.toString()); | |
out.println("Run duration : " + (endTime.getTime() - startTime.getTime()) / ONE_SECOND + " seconds."); | |
out.println("Exited"); | |
} | |
/** | |
* Converts a boolean value into 'yes' or 'no'. | |
* | |
* @param state The value to convert. | |
* @return 'yes' or 'no'. | |
*/ | |
private static String getBooleanState(boolean state) | |
{ | |
if (state) | |
{ | |
return "yes"; | |
} | |
else | |
{ | |
return "no"; | |
} | |
} | |
/** | |
* Adds to the count of data that has been generated. | |
*/ | |
private synchronized void addDataGenerated() | |
{ | |
dataGenerated++; | |
} | |
/** | |
* Adds to the count of data that has been processed. | |
*/ | |
private synchronized void addDataProcessed() | |
{ | |
dataProcessed++; | |
} | |
/** | |
* Class that will have many instances in the pool to process the data placed in 'dataStore'. | |
*/ | |
private class DataProcessor implements Runnable | |
{ | |
private short instanceNo; | |
private ThreadPool owner = null; | |
public DataProcessor(ThreadPool owner) | |
{ | |
instanceNo = ++threadProcessorInstanceNoCount; | |
this.owner = owner; | |
} | |
/** | |
* Process the data every 'ProcessorThreadSleepSeconds' + (Instance Number * Random(Max value of 'DataProcessors') milliseconds). | |
*/ | |
@Override | |
public void run() | |
{ | |
Data currentData; | |
while (true) | |
{ | |
synchronized (dataStore) | |
{ | |
while (dataStore.isEmpty()) | |
{ | |
try | |
{ | |
dataStore.wait(); | |
} | |
catch (InterruptedException ex) | |
{ | |
} | |
} | |
currentData = dataStore.remove(0); | |
} | |
if (owner.dataTerminationOutput) | |
{ | |
out.println(currentData.creationDetails() + " and processed by Processor Thread " + instanceNo + " in " + (System.currentTimeMillis() - currentData.timeAdded.getTime()) + " milliseconds"); | |
} | |
owner.addDataProcessed(); // Interesting to note that an inner class object can call a private method on the parent class which is another object. | |
try | |
{ | |
Thread.sleep((processorThreadSleepTime * ONE_SECOND) + (instanceNo * rand.nextInt(noOfProcessors))); | |
} | |
catch (InterruptedException ex) | |
{ | |
} | |
} | |
} | |
} | |
/** | |
* Class to contain data to be processed. | |
*/ | |
private class Data | |
{ | |
private Date timeAdded; | |
private short byThread; | |
public Data(short byThread) | |
{ | |
this.timeAdded = new Date(); | |
this.byThread = byThread; | |
} | |
public String creationDetails() | |
{ | |
return "Data was created by Data Generator Thread " + byThread + " on " + timeAdded.toString(); | |
} | |
} | |
/** | |
* Class to generate data at a pseudo-random interval. | |
*/ | |
private class DataGenerator implements Runnable | |
{ | |
private short instanceNo; | |
private ThreadPool owner = null; | |
public DataGenerator(ThreadPool owner) | |
{ | |
instanceNo = ++dataGeneratorInstanceNoCount; | |
this.owner = owner; | |
} | |
/** | |
* Create the data to be processed every 'GeneratorThreadSleepSeconds' + (Instance Number * Random(Max value of 'DataGenerators') milliseconds). | |
*/ | |
@Override | |
public void run() | |
{ | |
while (true) | |
{ | |
synchronized (dataStore) | |
{ | |
dataStore.add(new Data(instanceNo)); | |
// Experiment with... | |
//dataStore.notifyAll(); | |
// or | |
dataStore.notify(); | |
} | |
owner.addDataGenerated(); // Interesting to note that an inner class object can call a private method on the parent class which is another object. | |
try | |
{ | |
Thread.sleep((generatorThreadSleepTime * ONE_SECOND) + (instanceNo * rand.nextInt(noOfGenerators))); | |
} | |
catch (InterruptedException ex) | |
{ | |
} | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment