Streaming SQL for Kafka (KSQL)

Intro

In this article I’ll explore KSQL, Apache Kafka’s streaming SQL engine.

Requirements

  • Docker
  • docker-compose

Description

From the official website definition:

KSQL is an open source streaming SQL engine that implements continuous, interactive queries against Apache Kafka™. It allows you to query, read, write, and process data in Apache Kafka in real-time, at scale using SQL commands. KSQL interacts directly with the Kafka Streams API, removing the requirement of building a Java app.

PoC Setup

Create the following docker-compose.yml file:

---
version: '2'
services:
  zookeeper:
    image: "confluentinc/cp-zookeeper:latest"
    hostname: zookeeper
    ports:
      - '32181:32181'
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
    extra_hosts:
      - "moby:127.0.0.1"

  kafka:
    image: "confluentinc/cp-enterprise-kafka:latest"
    hostname: kafka
    ports:
      - '9092:9092'
      - '29092:29092'
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:32181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
    extra_hosts:
      - "moby:127.0.0.1"

  schema-registry:
    image: "confluentinc/cp-schema-registry:latest"
    hostname: schema-registry
    depends_on:
      - zookeeper
      - kafka
    ports:
      - '8081:8081'
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:32181
    extra_hosts:
      - "moby:127.0.0.1"

  # Runs the Kafka KSQL data generator for the topic called "pageviews"
  ksql-datagen-pageviews:
    image: "confluentinc/ksql-examples:latest"
    hostname: ksql-datagen-pageviews
    depends_on:
      - kafka
      - schema-registry
    # Note: The container's `run` script will perform the same readiness checks
    # for Kafka and Confluent Schema Registry, but that's ok because they complete fast.
    # The reason we check for readiness here is that we can insert a sleep time
    # for topic creation before we start the application.
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
                       cub kafka-ready -b kafka:29092 1 20 && \
                       echo Waiting for Confluent Schema Registry to be ready... && \
                       cub sr-ready schema-registry 8081 20 && \
                       echo Waiting a few seconds for topic creation to finish... && \
                       sleep 2 && \
                       java -jar /usr/share/java/ksql-examples/ksql-examples-0.1-SNAPSHOT-standalone.jar
                       quickstart=pageviews format=delimited topic=pageviews bootstrap-server=kafka:29092 maxInterval=100 iterations=1000 && \
                       java -jar /usr/share/java/ksql-examples/ksql-examples-0.1-SNAPSHOT-standalone.jar
                       quickstart=pageviews format=delimited topic=pageviews bootstrap-server=kafka:29092 maxInterval=1000'"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_LOG4J_OPTS: "-Dlog4j.configuration=file:/etc/ksql/log4j-rolling.properties"
      STREAMS_BOOTSTRAP_SERVERS: kafka:29092
      STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
      STREAMS_SCHEMA_REGISTRY_PORT: 8081
    extra_hosts:
      - "moby:127.0.0.1"

  # Runs the Kafka KSQL data generator for the topic called "users"
  ksql-datagen-users:
    image: "confluentinc/ksql-examples:latest"
    hostname: ksql-datagen-users
    depends_on:
      - kafka
      - schema-registry
    # Note: The container's `run` script will perform the same readiness checks
    # for Kafka and Confluent Schema Registry, but that's ok because they complete fast.
    # The reason we check for readiness here is that we can insert a sleep time
    # for topic creation before we start the application.
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
                       cub kafka-ready -b kafka:29092 1 20 && \
                       echo Waiting for Confluent Schema Registry to be ready... && \
                       cub sr-ready schema-registry 8081 20 && \
                       echo Waiting a few seconds for topic creation to finish... && \
                       sleep 2 && \
                       java -jar /usr/share/java/ksql-examples/ksql-examples-0.1-SNAPSHOT-standalone.jar
                       quickstart=users format=json topic=users bootstrap-server=kafka:29092 maxInterval=100 iterations=1000 && \
                       java -jar /usr/share/java/ksql-examples/ksql-examples-0.1-SNAPSHOT-standalone.jar
                       quickstart=users format=json topic=users bootstrap-server=kafka:29092 maxInterval=1000'"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_LOG4J_OPTS: "-Dlog4j.configuration=file:/etc/ksql/log4j-rolling.properties"
      STREAMS_BOOTSTRAP_SERVERS: kafka:29092
      STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
      STREAMS_SCHEMA_REGISTRY_PORT: 8081
    extra_hosts:
      - "moby:127.0.0.1"

  # Runs the Kafka KSQL application
  ksql-cli:
    image: "confluentinc/ksql-cli:latest"
    hostname: ksql-cli
    depends_on:
      - kafka
      - schema-registry
      - ksql-datagen-pageviews
      - ksql-datagen-users
    command: "perl -e 'while(1){ sleep 99999 }'"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_LOG4J_OPTS: "-Dlog4j.configuration=file:/etc/ksql/log4j-rolling.properties"
      STREAMS_BOOTSTRAP_SERVERS: kafka:29092
      STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
      STREAMS_SCHEMA_REGISTRY_PORT: 8081
    extra_hosts:
      - "moby:127.0.0.1"

Prepare the environment with the following command:

sudo docker-compose up -d

This will create the minimal set of services needed to test KSQL:

  • zookeeper
  • schema-registry
  • kafka
  • ksql-cli

It will also create two containers that generate data we can use for testing:

  • ksql-datagen-pageviews
  • ksql-datagen-users

These containers are Kafka producers, which we will use to feed KSQL with test data.

KSQL

Execute the CLI to start testing:

sudo docker-compose exec ksql-cli ksql-cli local --bootstrap-server kafka:29092

You will see something like this

                       ======================================
                       =      _  __ _____  ____  _          =
                       =     | |/ // ____|/ __ \| |         =
                       =     | ' /| (___ | |  | | |         =
                       =     |  <  \___ \| |  | | |         =
                       =     | . \ ____) | |__| | |____     =
                       =     |_|\_\_____/ \___\_\______|    =
                       =                                    =
                       =   Streaming SQL Engine for Kafka   =
Copyright 2017 Confluent Inc.                         

CLI v0.1, Server v0.1 located at http://localhost:9098

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql>
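
Before creating any objects, we can check that the data generators have already created their topics (it may take a few seconds after startup):

ksql> SHOW TOPICS;

The pageviews and users topics should show up in the list.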

Now let’s create a new STREAM

CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='pageviews', value_format='DELIMITED');

Let’s describe the created object

ksql> DESCRIBE pageviews_original;

 Field    | Type            
----------------------------
 ROWTIME  | BIGINT          
 ROWKEY   | VARCHAR(STRING) 
 VIEWTIME | BIGINT          
 USERID   | VARCHAR(STRING) 
 PAGEID   | VARCHAR(STRING) 
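
As a quick sanity check we can also query the stream directly. A SELECT keeps emitting rows as new page views arrive, and the LIMIT clause makes it terminate after a few:

ksql> SELECT pageid FROM pageviews_original LIMIT 3;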

Now let’s create a users TABLE

CREATE TABLE users_original (registertime bigint, gender varchar, regionid varchar, userid varchar) WITH (kafka_topic='users', value_format='JSON');

And confirm the DDL

ksql> DESCRIBE users_original;

 Field        | Type            
--------------------------------
 ROWTIME      | BIGINT          
 ROWKEY       | VARCHAR(STRING) 
 REGISTERTIME | BIGINT          
 GENDER       | VARCHAR(STRING) 
 REGIONID     | VARCHAR(STRING) 
 USERID       | VARCHAR(STRING) 
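
The table can be queried in the same way. Since the query is continuous, press Ctrl-C to stop it:

ksql> SELECT userid, regionid, gender FROM users_original;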

So there are two main object types: TABLES and STREAMS. To list the existing ones, execute:

ksql> SHOW STREAMS;

 Stream Name        | Kafka Topic | Format    
----------------------------------------------
 PAGEVIEWS_ORIGINAL | pageviews   | DELIMITED 

and

ksql> SHOW TABLES;

 Table Name     | Kafka Topic | Format | Windowed 
--------------------------------------------------
 USERS_ORIGINAL | users       | JSON   | false    

Differences between STREAMS and TABLES

Stream

A stream is an unbounded sequence of structured data (“facts”). For example, we could have a stream of financial transactions such as “Alice sent $100 to Bob, then Charlie sent $50 to Bob”. Facts in a stream are immutable, which means new facts can be inserted to a stream, but existing facts can never be updated or deleted. Streams can be created from a Kafka topic or derived from existing streams and tables.

Table

A table is a view of a stream, or another table, and represents a collection of evolving facts. For example, we could have a table that contains the latest financial information such as “Bob’s current account balance is $150”. It is the equivalent of a traditional database table but enriched by streaming semantics such as windowing. Facts in a table are mutable, which means new facts can be inserted to the table, and existing facts can be updated or deleted. Tables can be created from a Kafka topic or derived from existing streams and tables.
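
With the objects created above, the difference is easy to see in practice. A minimal sketch using the stream and table we already have:

-- Stream query: emits every incoming page-view event as its own fact
SELECT userid, pageid FROM pageviews_original;

-- Table query: emits updates to the per-key state; the latest row per
-- userid is the current value for that key
SELECT userid, regionid FROM users_original;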

Extending the Model

Let’s create a new pageviews stream keyed by pageid and join it with the users table.

First create the new stream

CREATE STREAM pageviews \
   (viewtime BIGINT, \
    userid VARCHAR, \
    pageid VARCHAR) \
   WITH (kafka_topic='pageviews', \
         value_format='DELIMITED', \
         key='pageid', \
         timestamp='viewtime');

And the users table, this time with the extra interests and contact_info columns:

CREATE TABLE users \
   (registertime BIGINT, \
    gender VARCHAR, \
    regionid VARCHAR, \
    userid VARCHAR, \
    interests array<VARCHAR>, \
    contact_info map<VARCHAR, VARCHAR>) \
   WITH (kafka_topic='users', \
         value_format='JSON');
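
This version of the users table carries richer value types (an ARRAY and a MAP). A quick check that those columns are populated (press Ctrl-C to stop):

ksql> SELECT userid, regionid, interests, contact_info FROM users;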

Aggregations

Let’s test some aggregations

SELECT COUNT(*), userid FROM pageviews GROUP BY userid;

As data arrives in the stream the result is constantly updated. Note that the counts begin from the start of the query; dumping this kind of transformed, continuously updated result to another topic is where it becomes really useful.

Let’s do that.

CREATE TABLE user_counts AS SELECT COUNT(*), userid FROM pageviews GROUP BY userid;

We can use the SHOW TOPICS statement to confirm that the USER_COUNTS topic has been created.
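
For example, first list the topics and then query the derived table itself; its counts keep updating as new page views arrive (press Ctrl-C to stop):

ksql> SHOW TOPICS;
ksql> SELECT * FROM user_counts;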

The following aggregations are available

 Function | Example     | Description
 --------------------------------------------------------------------------
 COUNT    | COUNT(col1) | Count the number of rows
 MAX      | MAX(col1)   | Return the maximum value for a given column and window
 MIN      | MIN(col1)   | Return the minimum value for a given column and window
 SUM      | SUM(col1)   | Sums the column values
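
These can be combined in a single query. For example, a per-user summary over the pageviews stream defined above (a sketch):

SELECT userid,
       COUNT(*)      AS views,
       MIN(viewtime) AS first_view,
       MAX(viewtime) AS last_view
FROM pageviews
GROUP BY userid;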

Window

The WINDOW clause lets you control how to group input records that have the same key into so-called windows for operations such as aggregations or joins. Windows are tracked per record key. KSQL supports the following WINDOW types:

  • TUMBLING
  • HOPPING
  • SESSION

Window Tumbling

TUMBLING: Tumbling windows group input records into fixed-sized, non-overlapping windows based on the records’ timestamps. You must specify the window size for tumbling windows. Note: Tumbling windows are a special case of hopping windows where the window size is equal to the advance interval.

Example:

SELECT item_id, SUM(quantity)
  FROM orders
  WINDOW TUMBLING (SIZE 20 SECONDS)
  GROUP BY item_id;

Window HOPPING

HOPPING: Hopping windows group input records into fixed-sized, (possibly) overlapping windows based on the records’ timestamps. You must specify the window size and the advance interval for hopping windows.

Example:

SELECT item_id, SUM(quantity)
  FROM orders
  WINDOW HOPPING (SIZE 20 SECONDS, ADVANCE BY 5 SECONDS)
  GROUP BY item_id;

Window SESSION

SESSION: Session windows group input records into so-called sessions. You must specify the session inactivity gap parameter for session windows. For example, imagine you set the inactivity gap to 5 minutes. If, for a given record key such as “alice”, no new input data arrives for more than 5 minutes, then the current session for “alice” is closed, and any newly arriving data for “alice” in the future will mark the beginning of a new session.

Example:

SELECT item_id, SUM(quantity)
  FROM orders
  WINDOW SESSION (20 SECONDS)
  GROUP BY item_id;
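
The examples above use a generic orders stream. Applied to the pageviews stream from this PoC, a tumbling count of page views per user would look like this (a sketch):

SELECT userid, COUNT(*)
  FROM pageviews
  WINDOW TUMBLING (SIZE 30 SECONDS)
  GROUP BY userid;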

Transformations

Let’s create a new stream with a column transformation

CREATE STREAM pageviews_transformed \
  WITH (timestamp='viewtime', \
        partitions=5, \
        value_format='JSON') AS \
  SELECT viewtime, \
         userid, \
         pageid, \
         TIMESTAMPTOSTRING(viewtime, 'yyyy-MM-dd HH:mm:ss.SSS') AS timestring \
  FROM pageviews \
  PARTITION BY userid;
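
To verify the transformation, describe the new stream and watch the formatted timestamp column:

ksql> DESCRIBE pageviews_transformed;
ksql> SELECT timestring, userid, pageid FROM pageviews_transformed;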

Joining

And joining the STREAM with enriched data from TABLE

CREATE STREAM pageviews_enriched AS \
   SELECT pv.viewtime, \
          pv.userid AS userid, \
          pv.pageid, \
          pv.timestring, \
          u.gender, \
          u.regionid, \
          u.interests, \
          u.contact_info \
   FROM pageviews_transformed pv \
   LEFT JOIN users u ON pv.userid = u.userid;
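
The enriched stream can itself be filtered further. For example, deriving a stream of page views by female users (assuming the datagen emits 'FEMALE' as one of the gender values, as in the Confluent quickstart):

CREATE STREAM pageviews_female AS \
   SELECT * \
   FROM pageviews_enriched \
   WHERE gender = 'FEMALE';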

Use Cases

Common KSQL use cases are:

  • Fraud detection - identify and act on out of the ordinary data to provide real-time awareness.
  • Personalization - create real-time experiences and insight for end users driven by data.
  • Notifications - build custom alerts and messages based on real-time data.
  • Real-time Analytics - power real-time dashboards to understand what’s happening as it happens.
  • Sensor data and IoT - understand and deliver sensor data how and where it needs to be.
  • Customer 360 - provide a clear, real-time understanding of your customers across every interaction.

Streaming ETL

KSQL makes it simple to transform data within the pipeline, readying messages to cleanly land in another system.
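
For example, a derived stream that re-serializes the delimited pageviews into JSON and drops records with an invalid timestamp is already a small streaming ETL job (a minimal sketch):

CREATE STREAM pageviews_json \
  WITH (value_format='JSON') AS \
  SELECT viewtime, userid, pageid \
  FROM pageviews_original \
  WHERE viewtime > 0;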

Anomaly Detection

KSQL is a good fit for identifying patterns or anomalies on real-time data. By processing the stream as data arrives you can identify and properly surface out of the ordinary events with millisecond latency.
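
A minimal sketch of this idea with the PoC data: a windowed count of page views per user. A downstream consumer of the resulting topic could alert whenever the per-minute count crosses a threshold.

CREATE TABLE views_per_user_per_minute AS \
  SELECT userid, COUNT(*) AS view_count \
  FROM pageviews \
  WINDOW TUMBLING (SIZE 60 SECONDS) \
  GROUP BY userid;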

Monitoring

Kafka’s ability to provide scalable ordered messages with stream processing make it a common solution for log data monitoring and alerting. KSQL lends a familiar syntax for tracking, understanding, and managing alerts.
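
As an illustration, assuming a hypothetical app-logs topic with JSON messages that carry a level field, a per-minute count of log levels could look like this:

-- Hypothetical topic and fields, for illustration only
CREATE STREAM app_logs (level VARCHAR, message VARCHAR) \
  WITH (kafka_topic='app-logs', value_format='JSON');

SELECT level, COUNT(*)
  FROM app_logs
  WINDOW TUMBLING (SIZE 60 SECONDS)
  GROUP BY level;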

Conclusion

At first glance, KSQL seems pretty easy to start using. It’s still quite new, so I would give it some time to mature before using it in production.

For small transformations, data-quality control, and analytical queries that don’t involve large windows, this seems like a good solution. For more complex queries I still prefer to keep Kafka focused on its core business and leave the heavier computation to Spark.
