AsyncStore is the persistence layer for archetype tables. It manages table creation, lazy reads/writes via Daft catalogs, and storage namespacing for multi-world/multi-run isolation.

How It Works

The store delegates persistence to Daft's catalog and session system. All reads and writes go through lazy DataFrame references:

  • Reads return a lazy DataFrame -- no data is materialized until you call .collect()
  • Writes append rows to the backing table via Table.append()
  • Tables are created on demand when an archetype is first accessed

Each archetype signature maps to a single table, named by the archetype's deterministic hash (see Archetype).
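The naming scheme itself lives in Archetype; as a rough illustration of the property that matters -- determinism -- here is a hypothetical version using a SHA-256 digest of the sorted component names (the real hash may differ):

```python
import hashlib

def archetype_table_name(component_names: set[str]) -> str:
    # Hypothetical illustration: the same component set always maps to
    # the same table name, regardless of insertion order, because the
    # names are sorted before hashing.
    digest = hashlib.sha256(",".join(sorted(component_names)).encode()).hexdigest()
    return f"archetype_{digest[:16]}"

# Same components, different order -> same table
assert archetype_table_name({"Position", "Velocity"}) == \
       archetype_table_name({"Velocity", "Position"})
```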

StorageContext

Before creating a store, you need a StorageContext -- the initialized runtime resources:

from archetype.core.config import StorageConfig, StorageBackend
from archetype.core.runtime.storage import StorageContextFactory

config = StorageConfig(
    uri="./my_data",
    namespace="experiment_1",
)
context = StorageContextFactory.build(config)

StorageContextFactory.build() initializes:

  1. An Iceberg SqlCatalog backed by SQLite for metadata
  2. A Daft Session attached to the catalog
  3. The namespace (created if it doesn't exist)

Local vs Remote Storage

URI scheme                  Warehouse            Metadata
./path or file://           Local filesystem     SQLite in path/catalog.db
s3://bucket or gs://bucket  Remote object store  SQLite in .archetype_meta/catalog.db

Remote warehouses store data in the cloud but keep catalog metadata locally in a .archetype_meta/ directory.
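The scheme-based split can be sketched as a small dispatch function (a hypothetical helper, not the library's API -- the real resolution lives in StorageContextFactory):

```python
from urllib.parse import urlparse

def catalog_db_path(uri: str) -> str:
    # Hypothetical sketch of the local-vs-remote rule: remote warehouses
    # (s3://, gs://) keep catalog metadata in a local .archetype_meta/
    # directory; local warehouses keep it next to the data.
    scheme = urlparse(uri).scheme
    if scheme in ("s3", "gs"):
        return ".archetype_meta/catalog.db"
    # "./path" parses with an empty scheme; "file://" is explicit local
    path = uri.removeprefix("file://")
    return f"{path}/catalog.db"

print(catalog_db_path("./my_data"))       # ./my_data/catalog.db
print(catalog_db_path("s3://bucket/wh"))  # .archetype_meta/catalog.db
```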

StorageContext Fields

Field      Type      Description
uri        str       Resolved storage URI
namespace  str       Daft namespace for table isolation
session    Session   Daft session with catalog attached
catalog    Catalog   Iceberg catalog (via Daft)
io_config  IOConfig  Daft I/O configuration

Store API

Reading

df = await store.get_archetype_df(sig, world_id="abc", run_id="run-1")

Returns a lazy DataFrame filtered by world_id and run_id. The table is created if it doesn't exist yet.

Writing

await store.append(sig, df)

Appends rows to the archetype table. Zero-row and empty-schema DataFrames are silently skipped. The table is created if it doesn't exist.

Shutdown

await store.shutdown()

No-op in the base implementation -- Daft handles cleanup automatically. AsyncCachedStore overrides this to flush pending data.
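To make the contract concrete, here is a toy in-memory stand-in for the iAsyncStore interface. Everything here is illustrative -- the real store returns lazy Daft DataFrames, not lists, and its exact signatures may differ -- but the behaviors described above (create-on-demand, world/run filtering, skipping empty appends, no-op shutdown) carry over:

```python
import asyncio

class InMemoryStore:
    # Hypothetical in-memory stand-in for the iAsyncStore contract.
    # Rows are plain dicts; "tables" are created on first access.

    def __init__(self):
        self.tables: dict[str, list[dict]] = {}

    async def append(self, sig: str, rows: list[dict]) -> None:
        if not rows:                    # zero-row append: silently skipped
            return
        self.tables.setdefault(sig, []).extend(rows)  # create on demand

    async def get_archetype_df(self, sig: str, world_id: str, run_id: str):
        rows = self.tables.setdefault(sig, [])        # create on demand
        return [r for r in rows
                if r["world_id"] == world_id and r["run_id"] == run_id]

    async def shutdown(self) -> None:
        pass                            # no-op, like the base AsyncStore

async def main():
    store = InMemoryStore()
    await store.append("sig1", [{"world_id": "abc", "run_id": "run-1", "tick": 0}])
    await store.append("sig1", [])      # skipped
    rows = await store.get_archetype_df("sig1", world_id="abc", run_id="run-1")
    print(rows)
    await store.shutdown()

asyncio.run(main())
```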

Append-Only Model

Storage is strictly append-only. Nothing is overwritten or deleted. Each tick appends new rows with the current tick number. This gives you:

  • Time-travel -- query any tick's state by filtering on tick
  • Replay -- re-run from any checkpoint
  • Forking -- branch a world and append independently
  • Audit -- full history of every entity at every tick
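A minimal sketch of time-travel under this model, using plain dicts in place of DataFrame rows (illustrative only -- the real query goes through the lazy DataFrame API):

```python
# Append-only history: every tick appends new rows, nothing is overwritten.
rows = [
    {"entity": 1, "tick": 0, "hp": 100},
    {"entity": 1, "tick": 1, "hp": 90},
    {"entity": 1, "tick": 2, "hp": 75},
]

def state_at(rows: list[dict], tick: int) -> list[dict]:
    # Time-travel: reconstruct state at a tick by taking, per entity,
    # the latest row with row["tick"] <= tick.
    latest: dict[int, dict] = {}
    for r in sorted(rows, key=lambda r: r["tick"]):
        if r["tick"] <= tick:
            latest[r["entity"]] = r
    return list(latest.values())

print(state_at(rows, 1))  # [{'entity': 1, 'tick': 1, 'hp': 90}]
```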

Storage Backends

StorageService selects the store implementation based on StorageConfig.backend:

Backend                           Store class        Format                              Best for
StorageBackend.LANCEDB (default)  AsyncLancedbStore  Lance columnar                      Local development, single-process
StorageBackend.ICEBERG            AsyncStore         Iceberg (Parquet via Daft catalog)  Distributed, cloud-native

Both implement the iAsyncStore interface -- the querier and updater are backend-agnostic.

LanceDB (Default)

LanceDB stores data in Lance format on the local filesystem. It is the default because it requires no external infrastructure and provides fast columnar reads for single-process simulations.

Iceberg

The Iceberg backend uses Daft's native Iceberg integration with a SQLite-backed PyIceberg SQL catalog. It writes Parquet files and supports:

  • Cloud object stores (S3, GCS) via StorageConfig.io_config
  • Catalog-level namespace isolation
  • Compatibility with the broader Iceberg ecosystem
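A hedged example of pointing the Iceberg backend at S3 -- assuming StorageConfig accepts the io_config field mentioned above, and using Daft's IOConfig/S3Config types (check your Daft version for the exact S3Config fields):

```python
from daft.io import IOConfig, S3Config

from archetype.core.config import StorageBackend, StorageConfig

# Sketch of an S3-backed Iceberg configuration. The region_name field
# is Daft's; whether StorageConfig takes io_config directly like this
# is an assumption based on the field list above.
config = StorageConfig(
    uri="s3://my-bucket/warehouse",
    namespace="experiment_1",
    backend=StorageBackend.ICEBERG,
    io_config=IOConfig(s3=S3Config(region_name="us-east-1")),
)
```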

Backend Selection

StorageService._create_backend() checks storage_config.use_lancedb (derived from the backend enum) to pick the store class. Both are wrapped identically by AsyncQueryManager and AsyncUpdateManager:

StorageService._create_backend(config, cache_config)
    |
    +-- config.use_lancedb? --> AsyncLancedbStore(context)
    +-- else              --> AsyncStore(context)
    |
    +-- cache_config?     --> AsyncCachedStore(store, cache_config)
    |
    +-- AsyncQueryManager(store)
    +-- AsyncUpdateManager(store)
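The same wiring as pure-Python stubs (class bodies are placeholders here; the real constructors take a StorageContext and full config objects):

```python
# Stubs standing in for the real store classes, to show the selection
# and wrapping order only.
class AsyncStore:
    pass

class AsyncLancedbStore:
    pass

class AsyncCachedStore:
    def __init__(self, inner, cache_config):
        self.inner, self.cache_config = inner, cache_config

def create_backend(use_lancedb: bool, cache_config=None):
    # Pick the store class from the config flag...
    store = AsyncLancedbStore() if use_lancedb else AsyncStore()
    # ...then optionally wrap it in the write-behind cache.
    if cache_config is not None:
        store = AsyncCachedStore(store, cache_config)
    return store

backend = create_backend(use_lancedb=False, cache_config={"flush_rows": 500_000})
print(type(backend).__name__)        # AsyncCachedStore
print(type(backend.inner).__name__)  # AsyncStore
```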

Write-Behind Cache

AsyncCachedStore wraps any iAsyncStore with an in-memory write buffer. Appends accumulate in per-archetype MemTable structures (lists of PyArrow RecordBatch) and flush to the inner store when thresholds are exceeded.

Flush Triggers

A flush fires when any of these conditions is met:

Threshold                    Config field  Default
Row count per archetype      flush_rows    1,000,000
Bytes per archetype          flush_mb      512 MB
Total cached bytes (global)  global_mb     1 GB
Idle time (background loop)  idle_sec      30 seconds
The first three are checked synchronously after each append(). The idle timer runs as a background asyncio.Task that scans all memtables and flushes any that have been untouched for idle_sec.
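The three synchronous triggers can be sketched as a pure function over simple counters (hypothetical shapes -- the real MemTable holds PyArrow RecordBatches, and the idle trigger lives in the background task, not here):

```python
from dataclasses import dataclass

@dataclass
class MemTable:
    # Hypothetical per-archetype buffer: counters are all the
    # flush decision needs.
    rows: int = 0
    nbytes: int = 0

def should_flush(mt: MemTable, total_bytes: int,
                 flush_rows: int = 1_000_000,
                 flush_mb: int = 512,
                 global_mb: int = 1024) -> bool:
    # The three thresholds checked synchronously after each append().
    return (mt.rows >= flush_rows
            or mt.nbytes >= flush_mb * 1024 * 1024
            or total_bytes >= global_mb * 1024 * 1024)

mt = MemTable(rows=999_999, nbytes=10 * 1024 * 1024)
print(should_flush(mt, total_bytes=mt.nbytes))  # False: under all thresholds
mt.rows += 1
print(should_flush(mt, total_bytes=mt.nbytes))  # True: row threshold hit
```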

Read Path

AsyncCachedStore.get_archetype_df() checks the memtable first. If the archetype has cached rows, it builds a DataFrame directly from the in-memory Arrow batches. Otherwise it falls through to the inner store.

Shutdown

AsyncCachedStore.shutdown() cancels the background task, flushes all remaining memtables, and delegates to the inner store's shutdown.

Configuration

from archetype.core.config import CacheConfig

cache = CacheConfig(flush_rows=500_000, idle_sec=15.0)

Pass CacheConfig to StorageService.get_backend() or WorldService.create_world() to enable caching. See Configuration for all fields.

Source Reference

  • Store (Iceberg): src/archetype/core/aio/async_store.py
  • Store (LanceDB): src/archetype/core/storage.py
  • Cached store: src/archetype/core/aio/async_cached_store.py
  • Storage context: src/archetype/core/runtime/storage.py
  • Storage service: src/archetype/app/storage_service.py