Overview

This example illustrates typical use of the Parquet Decoder agent in a batch workflow. Complete records are processed using the embedded document schema.

The example uses only a batch workflow, in which a Parquet Decoder agent parses Parquet documents.

Create a Batch Parquet Workflow

In this workflow, a Disk agent collects data and forwards it to a Parquet Decoder agent in bytearray format. The decoder converts the data into ParquetDecoderUDRs (one UDR per record), which are routed onwards into the workflow. The Parquet Encoder agent receives ParquetEncoderUDRs, converts the data back into Parquet, and forwards the resulting bytearray data to be written to a Parquet document.

Example workflow with Parquet Decoder and Encoder

The workflow consists of a Disk collection agent named Disk_source, a Parquet_Decoder and a Parquet_Encoder that both use a Parquet profile (not shown in the picture), an Analysis agent between them, and a Disk forwarding agent named Disk_Destination.

The Parquet Profile

Start by creating the Parquet profile. The main purpose of the profile is to define the schema, as explained below. The profile is later used by both the Parquet Decoder and the Parquet Encoder.

...

Info
Example - Parquet Schema

The structured text block below shows an example Parquet schema for company employees. Copy and paste this text into your schema.

Code Block
message base_schema
{ 
  optional int32 optionalInt32Field; 
  optional int64 optionalInt64Field; 
  optional float optionalFloatField; 
  optional double optionalDoubleField; 
  optional boolean optionalBooleanField; 
  optional binary optionalStringField (UTF8);
  optional int32 optionalDateField (DATE);
  optional int32 optionalTimeMillisField (TIME_MILLIS);
  optional int64 optionalTimeMicrosField (TIME_MICROS);
  optional int64 optionalTimestampMillisField (TIMESTAMP_MILLIS);
  optional int64 optionalTimestampMicrosField (TIMESTAMP_MICROS);

  optional int32 missingOptionalInt32Field; 
  optional int64 missingOptionalInt64Field; 
  optional float missingOptionalFloatField; 
  optional double missingOptionalDoubleField; 
  optional boolean missingOptionalBooleanField; 
  optional binary missingOptionalStringField (UTF8);

  required int32 requiredInt32Field; 
  required int64 requiredInt64Field; 
  required float requiredFloatField; 
  required double requiredDoubleField; 
  required boolean requiredBooleanField; 
  required binary requiredStringField (UTF8);

  repeated int32 listInt32Field;
  repeated boolean listBooleanField;
  repeated binary listStringField (UTF8);

  optional group groupField {
    required group name {
      optional binary first_name (UTF8);
      required binary last_name (UTF8);
    }
    optional int32 id;
  }

  repeated group complexListField {
    required group address {
      optional binary city (UTF8);
      required binary state (UTF8);
    }
    optional int32 unit;
  }

  optional binary jsonField (JSON);

  optional group optionalIntegerListField (LIST) {
    repeated group list {
      optional int32 element;
    }
  }

  optional group carListField (LIST) {
    repeated group list {
      required group element {
        required binary make (UTF8);
        required binary model (UTF8);
      }
    }
  }

  required group testScoreMap (MAP) {
    repeated group key_value {
      required binary key (UTF8);
      optional int32 value;
    }
  }

  required group personCarMap (MAP) {
    repeated group key_value {
      required group key {
        required binary firstName (UTF8);
        required binary lastName (UTF8);
      }
      required group value {
        required binary make (UTF8);
        required binary model (UTF8);
      }
    }
  }

  required group listOfListsField (LIST) {
    repeated group list {
      required group element (LIST) {
        repeated group list {
          required int32 element;
        }
      }
    }
  }

  repeated int32 emptyRepeatedInt32Field;

  optional group emptyIntegerListField (LIST) {
    repeated group list {
      optional int32 element;
    }
  }

  required group emptyValueMap (MAP) {
    repeated group key_value {
      required binary key (UTF8);
    }
  }
}


The Disk Agents

In this workflow, you use Disk collection and forwarding agents to read data from and write data to files. Double-click a Disk agent to display its configuration dialog:

...

  • The Disk collection agent is configured to collect data from the /data/parquet/input/ directory, which is stated in the Directory field.

  • The Disk forwarding agent is configured to write data to the /data/parquet/output/ directory, which is also stated in the Directory field.

  • The collection agent collects all files in the input directory.

  • The Filename Template for the forwarding agent can be defined as in the example above.

The input files must be in Parquet format; the decoder reads each record using the schema embedded in the document. A way to produce a sample input file is sketched below.
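
For testing, you can generate a small input file with any Parquet-capable tool. The following is a minimal sketch using Python and pyarrow (not part of the product); the file name, the field values, and the choice to write only a subset of the example schema's fields are assumptions for illustration.

Code Block
# Minimal sketch, assuming pyarrow is available and that a subset of the
# example schema's fields is sufficient for a decode test (the decoder
# uses the schema embedded in the document).
import pyarrow as pa
import pyarrow.parquet as pq

# "required" fields in the Parquet schema map to non-nullable Arrow fields,
# "optional" fields to nullable ones.
schema = pa.schema([
    pa.field("requiredInt32Field", pa.int32(), nullable=False),
    pa.field("requiredInt64Field", pa.int64(), nullable=False),
    pa.field("requiredFloatField", pa.float32(), nullable=False),
    pa.field("requiredDoubleField", pa.float64(), nullable=False),
    pa.field("requiredBooleanField", pa.bool_(), nullable=False),
    pa.field("requiredStringField", pa.string(), nullable=False),
    pa.field("optionalInt32Field", pa.int32()),
])

table = pa.Table.from_pydict(
    {
        "requiredInt32Field": [1, 2],
        "requiredInt64Field": [10, 20],
        "requiredFloatField": [1.5, 2.5],
        "requiredDoubleField": [0.25, 0.5],
        "requiredBooleanField": [True, False],
        "requiredStringField": ["Alice", "Bob"],
        "optionalInt32Field": [42, None],
    },
    schema=schema,
)

# "employees.parquet" is a hypothetical file name.
pq.write_table(table, "/data/parquet/input/employees.parquet")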

Analysis

The Analysis agent transfers the input payload to the encoder. It also keeps a counter that emits a debug message for every 10,000 consumed UDRs. You can see the UDR types used in the UDR Types field - in this example ParquetEncoderUDR and ParquetDecoderUDR.

...

Info
Example - Parquet APL

The APL code below shows example Analysis agent logic that wraps each decoded payload in a ParquetEncoderUDR and routes it to the encoder:

Code Block
int counter = 0;

beginBatch
{
    // Reset the counter at the start of each batch.
    counter = 0;
}

consume
{
    counter = counter + 1;

    // Wrap the decoded payload in a ParquetEncoderUDR and route it
    // onwards to the Parquet Encoder.
    ParquetEncoderUDR encoderUDR = udrCreate(ParquetEncoderUDR);
    encoderUDR.payload = input.payload;
    udrRoute(encoderUDR);

    // Log progress with a debug message every 10000 UDRs.
    if (counter % 10000 == 0) debug(counter);
}


The Parquet Encoder and Decoder

Select the Parquet profile in the Encoder and Decoder.

Running the Workflow

When you have executed the workflow, the decoded and re-encoded records have been written to Parquet files in the output directory. A way to inspect the result is sketched below.
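
To verify the result, the output can be inspected with any Parquet-capable tool. The following is a minimal sketch using Python and pyarrow (not part of the product); the output file name depends on your Filename Template and is hypothetical here.

Code Block
# Minimal sketch, assuming pyarrow is available; the file name is
# hypothetical and depends on the Disk forwarding agent's Filename Template.
import pyarrow.parquet as pq

table = pq.read_table("/data/parquet/output/employees.parquet")
print(table.schema)       # the schema embedded in the output document
print(table.num_rows)     # one row per encoded record
print(table.to_pylist())  # records as Python dictionaries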

...