Unit Test Functions
In addition to the Python functions described in /wiki/spaces/MD9/pages/4863243, the following functions are available for Unit Tests.
No native assert functionality is included; it can be found in the common Python libraries. Add the /wiki/spaces/MD9/pages/4856848 and use it in the unit test.
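As a minimal sketch of what assertions from the common Python libraries can look like, the block below combines the built-in assert statement with the assertion methods of the standard unittest module. Whether unittest is available in your environment is an assumption, and the checkTotals helper and its values are hypothetical, used only for illustration.

```python
import unittest

# A bare TestCase instance gives access to the assertion methods
# without involving the unittest test runner.
check = unittest.TestCase()

def checkTotals(records):
    """Hypothetical helper: verify a list of three numeric records."""
    total = sum(records)
    # Built-in assert statement with a failure message
    assert total >= 0, 'total must not be negative'
    # unittest assertions give more descriptive failure messages
    check.assertEqual(len(records), 3)
    check.assertAlmostEqual(total, 6.0, places=3)
    return total
```

Both styles raise AssertionError on failure, so either can be used to fail a test function.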
Note!
To access the Unit Test functions described below from a /wiki/spaces/MD9/pages/4856848 you can import the testkit module. This is useful when you write test-related helper functions.
Finalizers
addFinalizer
This function adds a finalizer function to be called at scope exit.
def addFinalizer(func, *args, **kwargs)
Parameter | Description |
---|---|
func | A function |
*args | Positional arguments to be passed to the function |
**kwargs | Keyword arguments to be passed to the function |
Returns | An object with a remove() method |
Example - addFinalizer
res = createResource()
addFinalizer(res.destroy)
finalizers
A Python context manager that defines an extra finalization scope. There are two pre-defined finalization scopes: one at the module level and one for each test function. These two scopes should be enough for normal use.
Example - finalizers
with finalizers:
    res = createResource()
    addFinalizer(res.destroy)
    ...
# res.destroy() is called at 'with scope' exit.
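For readers who know the Python standard library, a finalization scope can be compared to contextlib.ExitStack. The sketch below is only an analogy and not the testkit implementation; the destroy function and resource names are hypothetical stand-ins.

```python
from contextlib import ExitStack

destroyed = []

def destroy(name):
    # Stand-in for a real cleanup such as res.destroy()
    destroyed.append(name)

with ExitStack() as scope:
    # Comparable in spirit to addFinalizer(func, *args, **kwargs)
    scope.callback(destroy, 'resource-1')
    scope.callback(destroy, name='resource-2')
    # Nothing is cleaned up while the scope is still open
    assert destroyed == []

# ExitStack runs its callbacks in reverse registration order at scope exit
assert destroyed == ['resource-2', 'resource-1']
```

ExitStack calls its callbacks last-in, first-out. The order in which addFinalizer functions are called is not stated here, so it may be safer not to rely on a particular order.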
Conditional testing
SkipException
Tests can be skipped by raising the SkipException exception in the test function blocks, or the initialize function block.
class SkipException(message=None)
Parameter | Type |
---|---|
message | str |
Example - SkipException
def initialize():
    raise SkipException('All tests are skipped')

def test():
    raise SkipException('This test is skipped')
Log functions
logSearch
Searches for entries in the System Log matching the given criteria. Returns an iterator with all matching entries.
def logSearch(fromDate=None, toDate=None, severities=None, areas=None, wfName=None, wfGroupName=None, agentName=None, userName=None, message=None)
Parameter | Type |
---|---|
fromDate | drdate |
toDate | drdate |
severities | list[str] |
areas | list[str] |
wfName | str |
wfGroupName | str |
agentName | str |
userName | str |
message | str |
Example - logSearch
def test():
    myit = logSearch(fromDate=drdate('2021-01-26 16:00:00.0 UTC'),
                     toDate=drdate('2021-01-30 23:59:59.0 UTC'))
    for logEntry in myit:
        print(logEntry)
logRemove
Removes the log entry with the specified id.
def logRemove(id)
Parameter | Type |
---|---|
id | str |
Workflow functions
These functions are event driven, meaning that an event is fired when the function is called. For example, if you call wfStart and then immediately call wfIsRunning, it might return False because the workflow has not yet started. It might be necessary to call some of these functions in a loop while waiting for the status to change.
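One way to write such a loop is a small polling helper. The sketch below is a generic example; the waitFor name and its timeout and interval defaults are hypothetical and should be tuned to your workflows.

```python
import time

def waitFor(predicate, timeout=30.0, interval=0.5):
    """Poll predicate() until it returns a true value or the timeout expires.

    Returns True if the predicate became true, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)

# Usage inside a unit test, where wfStart and wfIsRunning are available:
#     wfStart('Default.test.workflow_1')
#     assert waitFor(lambda: wfIsRunning('Default.test.workflow_1'))
```

Returning False on timeout, rather than looping forever, lets the test fail with a clear assertion if the workflow never reaches the expected status.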
wfAdd
Adds a new workflow to an existing workflow template.
def wfAdd(wfName, parameters=None)
Parameter | Type |
---|---|
wfName | str |
parameters | dict[str, any] |
Example - wfAdd
def test():
    # Add a workflow
    wfAdd('Default.test.workflow_1')
or
def test():
    # Add a workflow and specify parameter values
    wfAdd('Default.test.workflow_1', {'Time Unit': 'SECONDS', 'Interval': 4})
wfDelete
Deletes a workflow.
def wfDelete(wfName)
Parameter | Type |
---|---|
wfName | str |
Example - wfDelete
def test():
    wfDelete('Default.test.workflow_1')
wfExists
Returns True if the workflow exists.
def wfExists(wfName)
Parameter | Type |
---|---|
wfName | str |
Example - wfExists
def test():
    wfExists('Default.test.workflow_1')
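wfExists, wfAdd, wfDelete and addFinalizer can be combined so that a test creates the workflow it needs and cleans it up afterwards. The sketch below is a hypothetical helper, not part of the testkit. It takes the object exposing the unit test functions as its first argument, assuming that the imported testkit module makes them available as attributes.

```python
def ensureWorkflow(kit, wfName, parameters=None):
    """Add wfName if it is missing and register its deletion as a finalizer.

    kit is the object exposing the unit test functions, for example the
    imported testkit module. Returns True if the workflow was added.
    """
    if kit.wfExists(wfName):
        return False
    kit.wfAdd(wfName, parameters)
    # Delete the workflow again when the current finalization scope exits
    kit.addFinalizer(kit.wfDelete, wfName)
    return True

# Usage from a helper module:
#     import testkit
#     ensureWorkflow(testkit, 'Default.test.workflow_1')
```

A workflow that already exists is left alone and no finalizer is registered for it, so the helper only removes what it created.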
wfStart
Starts the workflow. If successful, the workflow has been started but may not yet be running.
def wfStart(wfName, ec=None)
Parameter | Type |
---|---|
wfName | str |
ec | str |
Example - wfStart
def test():
    # Starts the workflow on the default EC
    wfStart('Default.test.workflow_1')
or
def test():
    # Starts the workflow on ec1
    wfStart('Default.test.workflow_1', ec='ec1')
wfSetDebugMode
Sets the workflow debug mode.
Debug mode can be set before starting the workflow or after it has started running.
def wfSetDebugMode(wfName, on)
Parameter | Type |
---|---|
wfName | str |
on | bool |
Example - wfSetDebugMode
def test():
    wfSetDebugMode('Default.test.workflow_1', True)
    wfStart('Default.test.workflow_1')
or
def test():
    wfStart('Default.test.workflow_1')
    # Wait until wf is running
    wfSetDebugMode('Default.test.workflow_1', True)
wfIsCompleted
Returns True if the workflow completed.
def wfIsCompleted(wfName)
Parameter | Type |
---|---|
wfName | str |
Example - wfIsCompleted
def test():
    wfIsCompleted('Default.test.workflow_1')  # False
    wfStart('Default.test.workflow_1')
    wfIsCompleted('Default.test.workflow_1')  # False
    wfStop('Default.test.workflow_1')
    # Wait a sec for the workflow to actually stop
    wfIsCompleted('Default.test.workflow_1')  # True
wfIsRunning
Returns True if the workflow is running.
def wfIsRunning(wfName)
Parameter | Type |
---|---|
wfName | str |
Example - wfIsRunning
def test():
    wfIsRunning('Default.test.workflow_1')  # False
    wfStart('Default.test.workflow_1')
    # Wait for the workflow to actually start
    wfIsRunning('Default.test.workflow_1')  # True
    wfStop('Default.test.workflow_1')
    wfIsRunning('Default.test.workflow_1')  # False
wfIsAborted
Returns True if the workflow aborted.
def wfIsAborted(wfName)
Parameter | Type |
---|---|
wfName | str |
wfAbortMessage
Returns the workflow abort message, if any.
def wfAbortMessage(wfName)
Parameter | Type |
---|---|
wfName | str |
Example - wfAbortMessage
def test():
    if wfIsAborted('Default.test.workflow_1'):
        print(wfAbortMessage('Default.test.workflow_1'))
    else:
        pass  # do something
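The status functions above can be folded into a single summary, which makes assertions and failure messages easier to write. The sketch below is a hypothetical helper, not part of the testkit. It takes the object exposing the unit test functions as its first argument (for example the imported testkit module), and the state names are chosen only for illustration.

```python
def workflowOutcome(kit, wfName):
    """Return a (state, message) tuple for wfName.

    state is 'aborted', 'completed', 'running' or 'other'.
    message is the abort message for an aborted workflow, otherwise None.
    """
    if kit.wfIsAborted(wfName):
        return ('aborted', kit.wfAbortMessage(wfName))
    if kit.wfIsCompleted(wfName):
        return ('completed', None)
    if kit.wfIsRunning(wfName):
        return ('running', None)
    # None of the status functions returned True, for example not yet started
    return ('other', None)
```

Because the workflow functions are event driven, the result is a snapshot and can be outdated shortly after it is returned.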
Event functions
Below is a general example for all event functions.
Our workflow ('Default.test.workflow_1') used in the example below is a simple workflow with a pulse agent connecting to an analysis agent where we debug the input.
For information on how to format the filter see: Event Types(3.0).
Example - Event functions
def test():
    # Create an event buffer for our test workflow
    eventBuffer = eventBufferCreate(
        filter=dict(eventName='Debug Event', workflowName='Default.test.workflow_1'),
        ttl=60)
    # Make sure we are debugging as the filter is looking for debug events
    wfSetDebugMode('Default.test.workflow_1', True)
    # Start the workflow
    wfStart('Default.test.workflow_1')
    # Wait until some events have been produced
    # and then search the buffer
    events = eventBufferSearch(eventBuffer)
    for x in events:
        # Print only the agent message from the event iterator
        print(x.agentMessage)
    # Optionally destroy the buffer
    # it will be destroyed at the end of scope otherwise
    eventBufferDestroy(eventBuffer)
eventBufferCreate
Creates an event buffer where events matching filter are stored in memory for later inspection.
Returns the event buffer to be used in calls to other event buffer functions.
def eventBufferCreate(filter=None, ttl=None, maxSize=None)
Parameter | Type |
---|---|
filter | dict[str, any] |
ttl | float |
maxSize | int |
eventBufferDestroy
Destroys the event buffer and releases any resources associated with it.
def eventBufferDestroy(buffer)
Parameter | Type |
---|---|
buffer | object |
eventBufferSearch
Searches for events in the event buffer matching filter.
Returns an iterator with all currently matching events, with most recent event first.
def eventBufferSearch(buffer, filter=None)
Parameter | Type |
---|---|
buffer | object |
filter | dict[str, any] |
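Since eventBufferSearch returns its matches with the most recent event first, the newest event is simply the first item of the iterator. The sketch below shows one way to fetch it; latestEvent is a hypothetical helper name, not part of the testkit.

```python
def latestEvent(events):
    """Return the first item of an event iterator, or None if it is empty."""
    return next(iter(events), None)

# Usage inside a unit test, where eventBufferSearch is available:
#     newest = latestEvent(eventBufferSearch(eventBuffer))
#     if newest is not None:
#         print(newest.agentMessage)
```

Returning None for an empty result avoids a StopIteration error when no matching event has been produced yet.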