...
With the Script Aggregator provides a flexible and controlled way to Function, you can correlate, consolidate, and aggregate records of different data types . It is used in a flexible and controlled way. You can use it to perform more advanced and complex operations that cannot be solved using than you can do with the pre-defined operations in the Data Aggregator. As a user, you can use
The Script Aggregator to
...
has the following capabilities:
Perform correlation:
To aggregateCorrelating usage data with nested data
objectobjects across multiple sources.
- perform
Perform advanced aggregation:
To aggregateAggregating usage data with nested data object.
push Push partially: To send Sending data to the downstream function functions during aggregation based on a certain conditionconditions.
Info |
---|
PrerequisiteTo be able to use take full potential advantage of the capabilities in this function, Function you must be know JavaScript well versed with JavaScript. |
In this topic:
...
You can use the following properties/objects while performing the aggregation:
...
The session
object represents the current matching aggregated data.
...
flushType
: The flushType
is set as part of the meta downstream once the session is either pushed or flushed.
mycustomProperty
: Allows to add a custom property in the meta data, that will also be pushed downstream along with the payload.
...
expanded | true |
---|---|
icon | false |
title | Meta properties |
Code Block |
---|
meta = {
count: number,
firstEvent: timestamp,
lastEvent: timestamp,
flushType: string,
myCustomProperty: any,
} |
...
Code Block | ||
---|---|---|
| ||
await session.push(
excludeCurrentPayload?: boolean, flushType?: string
) |
...
title | Example |
---|
Code Block |
---|
if (!session.data.myCounter) {
session.data.myCounter = 0;
}
session.data.myCounter += 1;
await session.push();
/*
## 1 ##
OUTPUT:
payload -> { myCounter: 1 }
meta -> {
count: 1,
firstEvent: '2022-06-10T08:57:06.242Z',
lastEvent: '2022-06-10T08:57:06.242Z'
}
SESSION in store: {
data: {
myCounter: 1,
}
meta: {
count: 1,
firstEvent: '2022-06-10T08:57:06.242Z',
lastEvent: '2022-06-10T08:57:06.242Z'
}
}
## 2 ##
OUTPUT:
payload -> { myCounter: 2 }
meta -> {
count: 2,
firstEvent: '2022-06-10T08:57:06.242Z',
lastEvent: '2022-06-10T08:57:06.553Z'
}
SESSION in store: {
data: {
myCounter: 2,
}
meta: {
count: 2,
firstEvent: '2022-06-10T08:57:06.242Z',
lastEvent: '2022-06-10T08:57:06.553Z'
}
}
*\
|
delete
: Deletes the current session. You will not be able to perform any operation on the session object after it has been deleted.
Code Block | ||
---|---|---|
| ||
await session.delete();
// Will not push!
await session.push();
// Will not be saved!
session.data.myProperty = 1337; |
flush(excludeCurrentPayload?: boolean)
: Flushes the data and deletes the session. You will not be able to perform any operation on the session object after it has been deleted.
...
title | Note! |
---|
...
In case of batch stream, On Timeout is triggered for timedout sessions when the stream is run. And, in case of real-time streams, the On Timeout is triggered for timed out sessions every 60 seconds.
...
Code Block | ||
---|---|---|
| ||
await session.flush(
excludeCurrentPayload?: boolean, flushType?: string
) |
clear():
Clears the session and can be re-used.
Code Block |
---|
await session.clear() |
...
title | Example |
---|
Code Block |
---|
session.setTimeout(new Date('2022-06-10T08:57:06.242Z'));
const moment = require('moment');
session.setTimeout(moment().add(1, 'days').toDate()); |
...
The configuration view appears as shown below.
Configuration Screen
...
Specify the name of the input field key (or column name) based on value of which you would like to group the aggregated data. Group by is a rule to group based on same value stored in a column.
Note | ||
---|---|---|
| ||
You can add multiple group by rules. In this case, if the grouping cannot be performed on the basis of first rule then the second rule is taken into consideration, and so on and so forth. If a record does not match any of the Group by rule(s) the stream will abort. If you don't specify anything in Group by, then aggregation will be performed on all the columns. |
Script block
You can define the custom logic using the following tabs:
...
title | On Transform |
---|
...
title | On Timeout |
---|
...
Preview is not available for script mentioned in the On Timeout block.
...
In this section, you will see some easy- to-understand examples:
Note | ||
---|---|---|
| ||
Refer to the PeppaCRM example stream to understand the usage of the Script Aggregator Function. You can also import this example stream to be able to use it in your solution. |
Info | ||
---|---|---|
|
Code Block |
---|
/* On Transform script */
if (!session.data.sum)
{
// Initiate sum property
session.data.sum = 0;
// Configure the session to timeout 1 hour in the future from first matching record
session.setTimeout(new Date(Date.now() + 1000 * 60 * 60));
}
// Perform aggregation
session.data.sum += payload.sum;
|
Code Block |
---|
/* On Timeout script */
// Perform aggregation
session.data.mean=session.data.sum/session.meta.count;
// Flush session
await session.flush(); |
title | Example 2: Partial Push and Conditional Flush |
---|
The session is partially pushed downstream for every 5 records that matches the current session. If 100 records has matched the session, it will be flushed.
Code Block |
---|
/* On Transform script */
if (!session.data.sum) {
// Initiate sum property
session.data.sum = 0;
}
// Perform aggregation
session.data.sum += payload.value;
if (session.meta.count === 100) {
// Flush on the 100th record
await session.flush();
} else if (session.meta.count % 5) {
// Push session every 10th record
await session.push();
} |
title | Example 3: Exclusive Push |
---|
The session is pushed downstream if a matching record has a "day" that differs from the "day" stored in the session data. During the push, the meta that was provided with the current record will be ignored. This means that the state of the meta pushed along downstream will hold information from the previous aggregation of the session. For example, if the meta.count increased to 5 with the current record, it will be pushed downstream with count 4 as that was its previous state.
Code Block |
---|
function init() {
// Initiate sum property
session.data.sum = 0;
session.data.day = payload.day;
}
if (!session.data.sum) {
init();
}
if (session.data.day !== payload.day) {
await session.push(true);
session.clear();
init();
}
// Perform aggregation
session.data.sum += payload.value; |
title | Licensing Information |
---|
The feature described in this documentation is optional and might not be part of the edition covered by your company's license. .
This section contains the following sub-sections:
Child pages | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|
|