Disaster recovery on Kubernetes using MirrorMaker2

Streaming Data Manager offers multiple disaster recovery methods for Apache Kafka.

This section describes how to configure MirrorMaker2 with Streaming Data Manager to back up a Kafka cluster to a remote Kafka cluster running on a separate Kubernetes cluster, and how to recover the lost Kafka cluster. This disaster recovery solution uses a MirrorMaker2 active/active, active/passive topology setup.

Note: Streaming Data Manager and MirrorMaker2 support other scenarios as well, such as fan-out, aggregation, or cluster migration. If you want to use Streaming Data Manager in one of these scenarios, contact us for details.

How it works

MirrorMaker2 uses the Connect framework to replicate topics between Kafka clusters.

  • Both topics and consumer groups are replicated
  • Topic configuration and ACLs are replicated
  • Cross-cluster offsets are synchronized
  • Partitioning is preserved

Streaming Data Manager uses MirrorMaker2 to set up cross-cluster replication between remote Kafka clusters and to recover a lost Kafka cluster from a remote one. It deploys a MirrorMaker2 instance for each Kafka cluster into the namespace where that Kafka cluster resides. Each MirrorMaker2 instance acts as:

  • A producer targeting the Kafka cluster running in the same Kubernetes cluster and namespace (it is recommended to deploy MirrorMaker2 close to the target Kafka cluster)
  • A consumer for the remote Kafka clusters

Cross-cluster replication

Prepare the MirrorMaker2 descriptor

Streaming Data Manager expects a descriptor file in YAML or JSON format that describes the topology of the Kafka clusters and the MirrorMaker2 replication topology in the following format.

# list of Kubernetes config file paths of clusters hosting our Kafka clusters that we want to make MM2 deployments aware of
kubernetesConfigs:
- # path-to-the-kubeconfig-1
- # path-to-the-kubeconfig-2
- # path-to-the-kubeconfig-3

# list of Kafka clusters to make MM2 deployments aware of
kafkaClusters:
- namespace: # kubernetes namespace hosting the Kafka cluster, defaults to 'kafka'
  name: # Kafka cluster name, defaults to 'kafka'
  kubernetesConfigContext: # name of Kubernetes configuration context as defined in the kubeconfig files which references the Kubernetes cluster hosting the Kafka cluster. If not specified, the default context is used.
  alias: # alias by which MM2 refers to this Kafka cluster (for example, kafka1). If not provided, it defaults to '${kubernetesConfigContext}_${namespace}_${name}'
  internalListenerName: # name of the internal listener through which local MM2 instances access this Kafka cluster
  externalListenerName: # name of the external listener through which remote MM2 instances access this Kafka cluster
...
mirrorMaker2Spec:
  kafkaHeapOpts: # heap opts setting for MirrorMaker2, defaults to -Xms256M -Xmx2G
  resources:
  nodeSelector:
  tolerations:
  affinity:
  istioControlPlane:
    name: # name of the IstioControlPlane where Streaming Data Manager is installed
    namespace: # namespace of the IstioControlPlane where Streaming Data Manager is installed

mirrorMaker2Properties: |
  # replication topologies and flows, mm2 config, etc.

  # two way replication between 3 Kafka clusters
  kafka1->kafka2.enabled=true
  kafka1->kafka3.enabled=true

  kafka2->kafka1.enabled=true
  kafka2->kafka3.enabled=true

  kafka3->kafka1.enabled=true
  kafka3->kafka2.enabled=true

  tasks.max=5000

  topics.exclude=.*[\\-\\.]internal, .*\\.replica, __.*, _.*
  groups.exclude=console-consumer-.*, connect-.*, __.*, _.*

  # Setting replication factor of newly created remote topics
  # For anything other than development testing, a value greater than 1 is recommended to ensure availability.
  replication.factor=2

  ############################# MM2 Internal Topic Settings  #############################
  # For anything other than development testing, a value greater than 1 is recommended to ensure availability,
  checkpoints.topic.replication.factor=2
  heartbeats.topic.replication.factor=2
  offset-syncs.topic.replication.factor=2

  offset.storage.replication.factor=2
  status.storage.replication.factor=2
  config.storage.replication.factor=2  
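The topics.exclude and groups.exclude values above are comma-separated regular expressions. If you want to sanity-check which names a pattern list filters before deploying, a small sketch like the following can help (Python's re is close enough for these patterns; note that the properties file doubles each backslash, while a Python raw string does not):

```python
import re

# The topics.exclude patterns from the descriptor above. In the properties
# file backslashes are doubled (e.g. \\-); after properties-unescaping they
# become the single backslashes written here.
EXCLUDES = [r".*[\-\.]internal", r".*\.replica", r"__.*", r"_.*"]

def is_excluded(topic):
    # MM2 excludes a topic when any pattern matches the whole name.
    return any(re.fullmatch(p, topic) for p in EXCLUDES)

print(is_excluded("orders"))             # False: replicated
print(is_excluded("orders.internal"))    # True: matches .*[\-\.]internal
print(is_excluded("__consumer_offsets")) # True: matches __.*
```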

Streaming Data Manager automatically generates the configuration for each MirrorMaker2 instance (MirrorMaker2 has its own configuration format). Streaming Data Manager maintains the Kafka servers section of the MirrorMaker2 configuration, while the replication flows and other MirrorMaker2 settings are populated from the mirrorMaker2Properties field provided by the user. The generated MirrorMaker2 configuration looks like the following:

# maintained by Streaming Data Manager
clusters: kafka1, kafka2, kafka3, ...

kafka1.bootstrap.servers=... # internal kafka bootstrap servers URL if MM2 is on the same Kubernetes cluster as Kafka cluster, otherwise external kafka bootstrap servers URL
kafka2.bootstrap.servers=... # internal kafka bootstrap servers URL if MM2 is on the same Kubernetes cluster as Kafka cluster, otherwise external kafka bootstrap servers URL
kafka3.bootstrap.servers=... # internal kafka bootstrap servers URL if MM2 is on the same Kubernetes cluster as Kafka cluster, otherwise external kafka bootstrap servers URL

# user provided mm2 settings
kafka1->kafka2.enabled=true
kafka1->kafka3.enabled=true

kafka2->kafka1.enabled=true
kafka2->kafka3.enabled=true

kafka3->kafka1.enabled=true
kafka3->kafka2.enabled=true

tasks.max=5000

topics.exclude=.*[\\-\\.]internal, .*\\.replica, __.*, _.*
groups.exclude=console-consumer-.*, connect-.*, __.*, _.*

# Setting the frequency to check source cluster for new topics and new consumer groups.
refresh.topics.interval.seconds=300
refresh.groups.interval.seconds=300

# Setting replication factor of newly created remote topics
# For anything other than development testing, a value greater than 1 is recommended to ensure availability.
replication.factor=2

############################# MM2 Internal Topic Settings  #############################
# For anything other than development testing, a value greater than 1 is recommended to ensure availability,
checkpoints.topic.replication.factor=2
heartbeats.topic.replication.factor=2
offset-syncs.topic.replication.factor=2

offset.storage.replication.factor=2
status.storage.replication.factor=2
config.storage.replication.factor=2

Note: Keep the replication flow settings the same across all MirrorMaker2 instances to avoid omitting topics from cross-cluster replication.
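One way to guard against such drift is to extract the enabled flows from each instance's mirrorMaker2Properties and compare them. The sketch below (illustrative, stdlib only) parses the source->target.enabled=true lines:

```python
import re

FLOW_RE = re.compile(r"^\s*([\w.-]+)->([\w.-]+)\.enabled\s*=\s*true\s*$")

def enabled_flows(mm2_properties):
    # Collect (source, target) pairs for every enabled replication flow.
    return {m.groups() for line in mm2_properties.splitlines()
            if (m := FLOW_RE.match(line))}

instance_a = "kafka1->kafka2.enabled=true\nkafka2->kafka1.enabled=true\n"
instance_b = "kafka1->kafka2.enabled=true\n"  # reverse flow missing

print(enabled_flows(instance_a) == enabled_flows(instance_b))  # False: drift
```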

In the example above, MirrorMaker2 will replicate topics from kafka1 to kafka2 and the other way around:

  • topics from kafka1 are replicated to kafka2 as kafka1.{topic-name}, similarly
  • topics from kafka2 are replicated to kafka1 as kafka2.{topic-name}.

As an example, if we have a topic named topic1 on both Kafka clusters, then:

  • kafka1 will have topic1 and kafka2.topic1, and
  • kafka2 will have topic1 and kafka1.topic1, respectively.

Bidirectional cross-cluster replication happens similarly between kafka2 <-> kafka3 and kafka1 <-> kafka3.
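The resulting topic layout can be modeled with a short sketch (illustrative only; MM2's default replication policy is what actually prefixes a mirrored topic with the source alias):

```python
def remote_topic(source_alias, topic):
    # MM2's default replication policy prefixes mirrored topics with the
    # source cluster alias.
    return f"{source_alias}.{topic}"

def topic_layout(local_topics, flows):
    # local_topics: cluster alias -> set of its own topics
    # flows: iterable of (source, target) replication flows
    layout = {cluster: set(topics) for cluster, topics in local_topics.items()}
    for source, target in flows:
        layout[target] |= {remote_topic(source, t) for t in local_topics[source]}
    return layout

flows = [("kafka1", "kafka2"), ("kafka2", "kafka1"),
         ("kafka1", "kafka3"), ("kafka3", "kafka1"),
         ("kafka2", "kafka3"), ("kafka3", "kafka2")]
local = {"kafka1": {"topic1"}, "kafka2": {"topic1"}, "kafka3": {"topic1"}}

layout = topic_layout(local, flows)
print(sorted(layout["kafka1"]))  # ['kafka2.topic1', 'kafka3.topic1', 'topic1']
```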

Once the descriptor file is ready, deploy it with the supertubes CLI:

supertubes mm2 deploy -f <path-to-mm2-deployment-descriptor>

Configure Streaming Data Manager and MirrorMaker2

  1. Create two Kubernetes clusters, for example, example-k8s-cluster-1 and example-k8s-cluster-2.

  2. Install Streaming Data Manager on both Kubernetes clusters.

    1. Configure the Istio Control Plane that was set up by Streaming Data Manager on the two Kubernetes clusters to trust each other:

      Note: To ensure that the Kafka clusters are accessed through TLS, Streaming Data Manager automatically sets up an Istio service mesh, and runs Kafka inside the mesh on each Kubernetes cluster. That way, Istio handles TLS at the networking layer.

      Add the following configuration to the existing IstioControlPlane.servicemesh.cisco.com custom resource which controls the Istio mesh deployed by Streaming Data Manager:

      apiVersion: servicemesh.cisco.com/v1alpha1
      kind: IstioControlPlane
      spec:
        istiod:
          deployment:
            env:
            - name: ISTIO_MULTIROOT_MESH
              value: "true"
        meshConfig:
          defaultConfig:
            proxyMetadata:
              PROXY_CONFIG_XDS_AGENT: "true"
          caCertificates:
          - pem: |
              # CA certificate of the Istio Control Plane running on the other Kubernetes cluster, in PEM format
      

      Note: If there are multiple CAs to trust, add a separate - pem: item for each of them.

    2. Repeat the previous step for the other cluster.

  3. Create Kafka clusters on both Kubernetes clusters:

    supertubes cluster create --namespace kafka -c {example-k8s-cluster-1-kubeconfig.yaml}
    
    supertubes cluster create --namespace kafka -c {example-k8s-cluster-2-kubeconfig.yaml}
    
  4. Add an external listener named external to each of the Kafka clusters.

    apiVersion: kafka.banzaicloud.io/v1beta1
    kind: KafkaCluster
    spec:
    ...
      istioControlPlane:
        name: # name of the IstioControlPlane custom resource that manages the Istio mesh where the Kafka cluster runs (for example, sdm-icp-v115x)
        namespace: # namespace of the IstioControlPlane custom resource that manages the Istio mesh where the Kafka cluster runs (for example, istio-system)
     ...
      listenersConfig:
        externalListeners:
          - type: "plaintext"
            name: "external"
            externalStartingPort: 19090
            containerPort: 9094
     ...
    
  5. Wait until the Kafka clusters become operational:

    supertubes cluster get --namespace kafka --kafka-cluster kafka -c {example-k8s-cluster-1-kubeconfig.yaml}
    Namespace  Name   State           Image                                 Alerts  Cruise Control Topic Status  Rolling Upgrade Errors  Rolling Upgrade Last Success
    kafka      kafka  ClusterRunning  ghcr.io/banzaicloud/kafka:2.13-2.8.1  0       CruiseControlTopicReady      0                       2020-03-04 14:07:36
    
  6. Enable cross-cluster replication with MirrorMaker2. In the MirrorMaker2 descriptor file, you have to reference the Kafka clusters: this example refers to the one running on example-k8s-cluster-1 as kafka1, and to the one running on example-k8s-cluster-2 as kafka2.

    supertubes mm2 deploy -f -<<EOF
    # list of Kubernetes config file paths of clusters hosting our Kafka clusters that we want to make MM2 deployments aware of
    kubernetesConfigs:
      - {example-k8s-cluster-1-kubeconfig.yaml}
      - {example-k8s-cluster-2-kubeconfig.yaml}
    
    # list of Kafka clusters to make MM2 deployments aware of
    kafkaClusters:
      - namespace: kafka
        name: kafka
        kubernetesConfigContext: kubernetes-admin@example-k8s-cluster-1  # the context from {example-k8s-cluster-1-kubeconfig.yaml}
        alias: kafka1 # alias by which MM2 refers to this Kafka cluster
        internalListenerName: internal # internal listener for the local MM2 instance to use
        externalListenerName: external # external listener for remote MM2 instances to use
      - namespace: kafka
        name: kafka
        kubernetesConfigContext: kubernetes-admin@example-k8s-cluster-2 # the context from {example-k8s-cluster-2-kubeconfig.yaml}
        alias: kafka2 # alias by which MM2 refers to this Kafka cluster
        internalListenerName: internal # internal listener for the local MM2 instance to use
        externalListenerName: external # external listener for remote MM2 instances to use
    mirrorMaker2Spec:
      istioControlPlane:
        name: # name of the IstioControlPlane where Streaming Data Manager is installed
        namespace: # namespace of the IstioControlPlane where Streaming Data Manager is installed
    mirrorMaker2Properties: |-
      # replication topologies and flows, mm2 config, etc.
      kafka1->kafka2.enabled=true
      kafka2->kafka1.enabled=true
    
      # we don't have ACLs set, so skip replicating them
      sync.topic.acls.enabled=false
    
      tasks.max=5000
    
      topics.exclude=.*[\\-\\.]internal, .*\\.replica, __.*, _.*
      groups.exclude=console-consumer-.*, connect-.*, __.*, _.*
    
      # Setting the frequency to check source cluster for new topics and new consumer groups.
      refresh.topics.interval.seconds=300
      refresh.groups.interval.seconds=300
    
      # Setting replication factor of newly created remote topics
      # For anything other than development testing, a value greater than 1 is recommended to ensure availability.
      replication.factor=2
    
      ############################# MM2 Internal Topic Settings  #############################
      # For anything other than development testing, a value greater than 1 is recommended to ensure availability,
      checkpoints.topic.replication.factor=2
      heartbeats.topic.replication.factor=2
      offset-syncs.topic.replication.factor=2
    
      offset.storage.replication.factor=2
      status.storage.replication.factor=2
      config.storage.replication.factor=2  
    EOF
    
  7. Create some Kafka topics.

    1. Create a topic named testtopic on both Kafka clusters, for example:

      supertubes cluster topic create --namespace kafka --kafka-cluster kafka -c {example-k8s-cluster-1-kubeconfig.yaml} -f -<<EOF
      apiVersion: kafka.banzaicloud.io/v1alpha1
      kind: KafkaTopic
      metadata:
        name: testtopic
      spec:
        name: testtopic
        partitions: 3
        replicationFactor: 2
        config:
          "retention.ms": "28800000"
          "cleanup.policy": "delete"
      EOF
      
    2. Repeat the previous step on the other Kafka cluster.

  8. Generate client certificates. The Kafka clusters run inside an Istio service mesh with TLS enabled. This means you need a client certificate to write messages to your topic. You can create client certificates manually using KafkaUser custom resources, or using the Streaming Data Manager web interface.

    The exact procedure of manually creating KafkaUser custom resources and extracting the generated certificates depends on how you are signing the client certificates. See the following examples for the csr-operator and cert-manager:

    • csr-operator:

      Create a Kafka user that the client application will use to identify itself.

      kubectl create -f - <<EOF
      apiVersion: kafka.banzaicloud.io/v1alpha1
      kind: KafkaUser
      metadata:
        name: external-kafkauser
        namespace: default
      spec:
        clusterRef:
          name: kafka
          namespace: kafka
        secretName: external-kafkauser-secret
        pkiBackendSpec:
          pkiBackend: "k8s-csr"
          signerName: "csr.banzaicloud.io/privateca"
      EOF
      
      1. Export the public certificate of the CA. The CSR operator stores it in the chain.pem key of the secret:

          kubectl get secret external-kafkauser-secret -o 'go-template={{index .data "chain.pem"}}' | base64 -D > /var/tmp/ca.crt

        Note: base64 -D is the macOS flag; on Linux, use base64 -d.

        Alternatively, you can download the CA certificate and the client certificate from the Streaming Data Manager web interface.

      2. Export the client certificate stored in external-kafkauser-secret which represents Kafka user external-kafkauser:

        kubectl get secret external-kafkauser-secret -o 'go-template={{index .data "tls.crt"}}' | base64 -D > /var/tmp/tls.crt
        kubectl get secret external-kafkauser-secret -o 'go-template={{index .data "tls.key"}}' | base64 -D > /var/tmp/tls.key
        
      3. Use the exported client credentials and the CA certificate in your application to connect to the external listener of the Kafka cluster. (Otherwise, Istio automatically rejects the client application.) The following command is an example to connect with the kcat client application:

        kcat -L -b aff4c6887766440238fb19c381779eae-1599690198.eu-north-1.elb.amazonaws.com:29092 -X security.protocol=SSL -X ssl.key.location=/var/tmp/tls.key -X ssl.certificate.location=/var/tmp/tls.crt -X ssl.ca.location=/var/tmp/ca.crt
        

        Metadata for all topics (from broker -1: ssl://aff4c6887766440238fb19c381779eae-1599690198.eu-north-1.elb.amazonaws.com:29092/bootstrap):
         2 brokers:
          broker 0 at aff4c6887766440238fb19c381779eae-1599690198.eu-north-1.elb.amazonaws.com:19090 (controller)
          broker 1 at aff4c6887766440238fb19c381779eae-1599690198.eu-north-1.elb.amazonaws.com:19091
         1 topics:
          topic "example-topic" with 3 partitions:
            partition 0, leader 0, replicas: 0,1, isrs: 0,1
            partition 1, leader 1, replicas: 1,0, isrs: 0,1
            partition 2, leader 0, replicas: 0,1, isrs: 0,1

    • cert-manager:

      Create a Kafka user that the client application will use to identify itself.

      kubectl create -f - <<EOF
      apiVersion: kafka.banzaicloud.io/v1alpha1
      kind: KafkaUser
      metadata:
        name: external-kafkauser
        namespace: default
      spec:
        clusterRef:
          name: kafka
          namespace: kafka
        secretName: external-kafkauser-secret
        pkiBackendSpec:
          pkiBackend: "cert-manager"
          issuerRef:
            name: "ca-issuer"
            kind: "ClusterIssuer"
      EOF
      
      1. Export the public certificate of the CA. cert-manager stores it in the ca.crt key of the secret:

          kubectl get secret external-kafkauser-secret -o 'go-template={{index .data "ca.crt"}}' | base64 -D > /var/tmp/ca.crt

        Note: base64 -D is the macOS flag; on Linux, use base64 -d.

        Alternatively, you can download the CA certificate and the client certificate from the Streaming Data Manager web interface.

      2. Export the client certificate stored in external-kafkauser-secret which represents Kafka user external-kafkauser:

        kubectl get secret external-kafkauser-secret -o 'go-template={{index .data "tls.crt"}}' | base64 -D > /var/tmp/tls.crt
        kubectl get secret external-kafkauser-secret -o 'go-template={{index .data "tls.key"}}' | base64 -D > /var/tmp/tls.key
        
      3. Use the exported client credentials and the CA certificate in your application to connect to the external listener of the Kafka cluster. (Otherwise, Istio automatically rejects the client application.) The following command is an example to connect with the kcat client application:

        kcat -L -b aff4c6887766440238fb19c381779eae-1599690198.eu-north-1.elb.amazonaws.com:29092 -X security.protocol=SSL -X ssl.key.location=/var/tmp/tls.key -X ssl.certificate.location=/var/tmp/tls.crt -X ssl.ca.location=/var/tmp/ca.crt
        

        Metadata for all topics (from broker -1: ssl://aff4c6887766440238fb19c381779eae-1599690198.eu-north-1.elb.amazonaws.com:29092/bootstrap):
         2 brokers:
          broker 0 at aff4c6887766440238fb19c381779eae-1599690198.eu-north-1.elb.amazonaws.com:19090 (controller)
          broker 1 at aff4c6887766440238fb19c381779eae-1599690198.eu-north-1.elb.amazonaws.com:19091
         1 topics:
          topic "example-topic" with 3 partitions:
            partition 0, leader 0, replicas: 0,1, isrs: 0,1
            partition 1, leader 1, replicas: 1,0, isrs: 0,1
            partition 2, leader 0, replicas: 0,1, isrs: 0,1

  9. Find the public addresses through which the Kafka cluster is exposed on both Kubernetes clusters:

    kubectl get service -n kafka meshgateway-external-kafka
    NAME                         TYPE           CLUSTER-IP      EXTERNAL-IP                                                                PORT(S)                                                           AGE
    meshgateway-external-kafka   LoadBalancer   10.10.171.247   a7c265ab846c14f8fa082773fee7c0da-2063154997.eu-north-1.elb.amazonaws.com   19090:31645/TCP,19091:32240/TCP,19092:32483/TCP,29092:30681/TCP   61m
    
    kubectl get service -n kafka meshgateway-external-kafka
    NAME                         TYPE           CLUSTER-IP      EXTERNAL-IP     PORT(S)                                                           AGE
    meshgateway-external-kafka   LoadBalancer   10.10.161.160   51.124.19.105   19090:32651/TCP,19091:31402/TCP,19092:30403/TCP,29092:31586/TCP   57m
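Clients connect to the external address on port 29092, the bootstrap port in the service's PORT(S) column. If you ever need to pick that mapping apart programmatically, a small sketch (parsing the service:nodePort pairs shown in the kubectl output above; illustrative only) could look like this:

```python
def parse_ports(ports_field):
    # "19090:31645/TCP,29092:30681/TCP" -> {19090: 31645, 29092: 30681}
    result = {}
    for item in ports_field.split(","):
        port_pair, _, _proto = item.partition("/")
        svc_port, _, node_port = port_pair.partition(":")
        result[int(svc_port)] = int(node_port)
    return result

ports = parse_ports("19090:31645/TCP,19091:32240/TCP,19092:32483/TCP,29092:30681/TCP")
print(29092 in ports)  # True: the bootstrap port exposed by the mesh gateway
```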
    
  10. Write messages to testtopic on both clusters. Use kcat to produce messages to the testtopic topic on each cluster:

    kcat -b a7c265ab846c14f8fa082773fee7c0da-2063154997.eu-north-1.elb.amazonaws.com:29092 -X security.protocol=SSL -X ssl.ca.location=ca.crt -X ssl.certificate.location=tls.crt -X ssl.key.location=tls.key -P -t testtopic
    
    kafka1: message 1
    kafka1: message 2
    
    kcat -b 51.124.19.105:29092 -X security.protocol=SSL -X ssl.ca.location=ca.crt -X ssl.certificate.location=tls.crt -X ssl.key.location=tls.key -P -t testtopic
    
    kafka2: message 1
    kafka2: message 2
    
  11. Check the replicated messages. Verify that MirrorMaker2 replicated testtopic from kafka2 to kafka1 under the kafka2.testtopic topic, and testtopic from kafka1 to kafka2 under the kafka1.testtopic topic:

    kcat -b a7c265ab846c14f8fa082773fee7c0da-2063154997.eu-north-1.elb.amazonaws.com:29092 -X security.protocol=SSL -X ssl.ca.location=ca.crt -X ssl.certificate.location=tls.crt -X ssl.key.location=tls.key -C -t kafka2.testtopic -c2
    kafka2: message 1
    kafka2: message 2
    
    kcat -b 51.124.19.105:29092 -X security.protocol=SSL -X ssl.ca.location=ca.crt -X ssl.certificate.location=tls.crt -X ssl.key.location=tls.key -C -t kafka1.testtopic -c2
    kafka1: message 2
    kafka1: message 1
    

    The summary of what you are seeing is:

    +-- kafka1
    |   +-- testtopic
    |       +-- kafka1: message 1
    |       +-- kafka1: message 2
    |   +-- kafka2.testtopic
    |       +-- kafka2: message 1
    |       +-- kafka2: message 2
    +-- kafka2
    |   +-- testtopic
    |       +-- kafka2: message 1
    |       +-- kafka2: message 2
    |   +-- kafka1.testtopic
    |       +-- kafka1: message 1
    |       +-- kafka1: message 2
    

Recover a lost Kafka cluster

In the event of losing one of the Kubernetes clusters that hosts your Kafka cluster, complete the following steps.

  1. While the new cluster is provisioned, direct the client applications (consumers and producers) to the remaining Kafka clusters.

  2. Create a new Kubernetes cluster, for example, example-k8s-cluster-new.

  3. Wait until the new Kubernetes cluster is ready.

  4. Install Streaming Data Manager on the new cluster.

    supertubes install -a --no-demo-cluster -c {example-k8s-cluster-new-kubeconfig.yaml}
    
  5. Configure the Istio mesh that was set up by Streaming Data Manager on the new cluster to trust the meshes running on the remaining Kubernetes clusters.

  6. Create a new Kafka cluster to replace the lost one. In the following examples and in the MirrorMaker2 descriptor, we will refer to this new Kafka cluster as kafka1-new.

    supertubes cluster create --namespace kafka -c {example-k8s-cluster-new-kubeconfig.yaml}
    
  7. Wait until the new Kafka cluster becomes operational:

    supertubes cluster get --namespace kafka --kafka-cluster kafka -c {example-k8s-cluster-new-kubeconfig.yaml}
    Namespace  Name   State           Image                                 Alerts  Cruise Control Topic Status  Rolling Upgrade Errors  Rolling Upgrade Last Success
    kafka      kafka  ClusterRunning  ghcr.io/banzaicloud/kafka:2.13-2.8.1  0       CruiseControlTopicReady      0                       2020-03-04 16:42:34
    
  8. Add an external listener named external to each of the Kafka clusters.

  9. Create a topic named testtopic in the new Kafka cluster.

    supertubes cluster topic create --namespace kafka --kafka-cluster kafka -c {example-k8s-cluster-new-kubeconfig.yaml} -f -<<EOF
    apiVersion: kafka.banzaicloud.io/v1alpha1
    kind: KafkaTopic
    metadata:
      name: testtopic
    spec:
      name: testtopic
      partitions: 3
      replicationFactor: 2
      config:
        "retention.ms": "28800000"
        "cleanup.policy": "delete"
    EOF
    
  10. Create a new alias for the new Kafka cluster. The new (replacement) Kafka cluster must get a new alias and should not reuse the alias of the lost Kafka cluster, because MirrorMaker2 doesn’t replicate topics that are prefixed with the name of a cluster alias.
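The rule above can be illustrated with a short sketch (an assumption: this mirrors the alias-prefix behavior the text describes, not MirrorMaker2's actual implementation):

```python
def is_already_mirrored(topic, known_aliases):
    # Per the rule above, MM2 treats a topic whose name starts with a known
    # cluster alias prefix as an already-mirrored remote topic and does not
    # replicate it again.
    return any(topic.startswith(alias + ".") for alias in known_aliases)

# Reusing the lost alias "kafka1" would make the backups that kafka2 holds
# under kafka1.* look already-mirrored, so they would never flow back:
print(is_already_mirrored("kafka1.testtopic", {"kafka1", "kafka2"}))      # True
# With the fresh alias "kafka1-new", the same topic is eligible again:
print(is_already_mirrored("kafka1.testtopic", {"kafka1-new", "kafka2"}))  # False
```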

  11. Update the MirrorMaker2 topology descriptor file to reflect the new Kubernetes cluster and the new Kafka cluster alias (kafka1-new).

    supertubes mm2 deploy -f -<<EOF
    # list of Kubernetes config file paths of clusters hosting our Kafka clusters that we want to make MM2 deployments aware of
    kubernetesConfigs:
      - {example-k8s-cluster-new-kubeconfig.yaml}
      - {example-k8s-cluster-2-kubeconfig.yaml}
    
    # list of Kafka clusters to make MM2 deployments aware of
    kafkaClusters:
      - namespace: kafka
        name: kafka
        kubernetesConfigContext: kubernetes-admin@example-k8s-cluster-new
        alias: kafka1-new # alias by which MM2 refers to this Kafka cluster
        internalListenerName: internal # internal listener for the local MM2 instance to use
        externalListenerName: external # external listener for remote MM2 instances to use
      - namespace: kafka
        name: kafka
        kubernetesConfigContext: kubernetes-admin@example-k8s-cluster-2
        alias: kafka2 # alias by which MM2 refers to this Kafka cluster
        internalListenerName: internal # internal listener for the local MM2 instance to use
        externalListenerName: external # external listener for remote MM2 instances to use
    
    mirrorMaker2Spec:
      istioControlPlane:
        name: # name of the IstioControlPlane where Streaming Data Manager is installed
        namespace: # namespace of the IstioControlPlane where Streaming Data Manager is installed
    
    mirrorMaker2Properties: |-
      # replication topologies and flows, mm2 config, etc.
      kafka1-new->kafka2.enabled=true
      kafka2->kafka1-new.enabled=true
    
      sync.topic.acls.enabled=false
      tasks.max=5000
    
      topics.exclude=.*[\\-\\.]internal, .*\\.replica, __.*, _.*
      groups.exclude=console-consumer-.*, connect-.*, __.*, _.*
    
      # Setting replication factor of newly created remote topics
      # For anything other than development testing, a value greater than 1 is recommended to ensure availability.
      replication.factor=2
    
      ############################# MM2 Internal Topic Settings  #############################
      # For anything other than development testing, a value greater than 1 is recommended to ensure availability,
      checkpoints.topic.replication.factor=2
      heartbeats.topic.replication.factor=2
      offset-syncs.topic.replication.factor=2
    
      offset.storage.replication.factor=2
      status.storage.replication.factor=2
      config.storage.replication.factor=2  
    EOF
    
  12. Alternatively, save the modified descriptor to a file and deploy it with the following command:

    supertubes mm2 deploy -f <path-to-mm2-deployment-descriptor>
    
  13. Wait until Streaming Data Manager updates all MirrorMaker2 instances.

  14. The new Kafka cluster starts catching up from the other clusters.

  15. Verify that messages are replicated from the backup kafka2 cluster to the newly created kafka1-new replacement cluster, for example, by consuming the mirrored kafka2.testtopic topic on kafka1-new:

    kubectl get service -n kafka meshgateway-external-kafka
    NAME                         TYPE           CLUSTER-IP      EXTERNAL-IP                                                               PORT(S)                                                           AGE
    meshgateway-external-kafka   LoadBalancer   10.10.235.122   abee2dfdb54dc485e8449b776a649377-1677360459.eu-west-1.elb.amazonaws.com   19090:30870/TCP,19091:32756/TCP,19092:30961/TCP,29092:31455/TCP   25m

    kcat -b abee2dfdb54dc485e8449b776a649377-1677360459.eu-west-1.elb.amazonaws.com:29092 -X security.protocol=SSL -X ssl.ca.location=ca.crt -X ssl.certificate.location=tls.crt -X ssl.key.location=tls.key -C -t kafka2.testtopic -c2
    kafka2: message 1
    kafka2: message 2