[fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack #17751

Open
wants to merge 12 commits into base: master

lordcheng10
Contributor

@lordcheng10 lordcheng10 commented Sep 20, 2022

Motivation

When isAutoSkipNonRecoverableData=true and messages are acknowledged individually, the markdelete position does not move forward past the entries that were skipped, because they are never acknowledged.

} else if (cursor.config.isAutoSkipNonRecoverableData() && exception instanceof NonRecoverableLedgerException) {
    log.warn("[{}][{}] read failed from ledger at position:{} : {}", cursor.ledger.getName(), cursor.getName(),
            readPosition, exception.getMessage());
    // try to find and move to next valid ledger
    final Position nexReadPosition = cursor.getNextLedgerPosition(readPosition.getLedgerId());
    // fail callback if it couldn't find next valid ledger
    if (nexReadPosition == null) {
        callback.readEntriesFailed(exception, ctx);
        cursor.ledger.mbean.recordReadEntriesError();
        recycle();
        return;
    }
    updateReadPosition(nexReadPosition);
    checkReadCompletion();
} else {
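
To make the symptom concrete, here is a minimal, self-contained sketch. It is not Pulsar's cursor implementation: the class `MarkDeleteSketch` and its methods are hypothetical, and positions are simplified to entry ids within a single ledger. It only models the general rule that, with individual acks, the mark-delete position advances across a contiguous run of acknowledged entries, so entries that are skipped but never acked leave a hole.

```java
import java.util.TreeSet;

/** Hypothetical, simplified model of a cursor with individual acks (one ledger, entry ids only). */
final class MarkDeleteSketch {
    // Highest entry id such that every entry up to and including it has been acknowledged.
    private long markDelete = -1;
    // Acknowledged entry ids that lie beyond the mark-delete position.
    private final TreeSet<Long> individuallyAcked = new TreeSet<>();

    void ack(long entryId) {
        if (entryId <= markDelete) {
            return; // already covered by the mark-delete position
        }
        individuallyAcked.add(entryId);
        // Advance only while the entry right after the mark-delete position is acked.
        while (individuallyAcked.remove(markDelete + 1)) {
            markDelete++;
        }
    }

    long markDeletePosition() {
        return markDelete;
    }
}
```

In this model, acking entries 0, 1, 4 and 5 while entries 2 and 3 were skipped leaves the mark-delete position at 1; it only moves to 5 once 2 and 3 are acked as well.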

Modifications

  1. Add a skipped variable in EntryImpl to record whether the entry was automatically skipped;
  2. When pushing entries to the consumer, automatically ack the skipped entries.
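
The two modifications can be sketched roughly as follows. This is a self-contained illustration rather than the PR's code: `SketchEntry`, `SkippedFilterSketch` and `filterSkipped` are hypothetical stand-ins for `EntryImpl` and the dispatcher-side filtering, and the ack is modeled by collecting positions into a list.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for EntryImpl carrying the proposed skipped flag. */
final class SketchEntry {
    final long ledgerId;
    final long entryId;
    final boolean skipped;

    SketchEntry(long ledgerId, long entryId, boolean skipped) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.skipped = skipped;
    }
}

final class SkippedFilterSketch {
    /**
     * Returns the entries that can be dispatched to the consumer. The positions
     * of skipped entries are appended to toAck as {ledgerId, entryId} pairs so
     * the caller can acknowledge them instead of dispatching them.
     */
    static List<SketchEntry> filterSkipped(List<SketchEntry> entries, List<long[]> toAck) {
        List<SketchEntry> deliverable = new ArrayList<>(entries.size());
        for (SketchEntry entry : entries) {
            if (entry.skipped) {
                toAck.add(new long[] {entry.ledgerId, entry.entryId});
            } else {
                deliverable.add(entry);
            }
        }
        return deliverable;
    }
}
```

Doing the split in a single pass over the list reflects the reviewers' later concern about extra allocations; whether the real change does it this way is not shown in this conversation.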

Documentation

  • doc-required
    (Your PR needs to update docs and you will update later)

  • doc-not-needed
    (Please explain why)

  • doc
    (Your PR contains doc changes)

  • doc-complete
    (Docs have been already added)

Matching PR in forked repository

PR in forked repository: lordcheng10#19

@lordcheng10 lordcheng10 changed the title When isAutoSkipNonRecoverableData=true, fix the problem that the mark… [fix][broker] Fix the problem that the markdelete position does not move forward Sep 20, 2022
@lordcheng10 lordcheng10 changed the title [fix][broker] Fix the problem that the markdelete position does not move forward [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true Sep 20, 2022
@lordcheng10 lordcheng10 changed the title [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack Sep 20, 2022
@lordcheng10
Contributor Author

@eolivelli @codelipenghui @Technoboy- PTAL,thanks!

*/
public List<Entry> filterAndAcknowledgeSkippedEntry(List<Entry> entries) {
List<Position> skippedPositions = new ArrayList<>();
List<Entry> filterEntries = Lists.newArrayList(Collections2.filter(entries, entry -> {
Contributor

I wonder if we can move this into filterEntriesForConsumer? This way we save some allocations

Contributor Author

OK

Contributor Author

Fixed!
@eolivelli PTAL,thanks!

entry.ledgerId = ledgerId;
entry.entryId = entryId;
entry.skipped = skipped;
entry.data = Unpooled.wrappedBuffer(new byte[0]);
Contributor

do we really need to allocate this ? what about using a constant ?

Contributor Author

@lordcheng10 lordcheng10 Sep 27, 2022

entry.data = Unpooled.EMPTY_BUFFER ?

Contributor Author

Fixed! PTAL,thanks! @eolivelli

@@ -66,6 +67,17 @@ public static EntryImpl create(long ledgerId, long entryId, byte[] data) {
return entry;
}

public static EntryImpl create(long ledgerId, long entryId, boolean skipped) {
Contributor

what about "createSkippedEntry" ?
otherwise people may want to use this factory method for other purposes

Contributor Author

OK

Contributor Author

Fixed! PTAL,thanks! @eolivelli

Contributor

@eolivelli eolivelli left a comment

LGTM

@lordcheng10
Contributor Author

@codelipenghui @Technoboy- @Jason918 PTAL,thanks!

if (entriesToFiltered == null) {
    entriesToFiltered = new ArrayList<>();
}
entriesToFiltered.add(entryImpl.getPosition());
Contributor

Do we need entries.set(i, null); and entry.release();?

Contributor Author

OK , I will fix

Contributor Author

Fixed, PTAL, thanks! @Jason918

PositionImpl endPosition = (PositionImpl) nexReadPosition;
while (startPosition.compareTo(endPosition) < 0) {
    skippedEntries.add(EntryImpl.createSkippedEntry(startPosition.ledgerId, startPosition.entryId));
    startPosition = ledger.getNextValidPosition(startPosition);
Contributor

It seems that, normally, it will just go from readPosition to nexReadPosition?
Will you miss other entries that need to be acked?

Contributor Author

@lordcheng10 lordcheng10 Sep 27, 2022

It seems that, normally, it will just go from readPosition to nexReadPosition?

YES

Will you miss other entries that need to be acked?

IMO, it won't miss other entries that need to be acked.

@lordcheng10 lordcheng10 requested review from Jason918 and removed request for AnonHxy September 27, 2022 13:43
@codelipenghui
Contributor

@lordcheng10 Thanks for the clarification. I understand the issue now.

Can we just ack the messages from the read position (where the first read failed) to the LAC of the ledger? For example, suppose a ledger has 1000 entries but reading the 501st entry fails with a NonRecoverableLedgerException. Maybe the ledger has two fragments and only the data of the last fragment is non-recoverable. Then we can just perform an ack operation from the 501st to the 999th entry.

That way we don't need to introduce new options for users, and it is more intuitive and easier to understand.
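
As a rough illustration of this suggestion, the range to acknowledge could be derived from the failed read position and the ledger's last-add-confirmed (LAC) entry id. The sketch below is a simplified model under stated assumptions, not Pulsar code: `SkipToLacSketch` and `entriesToAck` are hypothetical names, both bounds are treated as inclusive entry ids within one ledger, and 501 and 999 are used only because they appear in the example above.

```java
final class SkipToLacSketch {
    /**
     * Number of entries in the closed range [failedEntryId, lastAddConfirmed].
     * Returns 0 when the range is empty or the failed id is negative.
     */
    static long entriesToAck(long failedEntryId, long lastAddConfirmed) {
        if (failedEntryId < 0 || failedEntryId > lastAddConfirmed) {
            return 0;
        }
        return lastAddConfirmed - failedEntryId + 1;
    }
}
```

With `failedEntryId = 501` and `lastAddConfirmed = 999`, the method returns 499, that is, every entry from the failed one through the LAC would be acked in one step.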

@codelipenghui
Contributor

Add a skipped variable in EntryImpl to record whether the entry is automatically skipped;
When pushing entries to the consumer, automatically ack the skipped entries;

Sorry, I don't understand this part of the PR description. If an entry cannot be read, how can the broker push it to the consumer?

@lordcheng10
Contributor Author

lordcheng10 commented Oct 8, 2022

For unreadable entries, an entry with the skipped flag set is created. When pushing messages, the entries whose skipped flag is true are filtered out, and then these messages are acked. [screenshot omitted]

@codelipenghui
Contributor

I see. It looks like the read operation will return some fake entries to the caller, and the caller then acks those entries according to the skipped flag introduced in EntryImpl. How is that different from acking the entries directly when we complete the read operation in checkReadCompletion? That way, we don't need to introduce a new concept in EntryImpl, and the caller is not responsible for helping the managed cursor maintain its internal ack state.

BTW, Pulsar SQL also uses the cursor API to read data from bookies. If we add a skipped field to EntryImpl, Pulsar SQL might also need to handle this case.

@lordcheng10
Contributor Author

OK, I will fix

@mattisonchao
Member

mattisonchao commented Oct 8, 2022

+1. I agree with @codelipenghui's concern: individual ack directly is better than introducing new stuff.

@lordcheng10
Contributor Author

OK, I will fix it!

break;
}
}
List<Entry> filteredEntries = cursor.filterReadEntries(skippedEntries);
Contributor

Can we just drop the data by calling cursor.delete(positions)? This avoids holding entries in memory and is easier to understand.

Contributor Author

OK , I will fix

skippedEntries.add(EntryImpl.createSkippedEntry(startPosition.ledgerId, startPosition.entryId));
startPosition = ledger.getNextValidPosition(startPosition);
toAckEntryNum++;
if (toAckEntryNum > cursor.getConfig().getMaxAckEntryNumForAutoSkipNonRecoverableData()) {
Contributor

If the state of the cursor is like this:

read position {1:0}
individual deleted messages [ {1:0}, {1:50001} ]

After the entry filter, the nexReadPosition will be {1,10000} and filteredEntries will be {1,0}, so maxAckEntryNumForAutoSkipNonRecoverableData would not work correctly. Can we let the cursor do the filtering?
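
One possible reading of "let the cursor do the filtering" is that already-deleted positions are excluded before the limit is applied, so the cap counts only entries that still need an ack. The sketch below illustrates that reading only. `CursorSideFilterSketch` and `positionsToAck` are hypothetical names, positions are simplified to entry ids in one ledger, and `maxAckEntryNum` stands in for the value of maxAckEntryNumForAutoSkipNonRecoverableData.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class CursorSideFilterSketch {
    /**
     * Collects entry ids in [fromInclusive, toExclusive) that are not already
     * individually deleted, stopping once maxAckEntryNum ids have been collected.
     */
    static List<Long> positionsToAck(long fromInclusive, long toExclusive,
                                     Set<Long> alreadyDeleted, int maxAckEntryNum) {
        List<Long> toAck = new ArrayList<>();
        for (long id = fromInclusive; id < toExclusive && toAck.size() < maxAckEntryNum; id++) {
            // Already-acked positions are skipped and do not count toward the cap.
            if (!alreadyDeleted.contains(id)) {
                toAck.add(id);
            }
        }
        return toAck;
    }
}
```

For a skipped range of ids 0 to 9 where 0, 1 and 2 are already deleted and the cap is 3, this returns ids 3, 4 and 5. A real cursor tracks deleted positions as ranges rather than a set of ids, so this shows the counting rule, not the data structure.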

Contributor Author

@lordcheng10 lordcheng10 Oct 9, 2022

I will fix

@github-actions

github-actions bot commented Nov 9, 2022

The pr had no activity for 30 days, mark with Stale label.

@github-actions github-actions bot added the Stale label Nov 9, 2022
@congbobo184
Contributor

@lordcheng10 hi, I move this PR to release/2.9.5, if you have any questions, please ping me. thanks.

@lordcheng10
Contributor Author

OpReadEntry.java

OK

@github-actions

The pr had no activity for 30 days, mark with Stale label.

@michaeljmarshall
Member

As discussed on the mailing list https://lists.apache.org/thread/w4jzk27qhtosgsz7l9bmhf1t7o9mxjhp, there is no plan to release 2.9.6, so I am going to remove the release/2.9.6 label

@github-actions github-actions bot removed the Stale label Jun 28, 2023
@Technoboy- Technoboy- added this to the 3.2.0 milestone Jul 31, 2023
@Technoboy- Technoboy- modified the milestones: 3.2.0, 3.3.0 Dec 22, 2023
@coderzc coderzc modified the milestones: 3.3.0, 3.4.0 May 8, 2024
Labels
area/broker doc-not-needed Your PR changes do not impact docs release/2.8.5 release/2.10.4 release/2.11.1 type/bug The PR fixed a bug or issue reported a bug