Sunday, December 6, 2015

Learn java util concurrent part4

This is yet another series of learning into java util concurrent package. If you have not read the series before, you can find part1, part2 and part3 at the respectively links. In this series, we will study the classes in java.util.concurrent package.

Provides default implementations of ExecutorService execution methods. This class implements the submit, invokeAny and invokeAll methods using a RunnableFuture returned by newTaskFor, which defaults to the FutureTask class provided in this package.

1:        AbstractExecutorService aes = null;  
3:        aes = new ForkJoinPool();  
4:        System.out.println(aes.isShutdown());  
5:        Future<Integer> total = aes.submit(new Summer(33, 44));  
6:        System.out.println(total.get());  
7:        aes.shutdown();  
10:        BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(4);  
11:        aes = new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS, blockingQueue);  
12:        System.out.println(aes.isShutdown());  
13:        total = aes.submit(new Summer(33, 44));  
14:        System.out.println(total.get());  
15:        aes.shutdown();  

In the example above, we see that two concrete implementation of AbstractExecutorService, ForkJoinPool and ThreadPoolExecutor both invoking method submit from abstract class  AbstractExecutorService.

A bounded blocking queue backed by an array. This queue orders elements FIFO (first-in-first-out). The head of the queue is that element that has been on the queue the longest time. The tail of the queue is that element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue.

1:        ArrayBlockingQueue<Integer> abq = new ArrayBlockingQueue<Integer>(5);  
2:        abq.add(1);  
3:        abq.offer(2);  
4:        System.out.println(abq.size());  

A simple queue implementation, just like any other collections in java collection framwork, you can add, remove, or drain the collection.

A Future that may be explicitly completed (setting its value and status), and may be used as a CompletionStage, supporting dependent functions and actions that trigger upon its completion.

1:        CompletableFuture<Integer> cf = new CompletableFuture<Integer>();  
2:        System.out.println(cf.isCancelled());  

A hash table supporting full concurrency of retrievals and high expected concurrency for updates.
However, even though all operations are thread-safe, retrieval operations do not entail locking, and there is not any support for locking the entire table in a way that prevents all access.
this class does not allow null to be used as a key or value.

1:        ConcurrentHashMap<String,Integer> chm = new ConcurrentHashMap<String,Integer>();  
2:        chm.put("one", 1);  
3:        chm.put("two", 2);  
4:        chm.put("six", 6);  

A view of a ConcurrentHashMap as a Set of keys, in which additions may optionally be enabled by mapping to a common value.

1:        ConcurrentHashMap.KeySetView<String, Integer> keys = chm.keySet(10);  
2:        System.out.println(keys.isEmpty());  
3:        System.out.println(keys.toString());  
4:        keys.add("ten");  
6:        keys.forEach((s) -> System.out.println(s));  
7:        System.out.println(chm.toString());  
9:        ConcurrentHashMap.KeySetView<String, Boolean> keys1 = chm.newKeySet();  
10:        System.out.println(keys1.isEmpty());  
11:        System.out.println(keys1.toString());  
12:        keys1.add("four");  
14:        keys1.forEach((s) -> System.out.println(s));  
15:        System.out.println(chm.toString());  

The above give two examples of usage of ConcurrentHashMap.KeySetView. The first one notice that changes to the keys affect the original concurrentHashMap chm whilst the second does not. So read the javadoc and pick the implementation that suit your requirements.

Now, we will take a look at two for the concurrent linked queues.


  • An unbounded concurrent deque based on linked nodes.
  • Concurrent insertion, removal, and access operations execute safely across multiple threads. 
  • this class does not permit the use of null elements.
  • the size method is NOT a constant-time operation. Because of the asynchronous nature of these deques, determining the current number of elements requires a traversal of the elements, and so may report inaccurate results if this collection is modified during traversal.


  • An unbounded thread-safe queue based on linked nodes.
  • This queue orders elements FIFO (first-in-first-out).
  • The head of the queue is that element that has been on the queue the longest time. 
  • The tail of the queue is that element that has been on the queue the shortest time. 
  • this class does not permit the use of null elements.

  insert                              element  
  always                              oldest  
  at tail                             at head  
  and                                 and retrieve  
  youngest                            here  
    |                                    |  
    |                                    |  
   tail                               head  

