Avro Decoding/Encoding Functions
The functions described in this section are used to decode and encode binary encoded Avro messages.
The following functions for Avro decoding and encoding are described here:
avroInitialize
This function initialises a schema registry provider that is used to obtain schemas used in avroDeserialize and avroSerialize functions
void avroInitialize(string schemaRegistry, string schemaField)
Parameter | Description |
---|---|
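schemaRegistry | The URL of the schema registry from which schemas are obtained, for example "http://localhost:8081/schemas/ids" |
schemaField | The name of the field in the schema registry response that contains the schema, for example "schema" |
Returns | Nothing |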
avroDeserialize
This function decodes a binary encoded Avro message to a DynamicAvro UDR
DynamicAvro avroDeserialize ( AvroDecoderUDR avroDecoderUDR)
Parameter | Description |
---|---|
avroDecoderUDR | AvroDecoderUDR containing a bytearray with a binary encoded Avro message to decode |
Returns | A DynamicAvro UDR containing decoded message |
Note
The avroDeserialize function must be called inside a try-catch block if exception handling is required.
Example - Decoding an Avro binary encoded message
import apl.Avro;

// Set up the schema registry provider that avroDeserialize uses to obtain schemas.
initialize {
    string schemaRegistry = "http://localhost:8081/schemas/ids";
    string schemaField = "schema";
    avroInitialize(schemaRegistry, schemaField);
}

consume {
    // This example assumes that the input of an analysis agent is a bytearray
    // representing a complete Avro message.
    debug(input);

    // Wrap the raw bytes in a decoder UDR together with the schema IDs to use.
    // Here the reader and writer schemas are the same registered schema ("5").
    Avro.AvroDecoderUDR decoder = udrCreate(Avro.AvroDecoderUDR);
    decoder.readerSchemaID = "5";
    decoder.writerSchemaID = "5";
    decoder.data = input;

    // Decode the message. Wrap this call in a try-catch block if exception
    // handling is required.
    Avro.DynamicAvro decodedAvro = (Avro.DynamicAvro)Avro.avroDeserialize(decoder);
    debug(decodedAvro);
}
avroSerialize
This function encodes a schema-defined structure using the binary Avro encoder.
bytearray avroSerialize ( AvroEncoderUDR avroEncoderUDR )
Note
The avroSerialize function must be called inside a try-catch block if exception handling is required.
Parameter | Description |
---|---|
avroEncoderUDR | AvroEncoderUDR containing data to encode and specifying selected schemaID (data must match selected schema) |
Returns | binary encoded Avro message |
Example - Encoding UDR to AVRO
APL:
import apl.Avro;

string writerSchemaID = "5"; // assumes the correct registered schema is "5"

// Set up the schema registry provider that avroSerialize uses to obtain schemas.
initialize {
    string schemaRegistry = "http://localhost:8081/schemas/ids";
    string schemaField = "schema";
    avroInitialize(schemaRegistry, schemaField);
}

consume {
    // Build the record, wrap it in an encoder UDR together with the ID of the
    // schema it must match, and serialize it to a binary Avro payload.
    Avro.AvroRecordUDR record = create_output();
    Avro.AvroEncoderUDR encode_UDR = udrCreate(Avro.AvroEncoderUDR);
    encode_UDR.data = record;
    encode_UDR.writerSchemaID = writerSchemaID;
    // Wrap this call in a try-catch block if exception handling is required.
    bytearray serializedPayload = Avro.avroSerialize(encode_UDR);
}

// Builds and returns an example "example.avro.User3" record.
// Each entry added to fieldz becomes one field of the record; nested records,
// enums and fixed values are represented by their own Avro UDRs, each tagged
// with the full name of its type.
Avro.AvroRecordUDR create_output() {
    Avro.AvroRecordUDR record = udrCreate(Avro.AvroRecordUDR);
    record.fullname = "example.avro.User3";
    map<string,any> fieldz = mapCreate(string,any);

    // name (nested record)
    Avro.AvroRecordUDR name = udrCreate(Avro.AvroRecordUDR);
    map<string,any> namefields = mapCreate(string,any);
    mapSet(namefields, "firstName", "");
    mapSet(namefields, "lastName", "Kula");
    name.fullname = "example.avro.FullName2";
    name.fields = namefields;
    mapSet(fieldz, "name", name);

    // favorite number
    mapSet(fieldz, "favorite_number", (int)1632);

    // colour
    mapSet(fieldz, "favorite_color", "Green");

    // football team (spelled with one "o" in schema)
    Avro.AvroEnumUDR favorite_fotball_team = udrCreate(Avro.AvroEnumUDR);
    favorite_fotball_team.fullname = "example.avro.teams.teams";
    favorite_fotball_team.symbol = "Djurgarden";
    mapSet(fieldz, "favorite_fotball_team", favorite_fotball_team);

    // IP address (list holding two fixed values of different types)
    list<any> ipAddresslist = listCreate(any);
    Avro.AvroFixedUDR ipv4Address = udrCreate(Avro.AvroFixedUDR);
    ipv4Address.fullname = "example.avro.ipv4Address";
    ipv4Address.bytes = baCreateFromHexString("00ABCDEF");
    listAdd(ipAddresslist, ipv4Address);
    Avro.AvroFixedUDR ipv6Address = udrCreate(Avro.AvroFixedUDR);
    ipv6Address.fullname = "example.avro.ipv6Address";
    ipv6Address.bytes = baCreateFromHexString("00ABCDEFEFEFEFEFEFEFEFEFEFEFEFEF");
    listAdd(ipAddresslist, ipv6Address);
    mapSet(fieldz, "ipAddresses", ipAddresslist);

    // favoriteFoodList (chain of favoriteFood records linked through "next")
    Avro.AvroRecordUDR favoriteFoodList = udrCreate(Avro.AvroRecordUDR);
    Avro.AvroRecordUDR dish1 = udrCreate(Avro.AvroRecordUDR);
    Avro.AvroRecordUDR dish2 = udrCreate(Avro.AvroRecordUDR);
    Avro.AvroRecordUDR dish3 = udrCreate(Avro.AvroRecordUDR);

    // The chain is built back to front; the last dish has no successor.
    map<string,any> dish3Map = mapCreate(string,any);
    mapSet(dish3Map, "dish", "Brown beans");
    mapSet(dish3Map, "next", null);
    dish3.fields = dish3Map;
    dish3.fullname = "example.avro.favoriteFood";

    map<string,any> dish2Map = mapCreate(string,any);
    mapSet(dish2Map, "dish", "Pannkakor");
    mapSet(dish2Map, "next", dish3);
    dish2.fields = dish2Map;
    dish2.fullname = "example.avro.favoriteFood";

    map<string,any> dish1Map = mapCreate(string,any);
    mapSet(dish1Map, "dish", "Ramen");
    mapSet(dish1Map, "next", dish2);
    dish1.fields = dish1Map;
    dish1.fullname = "example.avro.favoriteFood";

    // The list record uses the first dish's fields, making it the head of the chain.
    favoriteFoodList.fullname = "example.avro.favoriteFood";
    favoriteFoodList.fields = dish1Map;
    mapSet(fieldz, "favoriteFoodList", favoriteFoodList);

    // salary
    mapSet(fieldz, "salary", 347857486343);

    // myFixed
    Avro.AvroFixedUDR myFixed = udrCreate(Avro.AvroFixedUDR);
    bytearray myFixed_bytes = baCreateFromHexString("DEADBEEF");
    myFixed.bytes = myFixed_bytes;
    myFixed.fullname = "example.avro.myfixed";
    mapSet(fieldz, "myFixed", myFixed);

    // myFloat
    mapSet(fieldz, "myFloat", (float)0.4);

    // myDouble
    mapSet(fieldz, "myDouble", (double)0.5);

    // RootUsers (map from a key to a RootUsers record)
    map<string,any> rootUsers = mapCreate(string,any);
    Avro.AvroRecordUDR olleBack = udrCreate(Avro.AvroRecordUDR);
    olleBack.fullname = "example.avro.RootUsers";
    map<string,any> fields1 = mapCreate(string,any);
    mapSet(fields1, "rootUser", "Allan");
    mapSet(fields1, "privileges", (int)3);
    olleBack.fields = fields1;
    Avro.AvroRecordUDR nisseHult = udrCreate(Avro.AvroRecordUDR);
    nisseHult.fullname = "example.avro.RootUsers";
    map<string,any> fields2 = mapCreate(string,any);
    mapSet(fields2, "rootUser", "Guran");
    mapSet(fields2, "privileges", (int)2);
    nisseHult.fields = fields2;
    mapSet(rootUsers, "olleBack", olleBack);
    mapSet(rootUsers, "nisseHult", nisseHult);
    mapSet(fieldz, "rootUsers", rootUsers);

    // array_of_maps (list whose elements are maps from a key to a some_record record)
    list<any> array_of_maps = listCreate(any);
    map<string,any> map1 = mapCreate(string,any);
    Avro.AvroRecordUDR Nyckel = udrCreate(Avro.AvroRecordUDR);
    map<string,any> nyckelFields = mapCreate(string,any);
    mapSet(nyckelFields,"favoriteUser", "Sture D.Y.");
    mapSet(nyckelFields,"favoriteNumber", (int)10);
    Nyckel.fullname = "example.avro.some_record";
    Nyckel.fields = nyckelFields;
    mapSet(map1, "Nyckel", Nyckel);
    listAdd(array_of_maps, map1);
    map<string,any> map2 = mapCreate(string,any);
    Avro.AvroRecordUDR Nyckel2 = udrCreate(Avro.AvroRecordUDR);
    map<string,any> nyckel2Fields = mapCreate(string,any);
    mapSet(nyckel2Fields,"favoriteUser", "Sture D.A.");
    mapSet(nyckel2Fields,"favoriteNumber", (int)12);
    Nyckel2.fullname = "example.avro.some_record";
    Nyckel2.fields = nyckel2Fields;
    mapSet(map2, "Nyckel2", Nyckel2);
    listAdd(array_of_maps, map2);
    mapSet(fieldz, "array_of_maps", array_of_maps);

    // debug("--------------- fields ------------------");
    // debug(fieldz);
    // debug("--------------- fields ------------------");

    record.fields = fieldz;
    debug("-------------- record -------------------");
    debug(record);
    debug("-------------- record -------------------");
    return record;
}