Kafka Batch Collection Agent

The Kafka batch collection agent consumes messages from Kafka and forwards them in batches. A batch is forwarded when the configured batch size is reached or when the hardcoded timeout of 5 seconds expires, whichever comes first.

  • For topics written transactionally, the agent ensures that only committed messages are read (using the Kafka property isolation.level=read_committed, see Kafka Documentation).

  • For topics written non-transactionally, the agent reads all messages.
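For comparison, a standalone Kafka client would request the same behavior through its consumer properties. The sketch below is illustrative only and is not the agent's own configuration: the broker address and group id are hypothetical placeholders, while the keys are standard Kafka consumer property names. With read_committed, messages from committed transactions and messages written outside any transaction are both returned, which matches the two cases listed above.

```python
# Illustrative consumer properties for a standalone Kafka client.
# This is NOT the agent's configuration; the values marked hypothetical are placeholders.
consumer_properties = {
    "bootstrap.servers": "localhost:9092",  # hypothetical broker address
    "group.id": "example-batch-reader",     # hypothetical consumer group
    "isolation.level": "read_committed",    # hide messages from open or aborted transactions
}
```

A dictionary like this can be passed to a client that accepts Kafka property names, such as confluent_kafka.Consumer.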

Note!
The workflow keeps running after all messages have been consumed and waits for more messages. You can stop the workflow from Desktop, with mzsh/mzcli commands, or through the operations REST interface. The stop is delayed by 5 seconds.

Workflow Example

A simple workflow with a Kafka batch collection agent can look like this:

kafkaBatchColl_wf.png
Example workflow with a Kafka batch collection agent 

This workflow example has been created as follows:

Workflow Design

Create the workflow with the following agents and configurations:

  • Kafka: Collects messages from Kafka. The Batch Size setting of this agent determines the size of the files forwarded by the Disk forwarding agent.

  • Analysis: Receives KafkaRecord UDRs and creates output UDRs based on the contents of each input UDR. The offset of the input UDR is used to create a unique id.

  • Encoder: Encodes the data to the format in which the files will be forwarded.

  • Disk: Creates the files. The size of the files is determined by the Batch Size configured in the Kafka collector agent.

Kafka Collector

Configure the Kafka collector agent to forward the collected messages in batches of 100 by setting Batch Size to 100.

kafka_batch_collection.png
Kafka batch collection configuration with Batch Size set to 100
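To illustrate how a Batch Size of 100 interacts with the 5-second timeout, the following Python sketch models the forwarding rule. It is a simplified model, not the agent's implementation. The class and method names are hypothetical, and two details are assumptions that the documentation does not state: the timeout is measured from the first message of the current batch, and an empty batch is never forwarded.

```python
import time
from typing import Callable, List, Optional

FLUSH_TIMEOUT_SECONDS = 5.0  # the documented hardcoded timeout


class BatchCollector:
    """Hypothetical model: forward a batch on size or on timeout."""

    def __init__(
        self,
        batch_size: int,
        forward: Callable[[List[bytes]], None],
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.batch_size = batch_size
        self.forward = forward
        self.clock = clock  # injectable so the timeout can be tested without sleeping
        self.pending: List[bytes] = []
        self.batch_started: Optional[float] = None

    def add(self, message: bytes) -> None:
        # Assumption: the timer starts when the first message of a batch arrives.
        if not self.pending:
            self.batch_started = self.clock()
        self.pending.append(message)
        if len(self.pending) >= self.batch_size:
            self.flush()

    def tick(self) -> None:
        # Call periodically; forwards a partial batch once the timeout has passed.
        if self.pending and self.clock() - self.batch_started >= FLUSH_TIMEOUT_SECONDS:
            self.flush()

    def flush(self) -> None:
        # Assumption: empty batches are not forwarded.
        if self.pending:
            self.forward(self.pending)
            self.pending = []
            self.batch_started = None
```

With batch_size=100, a steady stream of messages yields batches of exactly 100, while a slow stream yields smaller batches each time the timeout expires. In the example workflow, each forwarded batch corresponds to one file created by the Disk agent.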

Kafka Profile

The Kafka profile defines the broker from which you want to collect data. You must create the profile before you can select it in the Execution tab of the Workflow Properties.

kafkaWF_prop.png
Workflow Properties with Kafka profile selected

Analysis Agent

Configure the Analysis agent to create the output UDR and map the contents of the Kafka message to it. The unique id is created from the offset of the input UDR.

    consume {
        Default.UFL_test.test_TI myUDR = udrCreate(Default.UFL_test.test_TI);
        myUDR.offSet = (string)input.offset;
        myUDR.value = baToStr(input.value);
        udrRoute(myUDR);
    }
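For readers who are less familiar with APL, the same mapping can be sketched in Python. This is an illustrative equivalent, not code that runs in the workflow. The two dataclasses are hypothetical stand-ins for the KafkaRecord UDR and the Default.UFL_test.test_TI output UDR, and the UTF-8 decoding is an assumption, since the encoding applied by baToStr is not stated here.

```python
from dataclasses import dataclass


@dataclass
class KafkaRecord:
    """Hypothetical stand-in for the KafkaRecord input UDR."""
    offset: int
    value: bytes


@dataclass
class OutputRecord:
    """Hypothetical stand-in for the Default.UFL_test.test_TI output UDR."""
    offSet: str
    value: str


def consume(record: KafkaRecord) -> OutputRecord:
    # Cast the offset to a string (used as the id) and decode the payload.
    # Assumption: the message value is UTF-8 encoded text.
    return OutputRecord(offSet=str(record.offset), value=record.value.decode("utf-8"))
```

For example, consume(KafkaRecord(offset=42, value=b"hello")) returns an OutputRecord whose offSet is "42" and whose value is "hello".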