The input data in this example use case consists of sales numbers in CSV format. From here on, this dataset is referred to as "sales". The data is collected in real time from the regions APAC, AMERICAS, and EMEA. We want to calculate the total, average, and number of sales per minute. These numbers will be our KPIs, broken down per country and region.

Example - Input data

timestamp               | region   | country | amount
2017-03-08T13:53:52.123 | EMEA     | Sweden  | 123.50
2017-03-08T13:53:56.123 | APAC     | India   | 12.12
2017-03-08T13:53:59.123 | AMERICAS | US      | 425.23

Note

As a prerequisite, the scripts must be prepared according to Preparing and Creating Scripts for KPI Management.

Step-by-Step Instructions

  1. Configure the service model. The service model describes your data, which KPIs (Key Performance Indicators) to generate, and how to calculate them. A JSON representation is used to describe the model, which includes the following top-level objects:

    1. dimension

    2. tree

    3. metric

    4. kpi

    5. threshold (optional)
      Start with the dimension and tree objects. The dimensions describe the fields of your data that are used for grouping, and the tree describes the relation between them. The identifying fields in the input data are region and country. A region has one or more countries. The data type is sales. In the dimension object, specify each of the identifying fields as a separate object, with the data type and field in the body.

      Code Block
      "dimension": {
          "Region": {
            "sales": "region"
          },
          "Country": {
            "sales": "country"
          }
        },
        "tree": {
          "tree1": {
            "Region": {
              "Country": {
              }
            }
          }
        }
      • Define the metrics using the amount field in the input data, as shown in the code block after this list:

        • totalSales - For total sales, sum up the amount for each record by using the sum function on the expression expr, which contains the amount field.

        • avgSales - For average sales use the avg function instead of sum.

        • numSales - To count the number of records, use the function isSet in the expression. This function evaluates to 1 if there is a value in amount or 0 if there is no value. Use the function sum to sum up the 1s and 0s.
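
        The resulting metric object (this fragment also appears in the full model example further down):

        Code Block
        "metric": {
          "totalSales": {
            "fun": "sum",
            "expr": {
              "sales": "amount"
            }
          },
          "avgSales": {
            "fun": "avg",
            "expr": {
              "sales": "amount"
            }
          },
          "numSales": {
            "fun": "sum",
            "expr": {
              "sales": "isSet(amount)"
            }
          }
        }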

      • Define the KPIs. The expected output is the total sales, average sales, and number of sales per region and country in 60-second periods.

      • Use the node property to describe where in the topology the KPI should be calculated, and windowSize to set the period length in seconds. Use the names of the metrics defined above in the expr property:

        Code Block
        titleKPI
        "kpi": {
          "Region.TotalSales": {
            "node": [
              "tree1",
              "Region"
            ],
            "windowSize": 60,
            "expr": "totalSales"
          },
          "Region.AvgSales": {
            "node": [
              "tree1",
              "Region"
            ],
            "windowSize": 60,
            "expr": "avgSales"
          },
          "Region.NumberOfSales": {
            "node": [
              "tree1",
              "Region"
            ],
            "windowSize": 60,
            "expr": "numSales"
          },
          "Country.TotalSales": {
            "node": [
              "tree1",
              "Region",
              "Country"
            ],
            "windowSize": 60,
            "expr": "totalSales"
          },
          "Country.AvgSales": {
            "node": [
              "tree1",
              "Region",
              "Country"
            ],
            "windowSize": 60,
            "expr": "avgSales"
          },
          "Country.NumberOfSales": {
            "node": [
              "tree1",
              "Region",
              "Country"
            ],
            "windowSize": 60,
            "expr": "numSales"
          }
        }

        Combine all the objects above for a complete representation of the model. Below is an example containing all types.

        Code Block
        titleExample of a full model
        {
          "dimension": {
            "Region": {
              "sales": "region"
            },
            "Country": {
              "sales": "country"
            }
          },
          "tree": {
            "tree1": {
              "Region": {
                "Country": {
                }
              }
            }
          },
          "metric": {
            "totalSales": {
              "fun": "sum",
              "expr": {
                "sales": "amount"
              }
            },
            "avgSales": {
              "fun": "avg",
              "expr": {
                "sales": "amount"
              }
            },
            "numSales": {
              "fun": "sum",
              "expr": {
                "sales": "isSet(amount)"
              }
            }
          },
          "kpi": {
            "Region.TotalSales": {
              "node": [
                "tree1",
                "Region"
              ],
              "windowSize": 60,
              "expr": "totalSales"
            },
            "Region.AvgSales": {
              "node": [
                "tree1",
                "Region"
              ],
              "windowSize": 60,
              "expr": "avgSales"
            },
            "Region.NumberOfSales": {
              "node": [
                "tree1",
                "Region"
              ],
              "windowSize": 60,
              "expr": "numSales"
            },
            "Country.TotalSales": {
              "node": [
                "tree1",
                "Region",
                "Country"
              ],
              "windowSize": 60,
              "expr": "totalSales"
            },
            "Country.AvgSales": {
              "node": [
                "tree1",
                "Region",
                "Country"
              ],
              "windowSize": 60,
              "expr": "avgSales"
            },
            "Country.NumberOfSales": {
              "node": [
                "tree1",
                "Region",
                "Country"
              ],
              "windowSize": 60,
              "expr": "numSales"
            }
          }
        }

        Open the Desktop and paste the service model into a KPI profile. Save the profile with the name SalesModel in the folder kpisales.

  2. Configure Kafka and Zookeeper.

    KPI Management reads its input from and writes its output to Kafka. For this to work, you need to install and configure both Kafka and Zookeeper. More information can be found on the pages Spark, kafka and zookeeper and Starting Clusters and Creating Topics. Kafka depends on Zookeeper (which is included in the Kafka installation folder), and you need to ensure that Zookeeper is started first.
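
    As a sketch, assuming both distributions are downloaded as standard tarballs and unpacked under /opt (the target paths match the environment variables set further down):

    Code Block
    $ # Assumed archive names - adjust to the versions you downloaded
    $ tar -xzf kafka_2.13-3.3.2.tgz -C /opt
    $ tar -xzf spark-3.5.0-bin-hadoop3-scala2.13.tgz -C /opt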

  3. Install and configure Spark. The Spark cluster will be running a so-called "app" for doing the KPI calculations. First you need to install Spark for Scala (spark-3.5.0-bin-hadoop3-scala2.13). More information can be found in the Spark documentation, https://spark.apache.org/docs/3.5.0/. For further information about properties related to Spark, see Spark, kafka and zookeeper. Note that spark-defaults.conf in the Spark installation needs to contain the parameters mentioned in Preparing and Creating Scripts for KPI Management for this to work.

    The Spark slave node will have one worker that is assigned four cores. The cores are split between the executors and the Spark driver, which means that three executors will run in parallel. The property SPARK_DEFAULT_PARALLELISM in kpi_params.sh is set to match this value.
    The property MZ_KPI_PROFILE_NAME needs to match the folder and configuration name of the KPI profile that was created in step 1.
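
    A sketch of the relevant entries, assuming that kpi_params.sh uses plain shell variable assignments and that the profile name is given as <folder>.<name> - both are assumptions, so adjust to your installation:

    Code Block
    # Three executors -> three-way parallelism; must match the number of
    # partitions per Kafka topic created further down
    SPARK_DEFAULT_PARALLELISM=3
    # Folder and configuration name of the KPI profile from step 1
    # (hypothetical format: <folder>.<name>)
    MZ_KPI_PROFILE_NAME=kpisales.SalesModel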

  4. Start up Zookeeper, Kafka, and Spark.

    Set up environment variables

    Code Block
    $ export SPARK_HOME=/opt/spark-3.5.0-bin-hadoop3-scala2.13
    $ export KAFKA_HOME=/opt/kafka_2.13-3.3.2
    $ export PATH=$KAFKA_HOME/bin:$PATH:/opt/mz_kpiapp/bin

    Then, while located in $KAFKA_HOME, execute:

    Start Zookeeper and Kafka

    Code Block
    $ bin/zookeeper-server-start.sh config/zookeeper.properties &
    $ # wait until Zookeeper is up before starting Kafka
    $ bin/kafka-server-start.sh config/server.properties &

    Run the following command to start Spark:

    Start Spark

    Code Block
    start_master_worker.sh
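
    To verify that the Spark master and worker processes are up, you can, for example, list the running Java processes (jps is included in the JDK):

    Code Block
    $ jps | grep -E 'Master|Worker'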


  5. Create the Kafka topics that are required by the KPI app. Each of the Spark executors needs to read from a separate Kafka partition, so each of the topics needs three partitions, i.e. the number of partitions for each topic must be identical to the value of the property SPARK_DEFAULT_PARALLELISM in kpi_params.sh.

    Create Kafka Topics

    Code Block
    $ bin/kafka-topics.sh --create --topic kpi-input --bootstrap-server localhost:9092 --partitions 3
    $ bin/kafka-topics.sh --create --topic kpi-output --bootstrap-server localhost:9092 --partitions 3
    $ bin/kafka-topics.sh --create --topic kpi-error --bootstrap-server localhost:9092 --partitions 3
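
    To verify the topics and their partition counts, you can describe them with the standard Kafka CLI:

    Code Block
    $ bin/kafka-topics.sh --describe --bootstrap-server localhost:9092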


  6. Create the real-time workflow. In this guide we will use Pulse agents to simulate sales data coming from three different sources: EMEA, AMERICAS, and APAC.

    1. Add three Pulse agents and an Analysis agent.
      Workflow - Pulse Agents
      Configure the Pulse agents as follows:

      • AMERICAS will send 1000 TPS - Set Time Unit to MILLISECONDS and Interval to 1

      • EMEA will send 500 TPS - Set Time Unit to MILLISECONDS and Interval to 2

      • APAC will send 250 TPS - Set Time Unit to MILLISECONDS and Interval to 4
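
      The rate follows from the interval: TPS = 1000 / Interval(ms), so an interval of 1 ms gives 1000 TPS, 2 ms gives 500 TPS, and 4 ms gives 250 TPS.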

      To be able to identify the data, set the Data field of each Pulse agent to its region name.
      Pulse agent configuration
      The Pulse agents only send a simple event containing the name of the region; the other data used in the KPI calculations is generated in the connected Analysis agent.
      The APL code below creates the input to KPI Management.



      Code Block
      titleAPL Code
      list<string> americas = listCreate(string, "US", "Canada", "Mexico", "Brazil", "Argentina", "Cuba", "Colombia");
      list<string> emea = listCreate(string, "Sweden", "UK", "Portugal", "Italy", "France", "Germany", "Norway", "Spain", "Finland", "Denmark");
      list<string> apac = listCreate(string, "India", "China", "Japan", "Thailand", "Australia", "Indonesia", "Malaysia", "South Korea");
      
      consume {
          // Create a KDR - the input for the KPI Cluster In agent.
          kpimanagement.KDR kdr = udrCreate(kpimanagement.KDR);
      
          // The KDR has a type field - set it to the data type used in the model.
          kdr.type = "sales";
      
          // It also has a timestamp field - populate it with the current time, converted to seconds.
          kdr.timestamp = dateCreateNowMilliseconds() / 1000;
      
          string region = baToStr(input.Data);
      
          // The data in our use case (region, country, amount) goes into the values field of the KDR.
          map<string, any> sales = mapCreate(string, any);
          mapSet(sales, "region", region);
      
          // Create a random amount of up to 1000.
          int amount = randomInt(1000);
      
          // Set amount and country depending on the region.
          if (region == "AMERICAS") {
             mapSet(sales, "amount", amount * 1.25d);
             mapSet(sales, "country", randomCountry(americas));
          } else if (region == "EMEA") {
             mapSet(sales, "amount", amount * 1.0d);
             mapSet(sales, "country", randomCountry(emea));
          } else if (region == "APAC") {
             mapSet(sales, "amount", amount * 0.65d);
             mapSet(sales, "country", randomCountry(apac));
          } else {
             mapSet(sales, "amount", 0.0d);
             mapSet(sales, "country", "UNKNOWN");
             debug("Unknown region: " + region);
          }
      
          kdr.values = sales;
          udrRoute(kdr);
      }
      
      // Pick a random country from a list.
      string randomCountry(list<string> countries) {
          int index = randomInt(listSize(countries));
          return listGet(countries, index);
      }



    2. Create a Kafka profile for the KPI Cluster In agent. This agent will write to the kpi-input topic.

      Kafka profile configuration - kpi-input

    3. Add a KPI Cluster In agent.
      Workflow - KPI Cluster In agent

      Configure it to use the KPI profile that you created in step 1, and add the Kafka profile that the agent will use to write to the kpi-input topic. This topic will be read by the KPI Management Spark application.
      The Analysis agent is added because the KPI Cluster In agent will send out a KafkaExceptionUDR in case of errors in the Kafka communication (if the Route On Error option is selected). This example does not cover handling of those errors.

    4. Create a Kafka profile for the KPI Cluster Out agent. This agent will read from the kpi-output topic.
      Kafka profile configuration - kpi-output

    5. Add a KPI Cluster Out agent (shown on its own here; see the final workflow screenshot below) and configure it as follows. This agent will provide the KPI output.

    6. Add another Analysis agent for debugging the KPIs.
      Final workflow configuration

    7. Add the APL code below to the Analysis agent.



      Code Block
      titleAPL Code
      string format = "yyyy-MM-dd'T'HH:mm:ss:S";
      consume {
          // input is KPIAggregatedOutput which contains a list of
          // KPIOutput
          list<kpimanagement.KPIOutput> kpis = input.kpiOutputUDRs;
          // loop the KPIs and debug
          string dateStr = "";
          for (int i = 0; i < listSize(kpis); i++) {
              kpimanagement.KPIOutput kpi = listGet(kpis, i);
              dateToString(dateStr, dateCreateFromMilliseconds(kpi.periodStart * 1000), format);
              debug("Period start: " + dateStr + ", instance: " + kpi.instancePath  + ", KPI: " + kpi.kpi + ", Value:" + kpi.value + ", Samples: " + kpi.sampleCount);
          }
      }



    8. Submit the Spark application to the cluster.

      Submit the Spark application

      Code Block
      submit.sh kpiapp


  7. Open the Spark UI at http://localhost:8080/. You should see that kpiapp is running.
    Spark UI

  8. Open the workflow configuration in the Workflow Monitor. Enable debugging and select events for the KPI Cluster Out agent and the Analysis agent that produces the debug output.

  9. Start the workflow. It may take a minute before the output data is displayed.
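
    Each debug line follows the format of the debug statement in the APL code above; illustratively, with placeholders in angle brackets:

    Code Block
    Period start: <timestamp>, instance: <instance path>, KPI: Region.TotalSales, Value: <value>, Samples: <sample count>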