ConcurrentMergeScheduler may spawn more merge threads than specified #13593

Open
aoli-al opened this issue Jul 19, 2024 · 1 comment

aoli-al commented Jul 19, 2024

Description

I saw a failure in the test TestConcurrentMergeScheduler#testMaxMergeCount that is caused by a race condition.

𞤃𞤮𞤪𞤧𞤮 19, 2024 12:08:03 𞤇𞤎 com.carrotsearch.randomizedtesting.RandomizedRunner$QueueUncaughtExceptionsHandler uncaughtException
WARNING: Uncaught exception in thread: Thread[#25,Lucene Merge Thread #2,5,TGRP-TestConcurrentMergeScheduler]
org.apache.lucene.index.MergePolicy$MergeException: java.lang.RuntimeException: java.lang.AssertionError: count=2 vs maxMergeCount=1
	at __randomizedtesting.SeedInfo.seed([852F062F4E7F50BA]:0)
	at org.apache.lucene.index.ConcurrentMergeScheduler.handleMergeException(ConcurrentMergeScheduler.java:774)
	at org.apache.lucene.index.ConcurrentMergeScheduler$MergeThread.run(ConcurrentMergeScheduler.java:766)
Caused by: java.lang.RuntimeException: java.lang.AssertionError: count=2 vs maxMergeCount=1
	at org.apache.lucene.index.TestConcurrentMergeScheduler$3.doMerge(TestConcurrentMergeScheduler.java:372)
	at org.apache.lucene.index.ConcurrentMergeScheduler$MergeThread.run(ConcurrentMergeScheduler.java:739)
Caused by: java.lang.AssertionError: count=2 vs maxMergeCount=1
	at org.junit.Assert.fail(Assert.java:89)
	at org.junit.Assert.assertTrue(Assert.java:42)
	at org.apache.lucene.index.TestConcurrentMergeScheduler$3.doMerge(TestConcurrentMergeScheduler.java:347)
	... 1 more

The root cause is a race in the ConcurrentMergeScheduler#merge method:

if (maybeStall(mergeSource) == false) {
  break;
}
OneMerge merge = mergeSource.getNextMerge();

The merge method calls maybeStall to check whether the calling thread should be stalled because the merge thread limit has been reached. However, the stall loop (shown below) also exits as soon as mergeSource.hasPendingMerges() returns false, even when mergeThreadCount() >= maxMergeCount. If mergeSource gains a new pending merge after that check but before getNextMerge() is called, the scheduler picks it up and starts another merge thread. This allows ConcurrentMergeScheduler to create more merge threads than specified.

while (mergeSource.hasPendingMerges() && mergeThreadCount() >= maxMergeCount) {
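The interleaving described above can be replayed deterministically in a toy model. This is only a sketch under stated assumptions: StallRaceSketch, wouldStall, and replayInterleaving are hypothetical names, the queue and counter stand in for mergeSource and mergeThreadCount(), and none of this is Lucene code. It shows the check-then-act window, not the scheduler's actual threading.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of the check-then-act window. NOT Lucene code; all names are hypothetical. */
class StallRaceSketch {
  static final int MAX_MERGE_COUNT = 1;

  // Stand-ins for mergeSource's pending queue and mergeThreadCount().
  final Deque<String> pendingMerges = new ArrayDeque<>();
  int mergeThreadCount = 0;

  /** Same shape as the quoted condition: stall only if merges are pending AND the limit is reached. */
  boolean wouldStall() {
    return !pendingMerges.isEmpty() && mergeThreadCount >= MAX_MERGE_COUNT;
  }

  /** Replays the problematic interleaving step by step and returns the final thread count. */
  static int replayInterleaving() {
    StallRaceSketch s = new StallRaceSketch();
    s.mergeThreadCount = 1;               // one merge thread already runs, so the limit is reached
    boolean stalled = s.wouldStall();     // step 1: queue is empty, so the check does not stall
    s.pendingMerges.add("merge-A");       // step 2: a new pending merge is registered after the check
    String next = s.pendingMerges.poll(); // step 3: analogue of getNextMerge() now finds a merge
    if (!stalled && next != null) {
      s.mergeThreadCount++;               // step 4: a thread is launched without re-checking the limit
    }
    return s.mergeThreadCount;
  }

  public static void main(String[] args) {
    System.out.println(
        "count=" + replayInterleaving() + " vs maxMergeCount=" + MAX_MERGE_COUNT);
  }
}
```

In this model the count ends at 2 with a limit of 1, which has the same shape as the assertion message in the stack trace. The model is single-threaded on purpose so the ordering is fixed; the patch in this issue forces the equivalent ordering in real threads with sleeps.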

Here is the code to reproduce the failure:

diff --git a/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java b/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
index d0e79375ea6..a479637155e 100644
--- a/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
+++ b/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
@@ -541,6 +541,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
     }
   }
 
+  public static boolean threadMergeIssued = false;
   @Override
   public synchronized void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
 
@@ -562,6 +563,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
       message("  index(source): " + mergeSource.toString());
     }
 
+    boolean mergeThreadCreated = false;
     // Iterate, pulling from the IndexWriter's queue of
     // pending merges, until it's empty:
     while (true) {
@@ -569,6 +571,13 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
       if (maybeStall(mergeSource) == false) {
         break;
       }
+      if (Thread.currentThread().getName().contains("Merge Thread") && mergeThreadCreated) {
+        threadMergeIssued = true;
+        try {
+          Thread.sleep(1000);
+        } catch (Exception e) {
+        }
+      }
 
       OneMerge merge = mergeSource.getNextMerge();
       if (merge == null) {
@@ -591,6 +600,7 @@ public class ConcurrentMergeScheduler extends MergeScheduler {
           message("    launch new thread [" + newMergeThread.getName() + "]");
         }
 
+        mergeThreadCreated = true;
         newMergeThread.start();
         updateMergeThreads();
 
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 44d8bee8460..f83c4cac519 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -2305,10 +2305,25 @@ public class IndexWriter
     maybeMerge(config.getMergePolicy(), MergeTrigger.EXPLICIT, UNBOUNDED_MAX_MERGE_SEGMENTS);
   }
 
+  int index = 0;
   private final void maybeMerge(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments)
       throws IOException {
     ensureOpen(false);
-    if (updatePendingMerges(mergePolicy, trigger, maxNumSegments) != null) {
+    boolean shouldWait = false;
+    if (index == 1) {
+      while (!ConcurrentMergeScheduler.threadMergeIssued) {
+        Thread.yield();
+        shouldWait = true;
+      }
+    }
+    MergePolicy.MergeSpecification result = updatePendingMerges(mergePolicy, trigger, maxNumSegments);
+    if (result != null) {
+      index = 1;
+      if (shouldWait) {
+        try {
+          Thread.sleep(50000);
+        } catch (Exception e) {}
+      }
       executeMerge(trigger);
     }
   }
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
index 5f85b5d3774..407e6f5f4bb 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
@@ -321,8 +321,8 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
     IndexWriterConfig iwc =
         new IndexWriterConfig(new MockAnalyzer(random())).setCommitOnClose(false);
 
-    final int maxMergeCount = TestUtil.nextInt(random(), 1, 5);
-    final int maxMergeThreads = TestUtil.nextInt(random(), 1, maxMergeCount);
+    final int maxMergeCount = 1;
+    final int maxMergeThreads = 1;
     final CountDownLatch enoughMergesWaiting = new CountDownLatch(maxMergeCount);
     final AtomicInteger runningMergeCount = new AtomicInteger(0);
     final AtomicBoolean failed = new AtomicBoolean();
@@ -331,6 +331,7 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
       System.out.println(
           "TEST: maxMergeCount=" + maxMergeCount + " maxMergeThreads=" + maxMergeThreads);
     }
+    int[] index = new int[] {0};
 
     ConcurrentMergeScheduler cms =
         new ConcurrentMergeScheduler() {
@@ -358,6 +359,9 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
                 // Then sleep a bit to give a chance for the bug
                 // (too many pending merges) to appear:
                 Thread.sleep(20);
+                if (Thread.currentThread().getName().contains("#1")) {
+                  Thread.sleep(50000);
+                }
                 super.doMerge(mergeSource, merge);
               } finally {
                 runningMergeCount.decrementAndGet();
@@ -381,17 +385,17 @@ public class TestConcurrentMergeScheduler extends LuceneTestCase {
     IndexWriter w = new IndexWriter(dir, iwc);
     Document doc = new Document();
     doc.add(newField("field", "field", TextField.TYPE_NOT_STORED));
-    while (enoughMergesWaiting.getCount() != 0 && !failed.get()) {
+    while (true) {
       for (int i = 0; i < 10; i++) {
         w.addDocument(doc);
       }
     }
-    try {
-      w.commit();
-    } finally {
-      w.close();
-    }
-    dir.close();
+//    try {
+//      w.commit();
+//    } finally {
+//      w.close();
+//    }
+//    dir.close();
   }
 
   public void testSmallMergesDonNotGetThreads() throws IOException {

With this patch applied, TestConcurrentMergeScheduler#testMaxMergeCount fails with the assertion error shown above.

Version and environment details

Version: 33a4c1d

aoli-al commented Aug 21, 2024

Please use the following fork to reproduce the failure: https://github.com/aoli-al/lucene/tree/LUCENE-13593

Note that the patch adds an infinite loop at the end of the test, so you need to run the test from an IDE (e.g., IntelliJ) to see the failure.
