Create an Amazon S3 sink connector by Aiven from Apache Kafka®
==============================================================

The Apache Kafka Connect® S3 sink connector by Aiven enables you to move data from an Aiven for Apache Kafka® cluster to Amazon S3 for long-term storage.

.. note::
    There are two versions of the S3 sink connector available with Aiven for Apache Kafka Connect®: one developed by Aiven, the other developed by Confluent. This article uses the Aiven version. The S3 sink connector by Confluent is discussed in a :doc:`dedicated page `.

.. note::
    You can check the full set of available parameters and configuration options in the `connector's documentation `_.

Prerequisites
-------------

To set up the S3 sink connector by Aiven, you need an Aiven for Apache Kafka® service :doc:`with Apache Kafka Connect enabled ` or a :ref:`dedicated Aiven for Apache Kafka Connect cluster `.

Furthermore, you need to follow the steps :doc:`to prepare the AWS account and S3 sink ` and collect the following information about the target S3 bucket upfront:

* ``AWS_S3_NAME``: the name of the S3 bucket
* ``AWS_S3_REGION``: the AWS region where the S3 bucket has been created
* ``AWS_USER_ACCESS_KEY_ID``: the AWS user access key ID
* ``AWS_USER_SECRET_ACCESS_KEY``: the AWS user secret access key

.. tip::
    If you want to secure the connection from Apache Kafka Connect® to S3 using `AWS assume role credentials `_, check out the :doc:`dedicated article `.

Set up an S3 sink connector with Aiven CLI
------------------------------------------

The following example demonstrates how to set up an Apache Kafka Connect® S3 sink connector using the :ref:`Aiven CLI dedicated command `.

Define a Kafka Connect® configuration file
''''''''''''''''''''''''''''''''''''''''''

Define the connector configuration in a file (we'll refer to it as ``s3_sink.json``) with the following content, replacing the uppercase placeholders with the values collected in the prerequisites:

.. code::

    {
        "name": "CONNECTOR_NAME",
        "connector.class": "io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector",
        "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "topics": "TOPIC_LIST",
        "aws.access.key.id": "AWS_USER_ACCESS_KEY_ID",
        "aws.secret.access.key": "AWS_USER_SECRET_ACCESS_KEY",
        "aws.s3.bucket.name": "AWS_S3_NAME",
        "aws.s3.region": "AWS_S3_REGION"
    }

The configuration file contains the following entries:

* ``name``: the connector name
* ``topics``: the list of Apache Kafka® topics to sink to the S3 bucket
* ``key.converter`` and ``value.converter``: data converters, depending on the topic data format; check the `GitHub repository documentation `_ for more information, and see the sketch below for a JSON alternative
* ``aws.access.key.id``: the AWS user access key ID
* ``aws.secret.access.key``: the AWS user secret access key
* ``aws.s3.bucket.name``: the name of the S3 bucket
* ``aws.s3.region``: the AWS region where the S3 bucket has been created

.. tip::
    You can define the S3 sink connector naming and data formats by setting the :doc:`dedicated parameters `. Check out the `GitHub repository parameters documentation `_ for the full list of configuration options.
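The ``ByteArrayConverter`` used above writes the raw bytes of each record to S3 without interpreting them, which works for any topic format. As a minimal sketch of an alternative, assuming your topics carry schemaless JSON messages, you could use the ``JsonConverter`` that ships with Apache Kafka Connect® instead; the ``schemas.enable`` options are standard converter settings that disable the schema envelope:

.. code::

    {
        "name": "CONNECTOR_NAME",
        "connector.class": "io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "topics": "TOPIC_LIST",
        "aws.access.key.id": "AWS_USER_ACCESS_KEY_ID",
        "aws.secret.access.key": "AWS_USER_SECRET_ACCESS_KEY",
        "aws.s3.bucket.name": "AWS_S3_NAME",
        "aws.s3.region": "AWS_S3_REGION"
    }

Only the converter entries differ from the template above; all other fields keep the same meaning.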
Create an S3 sink connector with Aiven CLI
''''''''''''''''''''''''''''''''''''''''''

To create the connector, execute the following :ref:`Aiven CLI command `, replacing ``SERVICE_NAME`` with the name of the existing Aiven for Apache Kafka® service where the connector needs to run:

.. code::

    avn service connector create SERVICE_NAME @s3_sink.json

Check the connector status with the following command, replacing ``SERVICE_NAME`` with the name of the existing Aiven for Apache Kafka® service and ``CONNECTOR_NAME`` with the name of the connector defined before:

.. code::

    avn service connector status SERVICE_NAME CONNECTOR_NAME

With the connector in place, verify that the data is flowing to the target S3 bucket.

Example: define an S3 sink connector
------------------------------------

This example creates an S3 sink connector with the following properties:

* connector name: ``my_s3_sink``
* source topics: ``students``
* target S3 bucket name: ``my-test-bucket``
* target S3 bucket region: ``eu-central-1``
* AWS user access key ID: ``AKIAXXXXXXXXXX``
* AWS user secret access key: ``hELuXXXXXXXXXXXXXXXXXXXXXXXXXX``

The connector configuration is the following:

.. code::

    {
        "name": "my_s3_sink",
        "connector.class": "io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector",
        "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "topics": "students",
        "aws.access.key.id": "AKIAXXXXXXXXXX",
        "aws.secret.access.key": "hELuXXXXXXXXXXXXXXXXXXXXXXXXXX",
        "aws.s3.bucket.name": "my-test-bucket",
        "aws.s3.region": "eu-central-1"
    }

With the above configuration stored in an ``s3_sink.json`` file, you can create the connector in the ``demo-kafka`` instance with:

.. code::

    avn service connector create demo-kafka @s3_sink.json
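Once the connector is running, you can verify from the AWS side that objects are being written. For example, assuming you have the AWS CLI installed and configured with credentials that can read the ``my-test-bucket`` bucket used above, a minimal check is:

.. code::

    # List every object in the target bucket: new files should appear
    # as the connector flushes records from the students topic
    aws s3 ls s3://my-test-bucket/ --recursive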