OAuth Bearer Token Authentication for listeners (Apache Kafka 3.1.0 and above)
CAUTION:
Prior to Apache Kafka version 3.1.0, the native OAuth Bearer Token authentication supported only unsecured JWT tokens, which are not suitable for production environments, so Streaming Data Manager used its own OAuth implementation instead. To enable OAuth Bearer Token authentication for pre-3.1.0 Kafka versions (using the Streaming Data Manager OAuth plugin), see OAuth Bearer Token Authentication for listeners (Apache Kafka pre 3.1.0).
This scenario covers using Kafka ACLs with a third-party identity provider via OAuth 2.0 Bearer Token-based authentication. Starting with Apache Kafka version 3.1.0, Streaming Data Manager relies on Apache Kafka's native OAuth Bearer Token authentication support (introduced by KIP-768) for both external and internal listeners.
Because the workload has to provide a valid JWT bearer token for authentication, it must be capable of acting as an OAuth 2.0 client.
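To make the token format concrete: a bearer token in this flow is a JWT, three base64url-encoded segments (header, claims, signature) joined by dots, and the broker validates the signature and claims such as issuer and audience. The following standard-library Python sketch decodes the claims of a made-up, purely illustrative token (real tokens must be cryptographically validated, never just decoded):

```python
import base64
import json

def b64url_decode(segment: str) -> bytes:
    # base64url without padding: restore the padding before decoding
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))

def b64url_encode(obj) -> str:
    raw = json.dumps(obj).encode()
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()

# Build a made-up JWT-shaped token (header.claims.signature); values are illustrative.
header = {"alg": "RS256", "typ": "JWT", "kid": "example-key-id"}
claims = {"iss": "https://myidp.example.com/oauth2/default",
          "sub": "kafka-producer", "aud": "account", "exp": 1700000000}
token = ".".join([b64url_encode(header), b64url_encode(claims), "fake-signature"])

# What a validator conceptually does first: split the token and decode the claims.
header_seg, claims_seg, _sig = token.split(".")
decoded_claims = json.loads(b64url_decode(claims_seg))
print(decoded_claims["sub"])   # kafka-producer
print(decoded_claims["aud"])   # account
```

The `aud` claim shown here is what the `--sasl.oauthbearer.expected.audience` option of the test tool later in this page checks against.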
Note: Since Streaming Data Manager ensures TLS communication via Istio, OAuth must be configured in `PLAINTEXT` mode.
Prerequisites
To use Kafka ACLs with OAuth Bearer Token authentication, you need:
- capability to provision LoadBalancer Kubernetes services
- a Kafka cluster
Calisti resource requirements
Make sure that your Kubernetes or OpenShift cluster has sufficient resources to install Calisti. The following table shows the number of resources needed on the cluster:
| Resource | Required |
|---|---|
| CPU | 32 vCPU in total; 4 vCPU available for allocation per worker node. (If you are testing on a cluster at a cloud provider, use nodes that have at least 4 CPUs, for example, c5.xlarge on AWS.) |
| Memory | 64 GiB in total; 4 GiB available for allocation per worker node for the Kubernetes cluster (8 GiB in case of the OpenShift cluster) |
| Storage | 12 GB of ephemeral storage on the Kubernetes worker nodes (for Traces and Metrics) |
External listener
Enable ACLs and configure an external listener using Streaming Data Manager. Complete the following steps:
- Verify that your deployed Kafka cluster is up and running:

  ```shell
  smm sdm cluster get --namespace <namespace-of-your-cluster> --kafka-cluster <name-of-your-kafka-cluster> --kubeconfig <path-to-kubeconfig-file>
  ```

  Expected output:

  ```
  Namespace  Name   State           Image                    Alerts  Cruise Control Topic Status  Rolling Upgrade Errors  Rolling Upgrade Last Success
  kafka      kafka  ClusterRunning  banzaicloud/kafka:3.1.0  0       CruiseControlTopicReady      0
  ```
- Enable ACLs and configure an external listener. The deployed Kafka cluster has no ACLs, and external access is disabled by default. Enable them by applying the following changes:

  ```shell
  smm sdm cluster update --namespace kafka --kafka-cluster kafka --kubeconfig <path-to-kubeconfig-file> -f -<<EOF
  apiVersion: kafka.banzaicloud.io/v1beta1
  kind: KafkaCluster
  spec:
    ingressController: "istioingress"
    istioIngressConfig:
      gatewayConfig:
        mode: PASSTHROUGH
    readOnlyConfig: |
      auto.create.topics.enable=false
      offsets.topic.replication.factor=2
      authorizer.class.name=kafka.security.authorizer.AclAuthorizer
      allow.everyone.if.no.acl.found=false
      security.inter.broker.protocol=PLAINTEXT
      # OAUTH
      sasl.enabled.mechanisms=OAUTHBEARER
      # OAuth issuer token endpoint URL
      sasl.oauthbearer.token.endpoint.url=https://myidp.example.com/oauth2/default/v1/token
      # OAuth issuer's JWK Set endpoint URL from which to retrieve the set of JWKs managed by the provider;
      # this can be a file://-based URL that points to a file-based copy of the JWKS data accessible on the broker file system.
      sasl.oauthbearer.jwks.endpoint.url=https://myidp.example.com/oauth2/default/v1/keys
      listener.name.external.oauthbearer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
      listener.name.external.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
      listener.name.external.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
        clientSecret="<client-secret>" \
        clientId="oauth-client-id";
      listener.name.oauth.oauthbearer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
      listener.name.oauth.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
      listener.name.oauth.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
        clientSecret="<client-secret>" \
        clientId="oauth-client-id";
    listenersConfig:
      externalListeners:
        - type: "sasl_plaintext"
          name: "external"
          externalStartingPort: 19090
          containerPort: 9094
  EOF
  ```
  For details on the OAuth-related configuration options, see KIP-768 and the Apache Kafka documentation.
- The update in the previous step triggers a rolling upgrade of the Kafka cluster. Verify that this is reflected in the state of the cluster:

  ```shell
  smm sdm cluster get --namespace kafka --kafka-cluster kafka --kubeconfig <path-to-kubeconfig-file>
  ```

  Expected output:

  ```
  Namespace  Name   State                    Image                         Alerts  Cruise Control Topic Status  Rolling Upgrade Errors  Rolling Upgrade Last Success
  kafka      kafka  ClusterRollingUpgrading  banzaicloud/kafka:2.13-3.1.0  0       CruiseControlTopicReady      0
  ```
- Wait until the reconfiguration is finished and the cluster is in the `ClusterRunning` state. This can take a while, as the rolling upgrade applies changes on a broker-by-broker basis.
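As background to the `sasl.oauthbearer.jwks.endpoint.url` option used above: the JWK Set is a JSON document listing the issuer's public signing keys, each identified by a `kid`, and the validator selects the key whose `kid` matches the one in the token header before checking the signature. The following Python sketch shows that lookup against a made-up JWKS document (the actual fetching and signature verification are handled by Kafka's OAuthBearerValidatorCallbackHandler; key values here are placeholders):

```python
import json

# A made-up JWK Set, shaped like the document served at the JWKS endpoint
# (e.g. .../v1/keys); the "n" values are placeholders, not real key material.
jwks_doc = json.loads("""
{
  "keys": [
    {"kty": "RSA", "kid": "key-2021", "use": "sig", "alg": "RS256", "n": "placeholder", "e": "AQAB"},
    {"kty": "RSA", "kid": "key-2022", "use": "sig", "alg": "RS256", "n": "placeholder", "e": "AQAB"}
  ]
}
""")

def find_signing_key(jwks: dict, kid: str) -> dict:
    """Return the JWK whose 'kid' matches the token header's 'kid'."""
    for key in jwks["keys"]:
        if key.get("kid") == kid and key.get("use", "sig") == "sig":
            return key
    raise KeyError(f"no signing key with kid={kid!r} in JWKS")

# The JWT header names the key that signed the token:
token_header = {"alg": "RS256", "typ": "JWT", "kid": "key-2022"}
key = find_signing_key(jwks_doc, token_header["kid"])
print(key["kid"])  # key-2022
```

This is also why a `file://`-based JWKS URL works: the broker only needs the key set itself, wherever it is loaded from.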
Internal listener
Perform the steps described for the external listener, but adjust the `listenersConfig` section:

```yaml
listenersConfig:
  ...
  internalListeners:
    ...
    - containerPort: 29094
      name: oauth
      type: sasl_plaintext
      usedForInnerBrokerCommunication: false
```
Try out
The following example uses Keycloak, an identity provider that supports OIDC.
Deploy and configure Keycloak
- Deploy Keycloak:

  ```shell
  helm repo add bitnami https://charts.bitnami.com/bitnami
  helm install idp bitnami/keycloak --namespace keycloak --create-namespace --version 7.1.18 -f -<<EOF
  auth:
    adminUser: admin
    adminPassword: mypwd
  service:
    type: ClusterIP
  EOF
  ```
- Import the Kafka realm into Keycloak:

  - Run the following command:

    ```shell
    kubectl port-forward -n keycloak svc/idp-keycloak 7070:80
    ```

  - Download the following JSON file.

  - Open `localhost:7070` in your browser and use the Keycloak UI to import the `kafka-realm.json` file.
- Update your Kafka cluster's `readOnlyConfig` and `listenersConfig` configuration sections with the OAuth-related properties shown in the earlier listener configuration steps.
- Test the OIDC authentication.

  Note: OAuthCompatibilityTool is a testing tool that the Kafka community provided as part of KIP-768. For details, see https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186877575#KIP768:ExtendSASL/OAUTHBEARERwithSupportforOIDC-Testing
- To test the OIDC authentication with consumer credentials, run the following command:

  ```shell
  kafka-run-class.sh org.apache.kafka.tools.OAuthCompatibilityTool \
    --clientId kafka-consumer \
    --clientSecret 4090cfea-44c1-470b-a610-3a7b0f4366ae \
    --sasl.oauthbearer.jwks.endpoint.url http://idp-keycloak.keycloak.svc.cluster.local/auth/realms/kafka/protocol/openid-connect/certs \
    --sasl.oauthbearer.token.endpoint.url http://idp-keycloak.keycloak.svc.cluster.local/auth/realms/kafka/protocol/openid-connect/token \
    --sasl.oauthbearer.expected.audience account
  ```

  Expected result:

  ```
  PASSED 1/5: client configuration
  PASSED 2/5: client JWT retrieval
  PASSED 3/5: client JWT validation
  PASSED 4/5: broker configuration
  PASSED 5/5: broker JWT validation
  SUCCESS
  ```
- To test the OIDC authentication with producer credentials, run the following command:

  ```shell
  kafka-run-class.sh org.apache.kafka.tools.OAuthCompatibilityTool \
    --clientId kafka-producer \
    --clientSecret 85cb47f8-6c76-43f0-bbe2-a66f7f86395a \
    --sasl.oauthbearer.jwks.endpoint.url http://idp-keycloak.keycloak.svc.cluster.local/auth/realms/kafka/protocol/openid-connect/certs \
    --sasl.oauthbearer.token.endpoint.url http://idp-keycloak.keycloak.svc.cluster.local/auth/realms/kafka/protocol/openid-connect/token \
    --sasl.oauthbearer.expected.audience account
  ```

  Expected result:

  ```
  PASSED 1/5: client configuration
  PASSED 2/5: client JWT retrieval
  PASSED 3/5: client JWT validation
  PASSED 4/5: broker configuration
  PASSED 5/5: broker JWT validation
  SUCCESS
  ```
Test OIDC authentication with console consumer and console producer
- Create an `example-topic` topic:

  ```shell
  kubectl apply -n kafka -f- <<EOF
  apiVersion: kafka.banzaicloud.io/v1alpha1
  kind: KafkaTopic
  metadata:
    name: example-topic
    namespace: kafka
  spec:
    clusterRef:
      name: kafka
    name: example-topic
    partitions: 3
    replicationFactor: 2
    config:
      "retention.ms": "604800000"
      "cleanup.policy": "delete"
  EOF
  ```
- Grant the identities defined in Keycloak access to the `example-topic` topic with Kafka ACLs:

  ```shell
  kubectl apply -n kafka -f- <<EOF
  ---
  apiVersion: kafka.banzaicloud.io/v1beta1
  kind: KafkaResourceSelector
  metadata:
    name: selector-of-example-topic
    namespace: kafka
  spec:
    type: topic
    name: example-topic
    pattern: literal
  ---
  apiVersion: kafka.banzaicloud.io/v1beta1
  kind: KafkaResourceSelector
  metadata:
    name: all-groups
    namespace: kafka
  spec:
    type: group
    name: '*'
    pattern: literal
  ---
  apiVersion: kafka.banzaicloud.io/v1beta1
  kind: KafkaACL
  metadata:
    name: consumer-kafkaacl
    namespace: kafka
  spec:
    kind: User
    name: 0aa61449-e970-4854-ac2a-f22e664b0110
    clusterRef:
      name: kafka
      namespace: kafka
    roles:
      - name: consumer
        resourceSelectors:
          - name: selector-of-example-topic
            namespace: kafka
          - name: all-groups
            namespace: kafka
  ---
  apiVersion: kafka.banzaicloud.io/v1beta1
  kind: KafkaACL
  metadata:
    name: producer-kafkaacl
    namespace: kafka
  spec:
    kind: User
    name: 4c01f71e-f6a8-46e6-99f4-379db03c4362
    clusterRef:
      name: kafka
      namespace: kafka
    roles:
      - name: producer
        resourceSelectors:
          - name: selector-of-example-topic
            namespace: kafka
  EOF
  ```
- Create another topic (called `forbidden-topic` in the following example) that none of the identities defined in Keycloak has access to:

  ```shell
  kubectl apply -n kafka -f- <<EOF
  apiVersion: kafka.banzaicloud.io/v1alpha1
  kind: KafkaTopic
  metadata:
    name: forbidden-topic
    namespace: kafka
  spec:
    clusterRef:
      name: kafka
    name: forbidden-topic
    partitions: 3
    replicationFactor: 2
    config:
      "retention.ms": "604800000"
      "cleanup.policy": "delete"
  EOF
  ```
- Configure the producer. Use the following producer configuration with the `kafka-console-producer.sh` tool (shipped with Kafka) for producing messages. Set the producer properties like this in a properties file:

  ```properties
  sasl.mechanism=OAUTHBEARER
  security.protocol=SASL_PLAINTEXT
  sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
  sasl.oauthbearer.token.endpoint.url=http://idp-keycloak.keycloak.svc.cluster.local/auth/realms/kafka/protocol/openid-connect/token
  sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    clientId="kafka-producer" \
    clientSecret="85cb47f8-6c76-43f0-bbe2-a66f7f86395a" \
    scope="kafka:write";
  ```
  Note: You can produce messages only to the `example-topic` topic, because the producer user has no access to the `forbidden-topic` topic.
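For context on what the login callback handler does with these properties: it exchanges `clientId` and `clientSecret` for a token by POSTing an OAuth 2.0 client_credentials grant to `sasl.oauthbearer.token.endpoint.url`. The sketch below builds that request with the Python standard library, using the producer credentials from this example; the actual POST is left commented out because the Keycloak service URL resolves only from inside the cluster:

```python
from urllib.parse import urlencode
from urllib.request import Request

token_url = ("http://idp-keycloak.keycloak.svc.cluster.local"
             "/auth/realms/kafka/protocol/openid-connect/token")

# client_credentials grant, mirroring what the Kafka login callback handler sends
form = urlencode({
    "grant_type": "client_credentials",
    "client_id": "kafka-producer",
    "client_secret": "85cb47f8-6c76-43f0-bbe2-a66f7f86395a",
    "scope": "kafka:write",
})

req = Request(token_url, data=form.encode(),
              headers={"Content-Type": "application/x-www-form-urlencoded"})

# From a pod inside the cluster you could then run:
# import json, urllib.request
# resp = json.load(urllib.request.urlopen(req))
# access_token = resp["access_token"]  # the JWT the producer presents to Kafka
print(req.get_method())  # POST
```

The consumer configuration below works the same way, only with the `kafka-consumer` credentials and the `kafka:read` scope.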
- Configure the consumer. Use the following consumer configuration with the `kafka-console-consumer.sh` tool (shipped with Kafka) for consuming messages. Set the consumer properties like this in a properties file:

  ```properties
  group.id=consumer-1
  group.instance.id=consumer-1-instance-1
  client.id=consumer-1-instance-1
  sasl.mechanism=OAUTHBEARER
  security.protocol=SASL_PLAINTEXT
  sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
  sasl.oauthbearer.token.endpoint.url=http://idp-keycloak.keycloak.svc.cluster.local/auth/realms/kafka/protocol/openid-connect/token
  sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    clientId="kafka-consumer" \
    clientSecret="4090cfea-44c1-470b-a610-3a7b0f4366ae" \
    scope="kafka:read";
  ```
  Note: You can consume messages only from the `example-topic` topic, because the consumer user has no access to the `forbidden-topic` topic.