Kafka Connect and Connector examples

The following example shows you how to deploy an Amazon S3 Sink Connector.

For more information about available connectors and connector classes, see Available Kafka Connectors and Connector Classes.

Prerequisites

  • An Apache Kafka cluster (including Kafka Connect) deployed with Streaming Data Manager
  • AWS credentials with permission to write to an S3 bucket. The S3 Sink Connector needs these credentials to write messages from a topic to the bucket; they can be passed to the connector through a file that is mounted into the hosting Kafka Connect cluster.
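
To verify the prerequisites, you can check that the Kafka cluster and the Kafka Connect cluster are up before continuing. A minimal sketch, assuming both run in the kafka namespace and that the CRDs register the plural resource names shown below (adjust the namespace and resource names to your deployment):

    # List the KafkaCluster and KafkaConnect custom resources managed by Streaming Data Manager
    kubectl get kafkaclusters,kafkaconnects --namespace kafka
    
    # Confirm that the brokers and the Kafka Connect pods are running
    kubectl get pods --namespace kafka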

Steps

  1. Create a Kubernetes secret from the AWS credentials:

    echo "aws_access_key_id=<my-aws-access-key-id>" >> aws_creds.txt
    echo "aws_secret_access_key=<my-aws-secret-access-key>" >> aws_creds.txt
    
    # Base64-encode the credentials for the Secret manifest below
    # (tr removes any line wrapping in the base64 output)
    cat aws_creds.txt | base64 | tr -d '\n'
    
    apiVersion: v1
    kind: Secret
    metadata:
      name: aws-s3-secret
      namespace: kafka
    data:
      aws-s3-creds.properties: <base64-encoded contents of aws_creds.txt>
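
     Apply the Secret manifest above with kubectl apply -f, saving it first as, for example, aws-s3-secret.yaml. Alternatively, the same secret can be created directly from the credentials file, which lets kubectl handle the base64 encoding. A minimal sketch, assuming kubectl access to the cluster and the kafka namespace used above:

    # Create the secret; the key must match the file name referenced later
    # in the connector configuration (aws-s3-creds.properties)
    kubectl create secret generic aws-s3-secret \
      --namespace kafka \
      --from-file=aws-s3-creds.properties=aws_creds.txt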
    
  2. Mount the secret into Kafka Connect. Modify the KafkaConnect custom resource as follows to mount the AWS credentials secret into the Kafka Connect pods:

    apiVersion: kafka.banzaicloud.io/v1beta1
    kind: KafkaConnect
    metadata:
      name: kafka
      namespace: kafka
    ...
    spec:
      ...
      volumeMounts:
        - name: aws-s3-creds
          readOnly: true
          mountPath: /etc/secrets
      volumes:
        - name: aws-s3-creds
          secret:
            secretName: aws-s3-secret
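
     After applying the modified resource, the credentials file should show up in the Kafka Connect pods under /etc/secrets. A quick check, assuming the manifest was saved as kafkaconnect.yaml; replace <kafka-connect-pod> with the name of one of your Kafka Connect pods:

    kubectl apply --namespace kafka -f kafkaconnect.yaml
    
    # Verify that the mounted credentials file is present in a Kafka Connect pod
    # (the output should list aws-s3-creds.properties)
    kubectl exec --namespace kafka <kafka-connect-pod> -- ls /etc/secrets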
    
  3. Create the S3 Sink Connector.

    apiVersion: kafka.banzaicloud.io/v1beta1
    kind: KafkaConnector
    metadata:
      name: my-s3-sink
    spec:
      class: "io.confluent.connect.s3.S3SinkConnector"
      tasksMax: 6
      clusterRef:
        name: kafka
      config:
        aws.access.key.id: ${file:/etc/secrets/aws-s3-creds.properties:aws_access_key_id}
        aws.secret.access.key: ${file:/etc/secrets/aws-s3-creds.properties:aws_secret_access_key}
        flush.size: "10000"
        format.class: io.confluent.connect.s3.format.bytearray.ByteArrayFormat
        key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
        locale: en-US
        partition.duration.ms: "30000"
        partitioner.class: io.confluent.connect.storage.partitioner.TimeBasedPartitioner
        path.format: "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH"
        rotate.schedule.interval.ms: "180000"
        s3.bucket.name: my-test-s3-bucket
        s3.region: eu-central-1
        schema.compatibility: NONE
        schema.generator.class: io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
        storage.class: io.confluent.connect.s3.storage.S3Storage
        timezone: UTC
        topics: my-example-topic
        value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
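
     Apply the manifest and check that the connector and its tasks reach the RUNNING state. The commands below are a sketch: they assume the manifest is saved as my-s3-sink.yaml, that the connector lives in the same namespace as the Kafka Connect cluster (add a metadata.namespace field if your setup requires one), and that curl is available in the Kafka Connect image. The last command queries the standard Kafka Connect REST API, which listens on port 8083 by default:

    kubectl apply --namespace kafka -f my-s3-sink.yaml
    
    # Inspect the KafkaConnector custom resource
    kubectl describe kafkaconnector my-s3-sink --namespace kafka
    
    # Or query the connector status through the Kafka Connect REST API
    kubectl exec --namespace kafka <kafka-connect-pod> -- \
      curl -s http://localhost:8083/connectors/my-s3-sink/status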
    
  4. If Kafka ACLs are enabled, create the following ACLs for the S3 Sink Connector, in addition to the ACLs that Streaming Data Manager creates for Kafka Connect:

    apiVersion: kafka.banzaicloud.io/v1beta1
    kind: KafkaACL
    metadata:
      name: my-s3-sink-kafkaacl
      namespace: kafka
    spec:
      kind: User
      name: CN=kafka-kafka-kafka-connect
      clusterRef:
        name: kafka
        namespace: kafka
      roles:
        - name: consumer
          resourceSelectors:
            - name: my-example-topic
              namespace: kafka
            - name: connector-groups
              namespace: kafka
    
    ---
    apiVersion: kafka.banzaicloud.io/v1beta1
    kind: KafkaResourceSelector
    metadata:
      name: my-example-topic
      namespace: kafka
    spec:
      type: topic
      name: my-example-topic
      pattern: literal
    
    ---
    apiVersion: kafka.banzaicloud.io/v1beta1
    kind: KafkaResourceSelector
    metadata:
      name: connector-groups
      namespace: kafka
    spec:
      type: group
      name: connect-
      pattern: prefixed
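
     Save the three manifests above to a single file, for example s3-sink-acls.yaml, and apply them. A sketch of the commands, assuming the CRDs register the plural resource names shown below (adjust if they differ in your installation):

    kubectl apply -f s3-sink-acls.yaml
    
    # Verify that the ACL and the resource selectors were created
    kubectl get kafkaacls,kafkaresourceselectors --namespace kafka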