Do I really need Reactor Netty?

On their website, Reactor's creators describe it as "Well-suited for a microservices architecture, Reactor offers backpressure-ready network engines for HTTP (including Websockets), TCP, and UDP." As for your question, the answer is the annoying one: it depends. In general, Reactor Netty is a framework that lets you “squeeze” the most out of your resources, mainly through its core feature, non-blocking IO, which we will get to soon. Before picking Reactor Netty, ask yourself these questions:

  • Does my app need to scale?
  • Do I need to fully utilize my resources?
  • Does my app do blocking IO operations (access to a DB, files, etc.)?
  • Am I willing to maintain slightly more complex code?

If you answered yes to these questions, Reactor will probably solve some problems for you and improve performance.

Do I need to use Spring Framework to work with Reactor Netty?

Absolutely not! Spring Framework has a reactive extension that allows working with Netty and Reactor, but we won’t use it in this guide. If you don’t need the overhead and complexity of Spring and everything that comes with it, this is great for you!

Let’s get started

You can clone this project, with all of the following code, from GitHub. I’ll walk through it in this guide. We will start a new, clean Maven project with Java 8 and the following pom.xml, which adds the Reactor Netty library to our classpath:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

<groupId>org.example</groupId>
<artifactId>reactor-netty-quick-start</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
<dependencies>
    <dependency>
        <groupId>io.projectreactor.netty</groupId>
        <artifactId>reactor-netty</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.maven.shared</groupId>
        <artifactId>maven-shared-utils</artifactId>
        <version>3.2.1</version>
    </dependency>

</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>Dysprosium-SR10</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
</project>

Starting your HTTP Server

Create a new class, let's call it Example, with a main function. To start your HTTP server, use the following:

    DisposableServer server = HttpServer.create()
                    .host("0.0.0.0")
                    .port(8080)
                    .route(routes ->
                            routes
                            .get("/", Example::hello)
                            .post("/", Example::saveData)
                          )
                    .bindNow();
    server.onDispose().block();

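This code creates a new HTTP server bound to port 8080. The host 0.0.0.0 tells Netty to listen on all available interfaces. bindNow() blocks until the server is bound, and server.onDispose().block() keeps the main thread alive until the server is shut down.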

Create some routes

As you can see, we have created two routes in our application:

  • GET / - The method hello() in our Example class will handle it
  • POST / - Will be handled with saveData()

Let’s look at the hello method:

private static Publisher<Void> hello(HttpServerRequest req, HttpServerResponse res) {
    return res.status(HttpResponseStatus.OK).sendString(Mono.just("OK"));
}

Any method that handles a route needs to return a Publisher<Void> and take an HttpServerRequest and an HttpServerResponse as arguments. Publisher<Void> is just a way to say that our method is not returning anything that requires further actions: it’s the end of the pipeline. Publisher is an interface from the Reactive Streams specification, and Reactor's Mono and Flux classes are implementations of it. I won’t go into it here, since reactive programming is a topic that needs more than a few paragraphs to cover, but you can read more about it here.

Our hello method simply uses the HttpServerResponse to return a response with the string OK and response code 200. We need to wrap our response in a Publisher because sendString expects a Publisher of strings rather than a plain String. The solution is to use Mono.just("<your message>"), which returns a Mono<String>, a Publisher implementation.
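To make the Publisher idea a bit more concrete, here is a minimal sketch that uses the JDK's own java.util.concurrent.Flow interfaces, which mirror the Reactive Streams interfaces that Reactor builds on. Note the assumptions: it needs Java 9 or later (our project targets Java 8), and the names JustPublisherDemo, just and collectSignals are made up for this illustration. It is not how Mono.just is actually implemented; it only shows that a publisher emits nothing until someone subscribes and requests data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class JustPublisherDemo {

    /** Hypothetical helper: a publisher that emits one value and then completes. */
    static <T> Flow.Publisher<T> just(T value) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done; // single-threaded demo, so no synchronization

            @Override
            public void request(long n) {
                if (done) {
                    return;
                }
                done = true;
                if (n <= 0) {
                    subscriber.onError(new IllegalArgumentException("non-positive request"));
                    return;
                }
                subscriber.onNext(value);
                subscriber.onComplete();
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Subscribes to the publisher and records every signal it sends, in order. */
    static <T> List<String> collectSignals(Flow.Publisher<T> publisher) {
        List<String> signals = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                signals.add("onSubscribe");
                subscription.request(1); // ask for one item
            }

            @Override
            public void onNext(T item) {
                signals.add("onNext(" + item + ")");
            }

            @Override
            public void onError(Throwable error) {
                signals.add("onError(" + error.getMessage() + ")");
            }

            @Override
            public void onComplete() {
                signals.add("onComplete");
            }
        });
        return signals;
    }

    public static void main(String[] args) {
        Flow.Publisher<String> ok = just("OK");   // nothing has been emitted yet
        System.out.println(collectSignals(ok));    // [onSubscribe, onNext(OK), onComplete]
    }
}
```

In our hello method we never subscribe ourselves: we hand the Mono to sendString and return the resulting Publisher, and the server subscribes when it is ready to write the response.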

So we already have our first Hello World Reactor Netty app, but we would like to do more!

Let’s raise the level a bit now. Assume that we have a blocking IO operation, like saving a file to disk. We don’t want to block our application on every write to disk: while we are saving to disk, we still want to be able to respond 200 OK in our other route. We will use a thread pool that handles our saving operations without interfering with the application flow.

We will create new thread pool:

private static final ThreadPoolExecutor tpe = new ThreadPoolExecutor(20, 40, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(50));

We created a thread pool with 20 core threads, which can grow to 40 threads under pressure, and a BlockingQueue with a capacity of 50. Note the order in which ThreadPoolExecutor uses them: once the 20 core threads are busy, new tasks wait in the queue, and extra threads (up to 40) are started only when the queue is full. When all 40 threads are busy and 50 tasks are waiting, the next tasks are rejected until we can handle more. This configuration allows us to control the pressure that we want to handle in our app, and we can tune these numbers to our needs.
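If you want to see how these three numbers interact, here is a small, self-contained sketch using only the JDK. To keep it readable it uses a much smaller pool than ours (2 core threads, 4 maximum, queue capacity 3), and the class and method names are made up for this illustration. Every task blocks on a latch, so nothing finishes while we are submitting, and we can count exactly how many tasks the pool accepts before it starts rejecting.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ThreadPoolSaturationDemo {

    /** Submits blocking tasks and returns {accepted, rejected, pool size after submitting}. */
    static int[] saturate(int core, int max, int queueCapacity, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    // Each task waits on the latch, simulating a slow blocking operation.
                    pool.execute(() -> {
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                    accepted++;
                } catch (RejectedExecutionException e) {
                    // Default policy (AbortPolicy): max threads busy and queue full.
                    rejected++;
                }
            }
            return new int[] {accepted, rejected, pool.getPoolSize()};
        } finally {
            release.countDown(); // let all accepted tasks finish
            pool.shutdown();     // worker threads exit once the queue is drained
        }
    }

    public static void main(String[] args) {
        int[] result = saturate(2, 4, 3, 10);
        // prints: accepted=7 rejected=3 poolSize=4
        System.out.printf("accepted=%d rejected=%d poolSize=%d%n", result[0], result[1], result[2]);
    }
}
```

Tasks 1 and 2 start the core threads, tasks 3 to 5 fill the queue, tasks 6 and 7 start the extra threads, and everything after that is rejected. With the numbers from this guide (20, 40, 50), the same logic gives room for 90 in-flight tasks before rejections start.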

Now we will use it in our saveData flow:

private static Publisher<Void> saveData(HttpServerRequest request, HttpServerResponse response) {
    return request
            .receive()
            .aggregate()
            .asByteArray()
            .publishOn(Schedulers.fromExecutorService(tpe))
            .handle((data, sink) -> {
                try {
                    String fileName = String.format("./example-%s.txt", UUID.randomUUID());                        
                    FileUtils.writeByteArrayToFile(new File(fileName), data);
                    sink.next(data.length);
                }
                catch (IOException e) {
                    sink.error(e);
                }
            })
            .doOnSuccess(dataLength -> {
                response.status(200).sendString(Mono.just(String.format("OK Saved %d bytes.", dataLength))).then().subscribe();
            })
            .doOnError(ex -> {
                response.status(500).sendString(Mono.just(String.format("ERROR %s", ex.getMessage()))).then().subscribe();
            })
            .then();
}

I will go step by step:

  1. request.receive() - We start pulling data from our HttpServerRequest. The body arrives in chunks (of 1024 bytes by default) as a ByteBufFlux.
  2. .aggregate() - In this example, we would like to get the whole request body at once, and not in chunks, so we aggregate the chunks into a single ByteBufMono. As you can see, Flux is used for zero to many entities in your data, and Mono is for 0 or 1 entities.
  3. .asByteArray() - We would like to access the actual data. This function converts our ByteBufMono to a Mono<byte[]>, which is an actual data type that we can use.
  4. .publishOn(Schedulers.fromExecutorService(tpe)) - We tell Reactor that we want the next actions on the data to run on a thread from our tpe thread pool. We don’t want them to run on the event loop thread that serves requests and block other requests.
  5. .handle((data, sink) -> - handle allows us to run code that throws, and lets us decide how we want to treat the failure. An important thing to understand is that only inside operators like map, flatMap, handle, etc. can we access the actual value that is inside a Mono or Flux. In this handle function our byte[] is available as data, which is the posted bytes that we aggregated before. We save it to a file named example-{random-guid}.txt to avoid duplicates, and if the write succeeds we sink data.length to the next part of the pipeline, just as an example. The resulting type will be Mono<Integer>. On IOException we call sink.error(e), so we can access this exception later in our response.
  6. .doOnSuccess(dataLength -> - Will be called if we haven’t sinked an error from our handle. Inside our anonymous function we will:
    1. response.status(200).sendString(Mono.just(String.format("OK Saved %d bytes.", dataLength))) - Generate our response message.
    2. .then() - End our pipeline.
    3. .subscribe() - Subscribe to the pipeline, which actually triggers sending the response. Nothing in Reactor runs until something subscribes.
  7. .doOnError(ex -> - Basically like on success, just for sink.error.
    1. response.status(500).sendString(Mono.just(String.format("ERROR %s", ex.getMessage()))) - Will return response code 500 and the exception message.
    2. .then()
    3. .subscribe()
  8. .then() - End of the pipeline.
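If the Reactor operators are still a bit abstract, the same idea (move the blocking write to a dedicated pool, then turn success or failure into a response) can be sketched with plain JDK classes. This is only an analogy under stated assumptions, not Reactor code: CompletableFuture.supplyAsync plays the role of publishOn plus handle, and the final handle call plays the role of doOnSuccess and doOnError. The names OffloadedSaveDemo and save, and the idea of returning the response as a String, are made up for this illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class OffloadedSaveDemo {

    /** Writes data to a new file in dir on ioPool and maps the outcome to a response line. */
    static CompletableFuture<String> save(byte[] data, Path dir, ExecutorService ioPool) {
        return CompletableFuture
                .supplyAsync(() -> {
                    // Blocking IO runs on the pool thread, not on the caller's thread.
                    try {
                        Path file = dir.resolve("example-" + UUID.randomUUID() + ".txt");
                        Files.write(file, data);
                        return data.length;
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                }, ioPool)
                // Exactly one of (length, error) is non-null here.
                .handle((length, error) -> error == null
                        ? String.format("200 OK Saved %d bytes.", length)
                        : String.format("500 ERROR %s", error.getMessage()));
    }

    public static void main(String[] args) throws IOException {
        ExecutorService ioPool = Executors.newFixedThreadPool(2);
        try {
            Path dir = Files.createTempDirectory("save-demo");
            byte[] body = "Test data".getBytes(StandardCharsets.UTF_8);
            // join() is used only so the demo can print the result before exiting.
            System.out.println(save(body, dir, ioPool).join()); // 200 OK Saved 9 bytes.
        } finally {
            ioPool.shutdown();
        }
    }
}
```

The caller gets a future back immediately and stays free to do other work while the write happens on the pool, which is the property we want for the thread that serves our requests.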

And there we have it: a non-blocking saveData route.

Testing your Server

After starting the server, you can test it with curl. Here is a quick example that posts data to the saveData route:

curl -d "Test data" http://localhost:8080/