Kafka Real-Time Forwarding Agent
The Kafka real-time forwarding agent sends messages to Kafka but has no configuration settings of its own. For this reason, it must be placed after an Analysis or Aggregation agent (i.e., an agent that allows you to write APL), which populates KafkaRecord UDRs with the messages.
The system does not automatically populate Kafka-specific fields such as the topic, message key, or value—you must set these manually using APL. The only default behavior is that the partition is set to 0 if not explicitly specified.
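As a rough sketch of setting these fields explicitly, the APL below fills in the topic, value, and partition before routing the record. The field name `partition` and its integer type are assumptions inferred from the description above, the topic name and partition number are placeholders, and `input.Data` assumes the incoming UDR carries its payload in a `Data` field. Check the KafkaRecord UDR definition in your installation for the exact names and types.

```
consume {
    kafka.KafkaRecord rec = udrCreate(kafka.KafkaRecord);
    rec.topic = "example";     // placeholder topic name
    rec.value = input.Data;    // payload taken from the incoming UDR (assumed field)
    rec.partition = 1;         // assumed field name; leave unset to get the default partition 0
    udrRoute(rec);
}
```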
You must also handle the response from the Forwarding agent. It always returns a KafkaRecord, even if the message was successfully inserted into Kafka. You can check whether the insert failed by inspecting the errorMessage field: if it is not null, an error occurred. You can then add any appropriate error-handling logic. In the example below, the data is only validated, and no actions are taken beyond displaying the error for debugging purposes.
Workflow Example
Workflow Design
| Agent | Configuration |
|---|---|
| Pulse_1 | This is our “data simulator” agent used for demos and tests. It produces data at regular intervals. |
| Analysis_1 | Populates the KafkaRecord. This is where you enter data to send, including topic, headers, etc. Nothing is populated automatically. |
| Kafka_forwarding | Sends data to Kafka. |
| Analysis_2 | Validates reply from Kafka to see if the insert was successful or not. |
Analysis Agent
In the first Analysis agent (Analysis_1), the following code is used to generate KafkaRecord UDRs, map the content of the input UDR, and specify the Kafka topic where the data should be sent.
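consume {
    // Create an empty KafkaRecord; no fields are populated automatically
    kafka.KafkaRecord consRec = udrCreate(kafka.KafkaRecord);
    // Map the payload of the input UDR to the Kafka message value
    consRec.value = input.Data;
    // Set the Kafka topic the message is sent to
    consRec.topic = "example";
    debug("Forwarding record to Kafka...");
    udrRoute(consRec);
}

In the second Analysis agent (Analysis_2), the following code checks whether the insert was successful.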
consume {
    // A non-null errorMessage means the insert into Kafka failed
    if (input.errorMessage != null) {
        debug("Error...");
    } else {
        debug("Success");
    }
}
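One possible extension of the check above is to include the returned error text and the topic in the debug output, which makes failures easier to trace. This is a minimal sketch that assumes `errorMessage` is a string and that the returned KafkaRecord still carries the `topic` it was sent with. Neither assumption is confirmed here.

```
consume {
    if (input.errorMessage != null) {
        // Insert failed: show what was reported and which topic the record was sent to
        debug("Kafka insert failed for topic " + input.topic + ": " + input.errorMessage);
    } else {
        debug("Kafka insert succeeded for topic " + input.topic);
    }
}
```

In a production workflow, the failure branch is where you would add retry or error-routing logic instead of only writing to the debug output.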
Kafka Profile
The Kafka profile (https://infozone.atlassian.net/wiki/x/lwDzEQ) defines the broker to which the messages are forwarded. You must create the Kafka profile before you can select it in the agent.