Parquet Encoder Example(3.0)
This example illustrates typical use of the Parquet Encoder agent in a batch workflow. The following configurations will be created:
- An Ultra Format
- A Parquet Profile
- A Batch Workflow that makes use of the Parquet Encoder agent to create Parquet documents
Define an Ultra Format
Create a simple Ultra Format for the incoming UDRs. For more information about the Ultra Format Editor and the UFDL syntax, refer to Ultra Format(3.0).
Example - Ultra
Create an Ultra Format as defined below:
external BOOK_HEADER : identified_by(strREContains(HEADER, "title,name,organization,copyrightYear")), terminated_by(0xA) {
    ascii HEADER : terminated_by(0xA);
};

external BookRecord {
    ascii title : terminated_by(",");
    ascii authorName : terminated_by(",");
    ascii organization : terminated_by(",");
    ascii copyrightYearString : terminated_by(",");
    ascii numberOfPages : terminated_by(0xA);
};

internal BookRecord {
    string title;
    string authorName;
    string organization;
    string copyrightYearString;
    int numberOfPages;

    // enriched
    date copyrightYear;
};

// decoder
in_map BOOK_HEADER_InMap : external(BOOK_HEADER), target_internal(BOOK_HEADER), discard_output {
    automatic;
};

in_map BookRecord_InMap : external(BookRecord), internal(BookRecord) {
    automatic;
};

decoder BOOK_HEADER_Decode : in_map(BOOK_HEADER_InMap);
decoder BookRecord_Decode : in_map(BookRecord_InMap);

decoder DECODER {
    decoder BOOK_HEADER_Decode;
    decoder BookRecord_Decode *;
};

// encoder
out_map BookRecord_OutMap : external(BookRecord), internal(BookRecord) {
    automatic;
};

encoder ENCODER : out_map(BookRecord_OutMap);
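To make the decoding behaviour of this format concrete, the following Python sketch mimics it outside the product: a line containing the header text is matched and discarded, and every other newline-terminated line is split into the five BookRecord fields. It is an illustration only, not how the Decoder agent is implemented. The function name `decode_books` and the sample rows are hypothetical, and converting `numberOfPages` to an integer is an assumption about what the automatic mapping does.

```python
HEADER_MARKER = "title,name,organization,copyrightYear"
FIELDS = ["title", "authorName", "organization",
          "copyrightYearString", "numberOfPages"]


def decode_books(data: bytes) -> list[dict]:
    """Split newline-terminated CSV bytes into BookRecord-like dicts."""
    records = []
    for line in data.decode("ascii").split("\n"):
        if not line:
            continue  # nothing follows the final 0xA terminator
        if HEADER_MARKER in line:
            continue  # header line: identified, then discarded
        values = line.split(",")
        if len(values) != len(FIELDS):
            raise ValueError(f"expected {len(FIELDS)} fields: {line!r}")
        record = dict(zip(FIELDS, values))
        # Assumption: the page count ends up as an integer
        record["numberOfPages"] = int(record["numberOfPages"])
        records.append(record)
    return records


# Hypothetical input file content
sample = (
    b"title,name,organization,copyrightYear,numberOfPages\n"
    b"Example Book,A. Author,Example Org,2020,321\n"
)
books = decode_books(sample)
```

Note that the sketch, like the format above, uses a plain comma as the field terminator, so field values that themselves contain commas are not supported.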
Define a Parquet Profile
The Parquet Profile defines the schema and the advanced properties used for encoding. See Parquet Profile Configuration for information on how to open the Parquet Profile editor.
Profile - Schema Tab
Profile Configuration Example - Schema Tab
The Schema Tab contains a single dialog box to specify the Parquet Schema.
Example - Parquet Schema
The text block below shows an example Parquet schema for a book asset. Copy and paste this text into your schema.
message book {
    required binary title (UTF8);
    required group author {
        optional binary name (UTF8);
        optional binary organization (UTF8);
    }
    optional int32 copyrightYear (DATE);
    optional int64 numberOfPages;
}
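As a sketch of what this schema expects, the Python snippet below builds one row as a nested dictionary. The `author` group becomes a nested map, and `copyrightYear` carries the Parquet DATE annotation, which stores an int32 count of days since 1970-01-01. The helper name `year_to_parquet_date` and the sample values are hypothetical. In the workflow itself the Analysis agent supplies a date value, and the conversion to days is assumed to be handled by the encoder.

```python
from datetime import date

EPOCH = date(1970, 1, 1)


def year_to_parquet_date(year: int) -> int:
    """Days between 1970-01-01 and 1 January of the given year."""
    return (date(year, 1, 1) - EPOCH).days


# One row shaped like the "book" message (sample values are made up)
row = {
    "title": "Example Book",                      # required binary (UTF8)
    "author": {                                   # required group
        "name": "A. Author",                      # optional binary (UTF8)
        "organization": None,                     # optional: may be absent
    },
    "copyrightYear": year_to_parquet_date(2020),  # optional int32 (DATE)
    "numberOfPages": 321,                         # optional int64
}
```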
Profile - Advanced Tab
Profile Configuration Example - Advanced Tab
The Advanced Tab contains a number of settings. In this example, the default values are retained.
Create a Batch Workflow
In this workflow, CSV records are read from a file on disk and then encoded into a Parquet document. The workflow is illustrated here:
Example workflow with Parquet Encoder
Walking through the example workflow from left to right, we have:
- A Disk collection agent that reads in the source file as a byte array.
- A Decoder agent that parses the bytes from the file and decodes the CSV records, passing BookRecord UDRs to the Analysis agent.
- An Analysis agent that transforms these incoming BookRecord UDRs into ParquetEncoderUDRs in accordance with the schema specified in the Parquet Profile.
- The Parquet Encoder agent that receives the ParquetEncoderUDRs, encodes the data with Parquet, and forwards the data as a byte array.
- A Disk forwarding agent that receives the byte-array data and writes out a Parquet document.
This section walks through the steps of creating such a batch workflow.
Disk
Disk_Source is a Disk Collection agent that collects data from an input file and forwards it to the Decoder agent.
Double-click on the Disk_Source agent to display the configuration dialog for the agent:
Example of a Disk agent configuration
Decoder
The Decoder agent receives the input data from the Disk agent, translates it into UDRs and forwards them to the Analysis agent. Double-click on the Decoder agent to display the configuration dialog.
Example of a Decoder agent configuration
In this dialog, choose the Decoder that you defined in your Ultra Format.
Analysis
The Analysis agent transforms the data from each BookRecord UDR into a ParquetEncoderUDR. In particular, the ParquetEncoderUDR includes a map whose contents mirror the Parquet schema defined in the profile.
Double-click on the Analysis agent to display the configuration dialog.
The Analysis agent dialog with the APL code defined.
In this dialog, the APL code for handling input data is written. In the example, each BookRecord UDR is transformed into a ParquetEncoderUDR. Adapt the code according to your requirements.
You can also see the UDR type used in the UDR Types field. In this example, it is BookRecord.
Example - Parquet APL
The APL code below shows an example of constructing a ParquetEncoderUDR:
import ultra.Sandbox_Parquet_Autotest.Autotest_Ultra;
import ultra.Parquet;

consume {
    switch (input) {
        case (BookRecord record) {
            // normalize
            strToDate(record.copyrightYear, record.copyrightYearString, "yyyy");

            // payload - primitives
            map<string,any> payload = mapCreate(string,any);
            mapSet(payload, "title", record.title);
            mapSet(payload, "copyrightYear", record.copyrightYear);
            mapSet(payload, "numberOfPages", record.numberOfPages);

            // payload - author structure
            map<string,any> author = mapCreate(string,any);
            mapSet(author, "name", record.authorName);
            mapSet(author, "organization", record.organization);
            mapSet(payload, "author", author);

            // encode and route
            ParquetEncoderUDR encoderUDR = udrCreate(ParquetEncoderUDR);
            encoderUDR.payload = payload;
            udrRoute(encoderUDR);
        }
    }
}
Note that the payload map in the ParquetEncoderUDR mirrors the schema configured in the profile: the primitive fields are top-level entries, and the author group is a nested map. Non-matching structures result in errors at runtime.
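To illustrate what mirroring the schema means, here is a minimal Python sketch that checks a nested payload against a hand-written description of the book schema. It is not the Parquet Encoder's own validation, whose exact behaviour is not described here. The `SCHEMA` layout and the `check_payload` function are hypothetical names introduced for this illustration.

```python
# Hand-written description of the example schema:
# field name -> (required?, expected type or nested group)
SCHEMA = {
    "title": (True, str),
    "author": (True, {
        "name": (False, str),
        "organization": (False, str),
    }),
    "copyrightYear": (False, object),  # date value; type left unchecked here
    "numberOfPages": (False, int),
}


def check_payload(payload: dict, schema: dict, path: str = "") -> list[str]:
    """Return a list of mismatches between payload and schema (empty if none)."""
    problems = []
    for key in payload:
        if key not in schema:
            problems.append(f"{path}{key}: not in schema")
    for name, (required, kind) in schema.items():
        value = payload.get(name)
        if value is None:
            if required:
                problems.append(f"{path}{name}: required field missing")
        elif isinstance(kind, dict):
            if isinstance(value, dict):
                # Groups are checked recursively as nested maps
                problems.extend(check_payload(value, kind, f"{path}{name}."))
            else:
                problems.append(f"{path}{name}: expected a nested map")
        elif not isinstance(value, kind):
            problems.append(f"{path}{name}: expected {kind.__name__}")
    return problems


good = {"title": "Example Book", "author": {"name": "A. Author"}, "numberOfPages": 321}
bad = {"author": "A. Author", "pages": 321}  # flat author, unknown key, no title
```

Calling `check_payload(good, SCHEMA)` returns an empty list, while `check_payload(bad, SCHEMA)` reports the unknown key, the missing required title, and the author value that is not a nested map.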
Parquet Encoder
The Parquet Encoder agent creates a Parquet document based on the ParquetEncoderUDRs it receives from upstream agents.
Double-click on the Parquet Encoder agent to display the configuration dialog.
The Parquet Encoder agent with the Parquet Profile defined.
In this dialog, choose the Parquet Profile that you defined earlier.
Disk Forwarder
Disk_Destination is a Disk Forwarding agent that writes bytes to an output file on disk.
Double-click on the Disk_Destination agent to display the configuration dialog for the agent:
Example of a Disk agent configuration
Running the Workflow
When you run the Workflow, it processes the CSV file from the input directory and writes out a corresponding Parquet file in the configured output directory.
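A quick sanity check on the output is possible without any Parquet library, because a Parquet file begins and ends with the 4-byte magic number `PAR1`. The Python sketch below checks only that. It does not validate the schema or the data, and the function name `looks_like_parquet` is a placeholder, as is any path you pass to it.

```python
import os

MAGIC = b"PAR1"


def looks_like_parquet(path: str) -> bool:
    """True if the file starts and ends with the Parquet magic bytes."""
    # A valid file holds at least the leading magic, a 4-byte footer
    # length, and the trailing magic.
    if os.path.getsize(path) < 12:
        return False
    with open(path, "rb") as f:
        head = f.read(4)
        f.seek(-4, os.SEEK_END)
        tail = f.read(4)
    return head == MAGIC and tail == MAGIC
```

Run it against a file in the configured output directory after the workflow has finished. A `False` result suggests the file is truncated or is not Parquet at all.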