9.59.5.2 Parquet Encoder Example

Overview

This example illustrates typical use of the Parquet Encoder agent in a batch workflow. The following configurations will be created:

  • An Ultra Format
  • A Parquet Profile
  • A Batch Workflow that makes use of the Parquet Encoder agent to create Parquet documents

Define an Ultra Format

A simple Ultra Format needs to be created for the incoming UDRs. For more information about the Ultra Format Editor and the UFDL syntax, refer to the Ultra Format Management User's Guide.


Example - Ultra

Create an Ultra Format as defined below:

external BOOK_HEADER : identified_by(strREContains(HEADER, "title,name,organization,copyrightYear")), terminated_by(0xA)
{
  ascii HEADER : terminated_by(0xA);
};

external BookRecord
{
  ascii title                 : terminated_by(",");
  ascii authorName            : terminated_by(",");
  ascii organization          : terminated_by(",");
  ascii copyrightYearString   : terminated_by(",");
  ascii numberOfPages         : terminated_by(0xA);
};

internal BookRecord
{
  string title;
  string authorName;
  string organization;
  string copyrightYearString;
  int numberOfPages;

  //  enriched
  date copyrightYear;
};

//  decoder
in_map BOOK_HEADER_InMap : external(BOOK_HEADER), target_internal(BOOK_HEADER), discard_output { automatic; };
in_map BookRecord_InMap : external(BookRecord), internal(BookRecord) { automatic; };
decoder BOOK_HEADER_Decode : in_map(BOOK_HEADER_InMap);
decoder BookRecord_Decode : in_map(BookRecord_InMap);
decoder DECODER { decoder BOOK_HEADER_Decode; decoder BookRecord_Decode *; };

//  encoder
out_map BookRecord_OutMap : external(BookRecord), internal(BookRecord) { automatic; };
encoder ENCODER : out_map(BookRecord_OutMap);
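To make the record layout concrete, the following Python sketch imitates what the DECODER defined above does with the input bytes. A line containing the header text is treated as BOOK_HEADER and discarded. Each remaining line is split into the five comma-terminated fields of BookRecord, and numberOfPages is converted to an integer. This is only an illustration of the format, not how the Ultra runtime works. The function name decode_books and the sample content are hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass

# Same text the BOOK_HEADER identified_by(...) clause looks for.
HEADER_MARKER = "title,name,organization,copyrightYear"


@dataclass
class BookRecord:
    title: str
    authorName: str
    organization: str
    copyrightYearString: str
    numberOfPages: int  # converted from ascii, like the automatic in_map


def decode_books(data: bytes) -> list[BookRecord]:
    """Hypothetical stand-in for the DECODER: header line dropped, records split on ','."""
    records = []
    for line in data.decode("ascii").split("\n"):  # 0xA terminates each record
        if not line:
            continue  # empty string after the final line terminator
        if HEADER_MARKER in line:
            continue  # BOOK_HEADER is matched and discarded (discard_output)
        title, author, org, year, pages = line.split(",")
        records.append(BookRecord(title, author, org, year, int(pages)))
    return records


# Hypothetical input file content.
sample = (
    b"title,name,organization,copyrightYear,numberOfPages\n"
    b"Example Title,Jane Doe,Example Press,2001,320\n"
)
for rec in decode_books(sample):
    print(rec)
```

Like the Ultra Format itself, which terminates fields on a plain ",", the sketch does not handle quoted CSV fields that contain commas.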

Define a Parquet Profile

The Parquet Profile defines the Parquet schema as well as advanced properties for encoding. See 9.59.2.1 Parquet Profile Configuration for information on how to open the Parquet Profile editor.

Profile - Schema Tab


Profile Configuration Example - Schema Tab

The Schema Tab contains a single field in which you specify the Parquet schema.

Example - Parquet Schema

The text block below shows an example Parquet schema for a book record. Copy and paste this text into the Schema Tab.

message book {
  required binary title (UTF8);
  required group author {
    optional binary name (UTF8);
    optional binary organization (UTF8);
  }
  optional int32 copyrightYear (DATE);
  optional int64 numberOfPages;
}
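In this schema, copyrightYear is an int32 annotated with the DATE logical type. The Parquet format stores a DATE as the number of days since the Unix epoch, 1970-01-01. The Python sketch below computes that value for a year-only string, assuming the year is interpreted as January 1 of that year. That is the assumed result of the strToDate(..., "yyyy") call in the Analysis agent's APL code. How the Parquet Encoder agent performs this conversion internally, including any time-zone handling, is not covered here. The function name is hypothetical.

```python
from datetime import date

EPOCH = date(1970, 1, 1)


def parquet_date_from_year(year_string: str) -> int:
    """Days since 1970-01-01 for January 1 of the given year (Parquet DATE, int32)."""
    d = date(int(year_string), 1, 1)  # assumption: "yyyy" parses to January 1
    return (d - EPOCH).days


print(parquet_date_from_year("2001"))  # 11323
print(parquet_date_from_year("1970"))  # 0
```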

Profile - Advanced Tab

Profile Configuration Example - Advanced Tab

The Advanced Tab contains a number of encoding settings. In this example, all of them keep their default values.

Create a Batch Workflow

In this workflow, CSV records are read from a file on disk and encoded into a Parquet document. The workflow is illustrated here:

Example workflow with Parquet Encoder

Walking through the example workflow from left to right, we have:

  • A Disk Agent named Disk_Source that reads in the source file as a byte array.
  • A Decoder Agent named CSV_Decoder that parses the bytes from the file and decodes the CSV records, passing BookRecord UDRs to the Analysis agent.
  • An Analysis agent named Analysis that transforms these incoming BookRecord UDRs into ParquetEncoderUDRs in accordance with the schema specified in the Parquet Profile.
  • The Parquet Encoder agent receives the ParquetEncoderUDRs, encodes the data in Parquet format, and forwards the result as a bytearray.
  • The Disk_Destination forwarding agent receives the bytearray data and writes it out as a Parquet document.

This section walks through the steps of creating such a batch workflow.

Disk

Disk_Source is a Disk Collection agent that collects data from an input file and forwards it to the Decoder agent.

Double-click on the Disk_Source agent to display the configuration dialog for the agent:

Example of a Disk agent configuration

Decoder

The Decoder agent receives the input data from the Disk agent, translates it into UDRs and forwards them to the Analysis agent. Double-click on the Decoder agent to display the configuration dialog.


Example of a Decoder agent configuration

In this dialog, choose the decoder that you defined in your Ultra Format (DECODER in this example).

Analysis

The Analysis agent transforms the data from each BookRecord UDR into a ParquetEncoderUDR. Specifically, the ParquetEncoderUDR holds a payload map whose contents mirror the Parquet schema defined in the profile.

Double-click on the Analysis agent to display the configuration dialog.

The Analysis agent dialog with the APL code defined.

In this dialog, you write the APL code that handles the input data. In the example, each BookRecord UDR is transformed into a ParquetEncoderUDR. Adapt the code to your requirements.

The UDR Types field shows the UDR type that the agent handles; in this example, BookRecord.

Example - Parquet APL

The APL code below shows an example of constructing a ParquetEncoderUDR:

// Ultra Format defined above; adjust the folder and configuration name to your own
import ultra.Sandbox_Parquet_Autotest.Autotest_Ultra;
import ultra.Parquet;

consume
{
  switch (input)
  {
    case (BookRecord record)
    {
      //  normalize: parse the four-digit year string into the date field
      strToDate(record.copyrightYear, record.copyrightYearString, "yyyy");

      //  payload - primitives (top-level fields of the schema)
      map<string,any> payload = mapCreate(string,any);
      mapSet(payload, "title", record.title);
      mapSet(payload, "copyrightYear", record.copyrightYear);
      mapSet(payload, "numberOfPages", record.numberOfPages);

      //  payload - author structure (nested map for the "author" group)
      map<string,any> author = mapCreate(string,any);
      mapSet(author, "name", record.authorName);
      mapSet(author, "organization", record.organization);
      mapSet(payload, "author", author);

      //  encode and route: wrap the payload and send it to the Parquet Encoder
      ParquetEncoderUDR encoderUDR = udrCreate(ParquetEncoderUDR);
      encoderUDR.payload = payload;
      udrRoute(encoderUDR);
    }
  }
}

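Note that the structure of the payload map in the ParquetEncoderUDR mirrors the schema configured in the profile. Top-level fields such as title are set directly in payload. The name and organization fields are placed in a nested map under the key "author" to match the author group. A payload whose structure does not match the schema results in errors at runtime.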
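The following Python sketch mirrors the APL mapping with plain dictionaries and adds a simple structural check against the field names, nesting, and required/optional flags of the book schema. The check is a hypothetical illustration of the kind of mismatch described above. It is not the validation that the Parquet Encoder agent performs, and the names BOOK_SCHEMA, build_payload and check_payload are made up for this example.

```python
from __future__ import annotations

from datetime import date

# Hypothetical mirror of the book schema: field -> (required?, nested fields or None)
BOOK_SCHEMA = {
    "title": (True, None),
    "author": (True, {
        "name": (False, None),
        "organization": (False, None),
    }),
    "copyrightYear": (False, None),
    "numberOfPages": (False, None),
}


def build_payload(title, author_name, organization, copyright_year, pages):
    """Same nesting as the APL example: primitives at top level, author as a nested map."""
    return {
        "title": title,
        "copyrightYear": copyright_year,
        "numberOfPages": pages,
        "author": {"name": author_name, "organization": organization},
    }


def check_payload(payload: dict, schema: dict = BOOK_SCHEMA, path: str = "") -> list[str]:
    """Return structural mismatches; an empty list means the payload fits the schema."""
    errors = []
    for key in payload:
        if key not in schema:
            errors.append(f"unknown field {path}{key}")
    for key, (required, children) in schema.items():
        value = payload.get(key)
        if value is None:
            if required:
                errors.append(f"missing required field {path}{key}")
            continue
        if children is not None:
            if not isinstance(value, dict):
                errors.append(f"{path}{key} must be a nested map")
            else:
                errors.extend(check_payload(value, children, f"{path}{key}."))
    return errors


good = build_payload("Example Title", "Jane Doe", "Example Press", date(2001, 1, 1), 320)
print(check_payload(good))  # []

flat = {"title": "Example Title", "authorName": "Jane Doe"}  # author not nested
print(check_payload(flat))  # ['unknown field authorName', 'missing required field author']
```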

Parquet Encoder

The Parquet Encoder agent creates a Parquet document based on the ParquetEncoderUDRs it receives from upstream agents.

Double-click on the Parquet Encoder agent to display the configuration dialog.

The Parquet Encoder agent with the Parquet Profile defined.

In this dialog, choose the Parquet Profile that you defined earlier.

Disk Forwarder

Disk_Destination is a Disk Forwarding agent that writes bytes to an output file on disk.

Double-click on the Disk_Destination agent to display the configuration dialog for the agent:

Example of a Disk agent configuration

Running the Workflow

When you run the workflow, it processes the CSV file in the input directory and writes a corresponding Parquet file to the configured output directory.
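One quick way to sanity-check the output is to confirm that the written file has the framing of a Parquet file. According to the Parquet format specification, a file starts and ends with the 4-byte magic number PAR1. The 4 bytes before the trailing magic hold the length of the footer (file metadata) as a little-endian 32-bit integer. The Python sketch below checks only these framing bytes and does not parse the metadata or the data. The function name is hypothetical. Pass the path of a file from your configured output directory as the only argument.

```python
import os
import struct
import sys

MAGIC = b"PAR1"


def looks_like_parquet(path: str) -> bool:
    """Check leading/trailing PAR1 magic and a plausible footer length."""
    size = os.path.getsize(path)
    if size < 12:  # smallest frame: magic + 4-byte footer length + magic
        return False
    with open(path, "rb") as f:
        head = f.read(4)
        f.seek(-8, os.SEEK_END)
        tail = f.read(8)  # footer length (4 bytes) followed by trailing magic
    if head != MAGIC or tail[4:] != MAGIC:
        return False
    (footer_len,) = struct.unpack("<I", tail[:4])
    # The footer must fit between the leading magic and the length field.
    return footer_len <= size - 12


if __name__ == "__main__" and len(sys.argv) > 1:
    print(looks_like_parquet(sys.argv[1]))
```

To inspect the actual contents, use a full Parquet reader, for example pyarrow.parquet.read_table.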