@rponte
Last active December 30, 2023 14:37
THEORY: Example of a simple Single Thread Pool implementation in Java
import java.util.concurrent.RejectedExecutionException;

public class HowToUseIt {

    /**
     * Usually we'll have a single instance per client
     */
    private static final SingleThreadPool THREAD_POOL = new SingleThreadPool();

    public void executeAsync() {
        try {
            THREAD_POOL
                .runInBackground(new PrintWorker());
        } catch (RejectedExecutionException e) {
            throw new RuntimeException("There's a thread running already!");
        }
    }

    static class PrintWorker implements Runnable {
        @Override
        public void run() {
            System.out.println("executing in background...");
        }
    }
}
package br.com.rponte.util.threads;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * ThreadFactory that allows us to name the threads it creates,
 * making thread tracing and logging easier.
 *
 * <p>
 * The factory creates threads named <i>pool-{poolName}-thread-{N}</i>,
 * where <i>{poolName}</i> is the string given in the constructor, and <i>{N}</i>
 * is the sequential number of the thread created by this factory.
 *
 * Inspired by the JDK6 class <code>Executors.DefaultThreadFactory</code>.
 *
 * <p>
 * http://dubravsky.com/b/seven-rules-of-executorservice-usage-in-java-rule-3-name-the-threads-of-a-thread-pool
 * https://stackoverflow.com/questions/6113746/naming-threads-and-thread-pools-of-executorservice
 */
public class NamedThreadFactory implements ThreadFactory {

    private final String poolName;
    private final AtomicInteger threadNumber = new AtomicInteger(1);

    public NamedThreadFactory(String poolName) {
        this.poolName = poolName;
    }

    @Override
    public Thread newThread(Runnable r) {
        String threadName = "pool-{poolName}-thread-{N}"
                .replace("{poolName}", poolName)
                .replace("{N}", String.valueOf(threadNumber.getAndIncrement()));
        Thread t = new Thread(r, threadName);
        t.setDaemon(false);
        t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}
package br.com.rponte.util.threads;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Thread pool with a maximum size of 1. Rejects execution if
 * a thread is already running.
 */
public class SingleThreadPool {

    /**
     * Thread pool of size 1.<br/>
     * The idea is to allow no more than one simultaneous execution of a thread.
     */
    private final ExecutorService THREAD_POOL = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.MILLISECONDS,
            new SynchronousQueue<Runnable>(),   // rejects excess tasks
            new NamedThreadFactory("my_pool")); // names the pool's threads

    /**
     * Runs a worker in the background, but allows no more than
     * one worker at a time.
     *
     * If a task is already running, a <code>RejectedExecutionException</code>
     * will be thrown.
     */
    public void runInBackground(Runnable worker) throws RejectedExecutionException {
        THREAD_POOL.submit(worker);
    }
}
rponte commented Mar 18, 2019

Thread pools on the JVM should usually be divided into three categories: 1) CPU-bound; 2) blocking IO; and 3) non-blocking IO polling:

Thread Pools

Thread pools on the JVM should usually be divided into the following three categories:

  1. CPU-bound
  2. Blocking IO
  3. Non-blocking IO polling

Each of these categories has a different optimal configuration and usage pattern.

For CPU-bound tasks, you want a bounded thread pool which is pre-allocated and fixed to exactly the number of CPUs. The only work you will be doing on this pool will be CPU-bound computation, and so there is no sense in exceeding the number of CPUs unless you happen to have a really particular workflow that is amenable to hyperthreading (in which case you could go with double the number of CPUs). Note that the old wisdom of "number of CPUs + 1" comes from mixed-mode thread pools where CPU-bound and IO-bound tasks were merged. We won't be doing that.
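A minimal sketch of that advice (the class and method names here are mine, not from the quoted text) is a fixed pool pre-allocated to exactly the number of available cores:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CpuBoundPool {

    // Fixed pool sized to exactly the number of available cores;
    // only CPU-bound computation should ever run on this pool.
    public static ExecutorService newCpuBoundPool() {
        int cores = Runtime.getRuntime().availableProcessors();
        return Executors.newFixedThreadPool(cores);
    }
}
```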

The problem with a fixed thread pool is that any blocking IO operation (well, any blocking operation at all) will eat a thread, which is an extremely finite resource. Thus, we want to avoid blocking at all costs on the CPU-bound pool. Unfortunately, this isn't always possible (e.g. when being forced to use a blocking IO library). When this is the case, you should always push your blocking operations (IO or otherwise) over to a separate thread pool. This separate thread pool should be caching and unbounded with no pre-allocated size. To be clear, this is a very dangerous type of thread pool. It isn't going to prevent you from just allocating more and more threads as the others block, which is a very dangerous state of affairs. You need to make sure that any data flow which results in running actions on this pool is externally bounded, meaning that you have semantically higher-level checks in place to ensure that only a fixed number of blocking actions may be outstanding at any point in time (this is often done with a non-blocking bounded queue).
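One way to sketch that external bound (class and method names are hypothetical) is a cached, unbounded pool guarded by a Semaphore, so that only a fixed number of blocking tasks can ever be outstanding:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class BoundedBlockingIoPool {

    // Caching, unbounded pool: dangerous on its own, so we bound it externally
    private final ExecutorService ioPool = Executors.newCachedThreadPool();
    private final Semaphore permits;

    public BoundedBlockingIoPool(int maxOutstanding) {
        this.permits = new Semaphore(maxOutstanding);
    }

    // Blocks the caller once maxOutstanding tasks are already in flight,
    // so the pool can never grow without limit
    public void submitBlockingIo(Runnable task) throws InterruptedException {
        permits.acquire();
        ioPool.submit(() -> {
            try {
                task.run();
            } finally {
                permits.release();
            }
        });
    }

    public void shutdown() {
        ioPool.shutdown();
    }
}
```

A non-blocking bounded queue in front of the pool, as the text mentions, is another common way to enforce the same limit.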

The final category of useful threads (assuming you're not a Swing/SWT application) is asynchronous IO polls. These threads basically just sit there asking the kernel whether or not there is a new outstanding async IO notification, and forward that notification on to the rest of the application. You want to handle this with a very small number of fixed, pre-allocated threads. Many applications handle this task with just a single thread! These threads should be given the maximum priority, since the application latency will be bounded around their scheduling. You need to be careful though to never do any work whatsoever on this thread pool! Never ever ever. The moment you receive an async notification, you should be immediately shifting back to the CPU pool. Every nanosecond you spend on the async IO thread(s) is added latency on your application. For this reason, some applications may find slightly better performance by making their async IO pool 2 or 4 threads in size, rather than the conventional 1.

rponte commented Mar 21, 2019

StackOverflow: How to decide on the ThreadPoolTaskExecutor pools and queue sizes?

Thus, in that case, you should put up a sign telling your client "sorry, we're overbooked, you shouldn't make any new order now, as we won't be able to comply within an acceptable time range".

Then, the queue size would be: acceptable time range / time to complete a task.

Concrete Example : if your client service expects that the task it submits would have to be completed in less than 100 seconds, and knowing that every task takes 1-2 seconds, you should limit the queue to 50-100 tasks because once you have 100 tasks waiting in the queue, you're pretty sure that the next one won't be completed in less than 100 seconds, thus rejecting the task to prevent the service from waiting for nothing.
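That rule of thumb is simple enough to encode directly (the helper below is mine, not from the answer):

```java
public class QueueSizing {

    // queue size = acceptable wait time / average time to complete one task
    public static int queueSize(long acceptableWaitMillis, long avgTaskMillis) {
        return (int) (acceptableWaitMillis / avgTaskMillis);
    }
}
```

With the numbers from the example, a 100-second budget and 2-second tasks give a queue of 50 tasks.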

rponte commented Jun 15, 2021

Zalando Blog: How to set an ideal thread pool size

A comment I made on LinkedIn about how to tune the pool size (nothing special):

hey,

honestly I don't have anything clear-cut, but in general I look at the context and the expected workload, and run whatever load and stress tests I can in an environment as close to production as possible! A good starting point, though, is Queuing Theory and the Little's Law formula: https://vladmihalcea.com/the-simple-scalability-equation/

Understanding your type of workload also helps a lot, for example whether it is CPU-bound or IO-bound: https://stackoverflow.com/questions/868568/what-do-the-terms-cpu-bound-and-i-o-bound-mean (e.g. web applications are generally IO-bound).

This gist has several tips and insights, but this one in particular, about tuning the thread pool according to the workload type, may help a lot: https://gist.github.com/rponte/83c2542abf6fafd351ae0d2ff0646dae#gistcomment-2865322

Anyway, I also suspect there is no single recipe for finding exact numbers (without proper testing and monitoring), given the variation in contexts and workloads, process distribution, communication between services, kinds of features and so on.

If anyone knows of one and can share it here, that would help a lot :-)

rponte commented Jun 18, 2021

Martin Fowler Blog: Singular Update Queue

Implement a workqueue and a single thread working off the queue. Multiple concurrent clients can submit state changes to the queue. But a single thread works on state changes.

Choice of the queue

The choice of the queue data structure is an important one to be made. On the JVM, there are various data structures available to choose from:

  • ArrayBlockingQueue (Used in Kafka request queue)
    As the name suggests, this is an array-backed blocking queue. This is used when a fixed, bounded queue needs to be created. Once the queue fills up, the producer will block. This provides blocking backpressure and is helpful when we have slow consumers and fast producers.

  • ConcurrentLinkedQueue along with ForkJoinPool (Used in Akka Actors mailbox implementation)
    ConcurrentLinkedQueue can be used when we do not have consumers waiting for the producer, but there is some coordinator which schedules consumers only after tasks are queued onto the ConcurrentLinkedQueue.

  • LinkedBlockingDeque (Used By Zookeeper and Kafka response queue)
    This is mostly used when unbounded queuing needs to be done without blocking the producer. We need to be careful with this choice, as the queue might fill up quickly if no backpressure techniques are implemented, and it can go on to consume all the memory.

  • RingBuffer (Used in LMAX Disruptor)
    As discussed in LMAX Disruptor, sometimes, task processing is latency sensitive. So much so, that copying tasks between processing stages with ArrayBlockingQueue can add to latencies which are not acceptable. RingBuffer can be used in these cases to pass tasks between stages.
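A minimal sketch of the Singular Update Queue pattern using ArrayBlockingQueue (the class name, queue capacity, and handler shape are my choices, not from the article):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

public class SingularUpdateQueue<T> extends Thread {

    // Bounded queue: producers block when it is full, giving backpressure
    private final BlockingQueue<T> workQueue = new ArrayBlockingQueue<>(100);
    private final Consumer<T> handler;

    public SingularUpdateQueue(Consumer<T> handler) {
        this.handler = handler;
    }

    // Multiple concurrent clients may submit state changes...
    public void submit(T stateChange) throws InterruptedException {
        workQueue.put(stateChange);
    }

    // ...but only this single thread applies them, so the handler
    // never needs its own synchronization.
    @Override
    public void run() {
        while (!isInterrupted()) {
            try {
                handler.accept(workQueue.take());
            } catch (InterruptedException e) {
                return;
            }
        }
    }
}
```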

rponte commented Dec 1, 2021

Be careful when working with Spring Boot's @Async, because its default thread pool uses an unbounded queue with only 8 active threads (in this case, maxSize is ignored, since a ThreadPoolExecutor only grows beyond its core size when the queue is full). It means that if new tasks are created much faster than they are executed, your application may suffer an OutOfMemoryError.

It's important to configure the thread pool properly for your workload, something like this:

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@EnableAsync
@Configuration
class AsyncConfig {

    @Bean
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(24);
        executor.setQueueCapacity(20);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("async-task-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); // if the queue is full, throw a RejectedExecutionException
        return executor;
    }
}

Or you can configure it through application.properties:

spring.task.execution.pool.core-size=4
spring.task.execution.pool.max-size=24
spring.task.execution.pool.queue-capacity=20
spring.task.execution.pool.keep-alive=60s
spring.task.execution.pool.allow-core-thread-timeout=true
spring.task.execution.shutdown.await-termination=false
spring.task.execution.shutdown.await-termination-period=
spring.task.execution.thread-name-prefix=async-task-

In this article you can see a more detailed example of how to configure the Spring Boot's thread pool.

Another interesting point here is: the thread pool's default configuration (unbounded queue, poolSize=8 etc) is not configured by Spring itself. In fact, it's overwritten by Spring Boot auto-configure feature in its task module. You can see its new defaults in the TaskExecutionProperties class. Also, Spring Boot's documentation talks a little bit about its defaults.

By the way, it's important to know how Spring's thread pool executor works. I mean, you don't need to trust me or anything I told you above, but here's what Spring's documentation says about its thread pool executor:
(screenshot: Spring's documentation on its thread pool executor)

rponte commented Dec 27, 2021

Working with Ruby Threads: How Many Threads Are Too Many?

I showed some heuristics for code that is fully IO-bound or fully CPU-bound. In reality, your application is probably not so clear cut. Your app may be IO-bound in some places, and CPU-bound in other places. Your app may not be bound by CPU or IO. It may be memory-bound, or simply not maximizing resources in any way.

[...]

So, as I said at the beginning of this chapter, the only way to a surefire answer is to measure. Run your code with different thread counts, measure the results, and then decide. Without measuring, you may never find the ‘right’ answer.

rponte commented Nov 16, 2023

Can We Follow a Concrete Formula?

The formula for determining thread pool size can be written as follows:

Number of threads = Number of Available Cores * Target CPU utilization * (1 + Wait time / Service time)

Number of Available Cores: This is the number of CPU cores available to your application. It is important to note that this is not the same as the number of CPUs, as each CPU may have multiple cores.

Target CPU utilization: This is the percentage of CPU time that you want your application to use. If you set the target CPU utilization too high, your application may become unresponsive. If you set it too low, your application will not be able to fully utilize the available CPU resources.

Wait time: This is the amount of time that threads spend waiting for I/O operations to complete. This can include waiting for network responses, database queries, or file operations.

Service time: This is the amount of time that threads spend performing computation.

Blocking coefficient: This is the ratio of wait time to service time. It is a measure of how much time threads spend waiting for I/O operations to complete relative to the amount of time they spend performing computation.

Example Usage

Suppose you have a server with 4 CPU cores and you want your application to use 50% of the available CPU resources.

Your application has two classes of tasks: I/O-intensive tasks and CPU-intensive tasks.

The I/O-intensive tasks have a blocking coefficient of 0.5, meaning that their wait time is half of their service time.

Number of threads = 4 cores * 0.5 * (1 + 0.5) = 3 threads

The CPU-intensive tasks have a blocking coefficient of 0.1, meaning that their wait time is 10% of their service time.

Number of threads = 4 cores * 0.5 * (1 + 0.1) = 2.2 threads

In this example, you would create two thread pools, one for the I/O-intensive tasks and one for the CPU-intensive tasks. The I/O-intensive thread pool would have 3 threads and the CPU-intensive thread pool would have 2 threads (2.2 rounded down).
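The formula and both worked examples above can be checked with a small helper (the class name is mine, not from the quoted text):

```java
public class PoolSizer {

    // threads = cores * target utilization * (1 + wait time / service time)
    // where (wait time / service time) is the blocking coefficient
    public static int poolSize(int cores, double targetUtilization, double blockingCoefficient) {
        double threads = cores * targetUtilization * (1 + blockingCoefficient);
        return Math.max(1, (int) Math.round(threads)); // never size a pool below 1 thread
    }
}
```

With 4 cores and 50% target utilization, a blocking coefficient of 0.5 yields 3 threads and a coefficient of 0.1 yields 2.2, rounded to 2, matching the example.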
