Team Exasol
Team Exasol

Blog snapshot

This post will show you:

  • How to use Kafka consumer user-defined functions to import data into an Exasol table.
  • The design decisions used to maintain the Kafka topic partition record offsets.

In the previous part we looked into Apache Kafka architecture and how to use Kafka Connect JDBC Connector together with our analytics database's JDBC driver. In this blog, we are going to show how to run a Kafka consumer application, based on our IMPORT User Defined Function (UDF).

Apache Kafka Consumer UDF

We've developed a consumer application based on our analytics database's user-defined functions (UDF) so you can use it to reliably import data from any Apache Kafka topic. Unlike Kafka Connect JDBC based solution, it's not something which will keep running once you've set it up. You'll need to run a SQL statement regularly to import the latest records from a Kafka topic into an Exasol table.

Preparing UDF scripts and importing data

Let’s take a look at how you prepare the UDF scripts and run the SQL import statement to import the data from a Kafka topic.

#1 Setting up the UDFs

Please follow these steps to make sure the UDFs are properly setup:

  • Download the latest JAR file from releases.
  • Upload the JAR file to a BucketFS bucket.
  • Create UDF scripts. You can see examples on the project page.

#2 Creating an Exasol table

You can use this code to create a table which maps to the column names and datatypes of the Kafka topic - with additional metadata for offset management:

CREATE OR REPLACE TABLE RETAIL.SALES_POSITIONS (
    -- Required for Kafka import UDF
    KAFKA_PARTITION DECIMAL(18, 0),
    KAFKA_OFFSET DECIMAL(36, 0),

    -- These columns match the Kafka topic schema
    SALES_ID    INTEGER,
    POSITION_ID SMALLINT,
    ARTICLE_ID  SMALLINT,
    AMOUNT      SMALLINT,
    PRICE       DECIMAL(9,2),
    VOUCHER_ID  SMALLINT,
    CANCELED    BOOLEAN
);

#3 Importing data from the topic into a table

This imports new records from the Kafka topic into the Exasol table, run the Exasol import SQL statement:

IMPORT INTO RETAIL.SALES_POSITIONS
FROM SCRIPT ETL.KAFKA_PATH WITH
  BOOTSTRAP_SERVERS   = 'kafka01.internal.tld:9092,kafka02.internal.tld:9093,kafka03.internal.tld:9094'
  SCHEMA_REGISTRY_URL = 'http://schema-registry.internal.tld:8081'
  TOPICS              = 'SALES-POSITIONS'
  TABLE_NAME          = 'RETAIL.KAFKA_TOPIC_SALES_POSITIONS'
  GROUP_ID            = 'exasol-kafka-udf-consumers';
But what do these parameters do? 

They're required and optional key-value properties used in the UDF consumer application.

Here's a breakdown of each:

  • BOOTSTRAP_SERVERS: This value defines a single server or list of Kafka servers - so our consumer UDF can register itself and discover the full Kafka cluster nodes.
  • SCHEMA_REGISTRY_URL: This is a Schema Registry address – used to obtain the topic schema metadata. It provides the topic field name and types that the UDF uses to map them into a relational table schema.
  • TOPICS: This relates to the Kafka topic name we are going to import data from.
  • TABLE_NAME: This is the corresponding Exasol table name we want to insert data into.
  • GROUP_ID: This is an optional parameter. When the consumer UDF starts, it creates a parallel consumer as there are partitions inside the topic. These parallel consumers are registered as a consumer group with the given identifier.

You can read more about consumer configuration parameters in the Kafka documentation.

Choosing the best way to manage the records you've imported

One of the main design decisions is how to manage the imported records from the Kafka topic. Once we import any record, we don’t want that record to be inserted into a table again. We need to commit the record offsets - to make sure Kafka brokers don’t send previously consumed records to new consumers. We briefly explained three approaches to do this in the first part of the blog series.

In this instance, it’s best to keep the Kafka topic partition identifier - and record offset pairs inside an Exasol table as metadata. This means if the import UDF terminates without any failure, these two values are committed to the table together with actual Kafka topic records. The next import UDF obtains maximum offset per partition from Exasol table and consumes Kafka records after these offsets. So it only imports the new records into a table.

What next?

In the next part of this blog series, we’re going to dig into the detail of using the AWS Kinesis connector. It shares a similar design and implementation details to the Kafka import UDF – and will help you smoothly consume data stored in Kinesis services from Exasol AWS cloud deployments.