Data migration using MirrorMaker2

Streaming Data Manager offers an elegant way of migrating data from existing Kafka clusters to Kafka clusters managed by Streaming Data Manager, without introducing downtime on the existing clusters. This is extremely useful for onboarding to Streaming Data Manager without losing data from already running Kafka clusters.

This solution uses the active/passive topology option of MirrorMaker2, meaning that data is only replicated from the source cluster to the target cluster.

To show that even the most basic Kafka cluster can be used as a source, the examples use a Kafka cluster running on an Amazon EC2 VM.

Data migration with MirrorMaker2 overview

How it works

MirrorMaker2 uses the Connect framework to replicate topics between Kafka clusters.

  • Both topics and consumer groups are replicated
  • Topic configuration and ACLs are replicated
  • Cross-cluster offsets are synchronized
  • Partitioning is preserved

Streaming Data Manager uses MirrorMaker2 to set up replication from a source Kafka cluster to a target managed by Streaming Data Manager. MirrorMaker2 itself is deployed on the target cluster and acts as a consumer of the source Kafka cluster.
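
In MirrorMaker2's own configuration terms, the active/passive topology corresponds to enabling a single replication flow. A minimal sketch of the relevant properties (the full descriptor used by Streaming Data Manager is shown later in this guide):

# data moves only from source-cluster to target-cluster
clusters = source-cluster, target-cluster
source-cluster->target-cluster.enabled = true
# the reverse flow stays disabled, so replication never writes back to the source
target-cluster->source-cluster.enabled = false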

Note: MirrorMaker2 doesn’t replicate the replication factor of Kafka topics. The replication factor of every replicated Kafka topic is set by the replication.factor configuration option of MirrorMaker2. In the example descriptor below, it’s set to 2.

MirrorMaker2 only replicates ACLs when the following conditions are met:

  • resourceType is TOPIC
  • patternType is LITERAL
  • the ACL does not combine operation WRITE with permissionType ALLOW (that is, ACLs that grant WRITE permission are not replicated)

You can create the non-replicated ACLs using Streaming Data Manager’s cloud-native ACL resources. For details, see Kafka ACLs over Istio overview.
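
For example, assuming a plaintext source cluster and a hypothetical principal named User:consumer-app, the first ACL below meets all of the above conditions and is replicated, while the second grants WRITE and is therefore skipped:

# replicated: LITERAL topic resource, operation READ, permission ALLOW
kafka-acls.sh --bootstrap-server <source-listener-address>:9092 --add \
  --allow-principal User:consumer-app --operation Read --topic testtopic

# not replicated: grants WRITE permission; recreate it on the target with Streaming Data Manager's ACL resources instead
kafka-acls.sh --bootstrap-server <source-listener-address>:9092 --add \
  --allow-principal User:producer-app --operation Write --topic testtopic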

To complete the migration, perform the following high-level steps:

  1. Prepare a MirrorMaker2 descriptor
  2. Configure the target cluster
  3. Configure the source cluster
  4. Deploy MirrorMaker2
  5. Test the deployment

Prepare the MirrorMaker2 descriptor

Create a descriptor file in YAML or JSON format that describes the topology of the Kafka clusters and the MirrorMaker2 replication topology in the following format. Edit the file to match your environment.

# list of Kubernetes config file paths of clusters hosting our Streaming Data Manager Kafka clusters that we want to make MM2 deployments aware of.
# here we should only need one in most cases, which will be the target of the data migration
kubernetesConfigs:
- # path-to-target-kubeconfig
...
# list of Streaming Data Manager Kafka clusters to make MM2 deployments aware of. More specifically, the Kafka clusters we want as replication targets. In most cases only one is needed at a time.
kafkaClusters:
- namespace: # kubernetes namespace hosting the Kafka cluster, defaults to 'kafka'
  name: # Kafka cluster name, defaults to 'kafka'
  kubernetesConfigContext: # name of Kubernetes configuration context as defined in the kubeconfig files which references the Kubernetes cluster hosting the Kafka cluster. If not specified, the default context is used.
  alias: # alias by which MM2 refers to this Kafka cluster (for example, target-cluster). If not provided, it defaults to '${kubernetesConfigContext}_${namespace}_${name}'
  internalListenerName: # name of the internal listener through which local MM2 instances access this Kafka cluster. If not specified, the externalListenerName is used.
  externalListenerName: # name of the external listener through which remote MM2 instances access this Kafka cluster
...
# list of non-Streaming Data Manager Kafka clusters to make MM2 deployments aware of. We will want to use only one of these at a time in most cases.
nonSDMKafkaClusters:
  - bootstrapServerAddress: # the bootstrap server address of the Kafka cluster we're migrating from.
    alias: # alias by which MM2 refers to this Kafka cluster (for example, source-cluster).
...
mirrorMaker2Spec:
  kafkaHeapOpts: # heap opts setting for MirrorMaker2, defaults to -Xms256M -Xmx2G
  resources:
  nodeSelector:
  tolerations:
  affinity:
  minReplicas:
  maxReplicas:
  istioControlPlane:
    name: # name of the IstioControlPlane where Streaming Data Manager is installed
    namespace: # namespace of the IstioControlPlane where Streaming Data Manager is installed

mirrorMaker2Properties: |
  # replication topologies and flows, mm2 config, etc.

  # active/passive setup for moving data to the target from the source
  source-cluster->target-cluster.enabled=true
  # keep the *-offset-syncs-* topic on the target side to avoid needing MirrorMaker2 to have write permissions on the source cluster
  source-cluster->target-cluster.offset-syncs.topic.location=target

  # if we also want to copy acls
  sync.topic.acls.enabled=true

  # use these as empty strings if you don't want resources to be renamed on replication
  replication.policy.separator=
  source.cluster.alias=
  target.cluster.alias=

  tasks.max=5000

  topics.exclude=.*[\\-\\.]internal, .*\\.replica, __.*, _.*
  groups.exclude=console-consumer-.*, connect-.*, __.*, _.*

  # Setting replication factor of newly created remote topics
  # For anything other than development testing, a value greater than 1 is recommended to ensure availability.
  # The original replication factor of topics can't be preserved by MirrorMaker2.
  replication.factor=2

  ############################# MM2 Internal Topic Settings  #############################
  # For anything other than development testing, a value greater than 1 is recommended to ensure availability.
  checkpoints.topic.replication.factor=2
  heartbeats.topic.replication.factor=2
  offset-syncs.topic.replication.factor=2

  offset.storage.replication.factor=2
  status.storage.replication.factor=2
  config.storage.replication.factor=2

  # Only needed if the source cluster is using SSL
  source-cluster.security.protocol=SSL
  source-cluster.ssl.truststore.password={truststore password}
  source-cluster.ssl.truststore.location={truststore location}
  source-cluster.ssl.keystore.password={keystore password}
  source-cluster.ssl.keystore.location={keystore location}
  source-cluster.ssl.key.password={key password}

Streaming Data Manager automatically generates the MirrorMaker2 configuration for each MirrorMaker2 instance (MirrorMaker2 has its own configuration format). Streaming Data Manager maintains the Kafka clusters and non-Streaming Data Manager Kafka clusters sections of the MirrorMaker2 configuration, while the replication flows and other MirrorMaker2 settings are populated from the mirrorMaker2Properties section of your descriptor file. The generated MirrorMaker2 configuration looks like the following:

# maintained by Streaming Data Manager
clusters: source-cluster, target-cluster, ...

source-cluster.bootstrap.servers=... # bootstrap server address of the non-SDM Kafka cluster in our example.
target-cluster.bootstrap.servers=... # internal kafka bootstrap servers URL if MM2 is on the same Kubernetes cluster as Kafka cluster, otherwise external kafka bootstrap servers URL

# user provided mm2 settings
source-cluster->target-cluster.enabled=true
# keep the *-offset-syncs-* topic on the target side to avoid needing MirrorMaker2 to have write permissions on the source cluster
source-cluster->target-cluster.offset-syncs.topic.location=target

sync.topic.acls.enabled=true

replication.policy.separator=
source.cluster.alias=
target.cluster.alias=

tasks.max=5000

topics.exclude=.*[\\-\\.]internal, .*\\.replica, __.*, _.*
groups.exclude=console-consumer-.*, connect-.*, __.*, _.*

# Setting the frequency to check source cluster for new topics and new consumer groups.
refresh.topics.interval.seconds=300
refresh.groups.interval.seconds=300

# Setting replication factor of newly created remote topics
# For anything other than development testing, a value greater than 1 is recommended to ensure availability.
replication.factor=2

############################# MM2 Internal Topic Settings  #############################
# For anything other than development testing, a value greater than 1 is recommended to ensure availability.
checkpoints.topic.replication.factor=2
heartbeats.topic.replication.factor=2
offset-syncs.topic.replication.factor=2

offset.storage.replication.factor=2
status.storage.replication.factor=2
config.storage.replication.factor=2

# Only needed if the source cluster is using SSL
source-cluster.security.protocol=SSL
source-cluster.ssl.truststore.password={truststore password}
source-cluster.ssl.truststore.location={truststore location}
source-cluster.ssl.keystore.password={keystore password}
source-cluster.ssl.keystore.location={keystore location}
source-cluster.ssl.key.password={key password}

In the example above, MirrorMaker2 replicates topics from source-cluster to target-cluster. The original topic names are preserved, because the following parameters are set to an empty string:

replication.policy.separator=
source.cluster.alias=
target.cluster.alias=
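
For comparison, with MirrorMaker2's default replication policy (aliases kept and separator .), a hypothetical topic named testtopic would be renamed on the target cluster; with the empty values above, the original name is kept:

# default policy:     testtopic on source-cluster  ->  source-cluster.testtopic on target-cluster
# empty values above: testtopic on source-cluster  ->  testtopic on target-cluster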

Configure the target cluster

Create the cluster that you want to migrate your data to. For the rest of this guide, this is the target cluster.

  1. Create a Kubernetes cluster and install Calisti with Streaming Data Manager on it. Make sure to size the cluster properly, so it has enough resources for Streaming Data Manager and your Kafka brokers as well. For details, see Installation.

  2. Create a Kafka cluster on the target cluster. For details and other options, see Create Kafka cluster.

    smm sdm cluster create --namespace kafka -c {target-kubeconfig.yaml}
    
  3. Add an external listener named external to the Kafka cluster. Modify your KafkaCluster CR based on the following example (a verification sketch follows these steps):

    apiVersion: kafka.banzaicloud.io/v1beta1
    kind: KafkaCluster
    spec:
    ...
      istioControlPlane:
        name: # name of the IstioControlPlane custom resource that manages the Istio mesh where the Kafka cluster runs (for example, sdm-icp-v115x)
        namespace: # namespace of the IstioControlPlane custom resource that manages the Istio mesh where the Kafka cluster runs (for example, istio-system)
     ...
      listenersConfig:
        externalListeners:
          - type: "plaintext"
            name: "external"
            externalStartingPort: 19090
            containerPort: 9094
     ...
    
  4. Wait until the Kafka cluster becomes operational:

    smm sdm cluster get --namespace kafka --kafka-cluster kafka -c {target-kubeconfig.yaml}
    

    Expected output:

    Namespace  Name   State           Image                                 Alerts  Cruise Control Topic Status  Rolling Upgrade Errors  Rolling Upgrade Last Success
    kafka      kafka  ClusterRunning  ghcr.io/banzaicloud/kafka:2.13-2.8.1  0       CruiseControlTopicReady      0                       2020-03-04 14:07:36
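
To double-check that the external listener added in step 3 is in place and exposed, you can inspect the KafkaCluster resource and its mesh gateway service. The service name below assumes the listener and cluster names used in this guide:

# list the external listeners defined on the KafkaCluster resource
kubectl get kafkacluster kafka -n kafka -o jsonpath='{.spec.listenersConfig.externalListeners[*].name}'

# the external listener is exposed through a mesh gateway service; note its external address for later
kubectl get service -n kafka meshgateway-external-kafka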
    

Configure the source cluster

This example assumes that the source Kafka cluster is deployed and running on an Amazon EC2 VM. Other providers should offer a similar experience, but feel free to reach out to us if you need help in migrating your Kafka cluster to Streaming Data Manager.

Note: The MirrorMaker2 configuration assumes that the source Kafka cluster has at least 2 brokers running. If you have only one broker, adjust the MirrorMaker2 descriptor accordingly, for example by lowering the replication factor settings (see the sketch below).
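
A topic hosted on a single-broker cluster can only have a replication factor of 1, so the factor-related settings have to be lowered in that case. A minimal, hypothetical adjustment assuming single-broker clusters:

replication.factor=1
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1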

  1. Edit the properties file of each Kafka broker, and under the Socket Server Settings section add a line defining a listener. The listener must be reachable from outside the host.

    <LISTENER_NAME>://<PUBLICLY_ACCESSIBLE_ADDRESS_OF_THE_HOST>:<PORT>
    

    The following example uses a plaintext listener.

    PLAINTEXT://ec2-184-72-117-95.compute-1.amazonaws.com:9092
    

    PLAINTEXT is the name of the listener. If you do not define a listener.security.protocol.map in the configuration file, it also determines the security protocol used. The port can be almost anything you want; typically it’s 9092. (A sketch of the resulting broker configuration follows these steps.)

    CAUTION:

    Plaintext listeners do not encrypt data in any way. Depending on your security policies, additional configuration might be required. For more information on securing listeners refer to the official Apache Kafka documentation.
  2. Restart the broker to which you have added the listener. The cluster is ready to be connected to MirrorMaker2.
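
For reference, a minimal sketch of the resulting broker configuration (server.properties) with a plaintext listener. The 0.0.0.0 bind address is an assumption; the advertised hostname reuses the example address from above and must be adapted to your VM:

# Socket Server Settings
# bind on all interfaces so the broker is reachable from outside the host
listeners=PLAINTEXT://0.0.0.0:9092
# address that external clients (such as MirrorMaker2) use to reach this broker
advertised.listeners=PLAINTEXT://ec2-184-72-117-95.compute-1.amazonaws.com:9092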

Deploy MirrorMaker2

Deploy MirrorMaker2 on your Streaming Data Manager cluster. Edit and run the following command. Replace {target-kubeconfig.yaml} and the value of bootstrapServerAddress to match your environment.

smm sdm mm2 deploy -f -<<EOF
# list of Kubernetes config file paths of clusters hosting our Streaming Data Manager Kafka clusters that we want to make MM2 deployments aware of.
# here we should only need one in most cases, which will be the target of the data migration
kubernetesConfigs:
  - {target-kubeconfig.yaml}

# list of Kafka clusters to make MM2 deployments aware of
kafkaClusters:
  - namespace: kafka
    name: kafka
    kubernetesConfigContext: kubernetes-admin@target  # the context from {target-kubeconfig.yaml}
    alias: target-cluster # alias by which MM2 refers to this Kafka cluster
    internalListenerName: internal # internal listener of the Kafka cluster that the local MM2 instance uses
    externalListenerName: external # external listener of the Kafka cluster that remote MM2 instances use
# list of non-Streaming Data Manager Kafka clusters to make MM2 deployments aware of.
nonSDMKafkaClusters:
  - bootstrapServerAddress: ec2-184-72-117-95.compute-1.amazonaws.com:9092 # the bootstrap server address of the Kafka cluster we're migrating from.
    alias: source-cluster # alias by which MM2 refers to this Kafka cluster
mirrorMaker2Spec:
  istioControlPlane:
    name: # name of the IstioControlPlane where Streaming Data Manager is installed
    namespace: # namespace of the IstioControlPlane where Streaming Data Manager is installed
  # need to set both minReplicas and maxReplicas to 1 due to an upstream Kafka issue (see KIP-710 for more details)
  minReplicas: 1
  maxReplicas: 1

mirrorMaker2Properties: |-
  # replication topologies and flows, mm2 config, etc.
  source-cluster->target-cluster.enabled=true
  # keep the *-offset-syncs-* topic on the target side to avoid needing MirrorMaker2 to have write permissions on the source cluster
  source-cluster->target-cluster.offset-syncs.topic.location=target

  # set this to true if we have ACLs
  sync.topic.acls.enabled=false

  tasks.max=5000

  replication.policy.separator=
  source.cluster.alias=
  target.cluster.alias=

  topics.exclude=.*[\\-\\.]internal, .*\\.replica, __.*, _.*
  groups.exclude=console-consumer-.*, connect-.*, __.*, _.*

  # Setting the frequency to check source cluster for new topics and new consumer groups.
  refresh.topics.interval.seconds=300
  refresh.groups.interval.seconds=300

  # Setting replication factor of newly created remote topics
  # For anything other than development testing, a value greater than 1 is recommended to ensure availability.
  replication.factor=2

  ############################# MM2 Internal Topic Settings  #############################
  # For anything other than development testing, a value greater than 1 is recommended to ensure availability.
  checkpoints.topic.replication.factor=2
  heartbeats.topic.replication.factor=2
  offset-syncs.topic.replication.factor=2

  offset.storage.replication.factor=2
  status.storage.replication.factor=2
  config.storage.replication.factor=2

  # Only needed if the source cluster is using SSL
  source-cluster.security.protocol=SSL
  source-cluster.ssl.truststore.password={truststore password}
  source-cluster.ssl.truststore.location={truststore location}
  source-cluster.ssl.keystore.password={keystore password}
  source-cluster.ssl.keystore.location={keystore location}
  source-cluster.ssl.key.password={key password}
EOF

Note: If the source cluster has SSL enabled, you must set the source-cluster.ssl.* parameters shown above so that MirrorMaker2 can read from it. To make this work, create a ConfigMap or Secret containing the truststore and the keystore, and mount it as a volume in the MirrorMaker2 deployment that Streaming Data Manager creates in Kubernetes.
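
For example, assuming the truststore and keystore are available as local JKS files, and using a hypothetical Secret name, namespace, and deployment name, you could create the Secret like this and then mount it so the files appear at the paths referenced by the source-cluster.ssl.*.location settings:

# create a Secret holding the truststore and keystore for the source cluster
kubectl create secret generic mm2-source-cluster-ssl -n kafka \
  --from-file=truststore.jks=./truststore.jks \
  --from-file=keystore.jks=./keystore.jks

# then add a volume referencing the Secret and a matching volumeMount to the
# MirrorMaker2 deployment, for example by editing it:
kubectl edit deployment -n kafka <mirrormaker2-deployment-name>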

Test the deployment

  1. If you do not already have some topics available on the source cluster, create a topic called testtopic by running the following command using the address of the listener you added:

    kafka-topics.sh --create --topic testtopic --bootstrap-server <listener-address>:<listener-port>
    
  2. Generate client certificates on the target cluster. The Kafka cluster runs inside an Istio service mesh with TLS enabled. This means you need a client certificate to access your topics. You can create client certificates manually using KafkaUser custom resources, or using the Streaming Data Manager web interface.

    The exact procedure of manually creating KafkaUser custom resources and extracting the generated certificates depends on how you are signing the client certificates. See the following examples for the csr-operator and cert-manager:

    • csr-operator:

      Create a Kafka user that the client application will use to identify itself. You can create the user manually using KafkaUser custom resources, or using the Streaming Data Manager web interface. Grant this user access to the topics it needs.

      kubectl create -f - <<EOF
      apiVersion: kafka.banzaicloud.io/v1alpha1
      kind: KafkaUser
      metadata:
        name: external-kafkauser
        namespace: default
      spec:
        clusterRef:
          name: kafka
          namespace: kafka
        secretName: external-kafkauser-secret
        pkiBackendSpec:
          pkiBackend: "k8s-csr"
          signerName: "csr.banzaicloud.io/privateca"
      EOF
      

      Note: By default, the certificate created for the Kafka user with the csr-operator is valid for 86400 seconds (1 day). To generate a certificate with a different validity, add the "csr.banzaicloud.io/certificate-lifetime" annotation to the KafkaUser CR spec. For example, the following CR creates a certificate valid for 604800 seconds (7 days) for the associated Kafka user:

      kubectl create -f - <<EOF
      apiVersion: kafka.banzaicloud.io/v1alpha1
      kind: KafkaUser
      metadata:
        name: external-kafkauser
        namespace: default
      spec:
        annotations:
          csr.banzaicloud.io/certificate-lifetime: "604800"
        clusterRef:
          name: kafka
          namespace: kafka
        secretName: external-kafkauser-secret
        pkiBackendSpec:
          pkiBackend: "k8s-csr"
          signerName: "csr.banzaicloud.io/privateca"
      EOF
      
      1. Export the public certificate of the CA.

        • If you are using the CSR operator:

          kubectl get secret external-kafkauser-secret -o 'go-template={{index .data "chain.pem"}}' | base64 -D > /var/tmp/ca.crt
          
        • If you are using cert-manager:

          kubectl get secret external-kafkauser-secret -o 'go-template={{index .data "ca.crt"}}' | base64 -D > /var/tmp/ca.crt 
          

        Alternatively, you can download the CA certificate and the client certificate from the Streaming Data Manager web interface.

      2. Export the client certificate stored in external-kafkauser-secret, which represents the Kafka user external-kafkauser:

        kubectl get secret external-kafkauser-secret -o 'go-template={{index .data "tls.crt"}}' | base64 -D > /var/tmp/tls.crt
        kubectl get secret external-kafkauser-secret -o 'go-template={{index .data "tls.key"}}' | base64 -D > /var/tmp/tls.key
        
      3. Use the exported client credentials and the CA certificate in your application to connect to the external listener of the Kafka cluster. (Otherwise, Istio automatically rejects the client application.) The following command is an example to connect with the kcat client application:

        kcat -L -b aff4c6887766440238fb19c381779eae-1599690198.eu-north-1.elb.amazonaws.com:29092 -X security.protocol=SSL -X ssl.key.location=/var/tmp/tls.key -X ssl.certificate.location=/var/tmp/tls.crt -X ssl.ca.location=/var/tmp/ca.crt
        

        Metadata for all topics (from broker -1: ssl://aff4c6887766440238fb19c381779eae-1599690198.eu-north-1.elb.amazonaws.com:29092/bootstrap):
         2 brokers:
          broker 0 at aff4c6887766440238fb19c381779eae-1599690198.eu-north-1.elb.amazonaws.com:19090 (controller)
          broker 1 at aff4c6887766440238fb19c381779eae-1599690198.eu-north-1.elb.amazonaws.com:19091
         1 topics:
          topic "example-topic" with 3 partitions:
            partition 0, leader 0, replicas: 0,1, isrs: 0,1
            partition 1, leader 1, replicas: 1,0, isrs: 0,1
            partition 2, leader 0, replicas: 0,1, isrs: 0,1

    • cert-manager:

      Create a Kafka user that the client application will use to identify itself. You can create the user manually using KafkaUser custom resources, or using the Streaming Data Manager web interface. Grant this user access to the topics it needs.

      kubectl create -f - <<EOF
      apiVersion: kafka.banzaicloud.io/v1alpha1
      kind: KafkaUser
      metadata:
        name: external-kafkauser
        namespace: default
      spec:
        clusterRef:
          name: kafka
          namespace: kafka
        secretName: external-kafkauser-secret
        pkiBackendSpec:
          pkiBackend: "cert-manager"
          issuerRef:
            name: "ca-issuer"
            kind: "ClusterIssuer"
      EOF
      

      Note: The certificate created for the Kafka user with cert-manager is valid for 90 days.

      1. Export the public certificate of the CA.

        • If you are using the CSR operator:

          kubectl get secret external-kafkauser-secret -o 'go-template={{index .data "chain.pem"}}' | base64 -D > /var/tmp/ca.crt
          
        • If you are using cert-manager:

          kubectl get secret external-kafkauser-secret -o 'go-template={{index .data "ca.crt"}}' | base64 -D > /var/tmp/ca.crt 
          

        Alternatively, you can download the CA certificate and the client certificate from the Streaming Data Manager web interface.

      2. Export the client certificate stored in external-kafkauser-secret, which represents the Kafka user external-kafkauser:

        kubectl get secret external-kafkauser-secret -o 'go-template={{index .data "tls.crt"}}' | base64 -D > /var/tmp/tls.crt
        kubectl get secret external-kafkauser-secret -o 'go-template={{index .data "tls.key"}}' | base64 -D > /var/tmp/tls.key
        
      3. Use the exported client credentials and the CA certificate in your application to connect to the external listener of the Kafka cluster. (Otherwise, Istio automatically rejects the client application.) The following command is an example to connect with the kcat client application:

        kcat -L -b aff4c6887766440238fb19c381779eae-1599690198.eu-north-1.elb.amazonaws.com:29092 -X security.protocol=SSL -X ssl.key.location=/var/tmp/tls.key -X ssl.certificate.location=/var/tmp/tls.crt -X ssl.ca.location=/var/tmp/ca.crt
        

        Metadata for all topics (from broker -1: ssl://aff4c6887766440238fb19c381779eae-1599690198.eu-north-1.elb.amazonaws.com:29092/bootstrap):
         2 brokers:
          broker 0 at aff4c6887766440238fb19c381779eae-1599690198.eu-north-1.elb.amazonaws.com:19090 (controller)
          broker 1 at aff4c6887766440238fb19c381779eae-1599690198.eu-north-1.elb.amazonaws.com:19091
         1 topics:
          topic "example-topic" with 3 partitions:
            partition 0, leader 0, replicas: 0,1, isrs: 0,1
            partition 1, leader 1, replicas: 1,0, isrs: 0,1
            partition 2, leader 0, replicas: 0,1, isrs: 0,1

  3. Find the public address through which the Kafka cluster is exposed on the target Kubernetes cluster:

    kubectl get service -n kafka meshgateway-external-kafka
    NAME                         TYPE           CLUSTER-IP      EXTERNAL-IP                                                                PORT(S)                                                           AGE
    meshgateway-external-kafka   LoadBalancer   10.10.171.247   a7c265ab846c14f8fa082773fee7c0da-2063154997.eu-north-1.elb.amazonaws.com   19090:31645/TCP,19091:32240/TCP,19092:32483/TCP,29092:30681/TCP   61m
    
    kubectl get service -n kafka meshgateway-external-kafka
    NAME                         TYPE           CLUSTER-IP      EXTERNAL-IP     PORT(S)                                                           AGE
    meshgateway-external-kafka   LoadBalancer   10.10.161.160   51.124.19.105   19090:32651/TCP,19091:31402/TCP,19092:30403/TCP,29092:31586/TCP   57m
    
  4. Write messages to testtopic on the source cluster, for example, by running Kafka's built-in console producer script and typing some test messages.

    kafka-console-producer.sh --topic testtopic --bootstrap-server <listener-address>:<listener-port>
    
    source message 1
    source message 2
    
  5. Check the replicated messages. Verify that MirrorMaker2 replicated the messages from testtopic on the source cluster to the topic of the same name on the target cluster. You can use the kcat tool to access the listener of the target cluster with the client certificate you extracted:

    kcat -b <external-address-of-the-target-cluster>:29092 -X security.protocol=SSL -X ssl.ca.location=/var/tmp/ca.crt -X ssl.certificate.location=/var/tmp/tls.crt -X ssl.key.location=/var/tmp/tls.key -C -t testtopic -c2
    

    The messages in the topic should be displayed, for example:

    source message 1
    source message 2