Skip to content
OBML v1.0
OrionBelt v0.5.0

Python API Reference

Auto-generated documentation from source code docstrings.

Service Layer

ModelStore

orionbelt.service.model_store.ModelStore

In-memory model registry. Thread-safe via threading.Lock.

Models are keyed by short UUID (8-char hex). All parsing, validation, and compilation infrastructure is instantiated internally, following the same singleton pattern as api/deps.py.

Source code in src/orionbelt/service/model_store.py
class ModelStore:
    """In-memory model registry.  Thread-safe via ``threading.Lock``.

    Models are keyed by short UUID (8-char hex).  All parsing, validation,
    and compilation infrastructure is instantiated internally, following the
    same singleton pattern as ``api/deps.py``.
    """

    def __init__(self) -> None:
        # The lock guards ``self._models`` only; the pipeline objects below
        # are stateless, so they are shared without locking.
        self._lock = threading.Lock()
        self._models: dict[str, SemanticModel] = {}

        # Internal pipeline singletons (stateless, safe to share).
        self._loader = TrackedLoader()
        self._resolver = ReferenceResolver()
        self._validator = SemanticValidator()
        self._pipeline = CompilationPipeline()

    # -- helpers -------------------------------------------------------------

    @staticmethod
    def _new_id() -> str:
        """Return a fresh 8-char hex model id."""
        return uuid.uuid4().hex[:8]

    @staticmethod
    def _to_error_info(issue) -> ErrorInfo:
        """Convert a resolver/validator issue into a transport ``ErrorInfo``.

        Centralizes the field-by-field copy that was previously repeated for
        resolution errors, resolution warnings, and semantic errors.
        """
        return ErrorInfo(
            code=issue.code,
            message=issue.message,
            path=issue.path,
            suggestions=list(issue.suggestions),
        )

    def _parse_and_validate(
        self, yaml_str: str
    ) -> tuple[SemanticModel, list[ErrorInfo], list[ErrorInfo]]:
        """Parse YAML, resolve references, run semantic validation.

        Returns ``(model, errors, warnings)``.  A YAML parse failure
        short-circuits with an empty ``SemanticModel`` and a single
        ``YAML_PARSE_ERROR`` entry.
        """
        errors: list[ErrorInfo] = []
        warnings: list[ErrorInfo] = []

        # 1. Parse YAML
        try:
            raw, source_map = self._loader.load_string(yaml_str)
        except Exception as exc:
            errors.append(ErrorInfo(code="YAML_PARSE_ERROR", message=str(exc)))
            return SemanticModel(), errors, warnings

        # 2. Resolve references
        model, resolution = self._resolver.resolve(raw, source_map)
        errors.extend(self._to_error_info(e) for e in resolution.errors)
        warnings.extend(self._to_error_info(w) for w in resolution.warnings)

        # 3. Semantic validation
        errors.extend(self._to_error_info(e) for e in self._validator.validate(model))

        return model, errors, warnings

    # -- public API ----------------------------------------------------------

    def load_model(self, yaml_str: str) -> LoadResult:
        """Parse, validate, and store a model.  Returns id + summary.

        Raises ``ValueError`` if the model has validation errors.
        """
        model, errors, warnings = self._parse_and_validate(yaml_str)
        if errors:
            msgs = "; ".join(e.message for e in errors)
            raise ValueError(f"Model validation failed: {msgs}")

        model_id = self._new_id()
        with self._lock:
            self._models[model_id] = model

        return LoadResult(
            model_id=model_id,
            data_objects=len(model.data_objects),
            dimensions=len(model.dimensions),
            measures=len(model.measures),
            metrics=len(model.metrics),
            warnings=[w.message for w in warnings],
        )

    def get_model(self, model_id: str) -> SemanticModel:
        """Look up a loaded model.  Raises ``KeyError`` if not found."""
        with self._lock:
            try:
                return self._models[model_id]
            except KeyError:
                raise KeyError(f"No model loaded with id '{model_id}'") from None

    def describe(self, model_id: str) -> ModelDescription:
        """Return a structured summary suitable for LLM consumption.

        Raises ``KeyError`` (via :meth:`get_model`) for an unknown id.
        """
        model = self.get_model(model_id)

        data_objects = [
            DataObjectInfo(
                label=obj.label,
                code=obj.qualified_code,
                columns=list(obj.columns.keys()),
                join_targets=[j.join_to for j in obj.joins],
            )
            for obj in model.data_objects.values()
        ]

        dimensions = [
            DimensionInfo(
                name=dim.label,
                result_type=dim.result_type.value,
                data_object=dim.view,
                column=dim.column,
                time_grain=dim.time_grain.value if dim.time_grain else None,
            )
            for dim in model.dimensions.values()
        ]

        measures = [
            MeasureInfo(
                name=m.label,
                result_type=m.result_type.value,
                aggregation=m.aggregation,
                expression=m.expression,
            )
            for m in model.measures.values()
        ]

        metrics = [
            MetricInfo(name=met.label, expression=met.expression) for met in model.metrics.values()
        ]

        return ModelDescription(
            model_id=model_id,
            data_objects=data_objects,
            dimensions=dimensions,
            measures=measures,
            metrics=metrics,
        )

    def list_models(self) -> list[ModelSummary]:
        """Return a short summary for every loaded model."""
        # Snapshot under the lock, then summarize without holding it.
        with self._lock:
            items = list(self._models.items())

        return [
            ModelSummary(
                model_id=mid,
                data_objects=len(m.data_objects),
                dimensions=len(m.dimensions),
                measures=len(m.measures),
                metrics=len(m.metrics),
            )
            for mid, m in items
        ]

    def remove_model(self, model_id: str) -> None:
        """Unload a model.  Raises ``KeyError`` if not found."""
        with self._lock:
            try:
                del self._models[model_id]
            except KeyError:
                raise KeyError(f"No model loaded with id '{model_id}'") from None

    def compile_query(
        self,
        model_id: str,
        query: QueryObject,
        dialect: str,
    ) -> CompilationResult:
        """Compile a query against a loaded model."""
        model = self.get_model(model_id)
        return self._pipeline.compile(query, model, dialect)

    def validate(self, yaml_str: str) -> ValidationSummary:
        """Validate a YAML model string without storing it."""
        _model, errors, warnings = self._parse_and_validate(yaml_str)
        return ValidationSummary(
            valid=len(errors) == 0,
            errors=errors,
            warnings=warnings,
        )

load_model(yaml_str)

Parse, validate, and store a model. Returns id + summary.

Raises ValueError if the model has validation errors.

Source code in src/orionbelt/service/model_store.py
def load_model(self, yaml_str: str) -> LoadResult:
    """Parse and validate ``yaml_str``, then register the resulting model.

    Returns a ``LoadResult`` with the new id and element counts.
    Raises ``ValueError`` if the model has validation errors.
    """
    model, errors, warnings = self._parse_and_validate(yaml_str)
    if errors:
        joined = "; ".join(err.message for err in errors)
        raise ValueError(f"Model validation failed: {joined}")

    new_id = self._new_id()
    with self._lock:
        self._models[new_id] = model

    return LoadResult(
        model_id=new_id,
        data_objects=len(model.data_objects),
        dimensions=len(model.dimensions),
        measures=len(model.measures),
        metrics=len(model.metrics),
        warnings=[warning.message for warning in warnings],
    )

get_model(model_id)

Look up a loaded model. Raises KeyError if not found.

Source code in src/orionbelt/service/model_store.py
def get_model(self, model_id: str) -> SemanticModel:
    """Fetch a previously loaded model by its 8-char id.

    Raises ``KeyError`` with a descriptive message when no model is
    registered under ``model_id``.
    """
    with self._lock:
        model = self._models.get(model_id)
    if model is None:
        raise KeyError(f"No model loaded with id '{model_id}'")
    return model

describe(model_id)

Return a structured summary suitable for LLM consumption.

Source code in src/orionbelt/service/model_store.py
def describe(self, model_id: str) -> ModelDescription:
    """Build a structured, LLM-friendly summary of a loaded model."""
    model = self.get_model(model_id)

    objects = []
    for obj in model.data_objects.values():
        objects.append(
            DataObjectInfo(
                label=obj.label,
                code=obj.qualified_code,
                columns=list(obj.columns.keys()),
                join_targets=[join.join_to for join in obj.joins],
            )
        )

    dims = []
    for dimension in model.dimensions.values():
        grain = dimension.time_grain.value if dimension.time_grain else None
        dims.append(
            DimensionInfo(
                name=dimension.label,
                result_type=dimension.result_type.value,
                data_object=dimension.view,
                column=dimension.column,
                time_grain=grain,
            )
        )

    meas = [
        MeasureInfo(
            name=measure.label,
            result_type=measure.result_type.value,
            aggregation=measure.aggregation,
            expression=measure.expression,
        )
        for measure in model.measures.values()
    ]

    mets = [
        MetricInfo(name=metric.label, expression=metric.expression)
        for metric in model.metrics.values()
    ]

    return ModelDescription(
        model_id=model_id,
        data_objects=objects,
        dimensions=dims,
        measures=meas,
        metrics=mets,
    )

list_models()

Return a short summary for every loaded model.

Source code in src/orionbelt/service/model_store.py
def list_models(self) -> list[ModelSummary]:
    """Summarize every loaded model (id + element counts)."""
    # Copy the registry under the lock, then summarize without holding it.
    with self._lock:
        snapshot = dict(self._models)

    summaries = []
    for mid, model in snapshot.items():
        summaries.append(
            ModelSummary(
                model_id=mid,
                data_objects=len(model.data_objects),
                dimensions=len(model.dimensions),
                measures=len(model.measures),
                metrics=len(model.metrics),
            )
        )
    return summaries

remove_model(model_id)

Unload a model. Raises KeyError if not found.

Source code in src/orionbelt/service/model_store.py
def remove_model(self, model_id: str) -> None:
    """Drop a model from the registry.

    Raises ``KeyError`` with a descriptive message if ``model_id`` is
    unknown.
    """
    with self._lock:
        if model_id not in self._models:
            raise KeyError(f"No model loaded with id '{model_id}'")
        del self._models[model_id]

compile_query(model_id, query, dialect)

Compile a query against a loaded model.

Source code in src/orionbelt/service/model_store.py
def compile_query(
    self,
    model_id: str,
    query: QueryObject,
    dialect: str,
) -> CompilationResult:
    """Compile ``query`` against a loaded model for the given dialect.

    An unknown ``model_id`` raises the same ``KeyError`` as
    :meth:`get_model`.
    """
    return self._pipeline.compile(query, self.get_model(model_id), dialect)

validate(yaml_str)

Validate a YAML model string without storing it.

Source code in src/orionbelt/service/model_store.py
def validate(self, yaml_str: str) -> ValidationSummary:
    """Run the full parse/resolve/validate pipeline without storing anything."""
    _model, errors, warnings = self._parse_and_validate(yaml_str)
    return ValidationSummary(valid=not errors, errors=errors, warnings=warnings)

SessionManager

orionbelt.service.session_manager.SessionManager

Manages TTL-scoped sessions, each holding its own ModelStore.

Thread-safe. Call `start` to begin the background cleanup thread and `stop` to shut it down.

Source code in src/orionbelt/service/session_manager.py
class SessionManager:
    """Manages TTL-scoped sessions, each holding its own ``ModelStore``.

    Thread-safe.  Call :meth:`start` to begin the background cleanup thread
    and :meth:`stop` to shut it down.
    """

    def __init__(self, ttl_seconds: int = 1800, cleanup_interval: int = 60) -> None:
        self._ttl = ttl_seconds
        self._cleanup_interval = cleanup_interval
        self._lock = threading.Lock()
        self._sessions: dict[str, _Session] = {}
        self._stop_event = threading.Event()
        self._cleanup_thread: threading.Thread | None = None

    # -- lifecycle -----------------------------------------------------------

    def start(self) -> None:
        """Start the background cleanup daemon thread."""
        if self._cleanup_thread is not None:
            return
        self._stop_event.clear()
        self._cleanup_thread = threading.Thread(
            target=self._cleanup_loop, daemon=True, name="session-cleanup"
        )
        self._cleanup_thread.start()

    def stop(self) -> None:
        """Signal the cleanup thread to stop and wait for it."""
        self._stop_event.set()
        if self._cleanup_thread is not None:
            self._cleanup_thread.join(timeout=5)
            self._cleanup_thread = None

    # -- public API ----------------------------------------------------------

    def create_session(self, metadata: dict[str, str] | None = None) -> SessionInfo:
        """Create a new session and return its info."""
        session_id = secrets.token_hex(16)  # 32-char hex (128-bit)
        now_mono = time.monotonic()
        now_wall = datetime.now(UTC)
        session = _Session(
            session_id=session_id,
            store=ModelStore(),
            created_at=now_wall,
            last_accessed=now_mono,
            metadata=metadata or {},
            created_at_wall=now_wall,
            last_accessed_wall=now_wall,
        )
        with self._lock:
            self._sessions[session_id] = session
        return self._session_info(session)

    def get_store(self, session_id: str) -> ModelStore:
        """Get the ModelStore for a session, updating its last-accessed time.

        Raises :class:`SessionNotFoundError` if the session is missing or expired.
        """
        now_mono = time.monotonic()
        with self._lock:
            session = self._sessions.get(session_id)
            if session is None:
                raise SessionNotFoundError(f"Session '{session_id}' not found")
            # Lazy expiration check
            if now_mono - session.last_accessed > self._ttl:
                del self._sessions[session_id]
                raise SessionNotFoundError(f"Session '{session_id}' has expired")
            session.last_accessed = now_mono
            session.last_accessed_wall = datetime.now(UTC)
            return session.store

    def get_session(self, session_id: str) -> SessionInfo:
        """Get session info (also refreshes last-accessed)."""
        now_mono = time.monotonic()
        with self._lock:
            session = self._sessions.get(session_id)
            if session is None:
                raise SessionNotFoundError(f"Session '{session_id}' not found")
            if now_mono - session.last_accessed > self._ttl:
                del self._sessions[session_id]
                raise SessionNotFoundError(f"Session '{session_id}' has expired")
            session.last_accessed = now_mono
            session.last_accessed_wall = datetime.now(UTC)
            return self._session_info(session)

    def close_session(self, session_id: str) -> None:
        """Explicitly close a session."""
        with self._lock:
            if session_id not in self._sessions:
                raise SessionNotFoundError(f"Session '{session_id}' not found")
            del self._sessions[session_id]

    def list_sessions(self) -> list[SessionInfo]:
        """Return info for all non-expired sessions (excluding default)."""
        now_mono = time.monotonic()
        result: list[SessionInfo] = []
        with self._lock:
            for session in self._sessions.values():
                if session.session_id == _DEFAULT_SESSION_ID:
                    continue
                if now_mono - session.last_accessed <= self._ttl:
                    result.append(self._session_info(session))
        return result

    @property
    def active_count(self) -> int:
        """Number of active (non-expired) sessions."""
        now_mono = time.monotonic()
        with self._lock:
            return sum(
                1 for s in self._sessions.values() if now_mono - s.last_accessed <= self._ttl
            )

    def get_or_create_default(self) -> ModelStore:
        """Get (or lazily create) the default session for stdio MCP."""
        with self._lock:
            session = self._sessions.get(_DEFAULT_SESSION_ID)
            if session is not None:
                session.last_accessed = time.monotonic()
                session.last_accessed_wall = datetime.now(UTC)
                return session.store
            now_wall = datetime.now(UTC)
            session = _Session(
                session_id=_DEFAULT_SESSION_ID,
                store=ModelStore(),
                created_at=now_wall,
                last_accessed=time.monotonic(),
                created_at_wall=now_wall,
                last_accessed_wall=now_wall,
            )
            self._sessions[_DEFAULT_SESSION_ID] = session
            return session.store

    # -- internal ------------------------------------------------------------

    @staticmethod
    def _session_info(session: _Session) -> SessionInfo:
        return SessionInfo(
            session_id=session.session_id,
            created_at=session.created_at_wall,
            last_accessed_at=session.last_accessed_wall,
            model_count=len(session.store.list_models()),
            metadata=session.metadata,
        )

    def _purge_expired(self) -> None:
        """Remove all expired sessions (called by cleanup thread)."""
        now_mono = time.monotonic()
        with self._lock:
            expired = [
                sid for sid, s in self._sessions.items() if now_mono - s.last_accessed > self._ttl
            ]
            for sid in expired:
                del self._sessions[sid]

    def _cleanup_loop(self) -> None:
        """Background loop that periodically purges expired sessions."""
        while not self._stop_event.wait(timeout=self._cleanup_interval):
            self._purge_expired()

active_count property

Number of active (non-expired) sessions.

start()

Start the background cleanup daemon thread.

Source code in src/orionbelt/service/session_manager.py
def start(self) -> None:
    """Launch the daemonized cleanup thread.

    Idempotent: a second call while the thread handle is set is a no-op.
    """
    if self._cleanup_thread is not None:
        return
    self._stop_event.clear()
    worker = threading.Thread(
        target=self._cleanup_loop, daemon=True, name="session-cleanup"
    )
    self._cleanup_thread = worker
    worker.start()

stop()

Signal the cleanup thread to stop and wait for it.

Source code in src/orionbelt/service/session_manager.py
def stop(self) -> None:
    """Ask the cleanup thread to exit, then join it (5 s timeout)."""
    self._stop_event.set()
    worker = self._cleanup_thread
    if worker is not None:
        worker.join(timeout=5)
        self._cleanup_thread = None

create_session(metadata=None)

Create a new session and return its info.

Source code in src/orionbelt/service/session_manager.py
def create_session(self, metadata: dict[str, str] | None = None) -> SessionInfo:
    """Register a fresh session and return its public info."""
    new_id = secrets.token_hex(16)  # 128-bit id rendered as 32 hex chars
    wall = datetime.now(UTC)
    record = _Session(
        session_id=new_id,
        store=ModelStore(),
        created_at=wall,
        last_accessed=time.monotonic(),
        metadata=metadata or {},
        created_at_wall=wall,
        last_accessed_wall=wall,
    )
    with self._lock:
        self._sessions[new_id] = record
    return self._session_info(record)

get_store(session_id)

Get the ModelStore for a session, updating its last-accessed time.

Raises `SessionNotFoundError` if the session is missing or expired.

Source code in src/orionbelt/service/session_manager.py
def get_store(self, session_id: str) -> ModelStore:
    """Get the ModelStore for a session, updating its last-accessed time.

    Raises :class:`SessionNotFoundError` if the session is missing or expired.
    """
    now_mono = time.monotonic()
    with self._lock:
        session = self._sessions.get(session_id)
        if session is None:
            raise SessionNotFoundError(f"Session '{session_id}' not found")
        # Lazy expiration check
        if now_mono - session.last_accessed > self._ttl:
            del self._sessions[session_id]
            raise SessionNotFoundError(f"Session '{session_id}' has expired")
        session.last_accessed = now_mono
        session.last_accessed_wall = datetime.now(UTC)
        return session.store

get_session(session_id)

Get session info (also refreshes last-accessed).

Source code in src/orionbelt/service/session_manager.py
def get_session(self, session_id: str) -> SessionInfo:
    """Get session info (also refreshes last-accessed)."""
    now_mono = time.monotonic()
    with self._lock:
        session = self._sessions.get(session_id)
        if session is None:
            raise SessionNotFoundError(f"Session '{session_id}' not found")
        if now_mono - session.last_accessed > self._ttl:
            del self._sessions[session_id]
            raise SessionNotFoundError(f"Session '{session_id}' has expired")
        session.last_accessed = now_mono
        session.last_accessed_wall = datetime.now(UTC)
        return self._session_info(session)

close_session(session_id)

Explicitly close a session.

Source code in src/orionbelt/service/session_manager.py
def close_session(self, session_id: str) -> None:
    """Terminate a session immediately.

    Raises :class:`SessionNotFoundError` for an unknown session id.
    """
    with self._lock:
        try:
            del self._sessions[session_id]
        except KeyError:
            raise SessionNotFoundError(f"Session '{session_id}' not found") from None

list_sessions()

Return info for all non-expired sessions (excluding default).

Source code in src/orionbelt/service/session_manager.py
def list_sessions(self) -> list[SessionInfo]:
    """Return info for every non-expired session, skipping the default one."""
    cutoff = time.monotonic() - self._ttl
    with self._lock:
        return [
            self._session_info(s)
            for s in self._sessions.values()
            if s.session_id != _DEFAULT_SESSION_ID and s.last_accessed >= cutoff
        ]

get_or_create_default()

Get (or lazily create) the default session for stdio MCP.

Source code in src/orionbelt/service/session_manager.py
def get_or_create_default(self) -> ModelStore:
    """Return the default (stdio MCP) session's store, creating it on first use."""
    with self._lock:
        existing = self._sessions.get(_DEFAULT_SESSION_ID)
        if existing is not None:
            # Refresh both clocks so the default session never looks idle.
            existing.last_accessed = time.monotonic()
            existing.last_accessed_wall = datetime.now(UTC)
            return existing.store
        wall = datetime.now(UTC)
        created = _Session(
            session_id=_DEFAULT_SESSION_ID,
            store=ModelStore(),
            created_at=wall,
            last_accessed=time.monotonic(),
            created_at_wall=wall,
            last_accessed_wall=wall,
        )
        self._sessions[_DEFAULT_SESSION_ID] = created
        return created.store

SessionInfo

orionbelt.service.session_manager.SessionInfo dataclass

Public session metadata (returned by list/get).

Source code in src/orionbelt/service/session_manager.py
@dataclass
class SessionInfo:
    """Public session metadata (returned by list/get)."""

    session_id: str  # hex token; 32 chars for regular sessions (see create_session)
    created_at: datetime  # wall-clock (UTC) creation time
    last_accessed_at: datetime  # wall-clock (UTC) time of most recent access
    model_count: int  # number of models currently loaded in the session's store
    metadata: dict[str, str]  # caller-supplied metadata from create_session

Compiler Pipeline

orionbelt.compiler.pipeline.CompilationPipeline

Orchestrates: Query → Resolution → Planning → AST → SQL.

