24 Sep, 2017 - About 11 minutes
Streaming SQL for Kafka (KSQL)
Intro
In this article I'll explore KSQL, Apache Kafka's streaming SQL engine.
Requirements
- Docker
- docker-compose
Description
KSQL is an open source streaming SQL engine that implements continuous, interactive queries against Apache Kafka™. It allows you to query, read, write, and process data in Apache Kafka in real time, at scale, using SQL commands. KSQL builds directly on the Kafka Streams API, removing the requirement of building a Java app.
PoC Setup
Create the following docker-compose.yml file.
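A minimal sketch of such a file, modeled on Confluent's KSQL quickstart compose setup (the image tags, ports, and datagen arguments are assumptions and may need adjusting for your environment):

```yaml
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:3.3.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181

  kafka:
    image: confluentinc/cp-kafka:3.3.0
    depends_on:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  schema-registry:
    image: confluentinc/cp-schema-registry:3.3.0
    depends_on:
      - kafka
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:32181

  ksql-cli:
    image: confluentinc/ksql-cli:0.1
    depends_on:
      - kafka
      - schema-registry
    entrypoint: /bin/sh
    tty: true

  # These two containers continuously produce sample data into the
  # pageviews and users topics using Confluent's ksql-datagen tool.
  ksql-datagen-pageviews:
    image: confluentinc/ksql-examples:0.1
    depends_on:
      - kafka
    command: "ksql-datagen quickstart=pageviews format=delimited topic=pageviews maxInterval=500 bootstrap-server=kafka:29092"

  ksql-datagen-users:
    image: confluentinc/ksql-examples:0.1
    depends_on:
      - kafka
    command: "ksql-datagen quickstart=users format=json topic=users maxInterval=500 bootstrap-server=kafka:29092"
```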
Prepare the environment with the following command:
sudo docker-compose up -d
This will create the minimal services needed to test KSQL:
- zookeeper
- schema-registry
- kafka
- ksql-cli
It will also create two containers that generate data for testing:
- ksql-datagen-pageviews
- ksql-datagen-users
Those containers are Kafka producers, which we will use to feed KSQL.
KSQL
Execute the CLI to start testing:
sudo docker-compose exec ksql-cli ksql-cli local --bootstrap-server kafka:29092
You will see the KSQL startup banner, followed by the ksql> prompt.
Now let’s create a new STREAM
CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='pageviews', value_format='DELIMITED');
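With the stream registered, we can already query it. A minimal sketch, assuming the pageviews datagen container is producing events (the LIMIT clause terminates the otherwise continuous query after three rows):

```sql
SELECT pageid FROM pageviews_original LIMIT 3;
```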
Let’s describe the created object
ksql> DESCRIBE pageviews_original;
Now let's create a users TABLE
CREATE TABLE users_original (registertime bigint, gender varchar, regionid varchar, userid varchar) WITH (kafka_topic='users', value_format='JSON');
And confirm the DDL
ksql> DESCRIBE users_original;
So there are two main object types, STREAMS and TABLES. To list the existing ones, execute:
ksql> SHOW STREAMS;
and
ksql> SHOW TABLES;
Differences between STREAMS and TABLES
Stream
A stream is an unbounded sequence of structured data (“facts”). For example, we could have a stream of financial transactions such as “Alice sent $100 to Bob, then Charlie sent $50 to Bob”. Facts in a stream are immutable, which means new facts can be inserted to a stream, but existing facts can never be updated or deleted. Streams can be created from a Kafka topic or derived from existing streams and tables.
Table
A table is a view of a stream, or another table, and represents a collection of evolving facts. For example, we could have a table that contains the latest financial information such as “Bob’s current account balance is $150”. It is the equivalent of a traditional database table but enriched by streaming semantics such as windowing. Facts in a table are mutable, which means new facts can be inserted to the table, and existing facts can be updated or deleted. Tables can be created from a Kafka topic or derived from existing streams and tables.
Extending the Model
Let's create a new pageviews stream keyed by pageid and join it with the users table.
First, create the new stream:
CREATE STREAM pageviews (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='pageviews', value_format='DELIMITED', key='pageid');
And the users table:
CREATE TABLE users (registertime bigint, gender varchar, regionid varchar, userid varchar) WITH (kafka_topic='users', value_format='JSON', key='userid');
Aggregations
Let’s test some aggregations
SELECT count(*), userid FROM pageviews GROUP BY userid;
As data arrives in the stream, this result is constantly updated. Note that the counts accumulate from the start of the query; this can be useful for dumping transformed data to another topic.
Let's do that.
CREATE TABLE user_counts AS SELECT count(*), userid FROM pageviews GROUP BY userid;
We can use the SHOW TOPICS statement to confirm that the USER_COUNTS topic was created:
ksql> SHOW TOPICS;
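The derived user_counts table can then be queried like any other object, with results updating continuously. A minimal sketch:

```sql
SELECT * FROM user_counts;
```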
The following aggregations are available
Function | Example | Description
---|---|---
COUNT | COUNT(col1) | Count the number of rows
MAX | MAX(col1) | Return the maximum value for a given column and window
MIN | MIN(col1) | Return the minimum value for a given column and window
SUM | SUM(col1) | Return the sum of the values for a given column
Window
The WINDOW clause lets you control how to group input records that have the same key into so-called windows for operations such as aggregations or joins. Windows are tracked per record key. KSQL supports the following WINDOW types:
- TUMBLING
- HOPPING
- SESSION
Window Tumbling
TUMBLING: Tumbling windows group input records into fixed-sized, non-overlapping windows based on the records’ timestamps. You must specify the window size for tumbling windows. Note: Tumbling windows are a special case of hopping windows where the window size is equal to the advance interval.
Example:
SELECT item_id, SUM(quantity) FROM orders WINDOW TUMBLING (SIZE 20 SECONDS) GROUP BY item_id;
Window HOPPING
HOPPING: Hopping windows group input records into fixed-sized, (possibly) overlapping windows based on the records’ timestamps. You must specify the window size and the advance interval for hopping windows.
Example:
SELECT item_id, SUM(quantity) FROM orders WINDOW HOPPING (SIZE 20 SECONDS, ADVANCE BY 5 SECONDS) GROUP BY item_id;
Window SESSION
SESSION: Session windows group input records into so-called sessions. You must specify the session inactivity gap parameter for session windows. For example, imagine you set the inactivity gap to 5 minutes. If, for a given record key such as “alice”, no new input data arrives for more than 5 minutes, then the current session for “alice” is closed, and any newly arriving data for “alice” in the future will mark the beginning of a new session.
Example:
SELECT item_id, SUM(quantity) FROM orders WINDOW SESSION (20 SECONDS) GROUP BY item_id;
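Applied to this PoC's own data, a windowed aggregation might look like the following sketch, counting pageviews per user in 30-second tumbling windows (assuming the pageviews stream created earlier; the window size is illustrative):

```sql
SELECT userid, COUNT(*) FROM pageviews WINDOW TUMBLING (SIZE 30 SECONDS) GROUP BY userid;
```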
Transformations
Let's create a new stream with a column transformation:
CREATE STREAM pageviews_transformed WITH (value_format='JSON') AS SELECT viewtime, userid, pageid, TIMESTAMPTOSTRING(viewtime, 'yyyy-MM-dd HH:mm:ss.SSS') AS timestring FROM pageviews;
Joining
And join the STREAM with data from the TABLE to enrich it:
CREATE STREAM pageviews_enriched AS SELECT users.userid AS userid, pageid, regionid, gender FROM pageviews LEFT JOIN users ON pageviews.userid = users.userid;
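Once created, the enriched stream can itself be filtered into further derived streams. A sketch, assuming the users datagen emits 'FEMALE' as one of its gender values:

```sql
CREATE STREAM pageviews_female AS SELECT * FROM pageviews_enriched WHERE gender = 'FEMALE';
```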
Use Cases
Common KSQL use cases are:
- Fraud detection - identify and act on out of the ordinary data to provide real-time awareness.
- Personalization - create real-time experiences and insight for end users driven by data.
- Notifications - build custom alerts and messages based on real-time data.
- Real-time Analytics - power real-time dashboards to understand what’s happening as it happens.
- Sensor data and IoT - understand and deliver sensor data how and where it needs to be.
- Customer 360 - provide a clear, real-time understanding of your customers across every interaction.
Streaming ETL
KSQL makes it simple to transform data within the pipeline, readying messages to cleanly land in another system.
Anomaly Detection
KSQL is a good fit for identifying patterns or anomalies on real-time data. By processing the stream as data arrives you can identify and properly surface out of the ordinary events with millisecond latency.
Monitoring
Kafka’s ability to provide scalable, ordered messages with stream processing makes it a common solution for log data monitoring and alerting. KSQL lends a familiar syntax for tracking, understanding, and managing alerts.
Conclusion
At first glance, KSQL seems quite easy to start using. It's still very new, though; I would give it time to mature before using it in production.
For small transformations, data-quality control, and analytical queries that don't involve large windows, it seems like a good solution. For more complex queries I still prefer to keep Kafka to its core business and leave the computation work to Spark.
More Information
- KSQL is available as a developer preview on GitHub
- Read the documentation for KSQL
- Join the conversation in the #ksql channel in the Confluent Community Slack.