Avro Decoder Example

This example shows how you can configure the Avro Decoder and Avro Encoder to be used together with a schema registry that provides the Avro schemas.

To read more about the UDRs used in the example, see Avro Types and UDRs.

Example configuration:

To simplify the setup, the configuration includes a workflow that simulates a schema registry and returns schemas. See the workflow configuration for the schema content.

Screenshot 2024-04-04 at 09.16.29.png

 

Disk_1

This agent collects files, each containing one complete binary-encoded Avro message.
This example assumes that each file contains one Avro message.

You can use the following file as input for the Disk_1 agent

Analysis_1

This agent is responsible for creating an AvroDecoderUDR and filling in the information that is required by the Avro Decoder:

APL code:

// Analysis_1: wraps each incoming payload in an AvroDecoderUDR and routes it
// on to the Avro Decoder.
consume {
    debug(input);

    // Create a UDR that will be the input for the decoder
    Avro.AvroDecoderUDR myUdr = udrCreate(Avro.AvroDecoderUDR);

    // Binary encoded Avro message read from a file
    myUdr.data = input;

    // Reader and writer schema IDs are both "7" in this example
    // (presumably the ID under which the schema registry returns the schema - confirm).
    myUdr.readerSchemaID = "7";
    myUdr.writerSchemaID = "7";

    debug(myUdr);
    udrRoute(myUdr);
}

 

Decoder_1

This is an example of how you can configure an Avro Decoder and a schema registry.

Screenshot 2024-04-11 at 09.51.53.png

 

Analysis_2 agent

This agent shows an example of how you can read a decoded Avro message and create an AvroEncoderUDR that is the input for the Avro Encoder.

A decoded Avro message is read by inspecting the data field of the DynamicAvro UDR. The type of this field depends on the Avro schema used for decoding. This example shows how to access elements of different types.

Data is prepared for encoding by populating an AvroEncoderUDR.

APL code:

