Step Handlers

Understanding Step Handlers

Step handlers are asynchronous functions that implement the behavior of workflow steps. They:

  • Process user input and form submissions
  • Perform business logic and data processing
  • Update the UI to reflect the current state
  • Update the shared context with relevant data
  • Determine the next step in the workflow

Step handlers are connected to workflow steps using the workflow_step decorator, which associates the function with a specific step in a workflow.

Basic Step Handler Structure

A step handler follows this basic structure:

from agentnexus.workflow_manager import workflow_step
from agentnexus.base_types import WorkflowStepResponse, UIComponentUpdate

@workflow_step(
    agent_config=my_agent,
    workflow_id="my_workflow",
    step_id="my_step",
    name="My Step",
    description="This is a sample step",
    ui_components=[my_form, my_display]
)
async def handle_my_step(input_data) -> WorkflowStepResponse:
    """Handle my workflow step."""
    # Extract context and form data
    context = getattr(input_data, 'context', {}) or {}
    form_data = getattr(input_data, 'form_data', None)

    # Check for form submission
    if form_data and form_data.get("action") == "submit":
        # Process form submission
        values = form_data.get("values", {})

        # Return response with next step
        return WorkflowStepResponse(
            data={"status": "complete"},
            ui_updates=[],
            next_step_id="next_step",
            context_updates={
                "field1": values.get("field1", ""),
                "field2": values.get("field2", "")
            }
        )

    # Initial step load
    return WorkflowStepResponse(
        data={"status": "ready"},
        ui_updates=[
            UIComponentUpdate(
                key="my_form",
                state={"values": {
                    "field1": context.get("field1", ""),
                    "field2": context.get("field2", "")
                }}
            )
        ],
        context_updates={}
    )

Step Handler Decorator

The workflow_step decorator associates a function with a workflow step:

@workflow_step(
    agent_config=document_agent,
    workflow_id="document_processing",
    step_id="upload",
    name="Document Upload",
    description="Upload a document for processing",
    ui_components=[upload_form, document_preview]
)
async def handle_upload_step(input_data) -> WorkflowStepResponse:
    # Implementation
    # ...

Decorator Parameters

ParameterTypeDescription
agent_configAgentConfigThe agent this workflow belongs to (required)
workflow_idstrIdentifier of the workflow (required)
step_idstrIdentifier of the step within the workflow (required)
namestrHuman-readable name of the step (required)
descriptionstrDetailed description of the step (required)
ui_componentsOptional[List[UIComponent]]UI components for this step
allow_dynamic_uiboolWhether to allow dynamic UI updates (default: True)

Input Data Structure

The input_data parameter passed to step handlers contains:

class WorkflowStepInput:
    # Session ID for this workflow instance
    session_id: Optional[str]

    # Shared context data
    context: Dict[str, Any]

    # Optional form submission data
    form_data: Optional[Dict[str, Any]]

    # Additional parameters
    params: Optional[Dict[str, Any]]

Extracting Input Data

Since the input structure can vary, it’s best to extract data defensively:

def extract_input_data(input_data) -> Dict[str, Any]:
    """Extract data from input safely."""
    result = {
        'context': {},
        'form_data': {},
        'session_id': None
    }

    # Extract context
    if hasattr(input_data, 'context'):
        result['context'] = input_data.context or {}
    elif isinstance(input_data, dict) and 'context' in input_data:
        result['context'] = input_data['context'] or {}

    # Extract form_data
    if hasattr(input_data, 'form_data'):
        result['form_data'] = input_data.form_data or {}
    elif isinstance(input_data, dict) and 'form_data' in input_data:
        result['form_data'] = input_data['form_data'] or {}

    # Extract session_id
    if hasattr(input_data, 'session_id'):
        result['session_id'] = input_data.session_id
    elif isinstance(input_data, dict) and 'session_id' in input_data:
        result['session_id'] = input_data['session_id']

    return result

Then use it in your handler:

async def handle_my_step(input_data) -> WorkflowStepResponse:
    """Handle my workflow step."""
    # Extract data safely
    extracted = extract_input_data(input_data)
    context = extracted['context']
    form_data = extracted['form_data']
    session_id = extracted['session_id']

    # Rest of handler implementation
    # ...

Step Response Structure

