Java Async/Concurrent Programming Model

In this post, we look at asynchronous programming in Java: why it is needed, how it has evolved, and some examples.

Introduction

Hardware and software go hand in hand. The way we build software has advanced a lot in recent years, while per-core hardware speed has grown relatively slowly. Multi-core processors, however, are now common even in commodity and consumer hardware, so it is high time that we developers write code that uses all the available cores and makes the most of the precious CPU cycles. Every computation runs on a thread, which the OS schedules onto a CPU core.

There are two ways to take advantage of the CPU cycles:

  1. Parallel processing - This makes sure that all the cores are used, by executing threads in parallel. If the underlying hardware has a 4-core processor, this means using all 4 cores at the same time. If your program creates enough threads, the OS takes care of assigning them to the CPU cores.
  2. Concurrent processing - This makes sure that a given CPU core executes multiple threads. For this to happen, our program must avoid blocking a thread so that the CPU core is free to execute other threads. For example, if part of your program consumes a REST service or executes an expensive DB query, the thread executing that part is blocked until it gets a response back, and it does no useful work while it waits.
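
To make the parallel case concrete, here is a minimal sketch that splits a CPU-bound sum across one thread per core. The class and method names (ParallelSumSketch, sumRange, parallelSum) are made up for illustration and are not part of the user service built later in this post.

```java
class ParallelSumSketch {

    // Sums the integers in [from, to) on the calling thread: pure CPU work, no blocking.
    static long sumRange(long from, long to) {
        long sum = 0;
        for (long i = from; i < to; i++) {
            sum += i;
        }
        return sum;
    }

    // Splits [0, n) into one chunk per worker thread; the OS decides which core runs each thread.
    static long parallelSum(long n, int workers) {
        long[] partial = new long[workers];
        Thread[] threads = new Thread[workers];
        long chunk = n / workers;
        for (int w = 0; w < workers; w++) {
            final int slot = w;
            final long from = w * chunk;
            // The last worker also takes the remainder of the range.
            final long to = (w == workers - 1) ? n : from + chunk;
            threads[w] = new Thread(() -> partial[slot] = sumRange(from, to));
            threads[w].start();
        }
        long total = 0;
        try {
            for (int w = 0; w < workers; w++) {
                threads[w].join();      // wait for the worker; join() makes its result visible here
                total += partial[w];
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return total;
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("cores = " + cores);
        System.out.println("sum   = " + parallelSum(1_000_000L, cores));
    }
}
```

Creating raw threads like this is only meant to show the idea; the rest of the post uses thread pools instead.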

Writing code that supports parallel processing is a good start and is often enough for typical CRUD applications, but writing code that also supports concurrent processing makes your services far more scalable and performant.

Let’s consider an example service that we will build in this post: user creation. The following steps should happen when a new user is created.

  • Create a new user and save it in the database. Once the user is successfully created, execute the steps below.
  • Send an email notification to the user.
  • Trigger a REST service to build user recommendations based on their profile information.
  • Post a “User Created” event to a message queue to be consumed by downstream processes for reporting, analytics, etc.

The examples below build this user service first in a synchronous (sequential, blocking) style and then in an asynchronous (parallel, non-blocking) style.

Sequential/Blocking Programming Model

In this programming model, we execute the actions sequentially, one by one:

public void createUser(UserDto user){
    userService.saveUser(user);
    notificationService.send(user);
    recommendationService.initiate(user);
    eventService.sendEvent(user, "USER_CREATED");
}

As seen above, each action is executed one after the other. If we assume that each service takes 2 seconds on average, the method takes about 8 seconds to complete. Also, the main thread that executes the createUser() method is blocked until all 4 actions are completed.

Concurrent/Non-Blocking Programming Model

Java 5 introduced Future, which lets us execute a piece of code on a separate thread and get back a Future object. We can then use that Future object to check the status of the execution and proceed further.

This model lets us wrap a time-consuming process in a method that submits it to an ExecutorService (which is itself a wrapper around the Threads API) and returns a Future. The calling thread can then do other meaningful work and check the Future later for the result.

So in this model, we break the work into steps, allow them to be executed on multiple threads (thus taking advantage of the extra cores as well as the current core), and check or assemble the results before returning to the user.
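
As a rough illustration of the Java 5 style, the sketch below submits a slow call to an ExecutorService and collects the result through a Future. The names FutureSketch, slowLookup and fetchWithFuture are hypothetical, and the 200 ms sleep merely stands in for a remote call.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class FutureSketch {

    // Stand-in for a slow call such as a REST request (hypothetical).
    static String slowLookup(String userId) throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(200);
        return "recommendations-for-" + userId;
    }

    static String fetchWithFuture(String userId) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Callable<String> task = () -> slowLookup(userId);
            Future<String> pending = pool.submit(task);   // runs on a pool thread

            // The calling thread is free to do other work in the meantime.
            String greeting = "hello " + userId;

            // get() blocks the caller until the result is available.
            return greeting + " / " + pending.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchWithFuture("u1"));
    }
}
```

Note that the only ways to get at the result are to poll isDone() or to block in get(), which is one of the limitations that CompletableFuture addresses.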

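Future, released as part of Java 5, has its own limitations: the result can only be obtained by polling or by blocking on get(), and there is no built-in way to chain or combine several futures. These limitations led to CompletableFuture, introduced in Java 8. The user service example is rewritten below using this concurrent programming model.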
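
Before looking at the full service, here is a small sketch of what CompletableFuture adds over Future: steps can be chained and combined without blocking in between. The methods loadName, loadScore and describe are invented for this illustration.

```java
import java.util.concurrent.CompletableFuture;

class ChainingSketch {

    // Two independent asynchronous lookups (hypothetical stand-ins for real services).
    static CompletableFuture<String> loadName(int id) {
        return CompletableFuture.supplyAsync(() -> "user-" + id);
    }

    static CompletableFuture<Integer> loadScore(int id) {
        return CompletableFuture.supplyAsync(() -> id * 10);
    }

    // Builds a pipeline: transform one result, combine it with another, and
    // fall back to a default value if any stage fails. Nothing here blocks.
    static CompletableFuture<String> describe(int id) {
        return loadName(id)
                .thenApply(String::toUpperCase)
                .thenCombine(loadScore(id), (name, score) -> name + ":" + score)
                .exceptionally(ex -> "unavailable");
    }

    public static void main(String[] args) {
        // join() is used only at the very end, to print the result.
        System.out.println(describe(7).join()); // prints USER-7:70
    }
}
```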

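CompletableFuture does not require a custom Executor: by default, its async methods run on the shared ForkJoinPool.commonPool(). For blocking work such as the calls in this example, however, it is usually better to create a dedicated Executor with a fixed-size thread pool and submit the work to that pool, which is what the code below does.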
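
As a side note on executors, supplyAsync can be called with or without an Executor argument. The sketch below shows both forms; the class name, the pool size of 4 and the thread name "user-io-worker" are arbitrary choices made for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ExecutorChoiceSketch {

    // No executor argument: the task runs on the JDK's default async executor
    // (normally the shared ForkJoinPool.commonPool()).
    static String threadNameOnDefaultPool() {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName())
                .join();
    }

    // Explicit executor: the task runs on our own fixed-size pool, so slow,
    // blocking calls do not compete with other users of the common pool.
    static String threadNameOnCustomPool() {
        ExecutorService pool = Executors.newFixedThreadPool(4, runnable -> {
            Thread t = new Thread(runnable, "user-io-worker");
            t.setDaemon(true);
            return t;
        });
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), pool)
                    .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("default pool thread: " + threadNameOnDefaultPool());
        System.out.println("custom pool thread:  " + threadNameOnCustomPool());
    }
}
```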

Note that I have used a 2s sleep in each method just to simulate the delay.

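import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

class UserService{
    // Dedicated pool for the blocking (sleeping) calls below
    private final Executor executor =  Executors.newFixedThreadPool(500);
    NotificationService notificationService = new NotificationService();
    EventService eventService = new EventService();
    RecommendationService recommendationService = new RecommendationService();

    public CompletableFuture<Void> create(User user){
        long start = System.currentTimeMillis();
        return this.createUser(user).thenCompose(
                // Once the user is saved, start the three follow-up actions in parallel
                // and complete only when all of them have finished
                uDto -> CompletableFuture.allOf(
                        CompletableFuture.supplyAsync(() -> notificationService.sendEmailNotification(uDto),executor),
                        CompletableFuture.supplyAsync(() -> eventService.pushEvenToRedis(uDto),executor),
                        CompletableFuture.supplyAsync(() -> recommendationService.recommend(uDto),executor)
                       )
        ).whenComplete( (a,ex) -> {
            // Runs on success and on failure; ex is non-null if any step failed
            long end = System.currentTimeMillis();
            System.out.println("Time taken: "  +  (end - start));
            System.out.println("All Threads Completed");
        } );
    }

    public CompletableFuture<User> createUser(User u){
        // Run the simulated save on the pool so that the calling thread is not blocked
        return CompletableFuture.supplyAsync(() -> {
            try { Thread.sleep(2000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            return u;
        }, executor);
    }
}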

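// Each service below sleeps for 2s to simulate a slow call, then returns an
// already-completed future. The sleep blocks whichever thread calls the method.
class NotificationService {
    public CompletableFuture<String> sendEmailNotification(User user){
        try { Thread.sleep(2000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return CompletableFuture.completedFuture("Success");
    }
}

class EventService {
    public CompletableFuture<String> pushEvenToRedis(User user){
        try { Thread.sleep(2000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return CompletableFuture.completedFuture("Success");
    }
}

class RecommendationService {
    public CompletableFuture<String> recommend(User user){
        try { Thread.sleep(2000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return CompletableFuture.completedFuture("Success");
    }
}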

So what happens above is that, instead of returning a plain value, each method returns its value wrapped in a CompletableFuture.

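Now, instead of taking 8 seconds, the async version takes about 4 seconds: 2 seconds to create the user, followed by about 2 seconds for the three follow-up actions, which run in parallel.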
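
The arithmetic can be checked with a scaled-down simulation. The sketch below is not the service code itself: TimingSketch, pause, sequentialMillis and asyncMillis are hypothetical names, and a short configurable delay replaces the 2s sleep so that the comparison runs quickly.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class TimingSketch {

    // Simulates one slow step (save, email, recommendation or event).
    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Four steps one after another: roughly 4 x delay.
    static long sequentialMillis(long delay) {
        long start = System.nanoTime();
        pause(delay); // save user
        pause(delay); // send email
        pause(delay); // build recommendations
        pause(delay); // publish event
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Save first, then the three follow-ups in parallel: roughly 2 x delay.
    static long asyncMillis(long delay) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            long start = System.nanoTime();
            CompletableFuture
                    .runAsync(() -> pause(delay), pool)
                    .thenCompose(saved -> CompletableFuture.allOf(
                            CompletableFuture.runAsync(() -> pause(delay), pool),
                            CompletableFuture.runAsync(() -> pause(delay), pool),
                            CompletableFuture.runAsync(() -> pause(delay), pool)))
                    .join(); // block here only to measure the elapsed time
            return (System.nanoTime() - start) / 1_000_000;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long delay = 200;
        System.out.println("sequential: " + sequentialMillis(delay) + " ms");
        System.out.println("async:      " + asyncMillis(delay) + " ms");
    }
}
```

The exact numbers vary from run to run, but the sequential total should be close to four times the delay and the async total close to twice the delay.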

Note: The above code deliberately doesn’t use Spring's @EnableAsync and @Async annotations, in order to showcase the plain Java version of CompletableFuture.

Happy Coding.