Parquet Decoder Example(4.3)

This example illustrates typical use of the Parquet Decoder agent in a batch workflow. In this example, complete records are processed using the embedded document schema. The following configurations will be created:

  • Ultra Format
  • Batch Workflow that makes use of a Parquet Decoder agent that parses Parquet documents.

Define an Ultra Format

A simple Ultra Format needs to be created for the incoming UDRs. For more information about the Ultra Format Editor and the UFDL syntax, refer to Ultra Format(3.0).

Example - Ultra

Create an Ultra Format as defined below:

external BOOK_HEADER : identified_by(strREContains(HEADER, "title,name,organization,copyrightYear")), terminated_by(0xA)
{
  ascii HEADER : terminated_by(0xA);
};

external BookRecord
{
  ascii title                 : terminated_by(",");
  ascii authorName            : terminated_by(",");
  ascii organization          : terminated_by(",");
  ascii copyrightYearString   : terminated_by(",");
  ascii numberOfPages         : terminated_by(0xA);
};

internal BookRecord
{
  string title;
  string authorName;
  string organization;
  string copyrightYearString;
  int numberOfPages;

  //  enriched
  date copyrightYear;
};

//  decoder
in_map BOOK_HEADER_InMap : external(BOOK_HEADER), target_internal(BOOK_HEADER), discard_output { automatic; };
in_map BookRecord_InMap : external(BookRecord), internal(BookRecord) { automatic; };
decoder BOOK_HEADER_Decode : in_map(BOOK_HEADER_InMap);
decoder BookRecord_Decode : in_map(BookRecord_InMap);
decoder DECODER { decoder BOOK_HEADER_Decode; decoder BookRecord_Decode *; };

//  encoder
out_map BookRecord_OutMap : external(BookRecord), internal(BookRecord) { automatic; };
encoder ENCODER : out_map(BookRecord_OutMap);
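
To make the external layout concrete, the following Python sketch mimics what the format above describes: a header line that contains "title,name,organization,copyrightYear", followed by newline-terminated records with five comma-separated fields. It is only an illustration of the layout, not how Ultra decodes data. The function name parse_book_csv and the sample values are hypothetical, and the sketch skips a header line wherever it appears, whereas the DECODER above expects it first.

```python
import re

# Same pattern that identifies BOOK_HEADER in the Ultra Format above.
HEADER_PATTERN = re.compile(r"title,name,organization,copyrightYear")


def parse_book_csv(text):
    """Split CSV text into BookRecord-like dicts, discarding the header line."""
    records = []
    for line in text.split("\n"):
        if not line:
            continue  # nothing follows the final newline
        if HEADER_PATTERN.search(line):
            continue  # header is recognized but produces no output
        # Five fields, in the order of the external BookRecord definition.
        title, author_name, organization, year, pages = line.split(",")
        records.append({
            "title": title,
            "authorName": author_name,
            "organization": organization,
            "copyrightYearString": year,
            "numberOfPages": int(pages),
        })
    return records


# Hypothetical sample data, for illustration only.
sample = (
    "title,name,organization,copyrightYear,numberOfPages\n"
    "Example Book,A. Author,Example Org,2020,123\n"
)
records = parse_book_csv(sample)
```

Because the format has no quoting rules, a field value that itself contains a comma would not survive this layout.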

Create a Batch Workflow

In this workflow, Parquet files are collected from disk and decoded into UDRs, which are then written to a CSV file. The workflow is illustrated here:

Example workflow with Parquet Decoder

Walking through the example workflow from left to right, we have:

  • A Disk agent that reads in the source file (which contains a Parquet document) as a byte array.
  • A Parquet Decoder agent that parses the bytes from the file as Parquet, passing ParquetDecoderUDRs to the Analysis agent.
  • An Analysis agent that transforms these incoming ParquetDecoderUDRs into BookRecord UDRs.
  • An Encoder agent that encodes the BookRecord UDRs as CSV bytes.
  • A Disk Forwarding agent that receives the byte array data and writes out a CSV document.

This section walks through the steps of creating such a batch workflow.

Disk

Disk_Source is a Disk Collection agent that collects data from an input file and forwards it to the Parquet Decoder agent.

Double-click on the Disk_Source agent to display the configuration dialog for the agent:

Example of a Disk agent configuration


Parquet Decoder

The Parquet Decoder agent collects the bytes from the Disk Collection agent into a complete Parquet document (with an embedded schema). The Parquet Decoder creates ParquetDecoderUDRs - one for each row - and forwards them on to the next agent.

Double-click on the Parquet Decoder agent to display the configuration dialog.

The Parquet Decoder agent with no Parquet Profile specified.

If no Parquet Profile is specified, the ParquetDecoderUDRs include all columns in the file. To improve performance, you can specify a Parquet Profile with a schema that selects a subset of the columns.
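
The effect of selecting a subset of columns can be pictured with the short Python sketch below. It only shows the resulting shape of the row data, under the assumption that each row is a map from column name to value. It does not show how the agent or a Parquet Profile performs the selection, and project_columns and the sample row are hypothetical.

```python
def project_columns(rows, columns):
    """Keep only the named top-level columns from each row."""
    return [
        {name: row[name] for name in columns if name in row}
        for row in rows
    ]


# Hypothetical row data, for illustration only.
rows = [
    {"title": "Example Book", "numberOfPages": 123, "copyrightYear": 2020},
]
subset = project_columns(rows, ["title", "numberOfPages"])
```

In a columnar format such as Parquet, a reader that knows which columns are wanted can avoid reading the others from the file at all, which is where the performance benefit comes from.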

Analysis

The Analysis agent transforms the data from each ParquetDecoderUDR into a BookRecord UDR as defined above in the Ultra Format. In particular, the ParquetDecoderUDR includes a payload map with contents that mirror the Parquet schema (in this example the schema embedded in the file, since no profile is specified). That data is available when constructing well-typed UDRs (for example, BookRecord).

Double-click on the Analysis agent to display the configuration dialog.

The Analysis agent dialog with the APL code defined.

In this dialog, the APL code for handling input data is written. In the example, each ParquetDecoderUDR is transformed into a BookRecord UDR. Adapt the code according to your requirements.

You can also see the UDR type used in the UDR Types field. In this example, it is ParquetDecoderUDR.

Example - Parquet APL

The APL code below shows an example of processing a ParquetDecoderUDR:

import ultra.Sandbox_Parquet_Autotest.Autotest_Ultra;

consume
{
  switch (input)
  {
    case (ParquetDecoderUDR decoderUDR)
    {
      // The payload map mirrors the Parquet schema; "author" is a nested map.
      map<string,any> payload = decoderUDR.payload;
      map<string,any> author = (map<string,any>) mapGet(payload, "author");

      // Copy the payload values into a well-typed BookRecord UDR.
      BookRecord record = udrCreate(BookRecord);
      record.title = (string) mapGet(payload, "title");
      record.authorName = (string) mapGet(author, "name");
      record.organization = (string) mapGet(author, "organization");
      record.copyrightYear = (date) mapGet(payload, "copyrightYear");
      record.numberOfPages = (int) mapGet(payload, "numberOfPages");

      // Derive the four-digit year string that is written to the CSV output.
      dateToString(record.copyrightYearString, record.copyrightYear, "yyyy");

      // Send the BookRecord UDR to the next agent.
      udrRoute(record);
    }
  }
}

The data in the payload map in the ParquetDecoderUDR conforms to the embedded schema.
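
For readers who want to trace the mapping outside the product, the same flattening step can be sketched in Python. This is a stand-in for the APL above, not an API of the Parquet Decoder agent. It assumes the payload behaves like a dictionary with a nested "author" dictionary and a date value for "copyrightYear"; to_book_record and the sample values are hypothetical.

```python
from datetime import date


def to_book_record(payload):
    """Flatten a nested payload map into a BookRecord-like dict."""
    author = payload["author"]  # nested map, as in the APL example
    copyright_year = payload["copyrightYear"]
    return {
        "title": payload["title"],
        "authorName": author["name"],
        "organization": author["organization"],
        "copyrightYear": copyright_year,
        # Counterpart of formatting the date with the "yyyy" pattern.
        "copyrightYearString": copyright_year.strftime("%Y"),
        "numberOfPages": payload["numberOfPages"],
    }


# Hypothetical payload, for illustration only.
payload = {
    "title": "Example Book",
    "author": {"name": "A. Author", "organization": "Example Org"},
    "copyrightYear": date(2020, 1, 1),
    "numberOfPages": 123,
}
record = to_book_record(payload)
```

The nested author fields end up as flat fields on the record, which is what allows the record to be written as a single CSV line later in the workflow.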

Encoder

The Encoder agent receives the BookRecord UDRs from the Analysis agent and generates byte arrays in CSV format - one byte array for each UDR. Double-click on the Encoder agent to display the configuration dialog.


Example of an Encoder agent configuration

In this dialog, choose the Encoder that you defined in your Ultra Format.
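
As a rough picture of the encoding step, the Python sketch below turns one record into one CSV byte array, using the field order of the external BookRecord definition and a newline (0xA) as the record terminator. It is an approximation of the outcome, not the Encoder agent's implementation. FIELD_ORDER, encode_record, and the sample record are hypothetical names and values.

```python
# Field order taken from the external BookRecord definition.
FIELD_ORDER = [
    "title",
    "authorName",
    "organization",
    "copyrightYearString",
    "numberOfPages",
]


def encode_record(record):
    """Encode one record as a comma-separated, newline-terminated byte array."""
    line = ",".join(str(record[name]) for name in FIELD_ORDER) + "\n"
    return line.encode("ascii")


# Hypothetical record, for illustration only.
record = {
    "title": "Example Book",
    "authorName": "A. Author",
    "organization": "Example Org",
    "copyrightYearString": "2020",
    "numberOfPages": 123,
}
encoded = encode_record(record)
```

Fields that are not part of the external definition, such as the enriched copyrightYear date, are simply not written in this sketch.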

Disk Forwarder

Disk_Destination is a Disk Forwarding agent that writes bytes to an output file on disk.

Double-click on the Disk_Destination agent to display the configuration dialog for the agent:

Example of a Disk agent configuration

Running the Workflow

When you run the workflow, it processes Parquet files from the input directory and writes corresponding CSV files to the configured output directory.