Step handlers return a WorkflowStepResponse object that determines:

  • What data to return to the client
  • Which UI components to update
  • What context to preserve for the next step
  • Which step to transition to next
return WorkflowStepResponse(
    data={"status": "complete", "message": "Step completed successfully"},
    ui_updates=[
        UIComponentUpdate(
            key="result_display",
            state={"markdown_content": "Step completed successfully!"}
        )
    ],
    next_step_id="next_step",  # Go to this step next
    context_updates={
        "completed_at": datetime.now().isoformat(),
        "result": process_result
    }
)

Response Parameters

ParameterTypeDescription
dataDict[str, Any]Data to return to the client
ui_updatesList[UIComponentUpdate]UI component updates
next_step_idOptional[str]ID of the next step (None to stay on current step)
context_updatesDict[str, Any]Updates to the workflow context

Component Event Handling

Step handlers can process events from UI components:

async def handle_document_step(input_data) -> WorkflowStepResponse:
    """Handle document processing step."""
    # Extract data
    extracted = extract_input_data(input_data)
    context = extracted['context']
    form_data = extracted['form_data']

    # Check for component events
    if form_data:
        action = form_data.get("action")
        component_key = form_data.get("component_key")

        # Handle file upload event
        if action == "file_uploaded" and component_key == "document_uploader":
            file_data = form_data.get("file_data", {})
            file_name = file_data.get("name", "")
            file_content = file_data.get("content", "")

            # Process the uploaded file
            processed_content = process_file(file_content)

            # Update UI and context
            return WorkflowStepResponse(
                data={"file_processed": True, "file_name": file_name},
                ui_updates=[
                    UIComponentUpdate(
                        key="document_preview",
                        state={"content": processed_content}
                    ),
                    UIComponentUpdate(
                        key="status_message",
                        state={"markdown_content": f"File '{file_name}' uploaded successfully."}
                    )
                ],
                context_updates={
                    "file_name": file_name,
                    "file_content": file_content,
                    "processed_content": processed_content
                }
            )

        # Handle form submission
        elif action == "submit" and component_key == "settings_form":
            # Process form values
            # ...

    # Initial step load
    # ...

Common Step Handler Patterns

Form Processing Step

A step that collects and processes form data:

@workflow_step(
    agent_config=registration_agent,
    workflow_id="user_registration",
    step_id="contact_info",
    name="Contact Information",
    description="Collect user contact information",
    ui_components=[contact_form, instructions]
)
async def handle_contact_step(input_data) -> WorkflowStepResponse:
    """Handle contact information step."""
    # Extract context and form data
    context = getattr(input_data, 'context', {}) or {}
    form_data = getattr(input_data, 'form_data', None)

    # Process form submission
    if form_data and form_data.get("action") == "submit":
        # Extract form values
        values = form_data.get("values", {})
        email = values.get("email", "")
        phone = values.get("phone", "")
        address = values.get("address", "")

        # Validate email (simplified example)
        if not "@" in email:
            return WorkflowStepResponse(
                data={"status": "validation_error", "field": "email"},
                ui_updates=[
                    UIComponentUpdate(
                        key="contact_form",
                        state={"errors": {"email": "Invalid email address"}}
                    )
                ],
                context_updates={}
            )

        # Move to next step
        return WorkflowStepResponse(
            data={"status": "contact_info_complete"},
            ui_updates=[],
            next_step_id="preferences",
            context_updates={
                "contact_email": email,
                "contact_phone": phone,
                "contact_address": address
            }
        )

    # Initial step load - populate form with existing data
    return WorkflowStepResponse(
        data={"status": "ready"},
        ui_updates=[
            UIComponentUpdate(
                key="contact_form",
                state={"values": {
                    "email": context.get("contact_email", ""),
                    "phone": context.get("contact_phone", ""),
                    "address": context.get("contact_address", "")
                }}
            )
        ],
        context_updates={}
    )

Data Processing Step

A step that processes data without user input:

