The KafkaOffsetUDR is used to determine an offset which is not at the beginning or the end of collection, and only applies if you have selected Start at requested in the Kafka Collection agent configuration dialog - see 9.4849.6.1 Kafka Real-time Collection Agent Configuration. The Kafka collection agent waits for the KafkaOffsetUDR and does not consume before the offset information. Messages can be persisted to a database or aggregation agent.
Note
title
Note!
If you send a KafkaOffsetUDR from initialize without any content, messages are read from the beginning (from the first offset).
The following fields are included in the KafkaOffsetUDR:
Field
Description
offsets (map<int,long>)
This field is populated with offset information. As offsets in Kafka are unique per partition, this maps partition identifiers (int) to an offset (long).
offsetsAsString (string)
This field contains a comma separated key-value list. It is available as a convenience for populating with offset information, as it provides a simple way to store the map with partitions and offset as a string. It reads and writes the same underlying data as the offsets (map<int,long>) field, so that changing the value of offsetsAsString (string) changes the value of offsets (map<int,long>) and vice versa.
Info
title
Example
An example of two partitions as string type: 1=2,2=3
Example of KafkaOffsetUDR
In this example the Kafka collection agent has been configured to start at requested offset. To ensure that the offsets are updated in ascending order, it is recommended that you configure the outgoing route from the Kafka collection agent to be Synchronous.
Note
title
Note!
If you configure the outgoing route from the Kafka Collection agent to be Asynchronous (Default), you must use different APL logic from the example below so that you only save the highest offset values for each partition.
Example workflow with KafkaOffsetUDR
Info
title
Example - APL code in a workflow using KafkaOffsetUDR
An example of how the APL code may be in the Analysis Agent:
Code Block
language
text
theme
Eclipse
initialize {
//get offsets from DB and send to collector
topic = (string)mimGet("Kafka_Collector_1", "Topic");
table res = sqlPrepSelect("select offsets from kafka_offset where id = ?", topic, "mySQL.mySQLdb");
offsetsFromDB = (string)tableGet(res, 0, "offsets");
out = udrCreate(kafka.KafkaOffsetUDR);
out.offsetsAsString = offsetsFromDB;
udrRoute(out);
}
consume {
...
//keep track of current offsets
mapSet(out.offsets, input.partition, input.offset +1);
}
deinitialize {
//update db with new offsets
sqlPrepUpdate("update kafka_offset set offsets = ? where id = ?", out.offsetsAsString, topic,"mySQL.mySQLdb");
}