Kafka Connect and Connector examples
The following example shows you how to deploy an S3 Sink Connector that writes data from a Kafka topic to an Amazon S3 bucket.
For more information about available connectors and connector classes, see Available Kafka Connectors and Connector Classes.
Prerequisites
- An Apache Kafka cluster (including Kafka Connect) deployed with Streaming Data Manager
- AWS credentials with privileges to write to an S3 bucket. The S3 Sink Connector needs these credentials to write messages from a topic to an S3 bucket; they are passed to the connector through a file that is mounted into the hosting Kafka Connect cluster.
Steps
- Create a Kubernetes secret from the AWS credentials. First, write the credentials into a file and base64-encode it:
echo "aws_access_key_id=<my-aws-access-key-id>" >> aws_creds.txt echo "aws_secret_access_key=<my-aws-access-key" >> aws_creds.txt cat aws_creds.txt | base64
Then create a Secret that holds the encoded credentials:

apiVersion: v1
kind: Secret
metadata:
  name: aws-s3-secret
  namespace: kafka
data:
  aws-s3-creds.properties: # base64-encoded content of aws_creds.txt
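As an alternative to writing the manifest by hand, the same Secret can be created directly from the credentials file. A minimal sketch, reusing the aws_creds.txt file and the aws-s3-secret name from above:

# create the secret directly from the credentials file;
# kubectl base64-encodes the file content automatically
kubectl create secret generic aws-s3-secret \
  --namespace kafka \
  --from-file=aws-s3-creds.properties=aws_creds.txt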
- Mount the secret into Kafka Connect. Modify the KafkaConnect custom resource as follows to mount the AWS secret into the Kafka Connect pods:
apiVersion: kafka.banzaicloud.io/v1beta1
kind: KafkaConnect
metadata:
  name: kafka
  namespace: kafka
  ...
spec:
  ...
  volumeMounts:
  - name: aws-s3-creds
    readOnly: true
    mountPath: /etc/secrets
  volumes:
  - name: aws-s3-creds
    secret:
      secretName: aws-s3-secret
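One way to apply this change is to edit the resource in place with kubectl. This is a sketch that assumes the KafkaConnect CRD is exposed under the resource name kafkaconnect:

# edit the KafkaConnect resource and add the volumeMounts/volumes entries shown above
kubectl edit kafkaconnect kafka --namespace kafka

# afterwards, confirm that the secret mount is present in the resource
kubectl get kafkaconnect kafka --namespace kafka -o yaml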
- Create the S3 Sink Connector:
apiVersion: kafka.banzaicloud.io/v1beta1
kind: KafkaConnector
metadata:
  name: my-s3-sink
spec:
  class: "io.confluent.connect.s3.S3SinkConnector"
  tasksMax: 6
  clusterRef:
    name: kafka
  config:
    aws.access.key.id: ${file:/etc/secrets/aws-s3-creds.properties:aws_access_key_id}
    aws.secret.access.key: ${file:/etc/secrets/aws-s3-creds.properties:aws_secret_access_key}
    flush.size: "10000"
    format.class: io.confluent.connect.s3.format.bytearray.ByteArrayFormat
    key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
    locale: en-US
    partition.duration.ms: "30000"
    partitioner.class: io.confluent.connect.storage.partitioner.TimeBasedPartitioner
    path.format: "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH"
    rotate.schedule.interval.ms: "180000"
    s3.bucket.name: my-test-s3-bucket
    s3.region: eu-central-1
    schema.compatibility: NONE
    schema.generator.class: io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
    storage.class: io.confluent.connect.s3.storage.S3Storage
    timezone: UTC
    topics: my-example-topic
    value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
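After the KafkaConnector resource is applied, its state can be inspected through the custom resource. This is a sketch that assumes the CRD is exposed under the resource name kafkaconnector and that the connector was created in the kafka namespace:

# inspect the connector's custom resource, including any status reported by the operator
kubectl get kafkaconnector my-s3-sink --namespace kafka -o yaml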
- If Kafka ACLs are enabled, create the following ACLs for the S3 Sink Connector, in addition to the ACLs that Streaming Data Manager creates for Kafka Connect:
apiVersion: kafka.banzaicloud.io/v1beta1
kind: KafkaACL
metadata:
  name: my-s3-sink-kafkaacl
  namespace: kafka
spec:
  kind: User
  name: CN=kafka-kafka-kafka-connect
  clusterRef:
    name: kafka
    namespace: kafka
  roles:
  - name: consumer
    resourceSelectors:
    - name: my-example-topic
      namespace: kafka
    - name: connector-groups
      namespace: kafka
The KafkaACL references the following KafkaResourceSelector resources, one for the topic the connector consumes and one for the Kafka Connect consumer groups:

apiVersion: kafka.banzaicloud.io/v1beta1
kind: KafkaResourceSelector
metadata:
  name: my-example-topic
  namespace: kafka
spec:
  type: topic
  name: my-example-topic
  pattern: literal
apiVersion: kafka.banzaicloud.io/v1beta1
kind: KafkaResourceSelector
metadata:
  name: connector-groups
  namespace: kafka
spec:
  type: group
  name: connect-
  pattern: prefixed
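The ACL resources can be applied like any other manifest. A minimal sketch, assuming the KafkaACL and both KafkaResourceSelector manifests above are saved in a file named s3-sink-acls.yaml (an illustrative name):

# apply the KafkaACL and KafkaResourceSelector manifests shown above
kubectl apply -f s3-sink-acls.yaml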