- View basic configuration parameters
- Confirm we receive the data in a Kafka Console Consumer
Variable | Value |
---|---|
bootstrap_servers | localhost:9092 |
username | user |
pwd | password |
- `bootstrap.servers`: `<bootstrap_servers>`
- `sasl.mechanism`: `SCRAM-SHA-256`
- `security.protocol`: `SASL_SSL`
- `sasl.jaas.config`: `org.apache.kafka.common.security.scram.ScramLoginModule required username="%s" password="%s";` (fill the `%s` placeholders with `username` and `pwd`; the JAAS string must end with `;`)
- `key.serializer`: `org.apache.kafka.common.serialization.StringSerializer`
- `value.serializer`: `org.apache.kafka.common.serialization.StringSerializer`
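Here is a minimal sketch of the producer settings above collected into a `java.util.Properties` object. The class name `ProducerConfigSketch` is hypothetical. The keys are spelled as plain strings; kafka-clients also exposes them as `ProducerConfig` constants. The `%s` placeholders are filled from the `username` and `pwd` values in the table.

```java
import java.util.Properties;

// Hypothetical helper: gathers the producer settings listed above.
final class ProducerConfigSketch {

    // JAAS template with two %s placeholders (username, password).
    // The trailing semicolon is required by the JAAS syntax.
    static final String JAAS_TEMPLATE =
        "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";

    static Properties build(String bootstrapServers, String username, String pwd) {
        Properties props = new Properties();
        // Where to connect and how to authenticate
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "SCRAM-SHA-256");
        props.setProperty("sasl.jaas.config", String.format(JAAS_TEMPLATE, username, pwd));
        // Keys and values are sent as plain strings
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    private ProducerConfigSketch() {}
}
```

The result would then be handed to the client, e.g. `new KafkaProducer<String, String>(ProducerConfigSketch.build("localhost:9092", "user", "password"))`.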
- Create the topic: `kafka-topics --command-config playground.config --bootstrap-server <bootstrap_servers> --topic demo_java --create --partitions 3`
- Start the console consumer: `kafka-console-consumer --command-config playground.config --bootstrap-server <bootstrap_servers> --topic demo_java --from-beginning`
- Confirm the partition and offset the message was sent to using Callbacks
- We'll look at the interesting behavior of the `StickyPartitioner`
- Properties:
- `batch.size` affects which partition is chosen
- `partitioner.class`: `RoundRobinPartitioner`, `StickyPartitioner`
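To see why `batch.size` changes the partition choice, here is a toy model in plain Java that compares round-robin with sticky partitioning for null-key records. It is not Kafka's partitioner code, and `PartitionerSimulation` and its methods are hypothetical. The sticky side makes one simplifying assumption: it stays on one partition until the next record would overflow `batch.size` bytes, then moves to the next partition in order. The real client picks the next partition by its own rules. In a real producer, the send callback reports where each record landed through `RecordMetadata.partition()` and `RecordMetadata.offset()`.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical, not Kafka's code) of partition choice for null keys.
final class PartitionerSimulation {

    // Round-robin: each record goes to the next partition in turn.
    static List<Integer> roundRobin(int recordCount, int numPartitions) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < recordCount; i++) {
            out.add(i % numPartitions);
        }
        return out;
    }

    // Sticky: keep filling the current partition's batch and switch only when
    // the next record would not fit into batchSize bytes. Cycling to the next
    // partition in order is a simplification that keeps the sketch deterministic.
    static List<Integer> sticky(int[] recordSizes, int numPartitions, int batchSize) {
        List<Integer> out = new ArrayList<>();
        int current = 0;
        int bytesInBatch = 0;
        for (int size : recordSizes) {
            if (bytesInBatch > 0 && bytesInBatch + size > batchSize) {
                // Batch is considered full: it would be sent, and we move on.
                current = (current + 1) % numPartitions;
                bytesInBatch = 0;
            }
            out.add(current);
            bytesInBatch += size;
        }
        return out;
    }

    private PartitionerSimulation() {}
}
```

Take six 100-byte records, three partitions and `batch.size = 250`. The sticky model yields `[0, 0, 1, 1, 2, 2]` and round-robin yields `[0, 1, 2, 0, 1, 2]`. With the default `batch.size` of 16384 bytes, all six records stick to partition 0. Fewer, fuller batches are the point of sticky partitioning.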
- Send non-null keys to the Kafka topic
- Same key = same partition
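Here is a rough sketch of why the same key lands on the same partition. The default partitioner hashes the serialized key with murmur2, clears the sign bit, and takes the result modulo the partition count. The hash below is written to follow Kafka's `Utils.murmur2`, but treat `KeyPartitioning` as a hypothetical, illustrative re-implementation rather than the library itself. The key-to-partition mapping only holds while the topic's partition count stays the same.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical re-implementation: partition = (murmur2(keyBytes) & 0x7fffffff) % numPartitions
final class KeyPartitioning {

    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        // Mix four bytes at a time
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                  + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix the remaining 1 to 3 bytes (intentional fall-through)
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    static int partitionForKey(String key, int numPartitions) {
        // StringSerializer encodes keys as UTF-8 by default
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // Clearing the sign bit keeps the modulo result non-negative
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    private KeyPartitioning() {}
}
```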
- Learn how to write a basic consumer to receive data from Kafka
- View basic configuration parameters
- Confirm we receive the data from the Kafka Producer written in Java
- key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
- group.id: my-first-application
- auto.offset.reset: earliest
- earliest: automatically reset the offset to the earliest offset
- latest: automatically reset the offset to the latest offset
- none: throw an exception to the consumer if no previous offset is found for the consumer's group
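The consumer settings above and the three `auto.offset.reset` behaviors can be modeled in a few lines. This is a simplified sketch, not the client's logic. `OffsetResetSketch` is hypothetical, and it ignores the case of a committed offset that is out of range. Where the real consumer throws `NoOffsetForPartitionException`, the sketch throws `IllegalStateException`.

```java
import java.util.OptionalLong;
import java.util.Properties;

// Hypothetical sketch of the consumer settings and of auto.offset.reset.
final class OffsetResetSketch {

    // Consumer settings from the list above, spelled as plain string keys.
    static Properties consumerProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("group.id", "my-first-application");
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    // Where would the group start reading one partition?
    // logStart: earliest offset still stored; logEnd: offset of the next produced message.
    static long startingOffset(String autoOffsetReset, OptionalLong committed, long logStart, long logEnd) {
        if (committed.isPresent()) {
            return committed.getAsLong(); // a (valid) committed offset takes priority
        }
        switch (autoOffsetReset) {
            case "earliest":
                return logStart;
            case "latest":
                return logEnd;
            case "none":
                throw new IllegalStateException("no committed offset for this consumer group");
            default:
                throw new IllegalArgumentException("unknown auto.offset.reset value: " + autoOffsetReset);
        }
    }

    private OffsetResetSketch() {}
}
```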
- Ensure we have code in place to respond to termination signals
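The usual shutdown pattern has four steps:

1. A JVM shutdown hook calls `consumer.wakeup()`.
2. The blocked `poll()` then throws `WakeupException`.
3. The main loop catches it.
4. `close()` runs in `finally`.

The sketch below mirrors that structure with stand-ins so it runs without a broker. `FakeConsumer` and `WakeupSignal` are hypothetical substitutes for `KafkaConsumer` and `WakeupException`; only `Runtime.addShutdownHook` and `Thread.join` are real JDK calls here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-ins for the KafkaConsumer shutdown pattern.
final class GracefulShutdownSketch {

    // Plays the role of WakeupException
    static final class WakeupSignal extends RuntimeException {}

    static final class FakeConsumer {
        private final AtomicBoolean wakeupRequested = new AtomicBoolean(false);
        volatile boolean closed = false;

        // Like wakeup(): safe to call from another thread
        void wakeup() { wakeupRequested.set(true); }

        // Like poll(): throws once a wakeup has been requested
        List<String> poll() throws InterruptedException {
            if (wakeupRequested.get()) {
                throw new WakeupSignal();
            }
            Thread.sleep(10); // pretend to wait for records
            return new ArrayList<>();
        }

        // In the real client, close() leaves the group cleanly
        void close() { closed = true; }
    }

    // Main loop: poll until woken up, then always close in finally
    static void runLoop(FakeConsumer consumer) {
        try {
            while (true) {
                for (String value : consumer.poll()) {
                    // process each value here
                }
            }
        } catch (WakeupSignal e) {
            // Expected during shutdown: leave the loop quietly
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            consumer.close();
        }
    }

    // Hook that wakes the consumer, then waits for the main loop to finish
    static void installShutdownHook(FakeConsumer consumer, Thread mainThread) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try {
                mainThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
    }

    private GracefulShutdownSketch() {}
}
```

In an application, `installShutdownHook(consumer, Thread.currentThread())` would be called from the main thread just before `runLoop(consumer)`.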
- Make your consumer in Java consume data as part of a consumer group
- Observe the partition rebalance mechanisms
- Moving partitions between consumers is called a rebalance
- Reassignment of partitions happens when a consumer leaves or joins a group
- It also happens if an administrator adds new partitions to a topic
- Eager rebalance:
- All consumers stop and give up their membership of partitions
- They rejoin the consumer group and get a new partition assignment
- Problems:
- During a short period of time, the entire consumer group stops processing
- Consumers don't necessarily get back the same partitions they had before
- Cooperative rebalance (incremental rebalance):
- Reassigns a small subset of the partitions from one consumer to another
- Other consumers that don't have reassigned partitions can still process uninterrupted
- Can go through several iterations to find a stable assignment (hence "incremental")
- Avoids stop-the-world events where all consumers stop processing data
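This toy model makes the difference between the two rebalance styles concrete. It computes which partitions each consumer must give up when moving from an old assignment to a new one. The class `RebalanceRevocationSketch` is hypothetical and is not the actual rebalance protocol code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model: which partitions does each consumer revoke?
final class RebalanceRevocationSketch {

    // Eager: every consumer gives up everything it owns, then waits for a new assignment
    static Map<String, Set<Integer>> eagerRevocations(Map<String, Set<Integer>> oldAssignment) {
        Map<String, Set<Integer>> revoked = new HashMap<>();
        oldAssignment.forEach((consumer, owned) -> revoked.put(consumer, new HashSet<>(owned)));
        return revoked;
    }

    // Cooperative: a consumer only gives up partitions it does not keep in the new assignment
    static Map<String, Set<Integer>> cooperativeRevocations(Map<String, Set<Integer>> oldAssignment,
                                                            Map<String, Set<Integer>> newAssignment) {
        Map<String, Set<Integer>> revoked = new HashMap<>();
        oldAssignment.forEach((consumer, owned) -> {
            Set<Integer> lost = new HashSet<>(owned);
            lost.removeAll(newAssignment.getOrDefault(consumer, Set.of()));
            revoked.put(consumer, lost);
        });
        return revoked;
    }

    private RebalanceRevocationSketch() {}
}
```

Consider moving from `{c1: [0, 1, 2]}` to `{c1: [0, 1], c2: [2]}`. The eager model revokes all three partitions from `c1`. The cooperative model revokes only partition 2, so `c1` keeps processing 0 and 1. In the real protocol, the revoked partition is handed to `c2` in a follow-up rebalance, which is one reason cooperative rebalancing can take several iterations.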
- Kafka Consumer: `partition.assignment.strategy`
- `RangeAssignor`: assigns partitions on a per-topic basis (can lead to imbalance)
- `RoundRobin`: assigns partitions across all topics in round-robin fashion, giving optimal balance
- `StickyAssignor`: balanced like RoundRobin at first, then minimises partition movements when consumers join or leave the group
- `CooperativeStickyAssignor`: the rebalance strategy is identical to `StickyAssignor`, but it supports cooperative rebalances, so consumers can keep on consuming from the topic
- The default assignor is `[RangeAssignor, CooperativeStickyAssignor]`, which uses the `RangeAssignor` by default but allows upgrading to the `CooperativeStickyAssignor` with a single rolling bounce that removes the `RangeAssignor` from the list
- Kafka Connect: cooperative rebalancing is already implemented (enabled by default)
- Kafka Streams: turned on by default using `StreamsPartitionAssignor`
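The imbalance of `RangeAssignor` compared with `RoundRobin` shows up as soon as a group reads several topics. The sketch below is a simplified re-creation (hypothetical class `AssignorSketch`) for the case where every consumer subscribes to the same topics. Consumer names stand in for member IDs. Switching a real consumer to cooperative rebalancing is a config change such as `partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified versions of the Range and RoundRobin strategies.
// Partitions are written as "topic-partition".
final class AssignorSketch {

    // Range: per topic, hand out consecutive ranges to the sorted consumers;
    // the first (partitions % consumers) consumers get one extra, for every topic.
    static Map<String, List<String>> range(List<String> consumers, Map<String, Integer> partitionsPerTopic) {
        List<String> sorted = new ArrayList<>(consumers);
        sorted.sort(null);
        Map<String, List<String>> result = new TreeMap<>();
        sorted.forEach(c -> result.put(c, new ArrayList<>()));
        for (Map.Entry<String, Integer> topic : new TreeMap<>(partitionsPerTopic).entrySet()) {
            int n = topic.getValue();
            int base = n / sorted.size();
            int extra = n % sorted.size();
            for (int i = 0; i < sorted.size(); i++) {
                int start = i * base + Math.min(i, extra);
                int length = base + (i < extra ? 1 : 0);
                for (int p = start; p < start + length; p++) {
                    result.get(sorted.get(i)).add(topic.getKey() + "-" + p);
                }
            }
        }
        return result;
    }

    // RoundRobin: lay out all topic-partitions in order and deal them out one by one
    static Map<String, List<String>> roundRobin(List<String> consumers, Map<String, Integer> partitionsPerTopic) {
        List<String> sorted = new ArrayList<>(consumers);
        sorted.sort(null);
        Map<String, List<String>> result = new TreeMap<>();
        sorted.forEach(c -> result.put(c, new ArrayList<>()));
        int next = 0;
        for (Map.Entry<String, Integer> topic : new TreeMap<>(partitionsPerTopic).entrySet()) {
            for (int p = 0; p < topic.getValue(); p++) {
                result.get(sorted.get(next % sorted.size())).add(topic.getKey() + "-" + p);
                next++;
            }
        }
        return result;
    }

    private AssignorSketch() {}
}
```

Take consumers `c0` and `c1` and two topics with three partitions each. Range gives `c0` four partitions and `c1` two, while round-robin gives each consumer three.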
- By default, when a consumer leaves a group, its partitions are revoked and re-assigned
- If it joins back, it will have a new "member ID" and new partitions assigned
- If you specify `group.instance.id`, the consumer becomes a static member
- Upon leaving, the consumer has up to `session.timeout.ms` to join back and get back its partitions (otherwise they are re-assigned), without triggering a rebalance
- This is helpful when consumers maintain local state and cache (to avoid rebuilding the cache)
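Here is a minimal sketch of static membership, assuming the simplified rule in the notes above. The consumer sets `group.instance.id` (and optionally `session.timeout.ms`). A member that comes back with the same ID within the timeout keeps its partitions without a rebalance. `StaticMembershipSketch` and the instance ID value are hypothetical, and the sketch does not model the boundary case (rejoining exactly at the timeout) precisely.

```java
import java.util.Properties;

// Hypothetical sketch of static membership settings and the rejoin rule.
final class StaticMembershipSketch {

    // Consumer-side settings that make this consumer a static member
    static Properties staticMemberProperties(String groupInstanceId, int sessionTimeoutMs) {
        Properties props = new Properties();
        props.setProperty("group.id", "my-first-application");
        props.setProperty("group.instance.id", groupInstanceId); // must be unique within the group
        props.setProperty("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        return props;
    }

    // Simplified coordinator rule: rejoining before the session times out means
    // the same partitions come back and no rebalance is triggered.
    static boolean rejoinsWithoutRebalance(long leftAtMs, long rejoinedAtMs, long sessionTimeoutMs) {
        return rejoinedAtMs - leftAtMs < sessionTimeoutMs;
    }

    private StaticMembershipSketch() {}
}
```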
- In the Java Consumer API, offsets are committed regularly
- This enables an at-least-once reading scenario by default (under certain conditions)
- Offsets are committed when you call `.poll()` and `auto.commit.interval.ms` has elapsed
- Example: `auto.commit.interval.ms=5000` and `enable.auto.commit=true` will commit every 5 seconds
- Make sure messages are all successfully processed before you call `poll()` again
- If you don't, you will not be in an at-least-once reading scenario
- In that (rare) case, you should disable `enable.auto.commit`, most likely move processing to a separate thread, and then from time to time call `commitSync()` or `commitAsync()` with the correct offsets manually (advanced)
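The timing of auto-commit is easier to see in a toy model. `AutoCommitSketch` is hypothetical and greatly simplified. It assumes the commit check runs at the start of `poll()` and commits the position reached by the records returned from earlier polls. That is why those records must be fully processed before `poll()` is called again.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of auto-commit timing for a single partition.
final class AutoCommitSketch {
    private final long autoCommitIntervalMs;
    private final int maxRecordsPerPoll;
    private long position = 0;   // next offset poll() will return
    private long committed = 0;  // last offset committed for the group
    private long lastCommitMs;

    AutoCommitSketch(long autoCommitIntervalMs, int maxRecordsPerPoll, long startMs) {
        this.autoCommitIntervalMs = autoCommitIntervalMs;
        this.maxRecordsPerPoll = maxRecordsPerPoll;
        this.lastCommitMs = startMs;
    }

    List<Long> poll(long nowMs) {
        // Commit first: this covers every record returned by previous polls
        if (nowMs - lastCommitMs >= autoCommitIntervalMs) {
            committed = position;
            lastCommitMs = nowMs;
        }
        // Then hand out the next batch of offsets
        List<Long> batch = new ArrayList<>();
        for (int i = 0; i < maxRecordsPerPoll; i++) {
            batch.add(position++);
        }
        return batch;
    }

    long committed() { return committed; }
}
```

With `auto.commit.interval.ms=5000` and three records per poll, polling at t=0 and t=1000 ms returns offsets 0 to 5 without committing. The poll at t=6000 ms commits offset 6 before returning 6 to 8. If the process crashed while handling 6 to 8, the group would restart from 6 and re-read them (at-least-once). If 3 to 5 were still being processed on another thread at that moment, a crash would skip them. That is the case where the manual route applies: set `enable.auto.commit=false` and call `commitSync()` or `commitAsync()` only after processing.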