
How Flux works

Flux is a core type in Project Reactor that represents a reactive stream emitting zero to N elements, optionally followed by a completion or error signal. It suits scenarios where multiple values arrive asynchronously over time, such as streaming data from a database, receiving events from a message broker, or processing large collections of data. Let's look at how Flux works with a code example and then write a test case for it.
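To make "zero to N elements" concrete, here is a minimal sketch of a few common factory methods. The class name FluxSourcesSketch and the drain() helper are made up for illustration, and block() is used only to keep the demo short; blocking is generally something to avoid in real reactive code.

```java
import reactor.core.publisher.Flux;

import java.util.List;

public class FluxSourcesSketch {

    // Hypothetical helper: collects every element of a Flux into a List.
    // block() waits for the result and is used here only for demonstration.
    public static <T> List<T> drain(Flux<T> flux) {
        return flux.collectList().block();
    }

    public static void main(String[] args) {
        Flux<Integer> none = Flux.empty();          // zero elements, then completion
        Flux<Integer> one = Flux.just(42);          // exactly one element
        Flux<Integer> many = Flux.range(1, 5);      // the integers 1 through 5
        Flux<String> fromList = Flux.fromIterable(List.of("a", "b", "c"));

        System.out.println(drain(none));
        System.out.println(drain(one));
        System.out.println(drain(many));
        System.out.println(drain(fromList));
    }
}
```

Each of these sources is lazy: nothing is emitted until something subscribes, which drain() does indirectly through block().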

How Flux Works:


import reactor.core.publisher.Flux;

public class Main {
    public static void main(String[] args) {
        // Creating a Flux emitting a sequence of integers
        Flux<Integer> numbersFlux = Flux.just(1, 2, 3, 4, 5);

        // Subscribing to the Flux to consume emitted elements
        numbersFlux.subscribe(
            number -> System.out.println("Received number: " + number), // onNext callback
            error -> System.err.println("Error occurred: " + error),    // onError callback
            () -> System.out.println("Stream completed")                // onComplete callback
        );
    }
}

In this example:

  • We create a Flux named numbersFlux emitting a sequence of integers using Flux.just().
  • We subscribe to the Flux using the subscribe() method, providing lambdas for handling emitted elements, errors, and completion.
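  • When subscribed, the Flux passes each integer, in order, to the onNext callback. The stream then ends with exactly one terminal signal: onError if something fails, or onComplete if the sequence finishes normally. Because Flux.just() emits fixed values, this example always ends with onComplete.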
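The numbersFlux example never reaches its onError callback, so here is a small sketch of what the error path might look like. The class ErrorSignalSketch, its two methods, and the "boom" exception are hypothetical and exist only to show the signal order; the point is that an error ends the stream, so later elements and onComplete are never delivered.

```java
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.List;

public class ErrorSignalSketch {

    // Hypothetical source: emits 1 and 2, then fails when it reaches 3.
    public static Flux<Integer> failingFlux() {
        return Flux.just(1, 2, 3, 4)
            .map(n -> {
                if (n == 3) {
                    throw new IllegalStateException("boom");
                }
                return n;
            });
    }

    // Records every signal the subscriber receives, in arrival order.
    public static List<String> collectSignals(Flux<Integer> flux) {
        List<String> signals = new ArrayList<>();
        flux.subscribe(
            n -> signals.add("onNext:" + n),               // onNext callback
            e -> signals.add("onError:" + e.getMessage()), // onError callback
            () -> signals.add("onComplete")                // onComplete callback
        );
        return signals;
    }

    public static void main(String[] args) {
        // Prints [onNext:1, onNext:2, onError:boom]
        System.out.println(collectSignals(failingFlux()));
    }
}
```

The element 4 is never delivered and onComplete is never called, because the exception thrown inside map() is turned into an onError signal that terminates the sequence.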

Writing Test Cases for Flux:

To test the behavior of a Flux, we can use the StepVerifier utility provided by the reactor-test module. Here's a test case that verifies the behavior of the numbersFlux from the previous example:


import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

public class FluxTest {

    @Test
    public void testFluxBehavior() {
        // Creating the Flux emitting a sequence of integers
        Flux<Integer> numbersFlux = Flux.just(1, 2, 3, 4, 5);

        // Verifying the behavior of the Flux using StepVerifier
        StepVerifier.create(numbersFlux)
            .expectNext(1, 2, 3, 4, 5) // Verifying emitted elements
            .expectComplete()          // Verifying completion
            .verify();                 // Triggering the verification
    }
}

In this test case:

  • We create a Flux named numbersFlux emitting a sequence of integers.
  • We use StepVerifier.create() to create a StepVerifier for the numbersFlux.
  • We chain expectNext() to verify the emitted elements and expectComplete() to verify completion, then call verify(), which subscribes to the Flux and checks the expectations. Nothing is checked until verify() is called.
  • If the Flux emits the expected elements and completes successfully, the test case passes. Otherwise, it fails with appropriate error messages.
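StepVerifier can check the failure path in the same style. The sketch below uses a plain class with static methods rather than JUnit so that it stands alone; the class name StepVerifierErrorSketch, the failingFlux() source, and the "boom" message are assumptions made for illustration. Each method body could be moved into an @Test method unchanged.

```java
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

import java.time.Duration;

public class StepVerifierErrorSketch {

    // Hypothetical source: emits 1 and 2, then terminates with an error.
    public static Flux<Integer> failingFlux() {
        return Flux.just(1, 2)
            .concatWith(Flux.error(new IllegalStateException("boom")));
    }

    // Expects two elements followed by an error carrying the message "boom".
    public static Duration verifyErrorPath() {
        return StepVerifier.create(failingFlux())
            .expectNext(1, 2)
            .expectErrorMessage("boom")
            .verify();
    }

    // verifyComplete() is shorthand for expectComplete().verify().
    public static Duration verifyHappyPath() {
        return StepVerifier.create(Flux.range(1, 5))
            .expectNextCount(5)
            .verifyComplete();
    }

    public static void main(String[] args) {
        verifyErrorPath();
        verifyHappyPath();
        System.out.println("Both verifications passed");
    }
}
```

If an expectation is not met, for example expecting completion from failingFlux(), verify() throws an AssertionError, which is what makes a JUnit test fail.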

A test like this confirms that the Flux emits the correct elements in the correct order and then completes successfully. Tests of this kind give confidence in the behavior of reactive streams and help catch unexpected issues or regressions in the code.

