...
A simple Ultra Format needs to be created for the incoming UDRs. For more information about the Ultra Format Editor and the UFDL syntax, refer to the Ultra Format Management User's Guide.
Info |
---|
|
Create an Ultra Format as defined below: Code Block |
---|
| external BOOK_HEADER : identified_by(strREContains(HEADER, "title,name,organization,copyrightYear")), terminated_by(0xA)
{
ascii HEADER : terminated_by(0xA);
};
external BookRecord
{
ascii title : terminated_by(",");
ascii authorName : terminated_by(",");
ascii organization : terminated_by(",");
ascii copyrightYearString : terminated_by(",");
ascii numberOfPages : terminated_by(0xA);
};
internal BookRecord
{
string title;
string authorName;
string organization;
string copyrightYearString;
int numberOfPages;
// enriched
date copyrightYear;
};
// decoder
in_map BOOK_HEADER_InMap : external(BOOK_HEADER), target_internal(BOOK_HEADER), discard_output { automatic; };
in_map BookRecord_InMap : external(BookRecord), internal(BookRecord) { automatic; };
decoder BOOK_HEADER_Decode : in_map(BOOK_HEADER_InMap);
decoder BookRecord_Decode : in_map(BookRecord_InMap);
decoder DECODER { decoder BOOK_HEADER_Decode; decoder BookRecord_Decode *; };
// encoder
out_map BookRecord_OutMap : external(BookRecord), internal(BookRecord) { automatic; };
encoder ENCODER : out_map(BookRecord_OutMap); |
|
...
The Parquet Profile is used to define the Schema as well as define advanced properties for encoding. See .2.1 Parquet Profile Configuration for information on how to open the Parquet Profile editor.
Profile - Schema Tab
Image RemovedImage Added
Profile Configuration Example - Schema Tab
...
Info |
---|
title | Example - Parquet Schema |
---|
|
The structured text block shows an example Parquet schema for a book asset. Copy and paste this text to your schema. Code Block |
---|
| message book {
required binary title (UTF8);
required group author {
optional binary name (UTF8);
optional binary organization (UTF8);
}
optional int32 copyrightYear (DATE);
optional int64 numberOfPages;
} |
|
Profile - Advanced Tab
Image RemovedImage Added
Profile Configuration Example - Advanced Tab
...
In this workflow, CSV records in a disk are retrieved that are then encoded into a Parquet document. The workflow is illustrated here:
Image RemovedImage Added
Example workflow with Parquet Encoder
Walking through the example workflow from left to right, we have:
- A Disk Agent named Disk_Source Agente that reads in the source file as a byte array.
- A Decoder Agent named CSV_Decoder that parses the bytes from the file and decodes the CSV records, passing BookRecord UDRs to the Analysis agent.
- An Analysis agent named Analysis that transforms these incoming BookRecord UDRs into ParquetEncoderUDRs in accordance with the schema specified in the Parquet Profile.
- The Parquet Encoder agent that receives the ParquetEncoderUDRs, encodes the data with Parquet, and forwards the data as a bytearraybyte-array.
- The Disk _Destination Collection forwarding agent receives the bytearray byte-array data and writes out a Parquet document.
...
Double-click on the Disk_Source agent to display the configuration dialog for the agent:
Image RemovedImage Added
Example of a Disk agent configuration
...
The Decoder agent receives the input data from the Disk agent, translates it into UDRs and forwards them to the Analysis agent. Double-click on the Decoder agent to display the configuration dialog.
Image RemovedImage Added
Example of an Decoder agent configuration
...
Double-click on the Analysis agent to display the configuration dialog.
Image RemovedImage Added
The Analysis agent dialogue with the APL code defined.
...
Info |
---|
title | Example - Parquet APL |
---|
|
The APL code below shows an example of constructing a ParquetEncoderUDR: Code Block |
---|
| import ultra.Sandbox_Parquet_Autotest.Autotest_Ultra;
import ultra.Parquet;
consume
{
switch (input)
{
case (BookRecordAutotestRecord record)
{
// normalize
strToDate(record.copyrightYearoptionalTimestampMillisField, record.copyrightYearStringoptionalTimestampMillisFieldString, "yyyyMMddHHmmss", "yyyyEurope/Stockholm");
//
// payload - primitives
//
map<string,any> payload = mapCreate(string,any);
mapSet(payload, "titlerequiredInt32Field", record.titlerequiredInt32Field);
mapSet(payload, "copyrightYearrequiredInt64Field", record.copyrightYearrequiredInt64Field);
mapSet(payload, "numberOfPagesrequiredDoubleField", record.numberOfPagesrequiredDoubleField);
// payload - author structuremapSet(payload, "requiredStringField", record.requiredStringField);
map<string,any> author = mapCreate(string,anymapSet(payload, "optionalStringField", record.optionalStringField);
mapSet(authorpayload, "namemissingOptionalStringField", record.authorNamemissingOptionalStringField);
mapSet(authorpayload, "organizationoptionalTimestampMillisField", record.organizationoptionalTimestampMillisField);
mapSet(payload, "author", author); //
// encode and route
//
ParquetEncoderUDR encoderUDR = udrCreate(ParquetEncoderUDR);
encoderUDR.payload = payload;
//
// route
//
udrRoute(encoderUDR);
}
}
} |
|
...
Double-click on the Parquet Encoder agent to display the configuration dialog.
Image RemovedImage Added
The Parquet Encoder agent with the Parquet Profile defined.
...
Double-click on the Disk_Destination agent to display the configuration dialog for the agent:
Image RemovedImage Added
Example of a Disk agent configuration
...
When you run the Workflow, it processes the CSV file from the input directory and writes out a corresponding Parquet file in the configured output directory.