Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Overview

This example illustrates typcial use of the Parquet Decoder agent in a batch workflow. In this example, complete records are processed using the embedded document schema.

This example only uses a Batch Workflow with

The following configurations will be created:

  • An Ultra Format
  • A Batch Workflow that makes use of a Parquet Decoder agent that parses Parquet documents.

Create a Batch Parquet Workflow

In this workflow, a Disk agent collects data that is forwarded to a Parquet Decoder agent in bytearray format. The data is converted into ParquetDecoderUDRs (one UDR per record). The UDRs are routed forward into the workflow. The Parquet Encoder agent receives the ParquetEncoderUDRs, converts the data into Parquet, and forwards bytearray data be written to a Parquet document.

Image Removed

Example workflow with Parquet Decoder and Encoder

The workflow consists of a Disk collection agent named Disk_source, a Parquet_Decoder and a Parquet_Encoder using a Parquet profile (not in the pi cute), and a Disk forwarding agent named Disk_Destination.

The Parquet Profile

Start with creating the Parquet Apache Profile. The main purpose of the profile is to define the Schema, as explained below. The Profile will later be used for both the Parquet Decoder and Encoder.

Image Removed
The Parquet profile definition. For details, see the example below.

...

Define an Ultra Format

A simple Ultra Format needs to be created both for the incoming UDRs. For more information about the Ultra Format Editor and the UFDL syntax, refer to the Ultra Format Management User's Guide.

Info
titleExample -

...

Ultra

Create an Ultra Format as defined below:

Code Block
languagetext
themeEclipse

...

external 

...

BOOK_

...

HEADER 

...

: identified_by(strREContains(HEADER, "title,name,organization,copyrightYear")), terminated_by(0xA)
{
  ascii HEADER : terminated_by(0xA);
};

external BookRecord
{
  ascii title                 : terminated_by(",");
  ascii authorName            : terminated_by(",");
  ascii organization          : terminated_by(",");
  ascii copyrightYearString   : terminated_by(",");
  ascii numberOfPages         : terminated_by(0xA);
};

internal BookRecord
{
  string title;
  string authorName;
  string organization;
  string copyrightYearString;
  int numberOfPages;

  //  enriched
  date copyrightYear;
};

//  decoder
in_map BOOK_HEADER_InMap : external(BOOK_HEADER), target_internal(BOOK_HEADER), discard_output { automatic; };
in_map BookRecord_InMap : external(BookRecord), internal(BookRecord) { automatic; };
decoder BOOK_HEADER_Decode : in_map(BOOK_HEADER_InMap);
decoder BookRecord_Decode : in_map(BookRecord_InMap);
decoder DECODER { decoder BOOK_HEADER_Decode; decoder BookRecord_Decode *; };

//  encoder
out_map BookRecord_OutMap : external(BookRecord), internal(BookRecord) { automatic; };
encoder ENCODER : out_map(BookRecord_OutMap);


Create a Batch Workflow

In this workflow, Parquet files on disk are retrieved that are then decoded into UDRs that are written into a CSV file. The workflow is illustrated here:

Image Added

Example workflow with Parquet Encoder

Walking through the example workflow from left to right ...

  • A Disk agent named Disk_Source that reads in the source file (which contains a Parquet document) as a byte array.
  • A Parquet Decoder agent that parses the bytes from the file as Parquet, passing ParquetDecoderUDRs to the Analysis agent.
  • An Analysis agent named Analysis that transforms these incoming ParquetDecoderUDRs into BookRecord UDRs.
  • An Encoder agent named CSV_Encoder that encodes the BookRecord UDRs as CSV bytes.
  • The Disk_Destination forwarding agent receives the bytearray data and writes out a CSV document.

This section will walk through the steps of creating such a batch workflow.

Disk

Disk_Source is a Disk Collection agent that collects data from an input file and forwards it to the Decoder agent.

Double-click on the Disk_Source agent to display the configuration dialog for the agent:

Image Added

Example of a Disk agent configuration


Parquet Decoder

