Do I really need Reactor Netty?
On their website, the Reactor creators describe it like this: "Well-suited for a microservices architecture, Reactor offers backpressure-ready network engines for HTTP (including Websockets), TCP, and UDP."
The answer to the question is the annoying one: it depends. In general, Reactor Netty is a framework that lets you “squeeze” the most out of your resources through its main feature, non-blocking IO, but we will get to that soon.
The questions that you need to ask yourself before picking reactor-netty are:
- Does my app need to scale?
- Do I need to fully utilize my resources?
- Does my app do blocking IO operations (access to DB, files, etc.)?
- Am I willing to maintain slightly more complex code?
If you answered yes to these questions, reactor will probably solve some problems for you and improve performance.
Do I need to use Spring Framework to work with reactor netty?
Absolutely not! Spring Framework has a reactive extension that allows working with Netty and Reactor, but we won’t use it in this guide. If you don’t need the overhead and complexity of Spring and what comes with it, this is great for you!
Let’s get started
You can clone this project from GitHub with all of the following code. I’ll walk through it in this guide. We will start a new clean Maven project with Java 8 and the following pom.xml, which adds the Reactor Netty library to our classpath:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>reactor-netty-quick-start</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
</dependency>
<dependency>
<groupId>org.apache.maven.shared</groupId>
<artifactId>maven-shared-utils</artifactId>
<version>3.2.1</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>Dysprosium-SR10</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
</project>
Starting your HTTP Server
Create a new class, let’s call it Example, with a main function. To start your HTTP server, use the following:
DisposableServer server = HttpServer.create()
.host("0.0.0.0")
.port(8080)
.route(routes ->
routes
.get("/", Example::hello)
.post("/", Example::saveData)
)
.bindNow();
server.onDispose().block();
The code above creates a new HTTP server bound to port 8080. The host 0.0.0.0 tells Netty to listen on all available interfaces.
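If you are unsure what 0.0.0.0 means in practice, here is a small JDK-only sketch that is not part of the project. It checks whether a host string is the wildcard address, which is what makes a server listen on every interface instead of only on, say, the loopback one. The class and method names (BindAddressDemo, listensOnAllInterfaces) are made up for illustration.

```java
import java.net.InetSocketAddress;

public class BindAddressDemo {

    // Returns true when the host is the wildcard ("any local") address,
    // meaning a server bound to it accepts connections on every interface.
    static boolean listensOnAllInterfaces(String host, int port) {
        return new InetSocketAddress(host, port).getAddress().isAnyLocalAddress();
    }

    public static void main(String[] args) {
        System.out.println(listensOnAllInterfaces("0.0.0.0", 8080));   // prints true
        System.out.println(listensOnAllInterfaces("127.0.0.1", 8080)); // prints false
    }
}
```

Binding to 127.0.0.1 instead would make the server reachable only from the same machine.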
Create some routes
As you can see, we have created 2 routes in our application:
- GET / - The method hello() in our Example class will handle it
- POST / - Will be handled with saveData()
Let’s look at the hello method:
private static Publisher<Void> hello(HttpServerRequest req, HttpServerResponse res) {
return res.status(HttpResponseStatus.OK).sendString(Mono.just("OK"));
}
Any method that handles a route needs to take an HttpServerRequest and an HttpServerResponse as arguments and return a Publisher<Void>.
Publisher<Void> is just a way to say that our method is not returning anything that requires further actions - it’s the end of the pipeline.
Publisher is an interface from the Reactive Streams specification, and the classes Mono and Flux are implementations of it. I won’t go into it here, as reactive programming is a topic that needs more than a few paragraphs to cover, but you can read more about it here.
Our hello method simply uses the HttpServerResponse to return a response with the string OK and response code 200. We need to wrap our response in a Publisher, as we never return regular types in Reactor. The solution is just to use Mono.just("<your message>"). It returns a Mono<String> object, which is a Publisher implementation.
So we already have our first Hello World Reactor Netty app, but we would like to do more!
Let’s raise the level a bit now. Assume that we have a blocking IO operation like saving a file to disk. We don’t want to block our application on every write to disk: while we are saving to disk, we still want to be able to respond 200 OK in our other route.
We will use a thread pool that will handle our saving operations without interfering with the application flow.
We will create a new thread pool:
private static final ThreadPoolExecutor tpe = new ThreadPoolExecutor(20, 40, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(50));
We created a thread pool with 20 core threads, which can grow to 40 threads under pressure, and a BlockingQueue with a size of 50. Note that ThreadPoolExecutor only starts threads beyond the first 20 once the queue is full. That means that if all 40 threads are busy and 50 requests are already waiting in the queue, the next requests will be rejected until we can handle more. This configuration allows us to control the pressure that we want to handle in our app, and we can tune these numbers to our needs.
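To make the limits of this configuration concrete, here is a small JDK-only sketch that is not part of the original project. It assumes every task blocks (standing in for a slow disk write) and counts how many submissions the pool accepts before it starts rejecting. The class and method names (PoolSaturationDemo, countAccepted) are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolSaturationDemo {

    // Submits `attempts` tasks that all block until released and returns
    // how many of them the pool accepted.
    static int countAccepted(int core, int max, int queueSize, int attempts) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueSize));
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            for (int i = 0; i < attempts; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // simulate a slow, blocking write
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                    accepted++;
                } catch (RejectedExecutionException e) {
                    // Default policy: thrown when all threads are busy and the queue is full.
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Same numbers as the pool in this guide.
        System.out.println(countAccepted(20, 40, 50, 200)); // prints 90
    }
}
```

With 20 core threads, a queue of 50 and a maximum of 40 threads, 90 tasks are accepted: 20 start on core threads, 50 wait in the queue, and only then are 20 extra threads started. Everything after that is rejected with a RejectedExecutionException.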
Now we will use it in our saveData flow:
private static Publisher<Void> saveData(HttpServerRequest request, HttpServerResponse response) {
return request
.receive()
.aggregate()
.asByteArray()
.publishOn(Schedulers.fromExecutorService(tpe))
.handle((data, sink) -> {
try {
String fileName = String.format("./example-%s.txt", UUID.randomUUID());
FileUtils.writeByteArrayToFile(new File(fileName), data);
sink.next(data.length);
}
catch (IOException e) {
sink.error(e);
}
})
.doOnSuccess(dataLength -> {
response.status(200).sendString(Mono.just(String.format("OK Saved %d bytes.", dataLength))).then().subscribe();
})
.doOnError(ex -> {
response.status(500).sendString(Mono.just(String.format("ERROR %s", ex.getMessage()))).then().subscribe();
})
.then();
}
I will go step by step:
- request.receive() - We start pulling data from our HttpRequest. By default, we get the data in chunks of 1024, as a ByteBufFlux.
- .aggregate() - In this example, we would like to get the whole request body at once and not in chunks, so we aggregate the chunks into a single ByteBufMono. As you can see, Flux is used for zero or many entities in your data, and Mono is for 0 or 1 entities.
- .asByteArray() - We would like to access the actual data. This function converts our ByteBufMono to a Mono<byte[]>, which is an actual data type that we can use.
- .publishOn(Schedulers.fromExecutorService(tpe)) - We tell Reactor that we want the next actions on the data to run on a thread from our tpe thread pool. We don’t want them to happen on our main thread and block other requests.
- .handle((data, sink) -> - handle allows us to run code that throws and lets us decide how we want to treat failures. An important thing to understand is that only inside operators like map, flatMap, handle, etc., can we access the actual value that is inside a Mono or Flux. In this handle function our byte[] is available as data, which is the posted bytes that we aggregated before. We save it to a file named example-{random-guid}.txt to avoid duplicates, and if the write succeeded we sink data.length to the next part of the pipeline, just as an example. The sinked value is an Integer, so the next step receives the number of saved bytes. On IOException we call sink.error(e), so we can access this exception later in our response.
- .doOnSuccess(dataLength -> - Called if we haven’t sinked an error from our handle. Inside our anonymous function we will:
  - response.status(200).sendString(Mono.just(String.format("OK Saved %d bytes.", dataLength))) - Generate our response message.
  - .then() - End our pipeline.
  - .subscribe() - Subscribe to the pipeline so that the response is actually sent. Nothing happens in Reactor until a pipeline is subscribed to.
- .doOnError(ex -> - Basically like on success, just for sink.error.
  - response.status(500).sendString(Mono.just(String.format("ERROR %s", ex.getMessage()))) - Returns response code 500 and the exception message.
  - .then().subscribe()
- .then() - End of the pipeline.
And there we have it: a non-blocking saveData route.
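The file write inside handle can also be done with the JDK alone. The following is a minimal sketch, not the code from the project: it swaps FileUtils.writeByteArrayToFile for java.nio.file.Files.write, and the class and method names (FileSaver, save) are hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

public class FileSaver {

    // Writes data to a new example-<uuid>.txt file inside dir
    // and returns the number of bytes written.
    static int save(Path dir, byte[] data) throws IOException {
        Path target = dir.resolve(String.format("example-%s.txt", UUID.randomUUID()));
        Files.write(target, data); // throws IOException if the write fails
        return data.length;
    }

    public static void main(String[] args) throws IOException {
        // A temporary directory keeps the demo from littering the working directory.
        Path dir = Files.createTempDirectory("reactor-netty-demo");
        int written = save(dir, "Test data".getBytes(StandardCharsets.UTF_8));
        System.out.println("Saved " + written + " bytes to " + dir);
    }
}
```

Inside the handle step this could be called as sink.next(FileSaver.save(Paths.get("."), data)) within the same try/catch, since save still throws IOException on failure.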
Testing your Server
After running the server, you can test it with curl. Here is a quick example:
curl -d "Test data" http://localhost:8080/