Source code in src/orionbelt/compiler/pipeline.py
class CompilationPipeline:
    """Orchestrates: Query → Resolution → Planning → AST → SQL."""

    def __init__(self) -> None:
        # Stateless phase implementations, shared across all compilations.
        self._resolver = QueryResolver()
        self._star_planner = StarSchemaPlanner()
        self._cfl_planner = CFLPlanner()

    def compile(
        self,
        query: QueryObject,
        model: SemanticModel,
        dialect_name: str,
    ) -> CompilationResult:
        """Compile a query to SQL for the specified dialect."""
        # Phase 1: Resolution
        resolved = self._resolver.resolve(query, model)
        needs_cfl = resolved.requires_cfl

        # Phase 1.5: Fanout detection (skip for CFL — each fact queried independently)
        if not needs_cfl:
            detect_fanout(resolved, model)

        # Phase 2: Planning (star schema or CFL)
        planner = self._cfl_planner if needs_cfl else self._star_planner
        plan = planner.plan(resolved, model)

        # Phase 2.5: Wrap with totals CTE if needed
        final_ast = wrap_with_totals(plan.ast, resolved)

        # Phase 3: Dialect-specific SQL rendering
        sql = CodeGenerator(DialectRegistry.get(dialect_name)).generate(final_ast)

        # Phase 4: SQL validation (non-blocking — failures become warnings)
        issues = validate_sql(sql, dialect_name)
        warnings = resolved.warnings
        if issues:
            warnings = warnings + [f"SQL validation: {e}" for e in issues]

        return CompilationResult(
            sql=sql,
            dialect=dialect_name,
            resolved=ResolvedInfo(
                fact_tables=resolved.fact_tables,
                dimensions=[d.name for d in resolved.dimensions],
                measures=[m.name for m in resolved.measures],
            ),
            warnings=warnings,
            sql_valid=not issues,
        )

compile(query, model, dialect_name)

Compile a query to SQL for the specified dialect.

Source code in src/orionbelt/compiler/pipeline.py
def compile(
    self,
    query: QueryObject,
    model: SemanticModel,
    dialect_name: str,
) -> CompilationResult:
    """Compile a query to SQL for the specified dialect."""
    # Phase 1: Resolution
    resolved = self._resolver.resolve(query, model)
    needs_cfl = resolved.requires_cfl

    # Phase 1.5: Fanout detection (skip for CFL — each fact queried independently)
    if not needs_cfl:
        detect_fanout(resolved, model)

    # Phase 2: Planning (star schema or CFL)
    planner = self._cfl_planner if needs_cfl else self._star_planner
    plan = planner.plan(resolved, model)

    # Phase 2.5: Wrap with totals CTE if needed
    final_ast = wrap_with_totals(plan.ast, resolved)

    # Phase 3: Dialect-specific SQL rendering
    sql = CodeGenerator(DialectRegistry.get(dialect_name)).generate(final_ast)

    # Phase 4: SQL validation (non-blocking — failures become warnings)
    issues = validate_sql(sql, dialect_name)
    warnings = resolved.warnings
    if issues:
        warnings = warnings + [f"SQL validation: {e}" for e in issues]

    return CompilationResult(
        sql=sql,
        dialect=dialect_name,
        resolved=ResolvedInfo(
            fact_tables=resolved.fact_tables,
            dimensions=[d.name for d in resolved.dimensions],
            measures=[m.name for m in resolved.measures],
        ),
        warnings=warnings,
        sql_valid=not issues,
    )

Query Resolution

orionbelt.compiler.resolution.QueryResolver

Resolves a QueryObject + SemanticModel into a ResolvedQuery.

Source code in src/orionbelt/compiler/resolution.py
(Rendered line-number gutter for src/orionbelt/compiler/resolution.py lines 211–948 — an artifact of the documentation export; the corresponding source listing follows below.)
class QueryResolver:
    """Resolves a QueryObject + SemanticModel into a ResolvedQuery."""

    def resolve(self, query: QueryObject, model: SemanticModel) -> ResolvedQuery:
        errors: list[SemanticError] = []
        result = ResolvedQuery(
            limit=query.limit,
            use_path_names=list(query.use_path_names),
        )

        # Build global column lookup: col_name → (object_name, source_column)
        global_columns: dict[str, tuple[str, str]] = {}
        for obj_name, obj in model.data_objects.items():
            for col_name, col_obj in obj.columns.items():
                global_columns[col_name] = (obj_name, col_obj.code)

        # 1. Resolve dimensions
        for dim_str in query.select.dimensions:
            dim_ref = DimensionRef.parse(dim_str)
            resolved_dim = self._resolve_dimension(dim_ref, model, errors)
            if resolved_dim:
                result.dimensions.append(resolved_dim)
                result.required_objects.add(resolved_dim.object_name)

        # 2. Resolve measures and track their source objects
        for measure_name in query.select.measures:
            resolved_meas = self._resolve_measure(
                measure_name, model, global_columns, errors, result
            )
            if resolved_meas:
                result.measures.append(resolved_meas)
                # Collect all source objects for this measure/metric
                source_objs = self._get_measure_source_objects(measure_name, model, global_columns)
                result.measure_source_objects.update(source_objs)
                result.required_objects.update(source_objs)

        # 3. Determine base object (the one with most joins / most measures)
        result.base_object = self._select_base_object(result, model)
        if result.base_object:
            result.required_objects.add(result.base_object)

        # Detect multi-fact: CFL is needed only when measure source objects
        # span multiple independent fact tables.  If all measure sources are
        # reachable from the base object via directed join paths (i.e. they
        # are dimension tables joined from the same fact), a single star
        # schema query suffices.
        if len(result.measure_source_objects) > 1:
            graph = JoinGraph(model, use_path_names=query.use_path_names or None)
            reachable = graph.descendants(result.base_object)
            unreachable = result.measure_source_objects - reachable - {result.base_object}
            if unreachable:
                result.requires_cfl = True

        # 4. Validate usePathNames before building join graph
        for upn in query.use_path_names:
            if upn.source not in model.data_objects:
                errors.append(
                    SemanticError(
                        code="UNKNOWN_DATA_OBJECT",
                        message=(f"usePathNames references unknown data object '{upn.source}'"),
                        path="usePathNames",
                    )
                )
                continue
            if upn.target not in model.data_objects:
                errors.append(
                    SemanticError(
                        code="UNKNOWN_DATA_OBJECT",
                        message=(f"usePathNames references unknown data object '{upn.target}'"),
                        path="usePathNames",
                    )
                )
                continue
            # Check that a secondary join with this pathName exists for the pair
            source_obj = model.data_objects[upn.source]
            found = False
            for join in source_obj.joins:
                if (
                    join.join_to == upn.target
                    and join.secondary
                    and join.path_name == upn.path_name
                ):
                    found = True
                    break
            if not found:
                errors.append(
                    SemanticError(
                        code="UNKNOWN_PATH_NAME",
                        message=(
                            f"No secondary join with pathName '{upn.path_name}' "
                            f"from '{upn.source}' to '{upn.target}'"
                        ),
                        path="usePathNames",
                    )
                )

        # 5. Resolve join paths
        graph = JoinGraph(model, use_path_names=query.use_path_names or None)
        if result.base_object and len(result.required_objects) > 1:
            result.join_steps = graph.find_join_path({result.base_object}, result.required_objects)

        # Build set of all objects present in the query's join graph
        joined_objects: set[str] = set()
        if result.base_object:
            joined_objects.add(result.base_object)
        for step in result.join_steps:
            joined_objects.add(step.to_object)

        # 6. Classify filters — filters may auto-extend the join path
        for qf in query.where:
            resolved_filter = self._resolve_filter(
                qf,
                model,
                is_having=False,
                errors=errors,
                joined_objects=joined_objects,
                graph=graph,
                result=result,
            )
            if resolved_filter:
                result.where_filters.append(resolved_filter)

        for qf in query.having:
            resolved_filter = self._resolve_filter(
                qf,
                model,
                is_having=True,
                errors=errors,
                joined_objects=joined_objects,
                graph=graph,
                result=result,
            )
            if resolved_filter:
                result.having_filters.append(resolved_filter)

        # 7. Resolve order by — must reference a dimension or measure in SELECT
        select_count = len(result.dimensions) + len(result.measures)
        for ob in query.order_by:
            expr = self._resolve_order_by_field(
                ob.field,
                result,
                errors=errors,
                select_count=select_count,
            )
            if expr:
                result.order_by_exprs.append((expr, ob.direction == "desc"))

        if errors:
            raise ResolutionError(errors)

        return result

    def _resolve_dimension(
        self,
        ref: DimensionRef,
        model: SemanticModel,
        errors: list[SemanticError],
    ) -> ResolvedDimension | None:
        """Resolve a dimension reference to its physical column."""
        dim = model.dimensions.get(ref.name)
        if dim is None:
            errors.append(
                SemanticError(
                    code="UNKNOWN_DIMENSION",
                    message=f"Unknown dimension '{ref.name}'",
                    path="select.dimensions",
                )
            )
            return None

        obj_name = dim.view
        col_name = dim.column
        obj = model.data_objects.get(obj_name)
        if obj is None:
            errors.append(
                SemanticError(
                    code="UNKNOWN_DATA_OBJECT",
                    message=f"Dimension '{ref.name}' references unknown data object '{obj_name}'",
                )
            )
            return None

        vf = obj.columns.get(col_name)
        source_col = vf.code if vf else col_name

        return ResolvedDimension(
            name=ref.name,
            object_name=obj_name,
            column_name=col_name,
            source_column=source_col,
            grain=ref.grain or dim.time_grain,
        )

    def _resolve_measure(
        self,
        name: str,
        model: SemanticModel,
        global_columns: dict[str, tuple[str, str]],
        errors: list[SemanticError],
        result: ResolvedQuery | None = None,
    ) -> ResolvedMeasure | None:
        """Resolve a measure name to its aggregate expression."""
        measure = model.measures.get(name)
        if measure is None:
            # Check metrics
            metric = model.metrics.get(name)
            if metric:
                return self._resolve_metric(name, metric, model, global_columns, errors, result)
            errors.append(
                SemanticError(
                    code="UNKNOWN_MEASURE",
                    message=f"Unknown measure '{name}'",
                    path="select.measures",
                )
            )
            return None

        expr = self._build_measure_expr(measure.label, measure, model, global_columns)
        return ResolvedMeasure(
            name=name,
            aggregation=measure.aggregation,
            expression=expr,
            is_expression=measure.expression is not None,
            total=measure.total,
        )

    def _build_measure_expr(
        self,
        name: str,
        measure: Measure,
        model: SemanticModel,
        global_columns: dict[str, tuple[str, str]],
    ) -> Expr:
        """Build the aggregate expression for a measure."""
        if measure.expression:
            return self._expand_expression(measure, model, global_columns)

        # Build column references for all columns
        args: list[Expr] = []
        if measure.columns:
            for ref in measure.columns:
                obj_name = ref.view or ""
                col_name = ref.column or ""
                obj = model.data_objects.get(obj_name)
                source = obj.columns[col_name].code if obj and col_name in obj.columns else col_name
                args.append(ColumnRef(name=source, table=obj_name))
        if not args:
            args = [Literal.number(1)]

        agg = measure.aggregation.upper()
        distinct = measure.distinct
        if agg == "COUNT_DISTINCT":
            agg = "COUNT"
            distinct = True

        # LISTAGG: attach separator and optional ordering
        separator: str | None = None
        order_by: list[OrderByItem] = []
        if agg == "LISTAGG":
            separator = measure.delimiter if measure.delimiter is not None else ","
            if measure.within_group:
                wg = measure.within_group
                wg_obj_name = wg.column.view or ""
                wg_col_name = wg.column.column or ""
                wg_obj = model.data_objects.get(wg_obj_name)
                wg_source = (
                    wg_obj.columns[wg_col_name].code
                    if wg_obj and wg_col_name in wg_obj.columns
                    else wg_col_name
                )
                order_by = [
                    OrderByItem(
                        expr=ColumnRef(name=wg_source, table=wg_obj_name),
                        desc=wg.order.upper() == "DESC",
                    )
                ]

        return FunctionCall(
            name=agg,
            args=args,
            distinct=distinct,
            order_by=order_by,
            separator=separator,
        )

    def _expand_expression(
        self,
        measure: Measure,
        model: SemanticModel,
        global_columns: dict[str, tuple[str, str]],
    ) -> Expr:
        """Expand a measure expression into AST nodes.

        Handles {[DataObject].[Column]} placeholders.
        """

        formula = measure.expression or ""
        agg = measure.aggregation.upper()

        # Replace {[DataObject].[Column]} with column references
        named_refs = re.findall(r"\{\[([^\]]+)\]\.\[([^\]]+)\]\}", formula)
        for obj_name, col_name in named_refs:
            obj = model.data_objects.get(obj_name)
            if obj and col_name in obj.columns:
                source = obj.columns[col_name].code
                formula = formula.replace(f"{{[{obj_name}].[{col_name}]}}", f"{obj_name}.{source}")

        # Wrap the whole formula in the aggregation function as raw SQL
        from orionbelt.ast.nodes import RawSQL

        distinct = measure.distinct
        if agg == "COUNT_DISTINCT":
            agg = "COUNT"
            distinct = True

        return FunctionCall(
            name=agg,
            args=[RawSQL(sql=formula)],
            distinct=distinct,
        )

    def _resolve_metric(
        self,
        name: str,
        metric: Metric,
        model: SemanticModel,
        global_columns: dict[str, tuple[str, str]],
        errors: list[SemanticError],
        result: ResolvedQuery | None = None,
    ) -> ResolvedMeasure | None:
        """Resolve a metric to its combined expression.

        Parses the formula into a proper AST tree and resolves each
        component measure so that planners can substitute them later.
        """
        formula = metric.expression

        # Extract and resolve each component measure
        component_names = re.findall(r"\{\[([^\]]+)\]\}", formula)
        for comp_name in component_names:
            if result is not None and comp_name not in result.metric_components:
                comp = self._resolve_measure(comp_name, model, global_columns, errors, result)
                if comp:
                    result.metric_components[comp_name] = comp

        # Parse the formula into an AST tree
        try:
            tokens = _tokenize_formula(formula)
            parsed_expr = _parse_metric_formula(tokens)
        except Exception as exc:
            errors.append(
                SemanticError(
                    code="INVALID_METRIC_EXPRESSION",
                    message=f"Metric '{name}' has invalid expression: {exc}",
                    path=f"metrics.{name}.expression",
                )
            )
            return None

        return ResolvedMeasure(
            name=name,
            aggregation="",
            expression=parsed_expr,
            component_measures=component_names,
            is_expression=True,
        )

    def _get_measure_source_objects(
        self,
        name: str,
        model: SemanticModel,
        global_columns: dict[str, tuple[str, str]],
    ) -> set[str]:
        """Extract all source data objects for a measure or metric."""
        result: set[str] = set()

        # Check simple measures first
        measure = model.measures.get(name)
        if measure:
            # Columns-based measure
            for cref in measure.columns:
                if cref.view:
                    result.add(cref.view)
            # Expression-based measure: extract {[DataObject].[Column]} references
            if measure.expression:
                col_refs = re.findall(r"\{\[([^\]]+)\]\.\[([^\]]+)\]\}", measure.expression)
                for obj_name, _col_name in col_refs:
                    result.add(obj_name)
            return result

        # Check metrics: {[Measure Name]} references → recurse
        metric = model.metrics.get(name)
        if metric:
            measure_refs = re.findall(r"\{\[([^\]]+)\]\}", metric.expression)
            for ref_name in measure_refs:
                result.update(self._get_measure_source_objects(ref_name, model, global_columns))

        return result

    def _select_base_object(self, result: ResolvedQuery, model: SemanticModel) -> str:
        """Select the base (fact) object — prefer measure source objects with most joins."""
        # Priority 1: measure source objects (true fact tables) — pick the one with most joins
        if result.measure_source_objects:
            best = ""
            best_joins = -1
            for obj_name in sorted(result.measure_source_objects):
                obj = model.data_objects.get(obj_name)
                n = len(obj.joins) if obj else 0
                if n > best_joins:
                    best = obj_name
                    best_joins = n
            if best:
                return best

        # Priority 2: any required object with joins defined
        for obj_name in sorted(result.required_objects):
            obj = model.data_objects.get(obj_name)
            if obj and obj.joins:
                return obj_name

        # Fallback: first required object
        if result.required_objects:
            return next(iter(sorted(result.required_objects)))
        if model.data_objects:
            return next(iter(model.data_objects))
        return ""

    def _resolve_filter(
        self,
        qf: QueryFilter,
        model: SemanticModel,
        is_having: bool,
        errors: list[SemanticError],
        joined_objects: set[str] | None = None,
        graph: JoinGraph | None = None,
        result: ResolvedQuery | None = None,
    ) -> ResolvedFilter | None:
        """Resolve a query filter to a physical expression.

        Filter fields can reference any dimension whose data object is
        reachable from the query's join graph (including descendants).
        If the object is reachable but not yet joined, the join path is
        auto-extended.
        """
        filter_path = "having" if is_having else "where"

        # Try to find the field in dimensions first
        dim = model.dimensions.get(qf.field)
        if dim:
            obj_name = dim.view
            # Check reachability: in join graph or reachable via descendants
            if joined_objects is not None and obj_name not in joined_objects:
                # Check if reachable via directed joins from any joined object
                reachable = False
                if graph is not None:
                    for joined_obj in list(joined_objects):
                        if obj_name in graph.descendants(joined_obj):
                            reachable = True
                            break
                if not reachable:
                    errors.append(
                        SemanticError(
                            code="UNREACHABLE_FILTER_FIELD",
                            message=(
                                f"Filter field '{qf.field}' references data object "
                                f"'{obj_name}' which is not reachable from "
                                f"the query's join graph"
                            ),
                            path=filter_path,
                        )
                    )
                    return None
                # Auto-extend the join path to include this object
                if graph is not None and result is not None:
                    new_steps = graph.find_join_path(joined_objects, {obj_name})
                    for step in new_steps:
                        if step.to_object not in joined_objects:
                            result.join_steps.append(step)
                            joined_objects.add(step.to_object)
                            result.required_objects.add(step.to_object)

            col_name = dim.column
            obj = model.data_objects.get(obj_name)
            source = obj.columns[col_name].code if obj and col_name in obj.columns else col_name
            col_expr: Expr = ColumnRef(name=source, table=obj_name)
        elif is_having and qf.field in model.measures:
            # HAVING filter on a measure — valid
            col_expr = ColumnRef(name=qf.field)
        else:
            errors.append(
                SemanticError(
                    code="UNKNOWN_FILTER_FIELD",
                    message=f"Unknown filter field '{qf.field}'",
                    path=filter_path,
                )
            )
            return None

        filter_expr = self._build_filter_expr(col_expr, qf, errors)
        if filter_expr is None:
            return None
        return ResolvedFilter(expression=filter_expr, is_aggregate=is_having)

    def _build_filter_expr(
        self, col: Expr, qf: QueryFilter, errors: list[SemanticError]
    ) -> Expr | None:
        """Build a filter expression from operator and value."""
        from orionbelt.ast.nodes import InList, IsNull

        op = qf.op
        val = qf.value

        match op:
            case FilterOperator.EQUALS | FilterOperator.EQ:
                return BinaryOp(left=col, op="=", right=Literal(value=val))
            case FilterOperator.NOT_EQUALS | FilterOperator.NEQ:
                return BinaryOp(left=col, op="<>", right=Literal(value=val))
            case FilterOperator.GT | FilterOperator.GREATER:
                return BinaryOp(left=col, op=">", right=Literal(value=val))
            case FilterOperator.GTE | FilterOperator.GREATER_EQ:
                return BinaryOp(left=col, op=">=", right=Literal(value=val))
            case FilterOperator.LT | FilterOperator.LESS:
                return BinaryOp(left=col, op="<", right=Literal(value=val))
            case FilterOperator.LTE | FilterOperator.LESS_EQ:
                return BinaryOp(left=col, op="<=", right=Literal(value=val))
            case FilterOperator.IN_LIST | FilterOperator.IN:
                vals: list[Expr] = (
                    [Literal(value=v) for v in val]
                    if isinstance(val, list)
                    else [Literal(value=val)]
                )
                return InList(expr=col, values=vals)
            case FilterOperator.NOT_IN_LIST | FilterOperator.NOT_IN:
                not_vals: list[Expr] = (
                    [Literal(value=v) for v in val]
                    if isinstance(val, list)
                    else [Literal(value=val)]
                )
                return InList(expr=col, values=not_vals, negated=True)
            case FilterOperator.SET | FilterOperator.IS_NOT_NULL:
                return IsNull(expr=col, negated=True)
            case FilterOperator.NOT_SET | FilterOperator.IS_NULL:
                return IsNull(expr=col, negated=False)
            case FilterOperator.CONTAINS:
                return BinaryOp(
                    left=col,
                    op="LIKE",
                    right=Literal.string(f"%{val}%"),
                )
            case FilterOperator.NOT_CONTAINS:
                return BinaryOp(
                    left=col,
                    op="NOT LIKE",
                    right=Literal.string(f"%{val}%"),
                )
            case FilterOperator.STARTS_WITH:
                return BinaryOp(
                    left=col,
                    op="LIKE",
                    right=Literal.string(f"{val}%"),
                )
            case FilterOperator.ENDS_WITH:
                return BinaryOp(
                    left=col,
                    op="LIKE",
                    right=Literal.string(f"%{val}"),
                )
            case FilterOperator.LIKE:
                return BinaryOp(left=col, op="LIKE", right=Literal.string(str(val)))
            case FilterOperator.NOT_LIKE:
                return BinaryOp(left=col, op="NOT LIKE", right=Literal.string(str(val)))
            case FilterOperator.BETWEEN:
                from orionbelt.ast.nodes import Between

                if isinstance(val, list) and len(val) >= 2:
                    return Between(
                        expr=col,
                        low=Literal(value=val[0]),
                        high=Literal(value=val[1]),
                    )
                return BinaryOp(left=col, op="=", right=Literal(value=val))
            case FilterOperator.NOT_BETWEEN:
                from orionbelt.ast.nodes import Between

                if isinstance(val, list) and len(val) >= 2:
                    return Between(
                        expr=col,
                        low=Literal(value=val[0]),
                        high=Literal(value=val[1]),
                        negated=True,
                    )
                return BinaryOp(left=col, op="<>", right=Literal(value=val))
            case FilterOperator.RELATIVE:
                relative = self._parse_relative_filter(val, errors, field=qf.field)
                if relative is None:
                    return None
                return RelativeDateRange(
                    column=col,
                    unit=relative["unit"],
                    count=relative["count"],
                    direction=relative["direction"],
                    include_current=relative["include_current"],
                )
            case _:
                errors.append(
                    SemanticError(
                        code="INVALID_FILTER_OPERATOR",
                        message=f"Unsupported filter operator '{op}'",
                        path="filters",
                    )
                )
                return None

    def _parse_relative_filter(
        self, value: object, errors: list[SemanticError], field: str
    ) -> _RelativeFilterParsed | None:
        if not isinstance(value, dict):
            errors.append(
                SemanticError(
                    code="INVALID_RELATIVE_FILTER",
                    message=(
                        f"Relative filter for '{field}' must be an object "
                        "with keys {unit, count, direction?, include_current?}"
                    ),
                    path="filters",
                )
            )
            return None

        unit = value.get("unit")
        count = value.get("count")
        direction = value.get("direction", "past")
        include_current = value.get("include_current", value.get("includeCurrent", True))

        if not isinstance(unit, str):
            errors.append(
                SemanticError(
                    code="INVALID_RELATIVE_FILTER",
                    message=f"Relative filter for '{field}' requires string 'unit'",
                    path="filters",
                )
            )
            return None
        unit = unit.lower()
        if unit not in {"day", "week", "month", "year"}:
            errors.append(
                SemanticError(
                    code="INVALID_RELATIVE_FILTER",
                    message=f"Relative filter for '{field}' has unsupported unit '{unit}'",
                    path="filters",
                )
            )
            return None
        if not isinstance(count, int) or count <= 0:
            errors.append(
                SemanticError(
                    code="INVALID_RELATIVE_FILTER",
                    message=f"Relative filter for '{field}' requires positive integer 'count'",
                    path="filters",
                )
            )
            return None
        if direction not in {"past", "future"}:
            errors.append(
                SemanticError(
                    code="INVALID_RELATIVE_FILTER",
                    message=(f"Relative filter for '{field}' has invalid direction '{direction}'"),
                    path="filters",
                )
            )
            return None
        if not isinstance(include_current, bool):
            errors.append(
                SemanticError(
                    code="INVALID_RELATIVE_FILTER",
                    message=(f"Relative filter for '{field}' has non-boolean include_current"),
                    path="filters",
                )
            )
            return None

        return {
            "unit": unit,
            "count": count,
            "direction": direction,
            "include_current": include_current,
        }

    def _resolve_order_by_field(
        self,
        field_name: str,
        result: ResolvedQuery,
        errors: list[SemanticError],
        select_count: int,
    ) -> Expr | None:
        """Resolve an order-by field to its expression.

        Must reference a dimension or measure already in the query's SELECT,
        or be a numeric positional reference (1-based).
        """
        # Check if it's a resolved dimension in SELECT
        for dim in result.dimensions:
            if dim.name == field_name:
                return ColumnRef(name=dim.source_column, table=dim.object_name)

        # Check if it's a resolved measure in SELECT
        for meas in result.measures:
            if meas.name == field_name:
                return meas.expression

        # Check if it's a numeric positional reference (e.g. "1", "2")
        if field_name.isdigit():
            pos = int(field_name)
            if 1 <= pos <= select_count:
                return Literal.number(pos)
            errors.append(
                SemanticError(
                    code="INVALID_ORDER_BY_POSITION",
                    message=(
                        f"ORDER BY position {pos} is out of range "
                        f"(SELECT has {select_count} columns)"
                    ),
                    path="order_by",
                )
            )
            return None

        errors.append(
            SemanticError(
                code="UNKNOWN_ORDER_BY_FIELD",
                message=(
                    f"ORDER BY field '{field_name}' is not a dimension "
                    f"or measure in the query's SELECT"
                ),
                path="order_by",
            )
        )
        return None

