Java - Concurrency

JVM

Thread States

  • java.lang.Thread.State

    NEW

    A thread is in NEW state if you create an instance of Thread class but before invocation of start() method.

    RUNNABLE

    A thread is in RUNNABLE state after invocation of start() method, but the thread scheduler has not selected it to be the running thread.

    BLOCKED

    A thread is in BLOCKED state when it is waiting for a monitor lock to enter a synchronized block/method.

    WAITING

    A thread is in WAITING state when one of the following methods is invoked:

    • Object.wait() with no timeout
    • Thread.join() with no timeout
    • LockSupport.park()

    TIMED_WAITING

    A thread is in TIMED_WAITING state when one of the following methods is invoked with a specified waiting time:

    • Thread.sleep(long millis)
    • Object.wait(long timeout)
    • Thread.join(long millis)
    • LockSupport.parkNanos()
    • LockSupport.parkUntil()

    TERMINATED

    A thread is in TERMINATED state when its run() method exits.

Thread Dump

Note: JFR is the modern way to collect thread dumps.

Thread dumps is a plain text file that contains information about the state of the JVM and all the threads that are running within it.

Thread Pool

Construction

  • 线程池必须手动通过 ThreadPoolExecutor 的构造函数来声明,避免使用 Executors 类创建线程池,会有 OOM 风险。

    Executors返回线程池对象的弊端如下(后文会详细介绍到):

    • newFixedThreadPool()newSingleThreadExecutor()

      使用的是无界LinkedBlockingQueue (default constructor),任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM

    • newCachedThreadPool()

      使用的是同步队列 SynchronousQueue, no queue needed

    • newScheduledThreadPoolnewSingleThreadScheduledExecutor

      使用的无界的延迟阻塞队列 DelayedWorkQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM

    说白了就是:使用有界队列,控制线程创建数量

  • Customize thread pool parameters in practice based on computing power and business scenario, such as the number of core threads, the task queue, and so on.

    核心参数

    • corePoolSize

      核心线程数线程数定义了最小可以同时运行的线程数量。

    • maximumPoolSize

      当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。

    • workQueue

      当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。

  • Rename the thread pool explicitly with meaningful names to facilitate troubleshooting.

  • Avoid using default Spring thread pool implementation, which creates one thread per request.

Monitoring

Thread pool isolation

  • Use dedicated thread pools, using shared thread pool could cause dead lock.

Thread pool config

  • Thread count

    • 如果我们设置的线程池数量太小的话,如果同一时间有大量任务/请求需要处理,可能会导致大量的请求/任务在任务队列中排队等待执行,甚至会出现任务队列满了之后任务/请求无法处理的情况,或者大量任务堆积在任务队列导致 OOM。这样很明显是有问题的,CPU 根本没有得到充分利用。

    • Too many threads could cause too many context switching and excessive resource usage.

    • Strategy for determining the number of threads needed

      • CPU 密集型任务(N+1)

        这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1。比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。

      • I/O 密集型任务(2N)

        这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N

      • 线程数更严谨的计算的方法应该是:最佳线程数 = N(CPU 核心数)∗(1+WT(线程等待时间)/ST(线程计算时间)),其中 WT(线程等待时间)=线程运行总时间 - ST(线程计算时间)。

        线程等待时间所占比例越高,需要越多线程。线程计算时间所占比例越高,需要越少线程。

        我们可以通过 JDK 自带的工具 VisualVM 来查看 WT/ST 比例。

        CPU 密集型任务的 WT/ST 接近或者等于 0,因此, 线程数可以设置为 N(CPU 核心数)∗(1+0)= N,和我们上面说的 N(CPU 核心数)+1 差不多。

        IO 密集型任务下,几乎全是线程等待时间,从理论上来说,你就可以将线程数设置为 2N(按道理来说,WT/ST 的结果应该比较大,这里选择 2N 的原因应该是为了避免创建过多线程吧)。

