Database Bulk Functions(3.0)

The Database Bulk functions enable you to bundle multiple queries into one SQL statement. This significantly improves the throughput due to reduced need of context switching and network traversals. The functions are intended for batch usage only. 

Making an SQL Query

When making an SQL query it must follow a certain format and the order of the columns in the query must remain the same in the statement as in the where_clause.

Example - Statement example

Select A,B,C from Test_Table

The where_clause argument for the sqlBulkPrepareStatement. For further information, see the section below, sqlBulkPrepareStatement.

A=? and B=? and C=?

The function adds a "WHERE" to the end of the statement and an "AND" between the static_where_clause and the where_clause statements. An "OR" is added between each where_clause statement.

The lookup SQL query next shows one static_where_clause and two where_clause.

Select A,B,C  FROM Test_Table WHERE (A=? and B=? and C=?) 
AND ((A=? and B=? and C=?) OR (A=? and B=? and C=?))

If the where_clause in the SQL statement contains a range condition instead of an exact lookup key, the intended result cannot directly be associated back to the UDRs. Instead a result based on the significant columns will be returned. The result rows matching the range condition must be associated back to the UDRs using APL code. For further information, see the lookup query example below.

sqlBulkPrepareStatement

This function prepares the the query. A JDBC connection is created and the bulkSize is set.

The returned object is a mandatory parameter to the sqlBulkExecuteQuery() function and is needed for the execution of the lookup SQL query.

