Avro Decoder/Encoder Example(4.3)
This example shows how you can configure Avro Decoder/Encoder to be used together with a schema registry, providing Avro schema.
To read more about UDRs used in the example see Avro Types and UDRs(4.3)
Example configuration:
To simplify the setup, the configuration includes a workflow simulating a schema registry that returns schemas. See the workflow configuration for the schema content.
Disk_1
This agent collects files containing one complete binary encoded Avro message.
This example assumes that each file contains one Avro message.
You can use the following file as input for the Disk_1 agent
Analysis_1
This agent is responsible for creating an AvroDecoderUDR and filling in the necessary information that are required for Avro Decoder:
APL code:
consume {
debug(input);
//Create an udr that will be an input for the decoder
Avro.AvroDecoderUDR myUdr = udrCreate(Avro.AvroDecoderUDR);
//Binary encoded avro message read from a file
myUdr.data = input;
myUdr.readerSchemaID = “7”;
myUdr.writerSchemaID = “7”
debug(myUdr);
udrRoute(myUdr);
}
Decoder_1
This is an example of how you can configure an Avro Decoder and a schema registry.
Analysis_2 agent
This agent shows an example of how one can read a decoded Avro message and create an AvroEncoder
UDR that is an input for the Avro encoder.
Reading decoded Avro message is done by inspecting DynamicAvro UDR data field. The type of it depends on an Avro schema used for decoding. This examples shows how to access elements of different types.
Preparing data for encoding is done by populating AvroEncoder
UDR.
APL code:
consume {
//debug(input);
if (instanceOf(input, DecoderErrorUDR)) {
abort("Received DecoderErrorUDR");
} else {
DynamicAvro dynamicAvroUdr = (DynamicAvro)input;
AvroRecordUDR inp = (AvroRecordUDR)dynamicAvroUdr.data;
map<string,any> fields = (map<string,any>)inp.fields;
debug("MyDouble value: " +mapGet(fields, "myDouble"));
//debug(mapGet(fields, "rootUsers"));
//Get an enum
debug(mapGet(fields,"favorite_fotball_team"));
//Print the whole list
debug(mapGet(fields, "favoriteFoodList"));
//Get the first list element
debug(mapGet(fields, "favoriteFoodList").fields.dish);
//Preview the second list element
debug(mapGet(fields, "favoriteFoodList").fields.next);
//Preview the third list element
debug(mapGet(fields, "favoriteFoodList").fields.next.fields.next);
//Get the fourth element - null means no more elements
debug(mapGet(fields, "favoriteFoodList").fields.next.fields.next.fields.next);
//Get a record included in a record
AvroRecordUDR nameRecord = (AvroRecordUDR)mapGet(fields, "name");
//Get a value of one of the fields of the included record
debug(mapGet(nameRecord.fields, "lastName"));
//Get a list of maps, print first element
list<any> arrayOfMaps = (list<any>)(mapGet(fields, "array_of_maps"));
debug(listGet(arrayOfMaps,0));
//Part 2 - Preparing input for encoding
//Record as a top element of the schema
Avro.AvroRecordUDR record = udrCreate(Avro.AvroRecordUDR);
record.fullname = "example.avro.User3";
//Map to keep record's fields
map<string,any> fieldz = mapCreate(string,any);
// name
Avro.AvroRecordUDR name = udrCreate(Avro.AvroRecordUDR);
map<string,any> namefields = mapCreate(string,any);
mapSet(namefields, "firstName", "");
mapSet(namefields, "lastName", "Kula");
name.fullname = "example.avro.FullName2";
name.fields = namefields;
mapSet(fieldz, "name", name);
// favorite number
mapSet(fieldz, "favorite_number", (int)1632);
// colour
mapSet(fieldz, "favorite_color", "Green");
// football team (spelled with one "o" in schema)
Avro.AvroEnumUDR favorite_fotball_team = udrCreate(Avro.AvroEnumUDR);
favorite_fotball_team.fullname = "example.avro.teams.teams";
favorite_fotball_team.symbol = "Djurgarden";
mapSet(fieldz, "favorite_fotball_team", favorite_fotball_team);
// IP address
list<any> ipAddresslist = listCreate(any);
Avro.AvroFixedUDR ipv4Address = udrCreate(Avro.AvroFixedUDR);
ipv4Address.fullname = "example.avro.ipv4Address";
ipv4Address.bytes = baCreateFromHexString("00ABCDEF");
listAdd(ipAddresslist, ipv4Address);
Avro.AvroFixedUDR ipv6Address = udrCreate(Avro.AvroFixedUDR);
ipv6Address.fullname = "example.avro.ipv6Address";
ipv6Address.bytes = baCreateFromHexString("00ABCDEFEFEFEFEFEFEFEFEFEFEFEFEF");
listAdd(ipAddresslist, ipv6Address);
mapSet(fieldz, "ipAddresses", ipAddresslist);
// favoriteFoodList
Avro.AvroRecordUDR favoriteFoodList = udrCreate(Avro.AvroRecordUDR);
Avro.AvroRecordUDR dish1 = udrCreate(Avro.AvroRecordUDR);
Avro.AvroRecordUDR dish2 = udrCreate(Avro.AvroRecordUDR);
Avro.AvroRecordUDR dish3 = udrCreate(Avro.AvroRecordUDR);
map<string,any> dish3Map = mapCreate(string,any);
mapSet(dish3Map, "dish", "Brown beans");
mapSet(dish3Map, "next", null);
dish3.fields = dish3Map;
dish3.fullname = "example.avro.favoriteFood";
map<string,any> dish2Map = mapCreate(string,any);
mapSet(dish2Map, "dish", "Pannkakor");
mapSet(dish2Map, "next", dish3);
dish2.fields = dish2Map;
dish2.fullname = "example.avro.favoriteFood";
map<string,any> dish1Map = mapCreate(string,any);
mapSet(dish1Map, "dish", "Ramen");
mapSet(dish1Map, "next", dish2);
dish1.fields = dish1Map;
dish1.fullname = "example.avro.favoriteFood";
favoriteFoodList.fullname = "example.avro.favoriteFood";
favoriteFoodList.fields = dish1Map;
mapSet(fieldz, "favoriteFoodList", favoriteFoodList);
// salary
mapSet(fieldz, "salary", 347857486343);
// myFixed
Avro.AvroFixedUDR myFixed = udrCreate(Avro.AvroFixedUDR);
bytearray myFixed_bytes = baCreateFromHexString("DEADBEEF");
myFixed.bytes = myFixed_bytes;
myFixed.fullname = "example.avro.myfixed";
mapSet(fieldz, "myFixed", myFixed);
// myFloat
mapSet(fieldz, "myFloat", (float)0.4);
// myDouble
mapSet(fieldz, "myDouble", (double)0.5);
// RootUsers
map<string,any> rootUsers = mapCreate(string,any);
Avro.AvroRecordUDR olleBack = udrCreate(Avro.AvroRecordUDR);
olleBack.fullname = "example.avro.RootUsers";
map<string,any> fields1 = mapCreate(string,any);
mapSet(fields1, "rootUser", "Allan");
mapSet(fields1, "privileges", (int)3);
olleBack.fields = fields1;
Avro.AvroRecordUDR nisseHult = udrCreate(Avro.AvroRecordUDR);
nisseHult.fullname = "example.avro.RootUsers";
map<string,any> fields2 = mapCreate(string,any);
mapSet(fields2, "rootUser", "Guran");
mapSet(fields2, "privileges", (int)2);
nisseHult.fields = fields2;
mapSet(rootUsers, "olleBack", olleBack);
mapSet(rootUsers, "nisseHult", nisseHult);
mapSet(fieldz, "rootUsers", rootUsers);
// array_of_maps
list<any> array_of_maps = listCreate(any);
map<string,any> map1 = mapCreate(string,any);
Avro.AvroRecordUDR Nyckel = udrCreate(Avro.AvroRecordUDR);
map<string,any> nyckelFields = mapCreate(string,any);
mapSet(nyckelFields,"favoriteUser", "Sture D.Y.");
mapSet(nyckelFields,"favoriteNumber", (int)10);
Nyckel.fullname = "example.avro.some_record";
Nyckel.fields = nyckelFields;
mapSet(map1, "Nyckel", Nyckel);
listAdd(array_of_maps, map1);
map<string,any> map2 = mapCreate(string,any);
Avro.AvroRecordUDR Nyckel2 = udrCreate(Avro.AvroRecordUDR);
map<string,any> nyckel2Fields = mapCreate(string,any);
mapSet(nyckel2Fields,"favoriteUser", "Sture D.A.");
mapSet(nyckel2Fields,"favoriteNumber", (int)12);
Nyckel2.fullname = "example.avro.some_record";
Nyckel2.fields = nyckel2Fields;
mapSet(map2, "Nyckel2", Nyckel2);
listAdd(array_of_maps, map2);
mapSet(fieldz, "array_of_maps", array_of_maps);
record.fields = fieldz;
debug("-------------- record -------------------");
debug(record);
debug("-------------- record -------------------");
Avro.AvroEncoderUDR encode = udrCreate(Avro.AvroEncoderUDR);
encode.data = record;
encode.writerSchemaID = "7";
//debug(encode);
udrRoute(encode);
}
}
Encoder_1
This is an example of how you can configure an Avro Encoder and a schema registry to be used.