-
Notifications
You must be signed in to change notification settings - Fork 353
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
provides supportive class to hide concurrency complexity #807
Conversation
Signed-off-by: Oleh Dokuka <[email protected]>
…ttern Signed-off-by: Oleh Dokuka <[email protected]>
0e716a5
to
6c4c075
Compare
Signed-off-by: Oleh Dokuka <[email protected]>
Signed-off-by: Oleh Dokuka <[email protected]>
rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java
Outdated
Show resolved
Hide resolved
rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java
Outdated
Show resolved
Hide resolved
rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, much easier to read the request logic. Some minor changes requested.
rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java
Outdated
Show resolved
Hide resolved
rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java
Outdated
Show resolved
Hide resolved
rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java
Outdated
Show resolved
Hide resolved
CancelFrameFlyweight.encode(allocator, streamId)); | ||
} else { | ||
payload.release(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice one! Avoiding the reliance on the refCnt to check if the request was sent.
Co-authored-by: Rossen Stoyanchev <[email protected]>
Signed-off-by: Oleh Dokuka <[email protected]>
Signed-off-by: Oleh Dokuka <[email protected]>
This PR includes extra supportive class which should be used only for requestXXX interactions via
Operators.lift
in order to simplify business logic insideRSocketRequester
(e.g. wipLoops andcancel
/request
signals serialization)Also, this PR, ensures that
streamId
will be generated exactly during the first request phase so we will not waste sequence in case stream was not subscribed. The same is relevant toholders which are going to get logical stream
put
in it only in case of successful subscription to the generated interaction with the subsequentrequest(n)
call on it.Note.
StreamId
should not be issued until the firstrequestN
as well as the holders should not store anything prior to the first frame sent (it means that if cancel signal happened prior the first request, the only thing we have to do is releasing given payload)Signed-off-by: Oleh Dokuka [email protected]