Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 2 Current »

This example shows how you can configure Avro Decoder/Encoder to be used together with a schema registry, providing Avro schema.

To read more about UDRs used in the example see Avro Types and UDRs

Example configuration:

To simplify the setup, the configuration includes a workflow simulating a schema register that returns schemas. See the workflow configuration for the schema content.

Screenshot 2024-04-04 at 09.16.29.png

Disk_1

This agent collects files containing one complete binary encoded Avro message.
This example assumes that each file contains one Avro message.

You can use the following file as input for the Disk_1 agent

Analysis_1

This agent is responsible for creating an AvroDecoderUDR and filling in the necessary information that are required for Avro Decoder:

APL code:

consume {
    debug(input);

//Create an udr that will be an input for the decoder
    Avro.AvroDecoderUDR myUdr = udrCreate(Avro.AvroDecoderUDR);

//Binary encoded avro message read from a file
    myUdr.data = input;
    myUdr.readerSchemaID = “7”;
    myUdr.writerSchemaID = “7” 
    debug(myUdr);
    udrRoute(myUdr);
}

Decoder_1

This is an example of how you can configure an Avro Decoder and a schema register.

Screenshot 2024-04-11 at 09.51.53.png

Analysis_2 agent

This agent shows an example of how one can read a decoded Avro message and create an AvroEncoder UDR that is an input for the Avro encoder.

Reading decoded Avro message is done by inspecting DynamicAvro UDR data field. The type of it depends on an Avro schema used for decoding. This examples shows how to access elements of different types.

Preparing data for encoding is done by populating AvroEncoder UDR.

APL code:

