Avro Decoding/Encoding Functions

Avro Decoding/Encoding Functions

The functions described in this section are used to decode binary encoded Avro messages.

The following functions for AVRO Decoding/Encoding described here are:

avroInitialize

This function initialises a schema registry provider that is used to obtain schemas used in avroDeserialize and avroSerialize functions

void avroInitialize(string schemaRegistry, string schemaField)

Parameter

Description

Parameter

Description

schemaRegistry

The binary encoded Avro message to decode

schemaField

A DynamicAvro UDR 

Returns

 

avroDeserialize

This function decodes a binary encoded Avro message to a DynamicAvro UDR 

 

DynamicAvro avroDeserialize ( AvroDecoderUDR avroDecoderUDR)

Parameter

Description

Parameter

Description

avroDecoderUDR

AvroDecoderUDR containing a bytearray with a binary encoded Avro message to decode

Returns

A DynamicAvro UDR containing decoded message 

Note

avroDeserialize function has to be used inside try-catch block if the exception handling is required

Example - Example - Decoding AVRO binary encoded message

import apl.Avro; initialize { string schemaRegistry = "http://localhost:8081/schemas/ids"; string schemaField = "schema"; avroInitialize(schemaRegistry, schemaField); } consume { //This example assumes that the input of an analysis agent is a bytearray //representing complete Avro message debug(input); Avro.AvroDecoderUDR decoder = udrCreate(Avro.AvroDecoderUDR); decoder.readerSchemaID = "5"; decoder.writerSchemaID = "5"; decoder.data = input; Avro.DynamicAvro decodedAvro = (Avro.DynamicAvro)Avro.avroDeserialize(decoder); debug(decodedAvro); }

avroSerialize

This function encodes a schema defined structure using binary Avro encoder

bytearray avroSerialize ( AvroEncoderUDR avroEncoderUDR )

Note

avroSerialize function has to be used inside try-catch block if the exception handling is required

Parameter

Description

Parameter

Description

avroEncoderUDR

AvroEncoderUDR containing data to encode and specifying selected schemaID (data must match selected schema)

Returns

binary encoded Avro message

Example - Encoding UDR to AVRO

 APL:

import apl.Avro; string writerSchemaID = "5"; // assumes the correct registered schema is "5" initialize { string schemaRegistry = "http://localhost:8081/schemas/ids"; string schemaField = "schema"; avroInitialize(schemaRegistry, schemaField); } consume { Avro.AvroRecordUDR record = create_output(); Avro.AvroEncoderUDR encode_UDR = udrCreate(Avro.AvroEncoderUDR); encode_UDR.data = record; encode_UDR.writerSchemaID = writerSchemaID; bytearray serializedPayload = Avro.avroSerialize(encode_UDR); } Avro.AvroRecordUDR create_output(){ Avro.AvroRecordUDR record = udrCreate(Avro.AvroRecordUDR); record.fullname = "example.avro.User3"; map<string,any> fieldz = mapCreate(string,any); // name Avro.AvroRecordUDR name = udrCreate(Avro.AvroRecordUDR); map<string,any> namefields = mapCreate(string,any); mapSet(namefields, "firstName", ""); mapSet(namefields, "lastName", "Kula"); name.fullname = "example.avro.FullName2"; name.fields = namefields; mapSet(fieldz, "name", name); // favorite number mapSet(fieldz, "favorite_number", (int)1632); // colour mapSet(fieldz, "favorite_color", "Green"); // football team (spelled with one "o" in schema) Avro.AvroEnumUDR favorite_fotball_team = udrCreate(Avro.AvroEnumUDR); favorite_fotball_team.fullname = "example.avro.teams.teams"; favorite_fotball_team.symbol = "Djurgarden"; mapSet(fieldz, "favorite_fotball_team", favorite_fotball_team); // IP address list<any> ipAddresslist = listCreate(any); Avro.AvroFixedUDR ipv4Address = udrCreate(Avro.AvroFixedUDR); ipv4Address.fullname = "example.avro.ipv4Address"; ipv4Address.bytes = baCreateFromHexString("00ABCDEF"); listAdd(ipAddresslist, ipv4Address); Avro.AvroFixedUDR ipv6Address = udrCreate(Avro.AvroFixedUDR); ipv6Address.fullname = "example.avro.ipv6Address"; ipv6Address.bytes = baCreateFromHexString("00ABCDEFEFEFEFEFEFEFEFEFEFEFEFEF"); listAdd(ipAddresslist, ipv6Address); mapSet(fieldz, "ipAddresses", ipAddresslist); // favoriteFoodList Avro.AvroRecordUDR favoriteFoodList = udrCreate(Avro.AvroRecordUDR); Avro.AvroRecordUDR dish1 = udrCreate(Avro.AvroRecordUDR); Avro.AvroRecordUDR dish2 = udrCreate(Avro.AvroRecordUDR); Avro.AvroRecordUDR dish3 = udrCreate(Avro.AvroRecordUDR); map<string,any> dish3Map = mapCreate(string,any); mapSet(dish3Map, "dish", "Brown beans"); mapSet(dish3Map, "next", null); dish3.fields = dish3Map; dish3.fullname = "example.avro.favoriteFood"; map<string,any> dish2Map = mapCreate(string,any); mapSet(dish2Map, "dish", "Pannkakor"); mapSet(dish2Map, "next", dish3); dish2.fields = dish2Map; dish2.fullname = "example.avro.favoriteFood"; map<string,any> dish1Map = mapCreate(string,any); mapSet(dish1Map, "dish", "Ramen"); mapSet(dish1Map, "next", dish2); dish1.fields = dish1Map; dish1.fullname = "example.avro.favoriteFood"; favoriteFoodList.fullname = "example.avro.favoriteFood"; favoriteFoodList.fields = dish1Map; mapSet(fieldz, "favoriteFoodList", favoriteFoodList); // salary mapSet(fieldz, "salary", 347857486343); // myFixed Avro.AvroFixedUDR myFixed = udrCreate(Avro.AvroFixedUDR); bytearray myFixed_bytes = baCreateFromHexString("DEADBEEF"); myFixed.bytes = myFixed_bytes; myFixed.fullname = "example.avro.myfixed"; mapSet(fieldz, "myFixed", myFixed); // myFloat mapSet(fieldz, "myFloat", (float)0.4); // myDouble mapSet(fieldz, "myDouble", (double)0.5); // RootUsers map<string,any> rootUsers = mapCreate(string,any); Avro.AvroRecordUDR olleBack = udrCreate(Avro.AvroRecordUDR); olleBack.fullname = "example.avro.RootUsers"; map<string,any> fields1 = mapCreate(string,any); mapSet(fields1, "rootUser", "Allan"); mapSet(fields1, "privileges", (int)3); olleBack.fields = fields1; Avro.AvroRecordUDR nisseHult = udrCreate(Avro.AvroRecordUDR); nisseHult.fullname = "example.avro.RootUsers"; map<string,any> fields2 = mapCreate(string,any); mapSet(fields2, "rootUser", "Guran"); mapSet(fields2, "privileges", (int)2); nisseHult.fields = fields2; mapSet(rootUsers, "olleBack", olleBack); mapSet(rootUsers, "nisseHult", nisseHult); mapSet(fieldz, "rootUsers", rootUsers); // array_of_maps list<any> array_of_maps = listCreate(any); map<string,any> map1 = mapCreate(string,any); Avro.AvroRecordUDR Nyckel = udrCreate(Avro.AvroRecordUDR); map<string,any> nyckelFields = mapCreate(string,any); mapSet(nyckelFields,"favoriteUser", "Sture D.Y."); mapSet(nyckelFields,"favoriteNumber", (int)10); Nyckel.fullname = "example.avro.some_record"; Nyckel.fields = nyckelFields; mapSet(map1, "Nyckel", Nyckel); listAdd(array_of_maps, map1); map<string,any> map2 = mapCreate(string,any); Avro.AvroRecordUDR Nyckel2 = udrCreate(Avro.AvroRecordUDR); map<string,any> nyckel2Fields = mapCreate(string,any); mapSet(nyckel2Fields,"favoriteUser", "Sture D.A."); mapSet(nyckel2Fields,"favoriteNumber", (int)12); Nyckel2.fullname = "example.avro.some_record"; Nyckel2.fields = nyckel2Fields; mapSet(map2, "Nyckel2", Nyckel2); listAdd(array_of_maps, map2); mapSet(fieldz, "array_of_maps", array_of_maps); // debug("--------------- fields ------------------"); // debug(fieldz); // debug("--------------- fields ------------------"); record.fields = fieldz; debug("-------------- record -------------------"); debug(record); debug("-------------- record -------------------"); return record; }