Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #12266 - InvocationType improvements and cleanups. #12596

Open
wants to merge 21 commits into
base: jetty-12.1.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit Hold shift click to select a range
1345a6f
Issue #12266 - InvocationType improvements and cleanups.
sbordet Nov 29, 2024
6ff2e58
Renamed HttpClient.getTransport() to getHttpClientTransport().
sbordet Nov 29, 2024
751c886
Fixed FCGI parsing: onResponseHeaders() was called multiple times in …
sbordet Nov 30, 2024
5352c05
Reverted theh renaming of `HttpClient.getTransport()`.
sbordet Dec 1, 2024
4d4c88a
Fixed race condition when notifying HTTP/2 `HeadersFrame`s.
sbordet Dec 2, 2024
a3b728e
Merged branch 'jetty-12.1.x' into 'fix/jetty-12.1.x/client-invocation…
sbordet Dec 2, 2024
fa7c4a4
Fixed IteratingCallback tests due to changes in toString().
sbordet Dec 2, 2024
78d8a66
Reverted "optimization" in `HttpReceiver.responseHeaders()`.
sbordet Dec 3, 2024
b0b60d1
Reverted another "optimization" in `HttpReceiver.responseHeaders()`.
sbordet Dec 3, 2024
3438462
Merged branch 'jetty-12.1.x' into 'fix/jetty-12.1.x/client-invocation…
sbordet Dec 8, 2024
f7788da
Merged branch 'jetty-12.1.x' into 'fix/jetty-12.1.x/client-invocation…
sbordet Dec 16, 2024
8e036dd
Merged branch 'jetty-12.1.x' into 'fix/jetty-12.1.x/client-invocation…
sbordet Dec 17, 2024
630d5c5
Fixed HTTP/2 serialization in HttpReceiverOverHTTP2.
sbordet Dec 20, 2024
7947030
Fixed tests.
sbordet Dec 20, 2024
dbc7da7
Fixed tests.
sbordet Dec 20, 2024
21d4dfc
Fixed tests.
sbordet Dec 23, 2024
2836d34
Merged branch 'jetty-12.1.x' into 'fix/jetty-12.1.x/client-invocation…
sbordet Dec 23, 2024
ae197ab
Fixed flaky tests.
sbordet Dec 24, 2024
d4da186
Fixed tests.
sbordet Dec 25, 2024
39d7823
Fixed tests.
sbordet Dec 26, 2024
6dbeb96
Fixed handling of HTTP upgrade in CoreClientUpgradeRequest.
sbordet Dec 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fixed HTTP/2 serialization in HttpReceiverOverHTTP2.
Fixed reset race in HTTP2Stream.

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet committed Dec 20, 2024
commit 630d5c5b5aebce997b7acbf680dd2913b5fc0fcd
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 136,6 @@ public void onComplete(Result result)
{
HttpRequest request = (HttpRequest)result.getRequest();
ContentResponse response = new HttpContentResponse(result.getResponse(), getContent(), getMediaType(), getEncoding());
if (result.getResponseFailure() != null)
{
if (LOG.isDebugEnabled())
LOG.debug("Authentication challenge failed", result.getFailure());
forwardFailureComplete(request, result.getRequestFailure(), response, result.getResponseFailure());
return;
}

String authenticationAttribute = getAuthenticationAttribute();
HttpConversation conversation = request.getConversation();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 70,6 @@ public void onComplete(Result result)
{
Request request = result.getRequest();
Response response = result.getResponse();
if (result.getResponseFailure() == null)
redirector.redirect(request, response, null);
else
redirector.fail(request, response, result.getFailure());
redirector.redirect(request, response, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 157,7 @@ protected void responseBegin(HttpExchange exchange)
responseState = ResponseState.BEGIN;
HttpResponse response = exchange.getResponse();
HttpConversation conversation = exchange.getConversation();
// Probe the protocol handlers
// Probe the protocol handlers.
HttpClient client = getHttpDestination().getHttpClient();
ProtocolHandler protocolHandler = client.findProtocolHandler(exchange.getRequest(), response);
Response.Listener handlerListener = null;
Expand All @@ -170,7 170,7 @@ protected void responseBegin(HttpExchange exchange)
conversation.updateResponseListeners(handlerListener);

if (LOG.isDebugEnabled())
LOG.debug("Response begin {}", response);
LOG.debug("Notifying response begin for {} on {}", exchange, this);
conversation.getResponseListeners().notifyBegin(response);
});
}
Expand All @@ -189,23 189,23 @@ protected void responseBegin(HttpExchange exchange)
protected void responseHeader(HttpExchange exchange, HttpField field)
{
if (LOG.isDebugEnabled())
LOG.debug("Invoking responseHeader for {} on {}", field, this);
LOG.debug("Invoking responseHeader {} for {} on {}", field, exchange, this);

invoker.run(() ->
{
if (LOG.isDebugEnabled())
LOG.debug("Executing responseHeader on {}", this);
LOG.debug("Executing responseHeader for {} on {}", exchange, this);

if (exchange.isResponseCompleteOrTerminated())
return;

responseState = ResponseState.HEADER;
HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
LOG.debug("Notifying header {}", field);
LOG.debug("Notifying response header {} for {} on {}", field, exchange, this);
boolean process = exchange.getConversation().getResponseListeners().notifyHeader(response, field);
if (LOG.isDebugEnabled())
LOG.debug("Header {} notified, {}processing needed", field, (process ? "" : "no "));
LOG.debug("Notified response header {}, processing {}", field, (process ? "needed" : "skipped"));
if (process)
{
response.addHeader(field);
Expand Down Expand Up @@ -241,12 241,12 @@ protected void storeCookie(URI uri, HttpField field)
protected void responseHeaders(HttpExchange exchange)
{
if (LOG.isDebugEnabled())
LOG.debug("Invoking responseHeaders on {}", this);
LOG.debug("Invoking responseHeaders for {} on {}", exchange, this);

invoker.run(() ->
{
if (LOG.isDebugEnabled())
LOG.debug("Executing responseHeaders on {}", this);
LOG.debug("Executing responseHeaders for {} on {}", exchange, this);

if (exchange.isResponseCompleteOrTerminated())
return;
Expand Down Expand Up @@ -288,6 288,8 @@ protected void responseHeaders(HttpExchange exchange)
}
}

if (LOG.isDebugEnabled())
LOG.debug("Notifying response headers for {} on {}", exchange, this);
ResponseListeners responseListeners = exchange.getConversation().getResponseListeners();
responseListeners.notifyHeaders(response);

Expand All @@ -298,6 300,7 @@ protected void responseHeaders(HttpExchange exchange)
{
if (LOG.isDebugEnabled())
LOG.debug("Interim response status {}, succeeding", response.getStatus());
// TODO: explain it's queued.
responseSuccess(exchange, this::onInterim);
return;
}
Expand All @@ -311,12 314,12 @@ protected void responseHeaders(HttpExchange exchange)
if (decoderFactory != null)
{
if (LOG.isDebugEnabled())
LOG.debug("Decoding {} response content", decoderFactory.getEncoding());
LOG.debug("Decoding {} response content for {} on {}", decoderFactory.getEncoding(), exchange, this);
contentSource = new DecodedContentSource(decoderFactory.newDecoderContentSource(rawContentSource), response);
}

if (LOG.isDebugEnabled())
LOG.debug("Response content {} {}", response, contentSource);
LOG.debug("Notifying response content {} for {} on {}", contentSource, exchange, this);
responseListeners.notifyContentSource(response, contentSource);
});
}
Expand All @@ -327,21 330,22 @@ protected void responseHeaders(HttpExchange exchange)
* This method takes care of ensuring the {@link Content.Source} passed to
* {@link Response.ContentSourceListener#onContentSource(Response, Content.Source)}
* calls the demand callback.
* The call to the demand callback is serialized with other events.
*/
protected void responseContentAvailable(HttpExchange exchange)
{
if (LOG.isDebugEnabled())
LOG.debug("Invoking responseContentAvailable on {}", this);
LOG.debug("Invoking responseContentAvailable for {} on {}", exchange, this);

invoker.run(() ->
{
if (LOG.isDebugEnabled())
LOG.debug("Executing responseContentAvailable on {}", this);
LOG.debug("Executing responseContentAvailable for {} on {}", exchange, this);

if (exchange.isResponseCompleteOrTerminated())
return;

if (LOG.isDebugEnabled())
LOG.debug("Notifying data available for {} on {}", exchange, this);
rawContentSource.onDataAvailable();
});
}
Expand All @@ -358,7 362,7 @@ protected void responseContentAvailable(HttpExchange exchange)
protected void responseSuccess(HttpExchange exchange, Runnable afterSuccessTask)
{
if (LOG.isDebugEnabled())
LOG.debug("Invoking responseSuccess on {}", this);
LOG.debug("Invoking responseSuccess for {} on {}", exchange, this);

// Mark atomically the response as completed, with respect
// to concurrency between response success and response failure.
Expand All @@ -368,15 372,15 @@ protected void responseSuccess(HttpExchange exchange, Runnable afterSuccessTask)
Runnable successTask = () ->
{
if (LOG.isDebugEnabled())
LOG.debug("Executing responseSuccess on {}", this);
LOG.debug("Executing responseSuccess for {} on {}", exchange, this);

responseState = ResponseState.IDLE;

reset();

HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
LOG.debug("Response success {}", response);
LOG.debug("Notifying response success for {} on {}", exchange, this);
exchange.getConversation().getResponseListeners().notifySuccess(response);

// Interim responses do not terminate the exchange.
Expand All @@ -403,11 407,11 @@ protected void responseSuccess(HttpExchange exchange, Runnable afterSuccessTask)
*/
protected void responseFailure(Throwable failure, Promise<Boolean> promise)
{
if (LOG.isDebugEnabled())
LOG.debug("Failing with {} on {}", failure, this);

HttpExchange exchange = getHttpExchange();

if (LOG.isDebugEnabled())
LOG.debug("Response failure {} for {} on {}", failure, exchange, this);

// Mark atomically the response as completed, with respect
// to concurrency between response success and response failure.
if (exchange != null && exchange.responseComplete(failure))
Expand Down Expand Up @@ -492,7 496,7 @@ private void cleanup()
public void abort(HttpExchange exchange, Throwable failure, Promise<Boolean> promise)
{
if (LOG.isDebugEnabled())
LOG.debug("Invoking abort with {} on {}", failure, this);
LOG.debug("Invoking abort for {} on {}", exchange, this, failure);

if (!exchange.isResponseCompleteOrTerminated())
throw new IllegalStateException();
Expand All @@ -510,13 514,15 @@ public void abort(HttpExchange exchange, Throwable failure, Promise<Boolean> pro

responseState = ResponseState.FAILURE;
this.failure = failure;

if (contentSource != null)
contentSource.fail(failure);

dispose();

HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
LOG.debug("Response abort {} {} on {}", response, exchange, getHttpChannel(), failure);
LOG.debug("Notifying response failure {} for {} on {}", failure, exchange, this);
exchange.getConversation().getResponseListeners().notifyFailure(response, failure);

// Mark atomically the response as terminated, with
Expand Down Expand Up @@ -700,10 706,6 @@ public boolean rewind()
}
}

/**
* This Content.Source implementation guarantees that all {@link #read(boolean)} calls
* happening from a {@link #demand(Runnable)} callback must be serialized.
*/
private class ContentSource implements Content.Source, Invocable
{
private static final Logger LOG = LoggerFactory.getLogger(ContentSource.class);
Expand Down Expand Up @@ -761,9 763,9 @@ private void onDataAvailable()
public InvocationType getInvocationType()
{
Runnable demandCallback = demandCallbackRef.get();
if (demandCallback == null)
return Invocable.getInvocationType(getHttpChannel().getConnection());
return Invocable.getInvocationType(demandCallback);
if (demandCallback != null)
return Invocable.getInvocationType(demandCallback);
return Invocable.getInvocationType(getHttpChannel().getConnection());
}

@Override
Expand All @@ -775,8 777,6 @@ public void demand(Runnable demandCallback)
throw new IllegalArgumentException();
if (!demandCallbackRef.compareAndSet(null, demandCallback))
throw new IllegalStateException();
// The processDemand method may call HttpReceiver.read(boolean)
// so it must be called by the invoker.
invoker.run(processDemand);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,14 180,17 @@ public boolean handle(org.eclipse.jetty.server.Request request, org.eclipse.jett
return true;
}
});

client.setConnectBlocking(true);
ContentResponse response = client.GET(scenario.getScheme() "://localhost:" connector.getLocalPort());

assertNotNull(response);
assertEquals(200, response.getStatus());
byte[] content = response.getContent();
assertArrayEquals(data, content);
for (int i = 0; i < 2; i)
{
ContentResponse response = client.GET(scenario.getScheme() "://localhost:" connector.getLocalPort());

assertNotNull(response);
assertEquals(200, response.getStatus());
byte[] content = response.getContent();
assertArrayEquals(data, content);
}
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 23,7 @@
import java.util.concurrent.atomic.AtomicBoolean;

import org.eclipse.jetty.client.transport.HttpDestination;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.logging.StacklessLogging;
import org.eclipse.jetty.server.Handler;
Expand Down Expand Up @@ -498,20 499,19 @@ public boolean handle(org.eclipse.jetty.server.Request request, Response respons
@Override
public void onComplete(Result result)
{
// Fake the fact that the redirect failed.
// Fake the fact that the redirect failed,
// but the redirect should still be followed.
Result newResult = new Result(result, cause);
super.onComplete(newResult);
}
});

ExecutionException e = assertThrows(ExecutionException.class, () ->
{
client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme())
.path("/redirect")
.timeout(5, TimeUnit.SECONDS)
.send();
});
assertSame(cause, e.getCause());
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme())
.path("/redirect")
.timeout(5, TimeUnit.SECONDS)
.send();

assertEquals(HttpStatus.OK_200, response.getStatus());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 15,7 @@

import java.io.EOFException;
import java.io.IOException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;

Expand Down Expand Up @@ -43,16 44,21 @@
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.SerializedInvoker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel.Client
{
private static final Logger LOG = LoggerFactory.getLogger(HttpReceiverOverHTTP2.class);

private final SerializedInvoker invoker;

public HttpReceiverOverHTTP2(HttpChannel channel)
{
super(channel);
Executor executor = channel.getHttpDestination().getHttpClient().getExecutor();
invoker = new SerializedInvoker(HttpReceiverOverHTTP2.class.getName(), executor);
}

@Override
Expand Down Expand Up @@ -140,7 146,7 @@ private Runnable onResponse(Stream stream, HeadersFrame frame, Callback callback
return null;
}

return new Invocable.ReadyTask(getHttpConnection().getInvocationType(), () ->
return invoker.offer(new Invocable.ReadyTask(getHttpConnection().getInvocationType(), () ->
{
MetaData.Response response = (MetaData.Response)frame.getMetaData();
HttpResponse httpResponse = exchange.getResponse();
Expand Down Expand Up @@ -174,7 180,7 @@ private Runnable onResponse(Stream stream, HeadersFrame frame, Callback callback
responseHeaders(exchange);

callback.succeeded();
});
}));
}

private Runnable onTrailer(HeadersFrame frame, Callback callback)
Expand Down Expand Up @@ -240,7 246,7 @@ public Runnable onDataAvailable()
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return null;
return new Invocable.ReadyTask(getInvocationType(), () -> responseContentAvailable(exchange));
return invoker.offer(new Invocable.ReadyTask(getInvocationType(), () -> responseContentAvailable(exchange)));
}

@Override
Expand All @@ -252,7 258,8 @@ public Runnable onReset(ResetFrame frame, Callback callback)
callback.succeeded();
return null;
}
return new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, () ->

return invoker.offer(() ->
{
int error = frame.getError();
IOException failure = new IOException(ErrorCode.toString(error, "reset_code_" error));
Expand All @@ -269,12 276,12 @@ public Runnable onTimeout(TimeoutException failure, Promise<Boolean> promise)
promise.succeeded(false);
return null;
}
return () -> promise.completeWith(exchange.getRequest().abort(failure));
return invoker.offer(() -> promise.completeWith(exchange.getRequest().abort(failure)));
}

@Override
public Runnable onFailure(Throwable failure, Callback callback)
{
return () -> responseFailure(failure, Promise.from(failed -> callback.succeeded(), callback::failed));
return invoker.offer(() -> responseFailure(failure, Promise.from(failed -> callback.succeeded(), callback::failed)));
}
}
Loading
Loading