Amazon SQS Collection Agent
The SQS Collection Agent consumes messages from Amazon Simple Queue Service. After collection, the agent can carry out the following operations in the queue:
Delete the message.
Set a new visibility timer (resets the period after which the message becomes available again in the queue.)
To carry out any of these operations you need an Analysis agent that configures the reply and sends it back to the collector. If you want to keep the original behavior of the queue, do not send any replies back.
In the workflow example below, messages are collected from SQS and forwarded to Disk. A reply is sent back to either delete the message once it’s been successfully processed or, reset the visibility timer for selected messages.
The following procedure shows the key configurations for creating the above workflow.
Workflow design
Create the workflow with the following agents:
Agent | Configuration |
---|---|
Aws SQS | Collects messages from Amazon SQS Service. |
Analysis | Receives UDRs of type sqsCollectorCycleUDR and validates them. A reply is sent back to the collector for a select set of messages. All messages are forwarded to the next node in the workflow. |
Encoder | Encodes the data to the format of the forwarded files |
Disk | Creates files. The size of the files is set in the collector. |
Aws SQS Collector configuration
Update the SQS Collector configuration as required.
Field | Description |
---|---|
Max messages to receive | The maximum number of messages that can be received from the AWS SQS queue. The range can be set from 1-10. |
Poll time (seconds) | The interval of time before the agent checks for new messages. The range can be set from 0-20. |
Standard attributes | The standard attributes that you want to include in the message collection. The default value is 'All'. |
Here is an example SQS Collection Agent configuration:
Analysis
Configure the Analysis agent to send all collected messages to the Encoder and check if a message belongs to a specific group id. The validity parameter will be reset in the queue for these messages.
consume {
// Reset the visibility for messages in message group id "Group_B"
if(mapGet(input.standardAttributes,"MessageGroupId")=="Group_B"){
input.visibility = 1;
udrRoute(input, "r_2");
}
// Forward all the messages to the Encoder
udrRoute(input, "r_3");
}
If a reply is sent back to the collector, the standard behavior of the queue is reset:
If the validity field of the UDR has been updated, the validity parameter is changed to this value in the queue.
If the UDR has not been changed, the message is deleted in the queue (without waiting for the preset validity period).