The Kafka agents enable you to configure workflows in with improved scalability and fault tolerance. As part of the data collection, data is written to Kafka to secure it from being lost if a failure occurs, and each topic can be set up to be replicated across several servers.
Example of Kafka workflows
Service Context
If you choose to use embedded Kafka, you use the Kafka and Zookeeper embedded in the Service Contexts. Service Contexts are used as a convenient way of configuring and running Kafka and Zookeeper clusters embedded “inside” to provide more flexibility to manage these services using . For further information on Service Context, see 1. System Overview in the System Administrator's Guide.
Note
If the platform is restarted, you must also restart the configured services using the following command:
$ mzsh service restart
Controlled Shutdown of Embedded Kafka
If you require to shutdown embedded Kafka, you must first shutdown the Service Contexts used for Kafka and then shutdown the ones used by Zookeeper, as shown below. If you shutdown Zookeeper Service Contexts first, a controlled shutdown of Kafka is not possible.
$ mzsh shutdown sc1 sc2 sc3 $ mzsh shutdown zk1 zk2 zk3
Scaling
Using Kafka provides the capability to scale as required. One of the many ways to scale a Kafka cluster is when you create your Kafka configuration. It is recommended that when creating your Kafka configuration, you consider how many partitions you may eventually require and add more than you may currently require, as this will make it easier to scale up at a later stage . If necessary, you can add partitions later on using the kafka --alter
option but it is a more complicated process. For information on how to use the kafka --alter
option, see 2.2.11 kafka in the MZSH Command Line Tool User's Guide.
Example of Scaling a Kafka Cluster
You can also refer to http://kafka.apache.org for guidance on scaling using partitions.
To Add a Broker
If you want to add a broker to a cluster, take the following steps:
To add a broker, you are required to add an SC to the kafka configuration. If you require to create a new SC, see 2.4.1 Creating Pico Configurations. If you already have another SC that you can add, skip to step 2.
Ensure that the SC has the property
mz.servicehost.natures
set todefault
, and that the propertymz.servicehost.port.range
does not conflict with the port range of any existing SC.
You can use the following commands to set these properties if necessary:$ mzsh topo set pico:<sc>/val:config.properties.mz.servicehost.natures default $ mzsh topo set pico:<sc>/val:config.properties.mz.servicehost.port.range <available port range>
Add a new SC and brokerid to the kafka configuration in the
custom.conf
, underdeployment-info in the directory $MZ_HOME/common/config/cell/default/master/services
. See the example below to add sc4 and the corresponding brokerid to the kafka customized service:Example - Adding a broker and an SC to deployment-info
kafka { kafka1 { config { deployment-info=[ { brokerid=1 sc=sc1 }, { brokerid=2 sc=sc2 }, { brokerid=3 sc=sc3 }, { brokerid=4 sc=sc4 } ] zookeeper=zookeeper1 } start-after=[ "zookeeper/zookeeper1" ] template="1/custom/basic" } }
Use the command mzsh topo command to activate the modification to the
custom.conf
:$ mzsh topo activate
Restart the Platform and all the SCs:
$ mzsh restart platform $ mzsh system restart
- Run the command
mzsh service start -s custom
.
- To reassign partitions to the new broker, use the
kafka-reassign-partitions.sh
script delivered with Kafka. For further information on Kafka tools, see the section below, Kafka Tools.
Kafka Tools
All external tools delivered with Kafka (see http://kafka.apache.org) can be used by referring to the embedded Zookeeper service instance as
-- zookeeper <IP>:<port>/kafka/<service id>
Item | Description |
---|---|
<IP>:<port> | The IP and port of a Zookeeper node in the cluster. To find the connection details to all Zookeeper nodes, run the mzsh command Example of service dump output
|
<service id> | The service id is the Kafka service name defined in Example of custom.conf showing a service id |
Example of reference to embedded Zookeeper service instance
bin/kafka-topics.sh --zookeeper 10.46.124.46:5653/kafka/kafka1 --list
is the equivalent of
mzsh mzadmin/dr kafka --service-key kafka1 --list
For information on how to use the different tools delivered with Kafka, see http://kafka.apache.org.
Authentication with Kerberos and Encryption with SSL
When using Kafka client version 0.9 and later, you can configure the Kafka profile to use authentication via Kerberos and SSL encryption. Supported properties for this are available in the Producer and Consumer tabs in the Kafka profile. See https://www.cloudera.com/documentation/kafka/latest/topics/kafka_security.html for further information on how to configure this.