Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The Data Aggregator Function consolidates related helps you streamline data by combining related records from one or more sources into a single record transforming it into a , more useful format. This process allows for easier analysis . Related records are grouped into sessions according to the value of their respective fields, and a set of configurable conditions.The Data Aggregator Function can be used in a stream when a billing system has and better decision-making. It groups records into sessions based on their field values and configurable conditions, making data usable for different use cases.

For instance, if you're working with a system with limitations on the number of usage events it can process. It performs aggregation operations such as handle, the Data Aggregator Function can simplify this by performing operations like SUM, COUNT, MAX, MIN, and AVERAGE on any configured field(s). It can also set conditions on those fields when flushing data.

The Data Aggregator Function works by accumulating usage records over time, per account, subscriber, and product, to calculate the total usage of a product over a month. It then aggregates this data and delivers it to an external system, such as a billing or storage system.

Configuration

The Configuration for the Data Aggregator Function contains the following sections:

  • Group Fields

  • Aggregate Fields

  • Flush by

...

Rw tab
titleField

...

selected fields

...

Info

Example

For example, there are three records being read in a stream with field key, user and you decide to group by user

Record1 has user field with value set as 1

Record2 has user field with value set as 2

Record3 has user field with value set as 1

So, Record1 and Record3 will be grouped in the same aggregated data session and the value will be updated to 2 (in case of the SUM operation). And, Record 2 will be in a separate session.

You can either type the name of the field(s) or select the field(s) from the drop-down menu that appears when you click on the field. In the Fields drop-down menu, you also have the option to Select all or Deselect all fields.

Rw tab
titleGroup based on date/time checkbox

Select the checkbox to enable aggregation based on a time period. Specify applicable fields, containing timestamp data, to be used for grouping of aggregation data.

Select the Period to group the information by:

  • Hour

  • Day

  • Month

  • Year

Info

Time Period Format

The supported time period format is ISO 8601 (extended format) in accordance with the YYYY-MM-DDTHH:mm:ss.sssZ pattern.

In case you need to convert a date format to support the ISO standard, see Script.

An example would be: "2019-01-13T00:00:00.000Z". 

Click the + Period button to add additional grouping criteria based on time.

Group Fields

In this section, specify the names of all the fields you want to group when performing the aggregation.

...

Configuration

...

Description

...

Fields

Specify the fields (keys) to be used for grouping aggregated data. If a record has the same values as another record in the selected fields, they will be grouped in the same session.

...

Example

...

Record2 has user field with value set as 2

Record3 has user field with value set as 1

So, Record1 and Record3 will be grouped in the same aggregated data session and the value will be updated to 2 (in case of the SUM operation). And, Record 2 will be in a separate session.

You can either type the name of the field(s) or select the field(s) from the drop-down menu that appears when you click on the field. In the Fields drop-down menu, you also have the option to Select all or Deselect all fields.

...

Group based on date/time

...

Select the checkbox to enable aggregation based on a time period. Specify applicable fields, containing timestamp data, to be used for grouping of aggregation data.

Select the Period to group the information by:

  • Hour

  • Day

  • Month

  • Year

Info

Time Period Format

The supported time period format is ISO 8601 (extended format) in accordance with the YYYY-MM-DDTHH:mm:ss.sssZ pattern.

In case you need to convert a date format to support the ISO standard, see Script.

An example would be: "2019-01-13T00:00:00.000Z". 

Click Image Removed to add additional grouping criteria based on the time period.

Note

Configuration

Any change in the Group Fields configuration may result in a new aggregation session, however, the old sessions will still be in the storage. So if you change it back to the previous configuration, the already existing session in the storage (if any) will be used during the aggregation. Refer to the note for more information on the TTL of an aggregation session.

...

Here you specify the name of the field(s) on which the aggregation operation will be performed:

...

Field

...

Operation

...

Name of the input field key

...

The Operation drop-down menu is divided into three types: 

  • Numeric: 

    • SUM

    • MAX

    • MIN

    • AVERAGE

  • General

    • COUNT

    • CARRY_FIRST

    • CARRY_LAST

  • Date

    • MAX

    • MIN

Click Image Removed to add more fields for aggregation.

...

Here you select how and when you want to flush the aggregated data to the next Function in the stream.

In Usage Engine, you can flush using any of the following options:

...

Flush by

...

Description

...

End of transaction

...

Aggregated data is flushed at the end of each transaction.

Info

A use case where records from a single file or data set are being aggregated during the stream execution.

This option is not applicable to real-time streams.

...

End of stream

...

Aggregated data is flushed at the end of the stream.

Info

Example

