Thread
Represents an actual thread of execution
- Each `Thread` runs independently and has its own call stack.
- Create a thread by:
  - Extending the `Thread` class and overriding its `run()` method
  - Passing a `Runnable` object to a `Thread` constructor
- Methods to control its execution:
  - `start()`: begins the thread's execution, calling its `run()` method
  - `sleep(long millis)`: pauses the current thread for the specified number of milliseconds without releasing locks
  - `join()`: causes the current thread to wait until the thread on which `join()` is called completes execution
    - `thread1.join()`: the calling thread must wait for `thread1` to finish; e.g., if `thread2` calls `thread1.join()`, `thread2` continues only after `thread1` finishes
  - `interrupt()`: interrupts a thread, signaling it to stop its current operation (commonly used with `sleep()` or blocking operations)
  - `isAlive()`: returns `true` if the thread has been started and has not yet terminated
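A minimal sketch of these methods in action (the class name and worker logic are illustrative): two threads are started from `Runnable` lambdas and joined, then a sleeping thread is interrupted.

```java
// Hypothetical demo class; the worker logic is made up for illustration.
public class ThreadDemo {

    // Start two threads, wait for both with join(), and combine their results.
    static int runWorkers() {
        int[] results = new int[2];
        Thread t1 = new Thread(() -> results[0] = 21); // Runnable via lambda
        Thread t2 = new Thread(() -> results[1] = 21);
        t1.start(); // begins execution, which invokes run()
        t2.start();
        try {
            t1.join(); // current thread waits for t1 to finish
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return results[0] + results[1];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runWorkers()); // 42

        // interrupt() wakes a sleeping thread by making sleep() throw
        Thread sleeper = new Thread(() -> {
            try {
                Thread.sleep(10_000); // holds no locks while sleeping
            } catch (InterruptedException e) {
                // interrupted while blocked: exit quietly
            }
        });
        sleeper.start();
        sleeper.interrupt();
        sleeper.join();
        System.out.println(sleeper.isAlive()); // false: thread has terminated
    }
}
```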
Runnable / Callable
Represents a task that can be executed by a thread.
- `Runnable` is a functional interface with a single method: `void run()`
- `Callable` is the same as `Runnable`, but can return a result once finished
- It does not represent a thread itself, only the code that a thread will execute
- Create a Runnable task by:
  - Implementing the `Runnable` interface and overriding `run()`
  - Using a lambda expression or method reference for short tasks
- Execute a Runnable task by passing it to a `Thread` or an `ExecutorService`:
  - `Thread t = new Thread(runnable); t.start();`: executes the task in a new thread
  - `executor.submit(runnable);`: executes the task via an `ExecutorService`
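A small sketch of the `Callable` side (class and method names are illustrative): submitting a `Callable` to an `ExecutorService` yields a `Future`, whose `get()` blocks until the result is ready.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo: a Callable returns a value, unlike Runnable.
public class CallableDemo {

    static int square(int x) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> x * x;       // returns a result
            Future<Integer> future = executor.submit(task);
            return future.get();                        // blocks until done
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdown();                        // always release the pool
        }
    }

    public static void main(String[] args) {
        System.out.println(square(6)); // 36
    }
}
```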
ExecutorService
A high-level framework for managing and executing tasks asynchronously.
- Part of the `java.util.concurrent` package
- Decouples task submission from thread management, making concurrency easier to handle
- Can execute `Runnable` and `Callable` tasks
- Commonly created using `Executors` factory methods:
  - `Executors.newFixedThreadPool(int n)`: a fixed-size pool of threads
  - `Executors.newCachedThreadPool()`: a dynamically growing pool
  - `Executors.newSingleThreadExecutor()`: a single-threaded executor
- Provides advanced configuration (these are `ThreadPoolExecutor` constructor parameters):
  - `corePoolSize`: minimum number of threads to keep alive
  - `maximumPoolSize`: maximum number of threads allowed in the pool
  - `keepAliveTime`: time excess idle threads wait for new tasks before terminating
  - `workQueue`: queue for holding tasks before they are executed
- Important methods:
  - `execute(Runnable task)`: submits a task for execution without returning a result
  - `submit(Runnable/Callable task)`: submits a task for execution and returns a `Future`
  - `invokeAll(Collection<Callable> tasks)`: executes a collection of tasks and waits for all to complete
  - `shutdown()`: initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks are accepted
  - `shutdownNow()`: attempts to stop all actively executing tasks and halts the processing of waiting tasks
  - `awaitTermination(long timeout, TimeUnit unit)`: blocks until all tasks have completed after a shutdown request, or the timeout occurs
  - `getPoolSize()`: returns the current number of threads in the pool
  - `getActiveCount()`: returns the approximate number of threads actively executing tasks
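A sketch combining `invokeAll`, `shutdown`, and `awaitTermination` (the class name and task values are arbitrary, chosen just for illustration):

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class PoolDemo {

    static long sumAll() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Callable<Long>> tasks = List.of(() -> 10L, () -> 20L, () -> 12L);
        long sum = 0;
        try {
            // invokeAll runs every task and waits for all of them to complete
            for (Future<Long> f : pool.invokeAll(tasks)) {
                sum += f.get();
            }
            pool.shutdown();                            // orderly shutdown
            pool.awaitTermination(5, TimeUnit.SECONDS); // wait for termination
        } catch (InterruptedException | ExecutionException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(sumAll()); // 42
    }
}
```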
- Useful to know:
  - `Runtime rt = Runtime.getRuntime(); int cpus = rt.availableProcessors();`: gets the number of CPUs available on the system
- Shutting down an ExecutorService:
  - `.shutdown()`: waits for existing tasks to finish without allowing new tasks; threads terminate when the work is done
  - `.awaitTermination(x, TimeUnit.SECONDS)`: waits up to x seconds before executing the next statement
  - `.shutdownNow()`: forces termination and returns a list containing the tasks that never started execution
    - It can only interrupt threads that respond to interrupts
- If a thread is blocked when it gets interrupted, it throws an `InterruptedException` that needs to be handled properly
- Do not use `while(true)` or the thread will ignore the interrupt
// interrupt-aware task
while (!Thread.currentThread().isInterrupted()) {
    doWork();
}
// ===========================================
// Handling InterruptedException
while (!Thread.currentThread().isInterrupted()) {
    try {
        blockingQueue.take(); // blocking methods throw InterruptedException if the thread is interrupted while blocked
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the flag to properly end the thread
        break; // exit loop
    }
    doWork();
}

// soft shutdown start
executor.shutdown();
try {
    // wait 60s before calling hard shutdown
    if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
        executor.shutdownNow();
    }
} catch (InterruptedException e) {
    // the thread calling awaitTermination was itself interrupted while waiting,
    // so stop all tasks NOW instead of waiting
    executor.shutdownNow();
    Thread.currentThread().interrupt();
} finally {
    if (!executor.isTerminated()) {
        // a collection of the unfinished tasks
        List<Runnable> unfinished = executor.shutdownNow();
        System.err.println("Pool did not terminate");
    }
}

Data Structures
Common data structures for synchronization:
- CopyOnWrite Collections:
  - Strongly consistent: iterators take a snapshot of the collection at creation time
  - Ideal for:
    - Reads >> Writes
    - Many threads reading concurrently
    - Iteration that must be safe without locks
  - Examples:
    - Event listener lists
    - Configuration lists
    - Routing tables
    - Plugin registries
  - Avoid if:
    - Writes are frequent
    - The collection is large
    - The system is memory-sensitive
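A quick sketch of the snapshot behavior (the class name is illustrative): writes performed during iteration never appear in the iterator, and no `ConcurrentModificationException` is thrown.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class ListenerDemo {

    static int snapshotSize() {
        List<String> listeners = new CopyOnWriteArrayList<>();
        listeners.add("a");
        listeners.add("b");
        int seen = 0;
        for (String l : listeners) { // iterator works on a snapshot
            listeners.add("c");      // safe: no ConcurrentModificationException
            seen++;
        }
        return seen; // only the 2 elements in the snapshot were visited
    }

    public static void main(String[] args) {
        System.out.println(snapshotSize()); // 2
    }
}
```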
- Concurrent Collections: allow non-blocking reads
  - Weakly consistent: iterators reflect some, but not necessarily all, concurrent updates
  - `ConcurrentHashMap`
  - `ConcurrentLinkedQueue` | `ConcurrentLinkedDeque`
    - `poll()` returns `null` immediately when the queue is empty; the timed `poll(x, TimeUnit)` that lets a consumer wait x seconds instead of waiting forever belongs to `BlockingQueue`
  - Examples:
    - Web servers
    - Message queues
    - Caches
    - Event systems
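A small sketch of a shared counter, a typical cache/metrics use (names are illustrative): `ConcurrentHashMap.merge()` performs the read-modify-write atomically, so many threads can update the same key without an external lock.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CounterDemo {

    static int countHits() {
        ConcurrentHashMap<String, Integer> counts = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 100; i++) {
            // merge() is atomic: no lost updates even under contention
            pool.submit(() -> counts.merge("hits", 1, Integer::sum));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counts.getOrDefault("hits", 0);
    }

    public static void main(String[] args) {
        System.out.println(countHits()); // 100
    }
}
```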
- BlockingQueue
  - `ArrayBlockingQueue` | `LinkedBlockingQueue` | `PriorityBlockingQueue` | `DelayQueue` | `SynchronousQueue` (zero-capacity queue)
  - Methods:
    - `put(E e)`: waits until space becomes available, then inserts the element
    - `take()`: waits until an element is available, then removes it
    - `offer(E e, long timeout, TimeUnit unit)`: waits up to the timeout for space to become available
    - `poll(long timeout, TimeUnit unit)`: waits up to the timeout for an element
    - `add(E e)`: inserts the element; throws `IllegalStateException` if the queue is full
    - `remove()`: removes and returns the head element; throws an exception if the queue is empty
    - `element()`: returns the head element without removing it; throws an exception if the queue is empty
    - `offer(E e)`: inserts the element; returns `false` if the queue is full
    - `poll()`: removes and returns the head element; returns `null` if the queue is empty
    - `peek()`: returns the head element without removing it; returns `null` if the queue is empty
- BlockingDeque: double-ended queue
  - `LinkedBlockingDeque`
- SynchronousQueue: zero-capacity queue
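The `put()`/`take()` pair above naturally forms a bounded producer-consumer pipeline; a minimal sketch (names are illustrative):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PipelineDemo {

    // Producer puts numbers; consumer takes and sums them. put()/take()
    // block when the bounded queue is full/empty, so there is no busy-waiting.
    static int produceAndConsume() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2); // capacity 2
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) queue.put(i); // blocks when full
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < 5; i++) sum += queue.take(); // blocks when empty
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum; // 1+2+3+4+5
    }

    public static void main(String[] args) {
        System.out.println(produceAndConsume()); // 15
    }
}
```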
- LinkedTransferQueue: like a normal queue, but allows direct hand-off of an item to a waiting consumer without storing the item first
  - Reduces latency
  - Methods:
    - `transfer(E e)`: transfers the element to a waiting consumer, blocking until the consumer receives it
    - `tryTransfer(E e)`: immediately transfers the element if a consumer is already waiting; returns `false` otherwise
    - `tryTransfer(E e, long timeout, TimeUnit unit)`: waits up to the specified timeout for a consumer to receive the element
    - `hasWaitingConsumer()`: returns `true` if there is at least one consumer waiting to receive an element
    - `getWaitingConsumerCount()`: returns an estimate of the number of consumers waiting to receive elements
- Atomic Structures
  - `AtomicInteger` | `AtomicLong` | `AtomicReference`
- CyclicBarrier
  - Synchronization tool that lets a group of threads wait for each other to reach a common point before continuing
  - Methods:
    - `await()`: waits until all parties reach the barrier
    - `await(long timeout, TimeUnit unit)`: waits until all parties reach the barrier or the timeout expires
    - `getParties()`: returns the number of threads required to trip the barrier
    - `getNumberWaiting()`: returns the number of threads currently waiting at the barrier
    - `isBroken()`: returns `true` if the barrier is in a broken state
    - `reset()`: resets the barrier to its initial state
  - Examples:
- Physics simulation
- Game engine
- Matrix calculations
- Parallel algorithms
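A minimal sketch of a barrier-synchronized phase loop, as a simulation or game engine might use (names are illustrative): each `await()` is one phase, and the optional barrier action runs once per trip, when the last thread arrives.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class BarrierDemo {

    // Three workers run two phases; the barrier action counts completed phases.
    static int runPhases() {
        int[] phasesDone = new int[1];
        // barrier action runs once each time all 3 parties arrive
        CyclicBarrier barrier = new CyclicBarrier(3, () -> phasesDone[0]++);
        Runnable worker = () -> {
            try {
                barrier.await(); // phase 1: wait for all 3 parties
                barrier.await(); // phase 2: the barrier is cyclic (reusable)
            } catch (InterruptedException | BrokenBarrierException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread[] threads = new Thread[3];
        for (int i = 0; i < 3; i++) {
            threads[i] = new Thread(worker);
            threads[i].start();
        }
        try {
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return phasesDone[0]; // 2 phases completed
    }

    public static void main(String[] args) {
        System.out.println(runPhases()); // 2
    }
}
```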
Synchronized/Monitor Blocks
Used to enforce mutual exclusion in an encapsulated manner

- Manages a lock object to determine which thread can enter the `synchronized` block
- The lock object should be a `private final Object`, but Java accepts any object
- Calling `lock.wait()` suspends the current thread, releases the lock, and frees up the resources used by the now-waiting thread
- Always call `lock.wait()` inside a `while` loop, because multiple threads generally wake up at the same time, and a woken thread cannot run immediately since the lock has not yet been released by the notifying thread
- Calling `lock.notifyAll()` wakes up all waiting threads, but the notifying thread still holds the lock
- Also uses a condition variable to check whether it is safe to run the critical-section code
  - The condition variable does not have to be atomic if it is only accessed inside the `synchronized` block
Key Methods:
- `synchronized(lock)`: acquires the lock to enter the `synchronized` block
  - The lock is released automatically after leaving the `synchronized` block
- `lock.wait()`: releases the lock and suspends the thread to free up resources
- `Thread.sleep()`: pauses the thread and does NOT release the lock (note: `sleep()` is a static method of `Thread`, not a method of the lock object)
- `lock.notifyAll()`: keeps the lock while waking up all waiting threads
- `lock.notify()`: wakes up one arbitrary waiting thread
private final Object lock = new Object();
private boolean isFull = false; // condition variable (not final: it changes)

public void put(T item) throws InterruptedException {
    synchronized (lock) {
        while (isFull) {
            lock.wait();
        }
        // critical section
        enqueue(item);
        // Make sure to wake up all waiting threads
        lock.notifyAll();
    }
}

Synchronized Method & Static
If `synchronized` is used on the whole method, then the lock is `this` - the current instance of the class

- Eliminates the need for a separate `lock` object
- A `static synchronized` method uses the class object itself, `MyClass.class`, as the lock, causing all `MyClass` instances to share one lock
- `static` variables behave similarly (shared across all instances)
private boolean isFull = false; // condition variable

public synchronized void put(T item) throws InterruptedException {
    while (isFull) {
        wait(); // the implicit lock is `this`, so call wait() directly
    }
    // critical section
    enqueue(item);
    // Make sure to wake up all waiting threads
    notifyAll();
}

Class Design Tips
- Rule of thumb: share objects, not static state
- The non-static version allows multiple `TaskQueue` instances of different types to be created, like `Queue A (CPU tasks)` and `Queue B (I/O tasks)`
// GOOD DESIGN: state and lock are per-instance
public class TaskQueue {
    private Queue<Task> queue;
    public synchronized void put(Task t) { /* ... */ }
}

// ====================================

// BAD DESIGN: one shared queue and one class-wide lock for everything
public class TaskQueue {
    private static Queue<Task> queue;
    public static synchronized void put(Task t) { /* ... */ }
}

- In multithreading, there are 2 needs:
- Creating and scheduling some Java code for execution
- Optimizing the execution of that code for the hardware resources you have available
- `Executors` was designed to deal with number 2
Fork / Join Framework
The Fork/Join framework provides a way to tackle one BIG problem using multiple threads; for example, filling a 100-million-element array with randomly generated values
- Any problem that can be recursively divided can be solved using a `ForkJoinPool`
- The problem is broken down into subtasks using divide-and-conquer
- If the problem is broken into too many subtasks, the overhead can make sequential work faster than parallel work
- `ForkJoinPool`: thread pool that manages `ForkJoinTask`s
  - `ForkJoinPool pool = new ForkJoinPool()`: creates the pool to manage threads
  - `pool.invoke(ForkJoinTask)`: performs the task and returns the value if it is a `RecursiveTask`
- `ForkJoinTask`: abstract class for tasks, with the following concrete subclasses
  - `RecursiveAction`: returns nothing
  - `RecursiveTask<V>`: returns a value of type `V`
- Methods of `ForkJoinTask`:
  - `fork()`: submits a subtask to the pool for parallel execution
  - `join()`: waits for a subtask to finish and gets the result; normally called at the end of `compute()` on the subtask that was forked
  - `compute()`: abstract method that must be implemented; this is where you define how the task is divided
  - `invokeAll(task1, task2)`: shorthand for the three methods above
    - Slightly less efficient because `invokeAll` forks both tasks, so it creates more threads and then waits
- RecursiveAction Example:
// RecursiveAction (no return value)
@Override
protected void compute() {
    ...
    MyRecursiveAction a1 = new MyRecursiveAction(data, start, halfWay);
    MyRecursiveAction a2 = new MyRecursiveAction(data, halfWay, end);
    a1.fork();    // queue left half of the task
    a2.compute(); // work on right half of the task
    a1.join();    // wait for the queued task to complete
}
// ============================================================
// Using invokeAll
MyRecursiveAction a1 = new MyRecursiveAction(data, start, halfWay);
MyRecursiveAction a2 = new MyRecursiveAction(data, halfWay, end);
invokeAll(a2, a1);

// ============================================================
// RecursiveTask (returning int)
@Override
protected Integer compute() {
    ...
    FindMaxTask a1 = new FindMaxTask(data, start, halfWay);
    FindMaxTask a2 = new FindMaxTask(data, halfWay, end);
    a1.fork();                  // queue left half of the task
    int result2 = a2.compute(); // work on right half of the task
    int result1 = a1.join();    // wait for the queued task to complete
    return result2 > result1 ? result2 : result1;
}
// ============================================================
// Using invokeAll
FindMaxTask a1 = new FindMaxTask(data, start, halfWay);
FindMaxTask a2 = new FindMaxTask(data, halfWay, end);
invokeAll(a2, a1);
int result2 = a2.join();
int result1 = a1.join();
return result2 > result1 ? result2 : result1;

// ============================================================
// Full example: summing a large array with RecursiveTask
public class SumTask extends RecursiveTask<Long> {
    private final long[] array;
    private final int start, end;
    private final int THRESHOLD = 1000; // adjust for optimal performance

    SumTask(long[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        int length = end - start;
        if (length <= THRESHOLD) {
            long sum = 0;
            for (int i = start; i < end; i++) sum += array[i];
            return sum;
        } else {
            int mid = start + length / 2;
            SumTask left = new SumTask(array, start, mid);
            SumTask right = new SumTask(array, mid, end);
            // Fork left task to run asynchronously
            left.fork();
            // Compute right task in the current thread
            long rightResult = right.compute();
            // Wait for left task to finish and get its result
            long leftResult = left.join();
            // Combine results
            return leftResult + rightResult;
        }
    }
}

// ================================================================
// in main()
public static void main(String[] args) {
    // Create a large array
    long[] array = new long[10_000_000];
    for (int i = 0; i < array.length; i++) array[i] = i + 1;

    // Create a ForkJoinPool
    ForkJoinPool pool = new ForkJoinPool();

    // Submit the main task and get the result
    long sum = pool.invoke(new SumTask(array, 0, array.length));
    System.out.println("Sum = " + sum);
}

Parallel Streams
Parallel streams are an abstraction over the Fork/Join framework, but only work on streams/collections
- Allows for parallel processing of collection data
- `.parallel()` for `IntStream` | `LongStream` | `DoubleStream`

Arrays.stream(array)
      .parallel()
      .average(); // returns OptionalDouble

- `.parallelStream()` and `.collect()` for collections

List<String> result = list.parallelStream()
                          .filter(s -> s.length() > 3)
                          .collect(Collectors.toList());

- `map()`: transform each element
- `mapToInt()`, `mapToLong()`, `mapToDouble()`: transform to primitive streams for numeric operations
- `filter()`: keep only elements matching a predicate
- `distinct()`: remove duplicates
- `sorted()`: sort the elements (parallel sort internally)
- `limit(n)` / `skip(n)`: truncate or skip elements
- `flatMap()`: flatten nested streams
- `peek()`: inspect elements without modifying them
- `forEach()`: apply an action to each element (order not guaranteed in parallel)
- `forEachOrdered()`: apply the action in encounter order (slower in parallel)
- `collect()`: gather elements into a collection or map (e.g., `Collectors.toList()`, `toSet()`, `groupingBy()`)
- `reduce()`: combine elements to a single value using a binary operator
- `min()`, `max()`: get the minimum or maximum element
- `count()`: count elements matching the pipeline
- `anyMatch()`, `allMatch()`, `noneMatch()`: boolean checks over elements
- `findAny()`, `findFirst()`: return one element (`findAny` can be faster in parallel)
- `sum()`, `average()`, `summaryStatistics()`: numeric aggregations for primitive streams
- When using transformative methods (`map` or `filter`), race conditions can be a problem if the method writes to a shared variable/object
- Methods that are order-sensitive (`sorted()` | `limit()` | `skip()` | `distinct()` | `forEachOrdered()` | `findFirst()`) can be slower in parallel
- Ordering can be removed from the above by calling `.unordered()`, which can improve performance but makes results nondeterministic
List<Integer> nums = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
long sum = nums.stream()
               .unordered()                 // make the stream unordered
               .parallel()
               .mapToInt(Integer::intValue) // sum() requires a primitive stream
               .sum();

If Collecting Items into a List
- Do NOT use `.add(item)` to add items to a list, because it will cause a race condition
- Instead use `.collect(Collectors.toList())` or a synchronized list (but synchronization will impact performance)
// Bad way that will cause a race condition
List<Dog> dogsOlderThan7 = new ArrayList<>();
long count = dogs.stream()                        // stream the dogs
                 .unordered()                     // make the stream unordered
                 .parallel()                      // make the stream parallel
                 .filter(d -> d.getAge() > 7)     // filter the dogs
                 .peek(d -> dogsOlderThan7.add(d)) // save... with a side effect -> causes a race condition
                 .count();

//====================================================================
// Good way using collect()
List<Dog> dogsOlderThan7 =
    dogs.stream()                      // stream the dogs
        .unordered()                   // make the stream unordered
        .parallel()                    // make the stream parallel
        .filter(d -> d.getAge() > 7)   // filter dogs older than 7
        .collect(Collectors.toList()); // collect older dogs into a List -> safer

Semaphore
Semaphores are low-level objects for synchronizing threads, but offer more flexibility, like handling N instances of a limited resource
Key Methods:
- `emptySlots.acquire()`: requests a permit if resources are available
  - Checks whether `emptySlots > 0`; if so, permits the thread to continue and decrements the count (`emptySlots--`), otherwise blocks the thread
- `emptySlots.release()`: wakes up a thread blocked in `emptySlots.acquire()` and increments the count (`emptySlots++`)
private final Semaphore mutex = new Semaphore(1);       // mutual exclusion lock
private final Semaphore emptySlots = new Semaphore(10); // capacity of N resources
private final Semaphore usedSlots = new Semaphore(0);
public void put(T item) throws InterruptedException {
    emptySlots.acquire();
    mutex.acquire();
    // critical section
    enqueue(item);
    mutex.release();
    usedSlots.release();
}

public T pop() throws InterruptedException {
    usedSlots.acquire();
    mutex.acquire();
    // critical section
    T item = dequeue(); // the underlying remove operation
    mutex.release();
    emptySlots.release();
    return item;
}

ReentrantLock
Similar to a synchronized block, but offers more flexibility: interruptible locking, fair (first-come-first-served) locking, and finer control over which threads wake up, like a semaphore
Key Methods:
- `lock.lock()`: acquires the `lock`
- `lock.unlock()`: releases the `lock`
  - Should always be put in a `finally` block in case the thread throws an exception
- `notFull.await()`: waits and suspends the thread until `notFull.signal()` is called
- `notFull.signal()`: wakes up one thread that is waiting on `notFull.await()`
- `notFull.signalAll()`: wakes up all threads tied to the condition `notFull`
private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public void put(T item) throws InterruptedException {
    lock.lock();
    try {
        while (isFull) {
            notFull.await(); // while full, wait until notFull is signaled
        }
        // critical section
        enqueue(item);
        notEmpty.signal(); // wake one waiting consumer (like notify())
    } finally {
        lock.unlock();
    }
}
public T pop() throws InterruptedException {
    lock.lock();
    try {
        while (isEmpty) {
            notEmpty.await(); // while empty, wait until notEmpty is signaled
        }
        // critical section
        T item = dequeue();
        notFull.signal(); // wake one waiting producer
        return item;
    } finally {
        lock.unlock();
    }
}

ReentrantReadWriteLock
This lock separates readers and writers, and allows multiple readers in the critical section unless a writer is in it
Key Methods:
- `lock.readLock().lock()`: acquires the `lock` for reading, blocking any writer from entering
- `lock.readLock().unlock()`: releases the lock; should be placed in a `finally` block
- `lock.writeLock().lock()`: acquires the `lock` for writing, blocking both readers and writers from entering
- `lock.writeLock().unlock()`: releases the lock; should be placed in a `finally` block
class Cache {
    private final ReentrantReadWriteLock rwLock =
        new ReentrantReadWriteLock();
    private int value; // shared state guarded by rwLock

    public int read() {
        rwLock.readLock().lock();
        try {
            // critical section
            return value;
        } finally {
            rwLock.readLock().unlock();
        }
    }

    public void write(int newValue) {
        rwLock.writeLock().lock();
        try {
            // critical section
            value = newValue;
        } finally {
            rwLock.writeLock().unlock();
        }
    }
}

RAG & Wait-Graph Diagram
Resource Allocation Graph (RAG) and wait-graph diagrams can be used to identify deadlocks in a system. (Diagram section.)