Concurrency in Java 8

In programming, concurrency usually refers to multithreading: an execution model in which multiple threads exist within a single process, execute independently, and share that process's resources.
The Java Virtual Machine (JVM) is designed to support concurrent programming, and all execution takes place in the context of threads. The Java memory model describes how threads interact through memory. On modern platforms, code is frequently not executed in the order in which it was written: the compiler, the processor, and the memory subsystem may reorder it to achieve maximum performance.

The aim of this article is not to make you a master of Java 8 concurrency, but to guide
you towards that goal. Sometimes it helps just to know that an API exists that
might suit a particular situation.

A brief history of concurrency enhancements in Java:
Java 1.0: Thread, Runnable
Java 1.2: ThreadLocal
Java 5: Executor framework (java.util.concurrent)
Java 6: Minimal changes related to concurrency
Java 7: Fork/Join framework and the Phaser
Java 8: Stream API along with many other classes
The concurrency API also includes many concurrent data structures and synchronization mechanisms.

Important Java 8 concurrency enhancements:

  • Stream API, Lambda expressions and Parallel streams
  • StampedLock
  • Parallel sort for arrays
  • Default ForkJoinPool: Common pool.
  • CountedCompleter
  • CompletableFuture
  • DoubleAdder, LongAdder, DoubleAccumulator, LongAccumulator
  • New methods in Collection, ConcurrentMap and ConcurrentHashMap
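
Two of the items above can be tried in a few lines. The following is a minimal sketch, not a benchmark: the class name Java8AdditionsSketch and its helper methods are made up for illustration, while Arrays.parallelSort() and ConcurrentHashMap.merge() are the Java 8 additions being shown.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical class name, used only for this illustration.
class Java8AdditionsSketch {

    // Sorts a copy of the input with Arrays.parallelSort(), which may split
    // large arrays into chunks that are sorted on the common ForkJoinPool.
    static int[] sortedCopy(int[] values) {
        int[] copy = Arrays.copyOf(values, values.length);
        Arrays.parallelSort(copy);
        return copy;
    }

    // Counts words from several threads. merge() applies each update
    // atomically, so no explicit locking is needed.
    static Map<String, Integer> countWords(String[] words) {
        ConcurrentHashMap<String, Integer> counts = new ConcurrentHashMap<>();
        Arrays.stream(words).parallel()
              .forEach(word -> counts.merge(word, 1, Integer::sum));
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(sortedCopy(new int[] {5, 3, 9, 1})));
        System.out.println(countWords(new String[] {"a", "b", "a", "c", "a"}));
    }
}
```

For arrays this small, parallelSort() will most likely fall back to a sequential sort; the parallel path only pays off on large inputs.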

1. Working with parallel streams:

    • stream() vs. parallelStream() in the Collection interface
    • Arrays do not have a parallelStream() method; use Arrays.stream(array).parallel() instead
    • parallel() vs. sequential(), which switch an existing stream between the two modes
    • Parallel streams internally use the fork/join framework
    • Elements may be processed in any order in parallel streams
    • Avoid stateful operations, and operations that depend on encounter order, within parallel streams, e.g. sorted() and findFirst()
    • The same applies to operations that depend on previous state, such as a loop iteration that builds on the result of the previous one
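
The points above can be sketched as follows. The class and method names (ParallelStreamSketch, sumOfSquares, countEven, doubled) are hypothetical; only the stream calls themselves come from the JDK.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical class name, used only for this illustration.
class ParallelStreamSketch {

    // Collections expose parallelStream() directly.
    static int sumOfSquares(List<Integer> numbers) {
        return numbers.parallelStream().mapToInt(n -> n * n).sum();
    }

    // Arrays have no parallelStream(); Arrays.stream(...).parallel() is the equivalent.
    static long countEven(int[] values) {
        return Arrays.stream(values).parallel().filter(v -> v % 2 == 0).count();
    }

    // map() is stateless, and collect(toList()) keeps the encounter order of
    // an ordered source even when the stream runs in parallel.
    static List<Integer> doubled(List<Integer> numbers) {
        return numbers.parallelStream().map(n -> n * 2).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

        // forEach() on a parallel stream may print the elements in any order.
        numbers.parallelStream().forEach(n -> System.out.print(n + " "));
        System.out.println();

        // forEachOrdered() restores encounter order, at some cost in parallelism.
        numbers.parallelStream().forEachOrdered(n -> System.out.print(n + " "));
        System.out.println();

        System.out.println(sumOfSquares(numbers));
        System.out.println(countEven(new int[] {1, 2, 3, 4, 5, 6}));
        System.out.println(doubled(numbers));
    }
}
```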

Program to illustrate the performance of stream vs. parallel stream

  • Student: a DTO class that holds the details of a student.


package com.code.adda.java8.concurrency;

public class Student {

    private int roll;
    private String name;
    private String subject;
    private int marks;

    public Student() {
        super();
    }

    public Student(int roll, String name, String subject, int marks) {
        super();
        this.roll = roll;
        this.name = name;
        this.subject = subject;
        this.marks = marks;
    }

    public int getRoll() {
        return roll;
    }

    public void setRoll(int roll) {
        this.roll = roll;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getSubject() {
        return subject;
    }

    public void setSubject(String subject) {
        this.subject = subject;
    }

    public int getMarks() {
        return marks;
    }

    public void setMarks(int marks) {
        this.marks = marks;
    }

    @Override
    public String toString() {
        return "Student [roll=" + roll + ", name=" + name + ", subject=" + subject + ", marks=" + marks + "]";
    }
}

  • ConcurrencyStreamTest: contains the logic that sorts the students by marks, first with a sequential stream and then with a parallel stream.


package com.code.adda.java8.concurrency;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class ConcurrencyStreamTest {

    public static void main(String[] args) {

        List<Student> studentList = new ArrayList<>();
        studentList.add(new Student(111, "AAA", "CS", 80));
        studentList.add(new Student(222, "BBB", "CS", 70));
        studentList.add(new Student(333, "CCC", "CS", 60));
        studentList.add(new Student(444, "DDD", "IT", 50));
        studentList.add(new Student(555, "EEE", "IT", 65));
        studentList.add(new Student(666, "EEE", "IT", 75));

        // Print the original student list
        System.out.println("Original List\n");
        studentList.forEach(System.out::println);

        // Sort by marks using a sequential stream
        long startTime = System.currentTimeMillis();
        List<Student> sortedItems = studentList.stream()
                .sorted(Comparator.comparing(Student::getMarks))
                .collect(Collectors.toList());
        long endTime = System.currentTimeMillis();

        System.out.println("\nSorting marks using sequential stream: \n");
        sortedItems.forEach(System.out::println);
        System.out.println("Total time taken in process : " + (endTime - startTime) + " milisec.");

        // Sort by marks using a parallel stream
        startTime = System.currentTimeMillis();
        List<Student> anotherSortedItems = studentList.parallelStream()
                .sorted(Comparator.comparing(Student::getMarks))
                .collect(Collectors.toList());
        endTime = System.currentTimeMillis();

        System.out.println("\nSorting marks using parallel stream:\n ");
        anotherSortedItems.forEach(System.out::println);
        System.out.println("Total the time taken process : " + (endTime - startTime) + " milisec.");

        // Sum all marks with a parallel reduction; 0 is the identity for addition
        Integer totalMarks = studentList.parallelStream().map(e -> e.getMarks()).reduce(0, (a1, a2) -> a1 + a2);
        System.out.println("Total Marks: " + totalMarks);
    }
}

Output:

Original List

Student [roll=111, name=AAA, subject=CS, marks=80]
Student [roll=222, name=BBB, subject=CS, marks=70]
Student [roll=333, name=CCC, subject=CS, marks=60]
Student [roll=444, name=DDD, subject=IT, marks=50]
Student [roll=555, name=EEE, subject=IT, marks=65]
Student [roll=666, name=EEE, subject=IT, marks=75]

Sorting marks using sequential stream:

Student [roll=444, name=DDD, subject=IT, marks=50]
Student [roll=333, name=CCC, subject=CS, marks=60]
Student [roll=555, name=EEE, subject=IT, marks=65]
Student [roll=222, name=BBB, subject=CS, marks=70]
Student [roll=666, name=EEE, subject=IT, marks=75]
Student [roll=111, name=AAA, subject=CS, marks=80]

Total time taken in process : 7 milisec.

Sorting marks using parallel stream:

Student [roll=444, name=DDD, subject=IT, marks=50]
Student [roll=333, name=CCC, subject=CS, marks=60]
Student [roll=555, name=EEE, subject=IT, marks=65]
Student [roll=222, name=BBB, subject=CS, marks=70]
Student [roll=666, name=EEE, subject=IT, marks=75]
Student [roll=111, name=AAA, subject=CS, marks=80]

Total the time taken process : 1 milisec.
Total Marks: 400

Note:
To run a job on multiple threads across the available cores, call parallelStream() instead of stream(). Execution time will vary from system to system, because parallelism is not automatically faster than sequential processing, although it can be if you have enough data and processor cores. With only six elements, the 7 ms versus 1 ms figures above most likely reflect one-time JVM warm-up in the first measurement rather than a real parallel speedup.
As a rule, use parallelStream() when the result of the operation does not depend on the order of the elements in the collection.

 

2. Working with atomic variables

2.1 : DoubleAdder : maintains a running double sum; preferable to alternatives when the value is updated frequently but read less frequently.
2.2 : LongAdder : maintains a running long sum; under high contention, its expected throughput is significantly higher than that of AtomicLong, at the expense of higher space consumption.
2.3 : DoubleAccumulator : generalizes DoubleAdder to any supplied accumulation function (for example, max); likewise preferable when updates are frequent and reads are less frequent.
2.4 : LongAccumulator : generalizes LongAdder in the same way, with the same trade-off of higher throughput for higher space consumption compared to AtomicLong.
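
As a rough illustration of the adder and accumulator classes above, here is a minimal sketch. AdderSketch and its method names are invented for this example, and a parallel stream stands in for "many threads updating one value".

```java
import java.util.Arrays;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.IntStream;

// Hypothetical class name, used only for this illustration.
class AdderSketch {

    // Many threads increment one LongAdder; sum() is read once at the end,
    // which matches the "frequent updates, infrequent reads" use case.
    static long countWithAdder(int increments) {
        LongAdder adder = new LongAdder();
        IntStream.range(0, increments).parallel().forEach(i -> adder.increment());
        return adder.sum();
    }

    // LongAccumulator takes the accumulation function and its identity.
    // Here it tracks a running maximum instead of a sum.
    static long maxWithAccumulator(long[] values) {
        LongAccumulator max = new LongAccumulator(Long::max, Long.MIN_VALUE);
        Arrays.stream(values).parallel().forEach(max::accumulate);
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println(countWithAdder(10_000));
        System.out.println(maxWithAccumulator(new long[] {3, 42, 7, 19}));
    }
}
```

The accumulation function should be associative and commutative, because the order in which updates from different threads are combined is not guaranteed.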

3. Working with Executor Service
3.1 : ExecutorService : Executor services are one of the most important features of the Java concurrency API. They let us execute code in parallel as tasks. The ExecutorService interface was introduced in Java 5; Java 8 makes it more convenient to use, because tasks can be written as lambda expressions.
3.2 : ExecutorService is a higher-level replacement for working with threads directly.
3.3 : Executors are capable of running asynchronous tasks and typically manage a pool of threads, so there is no need to create new threads manually.
3.4 : ExecutorService has the following implementations in the java.util.concurrent package:
3.4.1 : ThreadPoolExecutor
3.4.2 : ScheduledThreadPoolExecutor
3.4.3 : ForkJoinPool
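
A minimal sketch of submitting tasks to an ExecutorService with lambdas follows. The class name, the pool size of 4, and the task itself (squaring numbers) are arbitrary choices made for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical class name, used only for this illustration.
class ExecutorServiceSketch {

    // Runs one task per number on a fixed pool and adds up the results.
    static int sumOfSquares(int upTo) {
        // The pool size is an arbitrary choice for this sketch.
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 1; i <= upTo; i++) {
                final int n = i;
                tasks.add(() -> n * n); // each task is a lambda, no Thread is created by hand
            }
            int total = 0;
            // invokeAll() blocks until every task has completed.
            for (Future<Integer> result : executor.invokeAll(tasks)) {
                total += result.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for tasks", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A task failed", e.getCause());
        } finally {
            // Without shutdown() the pool's threads would keep the JVM alive.
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(4));
    }
}
```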

Java 8 introduced two new interfaces and four new classes in the java.util.concurrent package to enhance concurrency.

Interfaces:

1. CompletableFuture.AsynchronousCompletionTask: A marker interface identifying asynchronous tasks produced by async methods.
2. CompletionStage: A stage of a possibly asynchronous computation that performs an action or computes a value when another CompletionStage completes.

Classes:

1. CompletableFuture: A Future that may be explicitly completed (setting its value and status), and may be used as a CompletionStage, supporting dependent functions and actions that trigger upon its completion.
2. ConcurrentHashMap.KeySetView: A view of a ConcurrentHashMap as a Set of keys, in which additions may optionally be enabled by mapping to a common value.
3. CountedCompleter: A ForkJoinTask with a completion action performed when triggered and there are no remaining pending actions.
4. CompletionException: The exception thrown when an error or other exception is encountered in the course of completing a result or task.
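
As a brief preview of how CompletableFuture and CompletionException fit together, here is a minimal sketch. The class name, the method names, and the values (100, 18, the messages) are invented for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical class name, used only for this illustration.
class CompletableFutureSketch {

    // Two independent asynchronous stages, combined once both have completed.
    static int priceWithTax() {
        CompletableFuture<Integer> price = CompletableFuture.supplyAsync(() -> 100);
        CompletableFuture<Integer> tax = CompletableFuture.supplyAsync(() -> 18);
        return price.thenCombine(tax, Integer::sum).join();
    }

    // exceptionally() supplies a fallback value when an earlier stage fails.
    static String recover() {
        return CompletableFuture.<String>supplyAsync(() -> {
                    throw new IllegalStateException("boom");
                })
                .exceptionally(ex -> {
                    // The failure may arrive wrapped in a CompletionException.
                    Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                            ? ex.getCause() : ex;
                    return "fallback: " + cause.getMessage();
                })
                .join();
    }

    // A future can be completed explicitly; join() then reports the failure
    // as a CompletionException whose cause is the original exception.
    static String failureCause() {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalArgumentException("bad input"));
        try {
            failed.join();
            return "no failure";
        } catch (CompletionException e) {
            return e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(priceWithTax());
        System.out.println(recover());
        System.out.println(failureCause());
    }
}
```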

(Note: I’ll explain the use cases of these concurrency classes and interfaces in detail, with examples, in my next article, which will be coming soon.)

Some important guidelines on where and how to use concurrency in Java 8:

  • 1. Identify tasks that are truly independent.
  • 2. Find an easily parallelizable version of the algorithm.
  • 3. Use the right identity value when using reduce() with parallel streams.
  • 4. Avoid stateful operations within parallel streams.
  • 5. Intermediate operations are not executed until a terminal operation is invoked (for all streams).
  • 6. collect() may be more efficient than reduce() in most cases (for all streams).
  • 7. Avoid the iterate() method of the Stream interface as much as possible in parallel streams, because each element depends on the previous one and the stream splits poorly.
  • 8. The SplittableRandom class is more suitable for parallel processing than the Random class.
  • 9. Use immutable objects as the identity object.
  • 10. All threads share the identity object in parallel streams.
  • 11. Use the parallelism threshold parameter of the ConcurrentHashMap bulk methods wisely, e.g. forEach(), search(), reduce().
  • 12. Parallel streams can iterate quickly over large collections.
  • 13. Parallel streams are not always faster.
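
Guidelines 3, 6 and 8 can be illustrated with a minimal sketch. The class ReductionSketch and its methods are hypothetical names; the seed and the value range passed to SplittableRandom are arbitrary.

```java
import java.util.Arrays;
import java.util.List;
import java.util.SplittableRandom;
import java.util.stream.Collectors;

// Hypothetical class name, used only for this illustration.
class ReductionSketch {

    // 0 is a true identity for addition: combining it with any partial sum
    // leaves that sum unchanged. A value such as 10 would be added once per
    // chunk, so the result would depend on how the stream happened to split.
    static int sum(List<Integer> values) {
        return values.parallelStream().reduce(0, Integer::sum);
    }

    // collect() performs a mutable reduction: each thread fills its own
    // container and the containers are merged at the end, which avoids
    // creating a new String for every element as a reduce() would.
    static String joined(List<String> words) {
        return words.parallelStream().collect(Collectors.joining(","));
    }

    // SplittableRandom can split itself for each parallel subtask, so worker
    // threads do not contend on one shared generator.
    static long randomSum(long seed, int count) {
        return new SplittableRandom(seed).ints(count, 0, 100).parallel().sum();
    }

    public static void main(String[] args) {
        System.out.println(sum(Arrays.asList(1, 2, 3, 4, 5)));
        System.out.println(joined(Arrays.asList("a", "b", "c")));
        System.out.println(randomSum(42L, 1_000));
    }
}
```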
