Skip to content

Commit

Permalink
feat: generify document queue/set ICIJ/datashare#1283
Browse files Browse the repository at this point in the history
  • Loading branch information
bamthomas committed Jan 10, 2024
1 parent b10f944 commit ea8d8c3
Show file tree
Hide file tree
Showing 11 changed files with 34 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 14,7 @@

@Option(name = "queueTable", description = "The queue table Defaults to \"document_queue\".", parameter = "name")
@OptionsClass(SQLDocumentQueueCodec.class)
public class MySQLDocumentQueue extends MySQLBlockingQueue<Path> implements DocumentQueue {
public class MySQLDocumentQueue extends MySQLBlockingQueue<Path> implements DocumentQueue<Path> {

public MySQLDocumentQueue(final DataSource dataSource, final DocumentFactory factory,
final Options<String> options) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 1,15 @@
package org.icij.extract.queue;

import org.icij.extract.document.TikaDocument;

import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;

/**
* The interface for a queue of {@link TikaDocument} objects.
*
* The interface for a document queue represented by {@link T} objects.
* T should provide a unique attribute to identify Documents
* @since 2.0.0
*/
public interface DocumentQueue extends BlockingQueue<Path>, AutoCloseable {
public interface DocumentQueue<T> extends BlockingQueue<T>, AutoCloseable {
String getName();
boolean delete();

Expand All @@ -33,7 30,7 @@ default boolean remove(Object o, int count) {
}

default int removeDuplicates() {
Map<Path, Integer> documents = new HashMap<>();
Map<T, Integer> documents = new HashMap<>();
final int initialSize = size();
forEach(path -> documents.compute(path, (k, v) -> (v == null) ? 1 : v 1));
documents.entrySet().stream().filter((e -> e.getValue()>1)).forEach(e -> remove(e.getKey(), e.getValue() - 1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 22,11 @@
*/
@Option(name = "queuePoll", description = "Time to wait when polling the queue e.g. \"5s\" or \"1m\". "
"Defaults to 0.", parameter = "duration")
public class DocumentQueueDrainer extends ExecutorProxy {
public class DocumentQueueDrainer<T> extends ExecutorProxy {
private static final Duration DEFAULT_TIMEOUT = Duration.ZERO;

private final DocumentQueue queue;
private final Consumer<Path> consumer;
private final DocumentQueue<T> queue;
private final Consumer<T> consumer;

private SealableLatch latch = null;
private Duration pollTimeout = DEFAULT_TIMEOUT;
Expand All @@ -39,13 39,13 @@ public class DocumentQueueDrainer extends ExecutorProxy {
* @param queue the queue to drain
* @param consumer must accept documents drained from the queue
*/
public DocumentQueueDrainer(final DocumentQueue queue, final Consumer<Path> consumer) {
public DocumentQueueDrainer(final DocumentQueue<T> queue, final Consumer<T> consumer) {
super(Executors.newSingleThreadExecutor());
this.queue = queue;
this.consumer = consumer;
}

public DocumentQueueDrainer configure(final Options<String> options) {
public DocumentQueueDrainer<T> configure(final Options<String> options) {
options.get("queuePoll").parse().asDuration().ifPresent(this::setPollTimeout);
return this;
}
Expand Down Expand Up @@ -170,13 170,13 @@ private class DrainingTask implements Callable<Long> {
*
* @throws InterruptedException if interrupted while polling
*/
private Path poll() throws InterruptedException {
private T poll() throws InterruptedException {

// Store the latch and timeout in local constants so that they be used in a thread-safe way.
final Duration pollTimeout = getPollTimeout();
final SealableLatch latch = getLatch();

Path path;
T path;

if (null != latch) {
path = queue.poll();
Expand Down Expand Up @@ -210,7 210,7 @@ private Path poll() throws InterruptedException {
private long drain() throws InterruptedException {
long consumed = 0;

Path path = poll();
T path = poll();
while (null != path && (null == poison || !path.equals(poison))) {
consumer.accept(path);
consumed ;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 8,7 @@
*
*/
public enum DocumentQueueType {
ARRAY, REDIS, MYSQL;
ARRAY, REDIS, MYSQL, AMQP;

/**
* Return the name of the queue type.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 1,7 @@
package org.icij.extract.queue;

import java.nio.file.Path;
import java.util.Set;

public interface DocumentSet extends Set<Path>, AutoCloseable {
public interface DocumentSet<T> extends Set<T>, AutoCloseable {
String getName();
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 3,6 @@
import org.icij.task.Options;
import org.icij.task.annotation.Option;

import java.nio.file.Path;
import java.util.concurrent.ArrayBlockingQueue;

/**
Expand All @@ -14,7 13,7 @@
@Option(name = "queueName", description = "The name of the queue.", parameter = "name")
@Option(name = "queueBuffer", description = "The size of the internal file path buffer used by the queue.",
parameter = "size")
public class MemoryDocumentQueue extends ArrayBlockingQueue<Path> implements DocumentQueue {
public class MemoryDocumentQueue<T> extends ArrayBlockingQueue<T> implements DocumentQueue<T> {

private static final long serialVersionUID = -7491630465350342533L;
private final String queueName;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 1,8 @@
package org.icij.extract.queue;

import java.nio.file.Path;
import java.util.LinkedHashSet;

public class MemoryDocumentSet extends LinkedHashSet<Path> implements DocumentSet {
public class MemoryDocumentSet<T> extends LinkedHashSet<T> implements DocumentSet<T> {
private final String name;

public MemoryDocumentSet(String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 17,6 @@

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Path;
import java.util.HashMap;

/**
Expand All @@ -28,7 27,7 @@
@Option(name = "queueName", description = "The name of the queue.", parameter = "name")
@Option(name = "charset", description = "Set the output encoding for strings. Defaults to UTF-8.", parameter = "name")
@OptionsClass(RedissonClientFactory.class)
public class RedisDocumentQueue extends RedissonBlockingQueue<Path> implements DocumentQueue {
public class RedisDocumentQueue<T> extends RedissonBlockingQueue<T> implements DocumentQueue<T> {
/**
* The default name for a queue in Redis.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 9,9 @@
import org.redisson.liveobject.core.RedissonObjectBuilder;

import java.nio.charset.Charset;
import java.nio.file.Path;
import java.util.HashMap;

public class RedisDocumentSet extends RedissonSet<Path> implements DocumentSet {
public class RedisDocumentSet<T> extends RedissonSet<T> implements DocumentSet<T> {
public static String DEFAULT_NAME = "extract:filter";
private final RedissonClient redissonClient;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 21,7 @@
import static org.junit.Assert.assertEquals;

public class ScannerTest {
private final DocumentQueue queue = new MemoryDocumentQueue("extract:queue", 100);
private final DocumentQueue<Path> queue = new MemoryDocumentQueue<>("extract:queue", 100);
private final Scanner scanner = new Scanner(queue);

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 27,8 @@ Deque<Path> getAccepted() {
}
}

private DocumentQueue createQueue() {
final DocumentQueue queue = new MemoryDocumentQueue("extract:queue", 26);
private DocumentQueue<Path> createQueue() {
final DocumentQueue<Path> queue = new MemoryDocumentQueue<>("extract:queue", 26);

for (char a = 'a'; a <= 'z'; a ) {
queue.add(Paths.get(Character.toString(a)));
Expand All @@ -39,18 39,18 @@ private DocumentQueue createQueue() {

@Test
public void testDefaultPollTimeoutIs0() {
final DocumentQueue queue = createQueue();
final DocumentQueue<Path> queue = createQueue();
final Consumer<Path> consumer = new MockConsumer();
final DocumentQueueDrainer drainer = new DocumentQueueDrainer(queue, consumer);
final DocumentQueueDrainer<Path> drainer = new DocumentQueueDrainer<>(queue, consumer);

Assert.assertEquals(0, drainer.getPollTimeout().getSeconds());
}

@Test
public void testDrainsQueue() throws Throwable {
final DocumentQueue queue = createQueue();
final DocumentQueue<Path> queue = createQueue();
final MockConsumer consumer = new MockConsumer();
final DocumentQueueDrainer drainer = new DocumentQueueDrainer(queue, consumer);
final DocumentQueueDrainer<Path> drainer = new DocumentQueueDrainer<>(queue, consumer);

final long drained = drainer.drain().get();
final Queue<Path> accepted = consumer.getAccepted();
Expand All @@ -66,9 66,9 @@ public void testDrainsQueue() throws Throwable {

@Test
public void testDrainsQueueUntilPoison() throws Throwable {
final DocumentQueue queue = createQueue();
final DocumentQueue<Path> queue = createQueue();
final MockConsumer consumer = new MockConsumer();
final DocumentQueueDrainer drainer = new DocumentQueueDrainer(queue, consumer);
final DocumentQueueDrainer<Path> drainer = new DocumentQueueDrainer<>(queue, consumer);
final Path poison = Paths.get("c");

final long drained = drainer.drain(poison).get();
Expand All @@ -83,9 83,9 @@ public void testDrainsQueueUntilPoison() throws Throwable {

@Test
public void testClearPollTimeout() throws Throwable {
final DocumentQueue queue = createQueue();
final DocumentQueue<Path> queue = createQueue();
final MockConsumer consumer = new MockConsumer();
final DocumentQueueDrainer drainer = new DocumentQueueDrainer(queue, consumer);
final DocumentQueueDrainer<Path> drainer = new DocumentQueueDrainer<>(queue, consumer);
final Path poison = Paths.get("1");

drainer.clearPollTimeout();
Expand All @@ -112,9 112,9 @@ public void run() {

@Test
public void testSetPollTimeout() throws Throwable {
final DocumentQueue queue = createQueue();
final DocumentQueue<Path> queue = createQueue();
final MockConsumer consumer = new MockConsumer();
final DocumentQueueDrainer drainer = new DocumentQueueDrainer(queue, consumer);
final DocumentQueueDrainer<Path> drainer = new DocumentQueueDrainer<>(queue, consumer);

drainer.setPollTimeout(HumanDuration.parse("2s"));
Assert.assertEquals(2, drainer.getPollTimeout().getSeconds());
Expand All @@ -139,9 139,9 @@ public void run() {

@Test
public void testSetLatch() throws Throwable {
final DocumentQueue queue = createQueue();
final DocumentQueue<Path> queue = createQueue();
final MockConsumer consumer = new MockConsumer();
final DocumentQueueDrainer drainer = new DocumentQueueDrainer(queue, consumer);
final DocumentQueueDrainer<Path> drainer = new DocumentQueueDrainer<>(queue, consumer);
final SealableLatch latch = new BooleanSealableLatch();

drainer.setLatch(latch);
Expand Down

0 comments on commit ea8d8c3

Please sign in to comment.