resolve(query, model)

Source code in src/orionbelt/compiler/resolution.py
def resolve(self, query: QueryObject, model: SemanticModel) -> ResolvedQuery:
    """Resolve *query* against *model* into a ``ResolvedQuery``.

    Pipeline: resolve dimensions and measures, pick a base object,
    detect whether a Composite Fact Layer (CFL) plan is required,
    validate ``usePathNames`` overrides, compute join paths, classify
    WHERE/HAVING filters, and resolve ORDER BY fields.  Errors are
    accumulated and raised together as ``ResolutionError`` at the end.

    Raises:
        ResolutionError: if any semantic errors were collected.
    """
    errors: list[SemanticError] = []
    result = ResolvedQuery(
        limit=query.limit,
        use_path_names=list(query.use_path_names),
    )

    # Build global column lookup: col_name → (object_name, source_column)
    # NOTE: plain dict assignment means that when two data objects share a
    # column name, the later object in iteration order wins.
    global_columns: dict[str, tuple[str, str]] = {}
    for obj_name, obj in model.data_objects.items():
        for col_name, col_obj in obj.columns.items():
            global_columns[col_name] = (obj_name, col_obj.code)

    # 1. Resolve dimensions
    for dim_str in query.select.dimensions:
        dim_ref = DimensionRef.parse(dim_str)
        resolved_dim = self._resolve_dimension(dim_ref, model, errors)
        if resolved_dim:
            result.dimensions.append(resolved_dim)
            result.required_objects.add(resolved_dim.object_name)

    # 2. Resolve measures and track their source objects
    for measure_name in query.select.measures:
        resolved_meas = self._resolve_measure(
            measure_name, model, global_columns, errors, result
        )
        if resolved_meas:
            result.measures.append(resolved_meas)
            # Collect all source objects for this measure/metric
            source_objs = self._get_measure_source_objects(measure_name, model, global_columns)
            result.measure_source_objects.update(source_objs)
            result.required_objects.update(source_objs)

    # 3. Determine base object (the one with most joins / most measures)
    result.base_object = self._select_base_object(result, model)
    if result.base_object:
        result.required_objects.add(result.base_object)

    # Detect multi-fact: CFL is needed only when measure source objects
    # span multiple independent fact tables.  If all measure sources are
    # reachable from the base object via directed join paths (i.e. they
    # are dimension tables joined from the same fact), a single star
    # schema query suffices.
    # NOTE(review): this branch calls graph.descendants(result.base_object)
    # without the `if result.base_object` guard used above — confirm that
    # descendants() tolerates a falsy base object.
    if len(result.measure_source_objects) > 1:
        graph = JoinGraph(model, use_path_names=query.use_path_names or None)
        reachable = graph.descendants(result.base_object)
        unreachable = result.measure_source_objects - reachable - {result.base_object}
        if unreachable:
            result.requires_cfl = True

    # 4. Validate usePathNames before building join graph
    for upn in query.use_path_names:
        if upn.source not in model.data_objects:
            errors.append(
                SemanticError(
                    code="UNKNOWN_DATA_OBJECT",
                    message=(f"usePathNames references unknown data object '{upn.source}'"),
                    path="usePathNames",
                )
            )
            continue
        if upn.target not in model.data_objects:
            errors.append(
                SemanticError(
                    code="UNKNOWN_DATA_OBJECT",
                    message=(f"usePathNames references unknown data object '{upn.target}'"),
                    path="usePathNames",
                )
            )
            continue
        # Check that a secondary join with this pathName exists for the pair
        source_obj = model.data_objects[upn.source]
        found = False
        for join in source_obj.joins:
            if (
                join.join_to == upn.target
                and join.secondary
                and join.path_name == upn.path_name
            ):
                found = True
                break
        if not found:
            errors.append(
                SemanticError(
                    code="UNKNOWN_PATH_NAME",
                    message=(
                        f"No secondary join with pathName '{upn.path_name}' "
                        f"from '{upn.source}' to '{upn.target}'"
                    ),
                    path="usePathNames",
                )
            )

    # 5. Resolve join paths
    graph = JoinGraph(model, use_path_names=query.use_path_names or None)
    if result.base_object and len(result.required_objects) > 1:
        result.join_steps = graph.find_join_path({result.base_object}, result.required_objects)

    # Build set of all objects present in the query's join graph
    joined_objects: set[str] = set()
    if result.base_object:
        joined_objects.add(result.base_object)
    for step in result.join_steps:
        joined_objects.add(step.to_object)

    # 6. Classify filters — filters may auto-extend the join path
    # (joined_objects and result are passed in so _resolve_filter can
    # record any extension it performs).
    for qf in query.where:
        resolved_filter = self._resolve_filter(
            qf,
            model,
            is_having=False,
            errors=errors,
            joined_objects=joined_objects,
            graph=graph,
            result=result,
        )
        if resolved_filter:
            result.where_filters.append(resolved_filter)

    for qf in query.having:
        resolved_filter = self._resolve_filter(
            qf,
            model,
            is_having=True,
            errors=errors,
            joined_objects=joined_objects,
            graph=graph,
            result=result,
        )
        if resolved_filter:
            result.having_filters.append(resolved_filter)

    # 7. Resolve order by — must reference a dimension or measure in SELECT
    select_count = len(result.dimensions) + len(result.measures)
    for ob in query.order_by:
        expr = self._resolve_order_by_field(
            ob.field,
            result,
            errors=errors,
            select_count=select_count,
        )
        if expr:
            result.order_by_exprs.append((expr, ob.direction == "desc"))

    if errors:
        raise ResolutionError(errors)

    return result

Star Schema Planner

orionbelt.compiler.star.StarSchemaPlanner

Plans star-schema queries: single fact base with dimension joins.

Source code in src/orionbelt/compiler/star.py
class StarSchemaPlanner:
    """Plans star-schema queries: single fact base with dimension joins."""

    def plan(self, resolved: ResolvedQuery, model: SemanticModel) -> QueryPlan:
        """Build a single-fact (star) query plan.

        Emits SELECT (dimensions + aggregated measures), FROM the base
        fact table, one JOIN per resolved join step, then WHERE, GROUP BY
        (all dimensions), HAVING, ORDER BY and LIMIT.

        Returns an empty-but-valid plan when the resolved base object is
        not present in the model.
        """
        builder = QueryBuilder()
        graph = JoinGraph(model, use_path_names=resolved.use_path_names or None)

        base_object = model.data_objects.get(resolved.base_object)
        if not base_object:
            # Unknown base object: nothing sensible to plan.
            return QueryPlan(ast=builder.build())

        base_alias = resolved.base_object

        # SELECT: dimensions.
        # Fix: the original `if dim.grain:` / `else` branches were byte
        # identical (time-grain application is deferred to the dialect
        # layer), so the dead conditional is collapsed.
        for dim in resolved.dimensions:
            col = ColumnRef(name=dim.source_column, table=dim.object_name)
            builder.select(AliasedExpr(expr=col, alias=dim.name))

        # SELECT: measures (aggregated) — for metrics, substitute component refs
        for measure in resolved.measures:
            if measure.component_measures:
                substituted = _substitute_measure_refs(
                    measure.expression, resolved.metric_components
                )
                builder.select(AliasedExpr(expr=substituted, alias=measure.name))
            else:
                builder.select(AliasedExpr(expr=measure.expression, alias=measure.name))

        # FROM: base fact table
        builder.from_(base_object.qualified_code, alias=base_alias)

        # JOINs: dimension tables (steps whose target object is missing
        # from the model are skipped silently)
        for step in resolved.join_steps:
            target_object = model.data_objects.get(step.to_object)
            if not target_object:
                continue
            on_expr = graph.build_join_condition(step)
            builder.join(
                table=target_object.qualified_code,
                on=on_expr,
                join_type=step.join_type,
                alias=step.to_object,
            )

        # WHERE
        for wf in resolved.where_filters:
            builder.where(wf.expression)

        # GROUP BY (all dimension columns)
        for dim in resolved.dimensions:
            col = ColumnRef(name=dim.source_column, table=dim.object_name)
            builder.group_by(col)

        # HAVING
        for hf in resolved.having_filters:
            builder.having(hf.expression)

        # ORDER BY
        for expr, desc in resolved.order_by_exprs:
            builder.order_by(expr, desc=desc)

        # LIMIT
        if resolved.limit is not None:
            builder.limit(resolved.limit)

        return QueryPlan(ast=builder.build())

plan(resolved, model)

Source code in src/orionbelt/compiler/star.py
def plan(self, resolved: ResolvedQuery, model: SemanticModel) -> QueryPlan:
    """Assemble the star-schema plan for *resolved* against *model*."""
    qb = QueryBuilder()
    join_graph = JoinGraph(model, use_path_names=resolved.use_path_names or None)

    fact = model.data_objects.get(resolved.base_object)
    if not fact:
        # Unknown base object — return an empty plan.
        return QueryPlan(ast=qb.build())

    # SELECT: one aliased column per dimension.  (Time-grain handling is
    # deferred to the dialect layer, so the raw column is used either way.)
    for dim in resolved.dimensions:
        dim_col = ColumnRef(name=dim.source_column, table=dim.object_name)
        qb.select(AliasedExpr(expr=dim_col, alias=dim.name))

    # SELECT: aggregated measures; metrics get their component measure
    # references substituted in first.
    for meas in resolved.measures:
        select_expr = meas.expression
        if meas.component_measures:
            select_expr = _substitute_measure_refs(
                select_expr, resolved.metric_components
            )
        qb.select(AliasedExpr(expr=select_expr, alias=meas.name))

    # FROM the base fact table, aliased by its object name.
    qb.from_(fact.qualified_code, alias=resolved.base_object)

    # JOIN every resolved step whose target exists in the model.
    for step in resolved.join_steps:
        target = model.data_objects.get(step.to_object)
        if not target:
            continue
        qb.join(
            table=target.qualified_code,
            on=join_graph.build_join_condition(step),
            join_type=step.join_type,
            alias=step.to_object,
        )

    # WHERE predicates.
    for wf in resolved.where_filters:
        qb.where(wf.expression)

    # GROUP BY every dimension column.
    for dim in resolved.dimensions:
        qb.group_by(ColumnRef(name=dim.source_column, table=dim.object_name))

    # HAVING predicates.
    for hf in resolved.having_filters:
        qb.having(hf.expression)

    # ORDER BY, preserving per-field direction.
    for order_expr, is_desc in resolved.order_by_exprs:
        qb.order_by(order_expr, desc=is_desc)

    # Optional row limit.
    if resolved.limit is not None:
        qb.limit(resolved.limit)

    return QueryPlan(ast=qb.build())

CFL Planner

orionbelt.compiler.cfl.CFLPlanner

Plans Composite Fact Layer queries: conformed dimensions + fact stitching.

Uses a UNION ALL strategy:

1. Each fact leg SELECTs conformed dimensions + its own measures (NULL for others)
2. UNION ALL combines the legs into a single CTE
3. Outer query aggregates over the union, grouping by conformed dimensions

Source code in src/orionbelt/compiler/cfl.py
class CFLPlanner:
    """Plans Composite Fact Layer queries: conformed dimensions + fact stitching.

    Uses a UNION ALL strategy:
    1. Each fact leg SELECTs conformed dimensions + its own measures (NULL for others)
    2. UNION ALL combines the legs into a single CTE
    3. Outer query aggregates over the union, grouping by conformed dimensions
    """

    def plan(self, resolved: ResolvedQuery, model: SemanticModel) -> QueryPlan:
        """Plan a CFL query.

        Delegates to ``StarSchemaPlanner`` when only a single fact group
        and no cross-fact measures are present; otherwise builds the
        UNION ALL composite plan.
        """
        self._validate_fanout(resolved, model)

        # Group measures by their source object
        measures_by_object, cross_fact = self._group_measures_by_object(resolved, model)

        if len(measures_by_object) <= 1 and not cross_fact:
            # Single fact — delegate to star schema
            # (local import avoids a circular import between planners)
            from orionbelt.compiler.star import StarSchemaPlanner

            return StarSchemaPlanner().plan(resolved, model)

        # Multi-fact: UNION ALL strategy
        return self._plan_union_all(resolved, model, measures_by_object, cross_fact)

    def _validate_fanout(self, resolved: ResolvedQuery, model: SemanticModel) -> None:
        """Validate that grain is compatible and no fanout will occur.

        NOTE(review): currently this only checks that each dimension's
        data object exists in the model; no actual grain/fan-out analysis
        is performed here — confirm whether that is intentional.

        Raises:
            FanoutError: with all collected messages joined by "; ".
        """
        errors: list[str] = []

        for dim in resolved.dimensions:
            if dim.object_name not in model.data_objects:
                errors.append(
                    f"Dimension '{dim.name}' references unknown data object '{dim.object_name}'"
                )

        if errors:
            raise FanoutError("; ".join(errors))

    def _group_measures_by_object(
        self,
        resolved: ResolvedQuery,
        model: SemanticModel,
    ) -> tuple[dict[str, list[ResolvedMeasure]], list[ResolvedMeasure]]:
        """Group measures by their primary source object.

        Returns ``(groups, cross_fact)`` where *cross_fact* contains
        multi-field measures whose fields span multiple objects.
        For metrics, expand their component measures into the grouping
        instead of the metric itself.  Cross-fact measures ensure every
        involved object has a leg, but are not assigned to any single
        group — their individual fields are distributed per-leg by
        ``_plan_union_all``.
        """
        groups: dict[str, list[ResolvedMeasure]] = {}
        cross_fact: list[ResolvedMeasure] = []
        # One shared dedupe set: a name counts once whether it appears as a
        # plain measure or as a metric's component measure.
        seen: set[str] = set()

        for measure in resolved.measures:
            if measure.component_measures:
                # Metric: add each component measure to its source object
                for comp_name in measure.component_measures:
                    if comp_name in seen:
                        continue
                    seen.add(comp_name)
                    comp = resolved.metric_components.get(comp_name)
                    if comp is None:
                        continue
                    model_measure = model.measures.get(comp_name)
                    if model_measure and model_measure.columns:
                        # Fall back to the base object when the component
                        # has no explicit source view.
                        obj_name = model_measure.columns[0].view or resolved.base_object
                    else:
                        obj_name = resolved.base_object
                    groups.setdefault(obj_name, []).append(comp)
            else:
                if measure.name in seen:
                    continue
                seen.add(measure.name)
                model_measure = model.measures.get(measure.name)
                if not model_measure or not model_measure.columns:
                    groups.setdefault(resolved.base_object, []).append(measure)
                    continue

                # Collect all distinct source objects for this measure
                field_objects = {f.view for f in model_measure.columns if f.view}
                if len(field_objects) > 1:
                    # Cross-fact multi-field measure: ensure each
                    # involved object has a leg, but don't assign
                    # the measure to any single group.
                    cross_fact.append(measure)
                    for obj in field_objects:
                        groups.setdefault(obj, [])
                else:
                    obj_name = model_measure.columns[0].view or resolved.base_object
                    groups.setdefault(obj_name, []).append(measure)

        return groups, cross_fact

    @staticmethod
    def _is_multi_field(measure: ResolvedMeasure) -> bool:
        """Check if a measure has multiple field args (e.g. COUNT(a, b))."""
        return isinstance(measure.expression, FunctionCall) and len(measure.expression.args) > 1

    @staticmethod
    def _multi_field_cte_alias(measure_name: str, idx: int) -> str:
        """CTE column name for the *idx*-th field of a multi-field measure."""
        return f"{measure_name}__f{idx}"

    @staticmethod
    def _unwrap_aggregation(measure: ResolvedMeasure) -> Expr:
        """Extract the inner expression from an aggregated measure.

        For FunctionCall(SUM, [inner]) → returns inner.
        Falls back to the full expression if not a FunctionCall.
        """
        if isinstance(measure.expression, FunctionCall) and measure.expression.args:
            return measure.expression.args[0]
        return measure.expression

    def _build_outer_metric_expr(
        self,
        metric: ResolvedMeasure,
        resolved: ResolvedQuery,
    ) -> Expr:
        """Build the outer query expression for a metric.

        Walks the metric's AST tree and replaces each ColumnRef(measure_name)
        with ``AGG("measure_name")`` using the component measure's aggregation.
        """
        return self._substitute_outer_refs(metric.expression, resolved)

    def _substitute_outer_refs(self, expr: Expr, resolved: ResolvedQuery) -> Expr:
        """Recursively substitute measure refs with outer aggregations.

        Only unqualified ``ColumnRef`` nodes that name a known metric
        component are replaced; ``BinaryOp`` nodes are rebuilt when a
        child changed.  Any other node type passes through unchanged.
        """
        if isinstance(expr, ColumnRef) and expr.table is None:
            comp = resolved.metric_components.get(expr.name)
            if comp:
                agg = comp.aggregation.upper()
                distinct = False
                # COUNT_DISTINCT is spelled COUNT(... DISTINCT ...) in SQL.
                if agg == "COUNT_DISTINCT":
                    agg = "COUNT"
                    distinct = True
                if isinstance(comp.expression, FunctionCall) and comp.expression.distinct:
                    distinct = True
                return FunctionCall(
                    name=agg,
                    args=[ColumnRef(name=comp.name)],
                    distinct=distinct,
                )
        if isinstance(expr, BinaryOp):
            new_left = self._substitute_outer_refs(expr.left, resolved)
            new_right = self._substitute_outer_refs(expr.right, resolved)
            # Rebuild only when something actually changed (identity check).
            if new_left is not expr.left or new_right is not expr.right:
                return BinaryOp(left=new_left, op=expr.op, right=new_right)
        return expr

    @staticmethod
    def _collect_table_refs(expr: Expr, tables: set[str]) -> None:
        """Recursively collect table names from ColumnRef nodes.

        Mutates *tables* in place; returns nothing.
        """
        if isinstance(expr, ColumnRef) and expr.table:
            tables.add(expr.table)
        elif isinstance(expr, BinaryOp):
            CFLPlanner._collect_table_refs(expr.left, tables)
            CFLPlanner._collect_table_refs(expr.right, tables)
        elif isinstance(expr, (InList, IsNull, Between)):
            CFLPlanner._collect_table_refs(expr.expr, tables)
        elif isinstance(expr, RelativeDateRange):
            CFLPlanner._collect_table_refs(expr.column, tables)
        elif isinstance(expr, FunctionCall):
            for arg in expr.args:
                CFLPlanner._collect_table_refs(arg, tables)

    @staticmethod
    def _remap_cfl_order_by(expr: Expr, resolved: ResolvedQuery) -> Expr:
        """Remap ORDER BY expressions to use CTE aliases for the outer query.

        In CFL, the outer query selects from the composite CTE — original
        table-qualified refs are out of scope.  Remap dimension and measure
        expressions to their CTE alias names.
        """
        # Dimension: ColumnRef(name=source_col, table=obj) → ColumnRef(name=dim.name)
        if isinstance(expr, ColumnRef) and expr.table is not None:
            for dim in resolved.dimensions:
                if expr.name == dim.source_column and expr.table == dim.object_name:
                    return ColumnRef(name=dim.name)
        # Measure: match by identity (same expression object) — `is`, not `==`.
        for meas in resolved.measures:
            if expr is meas.expression:
                return ColumnRef(name=meas.name)
        # Numeric position — pass through
        return expr

    def _build_outer_concat_count(
        self,
        measure_name: str,
        n_fields: int,
        agg: str,
        distinct: bool,
    ) -> Expr:
        """Build ``COUNT(DISTINCT CAST(f0 AS VARCHAR) || '|' || ...)`` for the outer query.

        Each field's CTE column is CAST to VARCHAR and joined with a '|'
        separator so multi-field distinctness is computed over the tuple.
        """
        parts: list[Expr] = [
            Cast(
                expr=ColumnRef(name=self._multi_field_cte_alias(measure_name, i)),
                type_name="VARCHAR",
            )
            for i in range(n_fields)
        ]
        concat: Expr = parts[0]
        for part in parts[1:]:
            concat = BinaryOp(
                left=concat,
                op="||",
                right=BinaryOp(
                    left=Literal.string("|"),
                    op="||",
                    right=part,
                ),
            )
        return FunctionCall(name=agg, args=[concat], distinct=distinct)

    def _plan_union_all(
        self,
        resolved: ResolvedQuery,
        model: SemanticModel,
        measures_by_object: dict[str, list[ResolvedMeasure]],
        cross_fact: list[ResolvedMeasure] | None = None,
    ) -> QueryPlan:
        """UNION ALL strategy: stack fact legs with NULL padding, aggregate outside."""
        graph = JoinGraph(model, use_path_names=resolved.use_path_names or None)

        # Collect all measures across all objects + cross-fact measures
        all_measures: list[ResolvedMeasure] = []
        for measures in measures_by_object.values():
            all_measures.extend(measures)
        if cross_fact:
            all_measures.extend(cross_fact)

        # Collect data objects referenced by WHERE filters — each leg
        # must join these tables so the filter predicates are valid.
        filter_objects: set[str] = set()
        for wf in resolved.where_filters:
            self._collect_table_refs(wf.expression, filter_objects)

        # Build one SELECT per fact object group.
        # Each leg computes its own LCA (least common ancestor) as the lead
        # table — the graph-central node that can reach all dimension objects
        # and the measure's source object with minimal hops.
        union_legs: list[Select] = []
        for obj_name, measures in measures_by_object.items():
            leg_builder = QueryBuilder()
            this_measure_names = {m.name for m in measures}

            # SELECT conformed dimensions
            for dim in resolved.dimensions:
                col = ColumnRef(name=dim.source_column, table=dim.object_name)
                leg_builder.select(AliasedExpr(expr=col, alias=dim.name))

            # SELECT this fact's measures (raw expressions, no aggregation)
            # and NULL for the other facts' measures.
            # Multi-field measures expand into one CTE column per field.
            for m in all_measures:
                if self._is_multi_field(m):
                    assert isinstance(m.expression, FunctionCall)
                    for i, arg in enumerate(m.expression.args):
                        alias = self._multi_field_cte_alias(m.name, i)
                        # Each field goes into the leg that owns its table
                        arg_table = arg.table if isinstance(arg, ColumnRef) else None
                        if arg_table == obj_name:
                            leg_builder.select(AliasedExpr(expr=arg, alias=alias))
                        else:
                            leg_builder.select(AliasedExpr(expr=Literal.null(), alias=alias))
                elif m.name in this_measure_names:
                    leg_builder.select(AliasedExpr(expr=self._unwrap_aggregation(m), alias=m.name))
                else:
                    leg_builder.select(AliasedExpr(expr=Literal.null(), alias=m.name))

            # Determine the common root for this leg:
            # the deepest directed ancestor that can reach all dimension
            # objects, measure's source object, and filter-referenced objects.
            leg_required = {dim.object_name for dim in resolved.dimensions}
            leg_required.add(obj_name)
            leg_required.update(filter_objects)
            lead = graph.find_common_root(leg_required)
            lead_obj = model.data_objects.get(lead)

            # FROM: the lead (LCA) table
            if lead_obj:
                leg_builder.from_(lead_obj.qualified_code, alias=lead)

            # JOINs: all required objects reachable from the lead
            join_targets = leg_required - {lead}
            if join_targets:
                steps = graph.find_join_path({lead}, leg_required)
                for step in steps:
                    target_object = model.data_objects.get(step.to_object)
                    if target_object:
                        on_expr = graph.build_join_condition(step)
                        leg_builder.join(
                            table=target_object.qualified_code,
                            on=on_expr,
                            join_type=step.join_type,
                            alias=step.to_object,
                        )

            # Apply WHERE filters to each leg
            for wf in resolved.where_filters:
                leg_builder.where(wf.expression)

            union_legs.append(leg_builder.build())

        # Create the UNION ALL CTE.  The name is a fixed constant — only
        # one composite CTE is ever produced per plan.
        cte_name = "composite_01"
        union_cte = CTE(name=cte_name, query=UnionAll(queries=union_legs))

        # Build outer query: aggregate over the composite CTE
        outer_builder = QueryBuilder()

        # SELECT dimensions
        for dim in resolved.dimensions:
            outer_builder.select(
                AliasedExpr(
                    expr=ColumnRef(name=dim.name),
                    alias=dim.name,
                )
            )

        # SELECT aggregated measures and metrics
        # First, add all component measures (from UNION ALL legs)
        seen_measure_names: set[str] = set()
        for m in all_measures:
            seen_measure_names.add(m.name)
            agg = m.aggregation.upper()
            distinct = False
            if agg == "COUNT_DISTINCT":
                agg = "COUNT"
                distinct = True
            if isinstance(m.expression, FunctionCall) and m.expression.distinct:
                distinct = True

            if self._is_multi_field(m):
                # Multi-field: concat CTE columns in outer query
                assert isinstance(m.expression, FunctionCall)
                n_fields = len(m.expression.args)
                agg_expr: Expr = self._build_outer_concat_count(m.name, n_fields, agg, distinct)
            else:
                agg_expr = FunctionCall(
                    name=agg,
                    args=[ColumnRef(name=m.name)],
                    distinct=distinct,
                )
            outer_builder.select(AliasedExpr(expr=agg_expr, alias=m.name))

        # Then, add metric expressions that combine component measures
        for m in resolved.measures:
            if m.component_measures and m.name not in seen_measure_names:
                metric_expr = self._build_outer_metric_expr(m, resolved)
                outer_builder.select(AliasedExpr(expr=metric_expr, alias=m.name))

        outer_builder.from_(cte_name, alias=cte_name)

        # GROUP BY dimensions
        for dim in resolved.dimensions:
            outer_builder.group_by(ColumnRef(name=dim.name))

        # HAVING filters on the outer query
        # NOTE(review): HAVING expressions are applied as-is, without the
        # CTE-alias remapping used for ORDER BY — confirm they are already
        # expressed in outer-query terms.
        for hf in resolved.having_filters:
            outer_builder.having(hf.expression)

        # ORDER BY and LIMIT — remap to CTE aliases
        for expr, desc in resolved.order_by_exprs:
            outer_builder.order_by(self._remap_cfl_order_by(expr, resolved), desc=desc)
        if resolved.limit is not None:
            outer_builder.limit(resolved.limit)

        outer_select = outer_builder.build()

        # Attach CTE by rebuilding the Select with the ctes field set.
        final = Select(
            columns=outer_select.columns,
            from_=outer_select.from_,
            joins=outer_select.joins,
            where=outer_select.where,
            group_by=outer_select.group_by,
            having=outer_select.having,
            order_by=outer_select.order_by,
            limit=outer_select.limit,
            ctes=[union_cte],
        )

        return QueryPlan(ast=final)

