Kafka
Apache Kafka is one of the most popular Pub/Subs. We provide a Pub/Sub implementation based on Shopify’s Sarama.
Installation
go get github.com/ThreeDotsLabs/watermill-kafka/v2
Characteristics
Feature | Implements | Note |
---|---|---|
ConsumerGroups | yes | |
ExactlyOnceDelivery | no | in theory can be achieved with Transactions, but currently no Golang client supports them |
GuaranteedOrder | yes | requires partition key usage |
Persistent | yes | |
Configuration
Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/subscriber.go
// ...
// SubscriberConfig holds the settings used to create a Kafka Subscriber.
//
// Brokers and Unmarshaler are required (see Validate); zero-valued
// OverwriteSaramaConfig, NackResendSleep and ReconnectRetrySleep are replaced
// with defaults by setDefaults.
type SubscriberConfig struct {
// Kafka brokers list.
// Required: Validate returns an error when the list is empty.
Brokers []string
// Unmarshaler is used to unmarshal messages from Kafka format into Watermill format.
// Required: Validate returns an error when it is nil.
Unmarshaler Unmarshaler
// OverwriteSaramaConfig holds additional sarama settings.
// When nil, setDefaults replaces it with DefaultSaramaSubscriberConfig().
OverwriteSaramaConfig *sarama.Config
// Kafka consumer group.
// When empty, all messages from all partitions will be returned.
ConsumerGroup string
// How long after a Nack the message should be redelivered.
// When zero, setDefaults sets it to 100 milliseconds; NoSleep can be used instead of zero.
NackResendSleep time.Duration
// How long after an unsuccessful reconnect the next reconnect attempt will occur.
// When zero, setDefaults sets it to one second; NoSleep can be used instead of zero.
ReconnectRetrySleep time.Duration
// NOTE(review): presumably the topic settings applied when the subscriber
// initializes (creates) a topic — confirm against the subscriber implementation.
InitializeTopicDetails *sarama.TopicDetail
// If true then each consumed message will be wrapped with Opentelemetry tracing, provided by otelsarama.
//
// Deprecated: pass OTELSaramaTracer to Tracer field instead.
OTELEnabled bool
// Tracer is used to trace Kafka messages.
// If nil, then no tracing will be used.
Tracer SaramaTracer
}
// NoSleep can be set to SubscriberConfig.NackResendSleep and SubscriberConfig.ReconnectRetrySleep.
//
// It is deliberately non-zero: setDefaults treats a zero duration as "unset"
// and replaces it with a default, whereas NoSleep is left untouched.
// NOTE(review): presumably this disables the corresponding sleep entirely — confirm against the code that reads these fields.
const NoSleep time.Duration = -1
// setDefaults fills in the optional fields that were left at their zero value.
//
// Defaults applied:
//   - ReconnectRetrySleep: one second
//   - NackResendSleep: 100 milliseconds
//   - OverwriteSaramaConfig: DefaultSaramaSubscriberConfig()
//
// Only exact zero values are replaced, so NoSleep (-1) is preserved.
func (c *SubscriberConfig) setDefaults() {
	if c.ReconnectRetrySleep == 0 {
		c.ReconnectRetrySleep = time.Second
	}

	if c.NackResendSleep == 0 {
		c.NackResendSleep = 100 * time.Millisecond
	}

	// A caller-supplied Sarama config always wins over the default one.
	if c.OverwriteSaramaConfig != nil {
		return
	}
	c.OverwriteSaramaConfig = DefaultSaramaSubscriberConfig()
}
// Validate checks that the required fields of the config are set.
//
// It returns an error when no brokers are configured or when the Unmarshaler
// is nil, and nil otherwise. The brokers check takes precedence.
func (c SubscriberConfig) Validate() error {
	switch {
	case len(c.Brokers) == 0:
		return errors.New("missing brokers")
	case c.Unmarshaler == nil:
		return errors.New("missing unmarshaler")
	default:
		return nil
	}
}
// DefaultSaramaSubscriberConfig returns the Sarama config that Watermill uses by default.
//
// It starts from sarama.NewConfig() and sets the Kafka version to V1_0_0_0,
// enables returning consumer errors, and uses "watermill" as the client ID.
//
// A customized config can be passed to NewSubscriber and NewPublisher:
//
//	saramaConfig := DefaultSaramaSubscriberConfig()
//	saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
//
//	subscriberConfig.OverwriteSaramaConfig = saramaConfig
//
//	subscriber, err := NewSubscriber(subscriberConfig, logger)
//	// ...
func DefaultSaramaSubscriberConfig() *sarama.Config {
	cfg := sarama.NewConfig()

	cfg.ClientID = "watermill"
	cfg.Version = sarama.V1_0_0_0
	cfg.Consumer.Return.Errors = true

	return cfg
}
// Subscribe subscribes to messages in Kafka.
// ...
Passing custom `Sarama` config
You can pass custom config parameters via `OverwriteSaramaConfig *sarama.Config` in `NewSubscriber` and `NewPublisher`.
When `nil` is passed, the default config is used (`DefaultSaramaSubscriberConfig`).
Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/subscriber.go
// ...
// DefaultSaramaSubscriberConfig returns the Sarama config that Watermill uses by default.
//
// It starts from sarama.NewConfig() and sets the Kafka version to V1_0_0_0,
// enables returning consumer errors, and uses "watermill" as the client ID.
//
// A customized config can be passed to NewSubscriber and NewPublisher:
//
//	saramaConfig := DefaultSaramaSubscriberConfig()
//	saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
//
//	subscriberConfig.OverwriteSaramaConfig = saramaConfig
//
//	subscriber, err := NewSubscriber(subscriberConfig, logger)
//	// ...
func DefaultSaramaSubscriberConfig() *sarama.Config {
	cfg := sarama.NewConfig()

	cfg.ClientID = "watermill"
	cfg.Version = sarama.V1_0_0_0
	cfg.Consumer.Return.Errors = true

	return cfg
}
// ...
Connecting
Publisher
Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/publisher.go
// ...
// NewPublisher creates a new Kafka Publisher.
func NewPublisher(
config PublisherConfig,
logger watermill.LoggerAdapter,
) (*Publisher, error) {
// ...
Example (this snippet constructs a Subscriber with a custom Sarama config):
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
// equivalent of auto.offset.reset: earliest
saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
subscriber, err := kafka.NewSubscriber(
kafka.SubscriberConfig{
Brokers: []string{"kafka:9092"},
Unmarshaler: kafka.DefaultMarshaler{},
OverwriteSaramaConfig: saramaSubscriberConfig,
ConsumerGroup: "test_consumer_group",
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
// ...
Subscriber
Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/subscriber.go
// ...
// NewSubscriber creates a new Kafka Subscriber.
func NewSubscriber(
config SubscriberConfig,
logger watermill.LoggerAdapter,
) (*Subscriber, error) {
// ...
Example (this snippet constructs a Publisher):
Full source: github.com/ThreeDotsLabs/watermill/_examples/pubsubs/kafka/main.go
// ...
publisher, err := kafka.NewPublisher(
kafka.PublisherConfig{
Brokers: []string{"kafka:9092"},
Marshaler: kafka.DefaultMarshaler{},
},
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
// ...
Publishing
Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/publisher.go
// ...
// Publish publishes message to Kafka.
//
// Publish is blocking and waits for ack from Kafka.
// When delivery of one of the messages fails, the function is interrupted.
func (p *Publisher) Publish(topic string, msgs ...*message.Message) error {
// ...
Subscribing
Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/subscriber.go
// ...
// Subscribe subscribes to messages in Kafka.
//
// There are multiple subscribers spawned
func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) {
// ...
Marshaler
Watermill’s messages cannot be sent directly to Kafka - they need to be marshaled. You can implement your own marshaler or use the default implementation.
Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/marshaler.go
// ...
// Marshaler marshals Watermill's message to Kafka message.
type Marshaler interface {
// Marshal converts msg, to be published on topic, into a Sarama producer message.
// A non-nil error reports that the message could not be marshaled.
Marshal(topic string, msg *message.Message) (*sarama.ProducerMessage, error)
}
// Unmarshaler unmarshals Kafka's message to Watermill's message.
type Unmarshaler interface {
// Unmarshal converts a consumed Sarama message into a Watermill message.
// A non-nil error reports that the message could not be unmarshaled.
Unmarshal(*sarama.ConsumerMessage) (*message.Message, error)
}
// MarshalerUnmarshaler combines Marshaler and Unmarshaler, for implementations
// that convert messages in both directions.
type MarshalerUnmarshaler interface {
Marshaler
Unmarshaler
}
// DefaultMarshaler is the default marshaler implementation.
// It carries no state, so the zero value DefaultMarshaler{} is ready to use.
type DefaultMarshaler struct{}
func (DefaultMarshaler) Marshal(topic string, msg *message.Message) (*sarama.ProducerMessage, error) {
// ...
Partitioning
Our Publisher supports the partitioning mechanism.
It can be done with a special Marshaler implementation:
Full source: github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka/marshaler.go
// ...
// kafkaJsonWithPartitioning is the marshaler returned by NewWithPartitioningMarshaler.
// It embeds DefaultMarshaler and additionally stores the user-supplied
// function that generates a partition key.
type kafkaJsonWithPartitioning struct {
DefaultMarshaler
// generatePartitionKey is the function passed to NewWithPartitioningMarshaler.
generatePartitionKey GeneratePartitionKey
}
// NewWithPartitioningMarshaler returns a MarshalerUnmarshaler that carries the
// given generatePartitionKey function alongside the default marshaling behavior.
func NewWithPartitioningMarshaler(generatePartitionKey GeneratePartitionKey) MarshalerUnmarshaler {
	var m kafkaJsonWithPartitioning
	m.generatePartitionKey = generatePartitionKey
	return m
}
func (j kafkaJsonWithPartitioning) Marshal(topic string, msg *message.Message) (*sarama.ProducerMessage, error) {
// ...
When using it, you need to pass your own function that generates the partition key. It’s a good idea to pass the partition key in the message metadata, so that the entire message does not have to be unmarshaled.
marshaler := kafka.NewWithPartitioningMarshaler(func(topic string, msg *message.Message) (string, error) {
return msg.Metadata.Get("partition"), nil
})