Processors are the simulation logic. They're pure DataFrame transforms that run each tick on entities matching their component requirements.
Basic Processor¶
from daft import DataFrame, col
from archetype.core.aio.async_processor import AsyncProcessor
from archetype.core.component import Component
class Position(Component):
x: float = 0.0
y: float = 0.0
class Velocity(Component):
vx: float = 0.0
vy: float = 0.0
class MovementProcessor(AsyncProcessor):
components = (Position, Velocity) # Only runs on entities with BOTH
priority = 10 # Lower = runs earlier
async def process(self, df: DataFrame, **kwargs) -> DataFrame:
return df.with_columns({
"position__x": col("position__x") + col("velocity__vx"),
"position__y": col("position__y") + col("velocity__vy"),
})
Key points:
- components declares required component types — the system only passes entities that have all of them
- priority controls execution order within a tick (lower runs first)
- process() receives a Daft DataFrame and must return a DataFrame
- Column names are prefixed: ComponentName__field_name
Accessing Resources¶
Processors receive the world's Resources container via kwargs:
from dataclasses import dataclass
from archetype.core.resources import Resources
@dataclass
class SimConfig:
gravity: float = 9.8
max_speed: float = 100.0
class PhysicsProcessor(AsyncProcessor):
components = (Position, Velocity)
priority = 5
async def process(self, df: DataFrame, resources: Resources = None, **kwargs) -> DataFrame:
config = resources.require(SimConfig) if resources else SimConfig()
return df.with_columns({
"velocity__vy": col("velocity__vy") - config.gravity,
"position__x": col("position__x") + col("velocity__vx"),
"position__y": col("position__y") + col("velocity__vy"),
})
Setup:
world.resources.insert(SimConfig(gravity=9.8))
await world.system.add_processor(PhysicsProcessor())
LLM-Powered Processors¶
The real power of Archetype: use daft.functions.prompt to call LLMs for every entity in parallel.
from daft.functions import prompt
class Agent(Component):
name: str = ""
role: str = ""
last_thought: str = ""
class ThinkProcessor(AsyncProcessor):
components = (Agent,)
priority = 10
async def process(self, df: DataFrame, tick: int = 0, **kwargs) -> DataFrame:
return df.with_columns({
"agent__last_thought": prompt(
col("agent__role") + "\nYou are " + col("agent__name")
+ ". Tick " + str(tick) + ". What do you do next? One sentence.",
system_message="You are an agent in a simulation. Stay in character.",
model="gpt-5-mini",
max_output_tokens=60,
),
})
Because Daft executes prompts across the entire DataFrame, all entities get LLM calls in parallel — no manual batching needed.
Structured LLM Outputs¶
Use Pydantic models for type-safe LLM responses:
from pydantic import BaseModel
class Decision(BaseModel):
action: str
target: str
confidence: float
class DecisionProcessor(AsyncProcessor):
components = (Agent,)
priority = 20
async def process(self, df: DataFrame, **kwargs) -> DataFrame:
return df.with_columns({
"decision": prompt(
col("agent__role") + ": Choose an action.",
return_format=Decision,
model="gpt-5-mini",
),
}).unnest("decision")
Tick and Run Context¶
Processors receive useful context via kwargs:
async def process(self, df, tick=0, resources=None, **kwargs):
# tick: current tick number
# resources: world's Resources container
# Additional kwargs from RunConfig or world.step()
...
Processor Ordering¶
Processors run in priority order within each tick. Use this to chain logic:
class InputProcessor(AsyncProcessor):
components = (Agent,)
priority = 1 # Runs first — gather input
class ThinkProcessor(AsyncProcessor):
components = (Agent,)
priority = 10 # Runs second — process input
class ActionProcessor(AsyncProcessor):
components = (Agent, Position)
priority = 20 # Runs third — execute actions
class CleanupProcessor(AsyncProcessor):
components = (Agent,)
priority = 100 # Runs last — bookkeeping
Adding Processors to a World¶
# Direct (Python API)
await world.system.add_processor(ThinkProcessor())
# Via SimulationService
container.simulation_service.add_processor(world_id, ThinkProcessor())
Interacting with the Broker¶
Processors can submit commands via the broker in Resources:
from archetype.app.broker import CommandBroker
from archetype.app.models import Command, CommandType
class SpawnerProcessor(AsyncProcessor):
components = (Agent,)
priority = 50
async def process(self, df, resources=None, tick=0, **kwargs):
broker = resources.get(CommandBroker) if resources else None
if broker:
cmd = Command(
type=CommandType.SPAWN,
tick=tick,
payload={"components": [Agent(name="child").model_dump()]},
)
await broker.enqueue("my_world", cmd)
return df
This enables agents to spawn new entities, send messages, or trigger world-level operations from within the simulation loop.
Testing Processors¶
import pytest
from archetype.core.aio.async_system import AsyncSystem
from archetype.core.aio.async_world import AsyncWorld
from archetype.core.config import WorldConfig, RunConfig
@pytest.fixture
async def world():
# Minimal world setup for testing
querier = InMemoryQuerier()
updater = InMemoryUpdater(querier)
system = AsyncSystem()
return AsyncWorld(WorldConfig(name="test"), querier, updater, system)
@pytest.mark.asyncio
async def test_movement(world):
await world.system.add_processor(MovementProcessor())
await world.create_entity([Position(x=0, y=0), Velocity(vx=1, vy=2)])
await world.run(RunConfig(num_steps=3))
for _sig, df in world._live.items():
rows = df.collect().to_pylist()
assert rows[0]["position__x"] == 3.0
assert rows[0]["position__y"] == 6.0