Amazon SQS Forwarding Agent

Private_Edition_flag.png

The SQS Forwarding Agent forwards data to Amazon Simple Queue Service. The agent operates in cycles of three steps:

  1. Consumes UDRs of type sqsForwarderCycleUDR, meaning that the agent needs a preceding Analysis agent that creates and populates these UDRs.

  2. Sends the UDR contents to Amazon SQS.

  3. Awaits replies from Amazon SQS, then forwards the response to an Analysis agent. The response is of type sqsForwarderCycleUDR. If the field error is populated, then the insert failed.

In the example workflow below, data is collected from files, mapped to sqsForwarderCycleUDR, and sent to Amazon SQS. UDRs that are not accepted by Amazon SQS will be forwarded to an Inter Workflow agent.

sqsForwardingWF.png
Workflow: Collecting from files and forwarding to Amazon SQS.

The following procedure shows the key configurations for creating the above workflow

Workflow design

Create the workflow with the following agents:

Agent

Configuration

Disk

Collects files with data that shall be forwarded to SQS.

Analysis 1

Creates UDRs of type sqsForwarderCycleUDR and populates them using the collected data.

Aws SQS Forwarder

Sends data to SQS.

Analysis 2

Validates and handles potential errors from failed attempts to forward data.

Inter Workflow

Sends UDRs that could not be sent to SQS to Inter Workflow storage.

Analysis_1

Configure the Analysis _1 Agent to create a sqsForwardCycleUDRs and map its contents from the collected UDRs. See the below example.

import ultra.amazon_sqs.ufdl_ultra; consume { if (instanceOf(input,asciiSEQ_TI)){ asciiSEQ_TI ip = (asciiSEQ_TI) input; Sqs.SqsForwarderCycleUDR out = udrCreate(Sqs.SqsForwarderCycleUDR); out.msgBody = ip.message ; out.queueName = ip.queuename; //Group id and deduplication id must be updated when sending to a fifo queue. if (strEndsWith(ip.queuename,"fifo")){ out.msgGroupId = "Group_A"; out.deduplicationId = (string)ip.seqNum; } //custom attributes map<string,Sqs.SqsMessageAttributeUDR> attrMap = mapCreate( string, Sqs.SqsMessageAttributeUDR); Sqs.SqsMessageAttributeUDR dataValue = udrCreate(Sqs.SqsMessageAttributeUDR) ; dataValue.value = "example"; dataValue.dataType = "String"; mapSet( attrMap, "one", dataValue ); Sqs.SqsMessageAttributeUDR dataValue2 = udrCreate(Sqs.SqsMessageAttributeUDR) ; dataValue2.value = "215"; dataValue2.dataType = "Number"; mapSet( attrMap, "two",dataValue2 ); bytearray myBA = baCreate( 1 ); baSet( myBA, 0, 72 ); Sqs.SqsMessageAttributeUDR dataValue3 = udrCreate(Sqs.SqsMessageAttributeUDR) ; dataValue3.value = myBA; dataValue3.dataType = "Binary"; mapSet( attrMap, "three",dataValue3 ); out.msgAttribute = attrMap; debug (out); udrRoute(out); } }

Analysis_2

Configure the Analysis_2 Agent to handle potential errors that have occurred when forwarding sqsForwardCycleUDRs to Amazon SQS. All records will be routed as replies from the SQS forwarding agent. Erroneous records will populate the “error” field of sqsForwardCycleUDR. For successfully forwarded UDRs, the “error” field will be null.

 

consume { if(input.error != null){ logWarning("UDR could not be sent to Amazon SQS."); udrRoute(input); debug(input); } }