Workflows

The @Workflow decorator composes multi-step agent pipelines. Each step can call an agent, run Python code, or fan out in parallel. With durable=True, completed steps are checkpointed, so a crashed workflow resumes after the last completed step instead of starting over.

Basic workflow

Python
from ninetrix import Ninetrix, Workflow

ntx = Ninetrix(provider="anthropic", model="claude-sonnet-4-6")

researcher = ntx.agent(name="researcher", instructions="Find information on the given topic.")
writer = ntx.agent(name="writer", instructions="Write a clear, well-structured report.")

@Workflow(durable=True)
async def research_report(topic: str, wf: Workflow):
    # Step 1: Research
    research = await wf.step("research")(
        researcher.arun(f"Research: {topic}")
    )

    # Step 2: Write report based on research
    report = await wf.step("write")(
        writer.arun(f"Write a report based on this research:\n{research.output}")
    )

    return report.output

Parallel execution

Python
@Workflow(durable=True)
async def multi_source_research(topic: str, wf: Workflow):
    # Fan out: run three researcher calls in parallel
    results = await wf.parallel(
        researcher.arun(f"Search academic papers on: {topic}"),
        researcher.arun(f"Search news articles on: {topic}"),
        researcher.arun(f"Search GitHub repos related to: {topic}"),
    )

    # Combine results
    combined = "\n\n".join(r.output for r in results)
    report = await wf.step("synthesize")(
        writer.arun(f"Synthesize these sources into one report:\n{combined}")
    )

    return report.output
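
As a rough mental model for the fan-out above, the sketch below uses plain asyncio.gather with stand-in coroutines. It does not use Ninetrix: fake_agent is a hypothetical placeholder for researcher.arun, and whether wf.parallel returns results in input order the way gather does is an assumption to check against the SDK.

```python
import asyncio

async def fake_agent(prompt: str, delay: float) -> str:
    """Hypothetical stand-in for an agent call; sleeps instead of calling a model."""
    await asyncio.sleep(delay)
    return f"result for {prompt!r}"

async def fan_out(topic: str) -> list[str]:
    # gather() runs the coroutines concurrently and returns results in the
    # order the coroutines were passed, not in completion order.
    return await asyncio.gather(
        fake_agent(f"papers on {topic}", 0.03),
        fake_agent(f"news on {topic}", 0.01),
        fake_agent(f"repos on {topic}", 0.02),
    )

results = asyncio.run(fan_out("quantum computing"))
combined = "\n\n".join(results)  # same join as in the workflow above
```

Because the order is stable, the combined text always lists papers, then news, then repos, even though the news call finishes first.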

Approval gates

Python
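# Illustrative publisher agent; the examples above only define researcher and writer
publisher = ntx.agent(name="publisher", instructions="Publish the given blog post.")

@Workflow(durable=True)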
async def publish_workflow(topic: str, wf: Workflow):
    draft = await wf.step("draft")(
        writer.arun(f"Write a blog post about: {topic}")
    )

    # Pauses here until approved (via dashboard or API)
    await wf.step("review", requires_approval=True)(draft)

    published = await wf.step("publish")(
        publisher.arun(f"Publish this post:\n{draft.output}")
    )

    return published.output
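
To illustrate what "pauses here until approved" means in async terms, here is a minimal sketch built on asyncio.Event. It is not how Ninetrix implements approval gates; gated_pipeline and the log list are hypothetical names, and a real gate would persist its state so that approval can arrive after a restart.

```python
import asyncio

async def gated_pipeline(approved: asyncio.Event, log: list[str]) -> str:
    log.append("draft written")
    # Block here until a reviewer sets the event. In a real system a
    # dashboard action or API call would trigger this.
    await approved.wait()
    log.append("published")
    return "published"

async def main() -> list[str]:
    log: list[str] = []
    approved = asyncio.Event()
    task = asyncio.create_task(gated_pipeline(approved, log))
    await asyncio.sleep(0.01)   # give the pipeline time to reach the gate
    log.append("reviewer approves")
    approved.set()              # release the gate
    await task
    return log

log = asyncio.run(main())
```

The publish step cannot run before the approval is recorded, which is the ordering guarantee an approval gate is meant to give.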

Running a workflow

Python
import asyncio

async def main():
    # Run the workflow (await is only valid inside an async function)
    result = await research_report("quantum computing advances in 2025")

    print(result.output)           # Final output
    print(result.step_results)     # Results per step
    print(result.tokens_used)      # Total tokens across all steps
    print(result.cost_usd)         # Total cost across all agents
    print(result.elapsed_seconds)  # Wall clock time

asyncio.run(main())

Durability

Crash recovery
With durable=True, each completed step is checkpointed. If the process crashes mid-workflow, rerunning the workflow with the same thread_id skips the completed steps and resumes at the first unfinished one. Durable workflows require a Checkpointer.
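
The skip-on-rerun behaviour can be sketched without the SDK. The example below is a conceptual illustration only: the checkpoints dict, run_step, and the step functions are hypothetical, and an in-memory dict stands in for whatever storage a real Checkpointer uses.

```python
import asyncio

# Hypothetical in-memory checkpoint store: {thread_id: {step_name: result}}.
# A real checkpointer would persist this to a database or disk.
checkpoints: dict[str, dict[str, str]] = {}
calls: list[str] = []  # records which steps actually executed

async def run_step(thread_id: str, name: str, fn, *args) -> str:
    saved = checkpoints.setdefault(thread_id, {})
    if name in saved:          # step finished in an earlier run: skip it
        return saved[name]
    result = await fn(*args)
    saved[name] = result       # checkpoint only after the step succeeds
    return result

async def research(topic: str) -> str:
    calls.append("research")
    return f"notes on {topic}"

async def write(notes: str, fail: bool) -> str:
    calls.append("write")
    if fail:
        raise RuntimeError("simulated crash")
    return f"report from {notes}"

async def pipeline(thread_id: str, fail: bool) -> str:
    notes = await run_step(thread_id, "research", research, "tides")
    return await run_step(thread_id, "write", write, notes, fail)

async def main() -> str:
    try:
        await pipeline("t1", fail=True)      # first run crashes in "write"
    except RuntimeError:
        pass
    return await pipeline("t1", fail=False)  # rerun with the same thread_id

report = asyncio.run(main())
```

After both runs, calls holds one "research" entry and two "write" entries: the second run reuses the stored research result and re-executes only the step that failed.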

Early termination

Python
@Workflow(durable=True)
async def guarded_workflow(topic: str, wf: Workflow):
    research = await wf.step("research")(researcher.arun(topic))

    # Bail out if research found nothing useful
    if "no results" in research.output.lower():
        return wf.terminate("No relevant information found")

    return await wf.step("write")(writer.arun(research.output))