This section shows how to use the Python collection and processing agents in a batch workflow. In this example, the Python collection agent reads Excel files.
Workflow example with the Python collection and processing agents for batch
The Excel file being processed
Workflow Configuration
The sections below provide descriptions of agent configurations for this example workflow.
Workflow configuration, showing the workflow table
Read_Excel
The Python collection agent is configured to read all the Excel files from the configured directory, process each file in a separate batch and output one UDR per row.
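The directory scan described above can be tried outside the platform as a plain function. The following is a minimal sketch, not the agent's own implementation: `list_excel_files` is a hypothetical helper name, and the extra filtering (skipping subdirectories and the `~$` owner files that Excel creates while a workbook is open, matching the extension case-insensitively, sorting the result) is an assumption about what a robust scan might want.

```python
import os


def list_excel_files(directory):
    """Return the .xlsx file names in `directory`, sorted by name.

    Hypothetical helper for illustration; not part of the agent API.
    """
    names = []
    for name in os.listdir(directory):
        full_path = os.path.join(directory, name)
        # Skip subdirectories such as "done/"
        if not os.path.isfile(full_path):
            continue
        # Skip the "~$..." owner files Excel creates while a workbook is open
        if name.startswith("~$"):
            continue
        # Match the extension case-insensitively (.xlsx, .XLSX)
        if name.lower().endswith(".xlsx"):
            names.append(name)
    # os.listdir returns names in arbitrary order; sort for repeatable batches
    return sorted(names)
```

Sorting is a design choice that makes the batch order repeatable between runs. It is not required for correctness.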
General Tab
Example - Code for Read_Excel
import os
import pandas
from .UFL_Types import ItemCount

txn_directory = persistent()
txn_filename = persistent()

def execute():
    # Read all excel files from the configured directory
    txn_directory.value = dynamicFieldGet("Directory", "Name")
    xls = [f for f in os.listdir(txn_directory.value) if f.endswith(".xlsx")]
    # Process each excel file in a separate batch, output one UDR per row
    for fn in xls:
        if not isStopped():
            txn_filename.value = fn
            df = pandas.read_excel(os.path.join(txn_directory.value, fn))
            mimSet("Filename", fn)
            beginBatch()
            debug("Begin batch (txnid=%s, filename=%r)" % (getTransactionId(), fn))
            for idx, item, count in df.itertuples():
                udr = ItemCount()
                udr.item = item
                udr.count = count
                udrRoute(udr)
            endBatch()

def commit():
    # On commit, move the processed file into the "done/" subdirectory
    debug("Commit (txnid=%s, filename=%r)" % (getTransactionId(), txn_filename.value))
    old = os.path.join(txn_directory.value, txn_filename.value)
    new = os.path.join(os.path.join(txn_directory.value, "done"), txn_filename.value)
    if os.path.exists(old):
        os.rename(old, new)
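The commit step above moves the processed file with `os.rename`, which raises an error if the `done` subdirectory does not exist yet. The sketch below shows one way to make that move more forgiving, using only the standard library. `move_to_done` is a hypothetical helper name, and both creating the directory on demand and overwriting an existing target are assumptions, not documented agent behavior.

```python
import os


def move_to_done(directory, filename):
    """Move `filename` from `directory` into its "done" subdirectory.

    Hypothetical helper mirroring the commit() logic. Returns the new
    path, or None if the source file no longer exists.
    """
    old = os.path.join(directory, filename)
    done_dir = os.path.join(directory, "done")
    new = os.path.join(done_dir, filename)
    if not os.path.exists(old):
        return None
    # Create "done/" on first use; no error if it already exists
    os.makedirs(done_dir, exist_ok=True)
    # os.replace overwrites an existing target on all platforms
    # (assumption: overwriting an earlier file of the same name is acceptable)
    os.replace(old, new)
    return new
```

If an earlier file with the same name must be preserved, a check for an existing target before the move would be needed instead of `os.replace`.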
MIM Tab
The agent publishes Filename as a string.
Analyse_Rows
The Python processing agent processes UDRs by defining a consume block, and uses the selected Interpreter profile, which configures the Python executables.
Example - Code for Analyse_Rows
total_count = 0

def beginBatch():
    fn = mimGet("Read_Excel", "Filename")
    debug("Begin batch (txnid=%s, filename=%r)" % (getTransactionId(), fn))
    global total_count
    total_count = 0

def consume(input):
    debug("%s=%s" % (input.item, input.count))
    global total_count
    total_count += input.count

def endBatch():
    fn = mimGet("Read_Excel", "Filename")
    debug("File %r contained %s items" % (fn, total_count))
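To see why the running total is reset at the start of each batch, the call order can be imitated in plain Python. This is a rough sketch under stated assumptions: the `ItemCount` dataclass stands in for the real UDR type, `run_batches` is a hypothetical driver that imitates the order in which the functions are invoked, and the file names and rows are made-up sample data.

```python
from dataclasses import dataclass


@dataclass
class ItemCount:
    """Stand-in for the ItemCount UDR type (assumption, illustration only)."""
    item: str
    count: int


total_count = 0
results = {}


def begin_batch():
    # Reset the running total at the start of every batch
    global total_count
    total_count = 0


def consume(udr):
    # Add the count from one row to the running total
    global total_count
    total_count += udr.count


def end_batch(filename):
    # Record the total for the file that just finished
    results[filename] = total_count


def run_batches(batches):
    """Hypothetical driver: one begin/consume.../end cycle per file."""
    for filename, rows in batches.items():
        begin_batch()
        for item, count in rows:
            consume(ItemCount(item, count))
        end_batch(filename)
    return results


totals = run_batches({
    "a.xlsx": [("apple", 3), ("pear", 2)],
    "b.xlsx": [("plum", 4)],
})
print(totals)  # {'a.xlsx': 5, 'b.xlsx': 4}
```

Without the reset in `begin_batch`, the second file would report 9 instead of 4, because the module-level total would carry over from the first batch.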
Workflow Monitor
When debug is enabled, the Workflow Monitor shows the output step by step.