JEROEN GORDIJN

Inspired IT


HOW DOES LIMITRATE WORK IN REACTOR

Posted on 21 March 2022 by Jeroen Gordijn

Project Reactor is a great reactive streams project that you will probably run
into when you want to write reactive code in Spring. It is very powerful, but it
can also be hard to wrap your head around. In this article I will look at the
limitRate function of a Flux.

The first time I ran into limitRate I thought it would help in
limiting/throttling the number of events flowing downstream. And according to
the documentation, this is the case:

> Ensure that backpressure signals from downstream subscribers are split into
> batches capped at the provided prefetchRate when propagated upstream,
> effectively rate limiting the upstream Publisher.

This means that limitRate will split big requests from downstream into smaller
requests. It also states that this is effectively rate limiting the publisher.

> Typically used for scenarios where consumer(s) request a large amount of data
> (eg. Long.MAX_VALUE) but the data source behaves better or can be optimized
> with smaller requests (eg. database paging, etc...). All data is still
> processed, unlike with limitRequest(long) which will cap the grand total
> request amount.

According to this documentation it is typically useful when the request to
upstream is unbounded. The rate limiter can cut this up into smaller pieces.
While there might be a use case for this, I think it is far more useful for
limiting the number of demand requests from downstream to upstream.
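
To make the documented behaviour concrete, a minimal sketch: a subscriber that
requests everything at once still only produces capped requests at the source.

Flux.range(1, 1_000_000)
  .limitRate(100)
  .subscribe() // subscribe() requests Long.MAX_VALUE, but the source only ever sees batched requests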


TOO MANY DEMAND REQUESTS

Let's look at a scenario where we want to process messages from PubSub using
Spring.


fun process(msg: AcknowledgeablePubsubMessage): Mono<String> = ...

pubSubReactiveFactory.poll("exampleSubscription", 1000 /* not important with limited demand*/)
  .flatMap(::process, 16)
  ...
  .subscribe()

In the sample above, there will be an initial demand of 16 elements going up to
the source. The PubSubReactiveFactory will request 16 elements from PubSub and
send them downstream. Whenever one of the workers in the flatMap is done, it
will send a request(1) upstream, and the PubSubReactiveFactory will request one
element from PubSub. A fraction later, another demand may reach the source and
it needs to do an extra call to PubSub to get 1 extra element. The pipeline is
effectively transformed such that it pulls message per message from PubSub.
Message handling time is pull latency + processing time. Doing a request for
just 1 element is very wasteful, certainly when processing time is well within
the deadline bounds and having a buffer makes sense.


LIMITING NUMBER OF DEMAND REQUESTS

The best way to minimize the impact of pulling messages from a source is to make
sure we pull more than 1 message per request. This is exactly what limitRate can
do. It limits the number of demand requests to the source by grouping them
together. Internally, limitRate has a buffer from which it can feed the
consumers downstream, while making sure to refill the buffer in time by
requesting elements from the source. By default, "in time" means when the buffer
is 75% depleted.
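
Applied to the PubSub pipeline from before, a minimal sketch (reusing the
hypothetical process function and pubSubReactiveFactory from the earlier
snippet) could look like this:

pubSubReactiveFactory.poll("exampleSubscription", 1000)
  .limitRate(100) // buffer up to 100 elements; refill once the buffer is 75% depleted
  .flatMap(::process, 16)
  .subscribe()

The flatMap workers still send their request(1) signals, but limitRate absorbs
them and only talks to the source in batches.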

When limitRate(100) is used, it will first demand 100 elements from the source
to fill the buffer. The moment elements arrive, limitRate can send them
downstream as long as there is demand. When the buffer only has 25 elements left
(75% depleted), it will request 75 more elements (request(75)) from the source
to refill the buffer.
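
The replenish threshold can also be tuned. Reactor offers a two-argument
overload, limitRate(highTide, lowTide); if I read the javadoc correctly, the
first request is capped at highTide and subsequent replenishing requests are
capped at lowTide:

flux.limitRate(100, 50) // request(100) up front, then replenish in batches of request(50)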

This makes sure the source can emit batches of events, making the latency
overhead much less of an issue. The limitRate function is then more of a
performance booster than a throttler.


EXAMPLE

Let's create an example to show the impact of limitRate. The source in this
example can have unlimited outstanding requests and will add 200ms of latency to
getting the elements that are requested. Processing takes somewhere between
10 and 15ms.

import java.time.Duration
import java.time.Instant
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.logging.Level
import kotlin.random.Random.Default.nextInt
import kotlin.random.Random.Default.nextLong
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.SignalType
import reactor.core.scheduler.Schedulers

// Simulates a remote source: every request is answered with the demanded
// number of elements, delivered after a 200ms delay.
val scheduler = Executors.newSingleThreadScheduledExecutor()

val start = Instant.now()
val job = Flux.create<Int> { sink ->
  sink.onRequest { demand ->
    scheduler.schedule({
      repeat(demand.toInt()) {
        sink.next(nextInt())
      }
    }, 200, TimeUnit.MILLISECONDS)
  }
}
  .log("demandflow", Level.INFO, SignalType.REQUEST)
  .limitRate(100)
  .flatMap({ nr ->
    // Processing takes 10-15ms per element
    Mono.fromCallable { nr.toString() }.delayElement(Duration.ofMillis(nextLong(10, 15)))
  }, 16)
  .subscribeOn(Schedulers.parallel())
  .take(1000)
  .doOnComplete {
    println("Time: ${Duration.between(start, Instant.now())}")
  }
  .subscribe()


WITHOUT LIMITRATE

If we start the code above with the line limitRate(100) commented out, we get
the following result:

20:46:29.092 [parallel-1 ] INFO  demandflow - request(16)
20:46:29.367 [parallel-3 ] INFO  demandflow - request(1)
20:46:29.367 [parallel-8 ] INFO  demandflow - request(1)
20:46:29.368 [parallel-9 ] INFO  demandflow - request(1)
20:46:29.369 [parallel-1 ] INFO  demandflow - request(1)
20:46:29.369 [parallel-1 ] INFO  demandflow - request(1)
20:46:29.370 [parallel-10] INFO  demandflow - request(3)
20:46:29.370 [parallel-10] INFO  demandflow - request(1)
20:46:29.371 [parallel-2 ] INFO  demandflow - request(1)
20:46:29.371 [parallel-2 ] INFO  demandflow - request(1)
20:46:29.371 [parallel-2 ] INFO  demandflow - request(1)
...
20:46:42.551 [parallel-7 ] INFO  demandflow - request(1)
20:46:42.561 [parallel-10] INFO  demandflow - request(1)
20:46:42.732 [parallel-2 ] INFO  demandflow - request(1)
20:46:42.733 [parallel-3 ] INFO  demandflow - request(1)
20:46:42.735 [parallel-6 ] INFO  demandflow - request(1)
20:46:42.736 [parallel-4 ] INFO  demandflow - request(1)
20:46:42.736 [parallel-5 ] INFO  demandflow - request(1)
20:46:42.737 [parallel-7 ] INFO  demandflow - request(1)
20:46:42.739 [parallel-8 ] INFO  demandflow - request(1)

Time: PT13.752124S

After the first 16 elements that were demanded, it will mostly request 1 element
at a time. Sometimes multiple requests are bundled together. As you can see,
processing this took over 13s. When run with limitRate(100) enabled we get a
completely different result:

20:49:55.068 [parallel-1 ] INFO  demandflow - request(100)
20:49:55.407 [parallel-7 ] INFO  demandflow - request(75)
20:49:55.644 [parallel-4 ] INFO  demandflow - request(75)
20:49:55.884 [parallel-9 ] INFO  demandflow - request(75)
20:49:56.125 [parallel-3 ] INFO  demandflow - request(75)
20:49:56.362 [parallel-8 ] INFO  demandflow - request(75)
20:49:56.601 [parallel-12] INFO  demandflow - request(75)
20:49:56.843 [parallel-12] INFO  demandflow - request(75)
20:49:57.082 [parallel-8 ] INFO  demandflow - request(75)
20:49:57.320 [parallel-8 ] INFO  demandflow - request(75)
20:49:57.560 [parallel-8 ] INFO  demandflow - request(75)
20:49:57.794 [parallel-5 ] INFO  demandflow - request(75)
20:49:58.034 [parallel-9 ] INFO  demandflow - request(75)
20:49:58.270 [parallel-3 ] INFO  demandflow - request(75)

Time: PT3.273889S

The first request is 100 to fill the initial buffer, and then every so often we
see a request for 75 elements to refill the buffer. With this configuration the
processing took only a bit over 3 seconds. The impact of the 200ms latency is
now minimized by requesting batches of elements.


CONCLUSION

The limitRate function is very useful to limit the number of demand requests
flowing upstream. Instead of limiting the number of messages that can be
processed by the pipeline, it actually greatly improves performance. This
function has helped me a lot to improve the performance of processing pipelines
subscribing to a PubSub source.

Posted in kotlin, reactive, reactor. Tagged: kotlin, reactive, reactor.


HOW TO USE GROUPBY IN REACTOR

Posted on 12 March 2022 (updated 26 March 2022) by Jeroen Gordijn



Project Reactor is a great reactive streams project that you will probably run
into when you want to write reactive code in Spring. It is very powerful, but it
can also be hard to wrap your head around. In this article I will look at the
groupBy function of a Flux.


GROUPBY

The groupBy function will split the current flux into multiple fluxes. Think of
it as a router: based on a function you specify, it will route each message to
one of the groups. For example, when you have a stream of numbers and perform
intFlux.groupBy { it % 2 == 0 }, it will cut the flux into 2 fluxes. One will
have a stream of even numbers and the other will have a stream of odd numbers.
The resulting type of this groupBy is Flux<GroupedFlux<Boolean, Int>>. The outer
flux is actually a finite stream of 2 GroupedFlux<Boolean, Int> elements. If the
source on which the groupBy was applied was infinite, the 2 GroupedFlux objects
are also infinite.
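
As a minimal sketch (assuming an intFlux of type Flux<Int>), the even/odd split
looks like this:

val intFlux: Flux<Int> = Flux.range(1, 10)
val grouped: Flux<GroupedFlux<Boolean, Int>> = intFlux.groupBy { it % 2 == 0 }

// Subscribe to every group; key() tells us which group an element belongs to.
grouped.flatMap { group ->
  group.map { nr -> "${group.key()}: $nr" }
}.subscribe { println(it) }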


PROCESSING THE GROUPS

Given the example above, there are 2 groups in a flux. Now we can write the
logic to be performed on each group. Each GroupedFlux can be treated like a
regular flux, but with an extra function: key(). This key function returns the
result of the grouping function for all elements in this group, so in our
example true for all the even numbers.

There is one little detail which is quite important: we need to make sure that
we subscribe to all groups. This sounds trivial, but because it is part of a
stream this can easily go wrong.

Let's work with another example in which we divide the numbers into 10 groups:
intFlux.groupBy { it % 10 }. Each group will just count how many numbers came
through. This is what the countNumbers function does, with the help of the
increment function:

val countOccurrences = ConcurrentHashMap<Int, Long>()

fun increment(group: Int) = countOccurrences.compute(group) { _, count -> (count ?: 0) + 1 }

fun countNumbers(group: GroupedFlux<Int, Int>): Flux<Int> =
    group.doOnNext { increment(group.key()) }

The countNumbers function has to be wired together in the flux with the groupBy:

val emitCounter = AtomicInteger() // java.util.concurrent.atomic.AtomicInteger

Flux.generate<Int> { it.next(emitCounter.incrementAndGet()) }
  .groupBy { it % 10 }
  .flatMap(::countNumbers)
  .subscribeOn(Schedulers.parallel())
  .subscribe()

Simple enough, isn't it? This works, and when we inspect the countOccurrences
every so often we see something like:

nrs emited: 6324660 Occurrences per group: 0: 634584, 1: 634802, 2: 634804, 3: 634804, 4: 634805, 5: 634805, 6: 634805, 7: 634805, 8: 634806, 9: 634806
nrs emited: 13912044 Occurrences per group: 0: 1391214, 1: 1391220, 2: 1391221, 3: 1391221, 4: 1391222, 5: 1391222, 6: 1391222, 7: 1391222, 8: 1391223, 9: 1391223
nrs emited: 22109057 Occurrences per group: 0: 2210915, 1: 2210921, 2: 2210935, 3: 2210936, 4: 2210936, 5: 2210937, 6: 2210964, 7: 2210966, 8: 2210966, 9: 2210967
nrs emited: 30416867 Occurrences per group: 0: 3041697, 1: 3041703, 2: 3041704, 3: 3041704, 4: 3041704, 5: 3041704, 6: 3041704, 7: 3041705, 8: 3041705, 9: 3041705
nrs emited: 38748273 Occurrences per group: 0: 3874837, 1: 3874843, 2: 3874844, 3: 3874844, 4: 3874844, 5: 3874844, 6: 3874844, 7: 3874845, 8: 3874845, 9: 3874845
nrs emited: 47157048 Occurrences per group: 0: 4715713, 1: 4715719, 2: 4715720, 3: 4715720, 4: 4715720, 5: 4715720, 6: 4715720, 7: 4715720, 8: 4715721, 9: 4715721
nrs emited: 55470463 Occurrences per group: 0: 5547095, 1: 5547106, 2: 5547107, 3: 5547120, 4: 5547121, 5: 5547121, 6: 5547122, 7: 5547122, 8: 5547122, 9: 5547122
nrs emited: 62455436 Occurrences per group: 0: 6245552, 1: 6245557, 2: 6245557, 3: 6245558, 4: 6245558, 5: 6245558, 6: 6245558, 7: 6245558, 8: 6245558, 9: 6245559
nrs emited: 69543352 Occurrences per group: 0: 6954345, 1: 6954351, 2: 6954351, 3: 6954351, 4: 6954351, 5: 6954352, 6: 6954352, 7: 6954352, 8: 6954352, 9: 6954352

The elements are nicely distributed over the groups. Notice that we did not
specify an explicit concurrency on the flatMap. If it is left out, it defaults
to Queues.SMALL_BUFFER_SIZE, which is 256 (unless configured differently). The
groupBy made it such that we only have a limited number of groups, and as long
as the number of groups stays below 256, this will work perfectly.

Let's look at what will happen when we tune the concurrency to be lower than the
number of groups:

Flux.generate<Int> { it.next(emitCounter.incrementAndGet()) }  
 .groupBy { it % 10 }  
 .flatMap(::countNumbers, 9)  
 .subscribeOn(Schedulers.parallel())  
 .subscribe()

The resulting output is:

nrs emitted: 2560 Occurrences per group: 1: 256, 2: 256, 3: 256, 4: 256, 5: 256, 6: 256, 7: 256, 8: 256, 9: 256
nrs emitted: 2560 Occurrences per group: 1: 256, 2: 256, 3: 256, 4: 256, 5: 256, 6: 256, 7: 256, 8: 256, 9: 256
nrs emitted: 2560 Occurrences per group: 1: 256, 2: 256, 3: 256, 4: 256, 5: 256, 6: 256, 7: 256, 8: 256, 9: 256

This will continue forever without any progress. The problem is that we have 10
groups, but only 9 workers. Each worker consumes 1 GroupedFlux, which means that
there will be 1 group remaining without a worker. But why does the stream get
stuck?


NO MORE DEMAND

To understand why the stream grinds to a halt, we should look at the demand. You
can read more about this in my blog post "Debugging demand in Reactor". After
adding the log statements:

fun countNumbers(group: GroupedFlux<Int, Int>): Flux<Int> =
    group
        .log("countNumbers", Level.INFO, SignalType.REQUEST, SignalType.ON_SUBSCRIBE, SignalType.ON_NEXT)
        .doOnNext { increment(group.key()) }

Flux.generate<Int> { it.next(emitCounter.incrementAndGet()) }
  .log("groupBy", Level.INFO, SignalType.REQUEST, SignalType.ON_SUBSCRIBE, SignalType.ON_NEXT)
  .groupBy { it % 10 }
  .log("flatMap", Level.INFO, SignalType.REQUEST, SignalType.ON_SUBSCRIBE, SignalType.ON_NEXT)
  .flatMap(::countNumbers, 9)
  .subscribeOn(Schedulers.parallel())
  .subscribe()

The resulting output is like this:

[groupBy] - onSubscribe([Fuseable] FluxGenerate.GenerateSubscription)
[flatMap] - onSubscribe([Fuseable] FluxGroupBy.GroupByMain)
[subscribe] - onSubscribe(FluxFlatMap.FlatMapMain)
[subscribe] - request(unbounded)
[flatMap] - request(9)
[groupBy] - request(256)
[groupBy] - onNext(1)
[countNumbers-1] - onSubscribe([Fuseable] FluxGroupBy.UnicastGroupedFlux)
[countNumbers-1] - request(32)
[countNumbers-1] - onNext(1)
[groupBy] - request(1)
[groupBy] - onNext(2)
[countNumbers-2] - onSubscribe([Fuseable] FluxGroupBy.UnicastGroupedFlux)
[countNumbers-2] - request(32)
[countNumbers-2] - onNext(2)
[groupBy] - request(1)
[groupBy] - onNext(3)
[countNumbers-3] - onSubscribe([Fuseable] FluxGroupBy.UnicastGroupedFlux)
[countNumbers-3] - request(32)
[countNumbers-3] - onNext(3)
[groupBy] - request(1)
[groupBy] - onNext(4)
[countNumbers-4] - onSubscribe([Fuseable] FluxGroupBy.UnicastGroupedFlux)
[countNumbers-4] - request(32)
[countNumbers-4] - onNext(4)
[groupBy] - request(1)
[groupBy] - onNext(5)
[countNumbers-5] - onSubscribe([Fuseable] FluxGroupBy.UnicastGroupedFlux)
[countNumbers-5] - request(32)
[countNumbers-5] - onNext(5)
[groupBy] - request(1)
[groupBy] - onNext(6)
[countNumbers-6] - onSubscribe([Fuseable] FluxGroupBy.UnicastGroupedFlux)
[countNumbers-6] - request(32)
[countNumbers-6] - onNext(6)
[groupBy] - request(1)
[groupBy] - onNext(7)
[countNumbers-7] - onSubscribe([Fuseable] FluxGroupBy.UnicastGroupedFlux)
[countNumbers-7] - request(32)
[countNumbers-7] - onNext(7)
[groupBy] - request(1)
[groupBy] - onNext(8)
[countNumbers-8] - onSubscribe([Fuseable] FluxGroupBy.UnicastGroupedFlux)
[countNumbers-8] - request(32)
[countNumbers-8] - onNext(8)
[groupBy] - request(1)
[groupBy] - onNext(9)
[countNumbers-9] - onSubscribe([Fuseable] FluxGroupBy.UnicastGroupedFlux)
[countNumbers-9] - request(32)
[countNumbers-9] - onNext(9)
[groupBy] - request(1)
[groupBy] - onNext(10)
[groupBy] - onNext(11)
[countNumbers-1] - onNext(11)
[groupBy] - request(1)
[groupBy] - onNext(12)
[countNumbers-2] - onNext(12)
[groupBy] - request(1)
[groupBy] - onNext(13)
[countNumbers-3] - onNext(13)
[groupBy] - request(1)
[groupBy] - onNext(14)
[countNumbers-4] - onNext(14)
[groupBy] - request(1)
[groupBy] - onNext(15)
[countNumbers-5] - onNext(15)
[groupBy] - request(1)
[groupBy] - onNext(16)
[countNumbers-6] - onNext(16)
[groupBy] - request(1)
[groupBy] - onNext(17)
[countNumbers-7] - onNext(17)
[groupBy] - request(1)
[groupBy] - onNext(18)
[countNumbers-8] - onNext(18)
[groupBy] - request(1)
[groupBy] - onNext(19)
...
[countNumbers-8] - onNext(468)
[groupBy] - request(1)
[groupBy] - onNext(469)
[countNumbers-9] - onNext(469)
[groupBy] - request(1)
[groupBy] - onNext(470)
[groupBy] - onNext(471)
[countNumbers-1] - onNext(471)
[countNumbers-1] - request(24)
[groupBy] - request(1)
[groupBy] - onNext(472)
[countNumbers-2] - onNext(472)
[countNumbers-2] - request(24)
[groupBy] - request(1)
[groupBy] - onNext(473)
[countNumbers-3] - onNext(473)
[countNumbers-3] - request(24)
[groupBy] - request(1)
[groupBy] - onNext(474)
[countNumbers-4] - onNext(474)
[countNumbers-4] - request(24)
[groupBy] - request(1)
[groupBy] - onNext(475)
[countNumbers-5] - onNext(475)
[countNumbers-5] - request(24)
[groupBy] - request(1)
[groupBy] - onNext(476)
[countNumbers-6] - onNext(476)
[countNumbers-6] - request(24)
[groupBy] - request(1)
[groupBy] - onNext(477)
[countNumbers-7] - onNext(477)
[countNumbers-7] - request(24)
[groupBy] - request(1)
[groupBy] - onNext(478)
[countNumbers-8] - onNext(478)
[countNumbers-8] - request(24)
[groupBy] - request(1)
[groupBy] - onNext(479)
[countNumbers-9] - onNext(479)
[countNumbers-9] - request(24)
[groupBy] - request(1)
[groupBy] - onNext(480)
[groupBy] - onNext(481)
[countNumbers-1] - onNext(481)
[groupBy] - request(1)
[groupBy] - onNext(482)
[countNumbers-2] - onNext(482)
[groupBy] - request(1)
[groupBy] - onNext(483)
[countNumbers-3] - onNext(483)
[groupBy] - request(1)
[groupBy] - onNext(484)
[countNumbers-4] - onNext(484)
[groupBy] - request(1)
[groupBy] - onNext(485)
[countNumbers-5] - onNext(485)
...
[groupBy] - request(1)
[groupBy] - onNext(2558)
[countNumbers-8] - onNext(2558)
[groupBy] - request(1)
[groupBy] - onNext(2559)
[countNumbers-9] - onNext(2559)
[groupBy] - request(1)
[groupBy] - onNext(2560)
nrs emitted: 2560 Occurrences per group: 1: 256, 2: 256, 3: 256, 4: 256, 5: 256, 6: 256, 7: 256, 8: 256, 9: 256
nrs emitted: 2560 Occurrences per group: 1: 256, 2: 256, 3: 256, 4: 256, 5: 256, 6: 256, 7: 256, 8: 256, 9: 256
nrs emitted: 2560 Occurrences per group: 1: 256, 2: 256, 3: 256, 4: 256, 5: 256, 6: 256, 7: 256, 8: 256, 9: 256
nrs emitted: 2560 Occurrences per group: 1: 256, 2: 256, 3: 256, 4: 256, 5: 256, 6: 256, 7: 256, 8: 256, 9: 256

The logs give a lot of information about what is going on under the hood. First,
the onSubscribe event that starts the Flux is passed along. Keep in mind that a
Flux is nothing but a definition until you subscribe: "nothing happens until you
subscribe". This is called a cold stream. When the subscribe reaches the last
element in the stream, the demand starts flowing back.

The subscribe has no back-pressure needs and can handle everything, so it
requests unbounded demand. The flatMap has a concurrency of 9, so it sends a
demand of 9 upstream. Note that this is a demand for 9 elements of type
GroupedFlux<Int, Int>, so we request 9 groups. The groupBy has the default
behaviour of requesting a demand of 256 elements. This demand reaches the source
and the source starts emitting 256 elements (if possible, which it is in this
case). These 256 elements will be distributed over the 10 groups that are
defined by the grouping function. The output above shows that the first time an
element is emitted (onNext(1)), the flatMap subscribes to that group and we
immediately see demand flowing.

This shows that the subscription to a GroupedFlux only happens once the first
element for that group is available. We also see that the first element was
dispatched to a group and the groupBy immediately signals new demand upstream.
This happens 8 more times, until we reach element 10, which would end up in
countNumbers-10, but we do not have a processor for that group. So it stays in
the groupBy, which has a demand of 256, but now 1 element cannot be dispatched.
Element 11 will be dispatched to group 1 again. Every subflux has a demand of
32, as we can see. The elements are divided over the 9 active groups, but the
elements that are meant for group 10 get stuck.

When 3/4 of the demand for a group is fulfilled, it re-signals demand; this is
the request(24). The groupBy with a buffer of 256 will continuously pass
elements downstream when they are available. This goes on until the groupBy
holds 256 elements for group 10 and has to keep them. The groupBy indicated a
demand of 256, and now all of that demand is filled with elements for group 10.
There is no more demand and we have full back-pressure. Therefore, the pipeline
is now stuck "waiting" for demand for the elements of group 10.
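
A minimal sketch of the fix, assuming the same countNumbers and emitCounter as
above: give the flatMap at least as many workers as there are groups.

Flux.generate<Int> { it.next(emitCounter.incrementAndGet()) }
  .groupBy { it % 10 }
  .flatMap(::countNumbers, 10) // concurrency >= number of groups
  .subscribeOn(Schedulers.parallel())
  .subscribe()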


CONCLUSION

If you use the groupBy function on a Flux, you must make sure that there are
enough subscribers in the flatMap, otherwise your stream will get stuck. As
described in "Debugging demand in Reactor", the logging functionality is really
helpful here. I learned a lot while writing this blog post and got even more
insight into the internals of Reactor.

Posted in kotlin, reactive, reactor. Tagged: kotlin, reactive, reactor.


DEBUGGING DEMAND IN REACTOR

Posted on 6 March 2022 by Jeroen Gordijn



Project Reactor is a great reactive streams project that you will probably run
into when you want to write reactive code in Spring. It is very powerful, but it
can also be hard to wrap your head around. Something that can be confusing is
how demand flows upstream and messages flow downstream.


GETTING INSIGHT INTO THE FLOW OF DEMAND

In any Flux it is possible to show the demand by using the log function. With
this function you can specify which SignalTypes you want to be logged. Let's
look at an example:

import java.time.Duration
import java.util.concurrent.atomic.AtomicLong
import java.util.logging.Level
import kotlin.random.Random.Default.nextLong
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.SignalType
import reactor.core.scheduler.Schedulers

val counter = AtomicLong()

fun process(nr: Long): Mono<Long> =
    Mono.just(nr).delayElement(Duration.ofMillis(nextLong(1, 25)))

Flux.generate<Long> { it.next(counter.incrementAndGet()) }
  .log("beforeFlatmap", Level.INFO, SignalType.REQUEST)
  .flatMap(::process)
  .log("beforeTake", Level.INFO, SignalType.REQUEST)
  .take(100)
  .log("beforeSubscribe", Level.INFO, SignalType.REQUEST)
  .subscribeOn(Schedulers.parallel())
  .subscribe()

Thread.sleep(4000)
println("Counter: ${counter.get()}")

When run this will print:

13:43:15.197 [parallel-1] INFO beforeSubscribe - request(unbounded)
13:43:15.200 [parallel-1] INFO beforeTake - request(unbounded)
13:43:15.200 [parallel-1] INFO beforeFlatmap - | request(256)
13:43:15.251 [parallel-6] INFO beforeFlatmap - | request(1)
13:43:15.251 [parallel-6] INFO beforeFlatmap - | request(1)
13:43:15.251 [parallel-6] INFO beforeFlatmap - | request(1)
13:43:15.252 [parallel-6] INFO beforeFlatmap - | request(1)
13:43:15.252 [parallel-8] INFO beforeFlatmap - | request(1)
...
13:43:15.260 [parallel-2] INFO beforeFlatmap - | request(4)
13:43:15.260 [parallel-2] INFO beforeFlatmap - | request(12)
13:43:15.260 [parallel-2] INFO beforeFlatmap - | request(2)
13:43:15.261 [parallel-2] INFO beforeFlatmap - | request(6)
13:43:15.261 [parallel-2] INFO beforeFlatmap - | request(7)
13:43:15.261 [parallel-2] INFO beforeFlatmap - | request(3)
13:43:15.261 [parallel-2] INFO beforeFlatmap - | request(2)
13:43:15.262 [parallel-2] INFO beforeFlatmap - | request(3)
13:43:15.262 [parallel-2] INFO beforeFlatmap - | request(3)
13:43:15.262 [parallel-2] INFO beforeFlatmap - | request(1)
Counter: 350

The logs show the request signals, which are the demand flowing up (towards the
source), and give us insight into what happens with the demand. The first demand
is sent when the stream is subscribed to. Remember, demand flows upstream, so in
our code from bottom to top. The subscribe function will always request an
unbounded amount of events. Next we reach the take function, which doesn't
change the demand and also sends unbounded demand. So up until this point we do
not have any back-pressure control. Or said differently, these functions can
keep up with anything upstream may send. Next we hit the flatMap, with its
default concurrency (256). The flatMap changes the demand: there are only 256
workers, so it can only process 256 messages at a time. Therefore it signals a
demand of 256. This demand reaches the source and the source can now emit 256
elements. When a task in the flatMap is done, it will not encounter any
back-pressure, because the demand downstream is unbounded. This means that when
a task is done, it can immediately emit the message and signal that it has new
demand, by requesting 1 extra message.

When 100 messages have reached the take function, the stream is completed.
However, in the end we see that many more messages were emitted by the source,
namely 350. This happens because everything is happening at the same time. When
a task in the flatMap is done, it signals demand by requesting a new element.
Therefore it can happen that more messages are emitted than the 100 requested.
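
If this overproduction is a problem, for example when reading from a source
where consumed elements are gone for good, the total demand can be capped. A
minimal sketch, assuming the same pipeline, using Reactor's limitRequest (the
operator mentioned in the limitRate documentation):

Flux.generate<Long> { it.next(counter.incrementAndGet()) }
  .limitRequest(100) // caps the grand total demand that ever reaches the source
  .flatMap(::process)
  .subscribe()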


CONCLUSION

Using the log function on a Flux can greatly help in understanding what's going
on under the covers. We've seen in the example above that even in trivial flows
it leads to interesting discoveries.

Posted in kotlin, reactive, reactor. Tagged: kotlin, reactive, reactor.


STARTING INSPIRED IT

Posted on 18 December 2018 (updated 21 December 2018) by Jeroen Gordijn


Starting January 2019, I am the proud founder of Inspired IT. As an inspired
freelancer I will help companies solve complex problems, with my knowledge of
software development in general and Scala & Akka in particular.

I wish everybody happy holidays and an inspiring 2019!



Posted in Uncategorized

Copyright © 2023 Jeroen Gordijn. All rights reserved. Theme Suffice by
ThemeGrill. Powered by WordPress.