@workflow_step(
    agent_config=data_agent,
    workflow_id="data_analysis",
    step_id="analyze",
    name="Data Analysis",
    description="Analyze the uploaded data",
    ui_components=[analysis_results, progress_indicator]
)
async def handle_analyze_step(input_data) -> WorkflowStepResponse:
    """Handle data analysis step."""
    # Extract context
    context = getattr(input_data, 'context', {}) or {}

    # Get data from context
    data = context.get("uploaded_data")
    if not data:
        return WorkflowStepResponse(
            data={"status": "error", "message": "No data to analyze"},
            ui_updates=[
                UIComponentUpdate(
                    key="analysis_results",
                    state={"markdown_content": "## Error\n\nNo data available for analysis."}
                )
            ],
            context_updates={}
        )

    # Perform analysis (simplified example)
    try:
        import pandas as pd
        import numpy as np
        import json

        # Parse data as CSV
        df = pd.read_csv(io.StringIO(data))

        # Perform basic analysis
        analysis_results = {
            "row_count": len(df),
            "column_count": len(df.columns),
            "columns": list(df.columns),
            "numeric_columns": list(df.select_dtypes(include=[np.number]).columns),
            "missing_values": df.isnull().sum().to_dict(),
            "summary": json.loads(df.describe().to_json())
        }

        # Format results as markdown
        markdown_results = f"""
## Analysis Results

**Dataset Overview:**
- Rows: {analysis_results['row_count']}
- Columns: {analysis_results['column_count']}

**Column Types:**
- Numeric columns: {', '.join(analysis_results['numeric_columns'])}

**Missing Values:**
{chr(10).join(['- ' + col + ': ' + str(val) for col, val in analysis_results['missing_values'].items() if val > 0])}

**Basic Statistics:**

{df.describe().to_string()}

        """

        # Update UI and context
        return WorkflowStepResponse(
            data={"status": "analysis_complete"},
            ui_updates=[
                UIComponentUpdate(
                    key="analysis_results",
                    state={"markdown_content": markdown_results}
                ),
                UIComponentUpdate(
                    key="progress_indicator",
                    state={"progress": 100, "status": "complete"}
                )
            ],
            next_step_id="visualize",
            context_updates={
                "analysis_results": analysis_results,
                "analysis_markdown": markdown_results
            }
        )
    except Exception as e:
        return WorkflowStepResponse(
            data={"status": "error", "message": str(e)},
            ui_updates=[
                UIComponentUpdate(
                    key="analysis_results",
                    state={"markdown_content": f"## Error\n\nAn error occurred during analysis: {str(e)}"}
                ),
                UIComponentUpdate(
                    key="progress_indicator",
                    state={"progress": 0, "status": "error"}
                )
            ],
            context_updates={}
        )

Decision Step

A step with branching based on user choice:

@workflow_step(
    agent_config=support_agent,
    workflow_id="support_request",
    step_id="issue_type",
    name="Issue Type",
    description="Determine the type of support issue",
    ui_components=[issue_type_form, help_text]
)
async def handle_issue_type_step(input_data) -> WorkflowStepResponse:
    """Handle issue type selection step."""
    # Extract context and form data
    context = getattr(input_data, 'context', {}) or {}
    form_data = getattr(input_data, 'form_data', None)

    # Process form submission
    if form_data and form_data.get("action") == "submit":
        # Extract issue type
        values = form_data.get("values", {})
        issue_type = values.get("issue_type", "")

        # Determine next step based on issue type
        next_steps = {
            "technical": "technical_details",
            "billing": "billing_details",
            "account": "account_details",
            "other": "describe_issue"
        }

        next_step = next_steps.get(issue_type, "describe_issue")

        # Move to appropriate next step
        return WorkflowStepResponse(
            data={"status": "issue_type_selected", "issue_type": issue_type},
            ui_updates=[],
            next_step_id=next_step,
            context_updates={
                "issue_type": issue_type
            }
        )

    # Initial step load
    return WorkflowStepResponse(
        data={"status": "ready"},
        ui_updates=[
            UIComponentUpdate(
                key="issue_type_form",
                state={"values": {
                    "issue_type": context.get("issue_type", "")
                }}
            ),
            UIComponentUpdate(
                key="help_text",
                state={"markdown_content": """
## Select Issue Type

Please select the category that best describes your issue:

- **Technical**: Issues with software, hardware, or services
- **Billing**: Questions about invoices, payments, or subscriptions
- **Account**: Problems with login, permissions, or account settings
- **Other**: Any issues that don't fit the categories above
                """}
            )
        ],
        context_updates={}
    )

Final/Summary Step

A step that presents a summary at the end of a workflow:

