- It is a new programming paradigm.
- Async and non-blocking
- Event/message driven
- functional code style in Java
- Back Pressure on Data Streams
- Give me some data but I will not wait
- I will not block myself, I will continue doing my other job, you give me the response when you are ready, I will take it and process.
- By doing this, threads are released to do other works.
- Suppose in above example, Database is sending huge amount of data to our application. So in that case our application may not be able to handle all the data at once.
- So we will tell Database that Hey DB, kindly send this much amount of data at once and don't send all the data once.
- Publisher
- This interface has subscribe() method which takes subscriber argument as input.
- The entity (database) which will send you the data.
- Subscriber
- This interface has 4 methods
- onSubscribe
- onNext
- onError
- onComplete
- This will be generally your application which will implement this interface and call above methods to fetch the data from publisher.
- This interface has 4 methods
- Subscription
- It connects publisher & subscriber.
- It has request() method which will take long args that defines how much data you wants.
- It has cancel() method as well to cancel the subscription.
- Processor
- It extends the both, publisher & subscriber.
- It can work as both.
Below diagram shows the sequence of how Application (subscriber) & Database (publisher) works together.
Types to handle the data in reactive programming
- Flux - works with 0 to N elements
- Mono - works with 0 to 1 elements
public class FluxAndMonoService {
public Flux<String> getCountryFlux() {
return Flux.fromIterable(Arrays.asList("India", "USA", "UK")).log();
}
public Mono<String> getCountryMono() {
return Mono.just("India").log();
}
public static void main(String[] args) {
FluxAndMonoService fluxAndMonoService = new FluxAndMonoService();
fluxAndMonoService.getCountryFlux().subscribe(c -> {
System.out.println("Flux of : " c);
});
fluxAndMonoService.getCountryMono().subscribe(c -> {
System.out.println("Mono of : " c);
});
}
}
Here we are - Getting the Flux and Mono of a String - subscribing to it - and printing the elements of it
If you will log the flux/mono, you will see the events that we saw earlier. Here are the logs for the same :
00:12:36.957 [main] INFO reactor.Flux.Iterable.1 - | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
00:12:36.963 [main] INFO reactor.Flux.Iterable.1 - | request(unbounded)
00:12:36.964 [main] INFO reactor.Flux.Iterable.1 - | onNext(India)
Flux of : India
00:12:36.965 [main] INFO reactor.Flux.Iterable.1 - | onNext(USA)
Flux of : USA
00:12:36.965 [main] INFO reactor.Flux.Iterable.1 - | onNext(UK)
Flux of : UK
00:12:36.966 [main] INFO reactor.Flux.Iterable.1 - | onComplete()
00:12:37.048 [main] INFO reactor.Mono.Just.2 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
00:12:37.049 [main] INFO reactor.Mono.Just.2 - | request(unbounded)
00:12:37.049 [main] INFO reactor.Mono.Just.2 - | onNext(India)
Mono of : India
00:12:37.050 [main] INFO reactor.Mono.Just.2 - | onComplete()
Now writing the test-cases for the same.
private FluxAndMonoService fluxAndMonoService = new FluxAndMonoService();
@Test
public void test_getCountryFlux() {
var flux = fluxAndMonoService.getCountryFlux();
StepVerifier.create(flux)
.expectNext("India", "USA", "UK")
.verifyComplete();
}
@Test
public void test_getCountryMono() {
var mono = fluxAndMonoService.getCountryMono();
StepVerifier.create(mono)
.expectNext("India")
.verifyComplete();
}
More example with other stream functions with flux & mono :
public Flux<String> getCountryFlux() {
return Flux.fromIterable(Arrays.asList("India", "USA", "UK")).log();
}
public Flux<String> getCountryMapFlux() {
return Flux.fromIterable(Arrays.asList("India", "USA", "UK"))
.map(String::toUpperCase)
.log();
}
public Flux<String> getCountryFilterFlux() {
return Flux.fromIterable(Arrays.asList("India", "USA", "UK"))
.filter(s -> s.startsWith("I"))
.log();
}
public Flux<String> getCountryFlatMapFlux() {
return Flux.fromIterable(Arrays.asList("India", "USA", "UK"))
.flatMap(s -> Flux.just(s.split("")))
.log();
}
public Flux<String> getCountryFlatMapFluxAsync() {
return Flux.fromIterable(List.of("India", "USA", "UK"))
.flatMap(s -> Flux.just(s.split("")))
.delayElements(Duration.ofMillis(new Random().nextInt(1000)))
.log();
}
public Mono<String> getCountryMono() {
return Mono.just("India").log();
}
public Mono<List<String>> getCountryFlapMapMono() {
return Mono.just("India")
.flatMap(s -> Mono.just(List.of(s.split(""))))
.log();
}
Below is one interesting example where we are passing whole function as transformation parameter.
/**
* Creating a function for the flux which takes the flux and returns the flux.
*
* @param number
* @return
*/
public Flux<String> getCountryFluxTransform(int number) {
// create a function for a flux
Function<Flux<String>, Flux<String>> transformFluxFunc = f -> f.filter(s -> s.length() > number);
return Flux.fromIterable(List.of("India", "Dubai", "USA", "Germany", "France"))
.transform(transformFluxFunc)
.defaultIfEmpty("default_data") // case when there is not match
.log();
}
- onErrorReturn : return value when error occurred
public Flux<String> getCountryOnErrorReturn() {
return Flux.just("India", "USA", "UK")
.concatWith(
Flux.error
(new RuntimeException("Exception !!")
))
.onErrorReturn("China");
}
- onErrorContinue : continue when error occurred.
public Flux<String> getCountryOnErrorContinue() {
return Flux.just("India", "USA", "uk").map(s -> {
if (s.equalsIgnoreCase("USA")) {
throw new RuntimeException("Super power is detected !!");
} else {
return s.toUpperCase();
}
}).onErrorContinue((e, c) -> {
System.out.println("e : " e.getMessage());
System.out.println("c : " c);
});
}