Create Confluent Avro-based Apache Flink® table#
Confluent Avro is a serialization format that requires integrating with a schema registry. This enables the serialization and deserialization of data in a format that is language-agnostic, easy to read, and supports schema evolution.
Aiven for Apache Flink® simplifies the process of creating an Apache Flink® source table that uses the Confluent Avro data format with Karapace, an open-source schema registry for Apache Kafka® that enables you to store and retrieve Avro schemas. With Aiven for Apache Flink, you can stream data in the Confluent Avro data format and perform real-time transformations.
This article provides information on how to create an Apache Flink source table that uses Confluent Avro format to stream Avro messages.
Prerequisites#
Aiven for Apache Flink service with Aiven for Apache Kafka® integration. See Create Apache Flink® data service integrations for more information.
Aiven for Apache Kafka® service with Karapace Schema registry enabled. See Getting started with Karapace for more information.
By default, Flink cannot create Apache Kafka topics while pushing the first record automatically. To change this behavior, enable in the Aiven for Apache Kafka target service the
kafka.auto_create_topics_enable
option in Advanced configuration section.
Create an Apache Flink® table with Confluent Avro#
In the Aiven for Apache Flink service page, select Application from the left sidebar.
Create a new application or select an existing one with Aiven for Apache Kafka® integration.
Note
If editing an existing application, create a new version to make changes to the source or sink tables.
In the Create new version screen, select Add source tables.
Select Add new table or select Edit if you want to edit an existing source table.
In the Add new source table or Edit source table screen, select the Aiven for Apache Kafka® service as the integrated service.
In the Table SQL section, enter the SQL statement below to create an Apache Kafka®-based Apache Flink® table with Confluent Avro:
CREATE TABLE kafka ( -- specify the table columns ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = '', 'scan.startup.mode' = 'earliest-offset', 'topic' = 'my_test.public.students', 'value.format' = 'avro-confluent', -- the value data format is Confluent Avro 'value.avro-confluent.url' = 'http://localhost:8082', -- the URL of the schema registry 'value.avro-confluent.basic-auth.credentials-source' = 'USER_INFO', -- the source of the user credentials for accessing the schema registry 'value.avro-confluent.basic-auth.user-info' = 'user_info' -- the user credentials for accessing the schema registry )
The following are the parameters:
connector
: the Kafka connector type, between the Apache Kafka SQL Connector (valuekafka
) for standard topic reads/writes and the Upsert Kafka SQL Connector (valueupsert-kafka
) for changelog type of integration based on message key.Note
For more information on the connector types and the requirements for each, see the articles on Kafka connector types and the requirements for each connector type.
properties.bootstrap.servers
: this parameter can be left empty since the connection details will be retrieved from the Aiven for Apache Kafka integration definitiontopic
: the topic to be used as a source for the data pipeline. If you want to use a new topic that does not yet exist, write the topic name.value.format
: indicates that the value data format is in the Confluent Avro format.Note
The
key.format
parameter can also be set to theavro-confluent
format.avro-confluent.url
: this is the URL for the Karapace schema registry.value.avro-confluent.basic-auth.credentials-source
: this specifies the source of the user credentials for accessing the Karapace schema registry. At present, only theUSER_INFO
value is supported for this parameter.value.avro-confluent.basic-auth.user-info
: this should be set to theuser_info
string you created earlier.Important
To access the Karapace schema registry, the user needs to provide the username and password using the
user_info
parameter. Theuser_info
parameter is a string formatted asuser_info = f"{username}:{password}"
.Additionally, on the source table, the user only needs read permission to the subject containing the schema. However, on the sink table, if the schema does not exist, the user must have write permission for the schema registry.
It is important to provide this information to authenticate and access the Karapace schema registry.
To create a sink table, select Add sink tables and repeat steps 4-6 for sink tables.
In the Create statement section, create a statement that defines the fields retrieved from each message in a topic.
Example: Define a Flink table using the standard connector over topic in Confluent Avro format#
The Aiven for Apache Kafka service called demo-kafka
includes a topic called my_test.public.student
that holds a stream of student data in Confluent Avro format like:
{"id": 1, "name": "John", "email": "john@gmail.com"}
{"id": 2, "name": "Jane", "email": "jane@yahoo.com"}
{"id": 3, "name": "Bob", "email": "bob@hotmail.com"}
{"id": 4, "name": "Alice", "email": "alice@gmail.com"}
You can define a students
Flink table by selecting demo-kafka
as the integration service and writing the following SQL schema:
CREATE TABLE students (
id INT,
name STRING,
email STRING
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '',
'scan.startup.mode' = 'earliest-offset',
'topic' = 'my_test.public.students',
'value.format' = 'avro-confluent'
'value.avro-confluent.url' = 'http://localhost:8082',
'value.avro-confluent.basic-auth.credentials-source'= 'USER_INFO',
'value.avro-confluent.basic-auth.user-info" = 'user_info',
)
Note
The SQL schema includes the output message fields id
, name
, email
and the related data type.