Processors are the simulation logic. They're pure DataFrame transforms that run each tick on entities matching their component requirements.

Basic Processor

from daft import DataFrame, col
from archetype.core.aio.async_processor import AsyncProcessor
from archetype.core.component import Component

class Position(Component):
    x: float = 0.0
    y: float = 0.0

class Velocity(Component):
    vx: float = 0.0
    vy: float = 0.0

class MovementProcessor(AsyncProcessor):
    components = (Position, Velocity)  # Only runs on entities with BOTH
    priority = 10                       # Lower = runs earlier

    async def process(self, df: DataFrame, **kwargs) -> DataFrame:
        return df.with_columns({
            "position__x": col("position__x") + col("velocity__vx"),
            "position__y": col("position__y") + col("velocity__vy"),
        })

Key points: - components declares required component types — the system only passes entities that have all of them - priority controls execution order within a tick (lower runs first) - process() receives a Daft DataFrame and must return a DataFrame - Column names are prefixed: ComponentName__field_name

Accessing Resources

Processors receive the world's Resources container via kwargs:

from dataclasses import dataclass
from archetype.core.resources import Resources

@dataclass
class SimConfig:
    gravity: float = 9.8
    max_speed: float = 100.0

class PhysicsProcessor(AsyncProcessor):
    components = (Position, Velocity)
    priority = 5

    async def process(self, df: DataFrame, resources: Resources = None, **kwargs) -> DataFrame:
        config = resources.require(SimConfig) if resources else SimConfig()
        return df.with_columns({
            "velocity__vy": col("velocity__vy") - config.gravity,
            "position__x": col("position__x") + col("velocity__vx"),
            "position__y": col("position__y") + col("velocity__vy"),
        })

Setup:

world.resources.insert(SimConfig(gravity=9.8))
await world.system.add_processor(PhysicsProcessor())

LLM-Powered Processors

The real power of Archetype: use daft.functions.prompt to call LLMs for every entity in parallel.

from daft.functions import prompt

class Agent(Component):
    name: str = ""
    role: str = ""
    last_thought: str = ""

class ThinkProcessor(AsyncProcessor):
    components = (Agent,)
    priority = 10

    async def process(self, df: DataFrame, tick: int = 0, **kwargs) -> DataFrame:
        return df.with_columns({
            "agent__last_thought": prompt(
                col("agent__role") + "\nYou are " + col("agent__name")
                + ". Tick " + str(tick) + ". What do you do next? One sentence.",
                system_message="You are an agent in a simulation. Stay in character.",
                model="gpt-5-mini",
                max_output_tokens=60,
            ),
        })

Because Daft executes prompts across the entire DataFrame, all entities get LLM calls in parallel — no manual batching needed.

Structured LLM Outputs

Use Pydantic models for type-safe LLM responses:

from pydantic import BaseModel

class Decision(BaseModel):
    action: str
    target: str
    confidence: float

class DecisionProcessor(AsyncProcessor):
    components = (Agent,)
    priority = 20

    async def process(self, df: DataFrame, **kwargs) -> DataFrame:
        return df.with_columns({
            "decision": prompt(
                col("agent__role") + ": Choose an action.",
                return_format=Decision,
                model="gpt-5-mini",
            ),
        }).unnest("decision")

Tick and Run Context

Processors receive useful context via kwargs:

async def process(self, df, tick=0, resources=None, **kwargs):
    # tick: current tick number
    # resources: world's Resources container
    # Additional kwargs from RunConfig or world.step()
    ...

Processor Ordering

Processors run in priority order within each tick. Use this to chain logic:

class InputProcessor(AsyncProcessor):
    components = (Agent,)
    priority = 1      # Runs first — gather input

class ThinkProcessor(AsyncProcessor):
    components = (Agent,)
    priority = 10     # Runs second — process input

class ActionProcessor(AsyncProcessor):
    components = (Agent, Position)
    priority = 20     # Runs third — execute actions

class CleanupProcessor(AsyncProcessor):
    components = (Agent,)
    priority = 100    # Runs last — bookkeeping

Adding Processors to a World

# Direct (Python API)
await world.system.add_processor(ThinkProcessor())

# Via SimulationService
container.simulation_service.add_processor(world_id, ThinkProcessor())

Interacting with the Broker

Processors can submit commands via the broker in Resources:

from archetype.app.broker import CommandBroker
from archetype.app.models import Command, CommandType

class SpawnerProcessor(AsyncProcessor):
    components = (Agent,)
    priority = 50

    async def process(self, df, resources=None, tick=0, **kwargs):
        broker = resources.get(CommandBroker) if resources else None
        if broker:
            cmd = Command(
                type=CommandType.SPAWN,
                tick=tick,
                payload={"components": [Agent(name="child").model_dump()]},
            )
            await broker.enqueue("my_world", cmd)
        return df

This enables agents to spawn new entities, send messages, or trigger world-level operations from within the simulation loop.

Testing Processors

import pytest
from archetype.core.aio.async_system import AsyncSystem
from archetype.core.aio.async_world import AsyncWorld
from archetype.core.config import WorldConfig, RunConfig

@pytest.fixture
async def world():
    # Minimal world setup for testing
    querier = InMemoryQuerier()
    updater = InMemoryUpdater(querier)
    system = AsyncSystem()
    return AsyncWorld(WorldConfig(name="test"), querier, updater, system)

@pytest.mark.asyncio
async def test_movement(world):
    await world.system.add_processor(MovementProcessor())
    await world.create_entity([Position(x=0, y=0), Velocity(vx=1, vy=2)])

    await world.run(RunConfig(num_steps=3))

    for _sig, df in world._live.items():
        rows = df.collect().to_pylist()
        assert rows[0]["position__x"] == 3.0
        assert rows[0]["position__y"] == 6.0