Project Loom: the light at the end of the tunnel

This article was first published on bazlur.com.

In this article, I'm going to write an echo server in several different ways. The server does nothing fancy: it keeps listening for connections on a port and echoes back whatever is sent, converted to upper case. Then I will compare the approaches to see which one wins.

Let's first write the single-threaded echo server:

package com.bazlur;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;

public class Day016 {
  public static void main(String[] args) throws IOException {
    if (args.length != 1) throw new IllegalArgumentException("Please specify port");
    var port = Integer.parseInt(args[0]);

    var serverSocket = new ServerSocket(port);
    System.out.println("Started server on port " + port);

    while (true) {
      try (var socket = serverSocket.accept();
           var in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
           var out = new PrintWriter(socket.getOutputStream(), true)) {
        String line;
        while ((line = in.readLine()) != null) {
          out.println(line.toUpperCase());
        }
      } catch (IOException e) {
        System.out.println("Was unable to establish or communicate with client socket:" + e.getMessage());
      }
    }
  }
}

The above code is simple and intuitive. We use a ServerSocket that listens on a port and waits for an incoming connection. Whenever a client connects, the server reads each line the client sends and replies with that line.

The only problem is that it cannot handle multiple connections simultaneously. While one client is connected, no other client will be served. Only once that client disconnects do the others get a chance.
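To make that limitation concrete, here is a small sketch that runs the same accept loop on a background thread and connects two clients to it. The class name BlockingDemo, the helper names, and the 500 ms timeout are my own choices for illustration, not part of the server above. The second client can connect (the OS queues it in the backlog), but its reply only arrives once the first client has disconnected.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

public class BlockingDemo {
  // Same accept loop as the single-threaded server, stopping once the server socket is closed.
  static void serve(ServerSocket serverSocket) {
    while (true) {
      try (var socket = serverSocket.accept();
           var in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
           var out = new PrintWriter(socket.getOutputStream(), true)) {
        String line;
        while ((line = in.readLine()) != null) {
          out.println(line.toUpperCase());
        }
      } catch (IOException e) {
        if (serverSocket.isClosed()) return;
      }
    }
  }

  // Returns true if the second client gets no reply while the first one is
  // connected, and gets its reply once the first one has disconnected.
  static boolean secondClientWaits() {
    var loopback = InetAddress.getLoopbackAddress();
    // Port 0 asks the OS for any free port (an assumption made for the demo).
    try (var serverSocket = new ServerSocket(0, 50, loopback)) {
      var serverThread = new Thread(() -> serve(serverSocket));
      serverThread.setDaemon(true);
      serverThread.start();

      var first = new Socket(loopback, serverSocket.getLocalPort());
      var firstOut = new PrintWriter(first.getOutputStream(), true);
      var firstIn = new BufferedReader(new InputStreamReader(first.getInputStream()));
      firstOut.println("first");
      firstIn.readLine(); // the server is now busy with the first client

      try (var second = new Socket(loopback, serverSocket.getLocalPort())) {
        new PrintWriter(second.getOutputStream(), true).println("second");

        // Wait up to 500 ms for a reply; we expect none while "first" is connected.
        second.setSoTimeout(500);
        boolean timedOut = false;
        try {
          second.getInputStream().read();
        } catch (SocketTimeoutException e) {
          timedOut = true;
        }

        first.close(); // the server goes back to accept() and picks up "second"
        second.setSoTimeout(5000);
        var secondIn = new BufferedReader(new InputStreamReader(second.getInputStream()));
        return timedOut && "SECOND".equals(secondIn.readLine());
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("second client had to wait: " + secondClientWaits());
  }
}
```

The timeout only serves to show that nothing comes back while the first client holds the server; a real client would simply block.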

However, we can improve this simply by handling each connection on its own thread.
Let's look at the code:

package com.bazlur;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Executors;

public class Day017 {
  public static void main(String[] args) throws IOException {
    if (args.length != 1) throw new IllegalArgumentException("Please specify port");
    var port = Integer.parseInt(args[0]);

    var executorService = Executors.newCachedThreadPool();
    var serverSocket = new ServerSocket(port);
    System.out.println("Started server on port " + port);

    while (true) {
      var socket = serverSocket.accept();
      executorService.submit(() -> handle(socket));
    }
  }

  private static void handle(Socket socket) {
    try (socket;
         var in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
         var out = new PrintWriter(socket.getOutputStream(), true)) {
      String line;
      while ((line = in.readLine()) != null) {
        out.println(line.toUpperCase());
      }
    } catch (IOException e) {
      System.out.println("Was unable to establish or communicate with client socket:" + e.getMessage());
    }
  }
}

In the above code, we haven't done anything extraordinary. We have just created a thread pool using Executors, and whenever a client connects, we hand that connection to a thread from the pool.
This server will do just fine dealing with many connections simultaneously. However, we are not happy with just the word "many". We want to know exactly how many connections it can handle.

The answer lies in how many threads we can spawn. It is certainly not limited by sockets, because a modern OS can practically handle millions of open sockets at a time. So let's rephrase the question: can the OS handle that many threads?

Well, the answer is no. Threads are limited and heavy. Let's find out how many we can create by writing a simple Java program.

package com.bazlur;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

public class ThreadCount {
  public static void main(String[] args) {
    AtomicInteger threadCount = new AtomicInteger();
    for (; ; ) {
      Thread thread = new Thread(() -> {
        var count = threadCount.incrementAndGet();
        System.out.println("count = " + count);
        LockSupport.park();
      });
      thread.start();
    }
  }
}

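This little program shows how many threads we can create on a machine: it keeps starting threads and printing a running count until the JVM or the OS refuses to create more (typically with an OutOfMemoryError).
We can conclude that the number of threads we can have limits the number of connections we can handle simultaneously.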
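To see why the thread count and the connection count are tied together in the thread-pool server, here is a small sketch. The class name ThreadPerConnection and the method threadsFor are made up for this illustration, and the cast to ThreadPoolExecutor relies on how the JDK currently implements Executors.newCachedThreadPool(). Each task blocks on a latch, standing in for a handler that sits in readLine() waiting for a slow client.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class ThreadPerConnection {
  // Submits `connections` tasks that block like idle clients and returns
  // how many threads the cached pool had to create to run them.
  static int threadsFor(int connections) {
    // Assumption: the cached pool is a ThreadPoolExecutor, so we can inspect its size.
    var pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
    var release = new CountDownLatch(1);
    try {
      for (int i = 0; i < connections; i++) {
        pool.submit(() -> {
          try {
            release.await(); // stands in for a handler blocked in readLine()
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
      }
      return pool.getLargestPoolSize();
    } finally {
      release.countDown(); // let every task finish
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("threads used for 200 blocked tasks: " + threadsFor(200));
  }
}
```

Because no thread is ever idle while the tasks are blocked, the pool cannot reuse one, so 200 blocked tasks need 200 platform threads. The same holds for 200 idle connections in the server.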

Now the bigger question is: is there an alternative? If we cannot have more threads, what else can we do to achieve outstanding throughput?

Well, the answer is non-blocking IO. The reality of IO operations is that most threads are just waiting idle. A thread accepts a client, and the client keeps communicating with that dedicated thread. But the communication is slow, and most of the time the thread sits there doing nothing. Why not use this thread's idle time to serve other clients?
That seems to be a fantastic idea. Let's do this.

package com.bazlur;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedDeque;

public class Day018 {
  static Map<SocketChannel, Queue<ByteBuffer>> pendingData = new HashMap<>();

  public static void main(String[] args) throws IOException {
    if (args.length != 1) throw new IllegalArgumentException("Please specify port");
    var port = Integer.parseInt(args[0]);

    ServerSocketChannel socketChannel = ServerSocketChannel.open();
    socketChannel.bind(new InetSocketAddress(port));
    socketChannel.configureBlocking(false);

    var selector = Selector.open();
    socketChannel.register(selector, SelectionKey.OP_ACCEPT);

    while (true) {
      var select = selector.select();
      if (select == 0) continue;
      var selectionKeys = selector.selectedKeys();
      var iterator = selectionKeys.iterator();

      while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        if (key.isValid()) {
          if (key.isAcceptable()) {
            accept(key);
          } else if (key.isReadable()) {
            read(key);
          } else if (key.isWritable()) {
            write(key);
          }
        }
        iterator.remove();
      }
    }
  }

  private static void accept(SelectionKey selectionKey) throws IOException {
    var channel = (ServerSocketChannel) selectionKey.channel();
    var sc = channel.accept(); //never null
    System.out.println("Connected: " + sc);
    pendingData.put(sc, new ConcurrentLinkedDeque<>());
    sc.configureBlocking(false);
    sc.register(selectionKey.selector(), SelectionKey.OP_READ);
  }

  private static void read(SelectionKey selectionKey) throws IOException {
    var channel = (SocketChannel) selectionKey.channel();
    var byteBuffer = ByteBuffer.allocateDirect(80);
    var read = channel.read(byteBuffer);
    if (read == -1) {
      // The client closed the connection.
      channel.close();
      pendingData.remove(channel);
      return;
    }

    if (read > 0) {
      // Queue the reply and wait until the channel is writable.
      pendingData.get(channel).add(processBuffer(byteBuffer));
      selectionKey.interestOps(SelectionKey.OP_WRITE);
    }
  }

  private static void write(SelectionKey selectionKey) throws IOException {
    var channel = (SocketChannel) selectionKey.channel();
    var queue = pendingData.getOrDefault(channel, new ArrayDeque<>());
    while (!queue.isEmpty()) {
      // Queued buffers are already positioned for reading, so no flip is needed.
      var buff = queue.peek();
      channel.write(buff);
      // Partial write: keep the buffer queued and retry on the next OP_WRITE.
      if (buff.hasRemaining()) return;
      queue.remove();
    }
    selectionKey.interestOps(SelectionKey.OP_READ);
  }

  // Builds the reply (timestamp, prefix, upper-cased input) in a new buffer.
  private static ByteBuffer processBuffer(ByteBuffer byteBuffer) {
    byteBuffer.flip();
    StringBuilder line = new StringBuilder();
    line.append(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss", Locale.getDefault())))
            .append("<<server>>: ");

    for (int b = 0; b < byteBuffer.limit(); b++) {
      var b1 = byteBuffer.get(b);
      line.append(Character.toUpperCase((char) b1));
    }
    System.out.println("Executing from: " + Thread.currentThread());
    // The reply can be longer than the 80-byte input buffer, so wrap it in a
    // fresh buffer instead of writing it back into the input buffer.
    return ByteBuffer.wrap(line.toString().getBytes());
  }
}
}

The above code is a lot more complex than the earlier versions and takes a lot more lines. However, with it we can achieve what we set out to achieve. This single-threaded server would give us better throughput, and we could improve it further by throwing a few more threads at it.

We achieved it, but with a complex programming model, which isn't easy to work with. It has other problems too; in particular, code written in this model is harder to debug.

Is there any other way?

Well, that's where we see the light at the end of the tunnel: Project Loom. It lets us create millions of virtual threads with little effort while keeping the programming model the same, and we get the outstanding throughput we were after. Let's see an example:

package com.bazlur;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Executors;

public class Day018_1 {
  public static void main(String[] args) throws IOException {
    if (args.length != 1) throw new IllegalArgumentException("Please specify port");
    var port = Integer.parseInt(args[0]);
    var serverSocket = new ServerSocket(port);
    System.out.println("Started server on port " + port);

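    // JDK 21+: one new virtual thread per task (early Loom builds called this newVirtualThreadExecutor()).
    try (var executors = Executors.newVirtualThreadPerTaskExecutor()) {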
      while (true) {
        var socket = serverSocket.accept();
        executors.submit(() -> handle(socket));
      }
    }
  }

  private static void handle(Socket socket) {
    try (socket;
         var in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
         var out = new PrintWriter(socket.getOutputStream(), true)) {
      String line;
      while ((line = in.readLine()) != null) {
        out.println(line.toUpperCase());
      }
    } catch (IOException e) {
      System.out.println("Was unable to establish or communicate with client socket:" + e.getMessage());
    }
  }
}

The programming model is the same as before. We are just using a different Executor, that's it. But this enables us to have millions of threads.
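As a rough way to try this out, the sketch below starts a large number of virtual threads that all block at the same time, something the platform-thread version could not do. It assumes JDK 21 or newer; the class name VirtualThreadCount, the method runBlockingTasks, and the 100 ms sleep (standing in for a slow client) are my own choices for illustration.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class VirtualThreadCount {
  // Runs `tasks` blocking tasks, each on its own virtual thread,
  // and returns how many of them ran to completion.
  static int runBlockingTasks(int tasks) {
    var completed = new AtomicInteger();
    try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
      for (int i = 0; i < tasks; i++) {
        executor.submit(() -> {
          try {
            Thread.sleep(Duration.ofMillis(100)); // stands in for waiting on a slow client
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
          }
          completed.incrementAndGet();
        });
      }
    } // close() waits until every submitted task has finished
    return completed.get();
  }

  public static void main(String[] args) {
    System.out.println("completed = " + runBlockingTasks(100_000));
  }
}
```

While a virtual thread sleeps or waits on a socket, it does not hold on to an OS thread, which is why so many of them can block at once.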

Cheers.