plan(resolved, model)

Plan a CFL query.

Source code in src/orionbelt/compiler/cfl.py
def plan(self, resolved: ResolvedQuery, model: SemanticModel) -> QueryPlan:
    """Plan a CFL query."""
    self._validate_fanout(resolved, model)

    # Partition measures by their source fact object.
    grouped, spanning = self._group_measures_by_object(resolved, model)

    # One group and no cross-fact measures degenerates to a plain star
    # schema plan (local import avoids a circular planner dependency).
    if not spanning and len(grouped) <= 1:
        from orionbelt.compiler.star import StarSchemaPlanner

        return StarSchemaPlanner().plan(resolved, model)

    # Genuinely multi-fact: stitch the legs with UNION ALL.
    return self._plan_union_all(resolved, model, grouped, spanning)

Join Graph

orionbelt.compiler.graph.JoinGraph

Graph of data objects (nodes) and relationships (edges) for join path resolution.

Source code in src/orionbelt/compiler/graph.py
class JoinGraph:
    """Graph of data objects (nodes) and relationships (edges) for join path resolution."""

    def __init__(
        self,
        model: SemanticModel,
        use_path_names: list[UsePathName] | None = None,
    ) -> None:
        # Two parallel views of the same joins:
        #  - undirected: used for shortest-path routing between objects,
        #  - directed: preserves the declared source -> joinTo direction for
        #    ancestry queries (descendants, common root, cycle detection).
        self._graph: nx.Graph[str] = nx.Graph()
        self._directed: nx.DiGraph[str] = nx.DiGraph()
        self._model = model
        self._build(model, use_path_names)

    def _build(
        self,
        model: SemanticModel,
        use_path_names: list[UsePathName] | None = None,
    ) -> None:
        """Build the graph from the semantic model.

        Secondary joins are only included when their pathName is requested
        via *use_path_names*.  When a secondary override is active for a
        ``(source, target)`` pair, the primary join for that pair is excluded.
        """
        for name in model.data_objects:
            self._graph.add_node(name)
            self._directed.add_node(name)

        # Build a lookup: (source, target) → pathName for active overrides
        active_overrides: dict[tuple[str, str], str] = {}
        if use_path_names:
            for upn in use_path_names:
                active_overrides[(upn.source, upn.target)] = upn.path_name

        for obj_name, obj in model.data_objects.items():
            for join in obj.joins:
                # Silently skip joins that point at objects missing from the model.
                if join.join_to not in model.data_objects:
                    continue
                pair = (obj_name, join.join_to)

                if join.secondary:
                    # Only include if this secondary join's pathName is active
                    if pair in active_overrides and active_overrides[pair] == join.path_name:
                        self._add_edge(obj_name, join)
                else:
                    # Primary join: skip if an active override exists for this pair
                    if pair not in active_overrides:
                        self._add_edge(obj_name, join)

    def _add_edge(self, obj_name: str, join: object) -> None:
        """Add an edge to both the undirected and directed graphs."""
        # Imported locally — presumably to avoid a circular import; confirm.
        from orionbelt.models.semantic import DataObjectJoin

        assert isinstance(join, DataObjectJoin)
        # The undirected edge carries source_object so that a later traversal
        # can recover which direction the join was declared in.
        self._graph.add_edge(
            obj_name,
            join.join_to,
            columns_from=join.columns_from,
            columns_to=join.columns_to,
            cardinality=join.join_type,
            source_object=obj_name,
        )
        self._directed.add_edge(
            obj_name,
            join.join_to,
            columns_from=join.columns_from,
            columns_to=join.columns_to,
            cardinality=join.join_type,
        )

    def descendants(self, node: str) -> set[str]:
        """Return all nodes reachable from *node* via directed join paths."""
        if node not in self._directed:
            return set()
        return nx.descendants(self._directed, node)

    def find_common_root(self, required_objects: set[str]) -> str:
        """Find the common root for a set of required objects.

        The join graph is a DAG (joins define direction: source → joinTo).
        The common root is the **deepest** node that can reach ALL
        *required_objects* via directed join paths.  "Deepest" = smallest
        descendant set (most specific ancestor, closest to the required nodes).

        In ``returns → sales → customer``, with required ``{customer, item}``,
        the common root is ``sales`` (it can reach both).  With required
        ``{customer, item, returns}``, the common root is ``returns`` (the
        only node that can reach all three).
        """
        # Ignore required objects that are not in the graph at all.
        required = required_objects & set(self._directed.nodes)
        if len(required) <= 1:
            return next(iter(sorted(required))) if required else ""

        # Find all nodes that can reach ALL required nodes via directed paths
        candidates: list[tuple[str, int]] = []
        for node in self._directed.nodes:
            reachable = nx.descendants(self._directed, node) | {node}
            if required <= reachable:
                candidates.append((node, len(reachable)))

        if not candidates:
            # Fallback: no single directed ancestor covers all —
            # use undirected shortest-path center
            return self._find_center_undirected(required)

        # Pick the deepest ancestor: smallest reachable set that still covers all
        # (ties broken deterministically by node name).
        candidates.sort(key=lambda x: (x[1], x[0]))
        return candidates[0][0]

    def _find_center_undirected(self, required: set[str]) -> str:
        """Fallback: center of the Steiner tree in the undirected graph."""
        nodes = sorted(required)
        if len(nodes) <= 1:
            return nodes[0] if nodes else ""

        # Approximate the Steiner tree as the union of pairwise shortest paths.
        steiner: set[str] = set()
        for i in range(len(nodes)):
            for j in range(i + 1, len(nodes)):
                try:
                    path: list[str] = nx.shortest_path(self._graph, nodes[i], nodes[j])
                    steiner.update(path)
                except nx.NetworkXNoPath:
                    pass

        if not steiner:
            return nodes[0]

        # Center = the Steiner node minimizing its maximum distance to any
        # required node; ties resolved by sorted node order.
        best: str = nodes[0]
        best_max = len(self._graph.nodes) + 1
        for node in sorted(steiner):
            max_dist = max(
                nx.shortest_path_length(self._graph, node, r) for r in nodes
            )
            if max_dist < best_max:
                best_max = max_dist
                best = node
        return best

    def find_join_path(self, from_objects: set[str], to_objects: set[str]) -> list[JoinStep]:
        """Find a minimal join path connecting all required data objects.

        Uses shortest path for each target object from the set of source objects.
        """
        steps: list[JoinStep] = []
        visited_edges: set[tuple[str, str]] = set()

        all_targets = to_objects - from_objects
        source_list = list(from_objects)

        for target in sorted(all_targets):
            # Shortest undirected route from ANY already-joined object.
            best_path: list[str] | None = None
            for source in source_list:
                try:
                    path = nx.shortest_path(self._graph, source, target)
                    if best_path is None or len(path) < len(best_path):
                        best_path = path
                except nx.NetworkXNoPath:
                    continue

            # Unreachable target: silently skipped (no step emitted).
            if best_path is None:
                continue

            for i in range(len(best_path) - 1):
                edge = (best_path[i], best_path[i + 1])
                rev_edge = (best_path[i + 1], best_path[i])
                if edge in visited_edges or rev_edge in visited_edges:
                    continue
                visited_edges.add(edge)

                edge_data = self._graph.edges[edge]
                source_object = edge_data.get("source_object", edge[0])

                if source_object == edge[0]:
                    # Traversal follows the declared join direction.
                    step = JoinStep(
                        from_object=edge[0],
                        to_object=edge[1],
                        from_columns=edge_data["columns_from"],
                        to_columns=edge_data["columns_to"],
                        join_type=ASTJoinType.LEFT,
                        cardinality=edge_data["cardinality"],
                    )
                else:
                    # Traversal runs against the declared direction: swap the
                    # column sides and mark the step as reversed.
                    step = JoinStep(
                        from_object=edge[1],
                        to_object=edge[0],
                        from_columns=edge_data["columns_to"],
                        to_columns=edge_data["columns_from"],
                        join_type=ASTJoinType.LEFT,
                        cardinality=edge_data["cardinality"],
                        reversed=True,
                    )
                steps.append(step)

            # Add target to sources for subsequent lookups
            source_list.append(target)

        return steps

    def build_join_condition(self, step: JoinStep) -> Expr:
        """Build the ON clause expression for a join step."""
        conditions: list[Expr] = []
        for from_c, to_c in zip(step.from_columns, step.to_columns, strict=True):
            # Resolve to physical column names
            from_obj = self._model.data_objects.get(step.from_object)
            to_obj = self._model.data_objects.get(step.to_object)
            if from_obj and from_c in from_obj.columns:
                from_col = from_obj.columns[from_c].code
            else:
                from_col = from_c
            to_col = to_obj.columns[to_c].code if to_obj and to_c in to_obj.columns else to_c
            conditions.append(
                BinaryOp(
                    left=ColumnRef(name=from_col, table=step.from_object),
                    op="=",
                    right=ColumnRef(name=to_col, table=step.to_object),
                )
            )

        # NOTE(review): assumes at least one join-column pair — conditions[0]
        # raises IndexError for a column-less step; confirm upstream validation.
        result: Expr = conditions[0]
        for cond in conditions[1:]:
            result = BinaryOp(left=result, op="AND", right=cond)
        return result

    def detect_cycles(self) -> list[list[str]]:
        """Detect cyclic join paths."""
        try:
            cycles = list(nx.simple_cycles(self._directed))
            return cycles
        except nx.NetworkXError:
            return []

    def validate_deterministic(self) -> list[SemanticError]:
        """Ensure join paths are deterministic (no ambiguity)."""
        errors: list[SemanticError] = []
        # Check for multiple edges between the same pair of nodes
        # NOTE(review): self._graph is a plain nx.Graph, which cannot hold
        # parallel edges (a duplicate add_edge overwrites the data), so
        # number_of_edges(u, v) can never exceed 1 here — confirm whether a
        # MultiGraph was intended for this check to be effective.
        for u, v in self._graph.edges():
            if self._graph.number_of_edges(u, v) > 1:
                errors.append(
                    SemanticError(
                        code="AMBIGUOUS_JOIN",
                        message=f"Multiple join paths between '{u}' and '{v}'",
                        path=f"dataObjects.{u}.joins",
                    )
                )
        return errors

find_join_path(from_objects, to_objects)

Find a minimal join path connecting all required data objects.

Uses shortest path for each target object from the set of source objects.

Source code in src/orionbelt/compiler/graph.py
def find_join_path(self, from_objects: set[str], to_objects: set[str]) -> list[JoinStep]:
    """Find a minimal join path connecting all required data objects.

    For every target object not already joined, the shortest undirected
    path from any current source is walked and turned into join steps.
    """
    steps: list[JoinStep] = []
    seen_edges: set[tuple[str, str]] = set()

    sources = list(from_objects)

    for target in sorted(to_objects - from_objects):
        # Pick the shortest route from ANY already-joined object.
        shortest: list[str] | None = None
        for src in sources:
            try:
                candidate = nx.shortest_path(self._graph, src, target)
            except nx.NetworkXNoPath:
                continue
            if shortest is None or len(candidate) < len(shortest):
                shortest = candidate

        # Unreachable target: skip without emitting a step.
        if shortest is None:
            continue

        for a, b in zip(shortest, shortest[1:]):
            if (a, b) in seen_edges or (b, a) in seen_edges:
                continue
            seen_edges.add((a, b))

            data = self._graph.edges[(a, b)]
            declared_source = data.get("source_object", a)

            if declared_source == a:
                # Traversal follows the declared join direction.
                step = JoinStep(
                    from_object=a,
                    to_object=b,
                    from_columns=data["columns_from"],
                    to_columns=data["columns_to"],
                    join_type=ASTJoinType.LEFT,
                    cardinality=data["cardinality"],
                )
            else:
                # Traversal runs against the declared direction: swap column
                # sides and mark the step as reversed.
                step = JoinStep(
                    from_object=b,
                    to_object=a,
                    from_columns=data["columns_to"],
                    to_columns=data["columns_from"],
                    join_type=ASTJoinType.LEFT,
                    cardinality=data["cardinality"],
                    reversed=True,
                )
            steps.append(step)

        # The freshly joined target becomes a source for later lookups.
        sources.append(target)

    return steps

build_join_condition(step)

Build the ON clause expression for a join step.

Source code in src/orionbelt/compiler/graph.py
def build_join_condition(self, step: JoinStep) -> Expr:
    """Build the ON clause expression for a join step."""

    def _physical(object_name: str, logical: str) -> str:
        # Map a logical column name to its physical code when the model
        # knows the object/column; otherwise fall back to the logical name.
        obj = self._model.data_objects.get(object_name)
        if obj and logical in obj.columns:
            return obj.columns[logical].code
        return logical

    equalities: list[Expr] = [
        BinaryOp(
            left=ColumnRef(name=_physical(step.from_object, fc), table=step.from_object),
            op="=",
            right=ColumnRef(name=_physical(step.to_object, tc), table=step.to_object),
        )
        for fc, tc in zip(step.from_columns, step.to_columns, strict=True)
    ]

    # Fold the per-column equalities into a single AND chain.
    combined: Expr = equalities[0]
    for extra in equalities[1:]:
        combined = BinaryOp(left=combined, op="AND", right=extra)
    return combined

detect_cycles()

Detect cyclic join paths.

Source code in src/orionbelt/compiler/graph.py
def detect_cycles(self) -> list[list[str]]:
    """Detect cyclic join paths in the directed join graph."""
    try:
        found = nx.simple_cycles(self._directed)
        return list(found)
    except nx.NetworkXError:
        # No cycles detectable — report none.
        return []

Code Generator

orionbelt.compiler.codegen.CodeGenerator

Generates SQL from AST using a dialect.

Source code in src/orionbelt/compiler/codegen.py
class CodeGenerator:
    """Turns a SQL AST into a SQL string through a configurable dialect."""

    def __init__(self, dialect: Dialect) -> None:
        # The dialect owns every rendering decision; this class only delegates.
        self._dialect = dialect

    @property
    def dialect(self) -> Dialect:
        """The dialect this generator compiles with."""
        return self._dialect

    def generate(self, ast: Select) -> str:
        """Generate SQL string from AST using the configured dialect."""
        return self._dialect.compile(ast)

generate(ast)

Generate SQL string from AST using the configured dialect.

Source code in src/orionbelt/compiler/codegen.py
def generate(self, ast: Select) -> str:
    """Render *ast* to a SQL string via the configured dialect."""
    dialect = self._dialect
    return dialect.compile(ast)

Dialect Base

orionbelt.dialect.base.Dialect

Bases: ABC

Abstract base for all SQL dialects.

Provides default SQL compilation; dialects override specific methods.

