Script aggregator configuration


Note!

Sensitive parameters cannot be used in the Script aggregator function.

The Script aggregator configuration has four sections:

  • Aggregation store ID - The store ID uniquely identifies the aggregation store associated with the Script aggregator and is required when querying sessions via the API from an external system.

  • Group by - Define the fields used to group incoming data into sessions for aggregation.

  • Flush trigger - Determine when aggregated data is flushed and sent downstream.

  • Script blocks (On transform and On timeout) - Specify the custom logic for how data should be processed and aggregated within each session.

ScriptAggregatorConfiguration.png
The Script aggregator configuration

Aggregation store ID

The aggregation store ID is required when querying sessions via API from an external system. Use the copy symbol to the right to easily copy the ID.

Group by

The Group by section defines how your data is split into sessions for aggregation. A set of one or more fields is called a group-by rule. At least one group-by rule must be configured in the function.

The logic changes based on the number of fields added to each group-by rule:

  • Multiple fields in a group-by rule act as an 'AND' condition: a record must contain all of the specified fields to be added to the same session.

ScriptAgg_singleRule.png
Single Group-by rule with multiple fields
  • If multiple group-by rules are defined, they are evaluated in order from top to bottom. For each incoming record, the Script aggregator applies the first rule that matches; once a record matches a rule, no further rules are considered.

ScriptAgg_multipleRules.png
Multiple Group-by rules configured

Note!
For detailed examples, see Script aggregator group-by rule examples.
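The top-to-bottom, first-match evaluation described above can be sketched as follows. This is an illustrative model, not the platform's actual implementation; the rule arrays and field names are hypothetical.

```javascript
// Illustrative sketch of group-by rule evaluation (not platform code).
// A rule matches only if the record contains ALL of its fields (AND logic).
function findMatchingRule(record, rules) {
  // Rules are evaluated top to bottom; the first matching rule wins.
  return rules.find((rule) => rule.every((field) => field in record)) ?? null;
}

// Hypothetical rules for illustration
const rules = [
  ["accountId", "productId"], // Rule 1
  ["accountId"],              // Rule 2
];

// Both fields present: Rule 1 is applied, Rule 2 is never considered.
console.log(findMatchingRule({ accountId: "a1", productId: "p1" }, rules));
// productId missing: Rule 1 does not match, so Rule 2 is applied.
console.log(findMatchingRule({ accountId: "a1" }, rules));
```

A rule matches only when the record contains every field in that rule; once a record matches a rule, later rules are never consulted.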

Setting

Description


Fields

Select the fields you want to include in the rule from the dropdown list, or add a field using the Add field option.

Note!
If a record doesn’t contain any of the fields specified in the Group by section, the stream will abort. To process such records, you can add a blank rule to group them into a separate session.

+ Add fields

Use this option to add additional rules. Each group of Fields is a rule.

Session key

The session key is created by combining the values of the fields in a group-by rule. This key determines how records are grouped into sessions. If multiple group-by rules are configured, each rule is evaluated in order, and the first rule that matches is used for grouping.

Caution!
Session keys are generated from field values only — field names are not included in the key. This means that if different group-by rules use different fields, but the values in those fields happen to be identical and appear in the same order, the rules will produce the same session key and group records into the same session unexpectedly.

Example:

  • Rule 1 groups by: collectionId, productId

  • Rule 2 groups by: accountId, customerId

If a record has collectionId = 12345, productId = 67890, and another record has accountId = 12345, customerId = 67890, both rules generate the same session key: [12345, 67890]. These records will be grouped into the same session, even though they matched on completely different fields.
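A minimal sketch of the value-only key generation described above shows how the collision occurs. The sessionKey helper below is hypothetical; the real key format is internal to the platform.

```javascript
// Hypothetical sketch: session keys are built from field VALUES only, taken in
// rule order; field names are not part of the key.
function sessionKey(record, ruleFields) {
  return JSON.stringify(ruleFields.map((field) => record[field]));
}

const recordA = { collectionId: 12345, productId: 67890 };
const recordB = { accountId: 12345, customerId: 67890 };

const keyA = sessionKey(recordA, ["collectionId", "productId"]); // Rule 1
const keyB = sessionKey(recordB, ["accountId", "customerId"]);   // Rule 2

// Identical values in the same order produce identical keys,
// even though the two rules use entirely different fields.
console.log(keyA === keyB); // true
```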

Lookup script aggregation session API

The Lookup script aggregation session API uses the same logic as described above. To look up a session, you provide the API with a list of values (for example, an email address and a customer ID). The API combines these values in the same order as your Script aggregator function to create the session key, then searches for a session matching that key and returns the session details.

Note!
When using this API, ensure your values match the field order specified in the Script aggregator function configuration.
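As a precaution, you can build the value list programmatically from the configured field order rather than by hand. The helper below is hypothetical and not part of the API; the actual request format is documented separately.

```javascript
// Hypothetical helper (not part of the API): order lookup values to match the
// field order configured in the Script aggregator's group-by rule.
function orderedLookupValues(configuredFields, valuesByField) {
  return configuredFields.map((field) => valuesByField[field]);
}

// Group-by rule configured as: accountId, customerId
const lookupValues = orderedLookupValues(
  ["accountId", "customerId"],
  { customerId: 67890, accountId: 12345 } // input order here does not matter
);
console.log(lookupValues); // [ 12345, 67890 ]
```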

Flush trigger

The Script aggregator supports three flush triggers that determine when sessions are finalized and sent downstream. These are mutually exclusive; only one trigger can be active at a time. When you select a flush trigger, the corresponding script block tab becomes available for customization. Select one of the following options:

Flush trigger

Description

Use case


On timeout

Sessions flush when they exceed the configured timeout period. This is the default behavior.

Time-based aggregation, such as hourly summaries, monthly aggregates for billing, or session timeouts.

On transaction end

All sessions flush at the end of each transaction (for example, after processing each file).

File-based aggregation, where each file should produce independent results.

On stream end

All sessions flush when the stream execution completes.

Stream-wide aggregation across all input data processed during a single stream execution.

Script

Define custom logic in the following script block tabs:

Script block tab

Description


On transform

Use this section to define custom aggregation logic that runs when data is grouped (that is, when a session is created or updated).

Example - Aggregating values and setting a session timeout

You want to sum usage values for each account and set the session to expire one hour after the first event arrives. In the On transform tab, you could use:

if (!session.data.sum) {
  // Initialize the sum property
  session.data.sum = 0;
  // Set the session to time out 1 hour after the first matching record arrives
  session.setTimeout(new Date(Date.now() + 1000 * 60 * 60));
}
// Perform aggregation
session.data.sum += payload.sum;
  • When the first record for a session arrives, the script initializes a running total (sum) and sets a timeout for one hour in the future.

  • Each time a new record is added to the session, its value is added to the running total.

  • The session will remain open and continue aggregating until the timeout is reached.
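To see this logic behave as described, the On transform script can be exercised outside the platform with a minimal mock of the session and payload objects. The mock below is an assumption for illustration only; the real objects are supplied by the runtime.

```javascript
// Minimal mock of the runtime-provided session object (assumption, for testing
// the aggregation logic only; not the platform implementation).
function makeSession() {
  return {
    data: {},
    timeoutAt: null,
    setTimeout(date) { this.timeoutAt = date; },
  };
}

// The On transform logic from the example above
function onTransform(session, payload) {
  if (!session.data.sum) {
    // First record: initialize the total and schedule a 1-hour timeout
    session.data.sum = 0;
    session.setTimeout(new Date(Date.now() + 1000 * 60 * 60));
  }
  session.data.sum += payload.sum;
}

const session = makeSession();
[{ sum: 10 }, { sum: 5 }, { sum: 7 }].forEach((p) => onTransform(session, p));
console.log(session.data.sum); // 22
```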

On timeout

Use this section to specify what should happen when a session times out. The timeout behavior depends on your configuration. For example, a session might be set to time out after a fixed period (e.g., 1 hour), regardless of whether new data arrives.

Example - Flushing the session on timeout

When the session times out (as configured in the On transform tab), this script runs:

// This code can be updated and will execute at session timeout
await session.flush();

When the timeout occurs, the session is flushed (finalized and sent downstream). This ensures that even if a session ends due to inactivity or after a set period, you still capture and process the accumulated data.

Note!
A typical stream executes by starting, processing all data, and then stopping. The timeout check for sessions is performed:

  • At the start of each stream execution

  • At the end of each transaction

This means the On timeout block may not be triggered exactly at the specified timeout timestamp, but rather when the system next checks for timed-out sessions. For long-running transactions (such as processing a massive file with millions of records), if sessions time out during the transaction, the On timeout block will be triggered only after all input records have been processed and the transaction completes.

On transaction end

Use this section to define what happens when a transaction ends (for example, when a file finishes processing). This script block is only available when the On transaction end flush trigger is selected.

Example - Flushing all sessions at transaction end

When processing files where each file should produce independent aggregation results:

// This code executes when each transaction (file) ends
// All sessions are automatically flushed
await session.flush();

This ensures that aggregation results are produced per file, rather than accumulating across files.
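The per-file behavior can be illustrated with a self-contained sketch (not platform code); the in-memory sessions map and field names below are hypothetical.

```javascript
// Illustrative sketch: sessions are created per transaction (file) and flushed
// when the transaction ends, so results never accumulate across files.
function processFiles(files) {
  const results = [];
  for (const file of files) {
    const sessions = new Map(); // sessions exist only within this transaction
    for (const record of file.records) {
      const current = sessions.get(record.accountId) ?? 0;
      sessions.set(record.accountId, current + record.sum);
    }
    // "On transaction end": flush all sessions downstream
    results.push(Object.fromEntries(sessions));
  }
  return results;
}

const results = processFiles([
  { records: [{ accountId: "a1", sum: 10 }, { accountId: "a1", sum: 5 }] },
  { records: [{ accountId: "a1", sum: 3 }] },
]);
// First file yields a1: 15; the second file independently yields a1: 3
console.log(results);
```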

On stream end

Use this section to define what happens when the stream execution completes. This script block is only available when the On stream end flush trigger is selected.

Example - Final aggregation at stream end

When you want to aggregate all data across the entire stream execution:

// This code executes when the stream stops
// All sessions are flushed, producing final aggregation results
await session.flush();

Use this for scenarios where you need a single aggregation result across all processed data, regardless of transaction boundaries.