2:        ConcurrentLinkedDeque<Integer> cldq = new ConcurrentLinkedDeque<Integer>();   
3:        cldq.add(1);  
4:        cldq.add(2);  
5:        cldq.add(3);  
8:        ConcurrentLinkedQueue<Integer> clq = new ConcurrentLinkedQueue<Integer>();  
9:        clq.add(4);  
10:        clq.add(5);  
11:        clq.add(6);  

With the examples above, the apparent benefits is not actually express as the only thread is the main adding element to the queues serially. Like the javadoc mentioned, you should really use these queues on multithreaded situation.


  • A scalable concurrent ConcurrentNavigableMap implementation.
  • The map is sorted according to the natural ordering of its keys, or by a Comparator provided at map creation time, depending on which constructor is used.
  • providing expected average log(n) time cost for the containsKey, get, put and remove operations and their variants. 
  • Insertion, removal, update, and access operations safely execute concurrently by multiple threads.
  • Ascending key ordered views and their iterators are faster than descending ones.
  • the size method is not a constant-time operation.


  • A scalable concurrent NavigableSet implementation based on a ConcurrentSkipListMap.
  • The elements of the set are kept sorted according to their natural ordering, or by a Comparator provided at set creation time, depending on which constructor is used.
  • expected average log(n) time cost for the contains, add, and remove operations and their variants.
  • Insertion, removal, and access operations safely execute concurrently by multiple threads.
  • Ascending ordered views and their iterators are faster than descending ones.
  • the size method is not a constant-time operation. 

1:        ConcurrentSkipListMap<String,Integer> cslm = new ConcurrentSkipListMap<String, Integer>();  
2:        cslm.put("one", 1);  
3:        cslm.put("two", 2);  
4:        cslm.put("six", 6);  
6:        ConcurrentSkipListSet<Integer> csls = new ConcurrentSkipListSet<Integer>();  
7:        csls.add(1);  
8:        csls.add(1);  
9:        System.out.println("set size " + csls.size());  

With the examples above, the apparent benefits is not actually express as the only thread is the main adding element to the collections serially. Like the javadoc mentioned, you should really use these collections on multithreaded situation.


  • A thread-safe variant of ArrayList in which all mutative operations (add, set, and so on) are implemented by making a fresh copy of the underlying array.
  • This is ordinarily too costly, but may be more efficient than alternatives when traversal operations vastly outnumber mutations, and is useful when you cannot or don't want to synchronize traversals, yet need to preclude interference among concurrent threads.
  • All elements are permitted, including null.


  • A Set that uses an internal CopyOnWriteArrayList for all of its operations.
  • It is best suited for applications in which set sizes generally stay small, read-only operations vastly outnumber mutative operations, and you need to prevent interference among threads during traversal.
  • It is thread-safe.
  • Mutative operations (add, set, remove, etc.) are expensive since they usually entail copying the entire underlying array.

1:        CopyOnWriteArrayList<Integer> cowal = new CopyOnWriteArrayList<Integer>();  
2:        cowal.add(1);  
3:        cowal.add(2);  
4:        cowal.add(3);  
6:        CopyOnWriteArraySet<Integer> cowas = new CopyOnWriteArraySet<Integer>();  
7:        cowas.add(1);  
8:        cowas.add(1);  
9:        System.out.println("set size " + cowas.size());  

Just like the four collections above, these beneifts best shown on multithreaded applications.


  • A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
  • A CountDownLatch is initialized with a given count. The await methods block until the current count reaches zero due to invocations of the countDown() method, after which all waiting threads are released and any subsequent invocations of await return immediately. This is a one-shot phenomenon -- the count cannot be reset

