Skip to content

Commit

Permalink
JSON and definable output formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed Mar 9, 2015
1 parent 14c7bc5 commit 7e278cb
Show file tree
Hide file tree
Showing 8 changed files with 717 additions and 81 deletions.
6 changes: 4 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,8 1,10 @@
include Makefile.config

BIN= kafkacat

SRCS= kafkacat.c
OBJS= $(SRCS:.c=.o)
SRCS_y= kafkacat.c format.c
SRCS_$(ENABLE_JSON) = json.c
OBJS= $(SRCS_y:.c=.o)

.PHONY:

Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 24,9 @@ kafkacat is fast and lightweight; statically linked it is no more than 150Kb.
# Requirements

* librdkafka - https://github.com/edenhill/librdkafka
* libyajl (for JSON support, optional)

On Ubuntu or Debian: `sudo apt-get install librdkafka-dev libyajl-dev`


# Build
Expand Down
51 changes: 39 additions & 12 deletions bootstrap.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,37 11,64 @@

set -o errexit -o nounset -o pipefail

LIBRDKAFKA_VERSION=${LIBRDKAFKA_VERSION:-master}
LIBRDKAFKA_DIR=librdkafka-${LIBRDKAFKA_VERSION}
LIBRDKAFKA_URL=https://github.com/edenhill/librdkafka/archive/${LIBRDKAFKA_VERSION}.tar.gz

mkdir -p tmp-bootstrap
pushd tmp-bootstrap > /dev/null
function github_download {
repo=$1
version=$2
dir=$3

url=https://github.com/${repo}/archive/${version}.tar.gz

if [[ -d $dir ]]; then
echo "Directory $dir already exists, not downloading $url"
return 0
fi

if [[ ! -d ${LIBRDKAFKA_DIR} ]]; then
echo "Downloading ${LIBRDKAFKA_DIR}"
echo "Downloading $url to $dir"
if which wget 2>&1 > /dev/null; then
DL='wget -q -O-'
else
DL='curl -s -L'
fi
$DL "${LIBRDKAFKA_URL}" | tar xzf -
fi

echo "Building ${LIBRDKAFKA_DIR}"
pushd ${LIBRDKAFKA_DIR} > /dev/null
mkdir -p "$dir"
pushd "$dir" > /dev/null
($DL "$url" | tar -xzf - --strip-components 1) || exit 1
popd > /dev/null
}


mkdir -p tmp-bootstrap
pushd tmp-bootstrap > /dev/null

github_download "edenhill/librdkafka" "master" "librdkafka"
github_download "lloyd/yajl" "master" "libyajl"


pushd librdkafka > /dev/null
echo "Building librdkafka"
./configure
make
make DESTDIR="${PWD}/../" install
popd > /dev/null


pushd libyajl > /dev/null
echo "Building libyajl"
./configure
make
make DESTDIR="${PWD}/../" install
popd > /dev/null


popd > /dev/null

echo "Building kafkacat"
export CPPFLAGS="${CPPFLAGS:-} -Itmp-bootstrap/usr/local/include"
export LDFLAGS="${LDFLAGS:-} -Ltmp-bootstrap/usr/local/lib"
export STATIC_LIB_rdkafka="tmp-bootstrap/usr/local/lib/librdkafka.a"
./configure --enable-static
export STATIC_LIB_yajl="tmp-bootstrap/usr/local/lib/libyajl.a"
./configure --enable-static --enable-json
make

echo ""
Expand Down
23 changes: 22 additions & 1 deletion configure.kafkacat
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 2,8 @@
#

mkl_require good_cflags
mkl_require gitversion as KAFKACAT_VERSION


function checks {

Expand All @@ -20,4 22,23 @@ struct rd_kafka_metadata foo;"

# -lrt required on linux
mkl_lib_check "librt" "" cont CC "-lrt"
}


mkl_meta_set "yajl" "deb" "libyajl-dev"
# Check for JSON library (yajl)
if [[ $WITH_JSON == y ]] && \
mkl_lib_check --static=-lrdkfaka "yajl" HAVE_YAJL disable CC "-lyajl" \
"#include <yajl/yajl_version.h>
#if YAJL_MAJOR >= 2
#else
#error \"Requires libyajl2\"
#endif
"
then
mkl_allvar_set "json" ENABLE_JSON y
fi
}



