...


Note!

If you send a KafkaOffsetUDR from initialize without any content, messages are read from the beginning (from the first offset).
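
The case described in this note can be sketched as follows. This is a minimal illustration that reuses only the functions shown in the example workflow on this page (udrCreate and udrRoute); the variable name out is arbitrary, and the surrounding workflow configuration is assumed to match that example.

```
initialize {
    // Create a KafkaOffsetUDR and leave both offsets and offsetsAsString unset
    kafka.KafkaOffsetUDR out = udrCreate(kafka.KafkaOffsetUDR);
    // Routing the empty UDR to the collector requests reading from the first offset
    udrRoute(out);
}
```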

The following fields are included in the KafkaOffsetUDR:

Field: offsets (map<int,long>)

Description: This field is populated with offset information. As offsets in Kafka are unique per partition, this field maps each partition identifier (int) to an offset (long).

Field: offsetsAsString (string)

Description: This field contains a comma-separated list of key-value pairs. It is provided as a convenience, since it offers a simple way to store the map of partitions and offsets as a string. It reads and writes the same underlying data as the offsets (map<int,long>) field, so changing the value of offsetsAsString (string) changes the value of offsets (map<int,long>), and vice versa.

Example

An example with two partitions expressed as a string, where partition 1 has offset 2 and partition 2 has offset 3:
1=2,2=3
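
The relationship between the two fields might be checked with a short sketch such as the one below. It assumes that the standard APL functions mapGet and debug are available in the Analysis agent; the variable name u is hypothetical, and the values in the comments follow from the field descriptions above rather than from a recorded run.

```
initialize {
    kafka.KafkaOffsetUDR u = udrCreate(kafka.KafkaOffsetUDR);
    // Same string as in the example: partition 1 -> offset 2, partition 2 -> offset 3
    u.offsetsAsString = "1=2,2=3";
    // Both fields share the same underlying data, so the map should hold the same entries
    debug(mapGet(u.offsets, 1)); // expected to show the offset stored for partition 1
    debug(mapGet(u.offsets, 2)); // expected to show the offset stored for partition 2
}
```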


Example of KafkaOffsetUDR 

In this example, the Kafka collection agent has been configured to start at the requested offset. To ensure that the offsets are updated in ascending order, it is recommended that you configure the outgoing route from the Kafka collection agent to be Synchronous.

Note!

If you configure the outgoing route from the Kafka collection agent to be Asynchronous (the default), you must use different APL logic from the example below, so that only the highest offset value for each partition is saved.
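
One possible way to keep only the highest offset for each partition is sketched below. It is an illustration rather than the documented approach: it assumes that out is a globally declared KafkaOffsetUDR created in initialize, as in the example workflow on this page, and that the standard APL map functions mapContains and mapGet are available. The local variable next is hypothetical.

```
// Assumed to be created in initialize, as in the example workflow
kafka.KafkaOffsetUDR out;

consume {
    // Offset following the message that was just received
    long next = input.offset + 1;
    if (mapContains(out.offsets, input.partition)) {
        // Overwrite the stored value only if this message is further ahead
        if ((long)mapGet(out.offsets, input.partition) < next) {
            mapSet(out.offsets, input.partition, next);
        }
    } else {
        // First message seen for this partition
        mapSet(out.offsets, input.partition, next);
    }
}
```

With this check in place, a message that arrives out of order for a partition does not move the stored offset backwards.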


Example workflow with KafkaOffsetUDR

Example - APL code in a workflow using KafkaOffsetUDR

An example of how the APL code in the Analysis agent might look:

// Declared globally so that all blocks can use them
string topic;
kafka.KafkaOffsetUDR out;

initialize {
    // Get the stored offsets from the database and send them to the collector
    topic = (string)mimGet("Kafka_Collector_1", "Topic");
    table res = sqlPrepSelect("select offsets from kafka_offset where id = ?", topic, "mySQL.mySQLdb");
    string offsetsFromDB = (string)tableGet(res, 0, "offsets");
    out = udrCreate(kafka.KafkaOffsetUDR);
    out.offsetsAsString = offsetsFromDB;
    udrRoute(out);
}

consume {
    ...
    // Keep track of the current offsets: store the offset following the one just received
    mapSet(out.offsets, input.partition, input.offset + 1);
}

deinitialize {
    // Update the database with the new offsets
    sqlPrepUpdate("update kafka_offset set offsets = ? where id = ?", out.offsetsAsString, topic, "mySQL.mySQLdb");
}




...