Source code in src/orionbelt/dialect/base.py
class Dialect(ABC):
    """Abstract base for all SQL dialects.

    Provides default SQL compilation; dialects override specific methods.
    """

    @property
    @abstractmethod
    def name(self) -> str: ...

    @property
    @abstractmethod
    def capabilities(self) -> DialectCapabilities: ...

    @abstractmethod
    def quote_identifier(self, name: str) -> str:
        """Quote an identifier per dialect rules."""

    @abstractmethod
    def render_time_grain(self, column: Expr, grain: TimeGrain) -> Expr:
        """Wrap a column expression for the given time grain."""

    @abstractmethod
    def render_cast(self, expr: Expr, target_type: str) -> Expr:
        """Render a CAST expression."""

    @abstractmethod
    def current_date_sql(self) -> str:
        """Return SQL for the current date."""

    @abstractmethod
    def date_add_sql(self, date_sql: str, unit: str, count: int) -> str:
        """Return SQL that adds count units to date_sql."""

    def render_string_contains(self, column: Expr, pattern: Expr) -> Expr:
        """Default: column LIKE '%' || pattern || '%'."""
        return BinaryOp(
            left=column,
            op="LIKE",
            right=BinaryOp(
                left=BinaryOp(left=Literal.string("%"), op="||", right=pattern),
                op="||",
                right=Literal.string("%"),
            ),
        )

    def _map_function_name(self, name: str) -> str:
        """Map a function name to the dialect-specific equivalent.

        Override in subclasses to remap names (e.g. ANY_VALUE → any in ClickHouse).
        """
        return name

    def _compile_median(self, args: list[Expr]) -> str:
        """Compile MEDIAN — default uses MEDIAN(col).

        Works for Snowflake, ClickHouse, Databricks, and Dremio. Postgres overrides.
        """
        # Degenerate zero-arg call renders as MEDIAN(NULL).
        col_sql = self.compile_expr(args[0]) if args else "NULL"
        return f"MEDIAN({col_sql})"

    def _compile_mode(self, args: list[Expr]) -> str:
        """Compile MODE — default uses MODE(col).

        Works for Snowflake and Databricks. Postgres, ClickHouse, and Dremio override.
        """
        col_sql = self.compile_expr(args[0]) if args else "NULL"
        return f"MODE({col_sql})"

    def _compile_listagg(
        self,
        args: list[Expr],
        distinct: bool,
        order_by: list[OrderByItem],
        separator: str | None,
    ) -> str:
        """Compile LISTAGG — default uses LISTAGG(col, sep) WITHIN GROUP (ORDER BY ...).

        Works for Snowflake and Dremio. Postgres, ClickHouse, and Databricks override.
        """
        sep = separator if separator is not None else ","
        col_sql = self.compile_expr(args[0]) if args else "''"
        distinct_sql = "DISTINCT " if distinct else ""
        # Escape single quotes so the separator is a valid SQL string literal.
        escaped_sep = sep.replace("'", "''")
        result = f"LISTAGG({distinct_sql}{col_sql}, '{escaped_sep}')"
        if order_by:
            ob = ", ".join(self.compile_order_by(o) for o in order_by)
            result += f" WITHIN GROUP (ORDER BY {ob})"
        return result

    def _compile_multi_field_count(self, args: list[Expr], distinct: bool) -> str:
        """Compile COUNT with multiple fields by concatenating with ``||``.

        Default (non-Snowflake) strategy: cast each field to VARCHAR and
        join with ``'|'`` separator so the database sees a single expression.
        Snowflake overrides this to emit native ``COUNT(col1, col2)``.
        """
        parts = [f"CAST({self.compile_expr(a)} AS VARCHAR)" for a in args]
        concat = " || '|' || ".join(parts)
        if distinct:
            return f"COUNT(DISTINCT {concat})"
        return f"COUNT({concat})"

    def compile(self, ast: Select) -> str:
        """Render a complete SQL AST to a dialect-specific string."""
        return self.compile_select(ast)

    def compile_select(self, node: Select) -> str:
        """Compile a SELECT statement, clause by clause, joined with newlines."""
        parts: list[str] = []

        # CTEs
        if node.ctes:
            cte_parts = []
            for cte in node.ctes:
                # A CTE body may itself be a UNION ALL rather than a SELECT.
                if isinstance(cte.query, UnionAll):
                    cte_sql = self.compile_union_all(cte.query)
                else:
                    cte_sql = self.compile_select(cte.query)
                cte_parts.append(f"{self.quote_identifier(cte.name)} AS (\n{cte_sql}\n)")
            parts.append("WITH " + ",\n".join(cte_parts))

        # SELECT
        if node.columns:
            cols = ", ".join(self.compile_expr(c) for c in node.columns)
            parts.append(f"SELECT {cols}")
        else:
            parts.append("SELECT *")

        # FROM
        if node.from_:
            parts.append(f"FROM {self.compile_from(node.from_)}")

        # JOINs
        for join in node.joins:
            parts.append(self.compile_join(join))

        # WHERE
        if node.where:
            parts.append(f"WHERE {self.compile_expr(node.where)}")

        # GROUP BY
        if node.group_by:
            groups = ", ".join(self.compile_expr(g) for g in node.group_by)
            parts.append(f"GROUP BY {groups}")

        # HAVING
        if node.having:
            parts.append(f"HAVING {self.compile_expr(node.having)}")

        # ORDER BY
        if node.order_by:
            orders = ", ".join(self.compile_order_by(o) for o in node.order_by)
            parts.append(f"ORDER BY {orders}")

        # LIMIT — explicit None check so LIMIT 0 is still emitted
        if node.limit is not None:
            parts.append(f"LIMIT {node.limit}")

        # OFFSET — same None check as LIMIT
        if node.offset is not None:
            parts.append(f"OFFSET {node.offset}")

        return "\n".join(parts)

    def compile_from(self, node: From) -> str:
        # Source is either a subquery (parenthesized) or a table reference.
        if isinstance(node.source, Select):
            sub = self.compile_select(node.source)
            result = f"(\n{sub}\n)"
        else:
            # NOTE(review): non-subquery sources are emitted via str() without
            # quoting — presumably pre-qualified/escaped upstream; confirm.
            result = str(node.source)
        if node.alias:
            result += f" AS {self.quote_identifier(node.alias)}"
        return result

    def compile_join(self, node: Join) -> str:
        if isinstance(node.source, Select):
            source = f"(\n{self.compile_select(node.source)}\n)"
        else:
            source = str(node.source)
        if node.alias:
            source += f" AS {self.quote_identifier(node.alias)}"

        parts = [f"{node.join_type.value} JOIN {source}"]
        if node.on:
            parts.append(f"ON {self.compile_expr(node.on)}")
        return " ".join(parts)

    def compile_order_by(self, node: OrderByItem) -> str:
        result = self.compile_expr(node.expr)
        if node.desc:
            result += " DESC"
        else:
            result += " ASC"
        # nulls_last is tri-state: True/False emit a NULLS clause, None omits it.
        if node.nulls_last is True:
            result += " NULLS LAST"
        elif node.nulls_last is False:
            result += " NULLS FIRST"
        return result

    def compile_union_all(self, node: UnionAll) -> str:
        """Compile a UNION ALL of multiple SELECT statements."""
        return "\nUNION ALL\n".join(self.compile_select(q) for q in node.queries)

    def compile_expr(self, expr: Expr) -> str:
        """Compile an expression node to SQL string.

        Dispatches structurally on the AST node type; unknown node types
        raise ``ValueError``.
        """
        match expr:
            case Literal(value=None):
                return "NULL"
            case Literal(value=True):
                return "TRUE"
            case Literal(value=False):
                return "FALSE"
            case Literal(value=v) if isinstance(v, str):
                # Double single quotes to escape them inside a SQL string literal.
                escaped = v.replace("'", "''")
                return f"'{escaped}'"
            case Literal(value=v):
                return str(v)
            case Star(table=None):
                return "*"
            case Star(table=t) if t is not None:
                return f"{self.quote_identifier(t)}.*"
            case ColumnRef(name=name, table=None):
                return self.quote_identifier(name)
            case ColumnRef(name=name, table=table) if table is not None:
                return f"{self.quote_identifier(table)}.{self.quote_identifier(name)}"
            case AliasedExpr(expr=inner, alias=alias):
                return f"{self.compile_expr(inner)} AS {self.quote_identifier(alias)}"
            case FunctionCall(
                name=fname,
                args=args,
                distinct=distinct,
                order_by=order_by,
                separator=separator,
            ):
                # Special-cased functions are checked BEFORE the generic
                # rendering so dialect hooks can take over.
                # LISTAGG: dialect-specific rendering
                if fname.upper() == "LISTAGG":
                    return self._compile_listagg(args, distinct, order_by, separator)
                # MODE: dialect-specific rendering
                if fname.upper() == "MODE":
                    return self._compile_mode(args)
                # MEDIAN: dialect-specific rendering
                if fname.upper() == "MEDIAN":
                    return self._compile_median(args)
                # Multi-field COUNT: concatenate fields for portability
                # (Snowflake overrides to use native multi-arg syntax)
                if fname.upper() == "COUNT" and len(args) > 1:
                    return self._compile_multi_field_count(args, distinct)
                fname = self._map_function_name(fname)
                args_sql = ", ".join(self.compile_expr(a) for a in args)
                if distinct:
                    return f"{fname}(DISTINCT {args_sql})"
                return f"{fname}({args_sql})"
            case BinaryOp(left=left, op=op, right=right):
                # Always parenthesize so operator precedence is explicit.
                return f"({self.compile_expr(left)} {op} {self.compile_expr(right)})"
            case UnaryOp(op=op, operand=operand):
                return f"({op} {self.compile_expr(operand)})"
            case IsNull(expr=inner, negated=False):
                return f"({self.compile_expr(inner)} IS NULL)"
            case IsNull(expr=inner, negated=True):
                return f"({self.compile_expr(inner)} IS NOT NULL)"
            case InList(expr=inner, values=values, negated=negated):
                vals = ", ".join(self.compile_expr(v) for v in values)
                op = "NOT IN" if negated else "IN"
                return f"({self.compile_expr(inner)} {op} ({vals}))"
            case CaseExpr(when_clauses=whens, else_clause=else_):
                parts = ["CASE"]
                for when_cond, then_val in whens:
                    parts.append(
                        f"WHEN {self.compile_expr(when_cond)} THEN {self.compile_expr(then_val)}"
                    )
                if else_ is not None:
                    parts.append(f"ELSE {self.compile_expr(else_)}")
                parts.append("END")
                return " ".join(parts)
            case Cast(expr=inner, type_name=type_name):
                return f"CAST({self.compile_expr(inner)} AS {type_name})"
            case SubqueryExpr(query=query):
                return f"(\n{self.compile_select(query)}\n)"
            case RawSQL(sql=sql):
                # Passed through verbatim — caller is responsible for safety.
                return sql
            case Between(expr=inner, low=low, high=high, negated=negated):
                op = "NOT BETWEEN" if negated else "BETWEEN"
                return (
                    f"({self.compile_expr(inner)} {op} "
                    f"{self.compile_expr(low)} AND {self.compile_expr(high)})"
                )
            case RelativeDateRange(
                column=column,
                unit=unit,
                count=count,
                direction=direction,
                include_current=include_current,
            ):
                return self.compile_relative_date_range(
                    column=column,
                    unit=unit,
                    count=count,
                    direction=direction,
                    include_current=include_current,
                )
            case WindowFunction(
                func_name=fname,
                args=args,
                partition_by=partition_by,
                order_by=order_by,
                distinct=distinct,
            ):
                args_sql = ", ".join(self.compile_expr(a) for a in args)
                func_sql = f"{fname}(DISTINCT {args_sql})" if distinct else f"{fname}({args_sql})"
                over_parts: list[str] = []
                if partition_by:
                    pb = ", ".join(self.compile_expr(p) for p in partition_by)
                    over_parts.append(f"PARTITION BY {pb}")
                if order_by:
                    ob = ", ".join(self.compile_order_by(o) for o in order_by)
                    over_parts.append(f"ORDER BY {ob}")
                over_clause = " ".join(over_parts)
                return f"{func_sql} OVER ({over_clause})"
            case _:
                raise ValueError(f"Unknown AST node type: {type(expr).__name__}")

    def compile_relative_date_range(
        self,
        column: Expr,
        unit: str,
        count: int,
        direction: str,
        include_current: bool,
    ) -> str:
        """Compile a relative date range predicate to SQL.

        Emits a half-open interval: start inclusive, end exclusive.
        """
        col_sql = self.compile_expr(column)
        base = self.current_date_sql()

        if direction == "future":
            start = base if include_current else self.date_add_sql(base, "day", 1)
            end = self.date_add_sql(start, unit, count)
        else:
            # Past range: any direction value other than "future" takes this path.
            end = self.date_add_sql(base, "day", 1) if include_current else base
            start = self.date_add_sql(end, unit, -count)

        return f"({col_sql} >= {start} AND {col_sql} < {end})"

quote_identifier(name) abstractmethod

Quote an identifier per dialect rules.

Source code in src/orionbelt/dialect/base.py
@abstractmethod
def quote_identifier(self, name: str) -> str:
    """Quote *name* as an identifier according to this dialect's rules."""

render_time_grain(column, grain) abstractmethod

Wrap a column expression for the given time grain.

Source code in src/orionbelt/dialect/base.py
@abstractmethod
def render_time_grain(self, column: Expr, grain: TimeGrain) -> Expr:
    """Wrap *column* in the expression truncating it to the given time *grain*."""

render_cast(expr, target_type) abstractmethod

Render a CAST expression.

Source code in src/orionbelt/dialect/base.py
@abstractmethod
def render_cast(self, expr: Expr, target_type: str) -> Expr:
    """Render a CAST of *expr* to *target_type* in this dialect."""

current_date_sql() abstractmethod

Return SQL for the current date.

Source code in src/orionbelt/dialect/base.py
@abstractmethod
def current_date_sql(self) -> str:
    """Return this dialect's SQL expression for the current date."""

date_add_sql(date_sql, unit, count) abstractmethod

Return SQL that adds count units to date_sql.

Source code in src/orionbelt/dialect/base.py
@abstractmethod
def date_add_sql(self, date_sql: str, unit: str, count: int) -> str:
    """Return SQL adding *count* *unit*s to the date expression *date_sql*."""

render_string_contains(column, pattern)

Default: column LIKE '%' || pattern || '%'.

Source code in src/orionbelt/dialect/base.py
def render_string_contains(self, column: Expr, pattern: Expr) -> Expr:
    """Default: column LIKE '%' || pattern || '%'."""
    # Surround the pattern with wildcards via SQL string concatenation.
    leading = BinaryOp(left=Literal.string("%"), op="||", right=pattern)
    wildcarded = BinaryOp(left=leading, op="||", right=Literal.string("%"))
    return BinaryOp(left=column, op="LIKE", right=wildcarded)

compile(ast)

Render a complete SQL AST to a dialect-specific string.

Source code in src/orionbelt/dialect/base.py
def compile(self, ast: Select) -> str:
    """Render a complete SQL AST to a dialect-specific string."""
    # The top-level AST node is always a SELECT statement.
    return self.compile_select(ast)

compile_select(node)

Compile a SELECT statement.

Source code in src/orionbelt/dialect/base.py
def compile_select(self, node: Select) -> str:
    """Compile a SELECT statement clause by clause, newline-separated."""
    clauses: list[str] = []

    # WITH: render each CTE body; a CTE may itself be a UNION ALL.
    if node.ctes:
        rendered_ctes = []
        for cte in node.ctes:
            body = (
                self.compile_union_all(cte.query)
                if isinstance(cte.query, UnionAll)
                else self.compile_select(cte.query)
            )
            rendered_ctes.append(f"{self.quote_identifier(cte.name)} AS (\n{body}\n)")
        clauses.append("WITH " + ",\n".join(rendered_ctes))

    # SELECT list — fall back to * when no explicit columns were planned.
    if node.columns:
        clauses.append("SELECT " + ", ".join(self.compile_expr(c) for c in node.columns))
    else:
        clauses.append("SELECT *")

    if node.from_:
        clauses.append("FROM " + self.compile_from(node.from_))

    clauses.extend(self.compile_join(j) for j in node.joins)

    if node.where:
        clauses.append("WHERE " + self.compile_expr(node.where))

    if node.group_by:
        clauses.append("GROUP BY " + ", ".join(self.compile_expr(g) for g in node.group_by))

    if node.having:
        clauses.append("HAVING " + self.compile_expr(node.having))

    if node.order_by:
        clauses.append("ORDER BY " + ", ".join(self.compile_order_by(o) for o in node.order_by))

    # Explicit None checks: LIMIT/OFFSET of 0 must still be emitted.
    if node.limit is not None:
        clauses.append(f"LIMIT {node.limit}")
    if node.offset is not None:
        clauses.append(f"OFFSET {node.offset}")

    return "\n".join(clauses)

compile_union_all(node)

Compile a UNION ALL of multiple SELECT statements.

Source code in src/orionbelt/dialect/base.py
def compile_union_all(self, node: UnionAll) -> str:
    """Compile a UNION ALL of multiple SELECT statements."""
    rendered = [self.compile_select(query) for query in node.queries]
    return "\nUNION ALL\n".join(rendered)

compile_expr(expr)

Compile an expression node to SQL string.

Source code in src/orionbelt/dialect/base.py
def compile_expr(self, expr: Expr) -> str:
    """Compile an expression node to SQL string.

    Dispatches on the concrete AST node type via structural pattern
    matching.  Raises ``ValueError`` for node types with no rendering
    rule.
    """
    match expr:
        # Literals.  None/True/False are matched by identity (PEP 634),
        # so integer 0/1 literal values fall through to the generic
        # Literal case below rather than rendering as FALSE/TRUE.
        case Literal(value=None):
            return "NULL"
        case Literal(value=True):
            return "TRUE"
        case Literal(value=False):
            return "FALSE"
        case Literal(value=v) if isinstance(v, str):
            # Escape embedded single quotes by doubling them.
            escaped = v.replace("'", "''")
            return f"'{escaped}'"
        case Literal(value=v):
            return str(v)
        # Stars and column references, identifier-quoted per dialect.
        case Star(table=None):
            return "*"
        case Star(table=t) if t is not None:
            return f"{self.quote_identifier(t)}.*"
        case ColumnRef(name=name, table=None):
            return self.quote_identifier(name)
        case ColumnRef(name=name, table=table) if table is not None:
            return f"{self.quote_identifier(table)}.{self.quote_identifier(name)}"
        case AliasedExpr(expr=inner, alias=alias):
            return f"{self.compile_expr(inner)} AS {self.quote_identifier(alias)}"
        case FunctionCall(
            name=fname,
            args=args,
            distinct=distinct,
            order_by=order_by,
            separator=separator,
        ):
            # LISTAGG: dialect-specific rendering
            if fname.upper() == "LISTAGG":
                return self._compile_listagg(args, distinct, order_by, separator)
            # MODE: dialect-specific rendering
            if fname.upper() == "MODE":
                return self._compile_mode(args)
            # MEDIAN: dialect-specific rendering
            if fname.upper() == "MEDIAN":
                return self._compile_median(args)
            # Multi-field COUNT: concatenate fields for portability
            # (Snowflake overrides to use native multi-arg syntax)
            if fname.upper() == "COUNT" and len(args) > 1:
                return self._compile_multi_field_count(args, distinct)
            # Generic function: map the name (dialect synonym table) and
            # render the argument list.
            fname = self._map_function_name(fname)
            args_sql = ", ".join(self.compile_expr(a) for a in args)
            if distinct:
                return f"{fname}(DISTINCT {args_sql})"
            return f"{fname}({args_sql})"
        # Operators and predicates are parenthesized to preserve the
        # AST's precedence regardless of the target dialect's rules.
        case BinaryOp(left=left, op=op, right=right):
            return f"({self.compile_expr(left)} {op} {self.compile_expr(right)})"
        case UnaryOp(op=op, operand=operand):
            return f"({op} {self.compile_expr(operand)})"
        case IsNull(expr=inner, negated=False):
            return f"({self.compile_expr(inner)} IS NULL)"
        case IsNull(expr=inner, negated=True):
            return f"({self.compile_expr(inner)} IS NOT NULL)"
        case InList(expr=inner, values=values, negated=negated):
            vals = ", ".join(self.compile_expr(v) for v in values)
            op = "NOT IN" if negated else "IN"
            return f"({self.compile_expr(inner)} {op} ({vals}))"
        case CaseExpr(when_clauses=whens, else_clause=else_):
            parts = ["CASE"]
            for when_cond, then_val in whens:
                parts.append(
                    f"WHEN {self.compile_expr(when_cond)} THEN {self.compile_expr(then_val)}"
                )
            if else_ is not None:
                parts.append(f"ELSE {self.compile_expr(else_)}")
            parts.append("END")
            return " ".join(parts)
        case Cast(expr=inner, type_name=type_name):
            # type_name is emitted verbatim — presumably already in the
            # dialect's type vocabulary; TODO confirm upstream mapping.
            return f"CAST({self.compile_expr(inner)} AS {type_name})"
        case SubqueryExpr(query=query):
            return f"(\n{self.compile_select(query)}\n)"
        case RawSQL(sql=sql):
            # Pass-through: emitted verbatim, no quoting or escaping.
            return sql
        case Between(expr=inner, low=low, high=high, negated=negated):
            op = "NOT BETWEEN" if negated else "BETWEEN"
            return (
                f"({self.compile_expr(inner)} {op} "
                f"{self.compile_expr(low)} AND {self.compile_expr(high)})"
            )
        case RelativeDateRange(
            column=column,
            unit=unit,
            count=count,
            direction=direction,
            include_current=include_current,
        ):
            # Delegated so dialects can override date arithmetic.
            return self.compile_relative_date_range(
                column=column,
                unit=unit,
                count=count,
                direction=direction,
                include_current=include_current,
            )
        case WindowFunction(
            func_name=fname,
            args=args,
            partition_by=partition_by,
            order_by=order_by,
            distinct=distinct,
        ):
            args_sql = ", ".join(self.compile_expr(a) for a in args)
            func_sql = f"{fname}(DISTINCT {args_sql})" if distinct else f"{fname}({args_sql})"
            over_parts: list[str] = []
            if partition_by:
                pb = ", ".join(self.compile_expr(p) for p in partition_by)
                over_parts.append(f"PARTITION BY {pb}")
            if order_by:
                ob = ", ".join(self.compile_order_by(o) for o in order_by)
                over_parts.append(f"ORDER BY {ob}")
            # An empty OVER () is emitted when neither clause is present.
            over_clause = " ".join(over_parts)
            return f"{func_sql} OVER ({over_clause})"
        case _:
            raise ValueError(f"Unknown AST node type: {type(expr).__name__}")

