The scenario in this example is to use streaming analytics to determine road tolls according to the current traffic situation per segment of a highway.
The Siddhi Analytics agent is used to analyze the data sent to the agent, and determine the toll that is to be applied.
Workflow example to determine road tolls
The sections below provide descriptions of agent configurations for this example workflow.
Input
The Analysis agent is configured to consume data provided from the agent collecting traffic data which is then routed to the Siddhi Analytics agent for analysis and to determine the appropriate toll based on the streamed data.
The data consumed by the Analysis agent and sent to the Siddhi Analytics agent can be seen in the figure shown further below, Example output shown in Workflow Monitor, which shows the workflow monitor for this example.
Example Analysis agent configuration in traffic tolls case
Example - Code for Analysis agent
import ultra.siddhi.Segmented_Tolls; consume { if (instanceOf(input, FileSend)){ FileSend aInput = (FileSend) input; CarLoc aCarLoc = (CarLoc) aInput.Data; debug(aCarLoc); udrRoute(aCarLoc); }; };
Streaming_Analytics
The Siddhi Analytics agent processes the UDRs by defining streams, selecting the elements that you require to be analyzed and where appropriate inserted into another stream in series, and finally to the selected output stream.
In this example, the elements of the input stream are defined first. Each stream in turn selects the relevant elements for analysis and when the determined condition is met, the relevant data is inserted into the next stream. This example shows streams being joined to correlate all the conditions met to be calculated and to apply the appropriate tolls based on the analysis of the input data.
The results are inserted into the selected output stream, which in this case is SegToll, and the output type is Toll. The resulting data is sent to the next agent in the workflow.
Example Siddhi Analytics agent configuration
Example - Siddhi query language for traffic tolls
define stream CarLocInputStream(input object, car_id string, speed int, exp_way int, lane int, dir int, x_pos float); -- CarSegStr: Compute in which segment the car is located from CarLocInputStream select car_id, speed, exp_way, lane, dir, math:ceil(x_pos/5280) as seg insert into CarSegStr; -- SegVol: Compute density of cars in a segment from CarSegStr # window.time(1 min) select exp_way, lane, dir, seg, count(*) as volume group by exp_way, lane, dir, seg having volume > 50 insert into SegVol; -- SegAvgSpeed: Compute segments with average speed less than 60 from CarSegStr # window.time(1 min) select exp_way, lane, dir, seg, avg(speed) as avg_speed having avg_speed < 60 insert into SegAvgSpeed; -- SegToll: Compute per segment toll from SegAvgSpeed unidirectional join SegVol # window.time(35 sec) on SegAvgSpeed.exp_way == SegVol.exp_way and SegAvgSpeed.lane == SegVol.lane and SegAvgSpeed.dir == SegVol.dir and SegAvgSpeed.seg == SegVol.seg select SegAvgSpeed.exp_way, SegAvgSpeed.lane, SegAvgSpeed.dir, SegAvgSpeed.seg, avg(SegVol.volume) * 5 as toll insert into SegToll;
Output in Results_Analysis
The output is shown in the Results_Analysis pane in the Workflow Monitor as shown in the figure below, where the toll is calculated per highway segment based on the data analysis executed in the Siddhi Analytics agent.
Example output shown in Workflow Monitor