Workflows
@Workflow composes multi-step agent pipelines. Each step can call agents, run Python code, or fan out in parallel, with checkpointing so crashed workflows resume from the last completed step.
Basic workflow
Python
from ninetrix import Ninetrix, Workflow

ntx = Ninetrix(provider="anthropic", model="claude-sonnet-4-6")

researcher = ntx.agent(name="researcher", instructions="Find information on the given topic.")
writer = ntx.agent(name="writer", instructions="Write a clear, well-structured report.")

@Workflow(durable=True)
async def research_report(topic: str, wf: Workflow):
    # Step 1: Research
    research = await wf.step("research")(
        researcher.arun(f"Research: {topic}")
    )

    # Step 2: Write report based on research
    report = await wf.step("write")(
        writer.arun(f"Write a report based on this research:\n{research.output}")
    )
    return report.output
Parallel execution
Python
@Workflow(durable=True)
async def multi_source_research(topic: str, wf: Workflow):
    # Fan out: run 3 agents in parallel
    results = await wf.parallel(
        researcher.arun(f"Search academic papers on: {topic}"),
        researcher.arun(f"Search news articles on: {topic}"),
        researcher.arun(f"Search GitHub repos related to: {topic}"),
    )

    # Combine results
    combined = "\n\n".join(r.output for r in results)
    report = await wf.step("synthesize")(
        writer.arun(f"Synthesize these sources into one report:\n{combined}")
    )
    return report.output
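The page does not say how wf.parallel schedules its arguments or whether it preserves their order. As a rough illustration of the general fan-out pattern only, here is a plain-asyncio sketch. fake_search is a hypothetical stand-in for researcher.arun, and asyncio.gather is an assumed analogue, not Ninetrix's documented implementation.

```python
import asyncio


async def fake_search(source: str, topic: str, delay: float) -> str:
    """Hypothetical stand-in for an agent call; sleeps instead of calling a model."""
    await asyncio.sleep(delay)
    return f"{source}: notes on {topic}"


async def fan_out(topic: str) -> list:
    # gather() runs the three coroutines concurrently and returns their
    # results in argument order, regardless of which one finishes first.
    return await asyncio.gather(
        fake_search("papers", topic, 0.03),
        fake_search("news", topic, 0.01),
        fake_search("github", topic, 0.02),
    )


if __name__ == "__main__":
    print("\n\n".join(asyncio.run(fan_out("quantum computing"))))
```

Because the calls overlap, the total wait is close to the slowest call rather than the sum of all three.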
Approval gates
Python
# Assumes a publisher agent created with ntx.agent(...), like researcher and writer.
@Workflow(durable=True)
async def publish_workflow(topic: str, wf: Workflow):
    draft = await wf.step("draft")(
        writer.arun(f"Write a blog post about: {topic}")
    )

    # Pauses here until approved (via dashboard or API)
    await wf.step("review", requires_approval=True)(draft)

    published = await wf.step("publish")(
        publisher.arun(f"Publish this post:\n{draft.output}")
    )
    return published.output
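To make the "pauses until approved" behavior concrete, the following is a minimal in-process sketch of a gate built on asyncio.Event. ApprovalGate, publish_with_gate, and demo are hypothetical names for illustration. How Ninetrix actually suspends a workflow and receives approval from the dashboard or API is not described on this page.

```python
import asyncio


class ApprovalGate:
    """Hypothetical in-memory gate; not part of Ninetrix."""

    def __init__(self) -> None:
        self._approved = asyncio.Event()

    def approve(self) -> None:
        self._approved.set()

    async def wait(self) -> None:
        await self._approved.wait()


async def publish_with_gate(draft: str, gate: ApprovalGate, log: list) -> str:
    log.append("draft ready")
    await gate.wait()  # suspends here until approve() is called
    log.append("approved")
    return f"published: {draft}"


async def demo() -> list:
    log = []
    gate = ApprovalGate()
    task = asyncio.create_task(publish_with_gate("hello", gate, log))
    await asyncio.sleep(0)  # yield once so the task runs up to the gate
    log.append("reviewer looks at draft")
    gate.approve()
    log.append(await task)
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The publish step cannot run before approve() is called. A durable gate would also need to survive a process restart, which an in-memory event does not.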
Running a workflow
Python
# Run the workflow (await needs an async context, e.g. inside an async function)
result = await research_report("quantum computing advances in 2025")
print(result.output) # Final output
print(result.step_results) # Results per step
print(result.tokens_used) # Total tokens across all steps
print(result.cost_usd) # Total cost across all agents
print(result.elapsed_seconds) # Wall clock time
Durability
Crash recovery
With durable=True, completed steps are checkpointed. If the process crashes mid-workflow, restarting with the same thread_id skips completed steps and resumes from where it left off. Requires a Checkpointer.
Early termination
Python
@Workflow(durable=True)
async def guarded_workflow(topic: str, wf: Workflow):
    research = await wf.step("research")(researcher.arun(topic))

    # Bail out if research found nothing useful
    if "no results" in research.output.lower():
        return wf.terminate("No relevant information found")

    return await wf.step("write")(writer.arun(research.output))
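Returning to the crash-recovery behavior described under Durability: the sketch below illustrates the idea of skipping completed steps when a run restarts with the same thread_id. It is a conceptual model only. InMemoryCheckpointer, run_step, and pipeline are hypothetical names, and the real Checkpointer interface and storage backend are not shown on this page.

```python
import asyncio


class InMemoryCheckpointer:
    """Hypothetical store keyed by (thread_id, step name).

    A real checkpointer would persist results outside the process.
    """

    def __init__(self) -> None:
        self._done = {}

    async def run_step(self, thread_id: str, name: str, fn):
        key = (thread_id, name)
        if key in self._done:
            # Step already completed in an earlier run: reuse its result.
            return self._done[key]
        result = await fn()
        self._done[key] = result
        return result


async def pipeline(cp, thread_id: str, calls: list, crash_before_write: bool = False):
    async def research():
        calls.append("research")
        return "notes"

    notes = await cp.run_step(thread_id, "research", research)

    if crash_before_write:
        raise RuntimeError("simulated crash")

    async def write():
        calls.append("write")
        return f"report from {notes}"

    return await cp.run_step(thread_id, "write", write)


async def demo():
    cp = InMemoryCheckpointer()
    calls = []
    try:
        await pipeline(cp, "run-1", calls, crash_before_write=True)
    except RuntimeError:
        pass  # first attempt dies after the research step
    report = await pipeline(cp, "run-1", calls)  # same thread_id: resume
    return report, calls


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch each step is passed as a zero-argument function rather than an already-created coroutine, so a skipped step never starts any work. The second run executes only the write step because the research result is already stored under the same thread_id.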