The guide below demonstrates how to get a minimal environment up and running. Choose the distribution that's right for you.
When you're ready to learn more, take the next steps with use-case-driven tutorials.
Since ksqlDB runs natively on Apache Kafka®, you'll need to have a Kafka installation running that ksqlDB is configured to use. The docker-compose files to the right will run everything for you via Docker, including ksqlDB itself.
Select the docker-compose file that you'd like to use, depending on whether or not you're already running Kafka. Next, copy and paste it into a file named docker-compose.yml on your local filesystem.
---
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.4.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:7.4.0
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
ksqldb-server:
image: confluentinc/ksqldb-server:0.29.0
hostname: ksqldb-server
container_name: ksqldb-server
depends_on:
- broker
ports:
- "8088:8088"
environment:
KSQL_LISTENERS: http://0.0.0.0:8088
KSQL_BOOTSTRAP_SERVERS: broker:9092
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
ksqldb-cli:
image: confluentinc/ksqldb-cli:0.29.0
container_name: ksqldb-cli
depends_on:
- broker
- ksqldb-server
entrypoint: /bin/sh
tty: true
---
version: '2'
services:
ksqldb-server:
image: confluentinc/ksqldb-server:0.29.0
hostname: ksqldb-server
container_name: ksqldb-server
ports:
- "8088:8088"
environment:
KSQL_LISTENERS: http://0.0.0.0:8088
KSQL_BOOTSTRAP_SERVERS: $CONFLUENT_CLOUD_BROKER_ENDPOINT
KSQL_SASL_JAAS_CONFIG: >
org.apache.kafka.common.security.plain.PlainLoginModule required
username="$CONFLUENT_CLOUD_API_KEY"
password="$CONFLUENT_CLOUD_API_SECRET";
KSQL_SECURITY_PROTOCOL: SASL_SSL
KSQL_SASL_MECHANISM: PLAIN
KSQL_KSQL_INTERNAL_TOPIC_REPLICAS: 3
KSQL_KSQL_SINK_REPLICAS: 3
KSQL_KSQL_STREAMS_REPLICATION_FACTOR: 3
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 3
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
ksqldb-cli:
image: confluentinc/ksqldb-cli:0.29.0
container_name: ksqldb-cli
depends_on:
- ksqldb-server
entrypoint: /bin/sh
tty: true
---
version: '2'
services:
ksqldb-server:
image: confluentinc/ksqldb-server:0.29.0
hostname: ksqldb-server
container_name: ksqldb-server
ports:
- "8088:8088"
environment:
KSQL_LISTENERS: http://0.0.0.0:8088
KSQL_BOOTSTRAP_SERVERS: $BROKER_ENDPOINT
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
ksqldb-cli:
image: confluentinc/ksqldb-cli:0.29.0
container_name: ksqldb-cli
depends_on:
- ksqldb-server
entrypoint: /bin/sh
tty: true
From a directory containing the docker-compose.yml file created in the previous step, run this command in order to start all services in the correct order.
Once all services have successfully launched, you will have a ksqlDB server running and ready to use.
docker-compose up
ksqlDB runs as a server which clients connect to in order to issue queries.
Run this command to connect to the ksqlDB server and enter an interactive command-line interface (CLI) session.
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
The first thing we're going to do is create a stream. A stream essentially associates a schema with an underlying Kafka topic. Here's what each parameter in the CREATE STREAM statement does:
Check the documentation for more information about streams.
Copy and paste this statement into your interactive CLI session, and press enter to execute the statement.
CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)
WITH (kafka_topic='locations', value_format='json', partitions=1);
We might also want to keep track of the latest location of the riders using a materialized view. For this we create a table currentLocation by issuing a SELECT statement over the previously created stream. Note that the table will be incrementally updated as new rider location data arrives. We use the LATEST_BY_OFFSET aggregate function to denote the fact that we are only interested in the latest location of a rider.
To make it more fun, let us also materialize a derived table (Table ridersNearMountainView) that captures how far the riders are from a given location or city.
Copy and paste those table statements into your interactive CLI session, and press enter to execute.
Check the documentation for more information about tables and materialized views.
CREATE TABLE currentLocation AS
SELECT profileId,
LATEST_BY_OFFSET(latitude) AS la,
LATEST_BY_OFFSET(longitude) AS lo
FROM riderlocations
GROUP BY profileId
EMIT CHANGES;
CREATE TABLE ridersNearMountainView AS
SELECT ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1) AS distanceInMiles,
COLLECT_LIST(profileId) AS riders,
COUNT(*) AS count
FROM currentLocation
GROUP BY ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1);
Now, let us run a push query over the stream. Run the given query using your interactive CLI session.
This query will output all rows from the riderLocations stream whose coordinates are within 5 miles of Mountain View.
This is the first thing that may feel a bit unfamiliar to you, because the query will never return until it's terminated. It will perpetually push output rows to the client as events are written to the riderLocations stream.
Leave this query running in the CLI session for now. Next, we're going to write some data into the riderLocations stream so that the query begins producing output.
-- Mountain View lat, long: 37.4133, -122.1162
SELECT * FROM riderLocations
WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;
Since the CLI session from (5) is busy waiting for output from the push query, let's start another session that we can use to write some data into ksqlDB.
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
Run each of the given INSERT statements within the new CLI session, and keep an eye on the CLI session from (5) as you do.
The push query will output matching rows in real time as soon as they're written to the riderLocations stream.
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('c2309eec', 37.7877, -122.4205);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ab5cbad', 37.3952, -122.0813);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4a7c7b41', 37.4049, -122.0822);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011);
Finally, we run a pull query against the materialized view to retrieve all the riders that are currently within 10 miles from Mountain View.
In contrast to the previous push query which runs continuously, the pull query follows a traditional request-response model retrieving the latest result from the materialized view.
SELECT * from ridersNearMountainView WHERE distanceInMiles <= 10;