Overview
This example illustrates typical use of the Parquet Decoder agent in a batch workflow, where complete records are processed using the embedded document schema.
The example uses a batch workflow in which a Parquet Decoder agent parses Parquet documents.
Create a Batch Parquet Workflow
In this workflow, a Disk agent collects data and forwards it to a Parquet Decoder agent in bytearray format. The Parquet Decoder agent converts the data into ParquetDecoderUDRs (one UDR per record), which are routed onward in the workflow. The Parquet Encoder agent receives ParquetEncoderUDRs, converts the data back into Parquet format, and forwards bytearray data to be written to a Parquet document.
Example workflow with Parquet Decoder and Encoder
The workflow consists of a Disk collection agent named Disk_source, a Parquet_Decoder and a Parquet_Encoder that both use a Parquet profile (not shown in the picture), and a Disk forwarding agent named Disk_Destination.
The Parquet Profile
Start by creating the Apache Parquet profile. The main purpose of the profile is to define the schema, as explained below. The profile is later used by both the Parquet Decoder and the Parquet Encoder.
The Parquet profile definition. For details, see the example below.
Parquet supports a small set of primitive types (integer, floating point, boolean, and byte array). These primitives can be extended with logical type annotations, which act as modifiers of the primitive types. For example, the UTF8 annotation is a modifier to byte arrays that denotes string data. Parquet also supports structured data through groups and repetitions (optional, required, repeated).
Example - Parquet Schema
The structured text block below shows an example Parquet schema for company employees. Copy and paste this text into your schema.
message base_schema {
  optional int32 optionalInt32Field;
  optional int64 optionalInt64Field;
  optional float optionalFloatField;
  optional double optionalDoubleField;
  optional boolean optionalBooleanField;
  optional binary optionalStringField (UTF8);
  optional int32 optionalDateField (DATE);
  optional int32 optionalTimeMillisField (TIME_MILLIS);
  optional int64 optionalTimeMicrosField (TIME_MICROS);
  optional int64 optionalTimestampMillisField (TIMESTAMP_MILLIS);
  optional int64 optionalTimestampMicrosField (TIMESTAMP_MICROS);
  optional int32 missingOptionalInt32Field;
  optional int64 missingOptionalInt64Field;
  optional float missingOptionalFloatField;
  optional double missingOptionalDoubleField;
  optional boolean missingOptionalBooleanField;
  optional binary missingOptionalStringField (UTF8);
  required int32 requiredInt32Field;
  required int64 requiredInt64Field;
  required float requiredFloatField;
  required double requiredDoubleField;
  required boolean requiredBooleanField;
  required binary requiredStringField (UTF8);
  repeated int32 listInt32Field;
  repeated boolean listBooleanField;
  repeated binary listStringField (UTF8);
  optional group groupField {
    required group name {
      optional binary first_name (UTF8);
      required binary last_name (UTF8);
    }
    optional int32 id;
  }
  repeated group complexListField {
    required group address {
      optional binary city (UTF8);
      required binary state (UTF8);
    }
    optional int32 unit;
  }
  optional binary jsonField (JSON);
  optional group optionalIntegerListField (LIST) {
    repeated group list {
      optional int32 element;
    }
  }
  optional group carListField (LIST) {
    repeated group list {
      required group element {
        required binary make (UTF8);
        required binary model (UTF8);
      }
    }
  }
  required group testScoreMap (MAP) {
    repeated group key_value {
      required binary key (UTF8);
      optional int32 value;
    }
  }
  required group personCarMap (MAP) {
    repeated group key_value {
      required group key {
        required binary firstName (UTF8);
        required binary lastName (UTF8);
      }
      required group value {
        required binary make (UTF8);
        required binary model (UTF8);
      }
    }
  }
  required group listOfListsField (LIST) {
    repeated group list {
      required group element (LIST) {
        repeated group list {
          required int32 element;
        }
      }
    }
  }
  repeated int32 emptyRepeatedInt32Field;
  optional group emptyIntegerListField (LIST) {
    repeated group list {
      optional int32 element;
    }
  }
  required group emptyValueMap (MAP) {
    repeated group key_value {
      required binary key (UTF8);
    }
  }
}
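If you want to confirm that a Parquet file actually carries the schema you have pasted into the profile, you can inspect the file footer with a standalone script outside the workflow. The sketch below is a minimal example using the pyarrow library, which is not part of the product; the file path is only a placeholder.

import pyarrow.parquet as pq

# Read only the footer metadata; the data pages are not loaded.
schema = pq.read_schema("sample.parquet")
print(schema)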
The Disk agents
In this workflow, you use a Disk collection agent and a Disk forwarding agent to read data from and write data to files. Double-click a Disk agent to display its configuration dialog:
The Disk agent configurations; Disk-in to the left and Disk-out to the right.
In these dialogs, the following settings are made:
- The Disk collection agent is configured to collect data from the /data/parquet/input/ directory, which is stated in the Directory field. The agent collects all files in this directory.
- The Disk forwarding agent is configured to write data to the /data/parquet/output/ directory, which is stated in the Directory field.
- The Filename Template can be defined as in the example above.
The input files must be Parquet files that conform to the schema defined in the Parquet profile.
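If you do not have existing Parquet data, you can generate a small test file with a standalone script. The sketch below is a minimal example using the pyarrow library, which is not part of the product, and it uses a simplified schema with only a few of the fields shown earlier; a real input file must match the full schema defined in the Parquet profile, or the profile schema must be adapted accordingly.

import pyarrow as pa
import pyarrow.parquet as pq

# Simplified schema for illustration only; required fields map to non-nullable columns.
schema = pa.schema([
    pa.field("requiredInt32Field", pa.int32(), nullable=False),
    pa.field("requiredStringField", pa.string(), nullable=False),
    pa.field("optionalDoubleField", pa.float64()),
    pa.field("listInt32Field", pa.list_(pa.int32())),
])

table = pa.table(
    {
        "requiredInt32Field": pa.array([1, 2, 3], type=pa.int32()),
        "requiredStringField": ["alice", "bob", "carol"],
        "optionalDoubleField": [1.5, None, 3.0],
        "listInt32Field": [[1, 2], [], [3]],
    },
    schema=schema,
)

# Write the file to the directory that the Disk collection agent reads from.
pq.write_table(table, "/data/parquet/input/sample.parquet")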
Analysis
The Analysis agent transfers the input payload to the encoder by creating a ParquetEncoderUDR for each ParquetDecoderUDR it receives. It also keeps a counter that prints a debug message for every 10000 UDRs processed. You can see the UDR types used in the UDR Types field; in this example, ParquetEncoderUDR and ParquetDecoderUDR.
Double-click on the Analysis agent to display the configuration dialog.
The Analysis agent dialog with the APL code defined.
Example - Parquet APL
The APL code below creates a ParquetEncoderUDR for each consumed UDR, copies the payload, and routes the new UDR to the encoder:
int counter = 0;

beginBatch {
    counter = 0;
}

consume {
    counter = counter + 1;
    // Create an encoder UDR and copy the decoded payload into it.
    ParquetEncoderUDR encoderUDR = udrCreate(ParquetEncoderUDR);
    encoderUDR.payload = input.payload;
    udrRoute(encoderUDR);
    // Print a debug message for every 10000 UDRs processed.
    if (counter % 10000 == 0)
        debug(counter);
}
The Parquet Encoder and Decoder
In both the Parquet Decoder and the Parquet Encoder, select the Parquet profile that you created earlier.
Running the Workflow
When you have executed the workflow, the collected Parquet files have been decoded into UDRs, re-encoded, and written as Parquet files to the output directory.
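To verify the result, you can read the files in the output directory back with a standalone script. The sketch below again uses the pyarrow library, which is not part of the product; the output filenames depend on the Filename Template you configured.

import glob

import pyarrow.parquet as pq

# List every Parquet file the workflow wrote and print its row count and schema.
for path in glob.glob("/data/parquet/output/*"):
    table = pq.read_table(path)
    print(path, table.num_rows, "rows")
    print(table.schema)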