A use case where data sets from multiple files are being aggregated during a stream execution.

This option is not applicable to real-time streams.

...

Timeout

Aggregated data is timed out when a predefined interval has passed or a condition is met.

In the case of batch streams, the flush for timed out aggregated data happens only when a stream is executed. For real-time streams, the aggregated data is checked every 60 seconds and flushed in case of timeout. 

Info

Example

Example of a timeout - Record the consumption of mobile data of a subscriber by 10th of every month.

Example of a condition timeout -  Record the consumption of mobile data on 10th of every month or if 100 GB limit is reached. So in this case the timeout will happen if either of these two conditions are met.

Select the type of timeout based on:

Duration

Hours: Aggregated data is timed out at the specified interval after creation.

...

titleExample

...

  • At 13.00 Account 'X' with price 10 is aggregated, timeout will then be set to 14.00
  • At 13.07 Account 'Y'  with price 20 is aggregated, timeout will be set to 14.07
  • At 13.33 Account 'X' with price 12 is aggregated, sum updated to 22, timeout will still be set to 14.00

...

Frequency

  • Day: Aggregated data is timed out on a daily basis. Both Time and Timezone must be considered when specifying frequency based on Day. The timeout will be an absolute value as you set time/day when the timeout must happen. 

    Info
    titleExample

    For example, you set the timeout to day and time to 12.00 PM, then all created aggregation sessions will be timed out independently on when they were created.

  • Month: Aggregated data is timed out on monthly basis. Specify which day of every month, time of that day and the timezone when the aggregated data will be timed out. 

...

Custom

  • Based on timestamp field: Timeout will be defined based on the input timestamp. Supports UTC format only.

    Info
    titleDate Format

    The supported date format is ISO 8601 (extended format) in accordance with the YYYY-MM-DDTHH:mm:ss.sss pattern.

    An example would be: "2019-01-13T00:00:00.000". 

    For example, a record that matches aggregation criteria with the timestamp field set to 20220803:13.00.00 will have its timeout set to that date and time. Timeout will be updated in case new matching records arrive (with a different timestamp).  

...

expandedtrue
titleAdd Condition

...

You also have the option to set a condition to timeout the aggregated data. (For example, timeout to happen when the value of the SUM field is more than 70).

Note
This option is only available in case of Timeout flush setting.

...

Based on: Select which input fields or aggregated fields you want to apply the condition on. The Input Fields show all the input fields configured in the stream and Aggregated Fields show the fields that you have selected to perform aggregation on (in the Aggregation tab).

Note

The CARRY_FIRST and CARRY_LAST operations are not supported. So if you perform aggregation using any of these General Operations, those will not appear under the Aggregated Fields.

...

The following table explains which options are available when you select a specific criteria to add a Flush condition:

...

Select from the following values:

  • Numerical
  • Text Values (string)
  • Boolean

...

Options vary according to the Type of field.

  • Numerical: Mathematical operation. You can choose from the following:
    Less than, Greater than, Less or equal to, Greater or equal to, Equal, Is different from
  • Text: Matches/Not Matches. It is not possible to perform mathematical operation on strings.
  • Boolean: Yes (True) and No (False)

...

If you select:

  • Numeric Values: You can type the value or use the up or down keys to increment or decrement the value. 
  • Text Values (string): Enter the string value.
  • Boolean: Select Yes/No 

...

Click +Add Condition to add more conditions.

...

Note

TTL

The aggregated session is stored for a maximum of 180 days. That means if a session is not updated for 180 days, all the stored data pertaining to that session will be deleted permanently. 

Info

Some example streams

Metadata

You can view and access the following metadata properties of aggregated session. To view the metadata, use the meta object as mentioned in the Script Function. Here is an example:

Info

Example

Code Block
{"origin":"Data_Aggregator","count":7,"flushType":"TIMEOUT","firstEvent":"2022-04-08T17:35:53.239Z","lastEvent":"2022-04-08T17:38:17.315Z","lastCall":false}

...

Property name 

...

Description

...

count

...

Number of aggregated records

...

flushType

...

The reason for session being flushed out. Shows any of the values: ALL_FILES, EACH_FILE, TIMEOUT and CONDITION. During preview, the value will be empty.

...

firstEvent

...

Date and time of the first aggregated record in the session

...

lastEvent

...

. After setting a timeout, you can add custom conditions on it that specify when the data should be output.

A typical use case would be accumulating usage records per account, subscriber, or service over time. You can then calculate total monthly usage and send that aggregated data to a billing system or cloud-based storage, making your data management much more efficient.

Subsections

This section contains the following subsections: