Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][java-client] check consumer pause status before consumer receive/batchReceive #17182

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

yapxue
Copy link
Contributor

@yapxue yapxue commented Aug 19, 2022

Motivation

*consumer client has method pause(), but consumer is not really paused. paused consumer only stop send flow permit request to broker but broker still can push messages to consumer if there is already existed flow permit. consumer can get messages if it invoked consumer.receive(). This may mislead users since no message is expected after pause. *

Modifications

park for some time if consumer is paused. consumer hold prefetched messages in client cache.

Verifying this change

  • Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)
  • The rest endpoints: (no)
  • The admin cli options: (no)
  • Anything that affects deployment: (no)

Documentation

Check the box below or label this PR directly.

Need to update docs?

  • doc-required
    (Your PR needs to update docs and you will update later)

  • [ x] doc-not-needed
    (already documented)

  • doc
    (Your PR contains doc changes)

  • doc-complete
    (Docs have been already added)

@MarvinCai
Copy link
Contributor

the doc for Consumer#pause is

/**
 * Stop requesting new messages from the broker until {@link #resume()} is called. Note that this might cause
 * {@link #receive()} to block until {@link #resume()} is called and new messages are pushed by the broker.
 */

it doesn't say anything about stop receiving message instantly, I think current behavior is acceptable?

@yapxue
Copy link
Contributor Author

yapxue commented Aug 22, 2022

the doc for Consumer#pause is

/**
 * Stop requesting new messages from the broker until {@link #resume()} is called. Note that this might cause
 * {@link #receive()} to block until {@link #resume()} is called and new messages are pushed by the broker.
 */

it doesn't say anything about stop receiving message instantly, I think current behavior is acceptable?

User may expect stop receive instanly. Is it possible to change the api doc?
For example in kafka, after pause is invoked, consumer just have empty poll and prefeched messages stay in cache. This mismatched behavior may surprise users migrated from kafka.

Comment on lines 236 to 238
return pauseFuture.thenCompose((v) -> {
return internalReceiveAsync();
});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The solution would need to be optimized. Although using a CompletableFuture addresses the functional aspects, it's not optimal at all. For example, in this case, .thenCompose will create new objects on every call, also when pausing is disabled.

@github-actions
Copy link

The pr had no activity for 30 days, mark with Stale label.

@github-actions github-actions bot added the Stale label Sep 23, 2022
@Technoboy- Technoboy- added this to the 3.2.0 milestone Jul 31, 2023
@Technoboy- Technoboy- modified the milestones: 3.2.0, 3.3.0 Dec 22, 2023
@coderzc coderzc modified the milestones: 3.3.0, 3.4.0 May 8, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants