Avro Decoding/Encoding Functions
The functions described in this section are used to decode and encode binary encoded Avro messages.
This section describes the following Avro decoding/encoding functions: avroInitialize, avroDeserialize, and avroSerialize.
avroInitialize
This function initialises a schema registry provider that is used to obtain the schemas used by the avroDeserialize and avroSerialize functions.
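void avroInitialize(string schemaRegistry, string schemaField)
| Parameter | Description |
|---|---|
| schemaRegistry | The URL of the schema registry from which schemas are retrieved, for example "http://localhost:8081/schemas/ids" |
| schemaField | The name of the field in the schema registry response that contains the schema, for example "schema" |
| Returns | Nothing |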
avroDeserialize
This function decodes a binary encoded Avro message to a DynamicAvro UDR.
DynamicAvro avroDeserialize ( AvroDecoderUDR avroDecoderUDR )
| Parameter | Description |
|---|---|
| avroDecoderUDR | An AvroDecoderUDR containing a bytearray with the binary encoded Avro message to decode |
| Returns | A DynamicAvro UDR containing the decoded message |
Note
The avroDeserialize function must be called inside a try-catch block if exception handling is required.
Example - Decoding an Avro binary encoded message
import apl.Avro;
initialize
{
    // Set up the schema registry provider used by the Avro functions.
    // First argument: schemaRegistry, second argument: schemaField.
    avroInitialize("http://localhost:8081/schemas/ids", "schema");
}
consume {
    // This example assumes that the input of the Analysis agent is a bytearray
    // representing a complete Avro message.
    debug(input);

    // The decoder UDR carries the reader and writer schema IDs together with
    // the binary data to decode.
    Avro.AvroDecoderUDR decoder = udrCreate(Avro.AvroDecoderUDR);
    decoder.readerSchemaID = "5";
    decoder.writerSchemaID = "5";
    decoder.data = input;

    // Decode the message into a DynamicAvro UDR and print the result.
    Avro.DynamicAvro decodedAvro = (Avro.DynamicAvro)Avro.avroDeserialize(decoder);
    debug(decodedAvro);
}
avroSerialize
This function encodes a schema-defined structure using the binary Avro encoder.
bytearray avroSerialize ( AvroEncoderUDR avroEncoderUDR )
Note
The avroSerialize function must be called inside a try-catch block if exception handling is required.
| Parameter | Description |
|---|---|
| avroEncoderUDR | An AvroEncoderUDR containing the data to encode and specifying the selected schema ID (the data must match the selected schema) |
| Returns | The binary encoded Avro message |
Example - Encoding UDR to AVRO
APL:
import apl.Avro;
// ID of the schema that the encoder UDR in the consume block refers to.
string writerSchemaID = "5"; // assumes the correct registered schema is "5"
initialize
{
    // Set up the schema registry provider used by the Avro functions.
    // First argument: schemaRegistry, second argument: schemaField.
    avroInitialize("http://localhost:8081/schemas/ids", "schema");
}
consume {
    // Prepare the encoder UDR: select the writer schema, then attach the
    // record built by create_output() as the data to encode.
    Avro.AvroEncoderUDR encoder = udrCreate(Avro.AvroEncoderUDR);
    encoder.writerSchemaID = writerSchemaID;
    encoder.data = create_output();

    // Encode the record into a binary Avro message.
    bytearray encodedMessage = Avro.avroSerialize(encoder);
}
// Builds the sample "example.avro.User3" record that the consume block encodes.
// Each field is added to the fieldz map under its field name; nested records,
// enums and fixed values are represented by AvroRecordUDR, AvroEnumUDR and
// AvroFixedUDR instances whose fullname identifies the corresponding type.
// The finished record is printed with debug() and returned.
Avro.AvroRecordUDR create_output(){
    Avro.AvroRecordUDR record = udrCreate(Avro.AvroRecordUDR);
    record.fullname = "example.avro.User3";
    map<string,any> fieldz = mapCreate(string,any);

    // name: nested record with a first and a last name
    Avro.AvroRecordUDR name = udrCreate(Avro.AvroRecordUDR);
    map<string,any> namefields = mapCreate(string,any);
    mapSet(namefields, "firstName", "");
    mapSet(namefields, "lastName", "Kula");
    name.fullname = "example.avro.FullName2";
    name.fields = namefields;
    mapSet(fieldz, "name", name);

    // favorite number
    mapSet(fieldz, "favorite_number", (int)1632);

    // colour
    mapSet(fieldz, "favorite_color", "Green");

    // football team (spelled with one "o" in schema)
    Avro.AvroEnumUDR favorite_fotball_team = udrCreate(Avro.AvroEnumUDR);
    favorite_fotball_team.fullname = "example.avro.teams.teams";
    favorite_fotball_team.symbol = "Djurgarden";
    mapSet(fieldz, "favorite_fotball_team", favorite_fotball_team);

    // IP addresses: a list holding one 4-byte and one 16-byte fixed value
    list<any> ipAddresslist = listCreate(any);
    Avro.AvroFixedUDR ipv4Address = udrCreate(Avro.AvroFixedUDR);
    ipv4Address.fullname = "example.avro.ipv4Address";
    ipv4Address.bytes = baCreateFromHexString("00ABCDEF");
    listAdd(ipAddresslist, ipv4Address);
    Avro.AvroFixedUDR ipv6Address = udrCreate(Avro.AvroFixedUDR);
    ipv6Address.fullname = "example.avro.ipv6Address";
    ipv6Address.bytes = baCreateFromHexString("00ABCDEFEFEFEFEFEFEFEFEFEFEFEFEF");
    listAdd(ipAddresslist, ipv6Address);
    mapSet(fieldz, "ipAddresses", ipAddresslist);

    // favoriteFoodList: a linked list of records built from the tail forward,
    // dish1 -> dish2 -> dish3 -> null. dish1 is the head of the list and is
    // stored directly as the field value.
    Avro.AvroRecordUDR dish3 = udrCreate(Avro.AvroRecordUDR);
    map<string,any> dish3Map = mapCreate(string,any);
    mapSet(dish3Map, "dish", "Brown beans");
    mapSet(dish3Map, "next", null);
    dish3.fields = dish3Map;
    dish3.fullname = "example.avro.favoriteFood";

    Avro.AvroRecordUDR dish2 = udrCreate(Avro.AvroRecordUDR);
    map<string,any> dish2Map = mapCreate(string,any);
    mapSet(dish2Map, "dish", "Pannkakor");
    mapSet(dish2Map, "next", dish3);
    dish2.fields = dish2Map;
    dish2.fullname = "example.avro.favoriteFood";

    Avro.AvroRecordUDR dish1 = udrCreate(Avro.AvroRecordUDR);
    map<string,any> dish1Map = mapCreate(string,any);
    mapSet(dish1Map, "dish", "Ramen");
    mapSet(dish1Map, "next", dish2);
    dish1.fields = dish1Map;
    dish1.fullname = "example.avro.favoriteFood";
    mapSet(fieldz, "favoriteFoodList", dish1);

    // salary
    // NOTE(review): the value exceeds the 32-bit int range; presumably the
    // schema declares this field as long - confirm against the schema.
    mapSet(fieldz, "salary", 347857486343);

    // myFixed
    Avro.AvroFixedUDR myFixed = udrCreate(Avro.AvroFixedUDR);
    bytearray myFixed_bytes = baCreateFromHexString("DEADBEEF");
    myFixed.bytes = myFixed_bytes;
    myFixed.fullname = "example.avro.myfixed";
    mapSet(fieldz, "myFixed", myFixed);

    // myFloat
    mapSet(fieldz, "myFloat", (float)0.4);

    // myDouble
    mapSet(fieldz, "myDouble", (double)0.5);

    // RootUsers: a map from key to "example.avro.RootUsers" record
    map<string,any> rootUsers = mapCreate(string,any);
    Avro.AvroRecordUDR olleBack = udrCreate(Avro.AvroRecordUDR);
    olleBack.fullname = "example.avro.RootUsers";
    map<string,any> fields1 = mapCreate(string,any);
    mapSet(fields1, "rootUser", "Allan");
    mapSet(fields1, "privileges", (int)3);
    olleBack.fields = fields1;
    Avro.AvroRecordUDR nisseHult = udrCreate(Avro.AvroRecordUDR);
    nisseHult.fullname = "example.avro.RootUsers";
    map<string,any> fields2 = mapCreate(string,any);
    mapSet(fields2, "rootUser", "Guran");
    mapSet(fields2, "privileges", (int)2);
    nisseHult.fields = fields2;
    mapSet(rootUsers, "olleBack", olleBack);
    mapSet(rootUsers, "nisseHult", nisseHult);
    mapSet(fieldz, "rootUsers", rootUsers);

    // array_of_maps: a list of maps, each holding one
    // "example.avro.some_record" record
    list<any> array_of_maps = listCreate(any);
    map<string,any> map1 = mapCreate(string,any);
    Avro.AvroRecordUDR Nyckel = udrCreate(Avro.AvroRecordUDR);
    map<string,any> nyckelFields = mapCreate(string,any);
    mapSet(nyckelFields,"favoriteUser", "Sture D.Y.");
    mapSet(nyckelFields,"favoriteNumber", (int)10);
    Nyckel.fullname = "example.avro.some_record";
    Nyckel.fields = nyckelFields;
    mapSet(map1, "Nyckel", Nyckel);
    listAdd(array_of_maps, map1);
    map<string,any> map2 = mapCreate(string,any);
    Avro.AvroRecordUDR Nyckel2 = udrCreate(Avro.AvroRecordUDR);
    map<string,any> nyckel2Fields = mapCreate(string,any);
    mapSet(nyckel2Fields,"favoriteUser", "Sture D.A.");
    mapSet(nyckel2Fields,"favoriteNumber", (int)12);
    Nyckel2.fullname = "example.avro.some_record";
    Nyckel2.fields = nyckel2Fields;
    mapSet(map2, "Nyckel2", Nyckel2);
    listAdd(array_of_maps, map2);
    mapSet(fieldz, "array_of_maps", array_of_maps);

    // Uncomment to inspect the field map before it is attached to the record:
    // debug("--------------- fields ------------------");
    // debug(fieldz);
    // debug("--------------- fields ------------------");
    record.fields = fieldz;

    debug("-------------- record -------------------");
    debug(record);
    debug("-------------- record -------------------");
    return record;
}