Kafka Connect and Connector examples

The following example shows you how to deploy the Amazon S3 Sink Connector.

Prerequisites

  • An Apache Kafka cluster (including Kafka Connect) deployed with Streaming Data Manager
  • AWS credentials with privileges to write to an S3 bucket. The S3 Sink Connector needs these credentials to write messages from a topic into the bucket; they are passed to the connector through a file that is mounted into the hosting Kafka Connect cluster (see the sketch after this list).
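
For reference, after completing the steps below the Kafka Connect pods see these credentials as a plain properties file mounted at /etc/secrets/aws-s3-creds.properties; the key names are the ones the connector configuration references (the values shown are placeholders):

    # Content of /etc/secrets/aws-s3-creds.properties as seen from inside a Kafka Connect pod
    aws_access_key_id=<my-aws-access-key-id>
    aws_secret_access_key=<my-aws-secret-access-key>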

Steps

  1. Create a Kubernetes secret from the AWS credentials:

    echo "aws_access_key_id=<my-aws-access-key-id>" >> aws_creds.txt
    echo "aws_secret_access_key=<my-aws-access-key" >> aws_creds.txt
    
    cat aws_creds.txt | base64
    
    apiVersion: v1
    kind: Secret
    metadata:
      name: aws-s3-secret
      namespace: kafka
    data:
      aws-s3-creds.properties: # the base64-encoded content of aws_creds.txt
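
    Alternatively, you can let kubectl do the base64 encoding for you; the following command (assuming kubectl access to the kafka namespace) creates an equivalent secret directly from the credentials file:

    kubectl create secret generic aws-s3-secret \
      --namespace kafka \
      --from-file=aws-s3-creds.properties=aws_creds.txt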
    
  2. Mount the secret into Kafka Connect. Modify the KafkaConnect custom resource, as follows, to mount AWS secrets into the Kafka Connect pods:

    apiVersion: kafka.banzaicloud.io/v1beta1
    kind: KafkaConnect
    metadata:
      name: kafka
      namespace: kafka
    ...
    spec:
      ...
      volumeMounts:
        - name: aws-s3-creds
          readOnly: true
          mountPath: /etc/secrets
      volumes:
        - name: aws-s3-creds
          secret:
            secretName: aws-s3-secret
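
    Once the Kafka Connect pods have been recreated with the new volume, you can spot-check that the credentials file shows up at the expected path (the pod name below is a placeholder; pick any Kafka Connect pod listed in the kafka namespace):

    kubectl get pods -n kafka
    kubectl exec -n kafka <kafka-connect-pod-name> -- cat /etc/secrets/aws-s3-creds.properties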
    
  3. Create the S3 Sink Connector:

    apiVersion: kafka.banzaicloud.io/v1beta1
    kind: KafkaConnector
    metadata:
      name: my-s3-sink
    spec:
      class: "io.confluent.connect.s3.S3SinkConnector"
      tasksMax: 6
      clusterRef:
        name: <kafkaconnect-cluster-name> # for example, kafka (the KafkaConnect cluster from step 2)
      config:
        aws.access.key.id: ${file:/etc/secrets/aws-s3-creds.properties:aws_access_key_id}
        aws.secret.access.key: ${file:/etc/secrets/aws-s3-creds.properties:aws_secret_access_key}
        flush.size: "10000"
        format.class: io.confluent.connect.s3.format.bytearray.ByteArrayFormat
        key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
        locale: en-US
        partition.duration.ms: "30000"
        partitioner.class: io.confluent.connect.storage.partitioner.TimeBasedPartitioner
        path.format: "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH"
        rotate.schedule.interval.ms: "180000"
        s3.bucket.name: my-test-s3-bucket
        s3.region: eu-central-1
        schema.compatibility: NONE
        schema.generator.class: io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
        storage.class: io.confluent.connect.s3.storage.S3Storage
        timezone: UTC
        topics: my-example-topic
        value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
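
    After applying the manifest (the file name below is only an example), you can inspect the resulting custom resource with kubectl, assuming the KafkaConnector CRD is installed in the cluster:

    kubectl apply -n kafka -f s3-sink-connector.yaml
    kubectl describe kafkaconnector my-s3-sink -n kafka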
    
  4. If Kafka ACLs are enabled, create the following ACLs for the S3 Sink Connector, in addition to the ACLs that Streaming Data Manager creates for Kafka Connect:

    apiVersion: kafka.banzaicloud.io/v1beta1
    kind: KafkaACL
    metadata:
      name: my-s3-sink-kafkaacl
      namespace: kafka
    spec:
      kind: User
      name: CN=kafka-kafka-kafka-connect
      clusterRef:
        name: kafka
        namespace: kafka
      roles:
        - name: consumer
          resourceSelectors:
            - name: my-example-topic
              namespace: kafka
            - name: connector-groups
              namespace: kafka
    
    apiVersion: kafka.banzaicloud.io/v1beta1
    kind: KafkaResourceSelector
    metadata:
      name: my-example-topic
      namespace: kafka
    spec:
      type: topic
      name: my-example-topic
      pattern: literal
    
    apiVersion: kafka.banzaicloud.io/v1beta1
    kind: KafkaResourceSelector
    metadata:
      name: connector-groups
      namespace: kafka
    spec:
      type: group
      name: connect-
      pattern: prefixed
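
Once the connector is running and messages are flowing on my-example-topic, objects should start appearing in the S3 bucket after a flush: with the configuration above that happens after 10000 records (flush.size) or, at the latest, every 3 minutes (rotate.schedule.interval.ms). A quick way to check, assuming the AWS CLI is configured with read access to the bucket:

    aws s3 ls s3://my-test-s3-bucket/ --recursive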