OAuth Bearer Token Authentication for listeners (Apache Kafka pre 3.1.0)

CAUTION:

These procedures apply to the following Apache Kafka versions that are supported by Streaming Data Manager: 2.5.0, 2.5.1, 2.6.0, 2.6.2, 2.7.0, 2.7.1, 2.8.0, 2.8.1. If you are using version 3.1.0 or newer with Streaming Data Manager, see OAuth Bearer Token Authentication for listeners (Apache Kafka 3.1.0 and above).

This scenario covers using Kafka ACLs with a third-party identity provider. Because OAuth is the most common way to achieve this, Streaming Data Manager follows the industry standard and uses OAuth 2.0 Bearer Token based authentication. Streaming Data Manager supports OAuth Bearer Token authentication for both external listeners and internal listeners.

To have a workload that is deployed into the kafka namespace authenticated via OAuth, the workload must provide a valid JWT/Bearer Token to authenticate and authorize itself. This implies that the workload itself must be capable of OAuth 2.0.

Prerequisites

To use Kafka ACLs with OAuth Bearer Token authentication, you need:

  • A Kubernetes cluster (version 1.19 or later) that has at least 12 vCPUs and 12 GB of memory, and can provision LoadBalancer Kubernetes services.
  • A Kafka cluster.

External listener

  1. Enable ACLs and configure an external listener using Streaming Data Manager. Complete the following steps.
    1. Verify that your deployed Kafka cluster is up and running:

      supertubes cluster get --namespace <namespace-of-your-cluster> --kafka-cluster <name-of-your-kafka-cluster> --kubeconfig <path-to-kubeconfig-file>
      

      Expected output:

      Namespace  Name   State           Image                               Alerts  Cruise Control Topic Status  Rolling Upgrade Errors  Rolling Upgrade Last Success
      kafka      kafka  ClusterRunning  banzaicloud/kafka:2.13-2.5.0-bzc.1  0       CruiseControlTopicReady      0
      
    2. Enable ACLs and configure an external listener. The deployed Kafka cluster has no ACLs, and external access is disabled by default. Enable them by applying the following changes:

      supertubes cluster update --namespace kafka --kafka-cluster kafka --kubeconfig <path-to-kubeconfig-file> -f -<<EOF
      apiVersion: kafka.banzaicloud.io/v1beta1
      kind: KafkaCluster
      spec:
        ingressController: "istioingress"
        istioIngressConfig:
          gatewayConfig:
            mode: PASSTHROUGH
        readOnlyConfig: |   
          auto.create.topics.enable=false
          offsets.topic.replication.factor=2
          authorizer.class.name=kafka.security.authorizer.AclAuthorizer
          allow.everyone.if.no.acl.found=false
          # OAUTH
          sasl.enabled.mechanisms=OAUTHBEARER
          sasl.mechanism.inter.broker.protocol=PLAIN
          listener.name.oauth.oauthbearer.sasl.login.callback.handler.class=com.cisco.kafka.oauth.client.LoginCallbackHandler
          listener.name.oauth.oauthbearer.sasl.server.callback.handler.class=com.cisco.kafka.oauth.server.ValidatorCallbackHandler
          listener.name.oauth.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
          ciscoOauthIssuer="https://<your-idp-provider>" \
          ciscoOauthTokenEndpoint="https://<your-idp-provider>/token" \
          ciscoOauthMetadataEndpoint="https://<your-idp-provider>/.well-known/openid-configuration" \
          ciscoOauthClientSecret="<client-secret>" \
          ciscoOauthValidatorRequiredClaims="iss,sub,iat,aud,exp,jti" \
          ciscoOauthClientId="<oauth-client-id>";
        listenersConfig:
          externalListeners:
          - type: "sasl_plaintext"
            name: "oauth"
            externalStartingPort: 19090
            containerPort: 9094
      EOF
      

      For details of the OAuth-related configuration options, see the option reference.

    3. The update in the previous step triggers a rolling upgrade of the Kafka cluster. Verify that this is reflected in the state of the cluster.

      supertubes cluster get --namespace kafka --kafka-cluster kafka --kubeconfig <path-to-kubeconfig-file>
      

      Expected output:

      Namespace  Name   State                    Image                               Alerts  Cruise Control Topic Status  Rolling Upgrade Errors  Rolling Upgrade Last Success
      kafka      kafka  ClusterRollingUpgrading  banzaicloud/kafka:2.13-2.5.0-bzc.1  0       CruiseControlTopicReady      0
      
    4. Wait until the reconfiguration is finished and the cluster is in the ClusterRunning state. This can take a while, as the rolling upgrade applies changes on a broker-by-broker basis.
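
The wait in the last step can be scripted. The following is a minimal sketch, not part of Streaming Data Manager: it assumes that the supertubes binary is on the PATH and that the table printed by supertubes cluster get has the column layout shown in the expected output above. The function names, the polling interval, and the attempt limit are hypothetical.

```python
import re
import subprocess
import time


def parse_cluster_state(output: str) -> str:
    """Return the State column from the table printed by `supertubes cluster get`.

    Assumes one header line followed by one data row, with columns
    separated by two or more spaces (as in the expected output above).
    """
    lines = [line.strip() for line in output.splitlines() if line.strip()]
    header = re.split(r"\s{2,}", lines[0])
    row = re.split(r"\s{2,}", lines[1])
    return dict(zip(header, row))["State"]


def wait_until_running(namespace: str, cluster: str, kubeconfig: str,
                       interval_sec: int = 30, max_attempts: int = 60) -> bool:
    """Poll the cluster until it reports ClusterRunning or the attempts run out."""
    cmd = ["supertubes", "cluster", "get",
           "--namespace", namespace,
           "--kafka-cluster", cluster,
           "--kubeconfig", kubeconfig]
    for _ in range(max_attempts):
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        if parse_cluster_state(result.stdout) == "ClusterRunning":
            return True
        time.sleep(interval_sec)
    return False
```

For example, wait_until_running("kafka", "kafka", "<path-to-kubeconfig-file>") returns True once the rolling upgrade has finished, and False if the cluster has not reached the ClusterRunning state within the attempt limit.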

Internal listener

  1. Follow the external listener configuration instructions with one additional change.
    1. Perform the steps described in the External listener section, but adjust the listenersConfig section as follows:

         listenersConfig:
           ...
           internalListeners:
           ...
           - containerPort: 29094
             name: oauth
             type: sasl_plaintext
             usedForInnerBrokerCommunication: false
      

OAuth module configuration options

The module provides Login and Validator callback handlers for supporting the OAuth authentication mechanism in Kafka as described in KIP-255.

The Login callback handler is responsible for acquiring a valid access token from a predefined OIDC/OAuth authorization server, while the Validator callback handler validates the access token provided by the Kafka clients using the Simple Authentication and Security Layer (SASL).

Reference

Parameter                                  Component        Required  Default
ciscoOauthIssuer                           Login/Validator  Required
ciscoOauthMetadataEndpoint                 Login/Validator  Optional
ciscoOauthAuthorizeEndpoint                Login            Optional
ciscoOauthTokenEndpoint                    Login            Optional
ciscoOauthIntrospectEndpoint               Validator        Optional
ciscoOauthJWKSEndpoint                     Validator        Optional
ciscoOauthJWKSFile                         Validator        Optional
ciscoOauthRevokeEndpoint                   Login            Optional
ciscoOauthClientId                         Login/Validator  Optional
ciscoOauthClientSecret                     Login/Validator  Optional
ciscoOauthAllowableClockSkewSec            Login/Validator  Optional  5
ciscoOauthLoginRequestScopes               Login            Optional
ciscoOauthLoginAccessToken                 Login            Optional
ciscoOauthLoginRefreshToken                Login            Optional
ciscoOauthValidatorRequiredScopes          Validator        Optional
ciscoOauthValidatorRequiredAudience        Validator        Optional
ciscoOauthValidatorAllowMissingTypeHeader  Validator        Optional  false
ciscoOauthValidatorRequiredClaims          Validator        Optional  iss,sub,iat,aud,exp,nbf,jti

ciscoOauthIssuer

Base URL for the OIDC/OAuth authorization server. The value of this parameter must match the value of the iss claim of the access token.

ciscoOauthIssuer="https://accounts.example.com"
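
To check ahead of time whether a token would satisfy this requirement, you can decode its payload and compare the iss claim with the configured value. The following is a minimal sketch with hypothetical helper names. It assumes an exact string comparison, and it only inspects the payload: it does not verify the token signature, so it is a debugging aid, not a validator.

```python
import base64
import json


def decode_segment(segment: str) -> dict:
    """Decode one base64url-encoded JWT segment (header or payload) into a dict."""
    # JWT segments omit base64 padding, so restore it before decoding.
    padded = segment + "=" * (-len(segment) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


def issuer_matches(token: str, configured_issuer: str) -> bool:
    """Compare the iss claim of the token with the configured issuer.

    Assumption: the comparison is an exact string match, so a trailing
    slash on either side makes the values differ.
    """
    payload = decode_segment(token.split(".")[1])
    return payload.get("iss") == configured_issuer
```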

ciscoOauthMetadataEndpoint

The URL for the metadata API endpoint of the OIDC/OAuth authorization server. This URL is used to fetch the configuration of the authorization server, such as API endpoints, supported claims, and so on. This is useful when the metadata information is hosted on a URL different from the issuer (set by ciscoOauthIssuer). If this parameter is not set, the module falls back to the URL composed from ciscoOauthIssuer + "/.well-known/openid-configuration".

ciscoOauthMetadataEndpoint="https://accounts.example.com/.well-known/openid-configuration"
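
The fallback rule can be illustrated with a short sketch. The function name is hypothetical, and the plain string concatenation follows the description above; whether the module normalizes a trailing slash in the issuer is not stated here, so the sketch does not do it either.

```python
from typing import Optional


def resolve_metadata_url(issuer: str, metadata_endpoint: Optional[str] = None) -> str:
    """Return the metadata URL: the explicit endpoint if set, else issuer + well-known path."""
    if metadata_endpoint:
        return metadata_endpoint
    # Plain concatenation, as described for the fallback.
    return issuer + "/.well-known/openid-configuration"
```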

ciscoOauthAuthorizeEndpoint

Not implemented yet.

The URL for the authorization API endpoint of the OIDC/OAuth authorization server. This URL must be set in case the authorization server does not have support for the metadata API.

ciscoOauthAuthorizeEndpoint="https://accounts.example.com/o/oauth2/v2/auth"

ciscoOauthTokenEndpoint

The URL for the token API endpoint of the OIDC/OAuth authorization server. This URL must be set in case the authorization server does not have support for the metadata API.

ciscoOauthTokenEndpoint="https://accounts.example.com/o/oauth2/v2/token"

ciscoOauthIntrospectEndpoint

The URL for the introspect API endpoint of the OIDC/OAuth authorization server. This URL must be set if access token introspection (detecting revoked tokens) during validation is desired and the authorization server does not have support for the metadata API. Support for token introspection may vary across OIDC/OAuth authorization server implementations. See RFC7662 for information about token introspection.

ciscoOauthIntrospectEndpoint="https://accounts.example.com/o/oauth2/v2/introspect"

ciscoOauthJWKSEndpoint

The URL for the JWKS (JSON Web Key Set) API endpoint of the OIDC/OAuth authorization server. This URL must be set in case the authorization server does not support the metadata API. The endpoint is used to fetch the set of public keys that the authorization server uses to sign access tokens, so that the validator can verify token signatures.

ciscoOauthJWKSEndpoint="https://accounts.example.com/o/oauth2/v2/keys"

ciscoOauthJWKSFile

The path to the JWKS (JSON Web Key Set) file stored in JSON format on the local disk. If this parameter is set, the validator does not attempt to retrieve keys from the JWKS API endpoint, but uses the provided file to get the list of keys for validating the token signature.

ciscoOauthJWKSFile="/path/to/jwks.json"
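
Before pointing the validator at a local file, it can help to confirm that the file is a well-formed key set. The sketch below is not the validator's own loader. It assumes the standard JWKS layout from RFC 7517 (a top-level "keys" array whose entries may carry a "kid" key ID), and the function names are hypothetical.

```python
import json


def index_jwks(jwks: dict) -> dict:
    """Index the keys of a parsed JWKS document by their key ID (kid).

    Keys without a kid are skipped, because they cannot be looked up by ID.
    """
    return {key["kid"]: key for key in jwks["keys"] if "kid" in key}


def load_jwks_file(path: str) -> dict:
    """Read a JWKS file from disk and return its keys indexed by kid."""
    with open(path, encoding="utf-8") as fh:
        return index_jwks(json.load(fh))
```

For example, sorted(load_jwks_file("/path/to/jwks.json")) lists the key IDs available in the file, which you can compare with the kid header of a token that fails signature validation.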

ciscoOauthRevokeEndpoint

Not implemented yet.

The URL for the revocation API endpoint of the OIDC/OAuth authorization server. This URL must be set in case the authorization server does not have support for the metadata API and revoking the current access token at process shutdown is desired.

ciscoOauthRevokeEndpoint="https://accounts.example.com/o/oauth2/v2/revoke"

ciscoOauthClientId

Client ID provided by the authorization server for the clients for authentication.

Login

Omit this parameter if ciscoOauthLoginAccessToken is set, which causes the Login callback to reuse the provided access token without contacting the authorization server. If both ciscoOauthLoginAccessToken and ciscoOauthLoginRefreshToken are set, this parameter must be set, because the authorization server requires this information to refresh the access token.

Validator

Omit this parameter only if offline access token validation is desired (without performing access token introspection) and one of the following parameters is provided: ciscoOauthMetadataEndpoint, ciscoOauthJWKSEndpoint, ciscoOauthJWKSFile. Otherwise, it must always be set.

ciscoOauthClientId="CLIENT_ID"

ciscoOauthClientSecret

Client Secret provided by the authorization server for the clients for authentication.

Login

Omit this parameter if ciscoOauthLoginAccessToken is set, which causes the Login callback to reuse the provided access token without contacting the authorization server. Depending on the OAuth client type, the OIDC/OAuth authorization server might require the client secret to be set if both ciscoOauthLoginAccessToken and ciscoOauthLoginRefreshToken are provided and the client is configured as confidential. See RFC6749 - Client Types for more information.

Validator

Omit this parameter only if offline access token validation is desired (without performing access token introspection) and one of the following parameters is provided: ciscoOauthMetadataEndpoint, ciscoOauthJWKSEndpoint, ciscoOauthJWKSFile. Otherwise, it must always be set.

ciscoOauthClientSecret="CLIENT_SECRET"

ciscoOauthAllowableClockSkewSec

Use this parameter to set the allowed time difference (in seconds) between the server and client clocks when validating the exp (Expiration Time) and nbf (Not Before) claims.

Default: 5

ciscoOauthAllowableClockSkewSec="30"
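
The effect of the clock skew allowance on the exp and nbf checks can be sketched as follows. This is an illustration of the general technique, not the module's implementation: the function name is hypothetical, and treating the exact boundary values as valid is an assumption.

```python
import time
from typing import Optional


def time_claims_valid(claims: dict, skew_sec: int = 5,
                      now: Optional[float] = None) -> bool:
    """Check the exp and nbf claims, tolerating skew_sec seconds of clock drift."""
    now = time.time() if now is None else now
    exp = claims.get("exp")
    nbf = claims.get("nbf")
    # The token is expired only once the current time is past exp plus the allowance.
    if exp is not None and now > exp + skew_sec:
        return False
    # The token is not yet valid if the current time is before nbf minus the allowance.
    if nbf is not None and now < nbf - skew_sec:
        return False
    return True
```

With the default of 5 seconds, a token whose exp is 1000 is still accepted at time 1004 and rejected at time 1006.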

ciscoOauthLoginRequestScopes

Comma-separated list of scopes included in the access token request. This parameter is ignored if ciscoOauthLoginAccessToken is set (with or without ciscoOauthLoginRefreshToken), because the standard does not support changing the list of scopes of predefined tokens.

ciscoOauthLoginRequestScopes="kafka:read,kafka:write"

ciscoOauthLoginAccessToken

Use this parameter to provide the access token for the Login callback when the access token is acquired outside of the application process and the Login callback should not request a new access token from the OIDC/OAuth authorization server.

ciscoOauthLoginAccessToken="eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiIxMjM0NTY3ODkwI..."

ciscoOauthLoginRefreshToken

Use this parameter to provide the refresh token that the Login callback uses to renew the access token set in the ciscoOauthLoginAccessToken configuration parameter when it has expired. To request a new access token, the Login callback needs to contact the OIDC/OAuth authorization server, so one of the following parameters must be set: ciscoOauthMetadataEndpoint, ciscoOauthTokenEndpoint. Also, the OIDC/OAuth authorization server must support the Refresh Token grant type. See RFC6749 - Refreshing an Access Token for more information.

ciscoOauthLoginRefreshToken="0UtJXGSsqlUq..."
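
For orientation, the refresh request defined in RFC6749 is a form-encoded POST to the token endpoint. The sketch below only builds the request body and does not send it. The function name is hypothetical, and passing the client credentials in the body (instead of HTTP Basic authentication) is an assumption; how the Login callback actually authenticates to the authorization server is not described here.

```python
from typing import Optional
from urllib.parse import urlencode


def build_refresh_request(refresh_token: str, client_id: str,
                          client_secret: Optional[str] = None) -> str:
    """Build the form-encoded body of an RFC 6749 refresh token request."""
    params = {
        "grant_type": "refresh_token",
        "refresh_token": refresh_token,
        "client_id": client_id,
    }
    # Confidential clients also authenticate with their secret.
    if client_secret is not None:
        params["client_secret"] = client_secret
    return urlencode(params)
```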

ciscoOauthValidatorRequiredScopes

Comma-separated list of required scopes that must be present in the token for it to be considered valid. If the list is empty, no scope validation is performed.

ciscoOauthValidatorRequiredScopes="kafka:read,kafka:write"
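
A scope check of this kind can be sketched as follows. The sketch assumes that the token carries its scopes in a scope claim, either as a space-delimited string or as a list; the claim name and format vary between authorization servers, and the function name is hypothetical.

```python
def has_required_scopes(claims: dict, required: str) -> bool:
    """Return True if every scope in the comma-separated list is granted by the token."""
    required_scopes = {s.strip() for s in required.split(",") if s.strip()}
    # An empty list of required scopes disables the check.
    if not required_scopes:
        return True
    granted = claims.get("scope", "")
    granted_scopes = set(granted.split()) if isinstance(granted, str) else set(granted)
    return required_scopes <= granted_scopes
```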

ciscoOauthValidatorRequiredAudience

Comma-separated list of required audiences that must be present in the token for it to be considered valid. Usually this is a single-item list, but some environments use multiple audiences. If the list is empty, no audience validation is performed.

ciscoOauthValidatorRequiredAudience="kafka-cluster-1"
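
Per RFC7519, the aud claim can be either a single string or an array of strings, which an audience check has to account for. The following minimal sketch (with a hypothetical function name) assumes that every configured audience must appear in the token.

```python
def has_required_audiences(claims: dict, required: str) -> bool:
    """Return True if every audience in the comma-separated list appears in the aud claim."""
    required_aud = {a.strip() for a in required.split(",") if a.strip()}
    # An empty list of required audiences disables the check.
    if not required_aud:
        return True
    aud = claims.get("aud", [])
    # RFC 7519 allows aud to be a single string or an array of strings.
    token_aud = {aud} if isinstance(aud, str) else set(aud)
    return required_aud <= token_aud
```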

ciscoOauthValidatorAllowMissingTypeHeader

If true, the validator accepts tokens with a missing typ header field, because certain OIDC/OAuth servers (for example, Dex) might not set this field in the issued tokens. Note that in this case the validator treats tokens as signed JWTs (also known as JWS).

Default: false

ciscoOauthValidatorAllowMissingTypeHeader="true"
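
To find out whether your authorization server sets the typ header, you can decode the first segment of an issued token. The sketch below is a debugging aid with a hypothetical function name. Accepting only the value JWT (compared case-insensitively) is an assumption, since the exact set of values accepted by the validator is not listed here.

```python
import base64
import json


def typ_header_acceptable(token: str, allow_missing_type_header: bool = False) -> bool:
    """Check the typ field of the JWT header, optionally tolerating its absence."""
    header_segment = token.split(".")[0]
    # JWT segments omit base64 padding, so restore it before decoding.
    padded = header_segment + "=" * (-len(header_segment) % 4)
    header = json.loads(base64.urlsafe_b64decode(padded))
    typ = header.get("typ")
    if typ is None:
        return allow_missing_type_header
    # Assumption: only the value "JWT" is treated as acceptable.
    return typ.upper() == "JWT"
```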

ciscoOauthValidatorRequiredClaims

Comma-separated list of claims that must be present in the token for it to be considered valid. See RFC7519 - JWT Claims for more information about claims.

Default: iss,sub,iat,aud,exp,nbf,jti

ciscoOauthValidatorRequiredClaims="iss,sub,iat,aud,exp,nbf,jti"
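
When a token is rejected, a quick way to see which required claims it lacks is to compare its payload with the configured list. The following is a minimal sketch with a hypothetical function name. It only checks that the claims are present and does not validate their values.

```python
def missing_claims(claims: dict,
                   required: str = "iss,sub,iat,aud,exp,nbf,jti") -> list:
    """Return the required claim names that are absent from the token payload."""
    names = [name.strip() for name in required.split(",") if name.strip()]
    return [name for name in names if name not in claims]
```

For example, a token without an nbf claim fails the default list, but passes when the list is set to "iss,sub,iat,aud,exp,jti".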