Do i really need reactor netty?
Reactor creators state in their website about reactor that reactor is "Well-suited for a microservices architecture, Reactor offers backpressure-ready network engines for HTTP (including Websockets), TCP, and UDP."
And the answer to your question is the annoying answer, it depends. In general, reactor netty
is a framework that will let you “squeeze” the most from your resources with its main feature Non-Blocking IO
, but we will get to it soon.
The questions that you need to ask yourself before picking reactor-netty is:
- Does my app scalable?
- Do I need to fully utilze my resources?
- Does my app do
Blocking IO
operations (access to DB, files, etc)? - Am I willing to maintain a slightly more complex code?
If you answered yes to these questions, reactor will probably solve some problems for you and improve performance.
Do I need to use Spring Framework to work with reactor netty?
Absolutely not! Spring Framework has the reactive
extension which allows to working with netty and reactor, but we won’t use it in this guide, and if you don’t need the overhead and complexity of Spring and what comes with it, this is great for you!
Let’s get started
You can clone this project from GitHub with all of the following code. I’ll walk through it in this guide. We will start a new clean maven project with java 8, with this pom.xml, which will add to our classpath the reactor netty library:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>reactor-netty-quick-start</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
</dependency>
<dependency>
<groupId>org.apache.maven.shared</groupId>
<artifactId>maven-shared-utils</artifactId>
<version>3.2.1</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>Dysprosium-SR10</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
</project>
Starting your HTTP Server
Create new class, lets call it Example
with a main function start your HTTP Server use the following:
DisposableServer server = HttpServer.create()
.host("0.0.0.0")
.port(8080)
.route(routes ->
routes
.get("/", Example::hello)
.post("/", Example::saveData)
)
.bindNow();
server.onDispose().block();
The following code will create a new HTTP server with binding to port 8080
, and 0.0.0.0
will indicate netty to use all interfaces available.
Create some routes
As you can see we have created 2 routes to our application:
- GET / - The method hello() in our Example class will handle it
- POST / - Will be handled with saveData()
Let’s look at hello
method:
private static Publisher<Void> hello(HttpServerRequest req, HttpServerResponse res) {
return res.status(HttpResponseStatus.OK).sendString(Mono.just("OK"));
}
Any method that handles route need to have the signature of returning Publisher<T>
and taking as argument HttpServerRequest
and HttpServerResponse
.
Publisher<Void>
is just a way to say that our method is not returning anything that requires further actions - it’s the end of the pipeline.
Publisher
is an abstract class, and the classes Mono
and Flux
are implementations of it, I won’t go into it here, reactive programming is a topic that needs more than few paragraphs to cover, but you can read more about it here.
our hello
method simply uses the HttpServerResponse
to return a response with the string OK
and response code 200
. We need to wrap our response in a Publisher
as we never returning regular types in reactor. The solution is just to use the Mono.just("<your message>")
. It will return Mono<String>
object which is a Publisher
implementations.
So we already have our first Hello World Reactor Netty app, but we would like to do more!
Let’s raise the level a bit now. Assume that we have a Blocking IO
operation like saving a file to disk. We don’t want to block our application on every write to disk, we don’t want that when we are saving do disk we won’t be able to respond 200 OK Hello
in our other route.
We will a thread pool that will handle our saving operations without interfering with the application flow.
We will create new thread pool:
tpe = new ThreadPoolExecutor(20, 40, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(50));
We created a thread pool with 20
threads, which can scale to 40
threads on pressure, and a BlockingQueue with a size of 50
. That means that if we will have more than 50 requests that we didn’t start to handle yet, we will reject the next requests until we can handle more. This configuration allows us to control the pressure that we want to handle in our app, and we can tune this number to our needs.
Now we will use it in our saveData flow:
private static Publisher<Void> saveData(HttpServerRequest request HttpServerResponse response) {
return request
.receive()
.aggregate()
.asByteArray()
.publishOn(Schedulers.fromExecutorService(tpe))
.handle((data, sink) -> {
try {
String fileName = String.format("./example-%s.txt", UUID.randomUUID());
FileUtils.writeByteArrayToFile(new File(fileName), data);
sink.next(data.length);
}
catch (IOException e) {
sink.error(e);
}
})
.doOnSuccess(dataLegnth -> {
response.status(200).sendString(Mono.just(String.format("OK Saved %d bytes.", dataLegnth))).then().subscribe();
})
.doOnError(ex -> {
response.status(500).sendString(Mono.just(String.format("ERROR %s" ,ex.getMessage())).then().subscribe();
})
.then();
}
I will go step by step:
request.receive()
- We will start to pull data from our HttpRequest, by default, we will get chunks of1024 ByteBufFlux
..aggregate()
- In this example, we would like to get all of the request body once, and not in chunks, so we willaggregate
the chunks to a singleByteBufMono
. As you can see,Flux
is used for single or many entities in your data,Mono
is for 0 or 1 entities..asByteArray()
- We would like to access the actual data, this function will cast ourByteBufMono
toMono<byte[]>
which is an actual data type that we can use..publishOn(Schedulers.fromExecutorService(tpe))
- We declare to reactor that we want our next actions on the data to be published to a thread from ourtpe
Thradpool. We don’t want it to happen on our main thread and block other requests..handle((data, sink) ->
-handle
will allow us to runThrowing
code, and let us decide how do we want to treat it. An important thing to understand is that only inside operations likemap
,flatMap
,handle
, etc, we can access the actual type that insideMono
orFlux
. In this handle function ourbyte[]
will be available asdata
which is the posted bytes that we aggregated before. We will save it to a file namedexample-{random-guid}.txt
to avoid duplicates, and if the write succeeded we willsink
to our next part of the pipelinedata.length
, just as an example. The sinked type will beMono<Integer>
. OnIOException
we willsink.error(ex)
, so we can access this exception later on our response..doOnSuccess(dataLength ->
- Will be called if we havn’t sinked an error from ourhandle
. Inside our anonymous function we will:response.status(200).sendString(Mono.just(String.format("OK Saved %d bytes.", dataLegnth)))
- Generate our response message..then()
- End our pipeline.subscribe()
- Wait tothen
singal of finishing.
.doOnError(ex ->
- Basically like on success, just forsink.errror
.response.status(500).sendString(Mono.just(String.format("ERROR %s" ,ex.getMessage()))
- Will return response code500
and the exception message..then()
.subscribe()
.then()
- end of the pipleline.
And here we got Non-Blocking
saveData
route.
Testing your Server
So after running the server, you can test it with curl
. Here is a quick example:
curl -d "Test data" http://localhost:8080/