Skip to content

Commit

Permalink
Added mock mode (-M)
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed Apr 7, 2022
1 parent c0faad8 commit 033e2ac
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 7 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 1,9 @@
# kcat v1.8.0

* Added new mock cluster mode
`kcat -M <broker-cnt>` spins up a mock cluster that applications
can produce to and consume from.
* Mixing modes is now prohibited (e.g., `-P .. -C`).
* Producer: Fix stdin buffering: no messages would be produced
until Ctrl-D or at least 1024 bytes had accumulated (#343).

Expand Down
39 changes: 39 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -330,3 330,42 @@ Partition: 0 Offset: 2
--
% Reached end of topic test [0] at offset 3
```


## Run a mock Kafka cluster

With kcat you can spin up an ephemeral in-memory mock Kafka cluster
that you you can connect your Kafka applications to for quick
testing.
The mock cluster supports a reasonable subset of the Kafka
protocol, such as:

* Producer
* Idempotent Producer
* Transactional Producer
* Low-level consumer
* High-level balanced consumer groups with offset commits
* Topic Metadata and auto creation


Spin the cluster by running kcat in the `-M` (for mock) mode:

```bash

# Create mock cluster with 3 brokers
$ kcat -M 3
...
BROKERS=localhost:12345,localhost:46346,localhost:23599
...
```

While kcat runs, let your Kafka applications connect to the mock cluster
by configuring them with the `bootstrap.servers` emitted in the `BROKERS`
line above.

Let kcat run for as long as you need the cluster, then terminate it by
pressing `Ctrl-D`.


Since the cluster runs all in memory, with no disk IO, it is quite suitable
for performance testing.
107 changes: 100 additions & 7 deletions kcat.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 51,10 @@
#include "kcat.h"
#include "input.h"

#if ENABLE_MOCK
#include <librdkafka/rdkafka_mock.h>
#endif

#if RD_KAFKA_VERSION >= 0x01040000
#define ENABLE_TXNS 1
#endif
Expand Down Expand Up @@ -1099,6 1103,45 @@ static void consumer_run (FILE *fp) {
}


#if ENABLE_MOCK
/**
* @brief Run mock cluster until stdin is closed.
*/
static void mock_run (void) {
rd_kafka_t *rk;
rd_kafka_mock_cluster_t *mcluster;
const char *bootstraps;
char errstr[512];
char buf[64];

if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf.rk_conf,
errstr, sizeof(errstr))))
KC_FATAL("Failed to create client instance for "
"mock cluster: %s", errstr);

mcluster = rd_kafka_mock_cluster_new(rk, conf.mock.broker_cnt);
if (!mcluster)
KC_FATAL("Failed to create mock cluster");

bootstraps = rd_kafka_mock_cluster_bootstraps(mcluster);

KC_INFO(1, "Mock cluster started with bootstrap.servers=%s\n",
bootstraps);
KC_INFO(1, "Press Ctrl-C Enter or Ctrl-D to terminate.\n");

printf("BROKERS=%s\n", bootstraps);

while (conf.run && fgets(buf, sizeof(buf), stdin)) {
/* nop */
}

KC_INFO(1, "Terminating mock cluster\n");

rd_kafka_mock_cluster_destroy(mcluster);
rd_kafka_destroy(rk);
}
#endif

/**
* Print metadata information
*/
Expand Down Expand Up @@ -1268,6 1311,9 @@ static void RD_NORETURN usage (const char *argv0, int exitcode,
#endif
#if ENABLE_INCREMENTAL_ASSIGN
"IncrementalAssign, "
#endif
#if ENABLE_MOCK
"MockCluster, "
#endif
,
#if ENABLE_JSON
Expand All @@ -1282,12 1328,19 @@ static void RD_NORETURN usage (const char *argv0, int exitcode,
exit(exitcode);

fprintf(out, "\n"
"General options:\n"
" -C | -P | -L | -Q Mode: Consume, Produce, Metadata List, Query mode\n"
"Mode:\n"
" -P Producer\n"
" -C Consumer\n"
#if ENABLE_KAFKACONSUMER
" -G <group-id> Mode: High-level KafkaConsumer (Kafka >=0.9 balanced consumer groups)\n"
" Expects a list of topics to subscribe to\n"
" -G <group-id> High-level KafkaConsumer (Kafka >=0.9 balanced consumer groups)\n"
#endif
" -L Metadata List\n"
" -Q Query mode\n"
#if ENABLE_MOCK
" -M <broker-cnt> Start Mock cluster\n"
#endif
"\n"
"General options for most modes:\n"
" -t <topic> Topic to consume from, produce to, "
"or list\n"
" -p <partition> Partition\n"
Expand Down Expand Up @@ -1425,6 1478,20 @@ static void RD_NORETURN usage (const char *argv0, int exitcode,
" Multiple -t .. are allowed but a partition\n"
" must only occur once.\n"
"\n"
#if ENABLE_MOCK
"Mock cluster options (-M):\n"
" The mock cluster is provided by librdkafka and supports a\n"
" reasonable set of Kafka protocol functionality:\n"
" producing, consuming, consumer groups, transactions, etc.\n"
" When kcat is started with -M .. it will print the mock cluster\n"
" bootstrap.servers to stdout, like so:\n"
" BROKERS=broker1:port,broker2:port,..\n"
" Use this list of brokers as bootstrap.servers in your Kafka application.\n"
" When kcat exits (Ctrl-C, Ctrl-D or when stdin is closed) the\n"
" cluster will be terminated.\n"
"\n"
#endif
"\n"
"Format string tokens:\n"
" %%s Message payload\n"
" %%S Message payload length (or -1 for NULL)\n"
Expand Down Expand Up @@ -2017,24 2084,44 @@ static void argparse (int argc, char **argv,
int i;

while ((opt = getopt(argc, argv,
":PCG:LQt:p:b:z:o:eED:K:k:H:Od:qvF:X:c:Tuf:ZlVh"
":PCG:LQM:t:p:b:z:o:eED:K:k:H:Od:qvF:X:c:Tuf:ZlVh"
"s:r:Jm:U")) != -1) {
switch (opt) {
case 'P':
case 'C':
case 'L':
case 'Q':
if (conf.mode && conf.mode != opt)
KC_FATAL("Do not mix modes: -%c seen when "
"-%c already set",
(char)opt, conf.mode);
conf.mode = opt;
break;
#if ENABLE_KAFKACONSUMER
case 'G':
if (conf.mode && conf.mode != opt)
KC_FATAL("Do not mix modes: -%c seen when "
"-%c already set",
(char)opt, conf.mode);
conf.mode = opt;
conf.group = optarg;
if (rd_kafka_conf_set(conf.rk_conf, "group.id", optarg,
errstr, sizeof(errstr)) !=
RD_KAFKA_CONF_OK)
KC_FATAL("%s", errstr);
break;
#endif
#if ENABLE_MOCK
case 'M':
if (conf.mode && conf.mode != opt)
KC_FATAL("Do not mix modes: -%c seen when "
"-%c already set",
(char)opt, conf.mode);
conf.mode = opt;
conf.mock.broker_cnt = atoi(optarg);
if (conf.mock.broker_cnt <= 0)
KC_FATAL("-M <broker_cnt> expected");
break;
#endif
case 't':
if (conf.mode == 'Q') {
Expand Down Expand Up @@ -2273,7 2360,7 @@ static void argparse (int argc, char **argv,
exit(0);
}

if (!(conf.flags & CONF_F_BROKERS_SEEN))
if (!(conf.flags & CONF_F_BROKERS_SEEN) && conf.mode != 'M')
usage(argv[0], 1, "-b <broker,..> missing", 0);

/* Decide mode if not specified */
Expand All @@ -2287,7 2374,7 @@ static void argparse (int argc, char **argv,
}


if (!strchr("GLQ", conf.mode) && !conf.topic)
if (!strchr("GLQM", conf.mode) && !conf.topic)
usage(argv[0], 1, "-t <topic> missing", 0);
else if (conf.mode == 'Q' && !*rktparlistp)
usage(argv[0], 1,
Expand Down Expand Up @@ -2519,6 2606,12 @@ int main (int argc, char **argv) {
rd_kafka_topic_partition_list_destroy(rktparlist);
break;

#if ENABLE_MOCK
case 'M':
mock_run();
break;
#endif

default:
usage(argv[0], 0, NULL, 0);
break;
Expand Down
10 changes: 10 additions & 0 deletions kcat.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 59,10 @@
#define HAVE_CONTROLLERID 0
#endif

#if RD_KAFKA_VERSION >= 0x01030000
#define ENABLE_MOCK 1
#endif


typedef enum {
KC_FMT_STR,
Expand Down Expand Up @@ -155,6 159,12 @@ struct conf {
serdes_conf_t *srconf;
char *schema_registry_url;
#endif

#if ENABLE_MOCK
struct {
int broker_cnt;
} mock;
#endif
};

extern struct conf conf;
Expand Down

0 comments on commit 033e2ac

Please sign in to comment.