@workflow_step(
    agent_config=order_agent,
    workflow_id="order_placement",
    step_id="confirmation",
    name="Order Confirmation",
    description="Confirm order details and complete the order",
    ui_components=[order_summary, confirmation_message]
)
async def handle_confirmation_step(input_data) -> WorkflowStepResponse:
    """Handle order confirmation step."""
    # Extract context
    context = getattr(input_data, 'context', {}) or {}

    # Get order details from context
    order_items = context.get("order_items", [])
    shipping_address = context.get("shipping_address", {})
    payment_method = context.get("payment_method", {})

    # Calculate order total
    order_total = sum(item.get("price", 0) * item.get("quantity", 0) for item in order_items)

    # Generate order ID
    import uuid
    order_id = str(uuid.uuid4())[:8].upper()

    # Format order summary
    items_list = "\n".join([
        f"- {item.get('quantity')}x {item.get('name')} - ${item.get('price'):.2f} each"
        for item in order_items
    ])

    address_formatted = f"{shipping_address.get('street')}, {shipping_address.get('city')}, {shipping_address.get('state')} {shipping_address.get('zip')}"

    summary_markdown = f"""
## Order Confirmation

**Order ID:** {order_id}

### Items:
{items_list}

**Total:** ${order_total:.2f}

### Shipping Address:
{address_formatted}

### Payment Method:
{payment_method.get('type')} ending in {payment_method.get('last4')}

Your order has been placed successfully and will be processed shortly.
You will receive a confirmation email at {context.get('email')}.

Thank you for your order!
    """

    # Create confirmation message
    confirmation_text = f"""
# Thank You for Your Order!

Your order #{order_id} has been successfully placed.

## Next Steps

1. You'll receive a confirmation email shortly
2. You can track your order status on your account page
3. Expected delivery: 3-5 business days

[View Order Status](#) | [Continue Shopping](#)
    """

    # Return final step
    return WorkflowStepResponse(
        data={
            "status": "order_complete",
            "order_id": order_id,
            "order_total": order_total
        },
        ui_updates=[
            UIComponentUpdate(
                key="order_summary",
                state={"markdown_content": summary_markdown}
            ),
            UIComponentUpdate(
                key="confirmation_message",
                state={"markdown_content": confirmation_text}
            )
        ],
        next_step_id=None,  # No next step (end of workflow)
        context_updates={
            "order_id": order_id,
            "order_total": order_total,
            "order_date": datetime.now().isoformat(),
            "order_status": "placed"
        }
    )

Integration with LLM Agents

Step handlers can integrate with LLM-powered agents:

@workflow_step(
    agent_config=content_agent,
    workflow_id="content_creation",
    step_id="generate",
    name="Content Generation",
    description="Generate content based on requirements",
    ui_components=[requirements_form, content_editor, generation_options]
)
async def handle_generate_step(input_data) -> WorkflowStepResponse:
    """Handle content generation step with LLM integration."""
    # Extract context and form data
    context = getattr(input_data, 'context', {}) or {}
    form_data = getattr(input_data, 'form_data', None)

    # Get generation parameters
    if form_data and form_data.get("action") == "submit":
        values = form_data.get("values", {})

        # Topic or title
        topic = values.get("topic", "")

        # Content type
        content_type = values.get("content_type", "blog_post")

        # Content parameters
        tone = values.get("tone", "informative")
        length = values.get("length", "medium")

        # Additional requirements
        keywords = values.get("keywords", "").split(",")
        target_audience = values.get("target_audience", "general")

        # Create prompt for LLM
        prompt = f"""
        Create a {content_type} about "{topic}".

        Tone: {tone}
        Length: {length}
        Target audience: {target_audience}
        Keywords to include: {', '.join(keywords)}

        The content should be well-structured, engaging, and formatted with Markdown.
        """

        try:
            # Initialize LLM client
            from agents.llm_client import create_llm_client
            llm_client = create_llm_client()

            # Generate content with LLM
            response = await llm_client.complete(
                prompt=prompt,
                system_message="You are an expert content creator specializing in producing high-quality, engaging content.",
                temperature=0.7
            )

            generated_content = response.content

            # Return generated content
            return WorkflowStepResponse(
                data={"status": "content_generated", "content_type": content_type},
                ui_updates=[
                    UIComponentUpdate(
                        key="content_editor",
                        state={"editor_content": generated_content}
                    )
                ],
                context_updates={
                    "generated_content": generated_content,
                    "content_type": content_type,
                    "topic": topic,
                    "generation_parameters": {
                        "tone": tone,
                        "length": length,
                        "keywords": keywords,
                        "target_audience": target_audience
                    }
                }
            )
        except Exception as e:
            return WorkflowStepResponse(
                data={"status": "error", "message": str(e)},
                ui_updates=[
                    UIComponentUpdate(
                        key="error_message",
                        state={"markdown_content": f"## Error\n\nAn error occurred during content generation: {str(e)}"}
                    )
                ],
                context_updates={}
            )

    # Initial step load
    return WorkflowStepResponse(
        data={"status": "ready"},
        ui_updates=[
            UIComponentUpdate(
                key="requirements_form",
                state={"values": {
                    "topic": context.get("topic", ""),
                    "content_type": context.get("content_type", "blog_post"),
                    "tone": context.get("tone", "informative"),
                    "length": context.get("length", "medium"),
                    "keywords": ",".join(context.get("keywords", [])),
                    "target_audience": context.get("target_audience", "general")
                }}
            )
        ],
        context_updates={}
    )

