OAuth Bearer Token Authentication for listeners (Apache Kafka pre 3.1.0)
CAUTION:
These procedures apply to the following Apache Kafka versions that are supported by Streaming Data Manager: 2.5.0,2.5.1,2.6.0,2.6.2,2.7.0,2.7.1,2.8.0,2.8.1. If you are using version 3.1.0 or newer with Streaming Data Manager, see OAuth Bearer Token Authentication for listeners (Apache Kafka 3.1.0 and above).This scenario covers using Kafka ACLs with a third-party identity provider. As the most common way to achieve this is OAuth, we follow the industry standards and use OAuth 2.0 Bearer Token based authentication. Streaming Data Manager supports OAuth Bearer Token authentication for both external listeners and internal listeners.
Having a workload deployed into the kafka namespace to have it authenticated via OAuth, you have to provide a valid JWT/Bearer Token to be able to authenticate and authorize yourself. This implies that the workload itself is capable of OAuth 2.0.
Prerequisites
To use Kafka ACLs with Istio mTLS, you need:
- capability to provision LoadBalancer Kubernetes services
- a Kafka cluster
Calisti resource requirements
Make sure that your Kubernetes or OpenShift cluster has sufficient resources to install Calisti. The following table shows the number of resources needed on the cluster:
Resource | Required |
---|---|
CPU | - 32 vCPU in total - 4 vCPU available for allocation per worker node (If you are testing on a cluster at a cloud provider, use nodes that have at least 4 CPUs, for example, c5.xlarge on AWS.) |
Memory | - 64 GiB in total - 4 GiB available for allocation per worker node for the Kubernetes cluster (8 GiB in case of the OpenShift cluster) |
Storage | 12 GB of ephemeral storage on the Kubernetes worker nodes (for Traces and Metrics) |
External listener
Streaming Data Manager supports OAuth Bearer Token authentication for both external listeners and internal listeners.
- Enable ACLs and configure an external listener using Streaming Data Manager. Complete the following steps.
-
Verify that your deployed Kafka cluster is up and running:
smm sdm cluster get --namespace <namespace-of-your-cluster> --kafka-cluster <name-of-your-kafka-cluster> --kubeconfig <path-to-kubeconfig-file>
Expected output:
Namespace Name State Image Alerts Cruise Control Topic Status Rolling Upgrade Errors Rolling Upgrade Last Success kafka kafka ClusterRunning banzaicloud/kafka:2.13-2.5.0-bzc.1 0 CruiseControlTopicReady 0
-
Enable ACLs and configure an external listener. The deployed Kafka cluster has no ACLs, and external access is disabled by default. Enable them by applying the following changes:
smm sdm cluster update --namespace kafka --kafka-cluster kafka --kubeconfig <path-to-kubeconfig-file> -f -<<EOF apiVersion: kafka.banzaicloud.io/v1beta1 kind: KafkaCluster spec: ingressController: "istioingress" istioIngressConfig: gatewayConfig: mode: PASSTHROUGH readOnlyConfig: | auto.create.topics.enable=false offsets.topic.replication.factor=2 authorizer.class.name=kafka.security.authorizer.AclAuthorizer allow.everyone.if.no.acl.found=false # OAUTH sasl.enabled.mechanisms=OAUTHBEARER sasl.mechanism.inter.broker.protocol=PLAIN listener.name.oauth.oauthbearer.sasl.login.callback.handler.class=com.cisco.kafka.oauth.client.LoginCallbackHandler listener.name.oauth.oauthbearer.sasl.sasl.server.callback.handler.class=com.cisco.kafka.oauth.server.ValidatorCallbackHandler listener.name.oauth.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ ciscoOauthIssuer="https://<your-idp-provider>" \ ciscoOauthTokenEndpoint="https://<your-idp-provider>/token" \ ciscoOauthMetadataEndpoint="https://<your-idp-provider>/.well-known/openid-configuration" \ ciscoOauthClientSecret="<client-secret>" \ ciscoOauthValidatorRequiredClaims="iss,sub,iat,aud,exp,jti" \ ciscoOauthClientId="<oauth-client-id>"; listenersConfig: externalListeners: - type: "sasl_plaintext" name: "oauth" externalStartingPort: 19090 containerPort: 9094 EOF
For details of the OAuth-related configuration options, see the option reference.
-
The update in the previous step triggers a rolling upgrade of the Kafka cluster. Verify that this is reflected in the state of the cluster.
smm sdm cluster get --namespace kafka --kafka-cluster kafka --kubeconfig <path-to-kubeconfig-file>
Expected output:
Namespace Name State Image Alerts Cruise Control Topic Status Rolling Upgrade Errors Rolling Upgrade Last Success kafka kafka ClusterRollingUpgrading banzaicloud/kafka:2.13-2.5.0-bzc.1 0 CruiseControlTopicReady 0
-
Wait until the reconfiguration is finished and the cluster is in the ClusterRunning state. This can take a while, as the rolling upgrade applies changes on a broker-by-broker basis.
-
Internal listener
- Follow the external listener configuration instructions with a little additional change.
-
Perform the steps as described in the external listener part, but adjust the listenersConfig section:
listenersConfig: ... internalListeners: ... - containerPort: 29094 name: oauth type: sasl_plaintext usedForInnerBrokerCommunication: false
-
Oauth module configuration options
The module provides Login
and Validator
callback handlers for supporting the OAuth authentication mechanism in Kafka as described in KIP-255.
The Login
callback handler is responsible for acquiring a valid access token from predefined OIDC/OAuth authorization server, while the Validator
callback handler takes care of the validation of the access token provided by the Kafka clients using the Simple Authentication and Security Layer (SASL).
Reference
Parameter | Component | Required | Default |
---|---|---|---|
ciscoOauthIssuer | Login/Validator | Required | |
ciscoOauthMetadataEndpoint | Login/Validator | Optional | |
ciscoOauthAuthorizeEndpoint | Login | Optional | |
ciscoOauthTokenEndpoint | Login | Optional | |
ciscoOauthIntrospectEndpoint | Validator | Optional | |
ciscoOauthJWKSEndpoint | Validator | Optional | |
ciscoOauthJWKSFile | Validator | Optional | |
ciscoOauthRevokeEndpoint | Login | Optional | |
ciscoOauthClientId | Login/Validator | Optional | |
ciscoOauthClientSecret | Login/Validator | Optional | |
ciscoOauthAllowableClockSkewSec | Login/Validator | Optional | 5 |
ciscoOauthLoginRequestScopes | Login | Optional | |
ciscoOauthLoginAccessToken | Login | Optional | |
ciscoOauthLoginRefreshToken | Login | Optional | |
ciscoOauthValidatorRequiredScopes | Validator | Optional | |
ciscoOauthValidatorRequiredAudience | Validator | Optional | |
ciscoOauthValidatorAllowMissingTypeHeader | Validator | Optional | false |
ciscoOauthValidatorRequiredClaims | Validator | Optional | iss,sub,iat,aud,exp,nbf,jti |
ciscoOauthIssuer
Base URL for the OIDC/OAuth authorization server. The value of this parameter must match the value of the iss
claim of the access token.
ciscoOauthIssuer="https://accounts.example.com"
ciscoOauthMetadataEndpoint
The URL for metadata API endpoint of the OIDC/OAuth authorization server. This URL is used to fetch configuration of the authorization server like API endpoints, supported claims, and so on.
This is useful when the metadata information is hosted on a URL different from the issuer (set by ciscoOauthIssuer
). Falling back to the URL composed from ciscoOauthIssuer + "/.well-known/openid-configuration"
if this parameter is not set.
ciscoOauthMetadataEndpoint="https://accounts.example.com/.well-known/openid-configuration"
ciscoOauthAuthorizeEndpoint
Not implemented yet.
The URL for the authorization API endpoint of the OIDC/OAuth authorization server. This URL must be set in case the authorization server does not have support for the metadata API.
ciscoOauthAuthorizeEndpoint="https://accounts.example.com/o/oauth2/v2/auth"
ciscoOauthTokenEndpoint
The URL for the token API endpoint of the OIDC/OAuth authorization server. This URL must be set in case the authorization server does not have support for the metadata API.
ciscoOauthTokenEndpoint="https://accounts.example.com/o/oauth2/v2/token"
ciscoOauthIntrospectEndpoint
The URL for the introspect API endpoint of the OIDC/OAuth authorization server. This URL must be set if access token introspection (detecting revoked tokens) during validation is desired and the authorization server does not have support for the metadata API. Support for token introspection may vary across OIDC/OAuth authorization server implementations. See RFC7662 for information about token inspection.
ciscoOauthIntrospectEndpoint="https://accounts.example.com/o/oauth2/v2/introspect"
ciscoOauthJWKSEndpoint
The URL for the JWKS (JSON Web Key Set) API endpoint of the OIDC/OAuth authorization server. This URL must be set in case the authorization server does not support the metadata API. The endpoint is used to fetch set of public keys used by the authorization server to sign access tokens in order to perform token signature validation.
ciscoOauthJWKSEndpoint="https://accounts.example.com/o/oauth2/v2/keys"
ciscoOauthJWKSFile
The path to the JWKS (JSON Web Key Set) file stored in JSON format on the local disk. If this parameter is set, the validator does not attempt to retrieve keys from JWKS API endpoint, but uses the provided file to get the list of keys for validating the token signature.
ciscoOauthJWKSFile="/path/to/jwks.json"
ciscoOauthRevokeEndpoint
Not implemented yet.
The URL for the revocation API endpoint of the OIDC/OAuth authorization server. This URL must be set in case the authorization server does not have support for the metadata API and revoking the current access token at process shutdown is desired.
ciscoOauthRevokeEndpoint="https://accounts.example.com/o/oauth2/v2/revoke"
ciscoOauthClientId
Client ID provided by the authorization server for the clients for authentication.
Login
Omit this parameter if the ciscoOauthLoginAccessToken
is set, which will cause the Login
callback to reuse the provided access token without contacting the authorization server. If both the ciscoOauthLoginAccessToken
and ciscoOauthLoginRefreshToken
are set then this parameter must be set as this information is required by the authorization server for performing access token refresh.
Validator
Omit this parameter only if offline access token validation is desired (without performing access token introspection) and one of the followings are provided: ciscoOauthMetadataEndpoint
, ciscoOauthJWKSEndpoint
, ciscoOauthJWKSFile
. Otherwise, it must be always set.
ciscoOauthClientId="CLIENT_ID"
ciscoOauthClientSecret
Client Secret provided by the authorization server for the clients for authentication.
Login
Omit this parameter if the ciscoOauthLoginAccessToken
is set, which will cause the Login
callback to reuse the provided access token without contacting the authorization server. Depending on the OAuth client type, the OIDC/OAuth authorization server might require the client secret to be set if both the ciscoOauthLoginAccessToken
and ciscoOauthLoginRefreshToken
are provided and the client is configured as confidental. See RFC6749 - Client Types for more information.
Validator
Omit this parameter only if offline access token validation is desired (without performing access token introspection) and one of the following parameters are provided: ciscoOauthMetadataEndpoint
, ciscoOauthJWKSEndpoint
, ciscoOauthJWKSFile
. Otherwise it must be always set.
ciscoOauthClientSecret="CLIENT_SECRET"
ciscoOauthAllowableClockSkewSec
Use this parameter to set the allowed time difference (in seconds) between the server and client clocks when validating the exp
(Expiration Time) and nbf
(Not Before) claims.
Default: 5
ciscoOauthAllowableClockSkewSec="30"
ciscoOauthLoginRequestScopes
Comma-separated list of scopes included in the access token request. This parameter is ignored if ciscoOauthLoginAccessToken
or both the ciscoOauthLoginAccessToken
and ciscoOauthLoginRefreshToken
are set as changing the list of scopes for predefined tokens are not supported by the standard.
ciscoOauthLoginRequestScopes="kafka:read,kafka:write"
ciscoOauthLoginAccessToken
Use this parameter to provide the access token to be used by the Login
callback if the access token is acquired outside of the application process and Login
callback requesting a new access token from the OIDC/OAuth authorization server is not desired.
ciscoOauthLoginAccessToken="eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwI..."
ciscoOauthLoginRefreshToken
Use this parameter to provide the refresh token to be used by the Login
callback to renew access token set in ciscoOauthLoginAccessToken
configuration parameter in case it has expired. In order to request a new access token, the Login
callback needs to contact the OIDC/OAuth authorization server hence one of the following parameters must be set: ciscoOauthMetadataEndpoint
, ciscoOauthTokenEndpoint
. Also, the OIDC/OAuth authorization server must support Refresh Token grant type. See RFC6749 - Refreshing an Access Token for more information.
ciscoOauthRefreshToken="0UtJXGSsqlUq..."
ciscoOauthValidatorRequiredScopes
Comma-separated list of required scopes which must be present in the token in order to consider it valid. Empty list means no validation will be performed.
ciscoOauthValidatorRequiredScopes="kafka:read,kafka:write"
ciscoOauthValidatorRequiredAudience
Comma-separated list of required audiences which must be present in the token in order to consider it valid. Usually this is a single-item list, but there might be environments with multiple audiences too. Empty list means no validation will be performed.
ciscoOauthValidatorRequiredAudience="kafka-cluster-1"
ciscoOauthValidatorAllowMissingTypeHeader
If true
the validator accepts tokens with missing typ
header field as certain OIDC/OAuth servers (for example, Dex) might not set this field in the issued tokens. Important to note that the validator will treat tokens as signed JWT (aka JWS) in this case.
Default: false
ciscoOauthValidatorAllowMissingTypeHeader="true"
ciscoOauthValidatorRequiredClaims
Comma-separated list of claims which must present in the token in order to consider it valid. See RFC7519 - JWT Claims for more information about claims.
Default: iss,sub,iat,aud,exp,nbf,jti
ciscoOauthValidatorRequiredClaims="iss,sub,iat,aud,exp,nbf,jti"