Avro Decoding/Encoding Functions

The functions described in this section are used to decode and encode binary encoded Avro messages.

The following Avro decoding/encoding functions are described here:

avroInitialize

This function initialises the schema registry provider from which the avroDeserialize and avroSerialize functions obtain their schemas.

void avroInitialize(string schemaRegistry, string schemaField)
ParameterDescription

schemaRegistry

The URL of the schema registry endpoint from which schemas are fetched by their ID, for example "http://localhost:8081/schemas/ids"

schemaField

The name of the field in the schema registry response that holds the schema definition, for example "schema"

Returns

Nothing

avroDeserialize

This function decodes a binary encoded Avro message into a DynamicAvro UDR.


DynamicAvro avroDeserialize ( AvroDecoderUDR avroDecoderUDR)
ParameterDescription

avroDecoderUDR

AvroDecoderUDR containing a bytearray with a binary encoded Avro message to decode

Returns

A DynamicAvro UDR containing decoded message 

Note

The avroDeserialize function must be called inside a try-catch block if exception handling is required.

Example - Decoding an AVRO binary encoded message

import apl.Avro;

initialize
{
  // Set up the schema registry provider used by the Avro functions:
  // first argument is the schema registry location, second is the schema field.
  avroInitialize("http://localhost:8081/schemas/ids", "schema");
}

consume {
    // This example assumes that the input of the analysis agent is a
    // bytearray representing one complete Avro message.
    debug(input);

    // Wrap the raw bytes together with the IDs of the schemas to decode with.
    Avro.AvroDecoderUDR request = udrCreate(Avro.AvroDecoderUDR);
    request.data = input;
    request.writerSchemaID = "5";
    request.readerSchemaID = "5";

    // Decode the message and print the resulting DynamicAvro UDR.
    Avro.DynamicAvro message = (Avro.DynamicAvro)Avro.avroDeserialize(request);
    debug(message);
}

avroSerialize

This function encodes a schema-defined structure using the binary Avro encoder.

bytearray avroSerialize ( AvroEncoderUDR avroEncoderUDR )   

Note

The avroSerialize function must be called inside a try-catch block if exception handling is required.

ParameterDescription

avroEncoderUDR

AvroEncoderUDR containing data to encode and specifying selected schemaID (data must match selected schema)

Returns

binary encoded Avro message

Example - Encoding UDR to AVRO

 APL:

import apl.Avro;
 
string writerSchemaID = "5"; // ID of the schema to encode with; this example assumes the matching schema is registered as "5"

initialize
{
  // Set up the schema registry provider used by the Avro functions:
  // first argument is the schema registry location, second is the schema field.
  avroInitialize("http://localhost:8081/schemas/ids", "schema");
}
 