Best Practices

Error Handling

Implement proper error handling in step handlers:

try:
    # Main logic
    result = process_data(data)
    return WorkflowStepResponse(
        data={"status": "success"},
        ui_updates=[...],
        context_updates={...}
    )
except ValidationError as e:
    # Handle validation errors
    return WorkflowStepResponse(
        data={"status": "validation_error", "errors": e.errors()},
        ui_updates=[
            UIComponentUpdate(
                key="error_message",
                state={"markdown_content": f"## Validation Error\n\n{str(e)}"}
            ),
            UIComponentUpdate(
                key="form",
                state={"errors": e.errors()}
            )
        ],
        context_updates={}
    )
except Exception as e:
    # Handle unexpected errors
    logger.error(f"Error in step handler: {str(e)}")
    return WorkflowStepResponse(
        data={"status": "error", "message": str(e)},
        ui_updates=[
            UIComponentUpdate(
                key="error_message",
                state={"markdown_content": f"## Error\n\nAn unexpected error occurred: {str(e)}"}
            )
        ],
        context_updates={}
    )

Context Management

Use consistent context keys and provide fallbacks:

# Consistently defined context keys
CONTEXT_KEYS = {
    "USER_PROFILE": "user_profile",
    "PREFERENCES": "preferences",
    "SUBMISSION_DATA": "submission_data"
}

# Access context with fallbacks
user_profile = context.get(CONTEXT_KEYS["USER_PROFILE"], {})
name = user_profile.get("name", "Guest")

Step Independence

Design steps to be as independent as possible:

# Each step should validate its inputs
def validate_inputs(context):
    """Validate required context data for this step."""
    errors = []
    if "user_id" not in context:
        errors.append("User ID is required")
    if "document_id" not in context:
        errors.append("Document ID is required")
    return errors

# Check for input issues
input_errors = validate_inputs(context)
if input_errors:
    return WorkflowStepResponse(
        data={"status": "input_error", "errors": input_errors},
        ui_updates=[
            UIComponentUpdate(
                key="error_message",
                state={"markdown_content": f"## Input Errors\n\n- {chr(10).join(input_errors)}"}
            )
        ],
        context_updates={}
    )

Progress Tracking

Include progress information in responses:

# Calculate workflow progress
total_steps = 5
current_step_index = 2  # 0-based index
progress_percentage = int((current_step_index / (total_steps - 1)) * 100)

# Include progress in the response
return WorkflowStepResponse(
    data={
        "status": "step_complete",
        "progress": {
            "current_step": current_step_index + 1,
            "total_steps": total_steps,
            "percentage": progress_percentage
        }
    },
    ui_updates=[
        UIComponentUpdate(
            key="progress_bar",
            state={"progress": progress_percentage}
        ),
        UIComponentUpdate(
            key="progress_text",
            state={"markdown_content": f"Step {current_step_index + 1} of {total_steps}"}
        )
    ],
    context_updates={
        "workflow_progress": {
            "current_step": current_step_index + 1,
            "total_steps": total_steps,
            "percentage": progress_percentage
        }
    }
)

Next Steps