Kafka Batch Collection Agent

The Kafka batch collection agent consumes messages from Kafka. A batch is forwarded when the configured batch size is reached or when the hardcoded 5-second timeout expires, whichever comes first.
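The forwarding rule can be illustrated with a minimal Python sketch. It is a model of the behavior described above, not the agent's implementation: the class `BatchBuffer`, its methods, and the choice to start the timer at the first buffered message are assumptions made for illustration. Time is passed in explicitly so the example runs without waiting.

```python
from dataclasses import dataclass, field
from typing import Optional

BATCH_TIMEOUT_SECONDS = 5.0  # the hardcoded timeout mentioned in the text


@dataclass
class BatchBuffer:
    """Hypothetical model of the batching rule, not the agent's real code."""
    batch_size: int
    opened_at: Optional[float] = None
    messages: list = field(default_factory=list)

    def add(self, message, now):
        """Buffer one message; return the batch if the size limit is reached."""
        if not self.messages:
            # Assumption: the timeout is counted from the first buffered message.
            self.opened_at = now
        self.messages.append(message)
        if len(self.messages) >= self.batch_size:
            return self._flush()
        return None

    def tick(self, now):
        """Return a partial batch if the timeout has expired, otherwise None."""
        if self.messages and now - self.opened_at >= BATCH_TIMEOUT_SECONDS:
            return self._flush()
        return None

    def _flush(self):
        # Hand over the buffered messages and start with an empty buffer.
        batch, self.messages, self.opened_at = self.messages, [], None
        return batch


buf = BatchBuffer(batch_size=3)
buf.add("a", now=0.0)
buf.add("b", now=1.0)
by_timeout = buf.tick(now=5.0)   # two messages forwarded by the timeout
buf.add("c", now=6.0)
buf.add("d", now=6.1)
by_size = buf.add("e", now=6.2)  # three messages forwarded by the size limit
```

In this model a batch smaller than the configured size only leaves the buffer through the timeout path, which matches the two conditions named above.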

Note!

The workflow keeps running after all messages have been consumed and waits for more messages. You can stop the workflow from Desktop Online, with mzcli commands, or through the operations REST interface. The stop is delayed by 5 seconds.

Workflow Example

A simple workflow with a Kafka batch collection agent can look like this:

[Figure: Example workflow with a Kafka batch collection agent]

This workflow example has been created as follows:

Workflow Design

Create the workflow with the following agents:

Agent      Configuration
Kafka      Collects messages from Kafka. Define the size of the files forwarded by the Disk forwarding agent with the Batch Size setting.
Analysis   Receives KafkaRecord UDRs and creates output UDRs based on the contents of the input UDR. An offset is used to create a unique id.
Encoder    Encodes the data to the format the files will be forwarded in.
Disk       Creates files. The size of the files is configured in the Kafka collector agent.

Kafka Collector

Configure the Kafka collector agent to batch up the collected messages in groups of 100.

[Figure: Kafka batch collection configuration with Batch Size set to 100]
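As a rough illustration of what a Batch Size of 100 means for the output, the following Python sketch groups a stream of messages into batches. The function name `split_into_batches` and the figure of 250 messages are made up for the example, and the sketch ignores timing, so it only shows the sizes that would result if the messages arrive faster than the timeout.

```python
def split_into_batches(messages, batch_size=100):
    """Group messages into consecutive batches of at most batch_size."""
    return [messages[i:i + batch_size]
            for i in range(0, len(messages), batch_size)]


# 250 hypothetical messages collected with Batch Size set to 100.
messages = [f"msg-{n}" for n in range(250)]
batches = split_into_batches(messages)
sizes = [len(batch) for batch in batches]
print(sizes)  # [100, 100, 50]
```

With this configuration the Disk agent would write three files. The last one holds only 50 messages, because that batch never reaches the size limit and is forwarded by the 5-second timeout instead.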

Kafka Profile

The Kafka profile defines the broker from which you want to collect data. You must create the profile before you can select it in the Execution tab of the Workflow Properties.

Analysis Agent

Configure the Analysis agent to create the output UDR and then map the contents of the Kafka message. A unique id is created using the offset from the input UDR.

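consume {
    // Create the output UDR.
    Default.UFL_test.test_TI myUDR = udrCreate(Default.UFL_test.test_TI);
    // Use the offset of the Kafka message as the unique id.
    myUDR.offSet = (string)input.offset;
    // Convert the message value from a bytearray to a string.
    myUDR.value = baToStr(input.value);
    // Route the UDR to the next agent in the workflow.
    udrRoute(myUDR);
}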
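For readers who do not know APL, the same mapping can be sketched in Python. The classes `KafkaRecord` and `TestTI` are simplified stand-ins for the KafkaRecord UDR and Default.UFL_test.test_TI, with only the fields used above. The integer offset type and the UTF-8 encoding of the value are assumptions.

```python
from dataclasses import dataclass


@dataclass
class KafkaRecord:
    """Stand-in for the KafkaRecord UDR (only the fields used here)."""
    offset: int   # assumption: the offset is an integer
    value: bytes  # raw message payload


@dataclass
class TestTI:
    """Stand-in for the output UDR Default.UFL_test.test_TI."""
    offSet: str
    value: str


def consume(record: KafkaRecord) -> TestTI:
    """Map one input record to one output record, as the APL block does."""
    return TestTI(
        offSet=str(record.offset),            # (string)input.offset
        value=record.value.decode("utf-8"),   # baToStr(input.value), UTF-8 assumed
    )


out = consume(KafkaRecord(offset=42, value=b"hello"))
print(out)  # TestTI(offSet='42', value='hello')
```

Kafka offsets are unique only within a single partition, so an id built from the offset alone is unique only if the topic has one partition or if the partition is also part of the id.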

 