any sqlBulkPrepareStatement
(string  dbProfile ,
string  statement ,
string  static_where_clause ,  //set to null if not present
string  where_clause ,
int  bulkSize ,
int  significantColumns ,
int  totalColumns ,
int  timeout ,  //Optional
boolean disableCommit ,  //Optional)
ParameterDescription

dbProfile

Name of the database where the table resides

statement

Statement to send to the database

static_where_clause

The static_where_clause of the SQL statement stays the same throughout the batch mode execution

where_clause

The where_clause of the SQL statement principal.

The where_clause columns that are to be considered significant must correspond with the columns in the select statement. For example, when statement reads "select A, B and C from X", where A and B is significant, the where_clause must begin with "A=? and B=?".

bulkSize

The size of the bundled lookup SQL query, that is the number of UDRs with a unique UDR key to be bundled. The ideal bulkSize depends on database table characteristics such as structure and number of rows.

significantColumns

This parameter indicates which columns in the select statement are significant to tie a result row to its context (UDR).

All columns to be used as significant columns must be included in SQL search condition, and they must be used in the same order as in the search condition. For example, when using an incoming UDR as context and a search condition matching its IMSI field, using significant columns = 1 ties all matching rows to this UDR, namely, all rows with the same IMSI as that carried by the UDR.

totalColumns

The total number of columns in the statement

timeout

The least time interval, in milliseconds, the workflow will allow before a database bulk lookup will be done

disableCommit

An optional parameter to disable the commit statement from being performed at the end of every SQL transaction for this particular function. Setting this parameter to false will result in the commit statement to be performed at the end of every SQL transaction for this particular function. By default,  will have the disableCommit set to true unless otherwise changed via this parameter.

It should be noted that on recent Oracle versions, the DBLink SQL transaction behaviour has changed, where every single SQL statement for remote database transaction requires a commit or rollback statement in order to close a connection.

In addition, PostgreSQL users should set this to false as every statement needs to be committed. 

Returns:

The returned object is a “Prepared Statement” object containing the added parameters, a prepared result list and a maintained contexts list. This object is a mandatory parameter to the sqlBulkExecuteQuery() function and is needed for the execution of the SQL query.

Example - Lookup query

Lookup query with one of the where_clause conditions without an exact lookup key.

To prepare a query:

SELECT Column1, Column2, Column3 FROM Test_Table
WHERE Column1=? AND Column2=? AND Column3 <=?;

The call to the function and the values for the above example:

significantColumns = 2
totalColumns = 3
where_clause = "Column1=? and Column2=? and Column3 <=?"
static_where_clause = null
bulksize = 100
dbprofile = "myFolder.myProfile"
statement = "select Column1, Column2, Column3 from Test_Table"

The call to the APL function:

initialize {
    any pStatement = sqlBulkPrepareStatement("myFolder.myProfile", 
    "select Column1, Column2, Column3 from Test_Table",
    null, "Column1=? and Column2=? and Column3 <=?",
    100, 2,3);
}

sqlBulkExecuteQuery

When a UDR enters sqlBulkExecuteQuery the first met criteria of bulkSize or timeout triggers the database bulk lookup.

The start of the timeout interval will be reset when a database query is done regardless if the last query was due to reached bulkSize or timeout.

Next timeout occasion is calculated by adding the time of the last database lookup to the current timeout value.

list <any> sqlBulkExecuteQuery(any ps,
any  context ,
any value1, any value2, ...)
ParameterDescription

ps

The object returned from a call to the sqlBulkPrepareStatement()

context

A context string, usually the UDR routed to the node

values

A number of values to populate the query

Returns:

A list containing objects that in their turn contains a context object and a result list. This object is a mandatory parameter to the sqlBulkResult* functions.

Example - Using sqlBulkExecuteQuery

The call to the function and the values for the example above, Example - Lookup Query:

ps =  pStatement object from sqlBulkPrepareStatement()
context = myUDR
values =  myValues1, myValue2, myValue3

Call to the APL function:

consume{
    list <any> result = sqlBulkExecuteQuery(pStatement,
    myUDR, 
    10,2,3); 
}

The function executes the SQL query before the bulkSize is reached.

list <any> sqlBulkDrain(any ps)
ParameterDescription

ps

A preparedStatement object returned by the sqlBulkPrepareStatement(), containing among others a result list and a maintained contexts list.

Returns:

A list containing objects that in their turn contain a context object and a list with results.

sqlBulkResultGetContext

The function finds the context object from one result object in the list returned either by the sqlBulkExecuteQuery() or the sqlBulkDrain() functions.

any sqlBulkResultGetContext(any presult)
ParameterDescription

presult

A result object from the list returned by the sqlBulkExecuteQuery() or the sqlBulkDrain() functions

Returns:

The context object associated with the presult parameter

sqlBulkResultGetSize

The function states the size of a result object in the list returned from the sqlBulkExecuteQuery() or the sqlBulkDrain() functions.

int sqlBulkResultGetSize(any presult)
ParameterDescription

presult

A result object from the list returned by the sqlBulkExecuteQuery() or the sqlBulkDrain() functions

Returns:

An integer representing the size of the result

sqlBulkResultGetValue

The function gets a result value from a result object in the list returned by either the sqlBulkExecuteQuery() or the sqlBulkDrain() functions.

any sqlBulkResultGetValue(any presult,
    int index,
    int column)
ParameterDescription

presult

A result object from the list returned by the sqlBulkExecuteQuery() or the sqlBulkDrain() functions

index

The index of the result table

column

The column number

Returns:

The value of the result object

sqlBulkElapsedNanoTime

This function accumulates the time of the execution of the database queries that is end time of execution minus start time of execution.

long sqlBulkElapsedNanoTime()
ParameterDescription

Returns:

A long integer representing the accumulated nanoseconds.

sqlBulkClose

The function closes down the database connection as well as the SQL statement. If the sqlBulkClose is not called the connection will be closed when the workflow stops.

void sqlBulkClose(any  ps )
ParameterDescription

ps

A preparedStatement object returned by the sqlBulkPrepareStatement(), containing among others a result list and a maintained contexts list

Returns:

Nothing

Example Workflow

The example shows a basic workflow where a Database Bulk lookup with a range condition is performed.


Example workflow

Prerequisites

To use the sqlBulk related APL commands there must be a database profile configuration prepared. For further information, see Database (3.0) in the Desktop user's guide.

Decoder_1 Agent

The incoming UDRs contains name and number to 10 people.

Katerine,Stenberg,0046123456
Kalle,Andersson,0046789123
Mia,Karlsson,0046999111
Karina,Hansson,0046222333
PerErik,Larsson,0046111999
Mikael,Grenkvist,0046333444
Jonas,Bergsten,0046555666
Petra,Sjoberg,0046777888
Karl,Kvistgren,0046444555
FiaLotta,Bergman,0046666222

Analysis_1 Agent

The Analysis agent does the database bulk lookup. In this case, the number of calls made by the people listed in the incoming UDRs.

When the bulksize is reached (in this case 10) the query to the database selected in the database profile is made.

// A preparedStatement object
any pStatementObj;

// Default.ExampleTable - the name of the database profile

initialize {
   pStatementObj = sqlBulkPrepareStatement(
   "Default.ExampleTable", "SELECT first_name, last_name, 
   telephone_no, usage_min FROM EXAMPLE_TABLE",
   null, "first_name=? and last_name=? and telephone_no=? ", 10, 3, 4);
}

consume {
   debug("In consume adding queries to the bulk execute");
   processResults( sqlBulkExecuteQuery(
   pStatementObj, input, input.first_name, input.last_name,
   input.telephoneno));
}

drain {
   processResults( sqlBulkDrain (pStatementObj));
}

void processResults( list <any> resultList ) {
   if( resultList == null ) {

      // Do nothing

      return;
    }

   int i=0;

   // Iterate the results returned from the database

   while( i < listSize( resultList )) {
      any result = listGet( resultList, i );
      i=i+1;

      // A string to populate with results from the database query

      string myTotalValue;
      int resultSize = sqlBulkResultGetSize(result);

      // At end of the iteration the myTotalValue will contain a 
      // comma separated usageMin value string with values meeting 

      int index = 0;

      while(index < resultSize){
         // Populate myTotalValue with result if it met range condition
    
         int usageMinValue = (int) sqlBulkResultGetValue(result, index, 3);
    
         // In this test the range condition is set to a constant value.
    
         if( usageMinValue >= 10 ){
             myTotalValue = myTotalValue+","+usageMinValue;
         }
    
         index = index +1;
      }
                
      InternalUDR iUdr = (InternalUDR) sqlBulkResultGetContext( result );
            
      // Assign the populated array to the usageMin
    
      iUdr.usagemin = myTotalValue;
      udrRoute( iUdr );
   }
}

The database lookups are in this example done in a database table with a content shown here:

Database Table

Encoder_1 Agent

When the encoder has encoded the information from the analysis agent the result sent to the disk out agent is shown here:

Katerine,Stenberg,46123456,,10,10
Kalle,Andersson,46789123,,10
Mia,Karlsson,46999111,,10
Karina,Hansson,46222333,,10
PerErik,Larsson,46111999,,11,11
Mikael,Grenkvist,46333444,,10
Jonas,Bergsten,46555666,,10
Petra,Sjoberg,46777888,,10
Karl,Kvistgren,46444555,,10
FiaLotta,Bergman,46666222,,10