compile_relative_date_range(column, unit, count, direction, include_current)

Compile a relative date range predicate to SQL.

Source code in src/orionbelt/dialect/base.py
def compile_relative_date_range(
    self,
    column: Expr,
    unit: str,
    count: int,
    direction: str,
    include_current: bool,
) -> str:
    """Compile a relative date range predicate to SQL.

    Produces a half-open interval check ``lower <= column < upper``
    anchored at the dialect's current-date expression.
    """
    target = self.compile_expr(column)
    today = self.current_date_sql()

    if direction == "future":
        # Future windows start today, or tomorrow when today is excluded.
        lower = today if include_current else self.date_add_sql(today, "day", 1)
        upper = self.date_add_sql(lower, unit, count)
    else:
        # Past windows end (exclusively) at today, or tomorrow when
        # today should be counted.
        upper = self.date_add_sql(today, "day", 1) if include_current else today
        lower = self.date_add_sql(upper, unit, -count)

    return f"({target} >= {lower} AND {target} < {upper})"

orionbelt.dialect.base.DialectCapabilities dataclass

Flags indicating what SQL features a dialect supports.

Source code in src/orionbelt/dialect/base.py
@dataclass
class DialectCapabilities:
    """Flags indicating what SQL features a dialect supports.

    Defaults describe the lowest common denominator; dialect plugins
    override individual flags.
    """

    supports_cte: bool = True  # common table expressions (WITH ...)
    supports_qualify: bool = False  # QUALIFY clause
    supports_arrays: bool = False  # native array types
    supports_window_filters: bool = False  # filters on window functions
    supports_ilike: bool = False  # case-insensitive ILIKE operator
    supports_time_travel: bool = False  # historical/point-in-time queries
    supports_semi_structured: bool = False  # semi-structured (JSON/variant) data

Dialect Registry

orionbelt.dialect.registry.DialectRegistry

Registry for SQL dialect plugins.

Source code in src/orionbelt/dialect/registry.py
class DialectRegistry:
    """Registry for SQL dialect plugins.

    Dialect classes live in a class-level mapping keyed by each
    dialect's ``name`` property, so registration is process-wide.
    """

    _dialects: dict[str, type[Dialect]] = {}

    @classmethod
    def register(cls, dialect_class: type[Dialect]) -> type[Dialect]:
        """Register a dialect class. Can be used as a decorator."""
        # The key is the instance-level ``name`` property, so a
        # throwaway instance is created just to read it.
        key = dialect_class().name
        cls._dialects[key] = dialect_class
        return dialect_class

    @classmethod
    def get(cls, name: str) -> Dialect:
        """Get an instance of the named dialect."""
        dialect_class = cls._dialects.get(name)
        if dialect_class is None:
            raise UnsupportedDialectError(name, available=cls.available())
        return dialect_class()

    @classmethod
    def available(cls) -> list[str]:
        """List registered dialect names."""
        return sorted(cls._dialects)

    @classmethod
    def reset(cls) -> None:
        """Clear all registered dialects (for testing)."""
        cls._dialects.clear()

get(name) classmethod

Get an instance of the named dialect.

Source code in src/orionbelt/dialect/registry.py
@classmethod
def get(cls, name: str) -> Dialect:
    """Get an instance of the named dialect."""
    dialect_class = cls._dialects.get(name)
    if dialect_class is None:
        raise UnsupportedDialectError(name, available=cls.available())
    return dialect_class()

available() classmethod

List registered dialect names.

Source code in src/orionbelt/dialect/registry.py
@classmethod
def available(cls) -> list[str]:
    """List registered dialect names."""
    names = list(cls._dialects.keys())
    names.sort()
    return names

register(dialect_class) classmethod

Register a dialect class. Can be used as a decorator.

Source code in src/orionbelt/dialect/registry.py
@classmethod
def register(cls, dialect_class: type[Dialect]) -> type[Dialect]:
    """Register a dialect class. Can be used as a decorator."""
    # The key is the instance-level ``name`` property, so a throwaway
    # instance is created just to read it.
    key = dialect_class().name
    cls._dialects[key] = dialect_class
    return dialect_class

YAML Parser

orionbelt.parser.loader.TrackedLoader

YAML loader that tracks source positions for error reporting.

Uses ruamel.yaml which preserves line/column info on every parsed node.

Source code in src/orionbelt/parser/loader.py
class TrackedLoader:
    """YAML loader that tracks source positions for error reporting.

    Uses ruamel.yaml which preserves line/column info on every parsed node.

    Every ``load*`` method returns a plain Python ``dict`` (ruamel
    container types converted away) plus a ``SourceMap`` associating
    dotted key paths with file/line/column spans.
    """

    def __init__(self) -> None:
        self._yaml = YAML()
        # Keep original quoting so positions and values match the input text.
        self._yaml.preserve_quotes = True

    def load(self, path: Path) -> tuple[dict[str, Any], SourceMap]:
        """Load a YAML file and return parsed dict + source position map."""
        # Delegate to load_string so the parse/index/convert pipeline
        # lives in exactly one place (previously duplicated here).
        with path.open("r", encoding="utf-8") as handle:
            return self.load_string(handle.read(), filename=str(path))

    def load_string(
        self, content: str, filename: str = "<string>"
    ) -> tuple[dict[str, Any], SourceMap]:
        """Load YAML from a string.

        ``filename`` is only used to label positions in the source map.
        """
        data = self._yaml.load(content)
        if data is None:
            # Empty document: nothing to index.
            return {}, SourceMap()
        source_map = SourceMap()
        self._extract_positions(data, filename, "", source_map)
        return self._to_plain_dict(data), source_map

    def load_model_directory(self, root: Path) -> tuple[dict[str, Any], SourceMap]:
        """Load a model directory: model.yaml + facts/*.yaml + dimensions/*.yaml + measures/*.yaml.

        Returns a merged dict with all artifacts and a combined source map.
        """
        merged: dict[str, Any] = {}
        combined_map = SourceMap()

        # Load model.yaml (root file)
        model_file = root / "model.yaml"
        if model_file.exists():
            data, smap = self.load(model_file)
            merged.update(data)
            combined_map.merge(smap)

        # Load subdirectory YAML files; each directory becomes (or extends)
        # a same-named section of the merged document.
        for subdir in ("facts", "dimensions", "measures", "macros", "policies"):
            subdir_path = root / subdir
            if subdir_path.is_dir():
                section: dict[str, Any] = merged.get(subdir, {})
                for yaml_file in sorted(subdir_path.glob("*.yaml")):
                    data, smap = self.load(yaml_file)
                    if isinstance(data, dict):
                        if "name" in data:
                            # Single-artifact file: key the artifact by its
                            # declared name.
                            section[data["name"]] = data
                        else:
                            # Multi-artifact file: merge its entries directly.
                            section.update(data)
                    combined_map.merge(smap)
                if section:
                    merged[subdir] = section

        return merged, combined_map

    def _extract_positions(
        self,
        data: Any,
        filename: str,
        prefix: str,
        source_map: SourceMap,
    ) -> None:
        """Recursively extract source positions from ruamel.yaml nodes.

        Mapping keys are recorded under dotted paths (``a.b``) and
        sequence items under indexed paths (``a[0]``).  ruamel positions
        are 0-based, so 1 is added to line and column.
        """
        if isinstance(data, CommentedMap):
            for key in data:
                key_path = f"{prefix}.{key}" if prefix else str(key)
                # Try to get position for this key from ruamel.yaml's lc object
                try:
                    lc = data.lc
                    # lc.key() returns a callable in newer ruamel.yaml
                    key_positions = lc.key(key)
                    if key_positions:
                        line, col = key_positions
                        source_map.add(
                            key_path,
                            SourceSpan(file=filename, line=line + 1, column=col + 1),
                        )
                except (AttributeError, KeyError, TypeError):
                    # Fallback: use the map's own position
                    try:
                        lc = data.lc
                        source_map.add(
                            key_path,
                            SourceSpan(file=filename, line=lc.line + 1, column=lc.col + 1),
                        )
                    except (AttributeError, TypeError):
                        pass
                self._extract_positions(data[key], filename, key_path, source_map)
        elif isinstance(data, CommentedSeq):
            for i, item in enumerate(data):
                item_path = f"{prefix}[{i}]"
                try:
                    lc = data.lc
                    item_pos = lc.item(i)
                    if item_pos:
                        line, col = item_pos
                        source_map.add(
                            item_path,
                            SourceSpan(file=filename, line=line + 1, column=col + 1),
                        )
                except (AttributeError, KeyError, TypeError):
                    pass
                self._extract_positions(item, filename, item_path, source_map)

    def _to_plain_dict(self, data: Any) -> dict[str, Any]:
        """Convert ruamel.yaml CommentedMap/Seq to plain Python dict/list.

        Non-mapping top-level documents collapse to an empty dict.
        """
        if isinstance(data, CommentedMap):
            return {str(k): self._to_plain_value(v) for k, v in data.items()}
        if isinstance(data, dict):
            return {str(k): self._to_plain_value(v) for k, v in data.items()}
        return {}

    def _to_plain_value(self, data: Any) -> Any:
        """Recursively convert a value; scalars pass through unchanged."""
        if isinstance(data, CommentedMap):
            return {str(k): self._to_plain_value(v) for k, v in data.items()}
        if isinstance(data, CommentedSeq):
            return [self._to_plain_value(item) for item in data]
        if isinstance(data, dict):
            return {str(k): self._to_plain_value(v) for k, v in data.items()}
        if isinstance(data, list):
            return [self._to_plain_value(item) for item in data]
        return data

load(path)

Load a YAML file and return parsed dict + source position map.

Source code in src/orionbelt/parser/loader.py
def load(self, path: Path) -> tuple[dict[str, Any], SourceMap]:
    """Load a YAML file and return parsed dict + source position map."""
    with path.open("r", encoding="utf-8") as handle:
        parsed = self._yaml.load(handle)
    if parsed is None:
        # Empty document: nothing to index.
        return {}, SourceMap()
    positions = SourceMap()
    self._extract_positions(parsed, str(path), "", positions)
    return self._to_plain_dict(parsed), positions

load_string(content, filename='<string>')

Load YAML from a string.

Source code in src/orionbelt/parser/loader.py
def load_string(
    self, content: str, filename: str = "<string>"
) -> tuple[dict[str, Any], SourceMap]:
    """Load YAML from a string.

    ``filename`` is only used to label positions in the source map.
    """
    parsed = self._yaml.load(content)
    if parsed is None:
        # Empty document: nothing to index.
        return {}, SourceMap()
    positions = SourceMap()
    self._extract_positions(parsed, filename, "", positions)
    return self._to_plain_dict(parsed), positions

load_model_directory(root)

