Concurrency and Improvements in Java 8: Part 2

Jan 14, 2020
hackajob Staff

Java 8 added the ‘CompletableFuture’ class to the ‘java.util.concurrent’ package. In Part 1 of this article, we explored what a ‘CompletableFuture’ is, how to create one, and how to run tasks asynchronously with it. In today’s article, we’ll explore more of its features and how they help to overcome the shortcomings of the ‘Future’ interface:

Manually Completing a Task

As we mentioned earlier, one of the disadvantages of the ‘Future’ interface is that there’s no way to manually complete a task. Thankfully, ‘CompletableFuture’ provides a method called ‘complete’, which completes the future with a value that you supply. The following code demonstrates this:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;

public class ManualCompleteDemo {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // Simulates a long-running task that takes 5 seconds
        Supplier<String> supplier = () -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        };

        System.out.println("Starting new Thread");
        CompletableFuture<String> cFuture = CompletableFuture.supplyAsync(supplier);
        System.out.println("Doing something else");
        System.out.println("Waiting for Thread to complete..");

        // Poll every 500 ms and give up after 4 attempts (2 seconds)
        int count = 0;
        while (!cFuture.isDone()) {
            Thread.sleep(500);
            System.out.println("Still waiting");
            count++;
            if (count == 4) {
                break;
            }
        }

        System.out.println("Waited enough, completing manually");
        // Has no effect if the task has already finished
        cFuture.complete("Hello2");
        String result = cFuture.get();
        System.out.println("Result is " + result);
        System.out.println("Main thread completed");
    }
}

In the example above, a ‘Supplier’ instance is created that returns the String “hello”. We’ve added a 'Thread.sleep' within the 'Supplier' implementation to simulate a long-running task. The 'supplyAsync' method is invoked with this 'Supplier' instance, and a ‘while loop’ then polls the 'cFuture.isDone' method every 500 milliseconds to check whether the future has completed. After four checks (2 seconds), the loop gives up and exits. The 'cFuture.complete' method is then invoked, which completes the future manually with the value “Hello2”, so 'cFuture.get' returns straight away. Note that 'complete' doesn’t stop the background task: the 'Supplier' keeps running, but its result is ignored because the future is already complete. This code prints the following output:

Starting new Thread
Doing something else
Waiting for Thread to complete..
Still waiting
Still waiting
Still waiting
Still waiting
Waited enough, completing manually
Result is Hello2
Main thread completed
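To make the behaviour of 'complete' a little more concrete, here’s a minimal sketch. It assumes a future that has no task behind it at all (created with 'new CompletableFuture<>()'), and the class and method names are ours, purely for illustration. It shows that 'complete' returns a boolean telling you whether your call was the one that completed the future, and that only the first completion takes effect:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of the original example
class CompleteOnceSketch {

    // Returns a short report of what each call to complete() did
    static String completeTwice() {
        // A future with no task behind it; it only finishes when someone completes it
        CompletableFuture<String> future = new CompletableFuture<>();

        boolean first = future.complete("first");   // completes the future
        boolean second = future.complete("second"); // ignored, the future is already complete

        // join() behaves like get() but throws only unchecked exceptions
        return first + "," + second + "," + future.join();
    }

    public static void main(String[] args) {
        System.out.println(completeTwice()); // prints "true,false,first"
    }
}
```

This is the same rule that applies in the example above: whichever of the manual 'complete' call and the background task gets there first decides the result.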

Attaching Callbacks

As we discussed previously, one of the disadvantages to the 'Future.get' method is that it blocks until the result of computation is available. If you want to perform some actions on the result in the main thread, you’ll need to wait for the result to be available. The ‘CompletableFuture’ overcomes this limitation by providing the ability to attach callback methods. These callback methods execute once the result of the computation is available, so you’ll no longer need to wait for the result. The examples below are callback methods supported by ‘CompletableFuture’:

thenAccept

The 'thenAccept' method can be used to perform some action on the result of the asynchronous computation. It accepts a ‘Consumer’ instance, which is invoked with the result once it’s available. Because a ‘Consumer’ doesn’t return a value, 'thenAccept' itself returns a ‘CompletableFuture<Void>’. The following code shows how this works:

Supplier<String> supplier = () -> "World!";
// join() waits for the callback, so the output appears before the program exits
CompletableFuture.supplyAsync(supplier).thenAccept(str -> System.out.println("Hello " + str)).join();

Above, the code creates a 'Supplier' instance that simply returns the text “World!”. From there, it’s passed to the 'supplyAsync' method, which runs it asynchronously (by default on a thread from the common 'ForkJoinPool'). The 'thenAccept' method is invoked on the future returned by 'supplyAsync', so it operates on the result of the asynchronous computation, in this case the String “World!”. The 'thenAccept' method accepts a 'Consumer' instance. It’s implemented via a lambda expression that prepends the text “Hello ” and prints the result. This code produces the following output:

Hello World!
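Since 'thenAccept' hands back a ‘CompletableFuture<Void>’, you can still wait on it when you need to know that the callback has finished. The sketch below is one way to do that; the class name and the list used to record the greeting are our own additions for illustration:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical demo class, not part of the original example
class ThenAcceptSketch {

    static List<String> greet() {
        // Thread-safe list, because the callback may run on a pool thread
        List<String> seen = new CopyOnWriteArrayList<>();

        CompletableFuture<Void> done = CompletableFuture
                .supplyAsync(() -> "World!")
                .thenAccept(str -> seen.add("Hello " + str));

        // Carries no value, but completes only after the Consumer has run
        done.join();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(greet()); // prints "[Hello World!]"
    }
}
```

Waiting like this matters in short programs: the default pool uses daemon threads, so if 'main' returns first, the JVM can exit before the callback gets a chance to run.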

thenApply

The 'thenApply' method can be used to transform the result of an asynchronous computation. It accepts a ‘Function’ instance, applies it to the result of the 'CompletableFuture' and returns a new 'CompletableFuture' that holds the transformed value. The following code demonstrates this:

Supplier<String> supplier = () -> "Hello";
CompletableFuture<String> cFuture = CompletableFuture.supplyAsync(supplier).thenApply(str -> str.toUpperCase());
String result = cFuture.get();
System.out.println("Result:" + result);

As before, the 'supplyAsync' method is invoked with a 'Supplier' instance that returns the text “Hello”. The 'thenApply' method is invoked on the future returned by 'supplyAsync'. It accepts a ‘Function’ instance, which in this case is implemented via a lambda expression that converts the String to uppercase. Note that the 'cFuture.get' method is used to obtain the transformed result. This code prints the following output:

Result:HELLO
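The ‘Function’ passed to 'thenApply' doesn’t have to return the same type it receives. As a rough sketch (the class and method names here are hypothetical), the following turns a future String into a future Integer and leaves the original future untouched:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of the original example
class ThenApplySketch {

    static String lengthOfGreeting() {
        CompletableFuture<String> greeting = CompletableFuture.supplyAsync(() -> "Hello");

        // thenApply returns a new future; here its value type changes from String to Integer
        CompletableFuture<Integer> length = greeting.thenApply(String::length);

        // The original future still holds the untransformed value
        return greeting.join() + " has " + length.join() + " characters";
    }

    public static void main(String[] args) {
        System.out.println(lengthOfGreeting()); // prints "Hello has 5 characters"
    }
}
```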

Chaining Callbacks

The real power of callbacks is that they can be chained together to perform a series of operations, like so:

Supplier<String> supplier = () -> "Hello";
// join() waits for the whole chain, so the output appears before the program exits
CompletableFuture.supplyAsync(supplier).thenApply(str -> str.toUpperCase()).thenAccept(str -> System.out.println(str)).join();

As before, the 'supplyAsync' method is invoked with a 'Supplier' instance that returns the text “Hello”. The 'thenApply' method is invoked on the future returned by 'supplyAsync' and converts the result to uppercase. The 'thenAccept' method is then invoked on the future returned by 'thenApply' and simply prints the result. This code prints the following output:

HELLO
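Chains can be as long as you need, with each stage receiving whatever the previous one produced. Here’s a small sketch of a longer pipeline; the input value and the class name are made up for illustration:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of the original example
class ChainSketch {

    static String pipeline() {
        return CompletableFuture
                .supplyAsync(() -> " 42 ")       // raw input with stray whitespace
                .thenApply(String::trim)         // "42"
                .thenApply(Integer::parseInt)    // 42
                .thenApply(n -> n * 2)           // 84
                .thenApply(n -> "Result: " + n)  // "Result: 84"
                .join();                         // wait for the final stage
    }

    public static void main(String[] args) {
        System.out.println(pipeline()); // prints "Result: 84"
    }
}
```

Each 'thenApply' call returns a new future, so the stages are evaluated strictly in order, one after the other.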

Async Methods

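Most of the callback methods have an 'async' version, which has “Async” appended to the method name. The non-async version may run the callback on the thread that completed the previous stage, or on the calling thread if that stage has already completed. The 'async' version instead submits the callback as a separate task to an executor: the common 'ForkJoinPool' by default, or an 'Executor' that you pass as an extra argument. The following code demonstrates the 'thenAcceptAsync' method: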

Supplier<String> supplier = () -> "World!";
// join() waits for the callback, so the output appears before the program exits
CompletableFuture.supplyAsync(supplier).thenAcceptAsync(str -> System.out.println("Hello " + str)).join();

In this particular case, the lambda expression provided to the 'thenAcceptAsync' method is submitted as a separate task to the default executor, rather than being run directly by the thread that completed 'supplyAsync'.
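The 'async' methods also have an overload that takes an 'Executor', which lets you decide exactly where the callback runs. The following is a sketch only: the single-thread executor and the thread name “callback-thread” are assumptions we’ve picked so that the effect is easy to observe.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the original example
class AsyncExecutorSketch {

    static String callbackThreadName() {
        // One worker thread with a recognisable name (our own choice)
        ExecutorService callbackPool = Executors.newSingleThreadExecutor(
                task -> new Thread(task, "callback-thread"));
        AtomicReference<String> threadName = new AtomicReference<>();
        try {
            CompletableFuture
                    .supplyAsync(() -> "World!")
                    // The second argument tells thenAcceptAsync where to run the callback
                    .thenAcceptAsync(str -> threadName.set(Thread.currentThread().getName()),
                            callbackPool)
                    .join();
        } finally {
            callbackPool.shutdown(); // let the worker thread exit
        }
        return threadName.get();
    }

    public static void main(String[] args) {
        System.out.println(callbackThreadName()); // prints "callback-thread"
    }
}
```

Passing your own executor is useful when callbacks do blocking work that you’d rather keep off the shared common pool.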

Combining Results

‘CompletableFuture’ provides a method called 'thenCombine' which can be used to perform some action after two independent Futures have completed. It accepts another 'CompletableFuture' and a 'BiFunction'. The 'BiFunction' is applied to two results: first the result of the ‘CompletableFuture’ on which 'thenCombine' is invoked, then the result of the one passed as an argument. The following code demonstrates this:

CompletableFuture<Double> lengthOfRectangle = CompletableFuture.supplyAsync(() -> 5.6);
CompletableFuture<Double> widthOfRectangle = CompletableFuture.supplyAsync(() -> 2.4);
System.out.println("Calculating the area of the Rectangle:");
// The first lambda parameter is the result of widthOfRectangle (the receiver),
// the second is the result of lengthOfRectangle (the argument)
CompletableFuture<Double> combinedFuture = widthOfRectangle.thenCombine(lengthOfRectangle, (width, length) -> length * width);
System.out.println("Area is " + combinedFuture.get());

Here, 'lengthOfRectangle' is a ‘CompletableFuture’ that returns a Double value corresponding to the length of a Rectangle. Similarly, 'widthOfRectangle' is a ‘CompletableFuture’ that returns a Double value corresponding to the width of a Rectangle. The 'thenCombine' method is invoked on the 'widthOfRectangle' future and is passed the 'lengthOfRectangle' instance. It’s also passed a 'BiFunction' instance, implemented via a lambda expression that multiplies the two results. The 'thenCombine' method returns 'combinedFuture', which holds the area of the Rectangle. Essentially, 'thenCombine' waits for both Futures to complete and then applies the specified function to their results. This code prints the following output:

Calculating the area of the Rectangle:
Area is 13.44

It's worth noting that there's also an 'async' version of the 'thenCombine' method that performs the specified task in a new thread.
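The two Futures don’t need to hold the same type, and that’s where the order of the 'BiFunction' parameters becomes important. In this sketch (the item name and quantity are invented for illustration), the first lambda parameter receives the result of the future that 'thenCombine' is invoked on, and the second receives the result of the future passed in:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of the original example
class ThenCombineSketch {

    static String describe() {
        CompletableFuture<String> item = CompletableFuture.supplyAsync(() -> "Widget");
        CompletableFuture<Integer> quantity = CompletableFuture.supplyAsync(() -> 3);

        // name  = result of item (the receiver)
        // count = result of quantity (the argument)
        CompletableFuture<String> line =
                item.thenCombine(quantity, (name, count) -> count + " x " + name);

        return line.join();
    }

    public static void main(String[] args) {
        System.out.println(describe()); // prints "3 x Widget"
    }
}
```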

Combining Multiple Futures

allOf

‘CompletableFuture’ provides a static method called 'allOf' which can be used to perform some actions after several Futures are completed. The following code demonstrates this:

CompletableFuture<String> cFuture1 = CompletableFuture.supplyAsync(() -> "This");
CompletableFuture<String> cFuture2 = CompletableFuture.supplyAsync(() -> "is a ");
CompletableFuture<String> cFuture3 = CompletableFuture.supplyAsync(() -> "Java");
CompletableFuture<String> cFuture4 = CompletableFuture.supplyAsync(() -> "Program");
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(cFuture1, cFuture2, cFuture3, cFuture4);
combinedFuture.get();

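Here, four ‘CompletableFuture’ instances are created, each of which returns a String value. The 'allOf' method is invoked with all four instances and returns a ‘CompletableFuture<Void>’. When the 'combinedFuture.get' method is invoked, it blocks until all four futures have completed. Because the combined future carries no value of its own, the individual results still have to be read from 'cFuture1' to 'cFuture4'.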
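Because 'allOf' only signals completion, a common follow-up is to collect the individual results once it has finished. The sketch below shows one way to do that; the class name and the choice to join the words with spaces are ours:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical demo class, not part of the original example
class AllOfSketch {

    static String joinWords() {
        List<CompletableFuture<String>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> "This"),
                CompletableFuture.supplyAsync(() -> "is a"),
                CompletableFuture.supplyAsync(() -> "Java"),
                CompletableFuture.supplyAsync(() -> "Program"));

        // allOf takes an array (varargs) and completes when every future has completed
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

        // By the time this callback runs, join() on each future returns immediately
        return all.thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.joining(" ")))
                .join();
    }

    public static void main(String[] args) {
        System.out.println(joinWords()); // prints "This is a Java Program"
    }
}
```

The results come out in the order of the list rather than the order in which the futures finished, since each one is read from its own future.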

anyOf

The 'anyOf' method is also a static method on the ‘CompletableFuture’ class. It returns a ‘CompletableFuture’ that completes as soon as any one of the futures passed to it completes, with that future’s result:

CompletableFuture<String> cFuture1 = CompletableFuture.supplyAsync(() -> "This");
CompletableFuture<String> cFuture2 = CompletableFuture.supplyAsync(() -> "is a ");
CompletableFuture<String> cFuture3 = CompletableFuture.supplyAsync(() -> "Java");
CompletableFuture<String> cFuture4 = CompletableFuture.supplyAsync(() -> "Program");
CompletableFuture<Object> anyofFuture = CompletableFuture.anyOf(cFuture1, cFuture2, cFuture3, cFuture4);
System.out.println("Waiting for any of the futures to complete");
String result = (String) anyofFuture.get();
System.out.println("Result:" + result);

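In the example above, four ‘CompletableFuture’ instances are created and each returns a String value. The 'anyOf' method is invoked with all four instances. The 'get' method is invoked on the resulting future, which returns as soon as any one of the Futures completes and yields the result of that Future. Because 'anyOf' returns a ‘CompletableFuture<Object>’, the result has to be cast to String. Which Future finishes first isn’t guaranteed, so the result can be any of the four values. A run in which 'cFuture1' finishes first prints the following output: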

Waiting for any of the futures to complete
Result:This
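If you’d like to see 'anyOf' with a predictable winner, one option is to pair a future that’s already complete with one that never completes. This is just a sketch under that assumption, and the names are hypothetical:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of the original example
class AnyOfSketch {

    static String firstResult() {
        // Never completed in this sketch, so it can never win
        CompletableFuture<String> neverDone = new CompletableFuture<>();
        // Already complete at the moment it is created
        CompletableFuture<String> ready = CompletableFuture.completedFuture("ready");

        CompletableFuture<Object> any = CompletableFuture.anyOf(neverDone, ready);

        // anyOf is typed as Object, so the result needs a cast
        return (String) any.join();
    }

    public static void main(String[] args) {
        System.out.println(firstResult()); // prints "ready"
    }
}
```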

Exception Handling

‘CompletableFuture’ provides a method called 'exceptionally' for exception handling. It accepts a ‘Function’ that receives the exception and returns a fallback value. This function is executed only if the future completes with an error. The following code demonstrates this:

String name = null;
CompletableFuture<String> cFuture = CompletableFuture.supplyAsync(() -> {
    if (name != null) {
        return "Hello " + name;
    } else {
        throw new RuntimeException("Null Input");
    }
}).exceptionally(ex -> {
    System.out.println("Wrong input: " + ex.getMessage());
    return "Unknown!";
});
System.out.println("Result : " + cFuture.get());

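In the example above, 'name' is null, so a 'RuntimeException' is thrown within the 'Supplier' passed to 'supplyAsync', and the function passed to the 'exceptionally' method gets executed. The exception it receives is a 'CompletionException' that wraps the original 'RuntimeException', which is why the printed message includes the class name of the cause. So this code will print the following output: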

Wrong input: java.lang.RuntimeException: Null Input
Result : Unknown!
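The class name appears in the message above because the exception handed to 'exceptionally' is a wrapper around the one that was actually thrown. If you want the original exception, you can unwrap it with 'getCause'. Here’s a defensive sketch of that idea; the class name and the wording of the fallback message are our own:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical demo class, not part of the original example
class ExceptionallySketch {

    static String recover() {
        return CompletableFuture
                .<String>supplyAsync(() -> {
                    throw new IllegalArgumentException("Null Input");
                })
                .exceptionally(ex -> {
                    // Unwrap the CompletionException if there is one, otherwise use ex as is
                    Throwable root = (ex instanceof CompletionException && ex.getCause() != null)
                            ? ex.getCause()
                            : ex;
                    return "Recovered from " + root.getClass().getSimpleName()
                            + ": " + root.getMessage();
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(recover()); // prints "Recovered from IllegalArgumentException: Null Input"
    }
}
```

Checking the type before unwrapping keeps the handler working whether or not the exception arrives wrapped.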

In this article, we covered some of the most important features of the ‘CompletableFuture’ class and how they can be used to overcome the limitations of the ‘Future’ interface. In addition to the methods listed above, there are many more methods available on the ‘CompletableFuture’ class. Covering each method would be beyond the scope of this article, but we recommend checking out the API documentation for further reading.