A Java Parallel Server Through the Ages

Background

A server has been written in Java:

public void serve(int port) throws IOException, UncheckedIOException {
  var server = new ServerSocket(port);
  while (true) {
    try (var socket = server.accept()) {
      var request = new Request(socket);
      var dataA = taskA(request);
      var dataB = taskB(request);
      var dataC = taskC(request);
      var response = aggregate(dataA, dataB, dataC);
      response.write(socket);
    }
  }
}

The server accepts an incoming connection, reads a request from the socket, executes three tasks, aggregates the results of these tasks and replies to the client.

This server is purely sequential: Requests are handled one at a time and each request is processed sequentially. The topic for this page is the parallelization of this server, using different programming styles.

Creating threads, the old-fashioned way

It is fairly straightforward to handle independent requests in parallel by creating and starting a new thread for each request:

public void serve(int port) throws IOException {
  var server = new ServerSocket(port);
  while (true) {
    var socket = server.accept();
    new Thread(() -> handleRequest(socket)).start();
  }
}

private void handleRequest(Socket socket) {
  IOException ioe = null;
  try (socket) {
    var request = new Request(socket);
    var dataA = taskA(request);
    var dataB = taskB(request);
    var dataC = taskC(request);
    var response = aggregate(dataA, dataB, dataC);
    response.write(socket);
  } catch (UncheckedIOException e) {
    ioe = e.getCause();
  } catch (IOException e) {
    ioe = e;
  }
  if (ioe != null)
    logger.severe(String.format("I/O error: %s", ioe.getMessage()));
}

The request-handling code is moved into a separate method. With each incoming connection, a thread is created and starts to execute this method. The processing of each request is still sequential, but multiple requests can be processed in parallel. Note that we have also improved the robustness of our server: If something goes wrong while handling a request, the corresponding thread terminates, but other requests (current and future) can still be processed.

Using thread pools

A drawback of the previous approach is that it can potentially create a large number of threads. This is undesirable because thread creation (and teardown) has a cost and because having too many threads running concurrently can hurt the performance of the server. Instead, thread pools can be used to limit the number of threads and reuse existing threads to handle requests:

private final ExecutorService exec = Executors.newFixedThreadPool(16);

public void serve(int port) throws IOException {
  var server = new ServerSocket(port);
  while (true) {
    var socket = server.accept();
    exec.execute(() -> handleRequest(socket));
  }
}

The required modifications to the code are minimal; in particular, method handleRequest does not change at all. The only change is to replace new Thread(() -> handleRequest(socket)).start() with exec.execute(() -> handleRequest(socket)). Here, exec is chosen to be a pool of 16 worker threads, so the server can process up to 16 requests in parallel. Additional requests are queued in the ExecutorService and picked up by worker threads after they finish their previous task. Thread pools can be customized based on the services and hardware used: minimum and maximum number of threads, idle time before threads are terminated, queue size and type (FIFO, priority), etc. Given how easy it is to use thread pools, there is no reason to stick to a style that creates threads on demand. All the remaining code on this page uses thread pools.
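
For illustration, here is a sketch of such a customized pool (the numbers are arbitrary choices, not recommendations); ThreadPoolExecutor, TimeUnit and ArrayBlockingQueue come from java.util.concurrent:

private final ExecutorService exec =
    new ThreadPoolExecutor(4, 16,                          // core and maximum pool sizes
                           60L, TimeUnit.SECONDS,          // extra threads terminate after 60s of idling
                           new ArrayBlockingQueue<>(100)); // bounded FIFO queue of pending tasks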

Adding concurrency within requests

In the previous implementation, requests are handled in parallel but each request is processed sequentially in a single thread. Suppose tasks A, B and C are independent and time-consuming. For performance, they could be processed in parallel within each request. This could be done by creating three threads (fork) within the request-handling thread and waiting for them to finish (join) to aggregate the results. This fork/join pattern (sometimes also called scatter/gather) can also be implemented using a thread pool:

private void handleRequest(Socket socket) {
  IOException ioe = null;
  try (socket) {
    var request = new Request(socket);
    var dataA = exec.submit(() -> taskA(request));
    var dataB = exec.submit(() -> taskB(request));
    var dataC = exec.submit(() -> taskC(request));
    try {
      var response = aggregate(dataA.get(), dataB.get(), dataC.get());
      response.write(socket);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } catch (ExecutionException e) {
      logger.severe(String.format("task failed: %s", e.getCause().getMessage()));
    }
  } catch (UncheckedIOException e) {
    ioe = e.getCause();
  } catch (IOException e) {
    ioe = e;
  }
  if (ioe != null)
    logger.severe(String.format("I/O error: %s", ioe.getMessage()));
}

Method serve does not change. Inside method handleRequest, the calls to taskA, taskB and taskC are executed by worker threads from the pool, in parallel. Method submit returns an object of type Future, which can be used to retrieve the output of each task. Method Future.get blocks the calling thread until the task finishes. It then returns the value produced by the task or rethrows an exception if the task failed. The three calls to get together implement the join part of the fork/join (the calls to submit implement the fork part). We now have a server that processes requests and parts of requests in parallel, all in the same pool of threads.
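
As an aside, get also has a timed variant that gives up after a while instead of blocking indefinitely, throwing TimeoutException. The aggregation step could then be written along these lines (the 2-second limit is an arbitrary choice):

    try {
      var response = aggregate(dataA.get(2, TimeUnit.SECONDS),
                               dataB.get(2, TimeUnit.SECONDS),
                               dataC.get(2, TimeUnit.SECONDS));
      response.write(socket);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } catch (ExecutionException e) {
      logger.severe(String.format("task failed: %s", e.getCause().getMessage()));
    } catch (TimeoutException e) {
      logger.severe("task timed out");
    }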

Minimizing blocking

The code above uses a worker thread from the pool to wait for the results of other workers (by making calls to get). This is undesirable for at least two reasons:

- A blocked thread does no useful work, yet it still occupies one of the pool's worker slots (and memory for its stack), reducing the throughput of the server.
- The pool can deadlock through thread starvation: if all 16 workers are blocked in get, the subtasks they are waiting for sit in the queue with no thread left to run them, and no request ever completes (see the sketch below).

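This starvation scenario is easy to reproduce. As a minimal sketch (separate from the server code), a pool with a single worker thread deadlocks as soon as a task blocks on a subtask submitted to the same pool:

var pool = Executors.newFixedThreadPool(1);
var outer = pool.submit(() -> {
  var inner = pool.submit(() -> 42);  // queued, but the only worker is busy here
  return inner.get();                 // blocks forever: no thread left to run 'inner'
});
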
For these reasons, a newer trend has been to minimize thread blocking. The idea is that, instead of waiting for data, a thread can schedule the aggregation of dataA, dataB, and dataC for later, after all three tasks have completed.

A non-blocking version of the server can be written as follows:

private void handleRequest(Socket socket) {
  var request = CompletableFuture.supplyAsync(() -> new Request(socket));
  var dataA = request.thenApplyAsync(Process::taskA);
  var dataB = request.thenApplyAsync(Process::taskB);
  var dataC = request.thenApplyAsync(Process::taskC);
  var response =
      dataA.thenCompose(a ->
          dataB.thenCompose(b ->
              dataC.thenApply(c ->
                  aggregate(a, b, c)
              )));
  var reply = response.thenAccept(res -> res.write(socket));
  reply.whenComplete((v, e) -> {
    try {
      socket.close();
    } catch (IOException ex) {
      if (e == null) e = ex;
    }
    if (e instanceof CompletionException)  // exceptions from earlier stages arrive wrapped
      e = e.getCause();
    if (e instanceof UncheckedIOException)
      e = e.getCause();
    if (e instanceof IOException)
      logger.severe(String.format("I/O error: %s", e.getMessage()));
    else if (e != null)
      logger.severe(String.format("task failed: %s", e.getMessage()));
  });
}

In this implementation, tasks are submitted to a thread pool via a slightly different mechanism in order to obtain values of type CompletableFuture instead of Future. This richer form of future was introduced in Java 8.
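
Note that, without an explicit executor argument, the methods with the Async suffix run their tasks on the common ForkJoinPool rather than on exec. If the intent is to keep all the work in the server's own pool, the overloads that take an Executor can be used, along these lines:

var request = CompletableFuture.supplyAsync(() -> new Request(socket), exec);
var dataA = request.thenApplyAsync(Process::taskA, exec);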

Methods thenApply/thenApplyAsync implement a form of map: given a future of type T and a function from T to U, they produce a future of type U (sometimes called a continuation of the original future). When the value of type T becomes available (that is, when the original future completes), the computation of the function can begin. When this function terminates, a value of type U is produced and the second future is completed. Whether the function runs in the same thread as the original future or in a new thread from the pool is controlled by the Async suffix. In the case of the server:

var dataA = request.thenApplyAsync(Process::taskA);

takes a CompletableFuture<Request> (which will complete when the request has been read from the socket) and schedules a call to taskA to produce a CompletableFuture<Adata> (which will complete after the request has been built and function taskA has been applied to it). The call is non-blocking and no call to taskA takes place at that time. By using thenApplyAsync, we make sure that three threads will run taskA, taskB and taskC in parallel.1

Aggregating multiple futures is a little more complex. Given a future of type T and a function from T to a future of type U, method thenCompose produces a future of type U by applying the function to the value produced by the first future. Note that using thenApply would produce a future of a future of type U instead. Method thenCompose is traditionally called flatMap in functional languages (it flattens what would be a future of future into a simple future).
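
To make the difference concrete, here is the aggregation step written both ways (assuming, as in the code above, that aggregate returns a Response):

// thenApply would nest the futures:
CompletableFuture<CompletableFuture<Response>> nested =
    dataA.thenApply(a -> dataB.thenCompose(b -> dataC.thenApply(c -> aggregate(a, b, c))));

// thenCompose flattens the inner future away:
CompletableFuture<Response> flat =
    dataA.thenCompose(a -> dataB.thenCompose(b -> dataC.thenApply(c -> aggregate(a, b, c))));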

Variable response has type CompletableFuture<Response> and method thenAccept is used to schedule a callback to write the response to the socket.2

Methods thenApply, thenCompose, thenAccept, … only perform a task if the underlying future is successful. Method whenComplete is more general and can be used to schedule code that will run whether the future terminates normally or abnormally. In the call:

reply.whenComplete((v, e) -> ...);

variable v will contain the value produced by the future if it was successful, and variable e will contain an exception if the future failed (one of v or e will be null). Method whenComplete is used here to close the socket (whether the request was processed successfully or not) and to log exceptions (if any).

Note that method handleRequest only schedules callbacks and continuations. It does not perform any request processing itself and does not make any blocking calls. Accordingly, it can be (quickly) run by the accepting thread, without the need to create an asynchronous task. Method serve is thus simplified:

public void serve(int port) throws IOException {
  var server = new ServerSocket(port);
  while (true) {
    var socket = server.accept();
    handleRequest(socket);  // instead of exec.execute(() -> handleRequest(socket))
  }
}

The same server in Scala

This non-blocking programming style looks nicer in Scala than in Java:

// this is Scala code
def serve(port: Int): Unit = {
  val server = new ServerSocket(port)
  while (true) {
    val socket = server.accept()
    handleRequest(socket)
  }
}

private def handleRequest(socket: Socket): Unit = {
  val request = Future(new Request(socket))
  val dataA = request.map(taskA)
  val dataB = request.map(taskB)
  val dataC = request.map(taskC)
  val response = for {
    a <- dataA
    b <- dataB
    c <- dataC
  } yield aggregate(a, b, c)
  val reply = response.map(_.write(socket))
  reply.onComplete(_ => socket.close())

  reply.failed foreach {
    case e: IOException => logger.severe(String.format("I/O error: %s", e.getMessage))
    case e: UncheckedIOException => logger.severe(String.format("I/O error: %s", e.getCause.getMessage))
    case e: Exception => logger.severe(String.format("task failed: %s", e.getMessage))
  }
}

The main reason the Scala code looks nicer is the for/yield comprehension, which is used here to aggregate three futures into one.3 It is actually compiled into calls to map and flatMap, very similar to the Java version. The code could have been written as:

// this is Scala code
val response =
  dataA.flatMap(a =>
    dataB.flatMap(b =>
      dataC.map(c =>
        aggregate(a, b, c)
      )))

More than three tasks?

In the server example, there is a need to aggregate the results of three futures into a single future. The Scala code for this step looks better than the Java code, but neither would generalize nicely to a larger number of tasks (50 nested calls to thenCompose or 50 <- generators in a for comprehension would be impractical). A more general approach is to combine a list of futures into a future of a list, then apply any needed aggregation to this list using thenApply/map.

Specifically, the problem is to transform a value of type List<CompletableFuture<T>> into a value of type CompletableFuture<List<T>>. In Scala, method Future.sequence does exactly that. In Java, however, there is no standard library method that achieves this transformation. It can easily be implemented using method thenCombine, which produces a future of type V from a future of type T, a future of type U and a function from (T,U) to V:

public static <T> CompletableFuture<List<T>> sequence(Iterable<? extends CompletableFuture<? extends T>> stages) {
  CompletableFuture<List<T>> f = CompletableFuture.completedFuture(new LinkedList<>());
  for (var stage : stages)
    f = stage.thenCombine(f, (t, list) -> {
      list.add(t);
      return list;
    });
  return f;
}

Note that this implementation is non-blocking and does not involve any additional thread.
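
As a usage sketch, suppose the server had to run a larger collection of tasks that all produce values of the same type (the Task and Data types and the list-based variant of aggregate below are hypothetical, introduced only for this example):

// tasks has type List<Task>, where each task produces a Data from a Request
List<CompletableFuture<Data>> futures = new ArrayList<>();
for (var task : tasks)
  futures.add(CompletableFuture.supplyAsync(() -> task.run(request), exec));

// a single future that completes when all the tasks have completed
CompletableFuture<Response> response =
    sequence(futures).thenApply(results -> aggregate(results));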

Do it yourself with promises

In some circumstances, the combination of thenApply, thenCompose, thenAccept, thenCombine … is insufficient and new futures need to be created from scratch. A future can be created with no value and given a value later, often from a different thread. Such futures are often called promises. For instance, the thenApply method could be implemented as:

public <U> CompletableFuture<U> thenApply(Function<T, U> f) {
  var promise = new CompletableFuture<U>();
  whenComplete((v, e) -> {
    if (e != null)
      promise.completeExceptionally(e);
    else
      try {
        promise.complete(f.apply(v));
      } catch (Exception ex) {
        promise.completeExceptionally(ex);
      }
  });
  return promise;
}
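
Promises are also handy for turning callback-based APIs into futures. As an illustration, here is a sketch that adapts a hypothetical asynchronous client (fetchAsync and its Callback interface are made up for this example) into a CompletableFuture, completed later from the client's own thread:

public CompletableFuture<Data> fetch(String key) {
  var promise = new CompletableFuture<Data>();
  client.fetchAsync(key, new Callback() {
    public void onSuccess(Data data)     { promise.complete(data); }
    public void onFailure(Throwable err) { promise.completeExceptionally(err); }
  });
  return promise;  // given a value later, from whichever thread calls back
}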

Final remarks

The code on this page uses (and abuses) Java 10’s var construct in an attempt not to get distracted by the various future types of Java (Future, FutureTask, CompletableFuture and CompletionStage). It also makes the Java code closer to the Scala implementation. Also, most methods are assumed to throw UncheckedIOException instead of IOException because the arguments to supplyAsync, thenApply, thenAccept, … cannot throw checked exceptions and the code is harder to follow with try/catch blocks inside the lambdas.

As with other “functional” aspects, this programming style works better in Scala than in Java. The Java functional interfaces are bloated (CompletableFuture has 60 public methods compared to 23 methods in Scala’s Future), in part because Java still needs to deal with primitive types and void methods (Java’s types Function, Consumer, UnaryOperator, IntFunction, ToIntFunction, DoubleFunction, IntToDoubleFunction, DoubleToIntFunction, Predicate, IntPredicate, DoublePredicate, IntBinaryOperator, IntConsumer, etc. all correspond to Function1 in Scala).

Finally, we should also mention that the fork/join pattern can be implemented without blocking threads by using Java’s ForkJoinPool, which was added in Java 7. In practice, ForkJoinPool tends to be unwieldy and the CompletableFuture style is preferable, even if it takes some getting used to.
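
For reference, here is a rough sketch of the per-request fork/join step written with a RecursiveTask (assuming taskA, taskB, taskC and aggregate are accessible to the task class); when join is called from a ForkJoinPool worker thread, that thread can run other queued tasks instead of sitting idle:

class HandleRequest extends RecursiveTask<Response> {
  private final Request request;
  HandleRequest(Request request) { this.request = request; }

  @Override protected Response compute() {
    var a = ForkJoinTask.adapt(() -> taskA(request)).fork();  // run asynchronously
    var b = ForkJoinTask.adapt(() -> taskB(request)).fork();  // run asynchronously
    var c = taskC(request);                                   // run the last task directly
    return aggregate(a.join(), b.join(), c);
  }
}

// usage: var response = ForkJoinPool.commonPool().invoke(new HandleRequest(request));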


  1. More accurately, all three tasks will be submitted to the thread pool concurrently. They will only run in parallel if enough threads are available in the pool to run them.↩︎

  2. The difference between thenApply and thenAccept is that thenAccept takes a consumer (a function that returns no value) and produces a future with no useful value (of type CompletableFuture<Void>).↩︎

  3. Other ways in which the Scala code is superior here include: pattern-matching, η-conversion and the Future.failed method.↩︎