consume {
    // Build the encoder request: the schema to write with and the data to encode.
    Avro.AvroEncoderUDR request = udrCreate(Avro.AvroEncoderUDR);
    request.writerSchemaID = writerSchemaID;
    request.data = create_output();

    // Encode the record into a binary Avro message.
    bytearray encoded = Avro.avroSerialize(request);
}
 
 
// Builds the example record "example.avro.User3" that the consume block
// hands to avroSerialize. The record shows how each kind of value is
// represented in APL:
//   - nested records -> Avro.AvroRecordUDR (fullname + fields map)
//   - enums          -> Avro.AvroEnumUDR   (fullname + symbol)
//   - fixed          -> Avro.AvroFixedUDR  (fullname + bytes)
//   - arrays         -> list<any>
//   - maps           -> map<string,any>
// Returns the populated record and prints it with debug() before returning.
Avro.AvroRecordUDR create_output(){

  Avro.AvroRecordUDR record = udrCreate(Avro.AvroRecordUDR);
  record.fullname = "example.avro.User3";
  map<string,any> fieldz = mapCreate(string,any);

  // name: a nested record
  Avro.AvroRecordUDR name = udrCreate(Avro.AvroRecordUDR);
  map<string,any> namefields = mapCreate(string,any);
  mapSet(namefields, "firstName", "");
  mapSet(namefields, "lastName", "Kula");
  name.fullname = "example.avro.FullName2";
  name.fields = namefields;
  mapSet(fieldz, "name", name);

  // favorite number
  mapSet(fieldz, "favorite_number", (int)1632);

  // colour
  mapSet(fieldz, "favorite_color", "Green");

  // football team: an enum (spelled with one "o" in schema)
  Avro.AvroEnumUDR favorite_fotball_team = udrCreate(Avro.AvroEnumUDR);
  favorite_fotball_team.fullname = "example.avro.teams.teams";
  favorite_fotball_team.symbol = "Djurgarden";
  mapSet(fieldz, "favorite_fotball_team", favorite_fotball_team);

  // IP addresses: a list holding two fixed values of different sizes
  list<any> ipAddresslist = listCreate(any);

  Avro.AvroFixedUDR ipv4Address = udrCreate(Avro.AvroFixedUDR);
  ipv4Address.fullname = "example.avro.ipv4Address";
  ipv4Address.bytes = baCreateFromHexString("00ABCDEF");
  listAdd(ipAddresslist, ipv4Address);

  Avro.AvroFixedUDR ipv6Address = udrCreate(Avro.AvroFixedUDR);
  ipv6Address.fullname = "example.avro.ipv6Address";
  ipv6Address.bytes = baCreateFromHexString("00ABCDEFEFEFEFEFEFEFEFEFEFEFEFEF");
  listAdd(ipAddresslist, ipv6Address);

  mapSet(fieldz, "ipAddresses", ipAddresslist);

  // favoriteFoodList: a linked list of favoriteFood records
  // (Ramen -> Pannkakor -> Brown beans), built from the tail so that each
  // element can point at the one that follows it.
  Avro.AvroRecordUDR dish3 = udrCreate(Avro.AvroRecordUDR);
  map<string,any> dish3Map = mapCreate(string,any);
  mapSet(dish3Map, "dish", "Brown beans");
  mapSet(dish3Map, "next", null); // last element: no successor
  dish3.fields = dish3Map;
  dish3.fullname = "example.avro.favoriteFood";

  Avro.AvroRecordUDR dish2 = udrCreate(Avro.AvroRecordUDR);
  map<string,any> dish2Map = mapCreate(string,any);
  mapSet(dish2Map, "dish", "Pannkakor");
  mapSet(dish2Map, "next", dish3);
  dish2.fields = dish2Map;
  dish2.fullname = "example.avro.favoriteFood";

  // The head of the list is the record stored in the parent field itself.
  Avro.AvroRecordUDR favoriteFoodList = udrCreate(Avro.AvroRecordUDR);
  map<string,any> dish1Map = mapCreate(string,any);
  mapSet(dish1Map, "dish", "Ramen");
  mapSet(dish1Map, "next", dish2);
  favoriteFoodList.fullname = "example.avro.favoriteFood";
  favoriteFoodList.fields = dish1Map;
  mapSet(fieldz, "favoriteFoodList", favoriteFoodList);

  // salary
  // NOTE(review): the literal does not fit in 32 bits; presumably the schema
  // declares this field as an Avro long - confirm against the schema.
  mapSet(fieldz, "salary", 347857486343);

  // myFixed
  Avro.AvroFixedUDR myFixed = udrCreate(Avro.AvroFixedUDR);
  bytearray myFixed_bytes = baCreateFromHexString("DEADBEEF");
  myFixed.bytes = myFixed_bytes;
  myFixed.fullname = "example.avro.myfixed";
  mapSet(fieldz, "myFixed", myFixed);

  // myFloat
  mapSet(fieldz, "myFloat", (float)0.4);

  // myDouble
  mapSet(fieldz, "myDouble", (double)0.5);

  // rootUsers: a map whose values are RootUsers records
  map<string,any> rootUsers = mapCreate(string,any);

  Avro.AvroRecordUDR olleBack = udrCreate(Avro.AvroRecordUDR);
  olleBack.fullname = "example.avro.RootUsers";
  map<string,any> fields1 = mapCreate(string,any);
  mapSet(fields1, "rootUser", "Allan");
  mapSet(fields1, "privileges", (int)3);
  olleBack.fields = fields1;

  Avro.AvroRecordUDR nisseHult = udrCreate(Avro.AvroRecordUDR);
  nisseHult.fullname = "example.avro.RootUsers";
  map<string,any> fields2 = mapCreate(string,any);
  mapSet(fields2, "rootUser", "Guran");
  mapSet(fields2, "privileges", (int)2);
  nisseHult.fields = fields2;

  mapSet(rootUsers, "olleBack", olleBack);
  mapSet(rootUsers, "nisseHult", nisseHult);
  mapSet(fieldz, "rootUsers", rootUsers);

  // array_of_maps: a list whose elements are maps of some_record records
  list<any> array_of_maps = listCreate(any);

  map<string,any> map1 = mapCreate(string,any);
  Avro.AvroRecordUDR Nyckel = udrCreate(Avro.AvroRecordUDR);
  map<string,any> nyckelFields = mapCreate(string,any);
  mapSet(nyckelFields,"favoriteUser", "Sture D.Y.");
  mapSet(nyckelFields,"favoriteNumber", (int)10);
  Nyckel.fullname = "example.avro.some_record";
  Nyckel.fields = nyckelFields;
  mapSet(map1, "Nyckel", Nyckel);
  listAdd(array_of_maps, map1);

  map<string,any> map2 = mapCreate(string,any);
  Avro.AvroRecordUDR Nyckel2 = udrCreate(Avro.AvroRecordUDR);
  map<string,any> nyckel2Fields = mapCreate(string,any);
  mapSet(nyckel2Fields,"favoriteUser", "Sture D.A.");
  mapSet(nyckel2Fields,"favoriteNumber", (int)12);
  Nyckel2.fullname = "example.avro.some_record";
  Nyckel2.fields = nyckel2Fields;
  mapSet(map2, "Nyckel2", Nyckel2);
  listAdd(array_of_maps, map2);

  mapSet(fieldz, "array_of_maps", array_of_maps);

  // Uncomment to inspect the field map on its own:
  // debug("--------------- fields ------------------");
  // debug(fieldz);
  // debug("--------------- fields ------------------");

  // Attach the fields and print the finished record.
  record.fields = fieldz;
  debug("-------------- record -------------------");
  debug(record);
  debug("-------------- record -------------------");

  return record;
}