9.72.6 Siddhi Analytics Agent Example

The scenario in this example is to use streaming analytics to determine road tolls according to the current traffic situation per segment of a highway. 

The Siddhi Analytics agent is used to analyze the data sent to the agent, and determine the toll that is to be applied.

Workflow example to determine road tolls

The sections below provide descriptions of agent configurations for this example workflow.

Input

The Analysis agent is configured to consume data provided from the agent collecting traffic data which is then routed to the Siddhi Analytics agent for analysis and to determine the appropriate toll based on the streamed data.

The data consumed by the Analysis agent and sent to the Siddhi Analytics agent can be seen in the figure shown further below, Example output shown in Workflow Monitor, which shows the workflow monitor for this example.

Example Analysis agent configuration in traffic tolls case

Example - Code for Analysis agent

import ultra.siddhi.Segmented_Tolls;

consume {
    if (instanceOf(input, FileSend)){
        FileSend aInput = (FileSend) input;
        CarLoc aCarLoc = (CarLoc) aInput.Data;
        debug(aCarLoc);
        udrRoute(aCarLoc);    
    };
};

Streaming_Analytics

The Siddhi Analytics agent processes the UDRs by defining streams, selecting the elements that you require to be analyzed and where appropriate inserted into another stream in series, and finally to the selected output stream.

In this example, the elements of the input stream are defined first. Each stream in turn selects the relevant elements for analysis and when the determined condition is met, the relevant data is inserted into the next stream. This example shows streams being joined to correlate all the conditions met to be calculated and to apply the appropriate tolls based on the analysis of the input data.

The results are inserted into the selected output stream, which in this case is SegToll, and the output type is Toll. The resulting data is sent to the next agent in the workflow.

Example Siddhi Analytics agent configuration

Example - Siddhi query language for traffic tolls

define stream CarLocInputStream(input object, car_id string, speed int, exp_way int, lane int, dir int, x_pos float);

-- CarSegStr: Compute in which segment the car is located
from CarLocInputStream
select car_id, speed, exp_way, lane, dir, math:ceil(x_pos/5280) as seg
insert into CarSegStr;

-- SegVol: Compute density of cars in a segment
from CarSegStr # window.time(1 min)
select exp_way, lane, dir, seg, count(*) as volume 
group by exp_way, lane, dir, seg
having volume > 50
insert into SegVol;

-- SegAvgSpeed: Compute segments with average speed less than 60
from CarSegStr # window.time(1 min)
select exp_way, lane, dir, seg, avg(speed) as avg_speed
having avg_speed < 60
insert into SegAvgSpeed;

-- SegToll: Compute per segment toll
from SegAvgSpeed unidirectional
join SegVol # window.time(35 sec)
on SegAvgSpeed.exp_way == SegVol.exp_way
and SegAvgSpeed.lane == SegVol.lane
and SegAvgSpeed.dir == SegVol.dir
and SegAvgSpeed.seg == SegVol.seg
select SegAvgSpeed.exp_way, SegAvgSpeed.lane, SegAvgSpeed.dir, SegAvgSpeed.seg, avg(SegVol.volume) * 5 as toll
insert into SegToll;

Output in Results_Analysis

The output is shown in the Results_Analysis pane in the Workflow Monitor as shown in the figure below, where the toll is calculated per highway segment based on the data analysis executed in the Siddhi Analytics agent.

Example output shown in Workflow Monitor