KafkaOffsetUDR

The KafkaOffsetUDR is used to specify a starting offset that is neither the beginning nor the end of the topic, and it only applies if you have selected Start at requested in the Kafka collection agent configuration dialog - see Kafka Real-time Collection Agent Configuration. The Kafka collection agent waits for a KafkaOffsetUDR and does not consume any messages before the offsets it contains. The offset information can be persisted, for example, in a database or via an aggregation agent.


Note!

If you send a KafkaOffsetUDR from the initialize block without any content, messages are read from the beginning (from the first offset).
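
A minimal sketch of such an empty KafkaOffsetUDR routed from initialize (the kafka UDR package name is taken from the example further down):

kafka.KafkaOffsetUDR out;

initialize {
    // An empty KafkaOffsetUDR: no offsets are set, so collection
    // starts from the first offset of each partition.
    out = udrCreate(kafka.KafkaOffsetUDR);
    udrRoute(out);
}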

The following fields are included in the KafkaOffsetUDR:

offsets (map<int,long>)

This field is populated with offset information. As offsets in Kafka are unique per partition, this field maps partition identifiers (int) to offsets (long).

offsetsAsString (string)

This field contains a comma-separated key-value list. It is available as a convenience for populating the UDR with offset information, as it provides a simple way to store the map of partitions and offsets as a string. It reads and writes the same underlying data as the offsets (map<int,long>) field, so changing the value of offsetsAsString (string) changes the value of offsets (map<int,long>) and vice versa.

Example

An example of offsets for two partitions, represented as a string (partition 1 at offset 2, partition 2 at offset 3):
1=2,2=3
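
Because the two fields mirror each other, setting offsetsAsString and then reading offsets yields the same data. A minimal APL sketch illustrating this (assuming the kafka UDR package and the mapGet function used in the example below):

kafka.KafkaOffsetUDR u = udrCreate(kafka.KafkaOffsetUDR);
u.offsetsAsString = "1=2,2=3";
// The offsets map now contains the same data:
// partition 1 -> offset 2, partition 2 -> offset 3
long offsetForPartition1 = (long)mapGet(u.offsets, 1);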


Example of KafkaOffsetUDR 

In this example the Kafka collection agent has been configured to start at a requested offset. To ensure that the offsets are updated in ascending order, it is recommended that you configure the outgoing route from the Kafka collection agent to be Synchronous.

Note!

If you configure the outgoing route from the Kafka Collection agent to be Asynchronous (Default), you must use different APL logic from the example below so that you only save the highest offset values for each partition.
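
A sketch of such logic, keeping only the highest offset seen per partition (this is an illustration, not the only possible approach; mapContainsKey is assumed to be available, and input is assumed to carry partition and offset fields as in the example below):

consume {
    // With an asynchronous route, UDRs may arrive out of order,
    // so only store an offset if it is higher than the one
    // already recorded for the partition.
    if (!mapContainsKey(out.offsets, input.partition) ||
        (long)mapGet(out.offsets, input.partition) < input.offset + 1) {
        mapSet(out.offsets, input.partition, input.offset + 1);
    }
}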


Example workflow with KafkaOffsetUDR


Example - APL code in a workflow using KafkaOffsetUDR

An example of how the APL code in the Analysis agent might look:

kafka.KafkaOffsetUDR out;
string topic;

initialize {
    // Get the stored offsets from the database and send them to the collector.
    topic = (string)mimGet("Kafka_1", "Topic");
    table res = sqlPrepSelect("select offsets from kafka_offset where id = ?", topic, "Default.Postg");
    string offsetsFromDB = (string)tableGet(res, 0, "offsets");
    out = udrCreate(kafka.KafkaOffsetUDR);
    out.offsetsAsString = offsetsFromDB;
    udrRoute(out);
}

consume {
    // Keep track of the current offsets. The next message to consume is
    // one past the last consumed offset, hence the + 1.
    mapSet(out.offsets, input.partition, input.offset + 1);
}

deinitialize {
    // Update the database with the new offsets.
    sqlPrepUpdate("update kafka_offset set offsets = ? where id = ?", out.offsetsAsString, topic, "Default.Postg");
}