Overview

This example illustrates typical use of the Parquet Decoder agent in a batch workflow. Complete records are processed using the embedded document schema. The following configurations will be created:

  • An Ultra Format
  • A Batch Workflow that uses a Parquet Decoder agent to parse Parquet documents.

Define an Ultra Format

A simple Ultra Format needs to be created for the incoming UDRs. For more information about the Ultra Format Editor and the UFDL syntax, refer to the Ultra Format Management User's Guide.

Info
titleExample - Ultra

Create an Ultra Format as defined below:

Code Block
languagetext
themeEclipse
external BOOK_HEADER : identified_by(strREContains(HEADER, "title,name,organization,copyrightYear")), terminated_by(0xA)
{
  ascii HEADER : terminated_by(0xA);
};

external BookRecord
{
  ascii title                 : terminated_by(",");
  ascii authorName            : terminated_by(",");
  ascii organization          : terminated_by(",");
  ascii copyrightYearString   : terminated_by(",");
  ascii numberOfPages         : terminated_by(0xA);
};

internal BookRecord
{
  string title;
  string authorName;
  string organization;
  string copyrightYearString;
  int numberOfPages;

  //  enriched
  date copyrightYear;
};

//  decoder
in_map BOOK_HEADER_InMap : external(BOOK_HEADER), target_internal(BOOK_HEADER), discard_output { automatic; };
in_map BookRecord_InMap : external(BookRecord), internal(BookRecord) { automatic; };
decoder BOOK_HEADER_Decode : in_map(BOOK_HEADER_InMap);
decoder BookRecord_Decode : in_map(BookRecord_InMap);
decoder DECODER { decoder BOOK_HEADER_Decode; decoder BookRecord_Decode *; };

//  encoder
out_map BookRecord_OutMap : external(BookRecord), internal(BookRecord) { automatic; };
encoder ENCODER : out_map(BookRecord_OutMap);
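
As a sanity check on the layout above, the external BookRecord format (comma-terminated fields, records terminated by 0xA, preceded by a BOOK_HEADER line) can be exercised with a short Python sketch. Python is used here purely for illustration; the sample data and field values are hypothetical.

```python
# Hypothetical sample in the external format defined above:
# a BOOK_HEADER line followed by one comma-separated BookRecord.
sample = (
    "title,name,organization,copyrightYear,pages\n"
    "Moby-Dick,Herman Melville,Harper & Brothers,1851,635\n"
)

def parse_book_records(text):
    """Split each non-header line into the external BookRecord fields."""
    lines = text.splitlines()
    records = []
    for line in lines[1:]:  # skip the BOOK_HEADER line
        title, author_name, organization, year, pages = line.split(",")
        records.append({
            "title": title,
            "authorName": author_name,
            "organization": organization,
            "copyrightYearString": year,
            "numberOfPages": int(pages),
        })
    return records

print(parse_book_records(sample))
```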


Create a Batch Workflow

In this workflow, Parquet files are retrieved from disk and decoded into UDRs, which are then written to a CSV file. The workflow is illustrated here:

Example workflow with Parquet Decoder

Walking through the example workflow from left to right ...

  • A Disk agent named Disk_Source that reads in the source file (which contains a Parquet document) as a byte array.
  • A Parquet Decoder agent that parses the bytes from the file as Parquet, passing ParquetDecoderUDRs to the Analysis agent.
  • An Analysis agent named Analysis that transforms these incoming ParquetDecoderUDRs into BookRecord UDRs.
  • An Encoder agent named CSV_Encoder that encodes the BookRecord UDRs as CSV bytes.
  • The Disk_Destination forwarding agent receives the byte array data and writes out a CSV document.

This section will walk through the steps of creating such a batch workflow.
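
The flow of data through those agents can be sketched as a chain of plain Python functions. These names and bodies are hypothetical stand-ins for illustration only; the real agents are configured in the workflow editor, not written in Python.

```python
# Hypothetical stand-ins for the five workflow stages.
def disk_source():
    # Disk_Source: yields the raw bytes of one Parquet file.
    yield b"<parquet bytes>"

def parquet_decode(raw_bytes):
    # Parquet Decoder: one payload map per row (illustrative row data).
    return [{"title": "Moby-Dick", "numberOfPages": 635}]

def analysis(row):
    # Analysis: payload map -> BookRecord-like dict.
    return {"title": row["title"], "numberOfPages": row["numberOfPages"]}

def csv_encode(record):
    # CSV_Encoder: BookRecord -> CSV bytes, one line per UDR.
    return f'{record["title"]},{record["numberOfPages"]}\n'.encode()

output = bytearray()
for raw in disk_source():
    for row in parquet_decode(raw):
        output += csv_encode(analysis(row))  # Disk_Destination appends bytes

print(output.decode())
```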

Disk

Disk_Source is a Disk Collection agent that collects data from an input file and forwards it to the Decoder agent.

Double-click on the Disk_Source agent to display the configuration dialog for the agent:

Example of a Disk agent configuration


Parquet Decoder

The Parquet Decoder agent will collect the bytes from the Disk Collector into a complete Parquet document (with an embedded schema). The Parquet Decoder will create ParquetDecoderUDRs - one for each row - and forward them on to the next agent.

Double-click on the Parquet Decoder agent to display the configuration dialog.

The Parquet Decoder agent with no Parquet Profile specified.

In this dialog, note that no Parquet Profile is specified. In this case, the ParquetDecoderUDRs will include all columns in the file. You can specify a Parquet Profile with a schema to subset the columns - which will increase performance.
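
The effect of such a profile can be illustrated with a small Python sketch: without a profile every column in the file appears in the payload, while a profile schema keeps only the listed columns. The column names below are hypothetical.

```python
# A full decoded row (hypothetical columns, including one the
# downstream workflow does not need).
full_row = {
    "title": "Moby-Dick",
    "author": {"name": "Herman Melville", "organization": "Harper & Brothers"},
    "copyrightYear": "1851",
    "numberOfPages": 635,
    "isbn": "n/a",  # present in the file but unused downstream
}

# Columns named in a hypothetical Parquet Profile schema.
profile_columns = {"title", "author", "copyrightYear", "numberOfPages"}

def apply_profile(row, columns):
    """Keep only the columns named in the profile schema."""
    return {k: v for k, v in row.items() if k in columns}

print(apply_profile(full_row, profile_columns))
```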

Analysis

The Analysis Agent transforms the data from each ParquetDecoderUDR into a BookRecord UDR as defined above in the Ultra. In particular, the ParquetDecoderUDR includes a payload map whose contents mirror the Parquet schema - that data is available when constructing well-typed UDRs (e.g., BookRecord).

Double-click on the Analysis agent to display the configuration dialog.

The Analysis agent dialogue with the APL code defined.

In this dialog, the APL code for handling input data is written. In the example, each ParquetDecoderUDR is transformed into a BookRecord UDR. Adapt the code according to your requirements.

You can also see the UDR type used in the UDR Types field; in this example it is ParquetDecoderUDR.

Info
titleExample - Parquet APL

The APL code below shows an example of processing ParquetDecoderUDR:

Code Block
languagetext
themeEclipse
import ultra.Sandbox_Parquet_Autotest.Autotest_Ultra;

consume
{
  switch (input)
  {
    case (ParquetDecoderUDR decoderUDR)
    {
      //  payload
      map<string,any> payload = decoderUDR.payload;
      map<string,any> author = (map<string,any>) mapGet(payload, "author");

      //  extract
      BookRecord record = udrCreate(BookRecord);
      record.title = (string) mapGet(payload, "title");
      record.authorName = (string) mapGet(author, "name");
      record.organization = (string) mapGet(author, "organization");
      record.copyrightYear = (date) mapGet(payload, "copyrightYear");
      record.numberOfPages = (int) mapGet(payload, "numberOfPages");

      //  normalize
      dateToString(record.copyrightYearString, record.copyrightYear, "yyyy");

      //  route
      udrRoute(record);
    }
  }
}


The data in the payload map in the ParquetDecoderUDR will conform to the embedded schema. Knowing that schema is what makes it possible to write APL code like the above that transforms each record.
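
The same transformation can be mirrored in Python for illustration: a nested payload dict (standing in for the ParquetDecoderUDR payload map, with a nested "author" group as in the APL above) is flattened into BookRecord-like fields. The payload values here are hypothetical.

```python
from datetime import date

# Hypothetical payload mirroring the embedded schema used in the APL above.
payload = {
    "title": "Moby-Dick",
    "author": {"name": "Herman Melville", "organization": "Harper & Brothers"},
    "copyrightYear": date(1851, 10, 18),
    "numberOfPages": 635,
}

def to_book_record(payload):
    """Flatten the nested payload into BookRecord-like fields."""
    author = payload["author"]
    return {
        "title": payload["title"],
        "authorName": author["name"],
        "organization": author["organization"],
        "copyrightYear": payload["copyrightYear"],
        # normalize, as dateToString(..., "yyyy") does in the APL
        "copyrightYearString": payload["copyrightYear"].strftime("%Y"),
        "numberOfPages": payload["numberOfPages"],
    }

print(to_book_record(payload))
```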

Analysis

The Analysis agent is made to transfer the input payload to the encoder. It also maintains a counter that logs progress every 10,000 records. You can also see the UDR types used in the UDR Types field - in this example ParquetEncoderUDR and ParquetDecoderUDR.

Encoder

The Encoder agent receives the BookRecord UDRs from the Analysis agent and generates byte arrays in CSV format - one byte array for each UDR.

Double-click on the Encoder agent to display the configuration dialog.

Info
titleExample - Parquet APL

The APL code below shows an example of routing each input payload to a ParquetEncoderUDR:

Code Block
languagetext
themeEclipse
int counter = 0;

beginBatch
{
  //  reset the per-batch counter
  counter = 0;
}

consume
{
  //  wrap each input payload in a ParquetEncoderUDR and route it
  counter = counter + 1;
  ParquetEncoderUDR encoderUDR = udrCreate(ParquetEncoderUDR);
  encoderUDR.payload = input.payload;
  udrRoute(encoderUDR);

  //  log progress every 10000 records
  if (counter % 10000 == 0) debug(counter);
}
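
The beginBatch/consume counter pattern above can be sketched in Python for illustration: every payload passes through unchanged while the counter records progress at a fixed interval. The function name and interval parameter are hypothetical.

```python
def process_batch(payloads, every=10_000):
    """Route every payload onward unchanged, noting progress periodically."""
    counter = 0                      # beginBatch: reset the counter
    routed, logged = [], []
    for payload in payloads:         # consume: one call per input UDR
        counter += 1
        routed.append(payload)       # udrRoute: pass the payload through
        if counter % every == 0:
            logged.append(counter)   # debug(counter) in the APL
    return routed, logged

routed, logged = process_batch(range(25_000))
print(len(routed), logged)
```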

The Parquet Encoder and Decoder

Select the Parquet profile in the Encoder and Decoder.

Example of an Encoder agent configuration

In this dialog, choose the Encoder that you defined in your Ultra Format.
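
The output the Encoder produces per BookRecord UDR can be sketched in Python: one comma-separated line terminated by 0xA, matching the external BookRecord layout in the Ultra. The record values below are hypothetical.

```python
import csv
import io

# A BookRecord-like dict (field names from the Ultra format above).
record = {
    "title": "Moby-Dick",
    "authorName": "Herman Melville",
    "organization": "Harper & Brothers",
    "copyrightYearString": "1851",
    "numberOfPages": 635,
}

def encode_record(record):
    """Encode one record as a comma-separated line terminated by 0xA."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerow([
        record["title"],
        record["authorName"],
        record["organization"],
        record["copyrightYearString"],
        record["numberOfPages"],
    ])
    return buf.getvalue().encode("ascii")

print(encode_record(record))
```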

Disk Forwarder

Disk_Destination is a Disk Forwarding agent that writes bytes to an output file on disk.

Double-click on the Disk_Destination agent to display the configuration dialog for the agent:


Example of a Disk agent configuration

Running the Workflow

When you run the workflow, it will process Parquet files from the input directory and write out corresponding CSV files in the configured output directory.