consume {
   //debug(input);
    if (instanceOf(input, DecoderErrorUDR)) {
        abort("Received DecoderErrorUDR");
    } else {
        
        DynamicAvro dynamicAvroUdr = (DynamicAvro)input;
        AvroRecordUDR inp = (AvroRecordUDR)dynamicAvroUdr.data;
        map<string,any> fields = (map<string,any>)inp.fields;
        debug("MyDouble value: " +mapGet(fields, "myDouble"));
        //debug(mapGet(fields, "rootUsers"));
        
        //Get an enum 
        debug(mapGet(fields,"favorite_fotball_team"));
        
        //Print the whole list
        debug(mapGet(fields, "favoriteFoodList"));
        //Get the first list element
        debug(mapGet(fields, "favoriteFoodList").fields.dish);
        //Preview the second list element
        debug(mapGet(fields, "favoriteFoodList").fields.next);
        //Preview the third list element
        debug(mapGet(fields, "favoriteFoodList").fields.next.fields.next);
        //Get the fourth element - null means no more elements
        debug(mapGet(fields, "favoriteFoodList").fields.next.fields.next.fields.next);
        
        //Get a record included in a record
        AvroRecordUDR nameRecord = (AvroRecordUDR)mapGet(fields, "name");
        //Get a value of one of the fields of the included record
        debug(mapGet(nameRecord.fields, "lastName"));
        
        //Get a list of maps, print first element
        list<any> arrayOfMaps = (list<any>)(mapGet(fields, "array_of_maps"));
        debug(listGet(arrayOfMaps,0));
        
        //Part 2 - Preparing input for encoding
        
        //Record as a top element of the schema
        Avro.AvroRecordUDR record = udrCreate(Avro.AvroRecordUDR);
        record.fullname = "example.avro.User3";
        //Map to keep record's fields
        map<string,any> fieldz = mapCreate(string,any);
  
        //  name
        Avro.AvroRecordUDR name = udrCreate(Avro.AvroRecordUDR);
        map<string,any> namefields = mapCreate(string,any);
        mapSet(namefields, "firstName", "");
        mapSet(namefields, "lastName", "Kula");
        name.fullname = "example.avro.FullName2";
        name.fields = namefields;
        mapSet(fieldz, "name", name);

        //  favorite number
        mapSet(fieldz, "favorite_number", (int)1632);
        // colour
        mapSet(fieldz, "favorite_color", "Green");
        // football team (spelled with one "o" in schema)
        Avro.AvroEnumUDR favorite_fotball_team = udrCreate(Avro.AvroEnumUDR);
        favorite_fotball_team.fullname = "example.avro.teams.teams";
        favorite_fotball_team.symbol = "Djurgarden";
        mapSet(fieldz, "favorite_fotball_team", favorite_fotball_team);
  
        // IP address
        list<any> ipAddresslist = listCreate(any);
        Avro.AvroFixedUDR ipv4Address = udrCreate(Avro.AvroFixedUDR);
        ipv4Address.fullname = "example.avro.ipv4Address";
        ipv4Address.bytes = baCreateFromHexString("00ABCDEF");
        listAdd(ipAddresslist, ipv4Address);
        Avro.AvroFixedUDR ipv6Address = udrCreate(Avro.AvroFixedUDR);
        ipv6Address.fullname = "example.avro.ipv6Address";
        ipv6Address.bytes = baCreateFromHexString("00ABCDEFEFEFEFEFEFEFEFEFEFEFEFEF");
        listAdd(ipAddresslist, ipv6Address);
        mapSet(fieldz, "ipAddresses", ipAddresslist);

        //  favoriteFoodList
        Avro.AvroRecordUDR favoriteFoodList = udrCreate(Avro.AvroRecordUDR);
        Avro.AvroRecordUDR dish1 = udrCreate(Avro.AvroRecordUDR);
        Avro.AvroRecordUDR dish2 = udrCreate(Avro.AvroRecordUDR);
        Avro.AvroRecordUDR dish3 = udrCreate(Avro.AvroRecordUDR);
        map<string,any> dish3Map = mapCreate(string,any);
        mapSet(dish3Map, "dish", "Brown beans");
        mapSet(dish3Map, "next", null);
        dish3.fields = dish3Map;
        dish3.fullname = "example.avro.favoriteFood";
        map<string,any> dish2Map = mapCreate(string,any);
        mapSet(dish2Map, "dish", "Pannkakor");
        mapSet(dish2Map, "next", dish3);
        dish2.fields = dish2Map;
        dish2.fullname = "example.avro.favoriteFood";
        map<string,any> dish1Map = mapCreate(string,any);
        mapSet(dish1Map, "dish", "Ramen");
        mapSet(dish1Map, "next", dish2);
        dish1.fields = dish1Map;
        dish1.fullname = "example.avro.favoriteFood";
        favoriteFoodList.fullname = "example.avro.favoriteFood";
        favoriteFoodList.fields = dish1Map;
        mapSet(fieldz, "favoriteFoodList", favoriteFoodList);

        // salary
        mapSet(fieldz, "salary", 347857486343);
  
        // myFixed
        Avro.AvroFixedUDR myFixed = udrCreate(Avro.AvroFixedUDR);
        bytearray myFixed_bytes = baCreateFromHexString("DEADBEEF");
        myFixed.bytes = myFixed_bytes;
        myFixed.fullname = "example.avro.myfixed";
        mapSet(fieldz, "myFixed", myFixed);

        // myFloat
        mapSet(fieldz, "myFloat", (float)0.4);

        // myDouble
        mapSet(fieldz, "myDouble", (double)0.5);

        // RootUsers
        map<string,any> rootUsers = mapCreate(string,any);
        Avro.AvroRecordUDR olleBack = udrCreate(Avro.AvroRecordUDR);
        olleBack.fullname = "example.avro.RootUsers";
        map<string,any> fields1 = mapCreate(string,any);
        mapSet(fields1, "rootUser", "Allan");
        mapSet(fields1, "privileges", (int)3);
        olleBack.fields = fields1;
    
        Avro.AvroRecordUDR nisseHult = udrCreate(Avro.AvroRecordUDR);
        nisseHult.fullname = "example.avro.RootUsers";
        map<string,any> fields2 = mapCreate(string,any);
        mapSet(fields2, "rootUser", "Guran");
        mapSet(fields2, "privileges", (int)2);
        nisseHult.fields = fields2;
    
        mapSet(rootUsers, "olleBack", olleBack);
        mapSet(rootUsers, "nisseHult", nisseHult);
        mapSet(fieldz, "rootUsers", rootUsers);
  
        // array_of_maps
        list<any> array_of_maps = listCreate(any);
        map<string,any> map1 = mapCreate(string,any);
        Avro.AvroRecordUDR Nyckel = udrCreate(Avro.AvroRecordUDR);
        map<string,any> nyckelFields = mapCreate(string,any);
        mapSet(nyckelFields,"favoriteUser", "Sture D.Y.");
        mapSet(nyckelFields,"favoriteNumber", (int)10);
        Nyckel.fullname = "example.avro.some_record";
        Nyckel.fields = nyckelFields;
        mapSet(map1, "Nyckel", Nyckel);
        listAdd(array_of_maps, map1);
    
        map<string,any> map2 = mapCreate(string,any);
        Avro.AvroRecordUDR Nyckel2 = udrCreate(Avro.AvroRecordUDR);
        map<string,any> nyckel2Fields = mapCreate(string,any);
        mapSet(nyckel2Fields,"favoriteUser", "Sture D.A.");
        mapSet(nyckel2Fields,"favoriteNumber", (int)12);
        Nyckel2.fullname = "example.avro.some_record";
        Nyckel2.fields = nyckel2Fields;
        mapSet(map2, "Nyckel2", Nyckel2);
        listAdd(array_of_maps, map2);
        mapSet(fieldz, "array_of_maps", array_of_maps);
  
        record.fields = fieldz;
        debug("-------------- record -------------------");
        debug(record);
        debug("-------------- record -------------------");
        Avro.AvroEncoderUDR encode = udrCreate(Avro.AvroEncoderUDR);
        
        encode.data = record;
        
        encode.writerSchemaID = "7";
        //debug(encode);
        udrRoute(encode);
    }
}

Encoder_1

This is an example of how you can configure an Avro Encoder and a schema register to be used.

Screenshot 2024-04-11 at 09.52.11.png

  • No labels