This section shows how to use the Python collection and processing agents in a batch workflow. In this example, the Python collection agent reads Excel files from a directory, and the Python processing agent totals the counts in each file.

Workflow example with the Python collection and processing agents for batch


The Excel file being processed

Workflow Configuration

The sections below provide descriptions of agent configurations for this example workflow.

Workflow configuration, showing the workflow table

Read_Excel

The Python collection agent reads all Excel files (.xlsx) from the configured directory, processes each file in a separate batch, and outputs one UDR per row.

General Tab

Example - Code for Read_Excel

import os
import pandas
from .UFL_Types import ItemCount

txn_directory = persistent()
txn_filename = persistent()

def execute():
    # Read all excel files from the configured directory
    txn_directory.value = dynamicFieldGet("Directory", "Name")
    xls = [f for f in os.listdir(txn_directory.value) if f.endswith(".xlsx")]
    # Process each excel file in a separate batch, output one UDR per row
    for fn in xls:
        if not isStopped():
            txn_filename.value = fn
            df = pandas.read_excel(os.path.join(txn_directory.value, fn))
            mimSet("Filename", fn)
            beginBatch()
            debug("Begin batch (txnid=%s, filename=%r)" % (getTransactionId(), fn))
            for idx, item, count in df.itertuples():
                udr = ItemCount()
                udr.item = item
                udr.count = count
                udrRoute(udr)
            endBatch()

def commit():
    # On commit, move the processed file into the "done/" subdirectory
    debug("Commit (txnid=%s, filename=%r)" % (getTransactionId(), txn_filename.value))
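    done_dir = os.path.join(txn_directory.value, "done")
    old = os.path.join(txn_directory.value, txn_filename.value)
    new = os.path.join(done_dir, txn_filename.value)
    if os.path.exists(old):
        # Create "done/" on first use; os.rename fails if it is missing
        os.makedirs(done_dir, exist_ok=True)
        os.rename(old, new)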
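
The directory listing in the execute block matches any name that ends in lowercase .xlsx. As a minimal sketch of a stricter filter, the hypothetical helper below (list_excel_files is not part of the Python agent API) also accepts upper-case extensions, skips the "~$" owner files that Excel creates while a workbook is open, and returns the names in sorted order, since os.listdir returns entries in arbitrary order. It uses only the standard library, so it can be tried outside a workflow.

```python
import os


def list_excel_files(directory):
    """Return sorted .xlsx file names in directory, skipping Excel owner files.

    Hypothetical helper for illustration; not part of the Python agent API.
    """
    names = []
    for name in os.listdir(directory):
        # Excel writes a "~$<name>.xlsx" owner file while a workbook is open.
        if name.startswith("~$"):
            continue
        # Match the extension case-insensitively (for example "REPORT.XLSX").
        if not name.lower().endswith(".xlsx"):
            continue
        # Ignore directories whose names happen to end in ".xlsx".
        if os.path.isfile(os.path.join(directory, name)):
            names.append(name)
    # os.listdir gives no ordering guarantee; sort for a stable batch order.
    return sorted(names)
```

Under these assumptions, the list comprehension in execute could be replaced with a call to list_excel_files(txn_directory.value).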
        

MIM Tab

The agent publishes Filename as a string.

Analyse_Rows

The Python processing agent processes UDRs by defining a consume block. It runs with the selected Interpreter profile, which configures the Python executable to use.

Example - Code for Analyse_Rows

total_count = 0

def beginBatch():
    fn = mimGet("Read_Excel", "Filename")
    debug("Begin batch (txnid=%s, filename=%r)" % (getTransactionId(), fn))
    global total_count
    total_count = 0

def consume(input):
    debug("%s=%s" % (input.item, input.count))
    global total_count
    total_count += input.count

def endBatch():
    fn = mimGet("Read_Excel", "Filename")
    debug("File %r contained %s items" % (fn, total_count))
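
The batch logic in Analyse_Rows can be tried outside a workflow. The sketch below is an illustration only: ItemCount is a stand-in dataclass for the UDR type, and BatchTotals and run are hypothetical names that mimic the order in which the agent's beginBatch, consume and endBatch blocks are called, with one batch per file. It shows why the total is reset at the start of each batch.

```python
from dataclasses import dataclass


@dataclass
class ItemCount:
    """Stand-in for the ItemCount UDR (assumed fields: item, count)."""
    item: str
    count: int


class BatchTotals:
    """Mimics the beginBatch/consume/endBatch blocks of Analyse_Rows."""

    def __init__(self):
        self.filename = None
        self.total_count = 0
        self.results = {}  # filename -> total for that batch

    def begin_batch(self, filename):
        self.filename = filename
        self.total_count = 0  # reset so totals never carry over between files

    def consume(self, udr):
        self.total_count += udr.count

    def end_batch(self):
        self.results[self.filename] = self.total_count


def run(files):
    """Drive one batch per file; files maps a filename to its (item, count) rows."""
    agent = BatchTotals()
    for filename, rows in files.items():
        agent.begin_batch(filename)
        for item, count in rows:
            agent.consume(ItemCount(item, count))
        agent.end_batch()
    return agent.results


totals = run({
    "a.xlsx": [("apple", 3), ("pear", 2)],
    "b.xlsx": [("plum", 7)],
})
print(totals)  # {'a.xlsx': 5, 'b.xlsx': 7}
```

Without the reset in begin_batch, the second file would report 12 instead of 7.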

Workflow Monitor

When debug is enabled, the Workflow Monitor shows the output step by step.
