...

The Script Aggregator Function provides a flexible and controlled way to correlate, consolidate, and aggregate records of different data types. You can use it to perform more advanced and complex operations than you can do with the pre-defined operations in the Data Aggregator.

The Script Aggregator has the following capabilities:

  • Perform correlation: Correlating usage data with nested data objects across multiple sources.

  • Perform advanced aggregation: Aggregating usage data with nested data objects.

  • Push partially: Sending data to the downstream functions during aggregation based on certain conditions.

Prerequisite

To take full advantage of the capabilities in this Function, you must be well versed in JavaScript.

In this topic:

Aggregation Methods

Configuration

Examples

...

You can use the following properties/objects while performing the aggregation:

...

The session object represents the current matching aggregated data.

...

flushType: Set as part of the meta that is sent downstream once the session is either pushed or flushed.

myCustomProperty: Allows you to add a custom property to the meta, which is also pushed downstream along with the payload.

...

Meta properties
Code Block
meta = {
  count: number,
  firstEvent: timestamp,
  lastEvent: timestamp,
  flushType: string,
  myCustomProperty: any,
}
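
As a rough illustration of how these meta properties can be used, the sketch below derives the time span covered by a session from firstEvent and lastEvent. The helper name sessionDurationMs and the exampleMeta object are hypothetical, and the sketch assumes that the timestamps are ISO 8601 strings. It runs as plain JavaScript outside the platform.

```javascript
// Hypothetical helper (not part of the Function's API): returns the number of
// milliseconds between the first and the last record of a session.
// Assumption: firstEvent and lastEvent are ISO 8601 timestamp strings.
function sessionDurationMs(meta) {
  const first = new Date(meta.firstEvent).getTime();
  const last = new Date(meta.lastEvent).getTime();
  if (Number.isNaN(first) || Number.isNaN(last)) {
    throw new Error('meta.firstEvent and meta.lastEvent must be valid timestamps');
  }
  return last - first;
}

// Illustrative meta object shaped like the structure shown above.
const exampleMeta = {
  count: 2,
  firstEvent: '2022-06-10T08:57:06.242Z',
  lastEvent: '2022-06-10T08:57:06.553Z',
};

console.log(sessionDurationMs(exampleMeta)); // 311
```

A value like this could, for instance, be stored in session.data before the session is pushed, if the downstream function needs it.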

...

Syntax
Code Block
await session.push(
  excludeCurrentPayload?: boolean, flushType?: string
)

...

Example
Code Block
if (!session.data.myCounter) {
  session.data.myCounter = 0;  
}
session.data.myCounter += 1;


await session.push();

/* 
  ## 1 ##
  OUTPUT:
    payload -> { myCounter: 1 }
    meta -> { 
      count: 1, 
      firstEvent: '2022-06-10T08:57:06.242Z', 
      lastEvent: '2022-06-10T08:57:06.242Z' 
    }
  SESSION in store: {
    data: {
      myCounter: 1,
    }
    meta: {
      count: 1, 
      firstEvent: '2022-06-10T08:57:06.242Z',  
      lastEvent: '2022-06-10T08:57:06.242Z' 
    }
  }

  ## 2 ##
  OUTPUT:
    payload -> { myCounter: 2 }
    meta -> { 
      count: 2, 
      firstEvent: '2022-06-10T08:57:06.242Z', 
      lastEvent: '2022-06-10T08:57:06.553Z' 
    }
  SESSION in store: {
    data: {
      myCounter: 2,
    }
    meta: {
      count: 2, 
      firstEvent: '2022-06-10T08:57:06.242Z', 
      lastEvent: '2022-06-10T08:57:06.553Z' 
    }
  }
  
*/

delete: Deletes the current session. You will not be able to perform any operation on the session object after it has been deleted.

Syntax
Code Block
await session.delete();

// Will not push!
await session.push();

// Will not be saved!
session.data.myProperty = 1337;

flush(excludeCurrentPayload?: boolean, flushType?: string): Flushes the data and deletes the session. You will not be able to perform any operation on the session object after it has been deleted.

...

Note!

...

In batch streams, On Timeout is triggered for timed-out sessions when the stream is run. In real-time streams, On Timeout is triggered for timed-out sessions every 60 seconds.

...

Syntax
Code Block
await session.flush(
  excludeCurrentPayload?: boolean, flushType?: string
)

clear(): Clears the session so that it can be re-used.

Code Block
await session.clear()

...

Example
Code Block
session.setTimeout(new Date('2022-06-10T08:57:06.242Z'));

const moment = require('moment');
session.setTimeout(moment().add(1, 'days').toDate());
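
If you prefer not to depend on moment, a timeout can also be computed with the built-in Date object. The following is a minimal sketch, assuming that session.setTimeout accepts any Date as in the example above. nextMidnightUtc is a hypothetical helper name and not part of the Function's API.

```javascript
// Hypothetical helper: returns a Date for the next midnight (00:00:00.000 UTC)
// that follows the given point in time.
function nextMidnightUtc(now) {
  // Date.UTC normalises an out-of-range day (for example day 32) into the next month.
  return new Date(Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate() + 1));
}

const timeoutAt = nextMidnightUtc(new Date('2022-06-10T08:57:06.242Z'));
console.log(timeoutAt.toISOString()); // 2022-06-11T00:00:00.000Z

// Inside a script, the result could then be passed on, for example:
// session.setTimeout(nextMidnightUtc(new Date()));
```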

...

The configuration view appears as shown below.

Configuration Screen

...

Specify the name of the input field key (or column name) whose value you want to group the aggregated data by. A Group by rule groups together the records that have the same value stored in that column.

Note!

You can add multiple Group by rules. In this case, if the grouping cannot be performed on the basis of the first rule, the second rule is taken into consideration, and so on. If a record does not match any of the Group by rules, the stream will abort.

If you don't specify anything in Group by, aggregation is performed on all the columns.
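
The fallback between Group by rules described above can be pictured with the following sketch. It is an illustration in plain JavaScript and not the platform's implementation: resolveGroupKey is a hypothetical function, the field names are made up, and the sketch assumes that a rule "cannot be applied" when the field is missing from the record. The case of an empty Group by is not modelled.

```javascript
// Hypothetical illustration of ordered Group by rules (not the platform's code).
// Assumption: a rule matches when the record has a value for that field.
function resolveGroupKey(record, groupByRules) {
  for (const field of groupByRules) {
    if (record[field] !== undefined && record[field] !== null) {
      // The first rule that matches decides the group.
      return `${field}=${record[field]}`;
    }
  }
  // Mirrors the documented behaviour: no matching rule aborts the stream.
  throw new Error('Record does not match any Group by rule');
}

// Made-up rules and record for demonstration purposes.
const rules = ['accountId', 'customerId'];
console.log(resolveGroupKey({ customerId: 'c1', region: 'eu' }, rules)); // customerId=c1
```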

Script block

You can define the custom logic using the following tabs: 

...

On Transform

...

On Timeout

...

Preview is not available for the script in the On Timeout block.

...

In this section, you will see some easy-to-understand examples:

Example Stream

Refer to the PeppaCRM example stream to understand the usage of the Script Aggregator Function. You can also import this example stream and use it in your solution.

Example 1: Aggregate and Set Timeout

During initialisation, the timeout is set to 1 hour in the future. Once the timeout is triggered, a mean value is calculated from the sum and then the session is flushed.

Code Block
/* On Transform script */
if (!session.data.sum) {
  // Initiate sum property
  session.data.sum = 0;
  // Configure the session to timeout 1 hour in the future from first matching record
  session.setTimeout(new Date(Date.now() + 1000 * 60 * 60));
}
// Perform aggregation
session.data.sum += payload.sum;

Code Block
/* On Timeout script */

// Perform aggregation
session.data.mean = session.data.sum / session.meta.count;

// Flush session
await session.flush();

Example 2: Partial Push and Conditional Flush

The session is partially pushed downstream for every 5 records that match the current session. Once 100 records have matched the session, it is flushed.

Code Block
/* On Transform script */
if (!session.data.sum) {
  // Initiate sum property
  session.data.sum = 0;
}
// Perform aggregation
session.data.sum += payload.value;

if (session.meta.count === 100) {
  // Flush on the 100th record
  await session.flush();
} else if (session.meta.count % 5 === 0) {
  // Push session every 5th record
  await session.push();
}
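
The rule described for this example (push on every 5th record, flush on the 100th) can be checked outside the platform with a small sketch. actionForCount is a hypothetical helper that only models the decision and does not call the session object.

```javascript
// Hypothetical helper: decides what should happen for a given meta.count value.
function actionForCount(count) {
  if (count === 100) {
    return 'flush'; // the 100th record flushes the session
  }
  if (count % 5 === 0) {
    return 'push'; // every 5th record triggers a partial push
  }
  return 'none'; // all other records are only aggregated
}

const actions = [4, 5, 10, 99, 100].map(actionForCount);
console.log(actions); // [ 'none', 'push', 'push', 'none', 'flush' ]
```

Note that the remainder has to be compared with 0 explicitly. A bare `count % 5` is truthy for every count that is not a multiple of 5, which is the opposite of the intended condition.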

Example 3: Exclusive Push

The session is pushed downstream if a matching record has a "day" that differs from the "day" stored in the session data. During the push, the meta that was provided with the current record will be ignored. This means that the state of the meta pushed along downstream will hold information from the previous aggregation of the session. For example, if the meta.count increased to 5 with the current record, it will be pushed downstream with count 4 as that was its previous state.

Code Block
function init() {
  // Initiate sum property
  session.data.sum = 0;
  session.data.day = payload.day;
}

if (!session.data.sum) {
  init();
}

if (session.data.day !== payload.day) {
  // Push the previous state, excluding the current record
  await session.push(true);
  await session.clear();
  init();
}

// Perform aggregation
session.data.sum += payload.value;
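
To make the effect of the exclusive push easier to follow, the sketch below replays the same idea on an in-memory list of records. It is a simulation in plain JavaScript under stated assumptions, not the platform's behaviour: simulateExclusivePush, the record shape, and the locally kept count are all hypothetical stand-ins for the session object and its meta.

```javascript
// Hypothetical simulation of the exclusive push: when the day changes, the
// state from BEFORE the current record is emitted, then aggregation restarts.
function simulateExclusivePush(records) {
  const pushed = [];
  let current = null; // stand-in for the session data plus a record counter

  for (const record of records) {
    if (current !== null && current.day !== record.day) {
      // Emit the previous state; the current record is not included.
      pushed.push({ ...current });
      current = null;
    }
    if (current === null) {
      current = { day: record.day, sum: 0, count: 0 };
    }
    // Perform aggregation with the current record.
    current.sum += record.value;
    current.count += 1;
  }
  return pushed;
}

const pushedSessions = simulateExclusivePush([
  { day: 'Mon', value: 2 },
  { day: 'Mon', value: 3 },
  { day: 'Tue', value: 10 },
  { day: 'Wed', value: 1 },
]);
console.log(pushedSessions);
// [ { day: 'Mon', sum: 5, count: 2 }, { day: 'Tue', sum: 10, count: 1 } ]
```

The last day ('Wed') is not emitted in this simulation because no later record triggers a push. In a real stream, a timeout or an explicit flush would be needed to send it downstream.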

Licensing Information

The feature described in this documentation is optional and might not be part of the edition covered by your company's license.