mkl_toggle_option "kafkacat" WITH_JSON --enable-json "JSON support (requires libyajl2)" y
228 changes: 228 additions & 0 deletions format.c
Original file line number Diff line number Diff line change
@@ -0,0 1,228 @@
/*
* kafkacat - Apache Kafka consumer and producer
*
* Copyright (c) 2015, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#include "kafkacat.h"



static void fmt_add (fmt_type_t type, const char *str, int len) {
if (conf.fmt_cnt == KC_FMT_MAX_SIZE)
FATAL("Too many formatters & strings (KC_FMT_MAX_SIZE=%i)",
KC_FMT_MAX_SIZE);

conf.fmt[conf.fmt_cnt].type = type;

/* For STR types */
if (len) {
const char *s;
char *d;
conf.fmt[conf.fmt_cnt].str = d = malloc(len 1);
memcpy(d, str, len);
d[len] = '\0';
s = d;

/* Convert \.. sequences */
while (*s) {
if (*s == '\\' && *(s 1)) {
int base = 0;
const char *next;
s ;
switch (*s) {
case 't':
*d = '\t';
break;
case 'n':
*d = '\n';
break;
case 'r':
*d = '\r';
break;
case 'x':
s ;
base = 16;
/* FALLTHRU */
case '0'...'9':
*d = (char)strtoul(s, (char **)&next,
base);
if (next > s)
s = next-1;
break;
default:
*d = *s;
break;
}
} else {
*d = *s;
}
s ;
d ;
}

*d = '\0';

conf.fmt[conf.fmt_cnt].str_len =
strlen(conf.fmt[conf.fmt_cnt].str);
}

conf.fmt_cnt ;
}


/**
* Parse a format string to create a formatter list.
*/
void fmt_parse (const char *fmt) {
const char *s = fmt, *t;

while (*s) {
if ((t = strchr(s, '%'))) {
if (t > s)
fmt_add(KC_FMT_STR, s, (int)(t-s));

s = t 1;
switch (*s)
{
case 'o':
fmt_add(KC_FMT_OFFSET, NULL, 0);
break;
case 'k':
fmt_add(KC_FMT_KEY, NULL, 0);
break;
case 's':
fmt_add(KC_FMT_PAYLOAD, NULL, 0);
break;
case 't':
fmt_add(KC_FMT_TOPIC, NULL, 0);
break;
case 'p':
fmt_add(KC_FMT_PARTITION, NULL, 0);
break;
case '%':
fmt_add(KC_FMT_STR, s, 1);
break;
case '\0':
FATAL("Empty formatter");
break;
default:
FATAL("Unsupported formatter: %%%c", *s);
break;
}
s ;
} else {
fmt_add(KC_FMT_STR, s, strlen(s));
break;
}

}
}




void fmt_init (void) {
#ifdef ENABLE_JSON
if (conf.flags & CONF_F_FMT_JSON)
fmt_init_json();
#endif
}

void fmt_term (void) {
#ifdef ENABLE_JSON
if (conf.flags & CONF_F_FMT_JSON)
fmt_term_json();
#endif
}



/**
* Delimited output
*/
static void fmt_msg_output_str (FILE *fp,
const rd_kafka_message_t *rkmessage) {
int i;

for (i = 0 ; i < conf.fmt_cnt ; i ) {
int r = 1;

switch (conf.fmt[i].type)
{
case KC_FMT_OFFSET:
r = fprintf(fp, "%"PRId64, rkmessage->offset);
break;

case KC_FMT_KEY:
if (rkmessage->key_len)
r = fwrite(rkmessage->key,
rkmessage->key_len, 1, fp);
break;

case KC_FMT_PAYLOAD:
if (rkmessage->len)
r = fwrite(rkmessage->payload,
rkmessage->len, 1, fp);
break;

case KC_FMT_STR:
r = fwrite(conf.fmt[i].str, conf.fmt[i].str_len, 1, fp);
break;

case KC_FMT_TOPIC:
r = fprintf(fp, "%s",
rd_kafka_topic_name(rkmessage->rkt));
break;

case KC_FMT_PARTITION:
r = fprintf(fp, "%"PRId32, rkmessage->partition);
break;

}

if (r < 1)
FATAL("Write error for message "
"of %zd bytes at offset %"PRId64"): %s",
rkmessage->len, rkmessage->offset,
strerror(errno));
}

}


/**
* Format and output a received message.
*/
void fmt_msg_output (FILE *fp, const rd_kafka_message_t *rkmessage) {

#ifdef ENABLE_JSON
if (conf.flags & CONF_F_FMT_JSON)
fmt_msg_output_json(fp, rkmessage);
else
#endif
fmt_msg_output_str(fp, rkmessage);

}
Loading

0 comments on commit 7e278cb

Please sign in to comment.