Simply put Dempsy (Distributed Elastic Message Processing SYstem) is an framework for easily writing distributed and dynamically scalable applications that process unbounded streams of (near-)real-time messages. Conceptually it's similar to Apache Flink and Apache Storm.
Note: Dempsy does NOT guarantee message delivery and will opt to discard messages in the presence of "back-pressure." This means it's not suitable for all streaming applications. However, if your application doesn't require guaranteed delivery, then Dempsy provides programming model that makes distributed stream processing applications easier to develop and maintain than other frameworks.
In this example we have an stream of Word messages and we want to keep track of how many times each Word appears in the stream.
You can find the complete working example here: Simple WordCount
To start with we need a source of Word messages. This is done in Dempsy by implementing an Adaptor.
...
import net.dempsy.messages.Adaptor;
import net.dempsy.messages.Dispatcher;
public class WordAdaptor implements Adaptor {
private Dispatcher dempsy;
private final AtomicBoolean running = new AtomicBoolean(false);
/**
* This method is called by the framework to provide a handle to the
* Dempsy message bus. It's called prior to start()
*/
@Override
public void setDispatcher(final Dispatcher dispatcher) {
this.dempsy = dispatcher;
}
@Override
public void start() {
// ... set up the source for the words.
running.set(true);
while(running.get()) {
// obtain data from an external source
final String wordString = getNextWordFromSoucre();
if(wordString == null) // the first null ends the stream.
running.set(false);
else {
// Create a "Word" message and send it into the processing stream.
try {
dempsy.dispatchAnnotated(new Word(wordString));
} catch(IllegalAccessException | IllegalArgumentException | InvocationTargetException | InterruptedException e) {
throw new RuntimeException(e); // This will stop the flow of Words from this adaptor.
// Optimally you'd like to recover and keep going.
}
}
}
}
@Override
public void stop() {
running.set(false);
}
private static final String[] wordSource = {"it","was","the","best","of","times","it","was","the","worst","of","times"};
private int wordSourceIndex = 0;
private String getNextWordFromSoucre() {
if(wordSourceIndex >= wordSource.length)
return null;
return wordSource[wordSourceIndex ];
}
}
When a WordAdaptor
is registered with Dempsy, the following will happen in order:
- Dempsy will call
setDispatcher
and pass aDispatcher
that theAdaptor
can use to dispatch messages. - Dempsy will then call the
start()
method to indicate that theAdaptor
can start sending messages. This will be called in a separate thread so theAdaptor
doesn't have to return from thestart()
method until it's done sending messages. However, theAdaptor
is free to use theDispatcher
in its own threads if it wants and can return fromstart()
without causing a problem. - When Dempsy is shut down, the
Adaptor
will be notified by calling thestop()
method.
In the above the adaptor sends Word
messages. Messages in Dempsy need to satisfy a few requirements.
- They need to have a MessageKey which uniquely identifies a MessageProcessor that will handle processing that message.
- The MessageKey needs to have the appropriate identity semantics (hashCode and equals)
- In most cases when Dempsy is distributed, the Message needs to be serializable according to whatever serialization technique is chosen.
So the Word
message can be defined as follows:
import net.dempsy.lifecycle.annotation.MessageKey;
@MessageType
public class Word implements Serializable {
private final String wordText;
public Word(final String data) {
this.wordText = data;
}
@MessageKey
public String getWordText() {
return this.wordText;
}
}
Using annotations you can identify the class as a Message. The MessageType annotation tells Dempsy that the full class name identifies a Dempsy compliant message. Notice it satisfies the criteria:
- It has a MessageKey which can be retrieved by calling
getWordText()
. - The MessageKey is a
String
which has appropriate identity semantics. - The
Word
class is serializable when using Java serialization.
Dempsy will route each message to an appropriate Message Processor. A unique Message Processor instance will handle each Word
message with a given MessageKey. For example:
import net.dempsy.lifecycle.annotation.MessageHandler;
import net.dempsy.lifecycle.annotation.Mp;
@Mp
public class WordCount implements Cloneable {
private long count = 0;
@MessageHandler
public void countWord(final Word word) {
count ;
System.out.println("The word \"" word.getWordText() "\" has a count of " count);
}
@Override
public Object clone() throws CloneNotSupportedException {
return super.clone();
}
}
So when Dempsy receives a message of type Word
, it retrieves the MessageKey using the annotated method getWordText()
. That MessageKey will become the address of a message processor somewhere on the system. Dempsy will find the message processor instance (in this case an instance of the class WordCount
) within a cluster of nodes responsible for running the WordCount
message processing. In the case that the instance doesn't already exist, Dempsy will clone()
a WordCount
instance prototype.
Note: You should consider the MessageKey as the address of a unique MessageProcessor instance. |
Dempsy will manage the lifecycle of Message Processor instances. It will start with a single instance that will be used as a Prototype. When it needs more instances it will clone()
the prototype. In this example Dempsy will create an instance of WordCount
for every unique MessageKey of a Word
message that gets dispatched. It will call the MessageHandler on the corresponding instance.
The following will pull all the pieces together and process a group of Word
s.
...
import net.dempsy.NodeManager;
import net.dempsy.cluster.local.LocalClusterSessionFactory;
import net.dempsy.config.Cluster;
import net.dempsy.config.Node;
import net.dempsy.lifecycle.annotation.MessageProcessor;
import net.dempsy.monitoring.dummy.DummyNodeStatsCollector;
import net.dempsy.transport.blockingqueue.BlockingQueueReceiver;
public class SimpleWordCount {
public static void main(final String[] args) {
@SuppressWarnings("resource")
final NodeManager nodeManager = new NodeManager()
// add a node
.node(
// a node in an application called word-count
new Node.Builder("word-count")
// with the following clusters
.clusters(
// a cluster that has the adaptor
new Cluster("adaptor")
.adaptor(new WordAdaptor()),
// and a cluster that contains the WordCount message processor
new Cluster("counter")
.mp(new MessageProcessor<WordCount>(new WordCount()))
// with the following routing strategy
.routingStrategyId("net.dempsy.router.simple")
)
// this will basically disable monitoring for the example
.nodeStatsCollector(new DummyNodeStatsCollector())
// use a blocking queue as the transport mechanism since this is all running in the same process
.receiver(new BlockingQueueReceiver(new ArrayBlockingQueue<>(100000)))
.build()
)
// define the infrastructure to be used. Since we're in a single process
// we're going to use a local collaborator. Alternatively we'd specify
// using zookeeper to coordinate across processes and machines.
.collaborator(new LocalClusterSessionFactory().createSession());
// start dempsy processing for this node in the background.
nodeManager.start();
// wait for the processing to be complete
...
nodeManager.stop();
System.out.println("Exiting Main");
}
}
The output from running the example is:
The word "it" has a count of 1
The word "worst" has a count of 1
The word "was" has a count of 1
The word "times" has a count of 1
The word "the" has a count of 1
The word "of" has a count of 1
The word "best" has a count of 1
The word "it" has a count of 2
The word "the" has a count of 2
The word "times" has a count of 2
The word "was" has a count of 2
The word "of" has a count of 2
Exiting Main
In this example we have a Dempsy application with a single node with two clusters. One cluster contains the WordAdaptor
and another contains the set of WordCount
instances being used as message processors.
Once the example runs to completion, the number of WordCount
message processors will be equal to the number of unique message keys from all of the messages streamed. In this case the number is:
Set.of("it","was","the","best","of","times","it","was","the","worst","of","times").size()
So there will be 7 instances of WordCount
being used as message processors and an additional one representing the message processor prototype.
This is illustrated in the following:
To run the "Word Count" example distributed we need to change some of the infrastructure we instantiated. But first, lets convert the stream of words to an unbounded stream by looping in the WordAdaptor
. We'll simply change WordAdaptor.getNextWordFromSoucre()
to the following:
private String getNextWordFromSoucre() {
if(wordSourceIndex >= wordSource.length)
wordSourceIndex = 0;
return wordSource[wordSourceIndex ];
}
To change the infrastructure we need start Dempsy selecting distributed implementations. The updated SimpleWordCount
class would be:
...
import net.dempsy.NodeManager;
import net.dempsy.cluster.ClusterInfoException;
import net.dempsy.cluster.zookeeper.ZookeeperSessionFactory;
import net.dempsy.config.Cluster;
import net.dempsy.config.Node;
import net.dempsy.lifecycle.annotation.MessageProcessor;
import net.dempsy.monitoring.dummy.DummyNodeStatsCollector;
import net.dempsy.serialization.jackson.JsonSerializer;
import net.dempsy.serialization.java.JavaSerializer;
import net.dempsy.transport.tcp.nio.NioReceiver;
public class SimpleWordCount {
public static void main(final String[] args) throws InterruptedException, IllegalStateException, IllegalArgumentException, ClusterInfoException {
final WordAdaptor wordAdaptor = new WordAdaptor();
@SuppressWarnings("resource")
final NodeManager nodeManager = new NodeManager()
// add a node
.node(
// a node in an application called word-count
new Node.Builder("word-count")
// with the following clusters
.clusters(
// a cluster that has the adaptor
new Cluster("adaptor")
.adaptor(wordAdaptor),
// and a cluster that contains the WordCount message processor
new Cluster("counter")
.mp(new MessageProcessor<WordCount>(new WordCount()))
// with the following routing strategy
.routingStrategyId("net.dempsy.router.managed")
)
// this will basically disable monitoring for the example
.nodeStatsCollector(new DummyNodeStatsCollector())
// use a Java NIO the transport mechanism
.receiver(new NioReceiver<Object>(new JavaSerializer()))
.build()
)
// define the infrastructure to be used.
// we want to connect to a zookeeper instance running on this machine.
.collaborator(new ZookeeperSessionFactory("localhost:2181", 3000, new JsonSerializer()).createSession());
// start dempsy processing for this node in the background.
nodeManager.start();
// wait for the node manager to be started.
while(!nodeManager.isReady())
Thread.yield();
// we're just going to wait *forever*
Thread.sleep(999999);
}
}
The changes from the original example include:
- the routing strategy is now set using:
.routingStrategyId("net.dempsy.router.managed")
. The "managed" routing strategy attempts to dynamically distribute all message processors for a given cluster (in this case, allWordCount
instances) across all available nodes. - the receiver is set using:
.receiver(new NioReceiver<Object>(new JavaSerializer()))
. This identifies the technique that this node can be reached as using Java NIO with the given serialization technique. - the dempsy nodes will collaborate with each other using Apache Zookeeper as set using:
.collaborator(new ZookeeperSessionFactory("localhost:2181", 3000, new JsonSerializer()).createSession())
.
Now if we start multiple instances of this Java program, and we have Apache Zookeeper running on port 2181, then the instances of WordCount
message processors will be balanced between the running nodes.
If you want to try it you can start a single node instance of Apache Zookeeper using docker with the following command:
docker run --name zookeeper --network=host -d zookeeper
Then you can run as many instances of SimpleWordCount
as you like.
A working version of this example can be found here: Distributed Simple Word Count Example
Having gone through the "Word Count" example we should codify some of the terminology and concepts touched on.
Term | Definition |
---|---|
message processor | an instance of a cloned message processor prototype responsible for processing every message of a particular type with a particular unique key. |
message processor prototype | an instance used by Dempsy to serve as a template when it needs to `clone()` more instances of a message processor. |
message | is an object that Dempsy routes to a message processor based on the message's key. |
message key | A key for the message, obtained from a message. Each unique message key addresses a unique message processor instance in a cluster |
cluster | a cluster is the collection of all message processors or adaptors of a common type in the same stage of processing of a Dempsy application. A cluster contains a complete set of potential message processors keyed by all of the potential keys from a particular message type.That is, a cluster of message processor instances covers the entire key-space of a message. |
node | a node is a subset of a set of clusters containing a portion of the each cluster's message processors. nodes are almost always (except in some possible test situations) the intersection of a cluster and a Java process. That is, the portion of a cluster that's running in a particular process is the cluster's node |
container | Sometimes also referred to as a message processor container, it is the part of the Dempsy infrastructure that manages the lifecycle of message processors within an individual node. That is, there is a one-to-one between the portion of a cluster running in a particular node, and a container. |