Thread

Represents an actual thread of execution

  • Each Thread runs independently and has its own call stack.
  • Create a thread by:
    • Extending the Thread class and overriding its run() method
    • Passing a Runnable object to a Thread constructor
  • Methods to control its execution:
    • start() : Begins the thread's execution, calling its run() method
    • sleep(long millis) : Pauses the current thread for the specified number of milliseconds without releasing locks.
    • join() : Causes the current thread to wait until the thread on which join() is called completes execution.
    • thread1.join(): the calling thread must wait for thread1 to finish
    • e.g. if thread2's run() calls thread1.join(), thread2 only continues its remaining work after thread1 finishes (there is no join(Thread) overload)
    • interrupt() : Interrupts a thread, signaling it to stop its current operation (commonly used with sleep() or blocking operations).
    • isAlive() : Returns true if the thread has been started and has not yet terminated.
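A minimal sketch of both creation styles and join(); the class name and printed strings are illustrative:

```java
public class ThreadDemo {
    public static void main(String[] args) throws InterruptedException {
        // 1) Pass a Runnable (here, a lambda) to the Thread constructor
        Thread t1 = new Thread(() -> System.out.println("task in " + Thread.currentThread().getName()));

        // 2) Extend Thread and override run()
        Thread t2 = new Thread() {
            @Override public void run() { System.out.println("subclass run()"); }
        };

        t1.start();   // begins execution; the JVM calls run()
        t2.start();
        t1.join();    // the current (main) thread waits for t1 to finish
        t2.join();
        System.out.println("t1 alive after join? " + t1.isAlive()); // false
    }
}
```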

Runnable / Callable

Represents a task that can be executed by a thread.

  • Runnable is a functional interface with a single method: void run().
  • Callable<V> is like Runnable, but its single method V call() can return a result and throw checked exceptions
  • It does not represent a thread itself, only the code that a thread will execute.
  • Create a Runnable task by:
    • Implementing the Runnable interface and overriding run()
    • Using a lambda expression or method reference for short tasks
  • Execute a Runnable task by passing it to a Thread or an ExecutorService:
    • Thread t = new Thread(runnable); t.start(); : Executes the task in a new thread
    • executor.submit(runnable); : Executes the task via an ExecutorService
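A small sketch contrasting the two task types when submitted to an executor (class and task names are illustrative):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TaskDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        Runnable printTask = () -> System.out.println("runnable has no result");
        Callable<Integer> sumTask = () -> 1 + 2 + 3;    // Callable returns a value

        executor.submit(printTask);                     // Future completes with null
        Future<Integer> future = executor.submit(sumTask);
        System.out.println("sum = " + future.get());    // get() blocks until the result is ready

        executor.shutdown();
    }
}
```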

ExecutorService

A high-level framework for managing and executing tasks asynchronously.

  • Part of java.util.concurrent package.
  • Decouples task submission from thread management, making concurrency easier to handle.
  • Can execute Runnable and Callable tasks.
  • Commonly created using Executors factory methods:
    • Executors.newFixedThreadPool(int n) : A fixed-size pool of threads
    • Executors.newCachedThreadPool() : A dynamically growing pool
    • Executors.newSingleThreadExecutor() : A single-threaded executor
  • Provides advanced configuration:
    • corePoolSize : Minimum number of threads to keep alive
    • maximumPoolSize : Maximum number of threads allowed in the pool
    • keepAliveTime : Time excess idle threads wait for new tasks before terminating
    • workQueue : Queue for holding tasks before they are executed
  • Important methods:
    • execute(Runnable task) : Submits a task for execution without returning a result.
    • submit(Runnable/Callable task) : Submits a task for execution and returns a Future.
    • invokeAll(Collection<Callable> tasks) : Executes a collection of tasks and waits for all to complete.
    • shutdown() : Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks are accepted.
    • shutdownNow() : Attempts to stop all actively executing tasks and halts the processing of waiting tasks.
    • awaitTermination(long timeout, TimeUnit unit) : Blocks until all tasks have completed after a shutdown request, or the timeout occurs.
    • getPoolSize() : Returns the current number of threads in the pool.
    • getActiveCount() : Returns the approximate number of threads that are actively executing tasks.
  • Useful to know:
    • Runtime rt = Runtime.getRuntime(); int cpus = rt.availableProcessors();: gets the number of processors available to the JVM (a good starting point for sizing a pool)
  • Shutting down ExecutorService:
    • .shutdown(): stops accepting new tasks, lets already-submitted tasks finish, and the worker threads terminate when the work is done
    • .awaitTermination(x, TimeUnit.SECONDS): blocks up to x seconds for termination; returns true if the executor terminated in time
    • .shutdownNow(): forces termination and returns a list of the tasks that never started execution
      • It can only interrupt threads that respond to interrupts
      • If a thread is blocked when it gets interrupted, the blocking call throws an InterruptedException that must be handled properly
      • Do not loop on while(true) or the thread will ignore the interrupt
    • // interrupt-aware task
      while (!Thread.currentThread().isInterrupted()) {
          doWork();
      }

      // ===========================================
      // Handling InterruptedException

      while (!Thread.currentThread().isInterrupted()) {
          try {
              blockingQueue.take();               // blocking calls throw InterruptedException if the thread is interrupted while blocked
          } catch (InterruptedException e) {
              Thread.currentThread().interrupt(); // restore the flag so the interrupt is not swallowed
              break;                              // exit loop
          }
          doWork();
      }

  • Typical Shutdown Flow:
// soft shutdown start
executor.shutdown();

try {
    // wait 60s before calling hard shutdown
    if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
        executor.shutdownNow();
    }
} catch (InterruptedException e) {
    // The thread calling awaitTermination was itself interrupted while waiting, so stop all tasks NOW instead of waiting
    executor.shutdownNow();
    Thread.currentThread().interrupt();
} finally {
    if (!executor.isTerminated()) {
        // a collection of the unfinished tasks
        List<Runnable> unfinished = executor.shutdownNow();
        System.err.println("Pool did not terminate");
    }
}
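The pool-configuration knobs listed earlier (corePoolSize, maximumPoolSize, keepAliveTime, workQueue) belong to ThreadPoolExecutor, which the Executors factory methods configure for you. A minimal hand-built pool might look like this; the sizes are illustrative:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolConfigDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                // corePoolSize: threads kept alive even when idle
                4,                                // maximumPoolSize: hard cap on pool size
                30, TimeUnit.SECONDS,             // keepAliveTime for threads above the core size
                new LinkedBlockingQueue<>(100));  // workQueue: holds tasks until a thread is free

        pool.execute(() -> System.out.println("running on " + Thread.currentThread().getName()));
        System.out.println("core size = " + pool.getCorePoolSize()); // 2
        pool.shutdown();
    }
}
```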

Data Structures

Common data structures for synchronization:

  • CopyOnWrite Collections:
    • Strongly consistent collection; every write copies the backing array, and iterators take a snapshot of the collection
    • Ideal for...
      • Reads >> Writes
      • Many threads read concurrently
      • iteration must be safe without locks
    • Examples:
      • event listener lists
      • configuration lists
      • routing tables
      • plugin registries
    • Avoid if...
      • Frequent writes
      • Large collection
      • Memory sensitive systems
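A small sketch of the snapshot behavior described above: the iterator never sees elements added after it was created, and adding during iteration is safe:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CowDemo {
    public static void main(String[] args) {
        List<String> listeners = new CopyOnWriteArrayList<>();
        listeners.add("a");
        listeners.add("b");

        // The iterator works over a snapshot: the adds below are not seen by it
        for (String s : listeners) {
            listeners.add("c");          // safe: no ConcurrentModificationException
            System.out.println(s);       // prints only a, b
        }
        System.out.println(listeners.size()); // 4: two "c" elements were appended
    }
}
```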

  • Concurrent Collections: allow non-blocking reads
    • Weakly consistent collection
    • ConcurrentHashMap
    • ConcurrentLinkedQueue | ConcurrentLinkedDeque
    • Note: the timed poll(x, TimeUnit) that lets a consumer wait up to x seconds instead of blocking forever belongs to BlockingQueue below; the ConcurrentLinked* queues only have the non-blocking poll()
    • Examples:
      • Web servers
      • Message queues
      • Caches
      • Event systems

    • BlockingQueue
      • ArrayBlockingQueue | LinkedBlockingQueue | PriorityBlockingQueue | DelayQueue | SynchronousQueue (zero capacity queue)
      • Methods:
        • put(E e): waits until space becomes available, then inserts element
        • take(): waits until an element is available, then removes it
        • offer(E e, long timeout, TimeUnit unit): waits up to the timeout for space to become available
        • poll(long timeout, TimeUnit unit): waits up to the timeout for an element

        • add(E e): inserts element, throws IllegalStateException if the queue is full
        • remove(): removes and returns the head element, throws exception if the queue is empty
        • element(): returns the head element without removing it, throws exception if the queue is empty
        • offer(E e): inserts element, returns false if the queue is full
        • poll(): removes and returns the head element, returns null if the queue is empty
        • peek(): returns the head element without removing it, returns null if the queue is empty
    • BlockingDeque: double-ended queue
      • LinkedBlockingDeque

    • LinkedTransferQueue: like a normal queue, but allows direct hand-off of an item to a waiting consumer without storing the item first
      • reduces latency
      • Methods:
        • transfer(E e): transfers the element to a waiting consumer, blocking until the consumer receives it
        • tryTransfer(E e): immediately transfers the element if a consumer is already waiting; returns false otherwise
        • tryTransfer(E e, long timeout, TimeUnit unit): waits up to the specified timeout for a consumer to receive the element
        • hasWaitingConsumer(): returns true if there is at least one consumer waiting to receive an element
        • getWaitingConsumerCount(): returns an estimate of the number of consumers waiting to receive elements
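The blocking put()/take() methods above are the backbone of the producer-consumer pattern. A minimal sketch using a bounded ArrayBlockingQueue (names and counts are illustrative):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2); // bounded capacity

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) queue.put(i);  // blocks when the queue is full
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        int sum = 0;
        for (int i = 0; i < 5; i++) sum += queue.take();    // blocks when the queue is empty
        producer.join();
        System.out.println("sum = " + sum);                 // 15
    }
}
```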

  • Atomic Structures
    • AtomicInteger | AtomicLong | AtomicReference
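A quick sketch of why atomics matter: two threads hammering an AtomicInteger never lose updates, unlike a plain int++ (which is a read-modify-write and not atomic):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicDemo {
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger counter = new AtomicInteger(0);

        Runnable increment = () -> {
            for (int i = 0; i < 10_000; i++) counter.incrementAndGet(); // atomic ++
        };

        Thread t1 = new Thread(increment);
        Thread t2 = new Thread(increment);
        t1.start(); t2.start();
        t1.join();  t2.join();

        System.out.println(counter.get()); // always 20000; a plain int++ could lose updates
    }
}
```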

  • CyclicBarrier
    • synchronization tool that lets a group of threads wait for each other to reach a common point before continuing
    • Methods:
      • await(): waits until all parties reach the barrier
      • await(long timeout, TimeUnit unit): waits until all parties reach the barrier or the timeout expires
      • getParties(): returns the number of threads required to trip the barrier
      • getNumberWaiting(): returns the number of threads currently waiting at the barrier
      • isBroken(): returns true if the barrier is in a broken state
      • reset(): resets the barrier to its initial state
    • Examples:
      • Physics simulation
      • Game engine
      • Matrix calculations
      • Parallel algorithms
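A minimal CyclicBarrier sketch: three illustrative worker threads all block in await() until the last one arrives, at which point the optional barrier action runs once:

```java
import java.util.concurrent.CyclicBarrier;

public class BarrierDemo {
    public static void main(String[] args) {
        // Barrier for 3 parties; the action runs once when all of them arrive
        CyclicBarrier barrier = new CyclicBarrier(3,
                () -> System.out.println("all workers reached the barrier"));

        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " working");
                    barrier.await();   // wait for the other parties
                    System.out.println(Thread.currentThread().getName() + " continuing");
                } catch (Exception e) {  // InterruptedException or BrokenBarrierException
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
    }
}
```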

Synchronized/Monitor Blocks

Used to enforce mutual exclusion in an encapsulated manner

  • Manages a lock object to determine which thread can enter the synchronized block
    • The lock object should be a private final Object, though Java accepts any object
    • Calling lock.wait() suspends the current thread, releases the lock, and frees up the resources used by the now-waiting thread
      • Always call lock.wait() inside a while loop: threads may wake spuriously or in groups, and the guarded condition may no longer hold by the time a woken thread reacquires the lock
    • Calling lock.notifyAll() wakes up all waiting threads, but the notifying thread keeps the lock until it leaves the synchronized block
  • Also uses a condition variable to check that it is alright to run the critical-section code
    • The condition variable does not have to be atomic if it is only accessed inside the synchronized block

Key Methods:

  • synchronized(lock): acquires the lock to enter the synchronized block
    • the lock is released automatically after leaving the synchronized block
  • lock.wait(): releases the lock and suspends the thread to free up resources
    • Thread.sleep(): pauses the thread and does NOT release the lock
  • lock.notifyAll(): keeps the lock while waking up all waiting threads
    • lock.notify(): wakes up one arbitrary waiting thread
private final Object lock = new Object();
private boolean isFull = false;  // condition variable; guarded by lock, so it need not be atomic

public void put(T item) throws InterruptedException {
  synchronized(lock){
    while(isFull){
      lock.wait();   // releases the lock; the condition is rechecked on wakeup
    }
    // critical section
    enqueue(item);

    // Make sure to wake up all waiting threads
    lock.notifyAll();
  }
}

Synchronized Method & Static

If synchronized is used on the whole method, then the lock is this - the current instance of the class

  • Eliminates the need for a separate lock object
  • A static synchronized method uses the Class object, MyClass.class, as the lock, so all static synchronized methods of MyClass contend for one lock shared across every instance
    • static variables behave similarly (one copy shared by all instances)
private boolean isFull = false;

public synchronized void put(T item) throws InterruptedException {
  while(isFull){
     wait();       // the monitor is `this`, so call wait()/notifyAll() directly
  }
  // critical section
  enqueue(item);

  // Make sure to wake up all waiting threads
  notifyAll();
}

Class Design Tips

  • Rule of thumb: share objects, not static state
    • The non-static version allows multiple TaskQueue instances of different types to be created, like Queue A (CPU tasks) and Queue B (I/O tasks)
    // GOOD DESIGN

    public class TaskQueue {
      private final Queue<Task> queue = new ArrayDeque<>();    // per-instance state
      public synchronized void put(Task t) { /* ... */ }
    }
    ====================================
    // BAD DESIGN

    public class TaskQueue {
      private static Queue<Task> queue;                        // global state shared by every instance
      public static synchronized void put(Task t) { /* ... */ }
    }
  • Separation of concerns:
    • In multithreading, there are 2 needs:
      • Creating and scheduling some Java code for execution
      • Optimizing the execution of that code for the hardware resources you have available
    • The Executor framework was designed to deal with number 2

Fork / Join Framework

The Fork / Join framework provides a way to attack one BIG problem with threads by recursively splitting it, like filling a 100-million-element array with randomly generated values

  • Any problem that can be recursively divided can be solved using ForkJoinPool
  • The problem is broken down into subtasks using divide-and-conquer
  • If the problem is broken into too many subtasks, the overhead can make sequential work faster than parallel work
    • ForkJoinPool: thread pool that manages ForkJoinTask
      • ForkJoinPool pool = new ForkJoinPool(): create the pool to manage threads
      • pool.invoke(ForkJoinTask): perform the task and return the value if it's a RecursiveTask
    • ForkJoinTask: abstract class for task with the following concrete subclasses
      • RecursiveAction: returns nothing
      • RecursiveTask<V>: returns a value of type V
    • Methods of ForkJoinTask:
      • [ForkJoinTask].fork(): submit a subtask to the pool for parallel execution
      • [ForkJoinTask].join(): wait for a subtask to finish and get the result; normally called at the end of compute on the subtask that was forked
      • compute(): abstract method that must be implemented and where you define how the task is divided
      • invokeAll(task1, task2): shorthand for the fork / compute / join pattern above
        • It forks one task and runs the other in the current thread, then joins, so its cost is close to the manual pattern

  • RecursiveAction Example:
  • // RecursiveAction (no return value)
    
    @Override
    protected void compute() {
      ...
      MyRecursiveAction a1 = new MyRecursiveAction(data, start, halfWay);
      MyRecursiveAction a2 = new MyRecursiveAction(data, halfWay, end);
    
      a1.fork();       // queue left half of task
      a2.compute();    // work on right half of task
      a1.join();       // wait for queued task to be completed
    }     
    
    // ============================================================
    // Using invokeAll
    
    MyRecursiveAction a1 = new MyRecursiveAction(data, start, halfWay);
    MyRecursiveAction a2 = new MyRecursiveAction(data, halfWay, end);
    invokeAll(a2, a1);

  • RecursiveTask<Integer> Example:
  • // RecursiveTask (returning int)
    
    @Override
    protected Integer compute() {
      ...
      FindMaxTask a1 = new FindMaxTask(data, start, halfWay);
      FindMaxTask a2 = new FindMaxTask(data, halfWay, end);
    
      a1.fork();                     // queue left half of task
      int result2 = a2.compute();    // work on right half of task
      int result1 = a1.join();       // wait for queued task to be completed
    
      return result2 > result1 ? result2 : result1;
    }     
    
    // ============================================================
    // Using invokeAll
    
    FindMaxTask a1 = new FindMaxTask(data, start, halfWay);
    FindMaxTask a2 = new FindMaxTask(data, halfWay, end);
    
    invokeAll(a2, a1);
    int result2 = a2.join();
    int result1 = a1.join();
    
    return result2 > result1 ? result2 : result1;

  • Overall Example:
  • public class SumTask extends RecursiveTask<Long> {
        private final long[] array;
        private final int start, end;
        private final int THRESHOLD = 1000; // adjust for optimal performance

        SumTask(long[] array, int start, int end) {
            this.array = array;
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            int length = end - start;
            if (length <= THRESHOLD) {
                long sum = 0;
                for (int i = start; i < end; i++) sum += array[i];
                return sum;
            } else {
                int mid = start + length / 2;
                SumTask left = new SumTask(array, start, mid);
                SumTask right = new SumTask(array, mid, end);

                // Fork left task to run asynchronously
                left.fork();

                // Compute right task in current thread
                long rightResult = right.compute();

                // Wait for left task to finish and get result
                long leftResult = left.join();

                // Combine results
                return leftResult + rightResult;
            }
        }
    }
    
    // ================================================================
    // in main()
    
    public static void main(String[] args) {
            // Create a large array
            long[] array = new long[10_000_000];
            for (int i = 0; i < array.length; i++) array[i] = i + 1;
    
            // Create a ForkJoinPool
            ForkJoinPool pool = new ForkJoinPool();
    
            // Submit the main task and get the result
            long sum = pool.invoke(new SumTask(array, 0, array.length));
    
            System.out.println("Sum = " + sum);
    }

Parallel Streams

Parallel Streams are an abstraction over the Fork/Join framework, but only work on streams/collections

  • Allows for parallel processing of collection data
  • .parallel() for IntStream | LongStream | DoubleStream
  • Arrays.stream(array)
          .parallel()
          .average();   // returns OptionalDouble
  • .parallelStream() and .collect() for collections
  • List<String> result = list.parallelStream()
                              .filter(s -> s.length() > 3)
                              .collect(Collectors.toList());

  • Transformative Methods
    • map(): Transform each element
    • mapToInt(), mapToLong(), mapToDouble(): Transform to primitive streams for numeric operations
    • filter(): Keep only elements matching a predicate
    • distinct(): Remove duplicates
    • sorted(): Sort the elements (parallel sort internally)
    • limit(n) / skip(n): Truncate or skip elements
    • flatMap(): Flatten nested streams
    • peek(): Inspect elements without modifying them

  • Terminal Methods: return a final result
    • forEach(): Apply an action to each element (order not guaranteed in parallel)
    • forEachOrdered(): Apply action in encounter order (slower in parallel)
    • collect(): Gather elements into a collection or map (e.g., Collectors.toList(), toSet(), groupingBy())
    • reduce(): Combine elements to a single value using a binary operator
    • min(), max(): Get the minimum or maximum element
    • count(): Count elements matching the pipeline
    • anyMatch(), allMatch(), noneMatch(): Boolean checks over elements
    • findAny(), findFirst(): Return one element (findAny can be faster in parallel)
    • sum(), average(), summaryStatistics(): Numeric aggregations for primitive streams
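As a small illustration of a parallel-safe terminal operation: reduce() with an identity value and an associative operator produces the same result no matter how the work is split across threads:

```java
import java.util.stream.IntStream;

public class ReduceDemo {
    public static void main(String[] args) {
        // Parallel reduction: identity 0, associative operator +
        int sum = IntStream.rangeClosed(1, 100)
                           .parallel()
                           .reduce(0, Integer::sum);
        System.out.println(sum); // 5050 regardless of thread count
    }
}
```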

  • Gotchas:
    • When using transformative methods (map or filter), race conditions can be a problem if the method writes to a shared variable/object
    • Methods that are order-sensitive (sorted() | limit() | skip() | distinct() | forEachOrdered() | findFirst()) can be slower in parallel
      • Ordering can be removed with .unordered(), which can improve performance but makes the result of order-sensitive operations nondeterministic
    List<Integer> nums = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    long sum = nums.stream()
                   .unordered()                  // make the stream unordered
                   .parallel()
                   .mapToInt(Integer::intValue)  // switch to IntStream, which has sum()
                   .sum();

If Collecting Items into a List

  • Do NOT use .add(item) to add items to a shared list, because it causes a race condition
  • Instead use .collect(Collectors.toList()) or a synchronized list (but synchronization will impact performance)
  • // Bad way that will cause a race condition

    List<Dog> dogsOlderThan7 = new ArrayList<>();
    long count = dogs.stream()                // stream the dogs
        .unordered()                          // make the stream unordered
        .parallel()                           // make the stream parallel
        .filter(d -> d.getAge() > 7)          // filter the dogs
        .peek(d -> dogsOlderThan7.add(d))     // side effect on a shared list -> race condition
        .count();
    
    //====================================================================
    // Good way using collect()
    
    List<Dog> dogsOlderThan7 =
        dogs.stream()                       // stream the dogs
            .unordered()                    // make the stream unordered
            .parallel()                     // make the stream parallel
            .filter(d -> d.getAge() > 7)    // filter dogs older than 7
            .collect(Collectors.toList());  // collect older dogs into a List -> safer

Semaphore

Semaphores are low-level objects for synchronizing threads, but offer more flexibility, such as handling N instances of a limited resource

Key Methods:

  • emptySlots.acquire(): requests a permit, blocking if none are available
    • Checks if emptySlots > 0; if so, the thread is permitted to continue and emptySlots is decremented atomically, otherwise the thread blocks
  • emptySlots.release(): increments emptySlots and wakes a thread blocked in emptySlots.acquire()
private final Semaphore mutex = new Semaphore(1);       // mutual exclusion lock
private final Semaphore emptySlots = new Semaphore(10); // capacity of N resources
private final Semaphore usedSlots = new Semaphore(0);   // items currently stored

public void put(T item) throws InterruptedException {
  emptySlots.acquire();
  mutex.acquire();

  // critical section
  enqueue(item);

  mutex.release();
  usedSlots.release();
}

public T pop() throws InterruptedException {
  usedSlots.acquire();
  mutex.acquire();

  // critical section
  T item = dequeue();

  mutex.release();
  emptySlots.release();

  return item;
}

ReentrantLock

Similar to a synchronized block, but offers more flexibility: interruptible locking, first-come-first-served (fair) locking, and finer control over which threads wake up, like a semaphore

Key Methods:

  • lock.lock(): acquire the lock
  • lock.unlock(): release the lock
    • Should always be put in a finally block in case the thread throws an exception
  • notFull.await(): waits and suspends the thread until notFull.signal() is called
  • notFull.signal(): wakes up one thread that is waiting because notFull.await() was called
    • notFull.signalAll(): wakes up all threads tied to the condition notFull
private final ReentrantLock lock = new ReentrantLock();

private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();

public void put(T item) throws InterruptedException {
    lock.lock();
    try {
        while (isFull) {
            notFull.await();   // while full, wait until notFull is signaled
        }

        // critical section
        enqueue(item);

        notEmpty.signal();     // wakes one thread waiting on notEmpty (like notify())
    } finally {
        lock.unlock();
    }
}

public T pop() throws InterruptedException {
    lock.lock();
    try {
        while (isEmpty) {
            notEmpty.await();  // while empty, wait until notEmpty is signaled
        }

        // critical section
        T item = dequeue();

        notFull.signal();
        return item;
    } finally {
        lock.unlock();
    }
}

ReentrantReadWriteLock

This lock separates readers and writers, and allows multiple readers in the critical section unless a writer holds the lock

Key Methods:

  • lock.readLock().lock(): acquires the lock for reading, blocking any writer from entering
    • lock.readLock().unlock(): releases the lock; should be placed in a finally block
  • lock.writeLock().lock(): acquires the lock for writing, blocking both readers and writers from entering
    • lock.writeLock().unlock(): releases the lock; should be placed in a finally block
class Cache {

    private final ReentrantReadWriteLock rwLock =
        new ReentrantReadWriteLock();

    private int value;   // shared state guarded by rwLock

    public int read() {

        rwLock.readLock().lock();
        try {
            //critical section
            return value;
        } finally {
            rwLock.readLock().unlock();
        }
    }

    public void write(int newValue) {

        rwLock.writeLock().lock();
        try {
            //critical section
            value = newValue;
        } finally {
            rwLock.writeLock().unlock();
        }
    }
}

RAG & Wait-Graph Diagram

Resource Allocation Graphs (RAG) and wait-graphs can be used to identify deadlocks in a system: a cycle in the graph means a deadlock is possible.
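A cycle in the wait-graph corresponds to threads acquiring locks in inconsistent order; imposing one global lock order keeps the graph acyclic and deadlock-free. A minimal sketch (class and lock names are illustrative):

```java
public class LockOrderDemo {
    private static final Object lockA = new Object();
    private static final Object lockB = new Object();
    private static int shared = 0;

    // Both threads always take lockA before lockB, so no A -> B -> A cycle can
    // form in the wait-graph and the program cannot deadlock
    private static void safeUpdate() {
        synchronized (lockA) {
            synchronized (lockB) {
                shared++;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> { for (int i = 0; i < 1000; i++) safeUpdate(); });
        Thread t2 = new Thread(() -> { for (int i = 0; i < 1000; i++) safeUpdate(); });
        t1.start(); t2.start();
        t1.join();  t2.join();
        System.out.println(shared); // 2000; terminates because the wait-graph stays acyclic
    }
}
```

If one thread instead took lockB before lockA, the wait-graph could form the cycle that a RAG diagram makes visible.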