1:        int N = 10;  
2:        CountDownLatch startSignal = new CountDownLatch(1);  
3:        CountDownLatch doneSignal = new CountDownLatch(N);  
5:        for (int i = 0; i < N; ++i) // create and start threads  
6:           new Thread(new Worker(startSignal, doneSignal)).start();  
8:        doSomethingElse();     // don't let run yet  
9:        startSignal.countDown();  // let all threads proceed  
10:        doSomethingElse();  
11:        doneSignal.await();    // wait for all to finish  
14:     private static void doSomethingElse() throws InterruptedException {  
15:          Thread.sleep(3000);  
16:        System.out.println(Thread.currentThread().getName() + " doing something else");  
18:     }  
21:     static class Worker implements Runnable {  
22:          private final CountDownLatch startSignal;  
23:          private final CountDownLatch doneSignal;  
24:          Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {  
25:           this.startSignal = startSignal;  
26:           this.doneSignal = doneSignal;  
27:          }  
28:          public void run() {  
29:           try {  
30:            startSignal.await();  
31:            doWork();  
32:            doneSignal.countDown();  
33:           } catch (InterruptedException ex) {} // return;  
34:          }  
36:          void doWork() { System.out.println(Thread.currentThread().getName() + " doing work"); try {  
37:           Thread.sleep(200);  
38:      }  

we see that ten worker threads were started but it was in waiting state in the run method. Until the startSignal started to count down, then only all the workers thread started. In the individual worker threads, we will see doneSignal is counting down one by one for 10 tens for each worker thread respectively. In the main thread, doneSignal is in waiting state before all the worker thread done all the signals.


  • A ForkJoinTask with a completion action performed when triggered and there are no remaining pending actions.
  • Sample Usages.
  • Parallel recursive decomposition.
  • Searching. 
  • Recording subtasks. 
  • Completion Traversals. 
  • Triggers.
1:        // CountedCompleter<T>  
2:        Integer[] numbers = {1,2,3,4,5};  
3:        // null ?  
4:        MapReducer<Integer> numbersReducer = new MapReducer<Integer>(null, numbers, new MyMapper(), new MyReducer(), 1, 10);  
5:        Integer result = numbersReducer.getRawResult();  
6:        System.out.println(result);  


  • A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.
  • CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. 
  • The barrier is called cyclic because it can be re-used after the waiting threads are released. 
  • The CyclicBarrier uses an all-or-none breakage model for failed synchronization attempts: If a thread leaves a barrier point prematurely because of interruption, failure, or timeout, all other threads waiting at that barrier point will also leave abnormally via BrokenBarrierException (or InterruptedException if they too were interrupted at about the same time). 

1:        // CyclicBarrier  
2:        float[][] matrix = {{1,2}, {2,3}};  
3:        new Solver(matrix);  
5:  public class Solver {  
7:     final int N;  
8:     final float[][] data;  
9:     final CyclicBarrier barrier;  
11:     class Worker implements Runnable {  
12:        int myRow;  
13:        boolean done;  
15:        Worker(int row) {  
16:           myRow = row;  
17:        }  
19:        public void run() {  
20:           while (!done()) {  
21:              processRow(myRow);  
23:              try {  
24:                 barrier.await();  
25:              } catch (InterruptedException ex) {  
26:                 return;  
27:              } catch (BrokenBarrierException ex) {  
28:                 return;  
29:              }  
30:           }  
31:        }  
33:        public boolean done() {  
34:           return done;  
35:        }  
37:        private void processRow(int row) {  
38:           System.out.println(Thread.currentThread().getName() + " processing row " + row );  
39:           done = true;  
40:        }  
41:     }  
43:     public Solver(float[][] matrix) {  
44:        data = matrix;  
45:        N = matrix.length;  
46:        Runnable barrierAction = new Runnable() {   
47:           public void run() {   
48:              //mergeRows(...);   
49:              System.out.println("merging row");  
50:           }  
51:        };  
52:        barrier = new CyclicBarrier(N, barrierAction);  
54:        List<Thread> threads = new ArrayList<Thread>(N);  
55:        for (int i = 0; i < N; i++) {  
56:         Thread thread = new Thread(new Worker(i));  
57:         threads.add(thread);  
58:         thread.start();  
59:        }  
61:        // wait until done  
62:        for (Thread thread : threads)  
63:           try {  
64:              thread.join();  
65:           } catch (InterruptedException e) {  
66:              e.printStackTrace();  
67:           }  
68:       }  
69:  }  

In the example above, we see the main class initialized a new solver object passing a two dimentional floating matrix for process. In the solver class, we see that a cyclicbarrier is initialized with a runnable barrier action. Depending on the matrix length, the length shall be used to initialize the workers threads. The solver thread waits until all the workers thread done.

In the worker thread, I simplified the processRow to just printout and set done to true, you can of cause process data[myRow] to make the sample code near to the real world problem. Noticed that barrier in each individual worker thread is call method await. IN this example, if two workers are done process the row and barrier await is executed, then the final barrierAction object will run the final merging row.

DelayQueue<E extends Delayed>

  • An unbounded blocking queue of Delayed elements, in which an element can only be taken when its delay has expired.
  • The head of the queue is that Delayed element whose delay expired furthest in the past.
  • If no delay has expired there is no head and poll will return null.
  • Expiration occurs when an element's getDelay(TimeUnit.NANOSECONDS) method returns a value less than or equal to zero.
  • the size method returns the count of both expired and unexpired elements.
  • This queue does not permit null elements.

1:        DelayQueue<SalaryDelay> delayQueue = new DelayQueue<SalaryDelay>();  
2:        delayQueue.add(new SalaryDelay("August", 1));  
3:        delayQueue.add(new SalaryDelay("September", 2));  
4:        delayQueue.add(new SalaryDelay("October", 3));  
6:        System.out.println(delayQueue.size());  
7:        System.out.println(delayQueue.poll());  

Like this queue before, you add the class that implement delayed and be place in this delayqueue.


  • A synchronization point at which threads can pair and swap elements within pairs.

1:        Exchanger<?> exchanger = new Exchanger<>();  
2:        ExchangerRunnable exchangerRunnable1 = new ExchangerRunnable(exchanger, "keychain");  
3:        ExchangerRunnable exchangerRunnable2 = new ExchangerRunnable(exchanger, "chocalate");  
5:        new Thread(exchangerRunnable1).start();  
6:        new Thread(exchangerRunnable2).start();  
8:  public class ExchangerRunnable implements Runnable {  
10:     Exchanger exchanger = null;  
11:     Object object = null;  
13:     public ExchangerRunnable(Exchanger exchanger, Object object) {  
14:        this.exchanger = exchanger;  
15:        this.object = object;  
16:     }  
18:     public void run() {  
19:        try {  
20:           Object previous = this.object;  
22:           this.object =;  
24:           System.out.println(Thread.currentThread().getName() + " exchanged "  
25:                 + previous + " for " + this.object);  
26:        } catch (InterruptedException e) {  
27:           e.printStackTrace();  
28:        }  
29:     }  
31:  }  

With the code above, there are two thread that exchange a string object to each other.


  • A CompletionService that uses a supplied Executor to execute tasks.

1:        ExecutorService executorService = Executors.newFixedThreadPool(1);  
2:        CompletionService<Integer> longRunningCompletionService = new ExecutorCompletionService<Integer>(executorService);  
3:        longRunningCompletionService.submit(() -> {System.out.println("done"); return 1;});  
4:        longRunningCompletionService.take();  
5:        executorService.shutdown();  

Moving onto our last 2 classes in this lesson.

- Factory and utility methods for Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, and Callable classes defined in this package.

1:        Executors.newCachedThreadPool();  
2:        Executors.defaultThreadFactory();  
3:        Executors.newFixedThreadPool(10);  
4:        Executors.newScheduledThreadPool(1);  
5:        Executors.newSingleThreadExecutor();  
6:        Executors.privilegedThreadFactory();  
7:        Executors.newWorkStealingPool();  

Just try different pools implementation in java to get some idea the specific pools.


  • An ExecutorService for running ForkJoinTasks.
  • This implementation restricts the maximum number of running threads to 32767. 
1:        ForkJoinPool fjPool = new ForkJoinPool();  
2:        Future<Integer> sum = fjPool.submit(new Summer(11, 89));  
3:        System.out.println(sum.get());  
4:        fjPool.shutdown();  

A trivial example of using ForkJoinPool to submit a runnable task which return a result.

That's it for this long but brief lesson. This article end with the source code where you can get from the links below.

No comments:

Post a Comment