Skip to content

Instantly share code, notes, and snippets.

@rupert-ong
Last active September 3, 2022 10:53
Show Gist options
  • Save rupert-ong/87c6ce14155bbc692a2628638b621d64 to your computer and use it in GitHub Desktop.
Save rupert-ong/87c6ce14155bbc692a2628638b621d64 to your computer and use it in GitHub Desktop.
Java: Direct Threading, ExecutorService, Future and Callable #java #thread #runnable #future #callable #executorservice
package com.ps;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
public class Adder implements Runnable {
private String inFile, outFile;
public Adder(String inFile, String outFile) {
this.inFile = inFile;
this.outFile = outFile;
}
public void doAdd() throws IOException {
int total = 0;
String line = null;
try(BufferedReader reader = Files.newBufferedReader(Paths.get(inFile))) {
while((line = reader.readLine()) != null) {
total += Integer.parseInt(line);
}
}
try(BufferedWriter writer = Files.newBufferedWriter(Paths.get(outFile))) {
writer.write("Total: " + total);
}
}
@Override
public void run() {
try {
doAdd();
} catch (IOException e) {
e.printStackTrace();
}
}
}
package com.ps;
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.Callable;
/**
* Callable interface represents task to be run on a thread
* - Can return results or throw exceptions (to caller thread)
* - Only member is call method
*/
public class AdderCallable implements Callable<Integer> {
private String inFile;
public AdderCallable(String inFile) {
this.inFile = inFile;
}
public int doAdd() throws IOException {
int total = 0;
String line = null;
try (BufferedReader reader = Files.newBufferedReader(Paths.get(inFile))) {
while ((line = reader.readLine()) != null) {
total += Integer.parseInt(line);
}
}
return total;
}
/**
* Implement Callable call method to return result or exception
* @return
* @throws IOException
*/
@Override
public Integer call() throws IOException {
return doAdd();
}
}
import com.ps.Adder;
import com.ps.AdderCallable;
import java.io.IOException;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) {
String[] inputFiles = {"./input.1.txt", "./input.2.txt", "./input.3.txt", "./input.4.txt", "./input.5.txt"};
String[] outputFiles = {"./output.1.txt", "./output.2.txt", "./output.3.txt", "./output.4.txt", "./output.5.txt"};
// doSimpleThread(inputFiles, outputFiles);
// doSimpleThreadWithBlockingWait(inputFiles, outputFiles);
doThreadPoolWithCallableAndFuture(inputFiles);
}
/**
* A thread can be opened to run tasks concurrently. Tasks must
* implement a Runnable interface and its run method (See Adder)
* <p>
* The issue here is that the main thread may shut down before the
* background threads have finished their tasks
*/
static void doSimpleThread(String[] inputFiles, String[] outputFiles) {
for (int i = 0; i < inputFiles.length; i++) {
Adder adder = new Adder(inputFiles[i], outputFiles[i]);
// Multi-threaded way
Thread thread = new Thread(adder);
thread.start();
}
}
/**
* This process will ensure the main thread doesn't shut down before the
* background threads have completed first
*/
static void doSimpleThreadWithBlockingWait(String[] inputFiles, String[] outputFiles) {
Thread[] threads = new Thread[inputFiles.length];
for (int i = 0; i < inputFiles.length; i++) {
Adder adder = new Adder(inputFiles[i], outputFiles[i]);
// Multi-threaded way
threads[i] = new Thread(adder);
threads[i].start();
}
try {
for (Thread thread : threads)
thread.join(); // Blocks waiting for thread completion
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* Using (Fixed) Thread Pool abstraction to manage threads using ExecutorService and Executors
*
* @param inputFiles list of files to read from
* @param outputFiles list of files to write into
*/
static void doThreadPool(String[] inputFiles, String[] outputFiles) {
ExecutorService es = Executors.newFixedThreadPool(3);
for (int i = 0; i < inputFiles.length; i++) {
Adder adder = new Adder(inputFiles[i], outputFiles[i]);
es.submit(adder);
}
try {
// Don't allow any new tasks into thread pool
// Willing to wait 60 seconds for the shutdown
es.shutdown();
es.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* Future allows us to get a result from one or more threads to use
* in the main (calling) thread
* <p>
* Future represents a (future) result of a thread, which can be typed,
* that is returned by ExecutorService.submit(). Future has a get() method
* that blocks until the task is complete, and returns a Callable interface's
* result or throws a Callable's exception
*
* @param inputFiles list of files to read from
*/
static void doThreadPoolWithCallableAndFuture(String[] inputFiles) {
ExecutorService es = Executors.newFixedThreadPool(3);
Future<Integer>[] results = new Future[inputFiles.length];
for (int i = 0; i < inputFiles.length; i++) {
AdderCallable adder = new AdderCallable(inputFiles[i]);
results[i] = es.submit(adder);
}
for (Future<Integer> result : results) {
try {
int value = result.get();
System.out.println(value);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
Throwable adderException = e.getCause(); // Get AdderCallable exception
System.out.println(adderException);
}
// Can catch other Exceptions...
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment