Storage Handler that allows users to connect to, analyze, and transform Kafka topics. The workflow is as follows: first the user creates an external table that is a view over one Kafka topic; then the user can run any SQL query against it, including writing back to the same table or to a different Kafka-backed table.
Use the following statement to create a table:
CREATE EXTERNAL TABLE kafka_table
(`timestamp` timestamp , `page` string, `user` string, `newPage` boolean,
added int, deleted bigint, delta double)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES
("kafka.topic" = "test-topic", "kafka.bootstrap.servers"="localhost:9092");
The table property kafka.topic is the Kafka topic to connect to, and kafka.bootstrap.servers is the broker connection string. Both properties are mandatory.
On the write path, if the topic does not exist it will be created, provided the Kafka broker's admin policy allows such an operation.
By default the serializer and deserializer is org.apache.hadoop.hive.serde2.JsonSerDe.
If you want to switch the serializer/deserializer class, you can use ALTER TABLE:
ALTER TABLE kafka_table SET TBLPROPERTIES ("kafka.serde.class"="org.apache.hadoop.hive.serde2.avro.AvroSerDe");
List of supported serializer/deserializer classes:

| Supported Serializer/Deserializer |
|---|
| org.apache.hadoop.hive.serde2.JsonSerDe |
| org.apache.hadoop.hive.serde2.OpenCSVSerde |
| org.apache.hadoop.hive.serde2.avro.AvroSerDe |
| org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe |
| org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe |
In addition to the user-defined payload schema, the Kafka Storage Handler appends additional columns allowing the user to query the Kafka metadata fields:
| Column | Description |
|---|---|
| __key | Kafka record key (byte array) |
| __partition | Kafka record partition identifier (int 32) |
| __offset | Kafka record offset (int 64) |
| __timestamp | Kafka record timestamp (int 64) |
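For example, a quick sanity-check query (a minimal sketch) can read the metadata columns next to the payload columns:
-- Inspect Kafka metadata columns alongside the payload of kafka_table
SELECT `__partition`, `__offset`, `__timestamp`, `page`, added
FROM kafka_table
LIMIT 5;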
List the table properties and all the partition/offset information for the topic.
Describe extended kafka_table;
Count the number of records whose Kafka record timestamp falls within the last 10 minutes. Note that `__timestamp` is in milliseconds, hence the multiplication by 1000 of the value returned by to_unix_timestamp (which is in seconds).
SELECT count(*) from kafka_table
where `__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '10' MINUTES);
The storage handler allows filter push-down read optimization; for instance, the query above will only read the records with a timestamp satisfying the filter predicate. Note that such time-based seeks are only viable if the Kafka broker allows time-based lookups (Kafka 0.11 or later versions).
In addition to time-based seeks, the storage handler reader is able to seek to a particular partition offset using the SQL WHERE clause. Currently only OR/AND with (<, <=, >=, >) is supported.
SELECT count(*) from kafka_table
where (`__offset` < 10 and `__offset`>3 and `__partition` = 0)
or (`__partition` = 0 and `__offset` < 105 and `__offset` > 99)
or (`__offset` = 109);
The user can define a view over the last 15 minutes and mask whatever columns as follows:
CREATE VIEW last_15_minutes_of_kafka_table as select `timestamp`, `user`, delta, added from kafka_table
where `__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '15' MINUTES);
Join the Kafka stream to a Hive table. For instance, assume you want to join the last 15 minutes of the stream to a dimension table like the following.
CREATE TABLE user_table (`user` string, `first_name` string , age int, gender string, comments string) STORED as ORC ;
Join the view of the last 15 minutes to user_table, group by the gender column, and compute aggregates over metrics from both the fact table and the dimension table.
SELECT sum(added) as added, sum(deleted) as deleted, avg(delta) as delta, avg(age) as avg_age , gender
FROM last_15_minutes_of_kafka_table join user_table on `last_15_minutes_of_kafka_table`.`user` = `user_table`.`user`
GROUP BY gender limit 10;
Join the stream to the stream itself. This is useful when you want to perform an ad-hoc query over the last 15 minutes view. The following example shows how you can perform a classical user retention analysis over the Kafka stream.
-- Stream join over the view itself
-- The example is adapted from https://www.periscopedata.com/blog/how-to-calculate-cohort-retention-in-sql
-- Assuming l15min_wiki is a view of the last 15 minutes
select count( distinct activity.`user`) as active_users, count(distinct future_activity.`user`) as retained_users
from l15min_wiki as activity
left join l15min_wiki as future_activity on
activity.`user` = future_activity.`user`
and activity.`timestamp` = future_activity.`timestamp` - interval '5' minutes ;
-- Stream to stream join
-- Assuming wiki_kafka_hive is the entire stream.
select floor_hour(activity.`timestamp`), count( distinct activity.`user`) as active_users, count(distinct future_activity.`user`) as retained_users
from wiki_kafka_hive as activity
left join wiki_kafka_hive as future_activity on
activity.`user` = future_activity.`user`
and activity.`timestamp` = future_activity.`timestamp` - interval '1' hour group by floor_hour(activity.`timestamp`);
| Property | Description | Mandatory | Default |
|---|---|---|---|
| kafka.topic | Kafka topic name to map the table to. | Yes | null |
| kafka.bootstrap.servers | Table property indicating the Kafka broker(s) connection string. | Yes | null |
| kafka.serde.class | Serializer and deserializer class implementation. | No | org.apache.hadoop.hive.serde2.JsonSerDe |
| hive.kafka.poll.timeout.ms | Parameter indicating the Kafka consumer poll timeout period, in milliseconds. Note that this is independent of the internal Kafka consumer timeouts. | No | 5000 (5 seconds) |
| hive.kafka.max.retries | Number of retries for Kafka metadata fetch operations. | No | 6 |
| hive.kafka.metadata.poll.timeout.ms | Number of milliseconds before the consumer times out while fetching Kafka metadata. | No | 30000 (30 seconds) |
| kafka.write.semantic | Writer semantics; allowed values: AT_LEAST_ONCE, EXACTLY_ONCE. | No | AT_LEAST_ONCE |
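For example, a table definition that overrides a few of these defaults could look like the following sketch (the table name and values are illustrative):
CREATE EXTERNAL TABLE kafka_table_tuned
(`timestamp` timestamp , `page` string, added int, deleted bigint)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES
("kafka.topic" = "test-topic",
"kafka.bootstrap.servers" = "localhost:9092",
-- illustrative overrides of the defaults listed above
"kafka.write.semantic" = "EXACTLY_ONCE",
"hive.kafka.poll.timeout.ms" = "10000");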
The user can inject custom Kafka consumer/producer properties via the table properties. To do so, add any key/value pair of Kafka config to the Hive table properties, prefixing the key with kafka.consumer for consumer configs and kafka.producer for producer configs. For instance, the following ALTER TABLE query adds the table property "kafka.consumer.max.poll.records" = "5000", which injects max.poll.records=5000 into the Kafka consumer.
ALTER TABLE kafka_table SET TBLPROPERTIES ("kafka.consumer.max.poll.records"="5000");
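Producer configs can be injected the same way. For instance, the following illustrative statement would pass acks=all to the Kafka producer used on the write path:
ALTER TABLE kafka_table SET TBLPROPERTIES ("kafka.producer.acks"="all");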
Load every record from Kafka exactly once. The goal is to read the data and commit both the data and its offsets in a single transaction.
First create the offset table.
Drop table kafka_table_offsets;
create table kafka_table_offsets(partition_id int, max_offset bigint, insert_time timestamp);
Initialize the table
insert overwrite table kafka_table_offsets select `__partition`, min(`__offset`) - 1, CURRENT_TIMESTAMP
from wiki_kafka_hive group by `__partition`, CURRENT_TIMESTAMP ;
Create the final target table in the Hive warehouse.
Drop table orc_kafka_table;
Create table orc_kafka_table (partition_id int, koffset bigint, ktimestamp bigint,
`timestamp` timestamp , `page` string, `user` string, `diffurl` string,
`isrobot` boolean, added int, deleted int, delta bigint
) stored as ORC;
This is an example that inserts records up to offset = 2 only.
From wiki_kafka_hive ktable JOIN kafka_table_offsets offset_table
on (ktable.`__partition` = offset_table.partition_id
and ktable.`__offset` > offset_table.max_offset and ktable.`__offset` < 3 )
insert into table orc_kafka_table select `__partition`, `__offset`, `__timestamp`,
`timestamp`, `page`, `user`, `diffurl`, `isrobot`, added , deleted , delta
Insert overwrite table kafka_table_offsets select
`__partition`, max(`__offset`), CURRENT_TIMESTAMP group by `__partition`, CURRENT_TIMESTAMP;
Double check the insert
select max(`koffset`) from orc_kafka_table limit 10;
select count(*) as c from orc_kafka_table group by partition_id, koffset having c > 1;
Repeat this periodically to insert all data.
From wiki_kafka_hive ktable JOIN kafka_table_offsets offset_table
on (ktable.`__partition` = offset_table.partition_id
and ktable.`__offset` > offset_table.max_offset )
insert into table orc_kafka_table select `__partition`, `__offset`, `__timestamp`,
`timestamp`, `page`, `user`, `diffurl`, `isrobot`, added , deleted , delta
Insert overwrite table kafka_table_offsets select
`__partition`, max(`__offset`), CURRENT_TIMESTAMP group by `__partition`, CURRENT_TIMESTAMP;
First create the table in Hive that will be the target table. All inserts will go to the Kafka topic mapped by this table.
CREATE EXTERNAL TABLE moving_avg_wiki_kafka_hive
(`channel` string, `namespace` string,`page` string, `timestamp` timestamp , avg_delta double )
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES
("kafka.topic" = "moving_avg_wiki_kafka_hive_2",
"kafka.bootstrap.servers"="cn105-10.l42scl.hortonworks.com:9092",
-- STORE AS AVRO IN KAFKA
"kafka.serde.class"="org.apache.hadoop.hive.serde2.avro.AvroSerDe");
Then insert data into the table. Keep in mind that Kafka is append-only, thus you cannot use INSERT OVERWRITE. Note that the select also supplies values for the Kafka metadata columns (`__key`, `__partition`, `__offset`, `__timestamp`).
insert into table moving_avg_wiki_kafka_hive select `channel`, `namespace`, `page`, `timestamp`,
avg(delta) over (order by `timestamp` asc rows between 60 preceding and current row) as avg_delta,
null as `__key`, null as `__partition`, -1, -1 from l15min_wiki;