Trajectory analysis uses the standard ECS pattern: define components for the data, processors for the pipeline stages, and run it all through a world. Fork the world to compare different evaluation criteria.

The full runnable example is in examples/06_trajectory_analysis.py.

Components

Trajectory

Stores a complete agent session as JSON-encoded turns:

class Trajectory(Component):
    trajectory_id: str = ""
    source: str = ""
    turns_json: str = "[]"
    total_turns: int = 0
    total_tokens: int = 0
    duration_seconds: float = 0.0
    outcome: str = ""
    tags_json: str = "[]"
    metadata_json: str = "{}"

Build from structured Turn dataclasses:

trajectory = Trajectory.from_turns(
    trajectory_id="session-abc123",
    turns=[
        Turn(role="user", content="Fix the login bug", tokens=12),
        Turn(role="assistant", content="I'll check auth.py", tokens=45),
        Turn(role="tool_call", tool_name="Read",
             tool_input='{"path": "auth.py"}', content="", tokens=8),
        Turn(role="tool_result", content="def login(): ...", tokens=120),
        Turn(role="assistant", content="Found the bug, applying fix", tokens=200),
    ],
    source="claude-code",
    outcome="success: fixed null check in login handler",
    tags=["bugfix", "auth"],
)
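The aggregate fields can be derived from the turn list in plain Python. This is a sketch of what from_turns presumably computes, not the library's implementation:

```python
import json

turns = [
    {"role": "user", "content": "Fix the login bug", "tokens": 12},
    {"role": "assistant", "content": "I'll check auth.py", "tokens": 45},
    {"role": "tool_call", "tool_name": "Read", "content": "", "tokens": 8},
    {"role": "tool_result", "content": "def login(): ...", "tokens": 120},
    {"role": "assistant", "content": "Found the bug, applying fix", "tokens": 200},
]

# Hypothetical derivation of the stored component fields:
turns_json = json.dumps(turns)
total_turns = len(turns)
total_tokens = sum(t["tokens"] for t in turns)
```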

Turn

A dataclass (not a Component) representing one step in a trajectory:

| Field | Type | Description |
| --- | --- | --- |
| role | str | "user", "assistant", "tool_call", "tool_result", "system" |
| content | str | Main content of the turn |
| tool_name | str \| None | Tool called (for tool_call/tool_result roles) |
| tool_input | str \| None | JSON tool input |
| tool_output | str \| None | JSON tool output |
| tokens | int | Token count for this turn |
| duration_ms | float | Wall-clock duration in milliseconds |
| error | str \| None | Error message if present |
| metadata | dict | Arbitrary metadata |

Label

An evaluation result attached to a trajectory:

class Label(Component):
    technique: str = ""
    description: str = ""
    value: str = ""
    score: float = 0.0
    rationale: str = ""
    sampled: bool = True

Each (Trajectory, Label) entity represents one labeling technique applied to one trajectory. To compare techniques, fork the world and swap the Label.description.

Processors

Three pipeline stages, priority-ordered within a single tick:

| Processor | Priority | Purpose |
| --- | --- | --- |
| SamplingProcessor | 10 | Marks which trajectories to evaluate based on SamplingConfig |
| LabelingProcessor | 20 | Calls LLM to produce value/score/rationale for sampled entities |
| ScoringProcessor | 30 | Clamps scores to [0, 1] |

SamplingProcessor

Reads SamplingConfig from resources and sets label__sampled = True/False. Never drops rows — all entities are preserved for post-hoc analysis.

@dataclass
class SamplingConfig:
    max_trajectories: int = 0    # 0 = all
    min_turns: int = 0
    max_turns: int = 0           # 0 = no limit
    require_tags: list[str] | None = None
    exclude_tags: list[str] | None = None
    outcome_filter: str | None = None

LabelingProcessor

Reads LabelingConfig from resources. Splits the DataFrame into sampled/unsampled, calls daft.functions.prompt on sampled rows with the evaluation prompt, parses the response into label__value, label__score, label__rationale, and rejoins.

@dataclass
class LabelingConfig:
    model: str = "gpt-5-mini"
    max_output_tokens: int = 512
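The exact response format the processor parses isn't specified here. One plausible convention, assuming the evaluation prompt asks the model for a small JSON object, looks like this (a sketch, not the processor's actual parser):

```python
import json


def parse_label_response(raw: str) -> tuple[str, float, str]:
    """Hypothetical parser mapping an LLM reply to (value, score, rationale)."""
    try:
        obj = json.loads(raw)
        return (
            str(obj.get("value", "")),
            float(obj.get("score", 0.0)),
            str(obj.get("rationale", "")),
        )
    except (json.JSONDecodeError, TypeError, ValueError):
        # Keep the row, record the failure in the rationale column.
        return "", 0.0, f"unparseable response: {raw[:80]}"


value, score, rationale = parse_label_response(
    '{"value": "pass", "score": 0.9, "rationale": "fix verified"}'
)
```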

ScoringProcessor

Clamps label__score to [0, 1].
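The clamp is a single expression; in plain Python:

```python
def clamp_score(score: float) -> float:
    # ScoringProcessor's normalization: confine any score to [0, 1].
    return min(max(score, 0.0), 1.0)
```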

Wiring It Up

Standard ECS setup — no framework abstraction:

container = ServiceContainer()
ctx = ActorCtx(id=uuid7(), roles={"operator"})

world = await container.world_service.create_world(
    WorldConfig(name="trajectory-eval"),
    StorageConfig(uri="./trajectory_data", namespace="trajectories"),
)

# Add processors
await world.system.add_processor(SamplingProcessor())
await world.system.add_processor(LabelingProcessor())
await world.system.add_processor(ScoringProcessor())

# Inject config
world.resources.insert(SamplingConfig(min_turns=3))
world.resources.insert(LabelingConfig(model="gpt-5-mini"))

# Spawn one entity per (trajectory, technique) pair
for trajectory in trajectories:
    for technique, description in label_specs:
        label = Label(technique=technique, description=description)
        cmd = Command(
            type=CommandType.SPAWN,
            payload={
                "components": [
                    {"type": "Trajectory", **trajectory.model_dump()},
                    {"type": "Label", **label.model_dump()},
                ],
            },
        )
        await container.command_service.submit(world.world_id, cmd, ctx)

# Run: one tick = sample -> label -> score
await container.simulation_service.step(
    world.world_id, RunConfig(num_steps=1, prefer_live_reads=True),
)

# Collect results
df = await world.get_components([Trajectory, Label])
rows = df.collect().to_pylist()
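From the collected rows you can aggregate per technique in plain Python. The label__* column names follow the prefixing used by the processors; treat the helper itself as illustrative:

```python
from collections import defaultdict


def mean_score_by_technique(rows: list[dict]) -> dict[str, float]:
    """Average label__score per technique, skipping unsampled rows."""
    totals: dict[str, list] = defaultdict(lambda: [0.0, 0])
    for row in rows:
        if not row.get("label__sampled"):
            continue
        bucket = totals[row["label__technique"]]
        bucket[0] += row["label__score"]
        bucket[1] += 1
    return {k: s / n for k, (s, n) in totals.items()}


rows = [
    {"label__technique": "strict", "label__score": 0.8, "label__sampled": True},
    {"label__technique": "strict", "label__score": 0.6, "label__sampled": True},
    {"label__technique": "lenient", "label__score": 0.9, "label__sampled": False},
]
means = mean_score_by_technique(rows)
```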

Fork-Based Comparison

Clone the world, swap config, run independently:

fork = await container.world_service.fork_world(
    source_world_id=world.world_id,
    name="strict-eval",
    storage_config=StorageConfig(uri="./trajectory_data", namespace="trajectories"),
)

# Re-add processors (not cloned)
await fork.system.add_processor(SamplingProcessor())
await fork.system.add_processor(LabelingProcessor())
await fork.system.add_processor(ScoringProcessor())

# Different config
fork.resources.insert(SamplingConfig(min_turns=3))
fork.resources.insert(LabelingConfig(model="gpt-5-mini"))

await container.simulation_service.step(
    fork.world_id, RunConfig(num_steps=1, prefer_live_reads=True),
)

Both worlds persist to the same storage. Query either one at any tick.

When to Use

| Scenario | Trajectory analysis? |
| --- | --- |
| Evaluating recorded agent sessions | Yes |
| Comparing labeling criteria (A/B) | Yes, with fork_world() |
| Benchmarking prompt variations | Yes |
| Real-time agent processing per tick | No, use regular processors |
| Simple data transforms | No, use DataFrame expressions |