Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

provides supportive class to hide concurrency complexity #807

Merged
merged 7 commits into from
May 1, 2020
Merged
Prev Previous commit
Next Next commit
optimizes safety by wrapping every hook call into trycatch
Signed-off-by: Oleh Dokuka <[email protected]>
  • Loading branch information
OlegDokuka committed Apr 30, 2020
commit e867b941b12c7a3dee2e81db48c6a41988e4b25e
11 changes: 3 additions & 8 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java
Original file line number Diff line number Diff line change
Expand Up @@ -465,14 465,9 @@ void hookOnFirstRequest(long n) {
final int streamId = streamIdSupplier.nextStreamId(receivers);
this.streamId = streamId;

final ByteBuf frame;
try {
frame =
RequestChannelFrameFlyweight.encodeReleasingPayload(
allocator, streamId, false, n, initialPayload);
} catch (IllegalReferenceCountException | NullPointerException e) {
return;
}
final ByteBuf frame =
RequestChannelFrameFlyweight.encodeReleasingPayload(
allocator, streamId, false, n, initialPayload);

senders.put(streamId, upstreamSubscriber);
receivers.put(streamId, receiver);
Expand Down
31 changes: 26 additions & 5 deletions rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 68,11 @@ public Context currentContext() {
public void request(long n) {
this.s.request(n);
if (!firstRequest) {
hookOnRestRequests(n);
try {
this.hookOnRestRequests(n);
} catch (Throwable throwable) {
onError(throwable);
}
return;
}
this.firstRequest = false;
Expand All @@ -82,9 86,18 @@ public void request(long n) {
for (; ; ) {
if (firstLoop) {
firstLoop = false;
hookOnFirstRequest(n);
try {
this.hookOnFirstRequest(n);
} catch (Throwable throwable) {
onError(throwable);
return;
}
} else {
hookOnCancel();
try {
this.hookOnCancel();
} catch (Throwable throwable) {
onError(throwable);
}
return;
}

Expand Down Expand Up @@ -126,13 139,21 @@ public void onNext(Payload t) {
@Override
public void onError(Throwable t) {
this.actual.onError(t);
this.hookOnTerminal(SignalType.ON_ERROR);
try {
this.hookOnTerminal(SignalType.ON_ERROR);
} catch (Throwable throwable) {
Operators.onErrorDropped(throwable, currentContext());
}
}

@Override
public void onComplete() {
this.actual.onComplete();
this.hookOnTerminal(SignalType.ON_COMPLETE);
try {
this.hookOnTerminal(SignalType.ON_COMPLETE);
} catch (Throwable throwable) {
Operators.onErrorDropped(throwable, currentContext());
}
}

@Override
Expand Down