Skip to content

Commit

Permalink
Pipe: Fix infinite loop when thread is interrupted in invoking PipeMe…
Browse files Browse the repository at this point in the history
…moryBlock#close & Avoid throwing new InterruptedException in conditions that can be self-restoring (#14471) (#14486)
  • Loading branch information
luoluoyuyu authored Dec 18, 2024
1 parent 0a00469 commit a8940b6
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -414,13 414,20 @@ public Iterable<TabletInsertionEvent> toTabletInsertionEvents(final long timeout
}
waitForResourceEnough4Parsing(timeoutMs);
return initDataContainer().toTabletInsertionEvents();
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
} catch (final Exception e) {
close();

// close() should be called before re-interrupting the thread
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}

final String errorMsg =
String.format(
"Interrupted when waiting for closing TsFile %s.", resource.getTsFilePath());
e instanceof InterruptedException
? String.format(
"Interrupted when waiting for closing TsFile %s.", resource.getTsFilePath())
: String.format(
"Parse TsFile %s error. Because: %s", resource.getTsFilePath(), e.getMessage());
LOGGER.warn(errorMsg, e);
throw new PipeException(errorMsg);
}
Expand Down Expand Up @@ -458,7 465,7 @@ private void waitForResourceEnough4Parsing(final long timeoutMs) throws Interrup

if (waitTimeSeconds * 1000 > timeoutMs) {
// should contain 'TimeoutException' in exception message
throw new InterruptedException(
throw new PipeException(
String.format("TimeoutException: Waited %s seconds", waitTimeSeconds));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,20 170,32 @@ public String toString() {

@Override
public void close() {
boolean isInterrupted = false;

while (true) {
try {
if (lock.tryLock(50, TimeUnit.MICROSECONDS)) {
try {
pipeMemoryManager.release(this);
if (isInterrupted) {
LOGGER.warn("{} is released after thread interruption.", this);
}
break;
} finally {
lock.unlock();
}
}
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
// Each time the close task is run, it means that the interrupt status left by the previous
// tryLock does not need to be retained. Otherwise, it will lead to an infinite loop.
isInterrupted = true;
LOGGER.warn("Interrupted while waiting for the lock.", e);
}
}

// Restore the interrupt status of the current thread
if (isInterrupted) {
Thread.currentThread().interrupt();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 234,7 @@ private void waitForResourceEnough4Slicing(final long timeoutMs) throws Interrup
if (waitTimeSeconds * 1000 > timeoutMs) {
// should contain 'TimeoutException' in exception message
// see org.apache.iotdb.rpc.subscription.exception.SubscriptionTimeoutException.KEYWORD
throw new InterruptedException(
throw new SubscriptionException(
String.format("TimeoutException: Waited %s seconds", waitTimeSeconds));
}
}
Expand Down

0 comments on commit a8940b6

Please sign in to comment.