@vishnu-saini
Last active March 6, 2020 07:34

Concurrency

Concurrent applications make better use of the available processors, but they are difficult to develop, maintain, and debug. Java’s low-level threading capabilities alone are insufficient for developing thread-safe, high-performance, and scalable applications, which is why the java.util.concurrent package provides higher-level building blocks.

Concurrent collection classes

The java.util.concurrent package includes a number of additions to the Java Collections Framework. These are most easily categorized by the collection interfaces provided:

  • BlockingQueue defines a first-in-first-out data structure that blocks or times out when you attempt to add items to a full queue, or retrieve from an empty queue.
  • ConcurrentMap is a subinterface of java.util.Map that defines useful atomic operations. These operations remove or replace a key-value pair only if the key is present, or add a key-value pair only if the key is absent. Making these operations atomic helps avoid synchronization. The standard general-purpose implementation of ConcurrentMap is ConcurrentHashMap, which is a concurrent analog of HashMap.

Writing concurrent programs is difficult: you need to deal with both thread safety and performance. The individual operations of ConcurrentHashMap are thread-safe; that is, multiple threads can put values into the same map object safely. A sequence of separate calls (for example, get() followed by put()) is still not atomic as a whole.

Interface BlockingQueue

The BlockingQueue interface defines a queue that’s safe to share between multiple threads. Implementing classes like ArrayBlockingQueue take a capacity in their constructor; for ArrayBlockingQueue this capacity is fixed and can’t be modified later. The queue blocks a thread that tries to add an element when the queue is at capacity, and it blocks a thread that tries to remove an element when the queue is empty. It fits the producer–consumer pattern, in which one or more threads produce elements and add them to a queue to be consumed by other threads.

Imagine multiple clients (producers) that send requests to a server. The server (consumer) responds to all the requests that it receives. To manage the requests that all the clients might send to the server, the server can limit the maximum number of requests that it can accept at a given point in time. The requests can be added to a blocking queue, which will block adding new requests if it reaches its upper limit. Similarly, if no new requests are available in a queue, the server thread will block until requests are made available to it.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class Request {}
class Client implements Runnable {
  private BlockingQueue<Request> queue;
  Client(BlockingQueue<Request> queue) {
    this.queue = queue;
  }
  public void run() {
    try {
      Request req = null;
      while(true) {
        req = new Request();
        queue.put(req);
        System.out.println("added request - " + req);
      }
    }
    catch (InterruptedException ex) {
      System.out.println(ex);
    }
  }
}
class Server implements Runnable {
  private BlockingQueue<Request> queue;
  Server(BlockingQueue<Request> queue) {
    this.queue = queue;
  }
  public void run() {
    try {
      while (true) {
        System.out.println("processing .. " + queue.take());
      }
    }
    catch (InterruptedException ex) {
      System.out.println(ex);
    }
  }
}
class LoadTesting{
  public static void main(String args[]) {
    BlockingQueue<Request> queue = new ArrayBlockingQueue<Request>(3);
    Client client = new Client(queue);
    Server server = new Server(queue);
    new Thread(client).start();
    new Thread(server).start();
  }
}
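
The example above uses put() and take(), which wait indefinitely. As a minimal sketch of the "times out" behavior, the following uses offer() and the timed poll() instead. The class name BoundedQueueDemo and the 100 ms timeout are illustrative assumptions, not part of the original example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class BoundedQueueDemo {
    // Reports how a full queue and an empty queue respond to non-blocking calls.
    static String demo() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2); // capacity is fixed at 2
        boolean first = queue.offer("a");  // accepted: the queue has space
        boolean second = queue.offer("b"); // accepted: the queue is now full
        boolean third = queue.offer("c");  // rejected at once instead of blocking like put()
        queue.clear();
        String head;
        try {
            // Waits at most 100 ms for an element, then gives up and returns null.
            head = queue.poll(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            head = "interrupted";
        }
        return first + "," + second + "," + third + "," + head;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints true,true,false,null
    }
}
```

A producer that must not stall can use offer() and handle the false result, while put() suits producers that should simply wait for space.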

Interface ConcurrentMap

The ConcurrentMap interface extends the java.util.Map interface. It defines methods to replace or remove a key-value pair if the key is present, or add a value if the key is absent.
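
A short sketch of these conditional operations, using ConcurrentHashMap as the implementation. The class name ConcurrentMapDemo and the map contents are made up for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class ConcurrentMapDemo {
    static ConcurrentMap<String, Integer> demo() {
        ConcurrentMap<String, Integer> stock = new ConcurrentHashMap<>();
        stock.putIfAbsent("pen", 10);  // added: the key was absent
        stock.putIfAbsent("pen", 99);  // ignored: the key is already present
        stock.replace("pen", 10, 11);  // replaced: the current value matched 10
        stock.replace("ink", 5);       // ignored: the key is absent
        stock.putIfAbsent("cap", 1);
        stock.remove("cap", 2);        // ignored: the current value is 1, not 2
        return stock;                  // contains pen=11 and cap=1
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Each call checks its condition and updates the map as one atomic step, so no external synchronization is needed around it.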

Class ConcurrentHashMap

A concrete implementation of the ConcurrentMap interface, class ConcurrentHashMap is a concurrent class analogous to class HashMap. A HashMap is an unsynchronized collection. If you’re manipulating a HashMap using multiple threads, you must synchronize its access. But locking the entire HashMap object can create serious performance issues when it’s being accessed by multiple threads. If multiple threads are retrieving values, it makes sense to allow concurrent read operations and monitor write operations.

ConcurrentHashMap allows access by multiple threads. It lets multiple threads read its values concurrently and lets a limited number of threads modify its values at the same time. Also, the iterators of ConcurrentHashMap don’t throw a ConcurrentModificationException. So what happens if new elements are added to a ConcurrentHashMap after you obtain its iterator? The iterator is weakly consistent: it traverses the elements that existed when it was created and may or may not reflect later modifications. Because the map doesn’t lock the complete collection while its elements are modified, methods like size() might not return an exact value while other threads are updating the ConcurrentHashMap.
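
The difference in iterator behavior can be sketched in a single thread. The class IteratorDemo and its method are hypothetical names; the code adds one new key while iterating and reports whether the iteration survived.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class IteratorDemo {
    // Returns true if the map tolerated a new key being added during iteration.
    static boolean modifyWhileIterating(Map<String, Integer> map) {
        map.put("a", 1);
        map.put("b", 2);
        try {
            for (String key : map.keySet()) {
                map.put("added", 0); // the first call adds a key while the iterator is live
            }
            return true;
        } catch (ConcurrentModificationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(modifyWhileIterating(new HashMap<>()));           // false
        System.out.println(modifyWhileIterating(new ConcurrentHashMap<>())); // true
    }
}
```

Whether the ConcurrentHashMap iterator also visits the key "added" is not guaranteed either way, which is what weak consistency means in practice.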

Mapping of classes and interfaces from package java.util.concurrent to their java.util analogs

Package java.util.concurrent    java.util analog
BlockingQueue                   Queue
ArrayBlockingQueue              Queue
LinkedBlockingQueue             Queue
ConcurrentMap                   Map
ConcurrentHashMap               HashMap
ConcurrentSkipListMap           TreeMap
CopyOnWriteArrayList            ArrayList
LinkedBlockingDeque             Deque

Locks

Lock objects offer multiple advantages over implicit locking of an object’s monitor. Unlike an implicit lock, a thread can use explicit lock objects to wait to acquire a lock until a time duration elapses. Lock objects also support interruptible lock waits, non-block-structured locks, multiple condition variables, lock polling, and scalability benefits.

Note: To execute code guarded by a lock, a thread must first acquire that lock: either the implicit lock on an object’s monitor (with synchronized) or an explicit Lock object.

Method                                       Description
void lock()                                  Acquires the lock. If the lock isn’t available, the current thread becomes disabled for thread scheduling purposes and lies dormant until the lock has been acquired.
void lockInterruptibly()                     Acquires the lock unless the current thread is interrupted.
Condition newCondition()                     Returns a new Condition instance that’s bound to this Lock instance.
boolean tryLock()                            Acquires the lock only if it’s free at the time of invocation.
boolean tryLock(long time, TimeUnit unit)    Acquires the lock if it’s free within the given waiting time and the current thread hasn’t been interrupted.
void unlock()                                Releases the lock.
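
Of these methods, newCondition() is the one without an example in these notes. As a minimal sketch, the hypothetical one-slot Mailbox class below uses a Condition so that a reader waits until a writer has stored a message; the class and method names are assumptions made for illustration.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Mailbox {
    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition(); // bound to this lock
    private String message; // guarded by lock

    void put(String newMessage) {
        lock.lock();
        try {
            message = newMessage;
            notEmpty.signalAll(); // wake up threads waiting in take()
        } finally {
            lock.unlock();
        }
    }

    // Waits until a message is present; returns null if the thread is interrupted.
    String take() {
        lock.lock();
        try {
            while (message == null) {
                notEmpty.await(); // releases the lock while waiting, reacquires it on wake-up
            }
            String result = message;
            message = null;
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            lock.unlock();
        }
    }
}
```

The wait happens inside a while loop because a thread can wake up without the condition being true, so the state has to be rechecked after every await().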

Acquire lock

Method lock() acquires a lock on a Lock object. If the lock isn’t available, it waits until the lock can be acquired. For instance:

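import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Rainbow {
	// The lock is static because it guards the static list shared by all instances.
	static final Lock myLock = new ReentrantLock();
	static List<String> colors = new ArrayList<>();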
	public void addColor(String newColor) {
		myLock.lock();
		try {
			colors.add(newColor);
		}
		finally {
			myLock.unlock();
		}
	}
}

Method lock() is comparable to intrinsic locks because it waits until a lock can be acquired on a Lock object.

Note: Call method unlock() on a Lock object to release its lock when you no longer need it.

Acquire lock and return immediately

Once a thread initiates a request to acquire an implicit lock on an object’s monitor, it can neither stop itself nor be asked to stop by any other thread. With explicit locks, a thread can try to acquire a lock only if it’s available and return immediately either way.

import java.util.concurrent.locks.*;
class Inventory {
    int inStock;
    String name;
    Lock lock = new ReentrantLock();
    Inventory(String name) {
        this.name = name;
    }
    public void stockIn(long qty) {
        inStock += qty;
    }
    public void stockOut(long qty) {
        inStock -= qty;
    }
}
class Shipment extends Thread {
    Inventory loc1, loc2;
    int qty;
    Shipment(Inventory loc1, Inventory loc2, int qty) {
        this.loc1 = loc1;
        this.loc2 = loc2;
        this.qty = qty;
    }
    public void run() {
        if (loc1.lock.tryLock()) {
            try {
                if (loc2.lock.tryLock()) {
                    try {
                        loc2.stockOut(qty);
                        loc1.stockIn(qty);
                        System.out.println(loc1.inStock + ":" + loc2.inStock);
                    } finally {
                        loc2.lock.unlock();
                    }
                } else {
                    System.out.println("Locking false:" + loc2.name);
                }
            } finally {
                // Always release loc1, even when loc2 could not be locked.
                loc1.lock.unlock();
            }
        } else {
            System.out.println("Locking false:" + loc1.name);
        }
    }
    public static void main(String args[]) {
        Inventory loc1 = new Inventory("Seattle");
        loc1.inStock = 100;
        Inventory loc2 = new Inventory("LA");
        loc2.inStock = 200;
        Shipment s1 = new Shipment(loc1, loc2, 1);
        Shipment s2 = new Shipment(loc2, loc1, 10);
        s1.start();
        s2.start();
    }
}

Method main() starts two new threads, s1 and s2, passing them objects loc1 and loc2. No matter how threads s1 and s2 execute, they will never deadlock. Instead of waiting to acquire an implicit lock on objects loc1 and loc2, they call loc1.lock.tryLock() and loc2.lock.tryLock(), which return immediately. If a lock isn’t available, the thread skips the transfer rather than waiting for it.

Interruptible locks

The following methods of Lock enable you to specify a waiting timeout or to try and acquire a lock while being available for interruption:

  • lockInterruptibly()
  • tryLock(long time, TimeUnit unit)

// Below is the run() method of a thread class that shares a bus object
public void run() {
    try {
        bus.lock.lockInterruptibly();
        try {
            bus.boardBus(name);
        } finally {
            bus.lock.unlock();
        }
    } catch (InterruptedException e) {
        System.out.println(name + ": Interrupted!!");
        Thread.currentThread().interrupt();
    }
}

//below is main method
public static void main(String args[]) {
	Bus bus = new Bus();
	Employee e1 = new Employee("Paul", bus);
	e1.start();
	e1.interrupt();
	Employee e2 = new Employee("Shreya", bus);
	e2.start();
}
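
The fragments above show lockInterruptibly(). A sketch of the timed variant, tryLock(long time, TimeUnit unit), follows; the class TimedLockDemo, its method, and the 100 ms timeout are assumptions made for illustration.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class TimedLockDemo {
    // Returns true if a second thread obtained the lock within its 100 ms timeout.
    static boolean secondThreadAcquires(boolean heldByFirst) {
        Lock lock = new ReentrantLock();
        boolean[] acquired = new boolean[1];
        Thread second = new Thread(() -> {
            try {
                if (lock.tryLock(100, TimeUnit.MILLISECONDS)) {
                    try {
                        acquired[0] = true;
                    } finally {
                        lock.unlock();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // interrupted while waiting for the lock
            }
        });
        if (heldByFirst) {
            lock.lock(); // the calling thread keeps the lock until the second thread finishes
        }
        try {
            second.start();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            if (heldByFirst) {
                lock.unlock();
            }
        }
        return acquired[0];
    }

    public static void main(String[] args) {
        System.out.println(secondThreadAcquires(false)); // true
        System.out.println(secondThreadAcquires(true));  // false
    }
}
```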

Nonblock-structured locking

With intrinsic locks, the lock on an object’s monitor is released at the end of the synchronized code block or method. Because a code block can’t span methods, an intrinsic lock can’t be acquired in one method and released in another. Explicit locks (Lock objects) can be acquired in one method and released in another.

import java.util.concurrent.locks.*;
class Bus {
    ReentrantLock lock = new ReentrantLock();
    boolean locked = false;
    public void board(String name) {
        if (lock.tryLock()) {
            locked = true;
            System.out.println(name + ": boarded");
        }
    }
    public void deboard(String name) {
        if (lock.isHeldByCurrentThread() && locked) {
            System.out.println(name + ": deboarded");
            lock.unlock();
            locked = false;
        }
    }
}

Interface ReadWriteLock

Interface ReadWriteLock maintains a pair of associated locks, one for read-only operations and another for write operations. The read lock may be held simultaneously by multiple reader threads as long as no thread holds the write lock. The write lock is an exclusive lock. It can be acquired by only one thread. The ReadWriteLock interface defines only two methods: readLock() and writeLock().

Note: The ReadWriteLock interface doesn’t extend Lock or any other interface. It maintains a pair of associated Locks: one for read-only operations and one for writing.

You can use methods readLock() and writeLock() to get a reference to the read or write Lock.

Class ReentrantReadWriteLock

A ReentrantReadWriteLock has a read and a write lock associated with it. You can access these locks (reference variables of type Lock) by calling its methods readLock() and writeLock(). Multiple threads can acquire the read lock as long as no thread holds the write lock on the ReadWriteLock object. The write lock is an exclusive lock; it can be acquired by only one thread, and only when no other thread holds the read lock or the write lock.

import java.util.*;
import java.util.concurrent.locks.*;
class Rainbow {
    // Static, because it guards the static map shared by all instances.
    private static final ReadWriteLock myLock = new ReentrantReadWriteLock();
    private static int pos;
    static Map<Integer, String> colors = new HashMap<>();
    public void addColor(String newColor) {
        myLock.writeLock().lock();
        try {
            // Autoboxing replaces the deprecated new Integer(...) constructor.
            colors.put(++pos, newColor);
        } finally {
            myLock.writeLock().unlock();
        }
    }
    public void display() {
        myLock.readLock().lock();
        try {
            for (String s: colors.values()) {
                System.out.println(s);
            }
        } finally {
            myLock.readLock().unlock();
        }
    }
}
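
The sharing rules described above can be checked with a small sketch: while one thread holds the read lock, a second thread can also take the read lock but not the write lock. The class name SharedReadDemo is hypothetical.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

class SharedReadDemo {
    // While the calling thread holds the read lock, checks which locks a second thread can take.
    static boolean[] demo() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        boolean[] result = new boolean[2]; // [0] = got the read lock, [1] = got the write lock
        rw.readLock().lock();
        try {
            Thread other = new Thread(() -> {
                if (rw.readLock().tryLock()) {   // succeeds: read locks are shared
                    result[0] = true;
                    rw.readLock().unlock();
                }
                if (rw.writeLock().tryLock()) {  // fails: another thread still holds a read lock
                    result[1] = true;
                    rw.writeLock().unlock();
                }
            });
            other.start();
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            rw.readLock().unlock();
        }
        return result;
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println(r[0] + " " + r[1]); // prints true false
    }
}
```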

Atomic variables

The java.util.concurrent.atomic package defines multiple classes that support atomic operations of read-compare/modify-write on single variables. On the surface, these operations might seem similar to the operations with volatile variables. Though modifications to a volatile variable are guaranteed to be visible to other threads, volatile variables can’t define a sequence of operations (like read-compare/modify-write) as an atomic operation.

Java defines multiple convenient classes in the java.util.concurrent.atomic package that define frequently used operations like read-modify-write as atomic operations. Let’s use one of these classes, AtomicInteger, in class Book, and replace the type of its primitive int variable copiesSold:

import java.util.concurrent.atomic.AtomicInteger;

class Book {
    String title;
    AtomicInteger copiesSold = new AtomicInteger(0);
    Book(String title) {
        this.title = title;
    }
    public void newSale() {
        copiesSold.incrementAndGet();
    }
    public void returnBook() {
        copiesSold.decrementAndGet();
    }
}
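
To see why this matters, the following sketch increments one AtomicInteger from several threads and checks that no update is lost. The class SalesCounter and its parameters are illustrative assumptions; with a plain or volatile int and ++, the total could come out lower.

```java
import java.util.concurrent.atomic.AtomicInteger;

class SalesCounter {
    // Runs the given number of threads, each recording salesPerThread sales.
    static int sellConcurrently(int threads, int salesPerThread) {
        AtomicInteger copiesSold = new AtomicInteger(0);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < salesPerThread; j++) {
                    copiesSold.incrementAndGet(); // atomic read-modify-write
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join(); // wait for every worker before reading the total
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return copiesSold.get();
    }

    public static void main(String[] args) {
        System.out.println(sellConcurrently(4, 1000)); // prints 4000
    }
}
```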

Class AtomicInteger defines multiple methods xxxAndGet() and getAndXxx(), where Xxx refers to an operation like increment, decrement, and add. xxxAndGet() returns an updated value and getAndXxx() returns the previous value.

Note: AtomicInteger’s method incrementAndGet() returns the updated value, but its method getAndIncrement() returns the previous value.
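
A quick sketch of the return values, starting from 5. The class name AtomicReturnValues is made up for the example.

```java
import java.util.concurrent.atomic.AtomicInteger;

class AtomicReturnValues {
    static int[] demo() {
        AtomicInteger n = new AtomicInteger(5);
        int previous = n.getAndIncrement(); // returns 5, n becomes 6
        int updated = n.incrementAndGet();  // n becomes 7, returns 7
        int added = n.addAndGet(10);        // n becomes 17, returns 17
        int beforeAdd = n.getAndAdd(3);     // returns 17, n becomes 20
        return new int[] {previous, updated, added, beforeAdd, n.get()};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo())); // prints [5, 7, 17, 17, 20]
    }
}
```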

Other commonly used classes defined in the java.util.concurrent.atomic package are AtomicBoolean, AtomicLong, AtomicIntegerArray, AtomicLongArray, and AtomicReference<V>. AtomicLong defines the same methods as class AtomicInteger. The difference is the type of method parameters and their return types (long instead of int).

Note: The java.util.concurrent.atomic package doesn’t define classes by the names AtomicShort, AtomicByte, AtomicFloat, or AtomicDouble. These invalid class names might be used on the exam.

Volatile vs Atomic

Read Here

Executors

Large applications call for separating the tasks themselves from the creation and management of the threads that run them.

The Executor framework helps to decouple a command submission from command execution.

In the java.util.concurrent package there are three interfaces:

  • Executor — Used to submit a new task.

  • ExecutorService — A subinterface of Executor that adds methods to manage lifecycle of threads used to run the submitted tasks and methods to produce a Future to get a result from an asynchronous computation.

  • ScheduledExecutorService — A subinterface of ExecutorService, to execute commands periodically or after a given delay.

Executor Interface

An Executor removes the need to create threads manually to execute a command.

Given any Runnable implementation, like the following:

public class MyCommand implements Runnable {
  public void run() {
    // code to be executed
  }
}

The following code with explicit Thread creation:

Thread t = new Thread(new MyCommand());
t.start();

can be replaced with the following code, which uses an Executor instead:

Executor executor = ... // Executor creation
executor.execute(new MyCommand());
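
Because Executor has a single method, execute(Runnable), the execution policy can be swapped without touching the code that submits the task. The sketch below defines two hypothetical policies, DIRECT and THREAD_PER_TASK, to illustrate this decoupling; neither is a JDK constant.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;

class ExecutorPolicies {
    // Runs each command immediately on the thread that calls execute().
    static final Executor DIRECT = Runnable::run;
    // Starts a new thread for each command.
    static final Executor THREAD_PER_TASK = command -> new Thread(command).start();

    // Submits one command and reports whether it ran on the caller's thread.
    static boolean runsOnCallerThread(Executor executor) {
        Thread caller = Thread.currentThread();
        Thread[] worker = new Thread[1];
        CountDownLatch done = new CountDownLatch(1);
        executor.execute(() -> {
            worker[0] = Thread.currentThread();
            done.countDown();
        });
        try {
            done.await(); // wait until the command has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return worker[0] == caller;
    }

    public static void main(String[] args) {
        System.out.println(runsOnCallerThread(DIRECT));          // true
        System.out.println(runsOnCallerThread(THREAD_PER_TASK)); // false
    }
}
```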

ExecutorService Interface

ExecutorService adds submit, a more versatile method for executing commands.

Passing a Callable to the submit method returns a Future object, which you can use to retrieve the result of the asynchronous computation.

Additionally, an ExecutorService can be shut down, after which it rejects new submissions. After a call to shutdown, all previously submitted commands are still executed, but no new command is accepted. A call to shutdownNow prevents waiting tasks from starting and attempts to stop all currently executing commands.

Runnable myCommand1 = ...
Callable<String> myCommand2 = ...
ExecutorService executorService = ... // Build an executorService
executorService.submit(myCommand1);
// submit also accepts a Callable
Future<String> resultFromMyCommand2 = executorService.submit(myCommand2);
// Stops accepting new tasks; myCommand1 and myCommand2 still run to completion.
// shutdown() itself does not block; use awaitTermination() to wait.
executorService.shutdown();
Runnable myCommand3 = ...;
// Will throw a RejectedExecutionException because no new task can be submitted
executorService.submit(myCommand3);
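
A self-contained version of the same sequence might look like the sketch below. The class SubmitDemo, the multiply task, and the pool size of 2 are assumptions made for illustration; the pool comes from Executors.newFixedThreadPool.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

class SubmitDemo {
    static String demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Callable<Integer> multiply = () -> 6 * 7;
        try {
            Future<Integer> answer = pool.submit(multiply);
            int value = answer.get(); // blocks until the result is available
            pool.shutdown();          // no new tasks are accepted from here on
            boolean rejected = false;
            try {
                pool.submit(multiply);
            } catch (RejectedExecutionException e) {
                rejected = true;
            }
            // Waits up to one second for the pool's threads to finish.
            boolean terminated = pool.awaitTermination(1, TimeUnit.SECONDS);
            return value + "," + rejected + "," + terminated;
        } catch (InterruptedException | ExecutionException e) {
            pool.shutdownNow();
            return "failed: " + e;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 42,true,true
    }
}
```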

ScheduledExecutorService

The ScheduledExecutorService schedules command executions after a given delay or periodically. It is generally preferred as a replacement for Timer and TimerTask.

Its schedule method runs a command once after a given delay. The scheduleAtFixedRate and scheduleWithFixedDelay methods execute a task periodically:

ScheduledExecutorService executor = ...;
Runnable command1 = ...;
Runnable command2 = ...;
Runnable command3 = ...;

// Will start command1 after 50 seconds
executor.schedule(command1, 50L, TimeUnit.SECONDS);

// Will start command 2 after 20 seconds, 25 seconds, 30 seconds ...
executor.scheduleAtFixedRate(command2, 20L, 5L, TimeUnit.SECONDS);

// Will start command 3 after 10 seconds and if command3 takes 2 seconds to be
// executed also after 17, 24, 31, 38 seconds...
executor.scheduleWithFixedDelay(command3, 10L, 5L, TimeUnit.SECONDS);
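
For a runnable variant with short delays, the sketch below schedules a periodic task, waits for three runs, and then cancels it. The class ScheduleDemo and the 10 ms and 20 ms timings are illustrative assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ScheduleDemo {
    // Returns how many times the periodic task ran before it was cancelled.
    static int demo() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch threeRuns = new CountDownLatch(3);
        AtomicInteger runs = new AtomicInteger();
        // First run after 10 ms, then one run every 20 ms.
        ScheduledFuture<?> ticker = scheduler.scheduleAtFixedRate(() -> {
            runs.incrementAndGet();
            threeRuns.countDown();
        }, 10, 20, TimeUnit.MILLISECONDS);
        try {
            threeRuns.await(2, TimeUnit.SECONDS); // wait for at least three runs
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            ticker.cancel(false); // stop further runs without interrupting a running one
            scheduler.shutdown();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println(demo() >= 3);
    }
}
```

A periodic task keeps running until it is cancelled or the executor is shut down, so keeping the returned ScheduledFuture is the usual way to stop it.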

How to Create an Executor

To create an Executor, you can use the factory methods of the Executors class.

The most commonly used methods create:

  • an ExecutorService with a single thread to execute commands, with the method newSingleThreadExecutor.

  • a ScheduledExecutorService with a single thread to execute commands, with the method newSingleThreadScheduledExecutor.

  • an ExecutorService that uses a fixed-size pool of threads to execute commands, with the method newFixedThreadPool.

  • an ExecutorService with a pool of threads that reuses an idle thread if one is available and creates a new thread otherwise, with the method newCachedThreadPool.

  • a ScheduledExecutorService with a fixed-size pool of threads to execute scheduled commands, with the method newScheduledThreadPool.

Examples that create ExecutorService and ScheduledExecutorService instances:

// Creates a single thread ExecutorService
ExecutorService singleExecutorService = Executors.newSingleThreadExecutor();

// Creates a single thread ScheduledExecutorService
ScheduledExecutorService singleScheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

// Creates an ExecutorService that uses a pool of 10 threads
ExecutorService fixedExecutorService = Executors.newFixedThreadPool(10);

// Creates an ExecutorService that uses a pool that creates threads on demand
// and terminates them after they have been idle for 60 seconds
ExecutorService onDemandExecutorService = Executors.newCachedThreadPool();

// Creates a ScheduledExecutorService that uses a pool of 5 threads
ScheduledExecutorService fixedScheduledExecutorService = Executors.newScheduledThreadPool(5);