The Parquet Decoder agent will collect the bytes from the Disk Collector into a complete Parquet document (with an embedded schema). The Parquet Decoder will create ParquetDecoderUDRs - one for each row - and forward them on to the next agent.

Double-click on the Parquet Decoder agent to display the configuration dialog.

Image Added

The Parquet Decoder agent with no Parquet Profile specified.

In this dialog, note that no Parquet Profile is specified. In this case, the ParquetDecoderUDRs will include all columns in the file. You can specify a Parquet Profile with a schema to subset the columns - which will increase performance.

Analysis

The Analysis Agent transforms the data from each ParquetDecoderUDR into a BookRecord UDR as defined above in the Ultra. In particular, the ParquetDecoderUDR includes a payload map whose contents mirrors the Parquet schema defined in the profile - that data is available when constructing well-typed UDRs (e.g., BookRecord).

Double-click on the Analysis agent to display the configuration dialog.

Image Added

The Analysis agent dialogue with the APL code defined.

In this dialog, the APL code for handling input data is written. In the example, each ParquetDecoderUDR is transformed into a BookeRecord UDR. Adapt the code according to your requirements.

You can also see the UDR type used in the UDR Types field, in this example it is aParquetDecoderUDR.

Info
titleExample - Parquet APL

The APL code below shows an example of processing ParquetDecoderUDR:

Code Block
languagetext
themeEclipse
import ultra.Sandbox_Parquet_Autotest.Autotest_Ultra;

consume 
{
  switch (input)
    {
      case (ParquetDecoderUDR decoderUDR)
      {
        //  payload
        map<string,any> payload = decoderUDR.payload;
        

...

map<string,any> 

...

author 

...

= (

...

map<string,any>)

...

 

...

mapGet(payload, "author");

  

...

      

...

// 

...

 

...

extract

...

        BookRecord 

...

record 

...

= 

...

udrCreate(

...

BookRecord);
        

...

record.title 

...

= 

...

(string) 

...

mapGet(payload, "title");
      

...

  

...

record.authorName = (string) mapGet(author, "name");
        

...

record.organization = (

...

string) 

...

mapGet(author, "organization");
       

...

 

...

record.copyrightYear 

...

= 

...

(date) mapGet(payload, "copyrightYear");
        

...

record.numberOfPages = (int) mapGet(payload, "numberOfPages");

 

...

     

...

  // 

...

 normalize
  

...

 

...

 

...

    

...

dateToString(record.copyrightYearString, record.copyrightYear, "yyyy");

    

...

 

...

 

...

 

...

 //  

...

route
  

...

 

...

 

...

    

...

udrRoute(record);
    

...

 

...

 }
    

...

}

...


} 

...


The

...

data

...

Image Removed

...

in

...

In this dialog, the following settings are made:

  • The Disk collection agent is configured to collect data from the /data/parquet/input/ directory, which is stated in the  Directory field.

  • The Disk out agent is configured to collect data from the /data/parquet/output/ directory, which is stated in the  Directory field.
  • The agent will collect all files in the directory.

  • The Filename Template can be defined as in the example above.
The input file should be in format bla bla......

the payload map in the ParquetDecoderUDR will confom to the embedded schema. To write the above APL code and transform the record 

Analysis

The Analysis agent is made to transfer input payload to the encoder. It also has a counter to ....... You can also see the UDR type used in the UDR Types field - in this example ParquetEncoderUDR and ParquetDecoderUDR.

...

Info
titleExample - Parquet APL

The APL code below shows an example Parquet schema for company employees:

Code Block
languagetext
themeEclipse
int counter = 0;

beginBatch
{
counter = 0;
}

consume 
{
counter = counter + 1;
ParquetEncoderUDR encoderUDR = udrCreate(ParquetEncoderUDR);
encoderUDR.payload = input.payload;
udrRoute(encoderUDR);
if (counter % 10000 == 0) debug(counter);
}


The Parquet Encoder and Decoder
Image Modified

Select the Parquet profile in the Encoder and Decoder.

Running the Workflow

When you have executed the workflow, the result will be .... on the output files. ...

...