Load a model directory: model.yaml + facts/*.yaml + dimensions/*.yaml + measures/*.yaml.

Returns a merged dict with all artifacts and a combined source map.

Source code in src/orionbelt/parser/loader.py
def load_model_directory(self, root: Path) -> tuple[dict[str, Any], SourceMap]:
    """Load a model directory: model.yaml + facts/*.yaml + dimensions/*.yaml + measures/*.yaml.

    Returns a merged dict with all artifacts and a combined source map.
    """
    result: dict[str, Any] = {}
    positions = SourceMap()

    # The root model.yaml (if present) seeds the merged document.
    root_file = root / "model.yaml"
    if root_file.exists():
        data, smap = self.load(root_file)
        result.update(data)
        positions.merge(smap)

    # Each known subdirectory contributes one same-named section.
    for section_name in ("facts", "dimensions", "measures", "macros", "policies"):
        section_dir = root / section_name
        if not section_dir.is_dir():
            continue
        section: dict[str, Any] = result.get(section_name, {})
        for yaml_file in sorted(section_dir.glob("*.yaml")):
            data, smap = self.load(yaml_file)
            if isinstance(data, dict):
                if "name" in data:
                    # Single-artifact file: key the artifact by its name.
                    section[data["name"]] = data
                else:
                    # Multi-artifact file: merge entries directly.
                    section.update(data)
            positions.merge(smap)
        if section:
            result[section_name] = section

    return result, positions

Reference Resolver

orionbelt.parser.resolver.ReferenceResolver

Resolves all references in a raw YAML model to a fully-typed SemanticModel.

Source code in src/orionbelt/parser/resolver.py
class ReferenceResolver:
    """Resolves all references in a raw YAML model to a fully-typed SemanticModel.

    Errors are accumulated rather than raised so a single pass reports
    as many problems as possible; parse failures for one artifact never
    abort parsing of the others.
    """

    def resolve(
        self,
        raw: dict[str, Any],
        source_map: SourceMap | None = None,
    ) -> tuple[SemanticModel, ValidationResult]:
        """Resolve raw YAML dict into a validated SemanticModel.

        Returns (model, validation_result). If there are errors,
        the model may be partially populated.

        When ``source_map`` is given, file/line spans from it are
        attached to each reported error.
        """
        errors: list[SemanticError] = []
        warnings: list[SemanticError] = []

        # Parse data objects.  Sections are parsed in dependency order:
        # data objects first, then dimensions/measures (which reference
        # data objects), then metrics (which reference measures).
        data_objects: dict[str, DataObject] = {}
        raw_objects = raw.get("dataObjects", {})
        if not isinstance(raw_objects, dict):
            errors.append(
                SemanticError(
                    code="DATA_OBJECT_PARSE_ERROR",
                    message="'dataObjects' must be a YAML mapping, not a list or scalar",
                    path="dataObjects",
                )
            )
            # Reset to an empty mapping so the loop below is a no-op.
            raw_objects = {}
        for name, raw_obj in raw_objects.items():
            try:
                obj_columns: dict[str, DataObjectColumn] = {}
                for fname, fdata in raw_obj.get("columns", {}).items():
                    obj_columns[fname] = DataObjectColumn(
                        label=fname,
                        code=fdata.get("code", fname),
                        abstract_type=fdata.get("abstractType", "string"),
                        sql_type=fdata.get("sqlType"),
                        sql_precision=fdata.get("sqlPrecision"),
                        sql_scale=fdata.get("sqlScale"),
                        comment=fdata.get("comment"),
                        custom_extensions=_parse_extensions(fdata),
                    )

                obj_joins: list[DataObjectJoin] = []
                for jdata in raw_obj.get("joins", []):
                    obj_joins.append(
                        DataObjectJoin(
                            join_type=jdata["joinType"],
                            join_to=jdata["joinTo"],
                            columns_from=jdata["columnsFrom"],
                            columns_to=jdata["columnsTo"],
                            secondary=jdata.get("secondary", False),
                            path_name=jdata.get("pathName"),
                        )
                    )

                data_objects[name] = DataObject(
                    label=name,
                    code=raw_obj.get("code", ""),
                    database=raw_obj.get("database", ""),
                    schema_name=raw_obj.get("schema", ""),
                    columns=obj_columns,
                    joins=obj_joins,
                    comment=raw_obj.get("comment"),
                    custom_extensions=_parse_extensions(raw_obj),
                )
            except Exception as e:
                # Broad catch is deliberate: any malformed artifact becomes
                # a recorded error instead of aborting the whole resolve.
                span = source_map.get(f"dataObjects.{name}") if source_map else None
                errors.append(
                    SemanticError(
                        code="DATA_OBJECT_PARSE_ERROR",
                        message=f"Failed to parse data object '{name}': {e}",
                        path=f"dataObjects.{name}",
                        span=span,
                    )
                )

        # Parse dimensions
        dimensions: dict[str, Dimension] = {}
        raw_dims = raw.get("dimensions", {})
        if not isinstance(raw_dims, dict):
            errors.append(
                SemanticError(
                    code="DIMENSION_PARSE_ERROR",
                    message="'dimensions' must be a YAML mapping, not a list or scalar",
                    path="dimensions",
                )
            )
            raw_dims = {}
        for name, raw_dim in raw_dims.items():
            try:
                data_object = raw_dim.get("dataObject")
                column = raw_dim.get("column")

                # Validate the data object exists
                if data_object and data_object not in data_objects:
                    span = source_map.get(f"dimensions.{name}") if source_map else None
                    errors.append(
                        SemanticError(
                            code="UNKNOWN_DATA_OBJECT",
                            message=(
                                f"Dimension '{name}' references unknown data object '{data_object}'"
                            ),
                            path=f"dimensions.{name}",
                            span=span,
                            suggestions=_suggest_similar(data_object, list(data_objects.keys())),
                        )
                    )

                # Validate the column exists in the data object
                if (
                    data_object
                    and column
                    and data_object in data_objects
                    and column not in data_objects[data_object].columns
                ):
                    span = source_map.get(f"dimensions.{name}") if source_map else None
                    errors.append(
                        SemanticError(
                            code="UNKNOWN_COLUMN",
                            message=(
                                f"Dimension '{name}' references unknown column "
                                f"'{column}' in data object '{data_object}'"
                            ),
                            path=f"dimensions.{name}",
                            span=span,
                            suggestions=_suggest_similar(
                                column, list(data_objects[data_object].columns.keys())
                            ),
                        )
                    )

                # The dimension is still recorded even when its references
                # are unknown, so the model stays partially usable.
                dimensions[name] = Dimension(
                    label=name,
                    view=data_object or "",
                    column=column or "",
                    result_type=raw_dim.get("resultType", "string"),
                    time_grain=raw_dim.get("timeGrain"),
                    format=raw_dim.get("format"),
                    custom_extensions=_parse_extensions(raw_dim),
                )
            except Exception as e:
                span = source_map.get(f"dimensions.{name}") if source_map else None
                errors.append(
                    SemanticError(
                        code="DIMENSION_PARSE_ERROR",
                        message=f"Failed to parse dimension '{name}': {e}",
                        path=f"dimensions.{name}",
                        span=span,
                    )
                )

        # Parse measures
        measures: dict[str, Measure] = {}
        raw_measures = raw.get("measures", {})
        if not isinstance(raw_measures, dict):
            errors.append(
                SemanticError(
                    code="MEASURE_PARSE_ERROR",
                    message="'measures' must be a YAML mapping, not a list or scalar",
                    path="measures",
                )
            )
            raw_measures = {}
        for name, raw_meas in raw_measures.items():
            try:
                measure_columns: list[DataColumnRef] = []
                for fdata in raw_meas.get("columns", []):
                    measure_columns.append(
                        DataColumnRef(
                            view=fdata.get("dataObject"),
                            column=fdata.get("column"),
                        )
                    )

                # Resolve expression field references
                expression = raw_meas.get("expression")
                if expression:
                    self._validate_expression_refs(
                        name, expression, data_objects, errors, source_map
                    )

                # Optional measure-level filter.
                mfilter = None
                raw_filter = raw_meas.get("filter")
                if raw_filter:
                    filter_values = []
                    for vdata in raw_filter.get("values", []):
                        filter_values.append(
                            FilterValue(
                                data_type=vdata.get("dataType", "string"),
                                is_null=vdata.get("isNull"),
                                value_string=vdata.get("valueString"),
                                value_int=vdata.get("valueInt"),
                                value_float=vdata.get("valueFloat"),
                                value_date=vdata.get("valueDate"),
                                value_boolean=vdata.get("valueBoolean"),
                            )
                        )
                    filter_column = None
                    if "column" in raw_filter:
                        filter_column = DataColumnRef(
                            view=raw_filter["column"].get("dataObject"),
                            column=raw_filter["column"].get("column"),
                        )
                    mfilter = MeasureFilter(
                        column=filter_column,
                        operator=raw_filter.get("operator", "equals"),
                        values=filter_values,
                    )

                measures[name] = Measure(
                    label=name,
                    columns=measure_columns,
                    result_type=raw_meas.get("resultType", "float"),
                    aggregation=raw_meas.get("aggregation", "sum"),
                    expression=expression,
                    distinct=raw_meas.get("distinct", False),
                    total=raw_meas.get("total", False),
                    filter=mfilter,
                    format=raw_meas.get("format"),
                    allow_fan_out=raw_meas.get("allowFanOut", False),
                    custom_extensions=_parse_extensions(raw_meas),
                )
            except Exception as e:
                span = source_map.get(f"measures.{name}") if source_map else None
                errors.append(
                    SemanticError(
                        code="MEASURE_PARSE_ERROR",
                        message=f"Failed to parse measure '{name}': {e}",
                        path=f"measures.{name}",
                        span=span,
                    )
                )

        # Parse metrics
        metrics: dict[str, Metric] = {}
        raw_metrics = raw.get("metrics", {})
        if not isinstance(raw_metrics, dict):
            errors.append(
                SemanticError(
                    code="METRIC_PARSE_ERROR",
                    message="'metrics' must be a YAML mapping, not a list or scalar",
                    path="metrics",
                )
            )
            raw_metrics = {}
        for name, raw_metric in raw_metrics.items():
            try:
                # Validate measure references in expression
                expression = raw_metric.get("expression", "")
                self._validate_metric_expression_refs(
                    name, expression, measures, errors, source_map
                )

                metrics[name] = Metric(
                    label=name,
                    expression=expression,
                    format=raw_metric.get("format"),
                    custom_extensions=_parse_extensions(raw_metric),
                )
            except Exception as e:
                span = source_map.get(f"metrics.{name}") if source_map else None
                errors.append(
                    SemanticError(
                        code="METRIC_PARSE_ERROR",
                        message=f"Failed to parse metric '{name}': {e}",
                        path=f"metrics.{name}",
                        span=span,
                    )
                )

        # Assemble the (possibly partial) model and its validation verdict.
        model = SemanticModel(
            version=raw.get("version", 1.0),
            data_objects=data_objects,
            dimensions=dimensions,
            measures=measures,
            metrics=metrics,
            custom_extensions=_parse_extensions(raw),
        )

        result = ValidationResult(
            valid=len(errors) == 0,
            errors=errors,
            warnings=warnings,
        )

        return model, result

    def _validate_expression_refs(
        self,
        measure_name: str,
        expression: str,
        data_objects: dict[str, DataObject],
        errors: list[SemanticError],
        source_map: SourceMap | None,
    ) -> None:
        """Validate {[DataObject].[Column]} references in a measure expression.

        Appends one error per unknown object or unknown column; does not
        raise.
        """
        named_refs = re.findall(r"\{\[([^\]]+)\]\.\[([^\]]+)\]\}", expression)
        for obj_name, col_name in named_refs:
            span = source_map.get(f"measures.{measure_name}.expression") if source_map else None
            if obj_name not in data_objects:
                errors.append(
                    SemanticError(
                        code="UNKNOWN_DATA_OBJECT_IN_EXPRESSION",
                        message=(
                            f"Measure '{measure_name}' expression references unknown "
                            f"data object '{obj_name}'"
                        ),
                        path=f"measures.{measure_name}.expression",
                        span=span,
                    )
                )
            elif col_name not in data_objects[obj_name].columns:
                errors.append(
                    SemanticError(
                        code="UNKNOWN_COLUMN_IN_EXPRESSION",
                        message=(
                            f"Measure '{measure_name}' expression references unknown column "
                            f"'{col_name}' in data object '{obj_name}'"
                        ),
                        path=f"measures.{measure_name}.expression",
                        span=span,
                    )
                )

    def _validate_metric_expression_refs(
        self,
        metric_name: str,
        expression: str,
        measures: dict[str, Measure],
        errors: list[SemanticError],
        source_map: SourceMap | None,
    ) -> None:
        """Validate {[Measure Name]} references in a metric expression.

        Appends one error (with name suggestions) per unknown measure;
        does not raise.
        """
        # Extract all {[Name]} references from the expression
        named_refs = re.findall(r"\{\[([^\]]+)\]\}", expression)
        for ref_name in named_refs:
            if ref_name not in measures:
                span = source_map.get(f"metrics.{metric_name}.expression") if source_map else None
                errors.append(
                    SemanticError(
                        code="UNKNOWN_MEASURE_REF",
                        message=(f"Metric '{metric_name}' references unknown measure '{ref_name}'"),
                        path=f"metrics.{metric_name}.expression",
                        span=span,
                        suggestions=_suggest_similar(ref_name, list(measures.keys())),
                    )
                )

resolve(raw, source_map=None)

Resolve raw YAML dict into a validated SemanticModel.

Returns (model, validation_result). If there are errors, the model may be partially populated.

Source code in src/orionbelt/parser/resolver.py
def resolve(
    self,
    raw: dict[str, Any],
    source_map: SourceMap | None = None,
) -> tuple[SemanticModel, ValidationResult]:
    """Resolve raw YAML dict into a validated SemanticModel.

    Sections are processed in dependency order: data objects first
    (dimensions and measures check their column references against
    them), then dimensions, measures, and finally metrics (whose
    expressions reference measures by name).

    Args:
        raw: Top-level mapping produced by the YAML loader.
        source_map: Optional path -> span lookup; when provided, errors
            carry the source position of the offending YAML node.

    Returns (model, validation_result). If there are errors,
    the model may be partially populated.
    """
    errors: list[SemanticError] = []
    warnings: list[SemanticError] = []

    # Parse data objects
    data_objects: dict[str, DataObject] = {}
    raw_objects = raw.get("dataObjects", {})
    # Each top-level section must be a name-keyed mapping; anything else
    # is reported once and the whole section is skipped.
    if not isinstance(raw_objects, dict):
        errors.append(
            SemanticError(
                code="DATA_OBJECT_PARSE_ERROR",
                message="'dataObjects' must be a YAML mapping, not a list or scalar",
                path="dataObjects",
            )
        )
        raw_objects = {}
    # One try per entry: a malformed entry is recorded as an error and
    # parsing continues with the remaining entries.
    for name, raw_obj in raw_objects.items():
        try:
            obj_columns: dict[str, DataObjectColumn] = {}
            for fname, fdata in raw_obj.get("columns", {}).items():
                obj_columns[fname] = DataObjectColumn(
                    label=fname,
                    code=fdata.get("code", fname),
                    abstract_type=fdata.get("abstractType", "string"),
                    sql_type=fdata.get("sqlType"),
                    sql_precision=fdata.get("sqlPrecision"),
                    sql_scale=fdata.get("sqlScale"),
                    comment=fdata.get("comment"),
                    custom_extensions=_parse_extensions(fdata),
                )

            obj_joins: list[DataObjectJoin] = []
            for jdata in raw_obj.get("joins", []):
                # joinType/joinTo/columnsFrom/columnsTo are required keys;
                # a missing one raises KeyError, caught below as a parse error.
                obj_joins.append(
                    DataObjectJoin(
                        join_type=jdata["joinType"],
                        join_to=jdata["joinTo"],
                        columns_from=jdata["columnsFrom"],
                        columns_to=jdata["columnsTo"],
                        secondary=jdata.get("secondary", False),
                        path_name=jdata.get("pathName"),
                    )
                )

            data_objects[name] = DataObject(
                label=name,
                code=raw_obj.get("code", ""),
                database=raw_obj.get("database", ""),
                schema_name=raw_obj.get("schema", ""),
                columns=obj_columns,
                joins=obj_joins,
                comment=raw_obj.get("comment"),
                custom_extensions=_parse_extensions(raw_obj),
            )
        except Exception as e:
            span = source_map.get(f"dataObjects.{name}") if source_map else None
            errors.append(
                SemanticError(
                    code="DATA_OBJECT_PARSE_ERROR",
                    message=f"Failed to parse data object '{name}': {e}",
                    path=f"dataObjects.{name}",
                    span=span,
                )
            )

    # Parse dimensions
    dimensions: dict[str, Dimension] = {}
    raw_dims = raw.get("dimensions", {})
    if not isinstance(raw_dims, dict):
        errors.append(
            SemanticError(
                code="DIMENSION_PARSE_ERROR",
                message="'dimensions' must be a YAML mapping, not a list or scalar",
                path="dimensions",
            )
        )
        raw_dims = {}
    for name, raw_dim in raw_dims.items():
        try:
            data_object = raw_dim.get("dataObject")
            column = raw_dim.get("column")

            # Validate the data object exists
            if data_object and data_object not in data_objects:
                span = source_map.get(f"dimensions.{name}") if source_map else None
                errors.append(
                    SemanticError(
                        code="UNKNOWN_DATA_OBJECT",
                        message=(
                            f"Dimension '{name}' references unknown data object '{data_object}'"
                        ),
                        path=f"dimensions.{name}",
                        span=span,
                        suggestions=_suggest_similar(data_object, list(data_objects.keys())),
                    )
                )

            # Validate the column exists in the data object
            if (
                data_object
                and column
                and data_object in data_objects
                and column not in data_objects[data_object].columns
            ):
                span = source_map.get(f"dimensions.{name}") if source_map else None
                errors.append(
                    SemanticError(
                        code="UNKNOWN_COLUMN",
                        message=(
                            f"Dimension '{name}' references unknown column "
                            f"'{column}' in data object '{data_object}'"
                        ),
                        path=f"dimensions.{name}",
                        span=span,
                        suggestions=_suggest_similar(
                            column, list(data_objects[data_object].columns.keys())
                        ),
                    )
                )

            # The dimension is built even when reference checks above failed,
            # so the returned model is as complete as possible.
            dimensions[name] = Dimension(
                label=name,
                view=data_object or "",
                column=column or "",
                result_type=raw_dim.get("resultType", "string"),
                time_grain=raw_dim.get("timeGrain"),
                format=raw_dim.get("format"),
                custom_extensions=_parse_extensions(raw_dim),
            )
        except Exception as e:
            span = source_map.get(f"dimensions.{name}") if source_map else None
            errors.append(
                SemanticError(
                    code="DIMENSION_PARSE_ERROR",
                    message=f"Failed to parse dimension '{name}': {e}",
                    path=f"dimensions.{name}",
                    span=span,
                )
            )

    # Parse measures
    measures: dict[str, Measure] = {}
    raw_measures = raw.get("measures", {})
    if not isinstance(raw_measures, dict):
        errors.append(
            SemanticError(
                code="MEASURE_PARSE_ERROR",
                message="'measures' must be a YAML mapping, not a list or scalar",
                path="measures",
            )
        )
        raw_measures = {}
    for name, raw_meas in raw_measures.items():
        try:
            measure_columns: list[DataColumnRef] = []
            for fdata in raw_meas.get("columns", []):
                measure_columns.append(
                    DataColumnRef(
                        view=fdata.get("dataObject"),
                        column=fdata.get("column"),
                    )
                )

            # Resolve expression field references
            expression = raw_meas.get("expression")
            if expression:
                self._validate_expression_refs(
                    name, expression, data_objects, errors, source_map
                )

            # Optional measure-level filter: column ref, operator, typed values.
            mfilter = None
            raw_filter = raw_meas.get("filter")
            if raw_filter:
                filter_values = []
                for vdata in raw_filter.get("values", []):
                    filter_values.append(
                        FilterValue(
                            data_type=vdata.get("dataType", "string"),
                            is_null=vdata.get("isNull"),
                            value_string=vdata.get("valueString"),
                            value_int=vdata.get("valueInt"),
                            value_float=vdata.get("valueFloat"),
                            value_date=vdata.get("valueDate"),
                            value_boolean=vdata.get("valueBoolean"),
                        )
                    )
                filter_column = None
                if "column" in raw_filter:
                    filter_column = DataColumnRef(
                        view=raw_filter["column"].get("dataObject"),
                        column=raw_filter["column"].get("column"),
                    )
                mfilter = MeasureFilter(
                    column=filter_column,
                    operator=raw_filter.get("operator", "equals"),
                    values=filter_values,
                )

            measures[name] = Measure(
                label=name,
                columns=measure_columns,
                result_type=raw_meas.get("resultType", "float"),
                aggregation=raw_meas.get("aggregation", "sum"),
                expression=expression,
                distinct=raw_meas.get("distinct", False),
                total=raw_meas.get("total", False),
                filter=mfilter,
                format=raw_meas.get("format"),
                allow_fan_out=raw_meas.get("allowFanOut", False),
                custom_extensions=_parse_extensions(raw_meas),
            )
        except Exception as e:
            span = source_map.get(f"measures.{name}") if source_map else None
            errors.append(
                SemanticError(
                    code="MEASURE_PARSE_ERROR",
                    message=f"Failed to parse measure '{name}': {e}",
                    path=f"measures.{name}",
                    span=span,
                )
            )

    # Parse metrics
    metrics: dict[str, Metric] = {}
    raw_metrics = raw.get("metrics", {})
    if not isinstance(raw_metrics, dict):
        errors.append(
            SemanticError(
                code="METRIC_PARSE_ERROR",
                message="'metrics' must be a YAML mapping, not a list or scalar",
                path="metrics",
            )
        )
        raw_metrics = {}
    for name, raw_metric in raw_metrics.items():
        try:
            # Validate measure references in expression
            expression = raw_metric.get("expression", "")
            self._validate_metric_expression_refs(
                name, expression, measures, errors, source_map
            )

            metrics[name] = Metric(
                label=name,
                expression=expression,
                format=raw_metric.get("format"),
                custom_extensions=_parse_extensions(raw_metric),
            )
        except Exception as e:
            span = source_map.get(f"metrics.{name}") if source_map else None
            errors.append(
                SemanticError(
                    code="METRIC_PARSE_ERROR",
                    message=f"Failed to parse metric '{name}': {e}",
                    path=f"metrics.{name}",
                    span=span,
                )
            )

    model = SemanticModel(
        version=raw.get("version", 1.0),
        data_objects=data_objects,
        dimensions=dimensions,
        measures=measures,
        metrics=metrics,
        custom_extensions=_parse_extensions(raw),
    )

    result = ValidationResult(
        valid=len(errors) == 0,
        errors=errors,
        warnings=warnings,
    )

    return model, result

Semantic Validator

orionbelt.parser.validator.SemanticValidator

Validates semantic rules from spec §3.8.

Source code in src/orionbelt/parser/validator.py
class SemanticValidator:
    """Validates semantic rules from spec §3.8."""

    def validate(self, model: SemanticModel) -> list[SemanticError]:
        """Run every rule check against *model* and return all errors combined."""
        errors: list[SemanticError] = []
        errors.extend(self._check_unique_identifiers(model))
        errors.extend(self._check_unique_column_names(model))
        errors.extend(self._check_secondary_joins(model))
        errors.extend(self._check_no_cyclic_joins(model))
        errors.extend(self._check_no_multipath_joins(model))
        errors.extend(self._check_measures_resolve(model))
        errors.extend(self._check_join_targets_exist(model))
        errors.extend(self._check_references_resolve(model))
        return errors

    def _check_unique_identifiers(self, model: SemanticModel) -> list[SemanticError]:
        """Ensure no duplicate names across data objects, dimensions, measures, metrics."""
        errors: list[SemanticError] = []
        all_names: dict[str, str] = {}  # name -> type

        def _register(name: str, kind: str, path: str) -> None:
            # Emits one error per re-registration of an existing name.
            existing = all_names.get(name)
            if existing is not None:
                errors.append(
                    SemanticError(
                        code="DUPLICATE_IDENTIFIER",
                        message=(
                            f"{kind.title()} '{name}' conflicts with existing {existing} '{name}'"
                        ),
                        path=path,
                    )
                )
            # Last registration wins: a third duplicate is reported against
            # the most recently registered kind.
            all_names[name] = kind

        for name in model.data_objects:
            _register(name, "dataObject", f"dataObjects.{name}")

        for name in model.dimensions:
            _register(name, "dimension", f"dimensions.{name}")

        for name in model.measures:
            _register(name, "measure", f"measures.{name}")

        for name in model.metrics:
            _register(name, "metric", f"metrics.{name}")

        return errors

    def _check_unique_column_names(self, model: SemanticModel) -> list[SemanticError]:
        """Column names must be unique within each data object.

        Since columns are stored as dict keys, YAML parsers silently drop
        duplicates. This validator therefore returns no errors — uniqueness
        is structurally enforced by the dict representation. The method is
        retained as a hook for future stricter duplicate-key detection at
        the YAML parse level.
        """
        return []

    def _check_secondary_joins(self, model: SemanticModel) -> list[SemanticError]:
        """Validate secondary join constraints.

        - Every secondary join MUST have a pathName.
        - pathName must be unique per (source, target) pair.
        """
        errors: list[SemanticError] = []
        # Track pathName per (source, target) pair
        path_names: dict[tuple[str, str], set[str]] = {}

        for obj_name, obj in model.data_objects.items():
            for i, join in enumerate(obj.joins):
                if join.secondary and not join.path_name:
                    errors.append(
                        SemanticError(
                            code="SECONDARY_JOIN_MISSING_PATH_NAME",
                            message=(
                                f"Data object '{obj_name}' join[{i}] is secondary "
                                f"but has no pathName"
                            ),
                            path=f"dataObjects.{obj_name}.joins[{i}]",
                        )
                    )
                # Uniqueness is checked for any named join, secondary or not.
                if join.path_name:
                    pair = (obj_name, join.join_to)
                    if pair not in path_names:
                        path_names[pair] = set()
                    if join.path_name in path_names[pair]:
                        errors.append(
                            SemanticError(
                                code="DUPLICATE_JOIN_PATH_NAME",
                                message=(
                                    f"Data object '{obj_name}' join[{i}] has duplicate "
                                    f"pathName '{join.path_name}' for target '{join.join_to}'"
                                ),
                                path=f"dataObjects.{obj_name}.joins[{i}]",
                            )
                        )
                    else:
                        path_names[pair].add(join.path_name)

        return errors

    def _check_no_cyclic_joins(self, model: SemanticModel) -> list[SemanticError]:
        """Detect cyclic join paths."""
        errors: list[SemanticError] = []

        # Build adjacency list from joins (skip secondary joins)
        adj: dict[str, set[str]] = {}
        for obj_name, obj in model.data_objects.items():
            if obj_name not in adj:
                adj[obj_name] = set()
            for join in obj.joins:
                if not join.secondary:
                    adj[obj_name].add(join.join_to)

        # DFS cycle detection
        visited: set[str] = set()
        rec_stack: set[str] = set()

        def _dfs(node: str, path: list[str]) -> None:
            # `path` is the explicit recursion chain from the start node;
            # `rec_stack` is its set form for O(1) back-edge tests.
            visited.add(node)
            rec_stack.add(node)
            for neighbor in adj.get(node, set()):
                if neighbor not in visited:
                    _dfs(neighbor, path + [neighbor])
                elif neighbor in rec_stack:
                    if neighbor in path:
                        cycle = path[path.index(neighbor) :] + [neighbor]
                    else:
                        # NOTE(review): rec_stack members should always appear
                        # on `path`; this fallback is kept defensively.
                        cycle = [node, neighbor]
                    errors.append(
                        SemanticError(
                            code="CYCLIC_JOIN",
                            message=f"Cyclic join detected: {' -> '.join(cycle)}",
                            path=f"dataObjects.{node}.joins",
                        )
                    )
            rec_stack.discard(node)

        for node in adj:
            if node not in visited:
                _dfs(node, [node])

        return errors

    def _check_no_multipath_joins(self, model: SemanticModel) -> list[SemanticError]:
        """Detect multiple distinct paths between any pair of nodes in the join DAG.

        Only flags true diamonds where both paths go through intermediaries.
        A direct edge from start to target is canonical, so an additional
        indirect path (e.g. Purchases→Suppliers direct + Purchases→Products→Suppliers)
        is not ambiguous and is not flagged.
        """
        errors: list[SemanticError] = []

        # Build adjacency list from joins (skip secondary joins)
        adj: dict[str, list[str]] = {}
        for obj_name, obj in model.data_objects.items():
            if obj_name not in adj:
                adj[obj_name] = []
            for join in obj.joins:
                if not join.secondary:
                    adj[obj_name].append(join.join_to)

        # Report each ambiguous (start, target) pair at most once.
        reported: set[tuple[str, str]] = set()

        for start in adj:
            if not adj[start]:
                continue
            # BFS from start; track first parent that reached each node
            direct_neighbors: set[str] = set()
            first_parent: dict[str, str] = {}
            queue: deque[tuple[str, str]] = deque()
            for neighbor in adj[start]:
                if neighbor == start:
                    continue
                direct_neighbors.add(neighbor)
                if neighbor not in first_parent:
                    first_parent[neighbor] = start
                    queue.append((neighbor, start))

            while queue:
                node, parent = queue.popleft()
                for neighbor in adj.get(node, []):
                    if neighbor == start:
                        continue
                    if neighbor not in first_parent:
                        first_parent[neighbor] = node
                        queue.append((neighbor, node))
                    elif first_parent[neighbor] != node:
                        # A second, different parent reached `neighbor`:
                        # two distinct paths exist from `start`.
                        # Skip if target has a direct edge from start —
                        # the direct join is the canonical path.
                        if neighbor in direct_neighbors:
                            continue
                        pair = (start, neighbor)
                        if pair not in reported:
                            reported.add(pair)
                            errors.append(
                                SemanticError(
                                    code="MULTIPATH_JOIN",
                                    message=(
                                        f"Multiple join paths from '{start}' to "
                                        f"'{neighbor}' (via '{first_parent[neighbor]}' "
                                        f"and '{node}'). "
                                        f"Join paths must be unambiguous."
                                    ),
                                    path=f"dataObjects.{start}.joins",
                                )
                            )

        return errors

    def _check_measures_resolve(self, model: SemanticModel) -> list[SemanticError]:
        """Ensure measure column references resolve to actual data object columns."""
        errors: list[SemanticError] = []
        for name, measure in model.measures.items():
            for i, col_ref in enumerate(measure.columns):
                obj_name = col_ref.view
                col_name = col_ref.column
                # Refs with an empty/None object name are skipped entirely.
                if obj_name and obj_name not in model.data_objects:
                    errors.append(
                        SemanticError(
                            code="UNKNOWN_DATA_OBJECT",
                            message=(
                                f"Measure '{name}' column[{i}] references "
                                f"unknown data object '{obj_name}'"
                            ),
                            path=f"measures.{name}.columns[{i}]",
                        )
                    )
                elif obj_name and col_name:
                    obj = model.data_objects[obj_name]
                    if col_name not in obj.columns:
                        errors.append(
                            SemanticError(
                                code="UNKNOWN_COLUMN",
                                message=(
                                    f"Measure '{name}' column[{i}] references "
                                    f"unknown column '{col_name}' in data object '{obj_name}'"
                                ),
                                path=f"measures.{name}.columns[{i}]",
                            )
                        )
        return errors

    def _check_join_targets_exist(self, model: SemanticModel) -> list[SemanticError]:
        """Ensure join targets reference existing data objects."""
        errors: list[SemanticError] = []
        for obj_name, obj in model.data_objects.items():
            for i, join in enumerate(obj.joins):
                # columnsFrom/columnsTo pair up positionally, so the lists
                # must have equal length.
                if len(join.columns_from) != len(join.columns_to):
                    errors.append(
                        SemanticError(
                            code="JOIN_COLUMN_COUNT_MISMATCH",
                            message=(
                                f"Data object '{obj_name}' join[{i}] has "
                                f"{len(join.columns_from)} columnsFrom and "
                                f"{len(join.columns_to)} columnsTo"
                            ),
                            path=f"dataObjects.{obj_name}.joins[{i}]",
                        )
                    )
                if join.join_to not in model.data_objects:
                    errors.append(
                        SemanticError(
                            code="UNKNOWN_JOIN_TARGET",
                            message=(
                                f"Data object '{obj_name}' join[{i}] references "
                                f"unknown data object '{join.join_to}'"
                            ),
                            path=f"dataObjects.{obj_name}.joins[{i}]",
                        )
                    )
                else:
                    # Validate join columns exist
                    for col_name in join.columns_from:
                        if col_name not in obj.columns:
                            errors.append(
                                SemanticError(
                                    code="UNKNOWN_JOIN_COLUMN",
                                    message=(
                                        f"Data object '{obj_name}' join[{i}] columnsFrom "
                                        f"references unknown column '{col_name}'"
                                    ),
                                    path=f"dataObjects.{obj_name}.joins[{i}].columnsFrom",
                                )
                            )
                    target_obj = model.data_objects[join.join_to]
                    for col_name in join.columns_to:
                        if col_name not in target_obj.columns:
                            errors.append(
                                SemanticError(
                                    code="UNKNOWN_JOIN_COLUMN",
                                    message=(
                                        f"Data object '{obj_name}' join[{i}] columnsTo "
                                        f"references unknown column '{col_name}' "
                                        f"in data object '{join.join_to}'"
                                    ),
                                    path=f"dataObjects.{obj_name}.joins[{i}].columnsTo",
                                )
                            )
        return errors

    def _check_references_resolve(self, model: SemanticModel) -> list[SemanticError]:
        """Ensure dimension references resolve."""
        errors: list[SemanticError] = []
        for name, dim in model.dimensions.items():
            obj_name = dim.view
            col_name = dim.column
            if obj_name and obj_name not in model.data_objects:
                errors.append(
                    SemanticError(
                        code="UNKNOWN_DATA_OBJECT",
                        message=f"Dimension '{name}' references unknown data object '{obj_name}'",
                        path=f"dimensions.{name}",
                    )
                )
            elif obj_name and col_name:
                obj = model.data_objects[obj_name]
                if col_name not in obj.columns:
                    errors.append(
                        SemanticError(
                            code="UNKNOWN_COLUMN",
                            message=(
                                f"Dimension '{name}' references unknown column "
                                f"'{col_name}' in data object '{obj_name}'"
                            ),
                            path=f"dimensions.{name}",
                        )
                    )
        return errors

validate(model)

Source code in src/orionbelt/parser/validator.py
def validate(self, model: SemanticModel) -> list[SemanticError]:
    """Run each semantic rule check in order and return the combined errors."""
    checks = (
        self._check_unique_identifiers,
        self._check_unique_column_names,
        self._check_secondary_joins,
        self._check_no_cyclic_joins,
        self._check_no_multipath_joins,
        self._check_measures_resolve,
        self._check_join_targets_exist,
        self._check_references_resolve,
    )
    collected: list[SemanticError] = []
    for check in checks:
        collected.extend(check(model))
    return collected

Semantic Model

orionbelt.models.semantic.SemanticModel

Bases: BaseModel

Complete semantic model parsed from OBML YAML.

Source code in src/orionbelt/models/semantic.py
class SemanticModel(BaseModel):
    """Complete semantic model parsed from OBML YAML.

    Top-level container for the four OBML sections; all sub-entities
    are keyed by their display name.
    """

    # OBML spec version of the source document.
    version: float = 1.0
    # YAML key is camelCase "dataObjects"; populate_by_name also accepts
    # the snake_case field name.
    data_objects: dict[str, DataObject] = Field(default={}, alias="dataObjects")
    dimensions: dict[str, Dimension] = {}
    measures: dict[str, Measure] = {}
    metrics: dict[str, Metric] = {}
    # Arbitrary vendor extensions attached at the document root.
    custom_extensions: list[CustomExtension] = Field(
        default_factory=list, alias="customExtensions"
    )

    model_config = {"populate_by_name": True}

orionbelt.models.semantic.DataObject

Bases: BaseModel

A database table or view with its columns and joins.

Source code in src/orionbelt/models/semantic.py
class DataObject(BaseModel):
    """A database table or view with its columns and joins."""

    label: str  # display name (the YAML mapping key)
    code: str  # physical table/view name in the database
    database: str
    # Aliased: presumably "schema" would collide with a BaseModel attribute
    # — TODO confirm; YAML key remains "schema".
    schema_name: str = Field(alias="schema")
    columns: dict[str, DataObjectColumn] = {}
    joins: list[DataObjectJoin] = []
    comment: str | None = None
    custom_extensions: list[CustomExtension] = Field(
        default_factory=list, alias="customExtensions"
    )

    @property
    def qualified_code(self) -> str:
        """Fully qualified table reference: database.schema.code."""
        return f"{self.database}.{self.schema_name}.{self.code}"

    model_config = {"populate_by_name": True}

qualified_code property

Fully qualified table reference: database.schema.code.

orionbelt.models.semantic.Dimension

Bases: BaseModel

A named dimension referencing a data object column.

Source code in src/orionbelt/models/semantic.py
class Dimension(BaseModel):
    """A named dimension referencing a data object column."""

    label: str  # display name (the YAML mapping key)
    # Internal name is "view"; the YAML key is "dataObject".
    view: str = Field(alias="dataObject")
    column: str = ""
    result_type: DataType = Field(DataType.STRING, alias="resultType")
    # Only meaningful for temporal dimensions.
    time_grain: TimeGrain | None = Field(None, alias="timeGrain")
    format: str | None = None
    custom_extensions: list[CustomExtension] = Field(
        default_factory=list, alias="customExtensions"
    )

    model_config = {"populate_by_name": True}

orionbelt.models.semantic.Measure

Bases: BaseModel

An aggregation measure with optional expression template.

Source code in src/orionbelt/models/semantic.py
class Measure(BaseModel):
    """An aggregation measure with optional expression template."""

    label: str  # display name (the YAML mapping key)
    # Column references the aggregation is applied to.
    columns: list[DataColumnRef] = []
    result_type: DataType = Field(DataType.FLOAT, alias="resultType")
    # Required; no default here (the resolver supplies "sum" when absent).
    aggregation: str
    # Optional SQL expression template; validated against data object
    # columns at resolve time.
    expression: str | None = None
    distinct: bool = False
    total: bool = False
    # Optional measure-level filter (column, operator, values).
    filter: MeasureFilter | None = None
    format: str | None = None
    allow_fan_out: bool = Field(False, alias="allowFanOut")
    # NOTE(review): delimiter/within_group are not populated by the resolver
    # shown in this file — presumably for ordered/string aggregations; confirm.
    delimiter: str | None = None
    within_group: WithinGroup | None = Field(None, alias="withinGroup")
    custom_extensions: list[CustomExtension] = Field(
        default_factory=list, alias="customExtensions"
    )

    model_config = {"populate_by_name": True}

orionbelt.models.semantic.Metric

Bases: BaseModel

A composite metric combining measures via an expression.

The expression references measures by name using {[Measure Name]} syntax.

Source code in src/orionbelt/models/semantic.py
class Metric(BaseModel):
    """A composite metric combining measures via an expression.

    The expression references measures by name using ``{[Measure Name]}`` syntax.
    """

    label: str  # display name (the YAML mapping key)
    # Required expression; measure references are validated at resolve time.
    expression: str
    format: str | None = None
    custom_extensions: list[CustomExtension] = Field(
        default_factory=list, alias="customExtensions"
    )

    model_config = {"populate_by_name": True}

Query Models

orionbelt.models.query.QueryObject

Bases: BaseModel

A complete YAML analytical query.

Source code in src/orionbelt/models/query.py
class QueryObject(BaseModel):
    """A complete YAML analytical query."""

    select: QuerySelect  # required: dimensions + measures to project
    where: list[QueryFilter] = []  # pre-aggregation filters
    having: list[QueryFilter] = []  # post-aggregation filters
    # NOTE(review): alias equals the field name — redundant but harmless.
    order_by: list[QueryOrderBy] = Field([], alias="order_by")
    limit: int | None = None  # None means no row limit
    # Selects named secondary join paths per (source, target) pair.
    use_path_names: list[UsePathName] = Field([], alias="usePathNames")

    model_config = {"populate_by_name": True}

orionbelt.models.query.QuerySelect

Bases: BaseModel

The SELECT part of a query: dimensions + measures.

Source code in src/orionbelt/models/query.py
class QuerySelect(BaseModel):
    """The SELECT part of a query: dimensions + measures."""

    dimensions: list[str] = []  # dimension names, optionally "name:grain"
    measures: list[str] = []  # measure/metric names

orionbelt.models.query.QueryFilter

Bases: BaseModel

A filter condition in a query.

Source code in src/orionbelt/models/query.py
class QueryFilter(BaseModel):
    """A filter condition in a query."""

    field: str  # name of the dimension/measure being filtered
    op: FilterOperator
    # Comparison operand; None for operators that take no value
    # (e.g. null checks) — TODO confirm against FilterOperator semantics.
    value: Any = None

    model_config = {"populate_by_name": True}

orionbelt.models.query.UsePathName

Bases: BaseModel

Selects a named secondary join path for a specific (source, target) pair.

Source code in src/orionbelt/models/query.py
class UsePathName(BaseModel):
    """Selects a named secondary join path for a specific (source, target) pair."""

    source: str  # joining data object
    target: str  # joined-to data object
    path_name: str = Field(alias="pathName")  # matches DataObjectJoin.pathName

    model_config = {"populate_by_name": True}

orionbelt.models.query.DimensionRef

Bases: BaseModel

Reference to a dimension, optionally with time grain.

Supports notation like "customer.country" or "order.order_date:month".

Source code in src/orionbelt/models/query.py
class DimensionRef(BaseModel):
    """Reference to a dimension, optionally carrying a time grain.

    Accepts notation such as "customer.country" or "order.order_date:month".
    """

    name: str
    grain: TimeGrain | None = None

    @classmethod
    def parse(cls, raw: str) -> DimensionRef:
        """Parse 'name:grain' notation."""
        # Split on the *last* colon so only a trailing ':grain' suffix is peeled off.
        head, sep, tail = raw.rpartition(":")
        if not sep:
            # No colon at all: the whole string is the dimension name.
            return cls(name=raw)
        return cls(name=head, grain=TimeGrain(tail))

parse(raw) classmethod

Parse 'name:grain' notation.

Source code in src/orionbelt/models/query.py
@classmethod
def parse(cls, raw: str) -> DimensionRef:
    """Parse 'name:grain' notation."""
    # Split on the *last* colon so only a trailing ':grain' suffix is peeled off.
    head, sep, tail = raw.rpartition(":")
    if not sep:
        # No colon at all: the whole string is the dimension name.
        return cls(name=raw)
    return cls(name=head, grain=TimeGrain(tail))

Error Models

orionbelt.models.errors.SemanticError

Bases: BaseModel

A structured error with optional source position and suggestions.

Source code in src/orionbelt/models/errors.py
class SemanticError(BaseModel):
    """A structured error with optional source position and suggestions."""

    code: str                       # machine-readable error code
    message: str                    # human-readable description
    path: str | None = None         # path to the offending model element, if known
    span: SourceSpan | None = None  # exact YAML source location, if known
    suggestions: list[str] = []     # candidate fixes / did-you-mean hints

orionbelt.models.errors.ValidationResult

Bases: BaseModel

Result of semantic model validation.

Source code in src/orionbelt/models/errors.py
class ValidationResult(BaseModel):
    """Result of semantic model validation."""

    valid: bool                         # overall verdict; presumably False iff errors is non-empty — TODO confirm
    errors: list[SemanticError] = []    # blocking problems
    warnings: list[SemanticError] = []  # non-blocking advisories

orionbelt.models.errors.SourceSpan

Bases: BaseModel

Points to exact location in YAML source for error reporting.

Source code in src/orionbelt/models/errors.py
class SourceSpan(BaseModel):
    """Points to exact location in YAML source for error reporting."""

    file: str                      # source file name/path
    line: int                      # start line (1- vs 0-based not stated here — confirm)
    column: int                    # start column
    end_line: int | None = None    # optional end line of the span
    end_column: int | None = None  # optional end column of the span

SQL AST Nodes

orionbelt.ast.nodes.Select dataclass

A complete SELECT statement.

Source code in src/orionbelt/ast/nodes.py
@dataclass(frozen=True)
class Select:
    """A complete SELECT statement.

    Immutable AST node; all clause slots default to empty/absent.
    """

    columns: list[Expr] = field(default_factory=list)          # projection list
    from_: From | None = None                                  # FROM clause ("from" is a keyword, hence the underscore)
    joins: list[Join] = field(default_factory=list)            # JOIN clauses, in order
    where: Expr | None = None                                  # WHERE condition
    group_by: list[Expr] = field(default_factory=list)         # GROUP BY expressions
    having: Expr | None = None                                 # HAVING condition
    order_by: list[OrderByItem] = field(default_factory=list)  # ORDER BY items
    limit: int | None = None                                   # LIMIT row count
    offset: int | None = None                                  # OFFSET row count
    ctes: list[CTE] = field(default_factory=list)              # WITH-clause common table expressions

orionbelt.ast.nodes.ColumnRef dataclass

Reference to a column, optionally qualified by table/alias.

Source code in src/orionbelt/ast/nodes.py
@dataclass(frozen=True)
class ColumnRef:
    """Reference to a column, optionally qualified by table/alias."""

    name: str                # column name
    table: str | None = None # qualifying table name or alias; None = unqualified

orionbelt.ast.nodes.FunctionCall dataclass

SQL function call, e.g. SUM(col), DATE_TRUNC('month', col).

Source code in src/orionbelt/ast/nodes.py
@dataclass(frozen=True)
class FunctionCall:
    """SQL function call, e.g. SUM(col), DATE_TRUNC('month', col)."""

    name: str                                                  # function name as it appears in SQL
    args: list[Expr] = field(default_factory=list)             # positional arguments
    distinct: bool = False                                     # e.g. COUNT(DISTINCT col)
    order_by: list[OrderByItem] = field(default_factory=list)  # presumably an intra-call ORDER BY for ordered aggregates — TODO confirm
    separator: str | None = None                               # presumably a separator for string-aggregation functions — TODO confirm

orionbelt.ast.nodes.BinaryOp dataclass

Binary operation: left op right.

Source code in src/orionbelt/ast/nodes.py
@dataclass(frozen=True)
class BinaryOp:
    """Binary operation: left op right."""

    left: Expr  # left operand
    op: str     # operator token: +, -, *, /, =, <>, AND, OR, LIKE, etc.
    right: Expr # right operand

orionbelt.ast.nodes.Literal dataclass

A literal value: number, string, boolean, or NULL.

Source code in src/orionbelt/ast/nodes.py
@dataclass(frozen=True)
class Literal:
    """A literal value: number, string, boolean, or NULL."""

    value: str | int | float | bool | None

    # Convenience constructors — each just wraps the given value.

    @classmethod
    def null(cls) -> Literal:
        """Build a NULL literal."""
        return cls(None)

    @classmethod
    def boolean(cls, v: bool) -> Literal:
        """Build a boolean literal."""
        return cls(v)

    @classmethod
    def string(cls, v: str) -> Literal:
        """Build a string literal."""
        return cls(v)

    @classmethod
    def number(cls, v: int | float) -> Literal:
        """Build a numeric literal."""
        return cls(v)

AST Builder

orionbelt.ast.builder.QueryBuilder

Fluent builder for ergonomic AST construction.

Source code in src/orionbelt/ast/builder.py
class QueryBuilder:
    """Fluent builder for ergonomic AST construction."""

    def __init__(self) -> None:
        self._columns: list[Expr] = []
        self._from: From | None = None
        self._joins: list[Join] = []
        self._where: Expr | None = None
        self._group_by: list[Expr] = []
        self._having: Expr | None = None
        self._order_by: list[OrderByItem] = []
        self._limit: int | None = None
        self._offset: int | None = None
        self._ctes: list[CTE] = []

    def select(self, *columns: Expr) -> Self:
        self._columns.extend(columns)
        return self

    def select_aliased(self, expr: Expr, alias: str) -> Self:
        self._columns.append(AliasedExpr(expr=expr, alias=alias))
        return self

    def from_(self, table: str, alias: str | None = None) -> Self:
        self._from = From(source=table, alias=alias)
        return self

    def from_subquery(self, subquery: Select, alias: str) -> Self:
        self._from = From(source=subquery, alias=alias)
        return self

    def join(
        self,
        table: str,
        on: Expr,
        join_type: JoinType = JoinType.LEFT,
        alias: str | None = None,
    ) -> Self:
        self._joins.append(Join(join_type=join_type, source=table, alias=alias, on=on))
        return self

    def where(self, condition: Expr) -> Self:
        if self._where is None:
            self._where = condition
        else:
            self._where = BinaryOp(left=self._where, op="AND", right=condition)
        return self

    def group_by(self, *exprs: Expr) -> Self:
        self._group_by.extend(exprs)
        return self

    def having(self, condition: Expr) -> Self:
        if self._having is None:
            self._having = condition
        else:
            self._having = BinaryOp(left=self._having, op="AND", right=condition)
        return self

    def order_by(self, expr: Expr, desc: bool = False) -> Self:
        self._order_by.append(OrderByItem(expr=expr, desc=desc))
        return self

    def limit(self, n: int) -> Self:
        self._limit = n
        return self

    def offset(self, n: int) -> Self:
        self._offset = n
        return self

    def with_cte(self, name: str, query: Select) -> Self:
        self._ctes.append(CTE(name=name, query=query))
        return self

    def build(self) -> Select:
        return Select(
            columns=self._columns,
            from_=self._from,
            joins=self._joins,
            where=self._where,
            group_by=self._group_by,
            having=self._having,
            order_by=self._order_by,
            limit=self._limit,
            offset=self._offset,
            ctes=self._ctes,
        )

API Schemas

orionbelt.api.schemas

API request/response Pydantic schemas.

SessionCreateRequest

Bases: BaseModel

Request body for POST /sessions.

Source code in src/orionbelt/api/schemas.py
class SessionCreateRequest(BaseModel):
    """Request body for POST /sessions."""

    # Arbitrary string key/value pairs attached to the session by the caller.
    metadata: dict[str, str] = Field(default_factory=dict)

SessionResponse

Bases: BaseModel

Single session info.

Source code in src/orionbelt/api/schemas.py
class SessionResponse(BaseModel):
    """Single session info."""

    session_id: str            # unique session identifier
    created_at: datetime       # creation timestamp
    last_accessed_at: datetime # last-touch timestamp (presumably drives TTL expiry — confirm)
    model_count: int           # number of models currently loaded in the session
    metadata: dict[str, str] = Field(default_factory=dict)  # caller-supplied tags

SessionListResponse

Bases: BaseModel

Response for GET /sessions.

Source code in src/orionbelt/api/schemas.py
class SessionListResponse(BaseModel):
    """Response for GET /sessions."""

    # All currently known sessions.
    sessions: list[SessionResponse]

ModelLoadRequest

Bases: BaseModel

Request body for POST /sessions/{session_id}/models.

Source code in src/orionbelt/api/schemas.py
class ModelLoadRequest(BaseModel):
    """Request body for POST /sessions/{session_id}/models."""

    # Raw OBML YAML document; length capped at 5,000,000 characters.
    model_yaml: str = Field(description="OBML YAML content", max_length=5_000_000)

ModelLoadResponse

Bases: BaseModel

Response for POST /sessions/{session_id}/models.

Source code in src/orionbelt/api/schemas.py
class ModelLoadResponse(BaseModel):
    """Response for POST /sessions/{session_id}/models."""

    model_id: str              # id assigned to the loaded model
    data_objects: int          # count of data objects in the model
    dimensions: int            # count of dimensions
    measures: int              # count of measures
    metrics: int               # count of metrics
    warnings: list[str] = []   # non-fatal validation messages

ModelSummaryResponse

Bases: BaseModel

Short model summary for listing.

Source code in src/orionbelt/api/schemas.py
class ModelSummaryResponse(BaseModel):
    """Short model summary for listing."""

    model_id: str      # model identifier
    data_objects: int  # count of data objects in the model
    dimensions: int    # count of dimensions
    measures: int      # count of measures
    metrics: int       # count of metrics

SessionQueryRequest

Bases: BaseModel

Request body for POST /sessions/{session_id}/query/sql.

Source code in src/orionbelt/api/schemas.py
class SessionQueryRequest(BaseModel):
    """Request body for POST /sessions/{session_id}/query/sql."""

    model_id: str                             # id of a model previously loaded into the session
    query: QueryObject                        # the analytical query to compile
    dialect: str = Field(default="postgres")  # target SQL dialect

QueryCompileResponse

Bases: BaseModel

Response body for POST /query/sql.

Source code in src/orionbelt/api/schemas.py
class QueryCompileResponse(BaseModel):
    """Response body for POST /query/sql."""

    sql: str                        # compiled SQL text
    dialect: str                    # dialect the SQL was generated for
    resolved: ResolvedInfoResponse  # details of how query fields were resolved
    warnings: list[str] = []        # non-fatal compilation messages
    sql_valid: bool = True          # presumably the outcome of a post-compile SQL check — TODO confirm

ValidateRequest

Bases: BaseModel

Request body for POST /validate.

Source code in src/orionbelt/api/schemas.py
class ValidateRequest(BaseModel):
    """Request body for POST /validate."""

    # Raw YAML document to validate; length capped at 5,000,000 characters.
    model_yaml: str = Field(
        description="YAML semantic model content to validate", max_length=5_000_000
    )

ValidateResponse

Bases: BaseModel

Response body for POST /validate.

Source code in src/orionbelt/api/schemas.py
class ValidateResponse(BaseModel):
    """Response body for POST /validate."""

    valid: bool                        # overall verdict of validation
    errors: list[ErrorDetail] = []     # blocking problems
    warnings: list[ErrorDetail] = []   # non-blocking advisories

DialectListResponse

Bases: BaseModel

Response for GET /dialects.

Source code in src/orionbelt/api/schemas.py
class DialectListResponse(BaseModel):
    """Response for GET /dialects."""

    # Supported SQL dialects.
    dialects: list[DialectInfo] = []

HealthResponse

Bases: BaseModel

Health check response.

Source code in src/orionbelt/api/schemas.py
class HealthResponse(BaseModel):
    """Health check response."""

    status: str = "ok"  # liveness indicator
    version: str = ""   # server version string; empty if not populated

Settings

orionbelt.settings.Settings

Bases: BaseSettings

Configuration for OrionBelt servers (API + MCP).

Values are read from environment variables and from a .env file in the working directory. See .env.example for all options.

Source code in src/orionbelt/settings.py
class Settings(BaseSettings):
    """Configuration for OrionBelt servers (API + MCP).

    Values are read from environment variables and from a ``.env`` file
    in the working directory.  See ``.env.example`` for all options.
    """

    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        extra="ignore",  # unknown environment variables are silently ignored
    )

    # Shared
    log_level: str = "INFO"  # logging level for both servers

    # REST API
    api_server_host: str = "localhost"  # bind host for the REST API
    api_server_port: int = 8000         # default REST API port
    port: int | None = None  # Cloud Run injects PORT; takes precedence over api_server_port

    @property
    def effective_port(self) -> int:
        """Return the port to listen on (Cloud Run PORT takes precedence)."""
        return self.port if self.port is not None else self.api_server_port

    # MCP
    mcp_transport: Literal["stdio", "http", "sse"] = "stdio"  # MCP transport mode
    mcp_server_host: str = "localhost"  # bind host for MCP http/sse transports
    mcp_server_port: int = 9000         # port for MCP http/sse transports

    # Sessions
    session_ttl_seconds: int = 1800  # 30 min inactivity
    session_cleanup_interval: int = 60  # seconds between cleanup sweeps
    disable_session_list: bool = False  # hide GET /sessions endpoint

effective_port property

Return the port to listen on (Cloud Run PORT takes precedence).