KafkaOffsetUDR
The KafkaOffsetUDR
is used to determine an offset which is not at the beginning or the end of collection, and only applies if you have selected Start at requested in the Kafka Collection agent configuration dialog - see Kafka Real-time Collection Agent Configuration. The Kafka collection agent waits for the KafkaOffsetUDR
and does not consume before the offset information. Messages can be persisted to a database or aggregation agent.
Note!
If you send a KafkaOffsetUDR
from initialize
without any content, messages are read from the beginning (from the first offset).
The following fields are included in the KafkaOffsetUDR
:
Field | Description |
---|---|
offsets (map<int,long>) | This field is populated with offset information. As offsets in Kafka are unique per partition, this maps partition identifiers (int) to an offset (long). |
offsetsAsString (string) | This field contains a comma separated key-value list. It is available as a convenience for populating with offset information, as it provides a simple way to store the map with partitions and offset as a string. It reads and writes the same underlying data as the Example An example of two partitions as string type: |
Example of KafkaOffsetUDR
In this example the Kafka collection agent has been configured to start at requested offset. To ensure that the offsets are updated in ascending order, it is recommended that you configure the outgoing route from the Kafka collection agent to be Synchronous.
Note!
If you configure the outgoing route from the Kafka Collection agent to be Asynchronous (Default), you must use different APL logic from the example below so that you only save the highest offset values for each partition.
Example workflow with KafkaOffsetUDR
Example - APL code in a workflow using KafkaOffsetUDR
An example of how the APL code may be in the Analysis Agent:
kafka.KafkaOffsetUDR out; string topic; initialize { //get offsets from DB and send to collector topic = (string)mimGet("Kafka_1", "Topic"); table res = sqlPrepSelect("select offsets from kafka_offset where id = ?", topic, "Default.Postg"); string offsetsFromDB = (string)tableGet(res, 0, "offsets"); out = udrCreate(kafka.KafkaOffsetUDR); out.offsetsAsString = offsetsFromDB; udrRoute(out); } consume { //keep track of current offsets mapSet(out.offsets, input.partition, input.offset +1); } deinitialize { //update db with new offsets sqlPrepUpdate("update kafka_offset set offsets = ? where id = ?", out.offsetsAsString, topic,"Default.Postg"); }