Lock

  • Use non-blocking methods such as tryLock to avoid blocking and provide finer-grained control over behaviors.

    reentrantLockObject.lock() is equivalent to:

    // You can add more control over the behaviors of acquiring locks
    private void acquireLock() {
        var acquired = philosopherOneAtaTime.tryLock();
        while (!acquired) {
            log.debug("{} tries to acquire lock.", philosopherId);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                log.warn(INTERRUPTED, e);
                Thread.currentThread().interrupt();
            }
            acquired = philosopherOneAtaTime.tryLock();
        }
    }

CompletableFuture

CompletableFuture

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
 
public class CompletableFutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. Task Creation
        CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
            System.out.println("runAsync: Asynchronous task with no return value");
        });
 
        CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
            System.out.println("supplyAsync: Asynchronous task with a return value");
            return "Hello";
        });
 
        // 2. Async Callbacks
        CompletableFuture<String> thenApply = supplyAsync.thenApply(s -> s + " World");
        CompletableFuture<String> thenCompose = supplyAsync.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " Composed"));
        CompletableFuture<Void> thenAccept = supplyAsync.thenAccept(s -> System.out.println("thenAccept: " + s));
        CompletableFuture<Void> thenRun = supplyAsync.thenRun(() -> System.out.println("thenRun: No input, just run"));
 
        // 3. Composition
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Future1");
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Future2");
 
        CompletableFuture<String> thenCombine = future1.thenCombine(future2, (s1, s2) -> s1 + " + " + s2);
        CompletableFuture<Void> thenAcceptBoth = future1.thenAcceptBoth(future2, (s1, s2) -> 
            System.out.println("thenAcceptBoth: " + s1 + " and " + s2));
        CompletableFuture<Void> runAfterBoth = future1.runAfterBoth(future2, () -> 
            System.out.println("runAfterBoth: Both done"));
 
        CompletableFuture<Object> anyOf = CompletableFuture.anyOf(
            CompletableFuture.supplyAsync(() -> "Result A"),
            CompletableFuture.supplyAsync(() -> "Result B")
        );
        CompletableFuture<String> anyOfResult = anyOf.thenApply(Object::toString);
 
        CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2)
            .thenRun(() -> System.out.println("allOf: All futures completed"));
 
        // applyToEither / acceptEither / runAfterEither
        CompletableFuture<String> fastFuture = CompletableFuture.supplyAsync(() -> {
            try { Thread.sleep(100); } catch (InterruptedException e) { }
            return "Fast";
        });
        CompletableFuture<String> slowFuture = CompletableFuture.supplyAsync(() -> {
            try { Thread.sleep(500); } catch (InterruptedException e) { }
            return "Slow";
        });
 
        CompletableFuture<String> applyToEither = fastFuture.applyToEither(slowFuture, s -> "Winner: " + s);
        CompletableFuture<Void> acceptEither = fastFuture.acceptEither(slowFuture, s -> 
            System.out.println("acceptEither: " + s));
        CompletableFuture<Void> runAfterEither = fastFuture.runAfterEither(slowFuture, () -> 
            System.out.println("runAfterEither: One finished"));
 
        // 4. Result Handling
        CompletableFuture<String> handle = supplyAsync.handle((result, ex) -> {
            if (ex != null) {
                return "Handled exception: " + ex.getMessage();
            } else {
                return "Handled result: " + result;
            }
        });
 
        CompletableFuture<String> whenComplete = supplyAsync.whenComplete((result, ex) -> {
            if (ex != null) {
                System.out.println("whenComplete - Exception: " + ex.getMessage());
            } else {
                System.out.println("whenComplete - Result: " + result);
            }
        });
 
        CompletableFuture<String> exceptionally = supplyAsync.exceptionally(ex -> {
            System.out.println("exceptionally caught: " + ex.getMessage());
            return "Default value on error";
        });
 
        // 5. Result Retrieval
        try {
            String resultGet = supplyAsync.get(); // Throws checked exceptions
            System.out.println("Result via get(): " + resultGet);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
 
        String resultJoin = supplyAsync.join(); // Throws unchecked CompletionException
        System.out.println("Result via join(): " + resultJoin);
 
        // Wait briefly to allow async tasks to complete (for demo clarity)
        Thread.sleep(600);
    }
}

Concurrency Patterns

Reactor (opens in a new tab)

Thread-local storage (opens in a new tab)

  • Allows storage of data that appears to be global in a system with separate threads to avoid race conditions.