Example of Kafka workflows

Service Context

If you choose to use embedded Kafka, you use the Kafka and Zookeeper instances embedded in the Service Contexts. Service Contexts are a convenient way of configuring and running embedded Kafka and Zookeeper clusters, and they give you more flexibility in managing these services. For further information on Service Contexts, see 1. System Overview in the System Administrator's Guide.

Note

If the platform is restarted, you must also restart the configured services using the following command:

Code Block
$ mzsh service restart


Controlled Shutdown of Embedded Kafka

If you need to shut down embedded Kafka, you must first shut down the Service Contexts used for Kafka and then those used for Zookeeper, as shown below. If you shut down the Zookeeper Service Contexts first, a controlled shutdown of Kafka is not possible.

Code Block
$ mzsh shutdown sc1 sc2 sc3
$ mzsh shutdown zk1 zk2 zk3
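
If you script the shutdown, the ordering can be made explicit. The following is a minimal Python sketch, assuming the Service Context names sc1 to sc3 and zk1 to zk3 from the example above; the function names are hypothetical, and by default it only prints the commands instead of running them.

```python
import subprocess


def shutdown_commands(kafka_scs, zookeeper_scs):
    """Return the mzsh shutdown commands in the required order."""
    # Kafka Service Contexts go first; the Zookeeper Service Contexts must
    # still be running for a controlled shutdown of Kafka to be possible.
    return [
        ["mzsh", "shutdown", *kafka_scs],
        ["mzsh", "shutdown", *zookeeper_scs],
    ]


def controlled_shutdown(kafka_scs, zookeeper_scs, dry_run=True):
    """Print the commands (dry run) or execute them in order."""
    for cmd in shutdown_commands(kafka_scs, zookeeper_scs):
        if dry_run:
            print("$ " + " ".join(cmd))
        else:
            # check=True aborts before the Zookeeper step if the Kafka step fails.
            subprocess.run(cmd, check=True)


if __name__ == "__main__":
    controlled_shutdown(["sc1", "sc2", "sc3"], ["zk1", "zk2", "zk3"])
```

Calling controlled_shutdown with dry_run=False would run the two commands in sequence and stop if the first one fails, so Zookeeper is never shut down ahead of Kafka.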

Scaling

Kafka lets you scale as required. One way to prepare a Kafka cluster for scaling is through the number of partitions you define when you create your Kafka configuration. It is recommended that you consider how many partitions you may eventually require and add more than you currently need, as this makes it easier to scale up at a later stage. If necessary, you can add partitions later using the kafka --alter option, but this is a more complicated process. For information on how to use the kafka --alter option, see 2.2.11 kafka in the MZSH Command Line Tool User's Guide.

Example of Scaling a Kafka Cluster

You can also refer to http://kafka.apache.org for guidance on scaling using partitions.
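
As an illustration of planning for more partitions than you currently need, the sketch below applies a commonly used rule of thumb: divide the target throughput by the measured per-partition throughput on the producer and consumer side, take the larger result, and multiply by a headroom factor. This rule and all the numbers are assumptions for illustration and do not come from this guide; measure your own throughput before relying on the result.

```python
import math


def suggested_partitions(target_mb_s, producer_mb_s_per_partition,
                         consumer_mb_s_per_partition, headroom=2.0):
    """Estimate a partition count with room to grow.

    All inputs are hypothetical measurements in MB/s; headroom is the
    over-provisioning factor applied on top of the current need.
    """
    needed = max(target_mb_s / producer_mb_s_per_partition,
                 target_mb_s / consumer_mb_s_per_partition)
    return math.ceil(needed * headroom)


if __name__ == "__main__":
    # 100 MB/s target, 10 MB/s per partition when producing,
    # 20 MB/s per partition when consuming, doubled for future growth.
    print(suggested_partitions(100, 10, 20))  # prints 20
```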

To Add a Broker

If you want to add a broker to a cluster, take the following steps:

  1. To add a broker, you must add an SC to the kafka configuration. If you need to create a new SC, see 2.4.1 Creating Pico Configurations. If you already have an SC that you can add, skip to step 2.
     

  2. Ensure that the SC has the property mz.servicehost.natures set to default, and that the property mz.servicehost.port.range does not conflict with the port range of any existing SC.
    You can use the following commands to set these properties if necessary:

    Code Block
    $ mzsh topo set pico:<sc>/val:config.properties.mz.servicehost.natures default
    $ mzsh topo set pico:<sc>/val:config.properties.mz.servicehost.port.range <available port range>


  3. Add the new SC and brokerid to the kafka configuration in custom.conf, under deployment-info. The file is located in the directory $MZ_HOME/common/config/cell/default/master/services. See the example below, which adds sc4 and the corresponding brokerid to the kafka customized service:

    Info
    Example - Adding a broker and an SC to deployment-info


    Code Block
    kafka {
        kafka1 {
            config {
                deployment-info=[
                    {
                        brokerid=1
                        sc=sc1
                    },
                    {
                        brokerid=2
                        sc=sc2
                    },
                    {
                        brokerid=3
                        sc=sc3
                    },
                    {
                        brokerid=4
                        sc=sc4
                    }
                ]
                zookeeper=zookeeper1
            }
            start-after=[
                "zookeeper/zookeeper1"
            ]
            template="1/custom/basic"
        }
    }



  4. Use the mzsh topo activate command to activate the modification to custom.conf:

    Code Block
    $ mzsh topo activate


  5. Restart the Platform and all the SCs:

    Code Block
    $ mzsh restart platform
    $ mzsh system restart


  6. Run the command mzsh service start -s custom.
     
  7. To reassign partitions to the new broker, use the kafka-reassign-partitions.sh script delivered with Kafka. For further information on Kafka tools, see the section below, Kafka Tools.
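
The reassignment in step 7 usually starts by asking the tool to propose a plan. The following Python sketch builds the topics-to-move JSON document and the matching kafka-reassign-partitions.sh command line for the --generate mode. It assumes a Kafka version whose tools still accept --zookeeper, as used elsewhere in this section; the topic name mytopic, the file name topics-to-move.json, and the function names are hypothetical.

```python
import json


def topics_to_move(topics):
    """Build the JSON document passed via --topics-to-move-json-file."""
    return {"version": 1, "topics": [{"topic": name} for name in topics]}


def generate_command(zookeeper_connect, json_path, broker_ids):
    """Build the command that asks Kafka to propose a reassignment plan."""
    return [
        "bin/kafka-reassign-partitions.sh",
        "--zookeeper", zookeeper_connect,
        "--topics-to-move-json-file", json_path,
        # Include the new broker id so the proposed plan can use it.
        "--broker-list", ",".join(str(b) for b in broker_ids),
        "--generate",
    ]


if __name__ == "__main__":
    # Contents to save as topics-to-move.json (hypothetical file name).
    print(json.dumps(topics_to_move(["mytopic"]), indent=2))
    # Command to run once the file has been saved.
    print(" ".join(generate_command(
        "10.46.124.46:5653/kafka/kafka1", "topics-to-move.json", [1, 2, 3, 4])))
```

The --generate mode only prints a proposed assignment. To apply it, save the proposal to a file and run the script again with --reassignment-json-file and --execute.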

Kafka Tools

All external tools delivered with Kafka (see http://kafka.apache.org) can be used by referring to the embedded Zookeeper service instance as follows:

Code Block
--zookeeper <IP>:<port>/kafka/<service id>


Item    Description
<IP>:<port>

The IP and port of a Zookeeper node in the cluster. To find the connection details for all Zookeeper nodes, run the mzsh command service dump.

Info
Example of service dump output

"zookeeper" : {
    "zookeeper1" : {
        "clientinfo" : {
            "clientinfo" : "10.46.124.46:5653,10.46.124.46:5703,10.46.124.46:5753"
        },


<service id>

The service id is the Kafka service name defined in custom.conf, for example kafka1 in the configuration shown under To Add a Broker.


Info
Example of custom.conf showing a service id




Info
Example of reference to embedded Zookeeper service instance


Code Block
bin/kafka-topics.sh --zookeeper 10.46.124.46:5653/kafka/kafka1 --list

is the equivalent of

Code Block
mzsh mzadmin/dr kafka --service-key kafka1 --list


For information on how to use the different tools delivered with Kafka, see http://kafka.apache.org.
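
The --zookeeper argument can also be assembled from the service dump output. The following Python sketch assumes the dump is JSON and that the clientinfo value sits at the nested path shown in the fragment above; the wrapper braces and the function name are added for illustration only.

```python
import json


def zookeeper_argument(clientinfo, service_id, all_nodes=False):
    """Build the value for --zookeeper: <IP>:<port>/kafka/<service id>."""
    nodes = [n.strip() for n in clientinfo.split(",") if n.strip()]
    if not nodes:
        raise ValueError("clientinfo contains no <IP>:<port> entries")
    # A Zookeeper connection string may list several nodes; the path
    # (/kafka/<service id>) is appended once, after the last node.
    hosts = ",".join(nodes) if all_nodes else nodes[0]
    return f"{hosts}/kafka/{service_id}"


# Fragment of service dump output, closed off so that it parses as JSON.
dump_fragment = json.loads("""
{
  "zookeeper": {
    "zookeeper1": {
      "clientinfo": {
        "clientinfo": "10.46.124.46:5653,10.46.124.46:5703,10.46.124.46:5753"
      }
    }
  }
}
""")

clientinfo = dump_fragment["zookeeper"]["zookeeper1"]["clientinfo"]["clientinfo"]
print(zookeeper_argument(clientinfo, "kafka1"))
# prints 10.46.124.46:5653/kafka/kafka1
```

Passing all_nodes=True lists every Zookeeper node in the argument, so the tool can still connect if the first node is unavailable.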

Authentication with Kerberos and Encryption with SSL

When using Kafka client version 0.9 or later, you can configure the Kafka profile to use Kerberos authentication and SSL encryption. The supported properties are available in the Producer and Consumer tabs of the Kafka profile. See https://www.cloudera.com/documentation/kafka/latest/topics/kafka_security.html for further information on how to configure this.
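
As a rough illustration of the kind of properties involved, the Python sketch below renders a set of standard Kafka client settings for Kerberos over SSL. The property names are standard Kafka client configuration keys, but which of them the Kafka profile accepts is not confirmed here; the truststore path, password, and function names are hypothetical placeholders.

```python
def security_properties(truststore_path, truststore_password,
                        kerberos_service_name="kafka"):
    """Return Kafka client properties for Kerberos authentication over SSL."""
    return {
        # SASL_SSL combines SASL (Kerberos) authentication with SSL encryption.
        "security.protocol": "SASL_SSL",
        # Must match the service name of the principal the brokers run as.
        "sasl.kerberos.service.name": kerberos_service_name,
        "ssl.truststore.location": truststore_path,
        "ssl.truststore.password": truststore_password,
    }


def render(props):
    """Render the properties as key=value lines."""
    return "\n".join(f"{key}={value}" for key, value in props.items())


if __name__ == "__main__":
    # Hypothetical truststore location and password.
    print(render(security_properties("/path/to/truststore.jks", "changeit")))
```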