// Analysis_2:
//   Part 1 - reads individual elements out of a decoded Avro message and
//            prints them with debug().
//   Part 2 - builds a new Avro record field by field, wraps it in an
//            AvroEncoderUDR and routes it on.
// Aborts the workflow if the decoder delivered a DecoderErrorUDR.
consume {
    //debug(input);
    if (instanceOf(input, DecoderErrorUDR)) {
        abort("Received DecoderErrorUDR");
    } else {
        // Part 1 - Reading the decoded message
        DynamicAvro dynamicAvroUdr = (DynamicAvro)input;
        AvroRecordUDR inp = (AvroRecordUDR)dynamicAvroUdr.data;
        map<string,any> fields = (map<string,any>)inp.fields;

        debug("MyDouble value: " +mapGet(fields, "myDouble"));
        //debug(mapGet(fields, "rootUsers"));

        //Get an enum
        debug(mapGet(fields,"favorite_fotball_team"));

        // favoriteFoodList is a chain of records, each with a "dish" and a
        // "next" field (see how it is built in Part 2 below).
        //Print the whole list
        debug(mapGet(fields, "favoriteFoodList"));
        //Get the first list element
        debug(mapGet(fields, "favoriteFoodList").fields.dish);
        //Preview the second list element
        debug(mapGet(fields, "favoriteFoodList").fields.next);
        //Preview the third list element
        debug(mapGet(fields, "favoriteFoodList").fields.next.fields.next);
        //Get the fourth element - null means no more elements
        debug(mapGet(fields, "favoriteFoodList").fields.next.fields.next.fields.next);

        //Get a record included in a record
        AvroRecordUDR nameRecord = (AvroRecordUDR)mapGet(fields, "name");
        //Get a value of one of the fields of the included record
        debug(mapGet(nameRecord.fields, "lastName"));

        //Get a list of maps, print first element
        list<any> arrayOfMaps = (list<any>)(mapGet(fields, "array_of_maps"));
        debug(listGet(arrayOfMaps,0));

        //Part 2 - Preparing input for encoding
        //Record as a top element of the schema
        Avro.AvroRecordUDR record = udrCreate(Avro.AvroRecordUDR);
        record.fullname = "example.avro.User3";
        //Map to keep record's fields
        map<string,any> fieldz = mapCreate(string,any);

        // name
        Avro.AvroRecordUDR name = udrCreate(Avro.AvroRecordUDR);
        map<string,any> namefields = mapCreate(string,any);
        mapSet(namefields, "firstName", "");
        mapSet(namefields, "lastName", "Kula");
        name.fullname = "example.avro.FullName2";
        name.fields = namefields;
        mapSet(fieldz, "name", name);

        // favorite number
        mapSet(fieldz, "favorite_number", (int)1632);

        // colour
        mapSet(fieldz, "favorite_color", "Green");

        // football team (spelled with one "o" in schema)
        Avro.AvroEnumUDR favorite_fotball_team = udrCreate(Avro.AvroEnumUDR);
        favorite_fotball_team.fullname = "example.avro.teams.teams";
        favorite_fotball_team.symbol = "Djurgarden";
        mapSet(fieldz, "favorite_fotball_team", favorite_fotball_team);

        // IP address
        list<any> ipAddresslist = listCreate(any);
        Avro.AvroFixedUDR ipv4Address = udrCreate(Avro.AvroFixedUDR);
        ipv4Address.fullname = "example.avro.ipv4Address";
        ipv4Address.bytes = baCreateFromHexString("00ABCDEF");
        listAdd(ipAddresslist, ipv4Address);
        Avro.AvroFixedUDR ipv6Address = udrCreate(Avro.AvroFixedUDR);
        ipv6Address.fullname = "example.avro.ipv6Address";
        ipv6Address.bytes = baCreateFromHexString("00ABCDEFEFEFEFEFEFEFEFEFEFEFEFEF");
        listAdd(ipAddresslist, ipv6Address);
        mapSet(fieldz, "ipAddresses", ipAddresslist);

        // favoriteFoodList
        // Built back to front: the last node is created first so that each
        // earlier node can point to it through its "next" field.
        Avro.AvroRecordUDR favoriteFoodList = udrCreate(Avro.AvroRecordUDR);
        Avro.AvroRecordUDR dish1 = udrCreate(Avro.AvroRecordUDR);
        Avro.AvroRecordUDR dish2 = udrCreate(Avro.AvroRecordUDR);
        Avro.AvroRecordUDR dish3 = udrCreate(Avro.AvroRecordUDR);

        map<string,any> dish3Map = mapCreate(string,any);
        mapSet(dish3Map, "dish", "Brown beans");
        mapSet(dish3Map, "next", null);
        dish3.fields = dish3Map;
        dish3.fullname = "example.avro.favoriteFood";

        map<string,any> dish2Map = mapCreate(string,any);
        mapSet(dish2Map, "dish", "Pannkakor");
        mapSet(dish2Map, "next", dish3);
        dish2.fields = dish2Map;
        dish2.fullname = "example.avro.favoriteFood";

        map<string,any> dish1Map = mapCreate(string,any);
        mapSet(dish1Map, "dish", "Ramen");
        mapSet(dish1Map, "next", dish2);
        // NOTE(review): dish1 is populated here but not referenced afterwards;
        // favoriteFoodList below uses dish1Map directly as the head of the list.
        dish1.fields = dish1Map;
        dish1.fullname = "example.avro.favoriteFood";

        favoriteFoodList.fullname = "example.avro.favoriteFood";
        favoriteFoodList.fields = dish1Map;
        mapSet(fieldz, "favoriteFoodList", favoriteFoodList);

        // salary
        mapSet(fieldz, "salary", 347857486343);

        // myFixed
        Avro.AvroFixedUDR myFixed = udrCreate(Avro.AvroFixedUDR);
        bytearray myFixed_bytes = baCreateFromHexString("DEADBEEF");
        myFixed.bytes = myFixed_bytes;
        myFixed.fullname = "example.avro.myfixed";
        mapSet(fieldz, "myFixed", myFixed);

        // myFloat
        mapSet(fieldz, "myFloat", (float)0.4);

        // myDouble
        mapSet(fieldz, "myDouble", (double)0.5);

        // RootUsers
        map<string,any> rootUsers = mapCreate(string,any);

        Avro.AvroRecordUDR olleBack = udrCreate(Avro.AvroRecordUDR);
        olleBack.fullname = "example.avro.RootUsers";
        map<string,any> fields1 = mapCreate(string,any);
        mapSet(fields1, "rootUser", "Allan");
        mapSet(fields1, "privileges", (int)3);
        olleBack.fields = fields1;

        Avro.AvroRecordUDR nisseHult = udrCreate(Avro.AvroRecordUDR);
        nisseHult.fullname = "example.avro.RootUsers";
        map<string,any> fields2 = mapCreate(string,any);
        mapSet(fields2, "rootUser", "Guran");
        mapSet(fields2, "privileges", (int)2);
        nisseHult.fields = fields2;

        mapSet(rootUsers, "olleBack", olleBack);
        mapSet(rootUsers, "nisseHult", nisseHult);
        mapSet(fieldz, "rootUsers", rootUsers);

        // array_of_maps
        list<any> array_of_maps = listCreate(any);

        map<string,any> map1 = mapCreate(string,any);
        Avro.AvroRecordUDR Nyckel = udrCreate(Avro.AvroRecordUDR);
        map<string,any> nyckelFields = mapCreate(string,any);
        mapSet(nyckelFields,"favoriteUser", "Sture D.Y.");
        mapSet(nyckelFields,"favoriteNumber", (int)10);
        Nyckel.fullname = "example.avro.some_record";
        Nyckel.fields = nyckelFields;
        mapSet(map1, "Nyckel", Nyckel);
        listAdd(array_of_maps, map1);

        map<string,any> map2 = mapCreate(string,any);
        Avro.AvroRecordUDR Nyckel2 = udrCreate(Avro.AvroRecordUDR);
        map<string,any> nyckel2Fields = mapCreate(string,any);
        mapSet(nyckel2Fields,"favoriteUser", "Sture D.A.");
        mapSet(nyckel2Fields,"favoriteNumber", (int)12);
        Nyckel2.fullname = "example.avro.some_record";
        Nyckel2.fields = nyckel2Fields;
        mapSet(map2, "Nyckel2", Nyckel2);
        listAdd(array_of_maps, map2);

        mapSet(fieldz, "array_of_maps", array_of_maps);

        // Attach the collected fields to the top-level record
        record.fields = fieldz;

        debug("-------------- record -------------------");
        debug(record);
        debug("-------------- record -------------------");

        // Wrap the record in the UDR that is routed on for encoding
        Avro.AvroEncoderUDR encode = udrCreate(Avro.AvroEncoderUDR);
        encode.data = record;
        encode.writerSchemaID = "7";
        //debug(encode);
        udrRoute(encode);
    }
}

 

Encoder_1

This is an example of how you can configure an Avro Encoder and the schema registry it uses.