Amazon SQS Forwarding Agent
The SQS Forwarding Agent forwards data to Amazon Simple Queue Service. The agent operates in cycles of three steps:
Consumes UDRs of type
sqsForwarderCycleUDR
, meaning that the agent needs a preceding Analysis agent that creates and populates these UDRs.Sends the UDR contents to Amazon SQS.
Awaits replies from Amazon SQS, then forwards the response to an Analysis agent. The response is of type
sqsForwarderCycleUDR
. If the field error is populated, then the insert failed.
In the example workflow below, data is collected from files, mapped to sqsForwarderCycleUDR
, and sent to Amazon SQS. UDRs that are not accepted by Amazon SQS will be forwarded to an Inter Workflow agent.
The following procedure shows the key configurations for creating the above workflow
Workflow design
Create the workflow with the following agents:
Agent | Configuration |
Disk | Collects files with data that shall be forwarded to SQS. |
Analysis 1 | Creates UDRs of type |
Aws SQS Forwarder | Sends data to SQS. |
Analysis 2 | Validates and handles potential errors from failed attempts to forward data. |
Inter Workflow | Sends UDRs that could not be sent to SQS to Inter Workflow storage. |
Analysis_1
Configure the Analysis _1 Agent to create a sqsForwardCycleUDRs
and map its contents from the collected UDRs. See the below example.
import ultra.amazon_sqs.ufdl_ultra;
consume {
if (instanceOf(input,asciiSEQ_TI)){
asciiSEQ_TI ip = (asciiSEQ_TI) input;
Sqs.SqsForwarderCycleUDR out = udrCreate(Sqs.SqsForwarderCycleUDR);
out.msgBody = ip.message ;
out.queueName = ip.queuename;
//Group id and deduplication id must be updated when sending to a fifo queue.
if (strEndsWith(ip.queuename,"fifo")){
out.msgGroupId = "Group_A";
out.deduplicationId = (string)ip.seqNum;
}
//custom attributes
map<string,Sqs.SqsMessageAttributeUDR> attrMap = mapCreate( string, Sqs.SqsMessageAttributeUDR);
Sqs.SqsMessageAttributeUDR dataValue = udrCreate(Sqs.SqsMessageAttributeUDR) ;
dataValue.value = "example";
dataValue.dataType = "String";
mapSet( attrMap, "one", dataValue );
Sqs.SqsMessageAttributeUDR dataValue2 = udrCreate(Sqs.SqsMessageAttributeUDR) ;
dataValue2.value = "215";
dataValue2.dataType = "Number";
mapSet( attrMap, "two",dataValue2 );
bytearray myBA = baCreate( 1 );
baSet( myBA, 0, 72 );
Sqs.SqsMessageAttributeUDR dataValue3 = udrCreate(Sqs.SqsMessageAttributeUDR) ;
dataValue3.value = myBA;
dataValue3.dataType = "Binary";
mapSet( attrMap, "three",dataValue3 );
out.msgAttribute = attrMap;
debug (out);
udrRoute(out);
}
}
Analysis_2
Configure the Analysis_2 Agent to handle potential errors that have occurred when forwarding sqsForwardCycleUDRs
to Amazon SQS. All records will be routed as replies from the SQS forwarding agent. Erroneous records will populate the “error” field of sqsForwardCycleUDR
. For successfully forwarded UDRs, the “error” field will be null.
consume {
if(input.error != null){
logWarning("UDR could not be sent to Amazon SQS.");
udrRoute(input);
debug(input);
}
}