Data Aggregator

The Data Aggregator Function consolidates related records that originate from either a single or multiple sources, into a single record, and and turns it into a more useful format for analysis and decision-making. Related records are grouped into sessions according to the value of their respective fields, and a set of configurable conditions.

As an example, streams using the Data Aggregator Function can help billing systems that have limitations on the number of usage events that can be processed. Using this Function, you can perform aggregation operations such as SUM, COUNT, MAX, MIN, AVERAGE, and many more on any configured field(s), as well as set conditions on those fields when flushing data.

For example, accumulate usage records over time, per account, subscriber, and service to calculate the total usage of a service over a month and then use the aggregated data to deliver to a billing service or to be used as an input for a cloud-based storage service.

Configuration

The Data Aggregator Function's configuration contains the following sections:

Group Fields

Here you specify the names of all the fields you want to group together when performing the aggregation.

Field

Description

Field

Description

Fields

Specify the fields (keys) to be used for grouping aggregated data. If a record has the same values as another record in the selected fields, they will be grouped in the same session.

Example

For example, there are three records being read in a stream with field key, user and you decide to group by user

Record1 has user field with value set as 1

Record2 has user field with value set as 2

Record3 has user field with value set as 1

So, Record1 and Record3 will be grouped in the same aggregated data session and the value will be updated to 2 (in case of the SUM operation). And, Record 2 will be in a separate session.

You can either type the name of the field(s) or select the field(s) from the drop-down menu that appears when you click on the field. In the Fields drop-down menu, you also have the option to Select all or Deselect all fields.

Group based on date/time

Select the checkbox to enable aggregation based on a time period. Specify applicable fields, containing timestamp data, to be used for grouping of aggregation data.

Select the Period to group the information by:

  • Hour

  • Day

  • Month

  • Year

Time Period Format

The supported time period format is ISO 8601 (extended format) in accordance with the YYYY-MM-DDTHH:mm:ss.sssZ pattern.

In case you need to convert a date format to support the ISO standard, see Script.

An example would be: "2019-01-13T00:00:00.000Z". 

Click  to add additional grouping criteria based on the time period.

Configuration

Any change in the Group Fields configuration may result in a new aggregation session, however, the old sessions will still be in the storage. So if you change it back to the previous configuration, the already existing session in the storage (if any) will be used during the aggregation. Refer to the note for more information on the TTL of an aggregation session.

Aggregate Fields

Here you specify the name of the field(s) on which the aggregation operation will be performed:

Field

Operation

Field

Operation

Name of the input field key

The Operation drop-down menu is divided into three types: 

  • Numeric: 

    • SUM

    • MAX

    • MIN

    • AVERAGE

  • General

    • COUNT

    • CARRY_FIRST

    • CARRY_LAST

  • Date

    • MAX

    • MIN

Click  to add more fields for aggregation.

Flush by

Here you select how and when you want to flush the aggregated data to the next Function in the stream.

In Usage Engine, you can flush using any of the following options:

Flush by

Description

Flush by

Description

End of transaction

Aggregated data is flushed at the end of each transaction.

This option is not applicable to real-time streams.

End of stream

Aggregated data is flushed at the end of the stream.

This option is not applicable to real-time streams.

Timeout

Aggregated data is timed out when a predefined interval has passed or a condition is met.

In the case of batch streams, the flush for timed out aggregated data happens only when a stream is executed. For real-time streams, the aggregated data is checked every 60 seconds and flushed in case of timeout. 

Select the type of timeout based on:

  • Duration

    • Hours: Aggregated data is timed out at the specified interval after creation.

  • Frequency

    • Day: Aggregated data is timed out on a daily basis. Both Time and Timezone must be considered when specifying frequency based on Day. The timeout will be an absolute value as you set time/day when the timeout must happen. 

    • Month: Aggregated data is timed out on monthly basis. Specify which day of every month, time of that day and the timezone when the aggregated data will be timed out. 

  • Custom

    • Based on timestamp field: Timeout will be defined based on the input timestamp. Supports UTC format only.

      For example, a record that matches aggregation criteria with the timestamp field set to 20220803:13.00.00 will have its timeout set to that date and time. Timeout will be updated in case new matching records arrive (with a different timestamp).  

Click if you wish to add custom conditions to timeout the aggregated data. This option is available only when the Timeout flush by option is selected.

Click the following to know more:

 

Metadata

You can view and access the following metadata properties of aggregated session. To view the metadata, use the meta object as mentioned in the Script Function. Here is an example:

Property name 

Description

Property name 

Description

count

Number of aggregated records

flushType

The reason for session being flushed out. Shows any of the values: ALL_FILES, EACH_FILE, TIMEOUT and CONDITION. During preview, the value will be empty.

firstEvent

Date and time of the first aggregated record in the session

lastEvent

Date and time of the last aggregated record in the session