9.48.2 Kafka Agents Overview

The Kafka agents enable you to configure workflows with improved scalability and fault tolerance. As part of data collection, data is written to Kafka to protect it from being lost if a failure occurs, and each topic can be set up to be replicated across several servers.

Example of Kafka workflows

Service Context

If you choose to use embedded Kafka, you use the Kafka and Zookeeper services embedded in the Service Contexts. Service Contexts provide a convenient way to configure and run embedded Kafka and Zookeeper clusters, and give you more flexibility in managing these services. For further information on Service Contexts, see 1. System Overview in the System Administrator's Guide.

Note

If the platform is restarted, you must also restart the configured services using the following command:

$ mzsh service restart

Controlled Shutdown of Embedded Kafka

If you need to shut down embedded Kafka, you must first shut down the Service Contexts used for Kafka and then shut down the ones used for Zookeeper, as shown below. If you shut down the Zookeeper Service Contexts first, a controlled shutdown of Kafka is not possible.

$ mzsh shutdown sc1 sc2 sc3
$ mzsh shutdown zk1 zk2 zk3
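
The required order can be captured in a small wrapper script so that the Zookeeper Service Contexts are never shut down first. The following is a minimal sketch, not a delivered tool: the function names run and controlled_shutdown and the DRY_RUN variable are hypothetical, and it assumes that mzsh shutdown returns a non-zero exit status on failure, which is not confirmed here. By default it only prints the commands.

```shell
#!/bin/sh
# Sketch: shut down Kafka Service Contexts first, then Zookeeper ones.
# DRY_RUN (hypothetical variable) defaults to 1, which only prints the commands.

run() {
    if [ "${DRY_RUN:-1}" = "1" ]; then
        echo "$@"
    else
        "$@"
    fi
}

controlled_shutdown() {
    # $1: space-separated Kafka SCs, $2: space-separated Zookeeper SCs.
    # Word splitting of $1 and $2 is intentional.
    # ASSUMPTION: mzsh returns non-zero on failure, so Zookeeper is only
    # shut down if the Kafka shutdown command succeeded.
    run mzsh shutdown $1 && run mzsh shutdown $2
}

# SC names taken from the example above; replace them with your own.
controlled_shutdown "sc1 sc2 sc3" "zk1 zk2 zk3"
```

Set DRY_RUN=0 to execute the commands instead of printing them.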

Scaling

Using Kafka lets you scale as required. One way to prepare a Kafka cluster for scaling is through the number of partitions you define when you create your Kafka configuration. It is recommended that you consider how many partitions you may eventually require and add more than you currently need, as this makes it easier to scale up at a later stage. If necessary, you can add partitions later using the kafka --alter option, but this is a more complicated process. For information on how to use the kafka --alter option, see 2.2.11 kafka in the MZSH Command Line Tool User's Guide.

Example of Scaling a Kafka Cluster

You can also refer to http://kafka.apache.org for guidance on scaling using partitions.
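
One reason that adding partitions later is more involved: when producers assign records to partitions by hashing the record key, the partition a key maps to depends on the partition count, and existing records are not moved when the count changes. The sketch below illustrates the effect only. It uses cksum as a stand-in for the partitioner's real hash function, and the key names and the function partition_for are hypothetical.

```shell
#!/bin/sh
# Illustration only: cksum stands in for the hash used by a real partitioner.

# Print the partition (0..N-1) that a key maps to for a topic with N partitions.
partition_for() {
    key=$1
    partitions=$2
    hash=$(printf '%s' "$key" | cksum | cut -d ' ' -f 1)
    echo $((hash % partitions))
}

# Compare the mapping for a few hypothetical keys before and after
# growing a topic from 4 to 6 partitions.
for key in customer-1 customer-2 customer-3 customer-4; do
    echo "$key: 4 partitions -> $(partition_for "$key" 4), 6 partitions -> $(partition_for "$key" 6)"
done
```

Keys whose partition number differs between the two columns would be written to a different partition after the change, which is why sizing the partition count generously up front is the simpler route.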

To Add a Broker

If you want to add a broker to a cluster, take the following steps:

  1. To add a broker, you must add an SC to the kafka configuration. If you need to create a new SC, see 2.4.1 Creating Pico Configurations. If you already have an SC that you can add, skip to step 2.
     

  2. Ensure that the SC has the property mz.servicehost.natures set to default, and that the property mz.servicehost.port.range does not conflict with the port range of any existing SC.
    You can use the following commands to set these properties if necessary:

    $ mzsh topo set pico:<sc>/val:config.properties.mz.servicehost.natures default
    $ mzsh topo set pico:<sc>/val:config.properties.mz.servicehost.port.range <available port range>
  3. Add the new SC and brokerid to the kafka configuration in custom.conf, under deployment-info, in the directory $MZ_HOME/common/config/cell/default/master/services. See the example below, which adds sc4 and the corresponding brokerid to the kafka customized service:

    Example - Adding a broker and an SC to deployment-info

    kafka {
        kafka1 {
            config {
                deployment-info=[
                    {
                        brokerid=1
                        sc=sc1
                    },
                    {
                        brokerid=2
                        sc=sc2
                    },
                    {
                        brokerid=3
                        sc=sc3
                    },
                    {
                        brokerid=4
                        sc=sc4
                    }
                ]
                zookeeper=zookeeper1
            }
            start-after=[
                "zookeeper/zookeeper1"
            ]
            template="1/custom/basic"
        }
    }


  4. Use the mzsh topo activate command to activate the modification to custom.conf:

    $ mzsh topo activate
  5. Restart the Platform and all the SCs:

    $ mzsh restart platform
    $ mzsh system restart
  6. Run the command mzsh service start -s custom.
     
  7. To reassign partitions to the new broker, use the kafka-reassign-partitions.sh script delivered with Kafka. For further information on Kafka tools, see the section below, Kafka Tools.
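
The command-line parts of this procedure (steps 2 and 4 to 6) can be strung together as sketched below. This is a hedged outline rather than a supported script: the functions run, ranges_overlap and add_broker and the DRY_RUN variable are hypothetical, the "first-last" port range format is an assumption, and the port ranges in the sample call are invented. Run it only after custom.conf has been edited as described in step 3. Step 7 remains a manual task.

```shell
#!/bin/sh
# Dry-run sketch of steps 2 and 4 to 6; set DRY_RUN=0 to execute the commands.

run() {
    if [ "${DRY_RUN:-1}" = "1" ]; then
        echo "$@"
    else
        "$@"
    fi
}

# Succeeds if two port ranges overlap.
# ASSUMPTION: ranges are written as "first-last", for example 5700-5750.
ranges_overlap() {
    a_first=${1%-*}; a_last=${1#*-}
    b_first=${2%-*}; b_last=${2#*-}
    [ "$a_first" -le "$b_last" ] && [ "$b_first" -le "$a_last" ]
}

add_broker() {
    sc=$1           # SC that will host the new broker, for example sc4
    port_range=$2   # port range chosen for that SC
    shift 2         # remaining arguments: port ranges of the existing SCs

    for used in "$@"; do
        if ranges_overlap "$port_range" "$used"; then
            echo "port range $port_range conflicts with $used" >&2
            return 1
        fi
    done

    # Step 2: set the required properties on the SC.
    run mzsh topo set "pico:$sc/val:config.properties.mz.servicehost.natures" default || return 1
    run mzsh topo set "pico:$sc/val:config.properties.mz.servicehost.port.range" "$port_range" || return 1
    # Step 4: activate the custom.conf change made in step 3.
    run mzsh topo activate || return 1
    # Step 5: restart the Platform and all the SCs.
    run mzsh restart platform || return 1
    run mzsh system restart || return 1
    # Step 6: start the custom services.
    run mzsh service start -s custom
}

# Sample call with invented port ranges for the existing SCs.
add_broker sc4 5800-5850 5650-5700 5701-5750
```

Checking the port range before any mzsh command is issued means a conflict is reported without changing the configuration.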

Kafka Tools

All external tools delivered with Kafka (see http://kafka.apache.org) can be used by referring to the embedded Zookeeper service instance as:

--zookeeper <IP>:<port>/kafka/<service id>
Item | Description
<IP>:<port>

The IP address and port of a Zookeeper node in the cluster. To find the connection details for all Zookeeper nodes, run the mzsh command service dump.

Example of service dump output

"zookeeper" : {
    "zookeeper1" : {
        "clientinfo" : {
            "clientinfo" : "10.46.124.46:5653,10.46.124.46:5703,10.46.124.46:5753"
        },

<service id>

The service id is the Kafka service name defined in custom.conf.


Example of custom.conf showing a service id


Example of reference to embedded Zookeeper service instance

bin/kafka-topics.sh --zookeeper 10.46.124.46:5653/kafka/kafka1 --list

is the equivalent of

mzsh mzadmin/dr kafka --service-key kafka1 --list

For information on how to use the different tools delivered with Kafka, see http://kafka.apache.org.
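
The --zookeeper value can be assembled from the clientinfo string in the service dump output and the service id. The sketch below shows one way to do this, picking the first node in the list; the function name zookeeper_arg is hypothetical, and the values are the ones used in the examples above.

```shell
#!/bin/sh
# Build the value for --zookeeper from a clientinfo string and a service id.

zookeeper_arg() {
    clientinfo=$1                  # comma-separated <IP>:<port> list from service dump
    service_id=$2                  # Kafka service name defined in custom.conf
    first_node=${clientinfo%%,*}   # keep only the first <IP>:<port> entry
    echo "$first_node/kafka/$service_id"
}

zk=$(zookeeper_arg "10.46.124.46:5653,10.46.124.46:5703,10.46.124.46:5753" kafka1)
# Prints: bin/kafka-topics.sh --zookeeper 10.46.124.46:5653/kafka/kafka1 --list
echo "bin/kafka-topics.sh --zookeeper $zk --list"
```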

Authentication with Kerberos and Encryption with SSL

When using Kafka client version 0.9 or later, you can configure the Kafka profile to use Kerberos authentication and SSL encryption. The supported properties are available in the Producer and Consumer tabs in the Kafka profile. For further information on how to configure this, see https://www.cloudera.com/documentation/kafka/latest/topics/kafka_security.html.
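
As a rough illustration of the kind of settings involved, the following fragment uses standard Apache Kafka client property names for Kerberos authentication over SSL. Which of these properties the Producer and Consumer tabs accept is not stated here, so treat the selection as an assumption. The paths, password, and service name are placeholders.

```properties
# Illustrative Kafka client settings; all values are placeholders.
# Authenticate with SASL (Kerberos) over an SSL-encrypted connection.
security.protocol=SASL_SSL
# Kerberos principal name that the Kafka brokers run as.
sasl.kerberos.service.name=kafka
# Truststore holding the certificate(s) used to verify the brokers.
ssl.truststore.location=/path/to/client.truststore.jks
ssl.truststore.password=changeit
```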