OAuth Bearer Token Authentication for listeners (Apache Kafka 3.1.0 and above)

CAUTION:

Prior to Apache Kafka version 3.1.0, the native OAuth Bearer Token authentication supported only unsecured JWT tokens, which are not suitable for production environments. For those versions, Streaming Data Manager therefore used its own OAuth implementation instead.

To enable OAuth Bearer Token Authentication for Kafka versions earlier than 3.1.0 (using the Streaming Data Manager OAuth plugin), see OAuth Bearer Token Authentication for listeners (Apache Kafka pre 3.1.0).

This scenario covers using Kafka ACLs with a third-party identity provider via OAuth 2.0 Bearer Token based authentication. Starting from Apache Kafka version 3.1.0, Streaming Data Manager relies on Apache Kafka’s native OAuth Bearer Token authentication support introduced by KIP-768 for both external listeners and internal listeners.

Because the workload must present a valid JWT Bearer Token to authenticate, it must support OAuth 2.0.

Note: Because Streaming Data Manager secures communication with TLS via Istio, the OAuth listeners must be configured in PLAINTEXT mode (listener type sasl_plaintext).
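
To illustrate what "capable of OAuth 2.0" means for a workload, the token request can be sketched as follows. This is a minimal Python sketch of the standard client-credentials grant (RFC 6749, section 4.4), not Kafka's or Streaming Data Manager's own code. The function name build_token_request is hypothetical, and the endpoint URL and credentials are placeholders you would replace with the values of your identity provider.

```python
import base64
import urllib.parse
import urllib.request


def build_token_request(token_url, client_id, client_secret, scope=None):
    """Build (but do not send) an OAuth 2.0 client-credentials token request."""
    # The grant type goes into the form-encoded body; scope is optional.
    form = {"grant_type": "client_credentials"}
    if scope:
        form["scope"] = scope
    body = urllib.parse.urlencode(form).encode("ascii")

    # The client authenticates with HTTP Basic auth: base64("id:secret").
    credentials = f"{client_id}:{client_secret}".encode("utf-8")
    headers = {
        "Authorization": "Basic " + base64.b64encode(credentials).decode("ascii"),
        "Content-Type": "application/x-www-form-urlencoded",
        "Accept": "application/json",
    }
    return urllib.request.Request(token_url, data=body, headers=headers, method="POST")
```

Passing the returned object to urllib.request.urlopen sends the request. An identity provider normally answers with a JSON document whose access_token field holds the JWT that the workload then presents to the Kafka listener.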

Prerequisites

To use Kafka ACLs with Istio mTLS, you need:

  • capability to provision LoadBalancer Kubernetes services
  • a Kafka cluster

Calisti resource requirements

Make sure that your Kubernetes or OpenShift cluster has sufficient resources to install Calisti. The following table shows the number of resources needed on the cluster:

Resource  Required
CPU       - 32 vCPU in total
          - 4 vCPU available for allocation per worker node (if you are testing on a cluster at a cloud provider, use nodes that have at least 4 CPUs, for example, c5.xlarge on AWS)
Memory    - 64 GiB in total
          - 4 GiB available for allocation per worker node for the Kubernetes cluster (8 GiB in case of the OpenShift cluster)
Storage   12 GB of ephemeral storage on the Kubernetes worker nodes (for Traces and Metrics)

External listener

  1. Enable ACLs and configure an external listener using Streaming Data Manager. Complete the following steps:

    1. Verify that your deployed Kafka cluster is up and running:

      smm sdm cluster get --namespace <namespace-of-your-cluster> --kafka-cluster <name-of-your-kafka-cluster> --kubeconfig <path-to-kubeconfig-file>
      

      Expected output:

      Namespace  Name   State           Image                               Alerts  Cruise Control Topic Status  Rolling Upgrade Errors  Rolling Upgrade Last Success
      kafka      kafka  ClusterRunning  banzaicloud/kafka:3.1.0  0       CruiseControlTopicReady      0
      
    2. Enable ACLs and configure an external listener. The deployed Kafka cluster has no ACLs, and external access is disabled by default. Enable them by applying the following changes:

      smm sdm cluster update --namespace kafka --kafka-cluster kafka --kubeconfig <path-to-kubeconfig-file> -f -<<EOF
      apiVersion: kafka.banzaicloud.io/v1beta1
      kind: KafkaCluster
      spec:
        ingressController: "istioingress"
        istioIngressConfig:
          gatewayConfig:
            mode: PASSTHROUGH
        readOnlyConfig: |   
          auto.create.topics.enable=false
          offsets.topic.replication.factor=2
          authorizer.class.name=kafka.security.authorizer.AclAuthorizer
          allow.everyone.if.no.acl.found=false
      
          security.inter.broker.protocol=PLAINTEXT
      
          # OAUTH
          sasl.enabled.mechanisms=OAUTHBEARER
      
          # OAuth issuer token endpoint URL
          sasl.oauthbearer.token.endpoint.url=https://myidp.example.com/oauth2/default/v1/token
      
          # OAuth issuer's JWK Set endpoint URL from which to retrieve the set of JWKs managed by the provider; this can be a file://-based URL that points to a broker file system-accessible file-based copy of the JWKS data.
          sasl.oauthbearer.jwks.endpoint.url=https://myidp.example.com/oauth2/default/v1/keys
      
          listener.name.external.oauthbearer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
          listener.name.external.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
          listener.name.external.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
            clientSecret="<client-secret>" \
            clientId="oauth-client-id";
      
          listener.name.oauth.oauthbearer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
          listener.name.oauth.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
          listener.name.oauth.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
            clientSecret="<client-secret>" \
            clientId="oauth-client-id";
        listenersConfig:
          externalListeners:
          - type: "sasl_plaintext"
            name: "external"
            externalStartingPort: 19090
            containerPort: 9094
      EOF
      

      For details of the OAuth-related configuration options, see KIP-768 and the Apache Kafka documentation.

    3. The update in the previous step triggers a rolling upgrade of the Kafka cluster. Verify that this is reflected in the state of the cluster.

      smm sdm cluster get --namespace kafka --kafka-cluster kafka --kubeconfig <path-to-kubeconfig-file>
      

      Expected output:

      Namespace  Name   State                    Image                               Alerts  Cruise Control Topic Status  Rolling Upgrade Errors  Rolling Upgrade Last Success
      kafka      kafka  ClusterRollingUpgrading  banzaicloud/kafka:2.13-3.1.0  0       CruiseControlTopicReady      0
      
    4. Wait until the reconfiguration is finished and the cluster is in the ClusterRunning state. This can take a while, as the rolling upgrade applies changes on a broker-by-broker basis.
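
Waiting for the ClusterRunning state can be scripted. The following is a minimal Python sketch that assumes the tabular output format shown in the expected outputs above (a header row followed by one row per cluster). The helper names cluster_state and wait_until_running, the timeout, and the polling interval are hypothetical choices for illustration.

```python
import time
from typing import Callable


def cluster_state(cli_output: str) -> str:
    """Return the State column of the first cluster row in the CLI output."""
    lines = [line for line in cli_output.splitlines() if line.strip()]
    if len(lines) < 2 or lines[0].split()[:3] != ["Namespace", "Name", "State"]:
        raise ValueError("unexpected output format")
    # The first three columns are single words, so whitespace splitting is safe.
    return lines[1].split()[2]


def wait_until_running(
    get_output: Callable[[], str],
    timeout_s: float = 1800.0,
    interval_s: float = 15.0,
) -> bool:
    """Poll until the cluster reports ClusterRunning; False on timeout."""
    deadline = time.monotonic() + timeout_s
    while True:
        if cluster_state(get_output()) == "ClusterRunning":
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)
```

Here get_output is any callable that returns the text printed by the smm sdm cluster get command from step 3, for example a small wrapper around subprocess.run with capture_output=True and text=True. Keeping the command invocation outside the parsing logic makes the sketch easy to try against a saved copy of the output.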

Internal listener

Follow the steps described in the External listener section, but adjust the listenersConfig section as follows:

  listenersConfig:
    ...
    internalListeners:
    ...
    - containerPort: 29094
      name: oauth
      type: sasl_plaintext
      usedForInnerBrokerCommunication: false
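
The listener name matters here: Kafka scopes per-listener SASL settings with the prefix listener.name.<listener name>.<mechanism>., both in lowercase, which is why the readOnlyConfig in the external listener steps carries one block for listener.name.external and one for listener.name.oauth. The following Python sketch shows how those property names are derived for a given listener name. The function name listener_oauth_properties is hypothetical, and the sketch only renders property names and values as they appear in this document.

```python
HANDLER_PACKAGE = "org.apache.kafka.common.security.oauthbearer.secured"


def listener_oauth_properties(listener_name, client_id, client_secret):
    """Render the OAUTHBEARER broker properties for one named listener."""
    # Kafka expects the listener name and the mechanism in lowercase.
    prefix = f"listener.name.{listener_name.lower()}.oauthbearer.sasl"
    jaas = (
        "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required "
        f'clientId="{client_id}" clientSecret="{client_secret}";'
    )
    return {
        f"{prefix}.login.callback.handler.class": f"{HANDLER_PACKAGE}.OAuthBearerLoginCallbackHandler",
        f"{prefix}.server.callback.handler.class": f"{HANDLER_PACKAGE}.OAuthBearerValidatorCallbackHandler",
        f"{prefix}.jaas.config": jaas,
    }


if __name__ == "__main__":
    # Print the block for the internal listener named "oauth".
    for key, value in listener_oauth_properties("oauth", "oauth-client-id", "<client-secret>").items():
        print(f"{key}={value}")
```

If you name the internal listener something other than oauth, the listener.name.oauth entries in readOnlyConfig need to be renamed to match.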

Try out

The following example uses Keycloak, which supports OIDC, as the identity provider.

Deploy and configure Keycloak

  1. Deploy Keycloak.

    helm repo add bitnami https://charts.bitnami.com/bitnami
    
    helm install idp bitnami/keycloak --namespace keycloak  --create-namespace --version 7.1.18 -f-<<EOF
    auth:
      adminUser: admin
      adminPassword: mypwd
    service:
      type: ClusterIP
    EOF
    
  2. Import the Kafka realm into Keycloak.

    1. Run the following command:

      kubectl port-forward -n keycloak svc/idp-keycloak 7070:80
      
    2. Download the following JSON file.

      
      
    3. Open localhost:7070 in your browser and use the Keycloak UI to import the kafka-realm.json file.

  3. Update your Kafka cluster’s readOnlyConfig and listenersConfig configuration sections with the following OAuth-related properties:

    
    
  4. Test the OIDC authentication.

    Note: OAuthCompatibilityTool is a testing tool that the Kafka community added as part of KIP-768. For details, see: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575#KIP768:ExtendSASL/OAUTHBEARERwithSupportforOIDC-Testing

    1. To test the OIDC authentication with consumer credentials, run the following command:

      kafka-run-class.sh org.apache.kafka.tools.OAuthCompatibilityTool \
        --clientId kafka-consumer \
        --clientSecret 4090cfea-44c1-470b-a610-3a7b0f4366ae \
        --sasl.oauthbearer.jwks.endpoint.url http://idp-keycloak.keycloak.svc.cluster.local/auth/realms/kafka/protocol/openid-connect/certs \
        --sasl.oauthbearer.token.endpoint.url http://idp-keycloak.keycloak.svc.cluster.local/auth/realms/kafka/protocol/openid-connect/token \
        --sasl.oauthbearer.expected.audience account
      

      Expected result:

      PASSED 1/5: client configuration
      PASSED 2/5: client JWT retrieval
      PASSED 3/5: client JWT validation
      PASSED 4/5: broker configuration
      PASSED 5/5: broker JWT validation
      SUCCESS
      
    2. To test the OIDC authentication with producer credentials, run the following command:

      kafka-run-class.sh org.apache.kafka.tools.OAuthCompatibilityTool \
        --clientId kafka-producer \
        --clientSecret 85cb47f8-6c76-43f0-bbe2-a66f7f86395a \
        --sasl.oauthbearer.jwks.endpoint.url http://idp-keycloak.keycloak.svc.cluster.local/auth/realms/kafka/protocol/openid-connect/certs \
        --sasl.oauthbearer.token.endpoint.url http://idp-keycloak.keycloak.svc.cluster.local/auth/realms/kafka/protocol/openid-connect/token \
        --sasl.oauthbearer.expected.audience account
      

      Expected result:

      PASSED 1/5: client configuration
      PASSED 2/5: client JWT retrieval
      PASSED 3/5: client JWT validation
      PASSED 4/5: broker configuration
      PASSED 5/5: broker JWT validation
      SUCCESS
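
Both endpoint URLs passed to the tool follow the same pattern: the in-cluster address of the Keycloak service (the idp-keycloak service in the keycloak namespace), the realm name (kafka), and a fixed OIDC path. The following Python sketch derives them. It assumes the /auth path prefix used by the URLs in this document, which may differ for other Keycloak versions or chart settings, and the function name keycloak_oidc_endpoints is hypothetical.

```python
def keycloak_oidc_endpoints(base_url: str, realm: str) -> dict:
    """Return the token and JWKS endpoint URLs of a Keycloak realm.

    Assumes Keycloak is served under the /auth prefix, as in this document.
    """
    root = f"{base_url.rstrip('/')}/auth/realms/{realm}/protocol/openid-connect"
    return {"token": f"{root}/token", "jwks": f"{root}/certs"}


if __name__ == "__main__":
    endpoints = keycloak_oidc_endpoints("http://idp-keycloak.keycloak.svc.cluster.local", "kafka")
    print(endpoints["token"])
    print(endpoints["jwks"])
```

The token value corresponds to sasl.oauthbearer.token.endpoint.url and the jwks value to sasl.oauthbearer.jwks.endpoint.url.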
      

Test OIDC authentication with console consumer and console producer

  1. Create an example-topic:

    kubectl apply -n kafka -f- <<EOF
    apiVersion: kafka.banzaicloud.io/v1alpha1
    kind: KafkaTopic
    metadata:
      name: example-topic
      namespace: kafka
    spec:
      clusterRef:
        name: kafka
      name: example-topic
      partitions: 3
      replicationFactor: 2
      config:
        "retention.ms": "604800000"
        "cleanup.policy": "delete"
    EOF
    
  2. Grant access to the identities defined in Keycloak to the example-topic topic with Kafka ACLs:

    kubectl apply -n kafka -f- <<EOF
    ---
    apiVersion: kafka.banzaicloud.io/v1beta1
    kind: KafkaResourceSelector
    metadata:
      name: selector-of-example-topic
      namespace: kafka
    spec:
      type: topic
      name: example-topic
      pattern: literal
    ---
    apiVersion: kafka.banzaicloud.io/v1beta1
    kind: KafkaResourceSelector
    metadata:
      name: all-groups
      namespace: kafka
    spec:
      type: group
      name: '*'
      pattern: literal
    ---
    apiVersion: kafka.banzaicloud.io/v1beta1
    kind: KafkaACL
    metadata:
      name: consumer-kafkaacl
      namespace: kafka
    spec:
      kind: User
      name: 0aa61449-e970-4854-ac2a-f22e664b0110
      clusterRef:
        name: kafka
        namespace: kafka
      roles:
        - name: consumer
          resourceSelectors:
            - name: selector-of-example-topic
              namespace: kafka
            - name: all-groups
              namespace: kafka   
    ---
    apiVersion: kafka.banzaicloud.io/v1beta1
    kind: KafkaACL
    metadata:
      name: producer-kafkaacl
      namespace: kafka
    spec:
      kind: User
      name: 4c01f71e-f6a8-46e6-99f4-379db03c4362
      clusterRef:
        name: kafka
        namespace: kafka
      roles:
        - name: producer
          resourceSelectors:
            - name: selector-of-example-topic
              namespace: kafka
    EOF
    
  3. Create another topic (called forbidden-topic in the following example) that none of the identities defined in Keycloak has access to:

    kubectl apply -n kafka -f- <<EOF
    apiVersion: kafka.banzaicloud.io/v1alpha1
    kind: KafkaTopic
    metadata:
      name: forbidden-topic
      namespace: kafka
    spec:
      clusterRef:
        name: kafka
      name: forbidden-topic
      partitions: 3
      replicationFactor: 2
      config:
        "retention.ms": "604800000"
        "cleanup.policy": "delete"
    EOF
    
  4. Configure the producer. Use the following producer configuration with the kafka-console-producer.sh tool (shipped with Kafka) for producing messages. Set the producer properties in a properties file like this:

    sasl.mechanism=OAUTHBEARER
    security.protocol=SASL_PLAINTEXT
    sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
    sasl.oauthbearer.token.endpoint.url=http://idp-keycloak.keycloak.svc.cluster.local/auth/realms/kafka/protocol/openid-connect/token
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
        clientId="kafka-producer" \
        clientSecret="85cb47f8-6c76-43f0-bbe2-a66f7f86395a" \
        scope="kafka:write";
    

    Note: You can produce messages only to the example-topic topic, because the producer user has no access to the forbidden-topic topic.

  5. Configure the consumer. Use the following consumer configuration with the kafka-console-consumer.sh tool (shipped with Kafka) for consuming messages. Set the consumer properties in a properties file like this:

    group.id=consumer-1
    group.instance.id=consumer-1-instance-1
    client.id=consumer-1-instance-1
    sasl.mechanism=OAUTHBEARER
    security.protocol=SASL_PLAINTEXT
    sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
    sasl.oauthbearer.token.endpoint.url=http://idp-keycloak.keycloak.svc.cluster.local/auth/realms/kafka/protocol/openid-connect/token
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
        clientId="kafka-consumer" \
        clientSecret="4090cfea-44c1-470b-a610-3a7b0f4366ae" \
        scope="kafka:read";
    

    Note: Only messages from the example-topic can be consumed, because the consumer user has no access to the topic forbidden-topic.
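
The User names in the KafkaACL resources above (for example, 0aa61449-e970-4854-ac2a-f22e664b0110) have to match the identity that the broker derives from the token. By default, Kafka takes the principal name from the sub claim of the JWT (configurable with sasl.oauthbearer.sub.claim.name), so the names in this example are presumably the sub values of the tokens that Keycloak issues for the two clients. To look up the value for your own client, you can decode the payload of a token you obtained. The following Python sketch does this; the function name jwt_claims is hypothetical, and the code does not verify the signature, so use it for inspection only.

```python
import base64
import json


def jwt_claims(token: str) -> dict:
    """Decode the payload of a JWT WITHOUT verifying its signature."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("expected a JWT with three dot-separated parts")
    payload = parts[1]
    # JWTs use unpadded base64url; restore the padding before decoding.
    payload += "=" * (-len(payload) % 4)
    return json.loads(base64.urlsafe_b64decode(payload))
```

Calling jwt_claims(access_token)["sub"] returns the value to use as the name of the KafkaACL user. If ACL checks fail for a client that authenticates successfully, comparing this value with the KafkaACL name is a reasonable first check.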