Skip to content
OBML v1.0
OrionBelt v1.4.0

Python API Reference

Auto-generated documentation from source code docstrings.

Service Layer

ModelStore

orionbelt.service.model_store.ModelStore

In-memory model registry. Thread-safe via threading.Lock.

Models are keyed by short UUID (8-char hex). All parsing, validation, and compilation infrastructure is instantiated internally, following the same singleton pattern as api/deps.py.

Source code in src/orionbelt/service/model_store.py
class ModelStore:
    """In-memory model registry.  Thread-safe via ``threading.Lock``.

    Models are keyed by short UUID (8-char hex).  All parsing, validation,
    and compilation infrastructure is instantiated internally, following the
    same singleton pattern as ``api/deps.py``.
    """

    def __init__(self, max_models: int = 10) -> None:
        self._lock = threading.Lock()
        self._models: dict[str, SemanticModel] = {}
        self._graphs: dict[str, GraphArtifact] = {}
        self._max_models = max_models

        # Internal pipeline singletons (stateless, safe to share).
        self._loader = TrackedLoader()
        self._resolver = ReferenceResolver()
        self._validator = SemanticValidator()
        self._pipeline = CompilationPipeline()

    # -- helpers -------------------------------------------------------------

    @staticmethod
    def _new_id() -> str:
        """Return a random 8-char lowercase-hex model id (32 bits of entropy)."""
        return uuid.uuid4().hex[:8]

    @staticmethod
    def _to_error_info(issue) -> ErrorInfo:
        """Convert a resolver/validator issue into the API ``ErrorInfo`` shape.

        The resolver and validator both emit objects with ``code``,
        ``message``, ``path`` and ``suggestions`` attributes; this is the
        single conversion point for all of them.
        """
        return ErrorInfo(
            code=issue.code,
            message=issue.message,
            path=issue.path,
            suggestions=list(issue.suggestions),
        )

    def _parse_and_validate(
        self, yaml_str: str
    ) -> tuple[SemanticModel, list[ErrorInfo], list[ErrorInfo]]:
        """Parse YAML, resolve references, run semantic validation.

        Returns ``(model, errors, warnings)``.  On a parse failure the
        returned model is an empty ``SemanticModel`` and ``errors`` is
        non-empty.
        """
        errors: list[ErrorInfo] = []
        warnings: list[ErrorInfo] = []

        # 1. Parse YAML.  Safety violations get their own error code so
        #    clients can distinguish them from plain syntax errors.
        try:
            raw, source_map = self._loader.load_string(yaml_str)
        except YAMLSafetyError as exc:
            errors.append(ErrorInfo(code="YAML_SAFETY_ERROR", message=str(exc)))
            return SemanticModel(), errors, warnings
        except Exception as exc:
            errors.append(ErrorInfo(code="YAML_PARSE_ERROR", message=str(exc)))
            return SemanticModel(), errors, warnings

        # 2. Resolve references.
        model, resolution = self._resolver.resolve(raw, source_map)
        errors.extend(self._to_error_info(e) for e in resolution.errors)
        warnings.extend(self._to_error_info(w) for w in resolution.warnings)

        # 3. Semantic validation (errors only; the validator emits no warnings).
        errors.extend(self._to_error_info(e) for e in self._validator.validate(model))

        return model, errors, warnings

    # -- public API ----------------------------------------------------------

    def load_model(self, yaml_str: str) -> LoadResult:
        """Parse, validate, and store a model.  Returns id + summary.

        Raises ``ModelValidationError`` if the model has validation errors.
        Raises ``ModelCapacityError`` if the session's model cap is reached.
        """
        # Early capacity check so an obviously-full session fails fast,
        # before we pay for parsing.  The authoritative check happens under
        # the lock below, at insertion time.
        with self._lock:
            if len(self._models) >= self._max_models:
                raise ModelCapacityError(f"Maximum models per session reached ({self._max_models})")

        model, errors, warnings = self._parse_and_validate(yaml_str)
        if errors:
            raise ModelValidationError(errors, warnings)

        model_id = self._new_id()

        # Eagerly export OBSL-Core graph (Option C: at model load time).
        graph = export_obsl(model, model_id)
        turtle = graph.serialize(format="turtle")
        artifact = GraphArtifact(graph=graph, turtle=turtle, generated_at=time.monotonic())

        with self._lock:
            # Re-check capacity under lock — the first check (above) ran
            # outside the lock while parsing/exporting, so a concurrent
            # request may have filled the slot in the meantime.
            if len(self._models) >= self._max_models:
                raise ModelCapacityError(f"Maximum models per session reached ({self._max_models})")
            # 8 hex chars is only 32 bits of entropy: on the (unlikely)
            # collision, blindly inserting would silently overwrite an
            # existing model.  Pick a fresh id and rebuild the artifact
            # (the exported graph embeds the model id).  This is a rare
            # path, so doing the re-export under the lock is acceptable.
            while model_id in self._models:
                model_id = self._new_id()
                graph = export_obsl(model, model_id)
                artifact = GraphArtifact(
                    graph=graph,
                    turtle=graph.serialize(format="turtle"),
                    generated_at=time.monotonic(),
                )
            self._models[model_id] = model
            self._graphs[model_id] = artifact

        return LoadResult(
            model_id=model_id,
            data_objects=len(model.data_objects),
            dimensions=len(model.dimensions),
            measures=len(model.measures),
            metrics=len(model.metrics),
            warnings=[w.message for w in warnings],
        )

    def get_model(self, model_id: str) -> SemanticModel:
        """Look up a loaded model.  Raises ``KeyError`` if not found."""
        with self._lock:
            try:
                return self._models[model_id]
            except KeyError:
                # Re-raise with a friendlier message; suppress the chained
                # context since the original KeyError adds nothing.
                raise KeyError(f"No model loaded with id '{model_id}'") from None

    def describe(self, model_id: str) -> ModelDescription:
        """Return a structured summary suitable for LLM consumption.

        Raises ``KeyError`` (via :meth:`get_model`) if the id is unknown.
        """
        model = self.get_model(model_id)

        data_objects = [
            DataObjectInfo(
                label=obj.label,
                code=obj.qualified_code,
                columns=list(obj.columns.keys()),
                join_targets=[j.join_to for j in obj.joins],
                synonyms=obj.synonyms,
                owner=obj.owner,
            )
            for obj in model.data_objects.values()
        ]

        dimensions = [
            DimensionInfo(
                name=dim.label,
                result_type=dim.result_type.value,
                data_object=dim.view,
                column=dim.column,
                # time_grain is optional; only time-typed dimensions carry one.
                time_grain=dim.time_grain.value if dim.time_grain else None,
                synonyms=dim.synonyms,
                owner=dim.owner,
            )
            for dim in model.dimensions.values()
        ]

        measures = [
            MeasureInfo(
                name=m.label,
                result_type=m.result_type.value,
                aggregation=m.aggregation,
                expression=m.expression,
                synonyms=m.synonyms,
                owner=m.owner,
            )
            for m in model.measures.values()
        ]

        metrics = [
            MetricInfo(
                name=met.label,
                expression=met.expression,
                synonyms=met.synonyms,
                type=met.type.value,
                measure=met.measure,
                time_dimension=met.time_dimension,
                owner=met.owner,
            )
            for met in model.metrics.values()
        ]

        return ModelDescription(
            model_id=model_id,
            data_objects=data_objects,
            dimensions=dimensions,
            measures=measures,
            metrics=metrics,
        )

    def list_models(self) -> list[ModelSummary]:
        """Return a short summary for every loaded model."""
        # Snapshot under the lock; build the summaries outside it.
        with self._lock:
            items = list(self._models.items())

        return [
            ModelSummary(
                model_id=mid,
                data_objects=len(m.data_objects),
                dimensions=len(m.dimensions),
                measures=len(m.measures),
                metrics=len(m.metrics),
            )
            for mid, m in items
        ]

    def remove_model(self, model_id: str) -> None:
        """Unload a model and its cached OBSL graph.  Raises ``KeyError`` if not found."""
        with self._lock:
            try:
                del self._models[model_id]
            except KeyError:
                raise KeyError(f"No model loaded with id '{model_id}'") from None
            # The graph is created together with the model, but pop
            # defensively in case it is absent.
            self._graphs.pop(model_id, None)

    def compile_query(
        self,
        model_id: str,
        query: QueryObject,
        dialect: str,
    ) -> CompilationResult:
        """Compile a query against a loaded model.

        Raises ``KeyError`` (via :meth:`get_model`) if the id is unknown.
        """
        model = self.get_model(model_id)
        return self._pipeline.compile(query, model, dialect)

    def validate(self, yaml_str: str) -> ValidationSummary:
        """Validate a YAML model string without storing it."""
        _model, errors, warnings = self._parse_and_validate(yaml_str)
        return ValidationSummary(
            valid=len(errors) == 0,
            errors=errors,
            warnings=warnings,
        )

    # -- OBSL graph ---------------------------------------------------------

    def get_graph(self, model_id: str) -> GraphArtifact:
        """Return the cached OBSL graph for a model.  Raises ``KeyError`` if not found."""
        with self._lock:
            try:
                return self._graphs[model_id]
            except KeyError:
                raise KeyError(f"No graph for model '{model_id}'") from None

    def query_graph(self, model_id: str, sparql: str) -> SPARQLResult:
        """Execute a read-only SPARQL query against a model's OBSL graph."""
        artifact = self.get_graph(model_id)
        return execute_sparql(artifact.graph, sparql)

load_model(yaml_str)

Parse, validate, and store a model. Returns id + summary.

Raises ModelValidationError if the model has validation errors. Raises ModelCapacityError if the session's model cap is reached.

Source code in src/orionbelt/service/model_store.py
def load_model(self, yaml_str: str) -> LoadResult:
    """Parse, validate, and store a model.  Returns id + summary.

    Raises ``ModelValidationError`` if the model has validation errors.
    Raises ``ModelCapacityError`` if the session's model cap is reached.
    """
    # Early capacity check so an obviously-full session fails fast, before
    # we pay for parsing.  The authoritative check happens under the lock
    # below, at insertion time.
    with self._lock:
        if len(self._models) >= self._max_models:
            raise ModelCapacityError(f"Maximum models per session reached ({self._max_models})")

    model, errors, warnings = self._parse_and_validate(yaml_str)
    if errors:
        raise ModelValidationError(errors, warnings)

    model_id = self._new_id()

    # Eagerly export OBSL-Core graph (Option C: at model load time).
    graph = export_obsl(model, model_id)
    turtle = graph.serialize(format="turtle")
    artifact = GraphArtifact(graph=graph, turtle=turtle, generated_at=time.monotonic())

    with self._lock:
        # Re-check capacity under lock — the first check (above) ran
        # outside the lock while parsing/exporting, so a concurrent
        # request may have filled the slot in the meantime.
        if len(self._models) >= self._max_models:
            raise ModelCapacityError(f"Maximum models per session reached ({self._max_models})")
        # 8 hex chars is only 32 bits of entropy: on the (unlikely)
        # collision, blindly inserting would silently overwrite an existing
        # model.  Pick a fresh id and rebuild the artifact (the exported
        # graph embeds the model id).  Rare path, so the re-export under
        # the lock is acceptable.
        while model_id in self._models:
            model_id = self._new_id()
            graph = export_obsl(model, model_id)
            artifact = GraphArtifact(
                graph=graph,
                turtle=graph.serialize(format="turtle"),
                generated_at=time.monotonic(),
            )
        self._models[model_id] = model
        self._graphs[model_id] = artifact

    return LoadResult(
        model_id=model_id,
        data_objects=len(model.data_objects),
        dimensions=len(model.dimensions),
        measures=len(model.measures),
        metrics=len(model.metrics),
        warnings=[w.message for w in warnings],
    )

get_model(model_id)

Look up a loaded model. Raises KeyError if not found.

Source code in src/orionbelt/service/model_store.py
def get_model(self, model_id: str) -> SemanticModel:
    """Fetch a previously loaded model by id.

    Raises ``KeyError`` with a descriptive message when the id is unknown.
    """
    with self._lock:
        if model_id not in self._models:
            raise KeyError(f"No model loaded with id '{model_id}'")
        return self._models[model_id]

describe(model_id)

Return a structured summary suitable for LLM consumption.

Source code in src/orionbelt/service/model_store.py
def describe(self, model_id: str) -> ModelDescription:
    """Return a structured summary suitable for LLM consumption."""
    model = self.get_model(model_id)

    def _data_object(obj) -> DataObjectInfo:
        # One entry per data object: columns, join fan-out, ownership.
        return DataObjectInfo(
            label=obj.label,
            code=obj.qualified_code,
            columns=list(obj.columns.keys()),
            join_targets=[join.join_to for join in obj.joins],
            synonyms=obj.synonyms,
            owner=obj.owner,
        )

    def _dimension(dim) -> DimensionInfo:
        # time_grain is optional; pass None through when absent.
        grain = dim.time_grain.value if dim.time_grain else None
        return DimensionInfo(
            name=dim.label,
            result_type=dim.result_type.value,
            data_object=dim.view,
            column=dim.column,
            time_grain=grain,
            synonyms=dim.synonyms,
            owner=dim.owner,
        )

    def _measure(measure) -> MeasureInfo:
        return MeasureInfo(
            name=measure.label,
            result_type=measure.result_type.value,
            aggregation=measure.aggregation,
            expression=measure.expression,
            synonyms=measure.synonyms,
            owner=measure.owner,
        )

    def _metric(metric) -> MetricInfo:
        return MetricInfo(
            name=metric.label,
            expression=metric.expression,
            synonyms=metric.synonyms,
            type=metric.type.value,
            measure=metric.measure,
            time_dimension=metric.time_dimension,
            owner=metric.owner,
        )

    return ModelDescription(
        model_id=model_id,
        data_objects=[_data_object(o) for o in model.data_objects.values()],
        dimensions=[_dimension(d) for d in model.dimensions.values()],
        measures=[_measure(m) for m in model.measures.values()],
        metrics=[_metric(m) for m in model.metrics.values()],
    )

list_models()

Return a short summary for every loaded model.

Source code in src/orionbelt/service/model_store.py
def list_models(self) -> list[ModelSummary]:
    """Return a short summary for every loaded model."""
    # Snapshot the registry under the lock; summarize outside it.
    with self._lock:
        snapshot = dict(self._models)

    summaries: list[ModelSummary] = []
    for model_id, model in snapshot.items():
        summaries.append(
            ModelSummary(
                model_id=model_id,
                data_objects=len(model.data_objects),
                dimensions=len(model.dimensions),
                measures=len(model.measures),
                metrics=len(model.metrics),
            )
        )
    return summaries

remove_model(model_id)

Unload a model and its cached OBSL graph. Raises KeyError if not found.

Source code in src/orionbelt/service/model_store.py
def remove_model(self, model_id: str) -> None:
    """Unload a model and its cached OBSL graph.  Raises ``KeyError`` if not found."""
    with self._lock:
        if model_id not in self._models:
            raise KeyError(f"No model loaded with id '{model_id}'")
        del self._models[model_id]
        # The graph may be absent; drop it only if present.
        self._graphs.pop(model_id, None)

compile_query(model_id, query, dialect)

Compile a query against a loaded model.

Source code in src/orionbelt/service/model_store.py
def compile_query(
    self,
    model_id: str,
    query: QueryObject,
    dialect: str,
) -> CompilationResult:
    """Compile ``query`` for ``dialect`` against the model stored under ``model_id``."""
    return self._pipeline.compile(query, self.get_model(model_id), dialect)

validate(yaml_str)

Validate a YAML model string without storing it.

Source code in src/orionbelt/service/model_store.py
def validate(self, yaml_str: str) -> ValidationSummary:
    """Validate a YAML model string without storing it."""
    # The parsed model itself is discarded; only the diagnostics matter here.
    _, errors, warnings = self._parse_and_validate(yaml_str)
    is_valid = not errors
    return ValidationSummary(valid=is_valid, errors=errors, warnings=warnings)

SessionManager

orionbelt.service.session_manager.SessionManager

Manages TTL-scoped sessions, each holding its own ModelStore.

Thread-safe. Call ``start()`` to begin the background cleanup thread and ``stop()`` to shut it down.

Parameters

- ``ttl_seconds``: Sliding idle timeout — sessions expire after this many seconds of inactivity.
- ``max_age_seconds``: Absolute maximum session lifetime regardless of activity.
- ``max_sessions``: Global cap on concurrent sessions. ``create_session`` raises ``SessionCapacityError`` when at capacity.
- ``max_models_per_session``: Maximum models a single session may hold. Passed through to each ``ModelStore`` instance.
- ``cleanup_interval``: Seconds between background purge sweeps.
- ``is_single_model_mode``: When True the ``__default__`` session is kept alive and excluded from purge. When False (no ``MODEL_FILE``), the default session is treated like any other and subject to TTL/max-age expiry.

Source code in src/orionbelt/service/session_manager.py
class SessionManager:
    """Manages TTL-scoped sessions, each holding its own ``ModelStore``.

    Thread-safe.  Call :meth:`start` to begin the background cleanup thread
    and :meth:`stop` to shut it down.

    Parameters
    ----------
    ttl_seconds:
        Sliding idle timeout — sessions expire after this many seconds of
        inactivity.
    max_age_seconds:
        Absolute maximum session lifetime regardless of activity.
    max_sessions:
        Global cap on concurrent sessions.  ``create_session`` raises
        :class:`SessionCapacityError` when at capacity.
    max_models_per_session:
        Maximum models a single session may hold.  Passed through to each
        ``ModelStore`` instance.
    cleanup_interval:
        Seconds between background purge sweeps.
    is_single_model_mode:
        When True the ``__default__`` session is kept alive and excluded
        from purge.  When False (no ``MODEL_FILE``), the default session
        is treated like any other and subject to TTL/max-age expiry.
    """

    def __init__(
        self,
        ttl_seconds: int = 1800,
        max_age_seconds: int = 86400,
        max_sessions: int = 500,
        max_models_per_session: int = 10,
        cleanup_interval: int = 60,
        is_single_model_mode: bool = False,
    ) -> None:
        self._ttl = ttl_seconds
        self._max_age = max_age_seconds
        self._max_sessions = max_sessions
        self._max_models = max_models_per_session
        self._cleanup_interval = cleanup_interval
        self._is_single_model_mode = is_single_model_mode
        self._lock = threading.Lock()
        self._sessions: dict[str, _Session] = {}
        self._stop_event = threading.Event()
        self._cleanup_thread: threading.Thread | None = None

    @property
    def ttl(self) -> int:
        """Session TTL in seconds."""
        return self._ttl

    @property
    def max_age(self) -> int:
        """Absolute max session lifetime in seconds."""
        return self._max_age

    @property
    def max_sessions(self) -> int:
        """Global concurrent session cap."""
        return self._max_sessions

    @property
    def max_models_per_session(self) -> int:
        """Maximum models a single session may hold."""
        return self._max_models

    # -- lifecycle -----------------------------------------------------------

    def start(self) -> None:
        """Start the background cleanup daemon thread."""
        if self._cleanup_thread is not None:
            return
        self._stop_event.clear()
        self._cleanup_thread = threading.Thread(
            target=self._cleanup_loop, daemon=True, name="session-cleanup"
        )
        self._cleanup_thread.start()

    def stop(self) -> None:
        """Signal the cleanup thread to stop and wait for it."""
        self._stop_event.set()
        if self._cleanup_thread is not None:
            self._cleanup_thread.join(timeout=5)
            self._cleanup_thread = None

    # -- public API ----------------------------------------------------------

    def create_session(self, metadata: dict[str, str] | None = None) -> SessionInfo:
        """Create a new session and return its info.

        Raises :class:`SessionCapacityError` when the global session cap
        is reached.
        """
        now_mono = time.monotonic()
        now_wall = datetime.now(UTC)
        session_id = secrets.token_hex(16)  # 32-char hex (128-bit)
        session = _Session(
            session_id=session_id,
            store=ModelStore(max_models=self._max_models),
            created_at=now_wall,
            created_at_mono=now_mono,
            last_accessed=now_mono,
            metadata=metadata or {},
            created_at_wall=now_wall,
            last_accessed_wall=now_wall,
        )
        with self._lock:
            # Count only non-default, non-expired sessions toward the cap.
            active = sum(
                1
                for s in self._sessions.values()
                if s.session_id != _DEFAULT_SESSION_ID and not self._is_expired(s, now_mono)
            )
            if active >= self._max_sessions:
                logger.warning(
                    "Session cap reached (%d/%d), rejecting create",
                    active,
                    self._max_sessions,
                )
                raise SessionCapacityError(
                    f"Maximum number of concurrent sessions reached ({self._max_sessions})"
                )
            self._sessions[session_id] = session
        logger.info("Session created: %s", session_id)
        return self._session_info(session)

    def get_store(self, session_id: str) -> ModelStore:
        """Get the ModelStore for a session, updating its last-accessed time.

        Raises :class:`SessionExpiredError` if the session has expired.
        Raises :class:`SessionNotFoundError` if the session ID is unknown.
        """
        now_mono = time.monotonic()
        with self._lock:
            session = self._sessions.get(session_id)
            if session is None:
                raise SessionNotFoundError(f"Session '{session_id}' not found")
            if self._is_expired(session, now_mono):
                reason = self._expiry_reason(session, now_mono)
                del self._sessions[session_id]
                logger.info("Session expired on access: %s (%s)", session_id, reason)
                raise SessionExpiredError(f"Session '{session_id}' has expired ({reason})")
            session.last_accessed = now_mono
            session.last_accessed_wall = datetime.now(UTC)
            return session.store

    def get_session(self, session_id: str) -> SessionInfo:
        """Get session info (also refreshes last-accessed)."""
        now_mono = time.monotonic()
        with self._lock:
            session = self._sessions.get(session_id)
            if session is None:
                raise SessionNotFoundError(f"Session '{session_id}' not found")
            if self._is_expired(session, now_mono):
                reason = self._expiry_reason(session, now_mono)
                del self._sessions[session_id]
                logger.info("Session expired on access: %s (%s)", session_id, reason)
                raise SessionExpiredError(f"Session '{session_id}' has expired ({reason})")
            session.last_accessed = now_mono
            session.last_accessed_wall = datetime.now(UTC)
            return self._session_info(session)

    def close_session(self, session_id: str) -> None:
        """Explicitly close a session."""
        with self._lock:
            if session_id not in self._sessions:
                raise SessionNotFoundError(f"Session '{session_id}' not found")
            del self._sessions[session_id]
        logger.info("Session closed: %s", session_id)

    def list_sessions(self) -> list[SessionInfo]:
        """Return info for all non-expired sessions (excluding default)."""
        now_mono = time.monotonic()
        result: list[SessionInfo] = []
        with self._lock:
            for session in self._sessions.values():
                if session.session_id == _DEFAULT_SESSION_ID:
                    continue
                if not self._is_expired(session, now_mono):
                    result.append(self._session_info(session))
        return result

    @property
    def active_count(self) -> int:
        """Number of active (non-expired) sessions."""
        now_mono = time.monotonic()
        with self._lock:
            return sum(1 for s in self._sessions.values() if not self._is_expired(s, now_mono))

    def get_or_create_default(self) -> ModelStore:
        """Get (or lazily create) the default session."""
        with self._lock:
            session = self._sessions.get(_DEFAULT_SESSION_ID)
            if session is not None:
                session.last_accessed = time.monotonic()
                session.last_accessed_wall = datetime.now(UTC)
                return session.store
            now_mono = time.monotonic()
            now_wall = datetime.now(UTC)
            session = _Session(
                session_id=_DEFAULT_SESSION_ID,
                store=ModelStore(max_models=self._max_models),
                created_at=now_wall,
                created_at_mono=now_mono,
                last_accessed=now_mono,
                created_at_wall=now_wall,
                last_accessed_wall=now_wall,
            )
            self._sessions[_DEFAULT_SESSION_ID] = session
            return session.store

    # -- internal ------------------------------------------------------------

    def _is_expired(self, session: _Session, now_mono: float) -> bool:
        """Check if a session has exceeded idle TTL or absolute max-age."""
        idle = now_mono - session.last_accessed > self._ttl
        aged = now_mono - session.created_at_mono > self._max_age
        return idle or aged

    def _expiry_reason(self, session: _Session, now_mono: float) -> str:
        """Return a human-readable reason why a session expired."""
        idle_elapsed = now_mono - session.last_accessed
        age_elapsed = now_mono - session.created_at_mono
        if age_elapsed > self._max_age:
            return f"max-age {self._max_age}s exceeded after {age_elapsed:.0f}s"
        return f"idle {self._ttl}s exceeded after {idle_elapsed:.0f}s"

    def _session_info(self, session: _Session) -> SessionInfo:
        now_wall = datetime.now(UTC)
        idle_remaining = self._ttl - (time.monotonic() - session.last_accessed)
        age_remaining = self._max_age - (time.monotonic() - session.created_at_mono)

        # expires_at = when the idle TTL would fire (from last access)
        expires_at = now_wall + timedelta(seconds=max(0.0, idle_remaining))
        # max_expires_at = absolute deadline (from creation)
        max_expires_at = now_wall + timedelta(seconds=max(0.0, age_remaining))

        return SessionInfo(
            session_id=session.session_id,
            created_at=session.created_at_wall,
            last_accessed_at=session.last_accessed_wall,
            model_count=len(session.store.list_models()),
            metadata=session.metadata,
            expires_at=expires_at,
            max_expires_at=max_expires_at,
        )

    def _purge_expired(self) -> None:
        """Remove all expired sessions (called by cleanup thread)."""
        now_mono = time.monotonic()
        with self._lock:
            # In single-model mode, keep the default session alive.
            # Otherwise, purge it like any other session.
            skip_default = self._is_single_model_mode
            expired = [
                sid
                for sid, s in self._sessions.items()
                if (not skip_default or sid != _DEFAULT_SESSION_ID)
                and self._is_expired(s, now_mono)
            ]
            for sid in expired:
                reason = self._expiry_reason(self._sessions[sid], now_mono)
                del self._sessions[sid]
                logger.info("Session purged: %s (%s)", sid, reason)
        if expired:
            logger.info(
                "Purge sweep: removed %d session(s), %d remaining",
                len(expired),
                len(self._sessions),
            )

    def _cleanup_loop(self) -> None:
        """Background loop that periodically purges expired sessions."""
        while not self._stop_event.wait(timeout=self._cleanup_interval):
            self._purge_expired()

active_count property

Number of active (non-expired) sessions.

start()

Start the background cleanup daemon thread.

Source code in src/orionbelt/service/session_manager.py
def start(self) -> None:
    """Launch the background cleanup daemon thread; no-op if already running."""
    if self._cleanup_thread is not None:
        return  # already started
    self._stop_event.clear()
    worker = threading.Thread(
        target=self._cleanup_loop, daemon=True, name="session-cleanup"
    )
    self._cleanup_thread = worker
    worker.start()

stop()

Signal the cleanup thread to stop and wait for it.

Source code in src/orionbelt/service/session_manager.py
def stop(self) -> None:
    """Ask the cleanup thread to exit and wait (bounded) for it to finish."""
    self._stop_event.set()
    worker = self._cleanup_thread
    if worker is None:
        return
    worker.join(timeout=5)
    self._cleanup_thread = None

create_session(metadata=None)

Create a new session and return its info.

Raises ``SessionCapacityError`` when the global session cap is reached.

Source code in src/orionbelt/service/session_manager.py
def create_session(self, metadata: dict[str, str] | None = None) -> SessionInfo:
    """Create a new session and return its info.

    Raises :class:`SessionCapacityError` when the global session cap
    is reached.
    """
    mono_now = time.monotonic()
    wall_now = datetime.now(UTC)
    new_id = secrets.token_hex(16)  # 128-bit id rendered as 32 hex chars
    new_session = _Session(
        session_id=new_id,
        store=ModelStore(max_models=self._max_models),
        created_at=wall_now,
        created_at_mono=mono_now,
        last_accessed=mono_now,
        metadata=metadata or {},
        created_at_wall=wall_now,
        last_accessed_wall=wall_now,
    )
    with self._lock:
        # Neither the default session nor already-expired (but not yet
        # purged) sessions count toward the cap.
        live = [
            s
            for s in self._sessions.values()
            if s.session_id != _DEFAULT_SESSION_ID and not self._is_expired(s, mono_now)
        ]
        if len(live) >= self._max_sessions:
            logger.warning(
                "Session cap reached (%d/%d), rejecting create",
                len(live),
                self._max_sessions,
            )
            raise SessionCapacityError(
                f"Maximum number of concurrent sessions reached ({self._max_sessions})"
            )
        self._sessions[new_id] = new_session
    logger.info("Session created: %s", new_id)
    return self._session_info(new_session)

get_store(session_id)

Get the ModelStore for a session, updating its last-accessed time.

Raises ``SessionExpiredError`` if the session has expired. Raises ``SessionNotFoundError`` if the session ID is unknown.

Source code in src/orionbelt/service/session_manager.py
def get_store(self, session_id: str) -> ModelStore:
    """Get the ModelStore for a session, updating its last-accessed time.

    Raises :class:`SessionExpiredError` if the session has expired.
    Raises :class:`SessionNotFoundError` if the session ID is unknown.
    """
    now_mono = time.monotonic()
    with self._lock:
        session = self._sessions.get(session_id)
        if session is None:
            raise SessionNotFoundError(f"Session '{session_id}' not found")
        if self._is_expired(session, now_mono):
            reason = self._expiry_reason(session, now_mono)
            del self._sessions[session_id]
            logger.info("Session expired on access: %s (%s)", session_id, reason)
            raise SessionExpiredError(f"Session '{session_id}' has expired ({reason})")
        session.last_accessed = now_mono
        session.last_accessed_wall = datetime.now(UTC)
        return session.store

get_session(session_id)

Get session info (also refreshes last-accessed).

Source code in src/orionbelt/service/session_manager.py
def get_session(self, session_id: str) -> SessionInfo:
    """Get session info (also refreshes last-accessed)."""
    now = time.monotonic()
    with self._lock:
        session = self._sessions.get(session_id)
        if session is None:
            raise SessionNotFoundError(f"Session '{session_id}' not found")
        if not self._is_expired(session, now):
            # Refresh both the monotonic and wall-clock access stamps.
            session.last_accessed = now
            session.last_accessed_wall = datetime.now(UTC)
            return self._session_info(session)
        # Lazily evict the expired entry before reporting the error.
        reason = self._expiry_reason(session, now)
        del self._sessions[session_id]
        logger.info("Session expired on access: %s (%s)", session_id, reason)
        raise SessionExpiredError(f"Session '{session_id}' has expired ({reason})")

close_session(session_id)

Explicitly close a session.

Source code in src/orionbelt/service/session_manager.py
def close_session(self, session_id: str) -> None:
    """Explicitly close a session."""
    with self._lock:
        # EAFP: a single dict operation both checks membership and removes.
        try:
            del self._sessions[session_id]
        except KeyError:
            raise SessionNotFoundError(f"Session '{session_id}' not found") from None
    logger.info("Session closed: %s", session_id)

list_sessions()

Return info for all non-expired sessions (excluding default).

Source code in src/orionbelt/service/session_manager.py
def list_sessions(self) -> list[SessionInfo]:
    """Return info for all non-expired sessions (excluding default)."""
    now = time.monotonic()
    with self._lock:
        # Snapshot under the lock; the default session never appears here.
        return [
            self._session_info(session)
            for session in self._sessions.values()
            if session.session_id != _DEFAULT_SESSION_ID
            and not self._is_expired(session, now)
        ]

get_or_create_default()

Get (or lazily create) the default session.

Source code in src/orionbelt/service/session_manager.py
def get_or_create_default(self) -> ModelStore:
    """Get (or lazily create) the default session."""
    with self._lock:
        existing = self._sessions.get(_DEFAULT_SESSION_ID)
        if existing is not None:
            # Touch access times on every hit.
            existing.last_accessed = time.monotonic()
            existing.last_accessed_wall = datetime.now(UTC)
            return existing.store
        # First access: materialize the default session in place.
        mono = time.monotonic()
        wall = datetime.now(UTC)
        created = _Session(
            session_id=_DEFAULT_SESSION_ID,
            store=ModelStore(max_models=self._max_models),
            created_at=wall,
            created_at_mono=mono,
            last_accessed=mono,
            created_at_wall=wall,
            last_accessed_wall=wall,
        )
        self._sessions[_DEFAULT_SESSION_ID] = created
        return created.store

SessionInfo

orionbelt.service.session_manager.SessionInfo dataclass

Public session metadata (returned by list/get).

Source code in src/orionbelt/service/session_manager.py
@dataclass
class SessionInfo:
    """Public session metadata (returned by list/get)."""

    session_id: str  # session identifier used to address this session
    created_at: datetime  # wall-clock creation time
    last_accessed_at: datetime  # wall-clock time of the most recent access
    model_count: int  # number of models currently held by the session's store
    metadata: dict[str, str]  # caller-supplied key/value metadata
    expires_at: datetime  # idle-expiry deadline (refreshed on access)
    max_expires_at: datetime  # absolute hard expiry deadline

Compiler Pipeline

orionbelt.compiler.pipeline.CompilationPipeline

Orchestrates: Query → Resolution → Planning → AST → SQL.

Source code in src/orionbelt/compiler/pipeline.py
class CompilationPipeline:
    """Orchestrates: Query → Resolution → Planning → AST → SQL.

    Stateless facade over the compilation phases.  The AST-wrapping steps
    (PoP, totals, cumulative) are order-dependent — see the inline phase
    comments in :meth:`compile`.
    """

    def __init__(self) -> None:
        # Resolver and planners hold no per-query state, so one instance of
        # each is shared across all compile() calls.
        self._resolver = QueryResolver()
        self._star_planner = StarSchemaPlanner()
        self._cfl_planner = CFLPlanner()

    def compile(
        self,
        query: QueryObject,
        model: SemanticModel,
        dialect_name: str,
    ) -> CompilationResult:
        """Compile a query to SQL for the specified dialect.

        Args:
            query: Logical query to compile.
            model: Semantic model the query is resolved against.
            dialect_name: Key looked up in ``DialectRegistry``.

        Returns:
            ``CompilationResult`` carrying the rendered SQL, a resolution
            summary, warnings, the (non-blocking) validation status, and
            an explain plan.
        """
        # Phase 1: Resolution
        resolved = self._resolver.resolve(query, model)

        # Phase 1.5: Fanout detection (skip for CFL — each fact queried independently)
        if not resolved.requires_cfl:
            detect_fanout(resolved, model)

        # Create dialect early so planners can use dialect-aware table formatting
        dialect = DialectRegistry.get(dialect_name)
        qualify_table = lambda obj: dialect.format_table_ref(  # noqa: E731
            obj.database, obj.schema_name, obj.code
        )

        # Phase 2: Planning (star schema or CFL)
        use_cfl = resolved.requires_cfl or resolved.dimensions_exclude
        if use_cfl:
            plan = self._cfl_planner.plan(
                resolved,
                model,
                qualify_table=qualify_table,
                union_by_name=dialect.capabilities.supports_union_all_by_name,
                dialect=dialect,
            )
        else:
            plan = self._star_planner.plan(
                resolved, model, qualify_table=qualify_table, dialect=dialect
            )

        # Phase 2.4: Wrap with PoP CTEs if needed
        wrapped_ast = wrap_with_pop(plan.ast, resolved, model, dialect, qualify_table)

        # Phase 2.5: Wrap with totals CTE if needed
        # Skip totals wrap when PoP or cumulative is active — the combination
        # produces invalid SQL because totals rewrites the AST structure that
        # PoP/cumulative wrappers depend on.
        if resolved.has_totals and (resolved.has_pop or resolved.has_cumulative):
            resolved.warnings.append(
                "total=True measures are ignored when combined with "
                "period-over-period or cumulative metrics in the same query"
            )
        else:
            wrapped_ast = wrap_with_totals(wrapped_ast, resolved)

        # Phase 2.6: Wrap with cumulative CTE if needed
        wrapped_ast = wrap_with_cumulative(wrapped_ast, resolved)

        # Phase 3: Dialect-specific SQL rendering
        codegen = CodeGenerator(dialect)
        sql = codegen.generate(wrapped_ast)

        # Phase 4: SQL validation (non-blocking)
        validation_errors = validate_sql(sql, dialect_name)
        sql_valid = len(validation_errors) == 0
        # Copy-on-extend: never mutate resolved.warnings when adding
        # validation messages.
        warnings = resolved.warnings
        if not sql_valid:
            warnings = warnings + [f"SQL validation: {e}" for e in validation_errors]

        # Build explain plan
        explain = self._build_explain(resolved, model, use_cfl, plan)

        return CompilationResult(
            sql=sql,
            dialect=dialect_name,
            resolved=ResolvedInfo(
                fact_tables=resolved.fact_tables,
                dimensions=[d.name for d in resolved.dimensions],
                measures=[m.name for m in resolved.measures],
            ),
            warnings=warnings,
            sql_valid=sql_valid,
            explain=explain,
        )

    @staticmethod
    def _q(name: str) -> str:
        """Quote an identifier for explain output."""
        return f'"{name}"'

    def _build_explain(
        self,
        resolved: ResolvedQuery,
        model: SemanticModel,
        use_cfl: bool,
        plan: QueryPlan,
    ) -> ExplainPlan:
        """Build the explain plan from resolution results.

        Mirrors the actual planner/base-object selection logic so that the
        human-readable reasons match what the compiler really did.
        """
        q = self._q

        # Planner choice
        if use_cfl:
            if resolved.dimensions_exclude:
                planner = "CFL"
                planner_reason = (
                    "dimensionsExclude anti-join — "
                    "CROSS JOIN of distinct values EXCEPT existing combinations"
                )
            else:
                planner = "CFL"
                sources = ", ".join(q(s) for s in sorted(resolved.measure_source_objects))
                planner_reason = (
                    f"Measures reference independent fact tables ({sources}) — "
                    f"Composite Fact Layer merges them via UNION ALL"
                )
        else:
            planner = "Star Schema"
            planner_reason = (
                "All requested objects are reachable from a single base via directed joins"
            )

        # Base object — explain should reflect actual selection logic
        base = resolved.base_object
        if resolved.measure_source_objects:
            if use_cfl and len(resolved.measure_source_objects) > 1:
                base_reason = (
                    "Not applicable — each CFL leg uses its own common root (see cfl_legs)"
                )
            elif len(resolved.measure_source_objects) > 1:
                sources = ", ".join(q(s) for s in sorted(resolved.measure_source_objects))
                base_reason = (
                    f"{q(base)} selected as base — most connected fact table "
                    f"among measure sources ({sources})"
                )
            else:
                base_reason = f"{q(base)} selected as base — sole measure source object"
        elif len(resolved.required_objects) > 1:
            base_reason = (
                f"{q(base)} selected as base — common root that can reach "
                f"all required objects via directed joins"
            )
        else:
            base_reason = f"{q(base)} selected as base for single-object query"

        # Joins — for CFL queries the per-leg joins are more informative,
        # so only include resolution-level joins for star schema queries.
        explain_joins: list[ExplainJoin] = []
        if not use_cfl:
            for step in resolved.join_steps:
                join_cols = [
                    f"{fc} = {tc}"
                    for fc, tc in zip(step.from_columns, step.to_columns, strict=True)
                ]
                if step.reversed:
                    reason = (
                        f"Reversed join from {q(step.from_object)} to {q(step.to_object)} — "
                        f"original join was defined in the opposite direction"
                    )
                else:
                    reason = (
                        f"Join {q(step.from_object)}{q(step.to_object)} to include "
                        f"columns needed by the query"
                    )
                explain_joins.append(
                    ExplainJoin(
                        from_object=step.from_object,
                        to_object=step.to_object,
                        join_columns=join_cols,
                        reason=reason,
                    )
                )

        # CFL leg details
        cfl_leg_explains: list[ExplainCflLeg] = []
        for leg in plan.cfl_legs:
            cfl_leg_explains.append(
                ExplainCflLeg(
                    measure_source=leg.measure_source,
                    common_root=leg.common_root,
                    reason=leg.reason,
                    measures=leg.measures,
                    joins=leg.joins,
                )
            )

        return ExplainPlan(
            planner=planner,
            planner_reason=planner_reason,
            base_object=base,
            base_object_reason=base_reason,
            joins=explain_joins,
            where_filter_count=len(resolved.where_filters),
            having_filter_count=len(resolved.having_filters),
            has_totals=resolved.has_totals,
            has_cumulative=resolved.has_cumulative,
            has_pop=resolved.has_pop,
            cfl_legs=cfl_leg_explains,
        )

compile(query, model, dialect_name)

Compile a query to SQL for the specified dialect.

Source code in src/orionbelt/compiler/pipeline.py
def compile(
    self,
    query: QueryObject,
    model: SemanticModel,
    dialect_name: str,
) -> CompilationResult:
    """Compile a query to SQL for the specified dialect.

    Args:
        query: Logical query to compile.
        model: Semantic model the query is resolved against.
        dialect_name: Key looked up in ``DialectRegistry``.

    Returns:
        ``CompilationResult`` carrying the rendered SQL, a resolution
        summary, warnings, the (non-blocking) validation status, and an
        explain plan.
    """
    # Phase 1: Resolution
    resolved = self._resolver.resolve(query, model)

    # Phase 1.5: Fanout detection (skip for CFL — each fact queried independently)
    if not resolved.requires_cfl:
        detect_fanout(resolved, model)

    # Create dialect early so planners can use dialect-aware table formatting
    dialect = DialectRegistry.get(dialect_name)
    qualify_table = lambda obj: dialect.format_table_ref(  # noqa: E731
        obj.database, obj.schema_name, obj.code
    )

    # Phase 2: Planning (star schema or CFL)
    use_cfl = resolved.requires_cfl or resolved.dimensions_exclude
    if use_cfl:
        plan = self._cfl_planner.plan(
            resolved,
            model,
            qualify_table=qualify_table,
            union_by_name=dialect.capabilities.supports_union_all_by_name,
            dialect=dialect,
        )
    else:
        plan = self._star_planner.plan(
            resolved, model, qualify_table=qualify_table, dialect=dialect
        )

    # Phase 2.4: Wrap with PoP CTEs if needed
    wrapped_ast = wrap_with_pop(plan.ast, resolved, model, dialect, qualify_table)

    # Phase 2.5: Wrap with totals CTE if needed
    # Skip totals wrap when PoP or cumulative is active — the combination
    # produces invalid SQL because totals rewrites the AST structure that
    # PoP/cumulative wrappers depend on.
    if resolved.has_totals and (resolved.has_pop or resolved.has_cumulative):
        resolved.warnings.append(
            "total=True measures are ignored when combined with "
            "period-over-period or cumulative metrics in the same query"
        )
    else:
        wrapped_ast = wrap_with_totals(wrapped_ast, resolved)

    # Phase 2.6: Wrap with cumulative CTE if needed
    wrapped_ast = wrap_with_cumulative(wrapped_ast, resolved)

    # Phase 3: Dialect-specific SQL rendering
    codegen = CodeGenerator(dialect)
    sql = codegen.generate(wrapped_ast)

    # Phase 4: SQL validation (non-blocking)
    validation_errors = validate_sql(sql, dialect_name)
    sql_valid = len(validation_errors) == 0
    # Copy-on-extend: never mutate resolved.warnings when adding
    # validation messages.
    warnings = resolved.warnings
    if not sql_valid:
        warnings = warnings + [f"SQL validation: {e}" for e in validation_errors]

    # Build explain plan
    explain = self._build_explain(resolved, model, use_cfl, plan)

    return CompilationResult(
        sql=sql,
        dialect=dialect_name,
        resolved=ResolvedInfo(
            fact_tables=resolved.fact_tables,
            dimensions=[d.name for d in resolved.dimensions],
            measures=[m.name for m in resolved.measures],
        ),
        warnings=warnings,
        sql_valid=sql_valid,
        explain=explain,
    )

Query Resolution

orionbelt.compiler.resolution.QueryResolver

Resolves a QueryObject + SemanticModel into a ResolvedQuery.

Source code in src/orionbelt/compiler/resolution.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
class QueryResolver:
    """Resolves a QueryObject + SemanticModel into a ResolvedQuery."""

    def resolve(self, query: QueryObject, model: SemanticModel) -> ResolvedQuery:
        """Resolve *query* against *model* into a ``ResolvedQuery``.

        Errors are accumulated in the resolution context across all steps
        and raised together as a single ``ResolutionError`` at the end, so
        callers see every problem rather than only the first.

        Raises:
            ResolutionError: if any semantic errors were recorded.
        """
        ctx = _ResolutionContext(
            model=model,
            result=ResolvedQuery(
                limit=query.limit,
                offset=query.offset,
                use_path_names=list(query.use_path_names),
            ),
        )

        # Build global column lookup: col_name → (object_name, source_column)
        for obj_name, obj in model.data_objects.items():
            for col_name, col_obj in obj.columns.items():
                ctx.global_columns[col_name] = (obj_name, col_obj.code)

        # 1. Resolve dimensions
        for dim_str in query.select.dimensions:
            dim_ref = DimensionRef.parse(dim_str)
            resolved_dim = self._resolve_dimension(ctx, dim_ref)
            if resolved_dim:
                ctx.result.dimensions.append(resolved_dim)
                ctx.result.required_objects.add(resolved_dim.object_name)

        # 2. Resolve measures and track their source objects
        for measure_name in query.select.measures:
            resolved_meas = self._resolve_measure(ctx, measure_name)
            if resolved_meas:
                ctx.result.measures.append(resolved_meas)
                source_objs = self._get_measure_source_objects(ctx, measure_name)
                ctx.result.measure_source_objects.update(source_objs)
                ctx.result.required_objects.update(source_objs)

        # 3. Determine base object (the one with most joins / most measures)
        ctx.result.base_object = self._select_base_object(ctx)
        if ctx.result.base_object:
            ctx.result.required_objects.add(ctx.result.base_object)

        # Detect multi-fact: CFL is needed only when measure source objects
        # span multiple independent fact tables.
        if len(ctx.result.measure_source_objects) > 1:
            graph = JoinGraph(model, use_path_names=query.use_path_names or None)
            reachable = graph.descendants(ctx.result.base_object)
            unreachable = ctx.result.measure_source_objects - reachable - {ctx.result.base_object}
            if unreachable:
                ctx.result.requires_cfl = True

        # Dimension-only queries: when dimensions span independent branches,
        # join through intermediate bridge/fact tables (no CFL needed).
        # Add intermediate tables from the join steps to required_objects
        # so the star schema planner includes them.
        if not ctx.result.measure_source_objects and ctx.result.dimensions:
            dim_objects = {d.object_name for d in ctx.result.dimensions}
            if not dim_objects <= {ctx.result.base_object}:
                graph = JoinGraph(model, use_path_names=query.use_path_names or None)
                steps = graph.find_join_path({ctx.result.base_object}, dim_objects)
                for step in steps:
                    ctx.result.required_objects.add(step.from_object)
                    ctx.result.required_objects.add(step.to_object)

        # Validate dimensionsExclude constraints
        if query.dimensions_exclude:
            if query.select.measures:
                ctx.errors.append(
                    SemanticError(
                        code="DIMENSIONS_EXCLUDE_WITH_MEASURES",
                        message="dimensionsExclude cannot be combined with measures",
                        path="select",
                    )
                )
            elif len(ctx.result.dimensions) < 2:
                ctx.errors.append(
                    SemanticError(
                        code="DIMENSIONS_EXCLUDE_INSUFFICIENT",
                        message="dimensionsExclude requires at least 2 dimensions",
                        path="select.dimensions",
                    )
                )
            else:
                ctx.result.dimensions_exclude = True

        # 4. Validate usePathNames before building join graph
        self._validate_use_path_names(ctx, query.use_path_names)

        # 5. Resolve join paths
        ctx.graph = JoinGraph(model, use_path_names=query.use_path_names or None)
        if ctx.result.base_object and len(ctx.result.required_objects) > 1:
            ctx.result.join_steps = ctx.graph.find_join_path(
                {ctx.result.base_object}, ctx.result.required_objects
            )

        # Build set of all objects present in the query's join graph
        if ctx.result.base_object:
            ctx.joined_objects.add(ctx.result.base_object)
        for step in ctx.result.join_steps:
            ctx.joined_objects.add(step.to_object)

        # 6. Classify filters — filters may auto-extend the join path
        for qfi in query.where:
            resolved_filter = self._resolve_filter_item(ctx, qfi, is_having=False)
            if resolved_filter:
                ctx.result.where_filters.append(resolved_filter)

        for qfi in query.having:
            resolved_filter = self._resolve_filter_item(ctx, qfi, is_having=True)
            if resolved_filter:
                ctx.result.having_filters.append(resolved_filter)

        # 7. Resolve order by — must reference a dimension or measure in SELECT
        select_count = len(ctx.result.dimensions) + len(ctx.result.measures)
        for ob in query.order_by:
            expr = self._resolve_order_by_field(ctx, ob.field, select_count)
            if expr:
                ctx.result.order_by_exprs.append((expr, ob.direction == "desc"))

        # Surface every accumulated error in a single exception.
        if ctx.errors:
            raise ResolutionError(ctx.errors)

        return ctx.result

    # -- dimensions ----------------------------------------------------------

    def _resolve_dimension(
        self, ctx: _ResolutionContext, ref: DimensionRef
    ) -> ResolvedDimension | None:
        """Resolve a dimension reference to its physical column.

        Records a ``SemanticError`` and returns ``None`` when the dimension
        or its backing data object is unknown.
        """
        if (dim := ctx.model.dimensions.get(ref.name)) is None:
            ctx.errors.append(
                SemanticError(
                    code="UNKNOWN_DIMENSION",
                    message=f"Unknown dimension '{ref.name}'",
                    path="select.dimensions",
                )
            )
            return None

        owner = dim.view
        if (data_obj := ctx.model.data_objects.get(owner)) is None:
            ctx.errors.append(
                SemanticError(
                    code="UNKNOWN_DATA_OBJECT",
                    message=f"Dimension '{ref.name}' references unknown data object '{owner}'",
                )
            )
            return None

        # Map the logical column to its physical source column when known.
        column = dim.column
        field = data_obj.columns.get(column)
        return ResolvedDimension(
            name=ref.name,
            object_name=owner,
            column_name=column,
            source_column=field.code if field else column,
            grain=ref.grain or dim.time_grain,
        )

    # -- measures & metrics --------------------------------------------------

    def _resolve_measure(self, ctx: _ResolutionContext, name: str) -> ResolvedMeasure | None:
        """Resolve a measure name to its aggregate expression.

        Falls back to metric resolution when *name* is not a measure;
        records an ``UNKNOWN_MEASURE`` error when it is neither.
        """
        measure = ctx.model.measures.get(name)
        if measure is not None:
            return ResolvedMeasure(
                name=name,
                aggregation=measure.aggregation,
                expression=self._build_measure_expr(ctx, measure),
                is_expression=measure.expression is not None,
                total=measure.total,
            )

        if metric := ctx.model.metrics.get(name):
            return self._resolve_metric(ctx, name, metric)

        ctx.errors.append(
            SemanticError(
                code="UNKNOWN_MEASURE",
                message=f"Unknown measure '{name}'",
                path="select.measures",
            )
        )
        return None

    def _build_measure_expr(self, ctx: _ResolutionContext, measure: Measure) -> Expr:
        """Build the aggregate expression for a measure.

        Expression-based measures are delegated to ``_expand_expression``.
        Otherwise an aggregate ``FunctionCall`` is built over the measure's
        column references, falling back to the literal ``1`` when no columns
        are declared (COUNT-style measures).  Measure-level filters are
        applied as a final CASE WHEN wrap.
        """
        if measure.expression:
            return self._expand_expression(ctx, measure)

        # Build column references for all columns
        args: list[Expr] = []
        if measure.columns:
            for ref in measure.columns:
                obj_name = ref.view or ""
                col_name = ref.column or ""
                obj = ctx.model.data_objects.get(obj_name)
                # Use the physical source column when the model knows it,
                # otherwise fall back to the logical column name verbatim.
                source = obj.columns[col_name].code if obj and col_name in obj.columns else col_name
                args.append(ColumnRef(name=source, table=obj_name))
        if not args:
            args = [Literal.number(1)]

        # Normalize COUNT_DISTINCT into COUNT(... DISTINCT ...).
        agg = measure.aggregation.upper()
        distinct = measure.distinct
        if agg == "COUNT_DISTINCT":
            agg = "COUNT"
            distinct = True

        # LISTAGG: attach separator and optional ordering
        separator: str | None = None
        order_by: list[OrderByItem] = []
        if agg == "LISTAGG":
            separator = measure.delimiter if measure.delimiter is not None else ","
            if measure.within_group:
                wg = measure.within_group
                wg_obj_name = wg.column.view or ""
                wg_col_name = wg.column.column or ""
                wg_obj = ctx.model.data_objects.get(wg_obj_name)
                wg_source = (
                    wg_obj.columns[wg_col_name].code
                    if wg_obj and wg_col_name in wg_obj.columns
                    else wg_col_name
                )
                order_by = [
                    OrderByItem(
                        expr=ColumnRef(name=wg_source, table=wg_obj_name),
                        desc=wg.order.upper() == "DESC",
                    )
                ]

        result = FunctionCall(
            name=agg,
            args=args,
            distinct=distinct,
            order_by=order_by,
            separator=separator,
        )
        return self._apply_measure_filters(ctx, measure, result)

    def _expand_expression(self, ctx: _ResolutionContext, measure: Measure) -> Expr:
        """Expand a measure expression with ``{[DataObject].[Column]}`` refs into AST."""
        # Tokenize the formula against the model, then parse into an inner AST.
        inner = parse_expression(
            tokenize_measure_expression(measure.expression or "", ctx.model)
        )

        # COUNT_DISTINCT is sugar for COUNT with the DISTINCT qualifier.
        agg_name = measure.aggregation.upper()
        use_distinct = measure.distinct
        if agg_name == "COUNT_DISTINCT":
            agg_name, use_distinct = "COUNT", True

        call = FunctionCall(
            name=agg_name,
            args=[inner],
            distinct=use_distinct,
        )
        return self._apply_measure_filters(ctx, measure, call)

    @staticmethod
    def _apply_measure_filters(
        ctx: _ResolutionContext, measure: Measure, func: FunctionCall
    ) -> FunctionCall:
        """Wrap aggregate args with CASE WHEN if the measure has filters."""
        if not measure.filters:
            return func
        condition = build_measure_filter_condition(measure.filters, ctx.model, ctx.errors)
        if condition is None:
            # Filter condition could not be built (errors recorded in ctx).
            return func
        # Guard every aggregate argument so only matching rows contribute.
        guarded: list[Expr] = []
        for arg in func.args:
            guarded.append(CaseExpr(when_clauses=[(condition, arg)]))
        return FunctionCall(
            name=func.name,
            args=guarded,
            distinct=func.distinct,
            order_by=func.order_by,
            separator=func.separator,
        )

    def _resolve_metric(
        self, ctx: _ResolutionContext, name: str, metric: Metric
    ) -> ResolvedMeasure | None:
        """Resolve a metric to its combined expression."""
        # Dispatch on metric type; anything else is treated as derived.
        handlers = {
            MetricType.CUMULATIVE: self._resolve_cumulative_metric,
            MetricType.PERIOD_OVER_PERIOD: self._resolve_pop_metric,
        }
        handler = handlers.get(metric.type, self._resolve_derived_metric)
        return handler(ctx, name, metric)

    def _resolve_derived_metric(
        self, ctx: _ResolutionContext, name: str, metric: Metric
    ) -> ResolvedMeasure | None:
        """Resolve a derived metric to its combined expression.

        Component measures referenced as ``{[MeasureName]}`` in the formula
        are resolved first (and cached in ``ctx.result.metric_components``),
        then the formula itself is parsed into an AST.  Returns ``None`` and
        records an error when the formula cannot be parsed.
        """
        formula = metric.expression

        # Extract and resolve each component measure
        # (``{[Name]}`` placeholders in the formula).
        component_names = re.findall(r"\{\[([^\]]+)\]\}", formula or "")
        for comp_name in component_names:
            if comp_name not in ctx.result.metric_components:
                comp = self._resolve_measure(ctx, comp_name)
                if comp:
                    ctx.result.metric_components[comp_name] = comp

        # Parse the formula into an AST tree
        try:
            tokens = tokenize_metric_formula(formula or "")
            parsed_expr = parse_expression(tokens)
        except Exception as exc:
            ctx.errors.append(
                SemanticError(
                    code="INVALID_METRIC_EXPRESSION",
                    message=f"Metric '{name}' has invalid expression: {exc}",
                    path=f"metrics.{name}.expression",
                )
            )
            return None

        # aggregation is empty: derived metrics combine already-aggregated
        # component measures rather than aggregating columns themselves.
        return ResolvedMeasure(
            name=name,
            aggregation="",
            expression=parsed_expr,
            component_measures=component_names,
            is_expression=True,
        )

    def _resolve_cumulative_metric(
        self, ctx: _ResolutionContext, name: str, metric: Metric
    ) -> ResolvedMeasure | None:
        """Resolve a cumulative metric referencing an existing measure.

        The metric must declare both ``measure`` and ``timeDimension``; the
        measure must exist in the model, and the time dimension must exist
        *and* be part of the query's selected dimensions (it is needed to
        order the window function built later).

        On any validation failure a ``SemanticError`` is appended to
        ``ctx.errors`` and ``None`` is returned.
        """
        # Required field: the base measure being accumulated.
        if metric.measure is None:
            ctx.errors.append(
                SemanticError(
                    code="INVALID_METRIC",
                    message=f"Cumulative metric '{name}' missing required 'measure' field",
                    path=f"metrics.{name}",
                )
            )
            return None
        # Required field: the time dimension that orders the accumulation.
        if metric.time_dimension is None:
            ctx.errors.append(
                SemanticError(
                    code="INVALID_METRIC",
                    message=f"Cumulative metric '{name}' missing required 'timeDimension' field",
                    path=f"metrics.{name}",
                )
            )
            return None

        # Validate referenced measure exists
        base_measure = ctx.model.measures.get(metric.measure)
        if base_measure is None:
            ctx.errors.append(
                SemanticError(
                    code="UNKNOWN_MEASURE",
                    message=(
                        f"Cumulative metric '{name}' references unknown measure '{metric.measure}'"
                    ),
                    path=f"metrics.{name}.measure",
                )
            )
            return None

        # Validate timeDimension is a known dimension
        dim = ctx.model.dimensions.get(metric.time_dimension)
        if dim is None:
            ctx.errors.append(
                SemanticError(
                    code="UNKNOWN_DIMENSION",
                    message=(
                        f"Cumulative metric '{name}' references unknown "
                        f"timeDimension '{metric.time_dimension}'"
                    ),
                    path=f"metrics.{name}.timeDimension",
                )
            )
            return None

        # Validate timeDimension is in the query's selected dimensions
        dim_names = {d.name for d in ctx.result.dimensions}
        if metric.time_dimension not in dim_names:
            ctx.errors.append(
                SemanticError(
                    code="CUMULATIVE_TIME_DIMENSION_NOT_IN_SELECT",
                    message=(
                        f"Cumulative metric '{name}' requires timeDimension "
                        f"'{metric.time_dimension}' to be in the query's selected dimensions"
                    ),
                    path=f"metrics.{name}.timeDimension",
                )
            )
            return None

        # Resolve the base measure as a component (reuse existing resolution)
        if metric.measure not in ctx.result.metric_components:
            comp = self._resolve_measure(ctx, metric.measure)
            if comp:
                ctx.result.metric_components[metric.measure] = comp

        # The cumulative metric's expression is a placeholder ColumnRef to the base measure
        # The actual window function is built during the cumulative_wrap phase
        return ResolvedMeasure(
            name=name,
            aggregation=base_measure.aggregation,
            expression=ColumnRef(name=metric.measure),
            is_expression=True,
            component_measures=[metric.measure],
            is_cumulative=True,
            cumulative_measure=metric.measure,
            cumulative_time_dimension=metric.time_dimension,
            cumulative_type=metric.cumulative_type,
            cumulative_window=metric.window,
            cumulative_grain_to_date=metric.grain_to_date,
        )

    def _resolve_pop_metric(
        self, ctx: _ResolutionContext, name: str, metric: Metric
    ) -> ResolvedMeasure | None:
        """Resolve a period-over-period metric.

        Requires both ``periodOverPeriod`` and ``expression``.  The PoP time
        dimension must be a known dimension and appear in the query's
        selected dimensions; the offset must be non-zero; and the expression
        may reference at most one measure (PoP comparison only supports
        single-measure expressions).

        On any validation failure a ``SemanticError`` is appended to
        ``ctx.errors`` and ``None`` is returned.
        """
        # Required field: the PoP configuration block.
        if metric.period_over_period is None:
            ctx.errors.append(
                SemanticError(
                    code="INVALID_METRIC",
                    message=f"PoP metric '{name}' missing required 'periodOverPeriod' field",
                    path=f"metrics.{name}",
                )
            )
            return None
        # Required field: the formula over measure references.
        if metric.expression is None:
            ctx.errors.append(
                SemanticError(
                    code="INVALID_METRIC",
                    message=f"PoP metric '{name}' missing required 'expression' field",
                    path=f"metrics.{name}",
                )
            )
            return None

        pop = metric.period_over_period

        # Validate timeDimension is a known dimension
        dim = ctx.model.dimensions.get(pop.time_dimension)
        if dim is None:
            ctx.errors.append(
                SemanticError(
                    code="POP_UNKNOWN_TIME_DIMENSION",
                    message=(
                        f"Period-over-period metric '{name}' references unknown "
                        f"time dimension '{pop.time_dimension}'"
                    ),
                    path=f"metrics.{name}.periodOverPeriod.timeDimension",
                )
            )
            return None

        # Validate timeDimension is in the query's selected dimensions
        dim_names = {d.name for d in ctx.result.dimensions}
        if pop.time_dimension not in dim_names:
            ctx.errors.append(
                SemanticError(
                    code="POP_TIME_DIMENSION_NOT_IN_SELECT",
                    message=(
                        f"Period-over-period metric '{name}' requires time dimension "
                        f"'{pop.time_dimension}' to be in the query's selected dimensions"
                    ),
                    path=f"metrics.{name}.periodOverPeriod.timeDimension",
                )
            )
            return None

        # Validate offset is non-zero
        if pop.offset == 0:
            ctx.errors.append(
                SemanticError(
                    code="POP_INVALID_OFFSET",
                    message=(
                        f"Period-over-period metric '{name}' has offset=0 "
                        f"(must be non-zero, e.g. -1 for previous period)"
                    ),
                    path=f"metrics.{name}.periodOverPeriod.offset",
                )
            )
            return None

        # Resolve the expression (same as derived — parse {[Measure Name]} refs)
        component_names = re.findall(r"\{\[([^\]]+)\]\}", metric.expression)

        # PoP comparison logic only supports single-measure expressions
        if len(component_names) > 1:
            ctx.errors.append(
                SemanticError(
                    code="POP_MULTI_MEASURE_NOT_SUPPORTED",
                    message=(
                        f"Period-over-period metric '{name}' references multiple measures "
                        f"({', '.join(component_names)}). PoP comparison currently supports "
                        f"only single-measure expressions."
                    ),
                    path=f"metrics.{name}.expression",
                )
            )
            return None

        # Resolve each referenced measure once and cache it as a component.
        for comp_name in component_names:
            if comp_name not in ctx.result.metric_components:
                comp = self._resolve_measure(ctx, comp_name)
                if comp:
                    ctx.result.metric_components[comp_name] = comp

        # Parse the formula into an AST; any tokenizer/parser failure is
        # surfaced as a semantic error rather than propagated.
        try:
            tokens = tokenize_metric_formula(metric.expression)
            parsed_expr = parse_expression(tokens)
        except Exception as exc:
            ctx.errors.append(
                SemanticError(
                    code="INVALID_METRIC_EXPRESSION",
                    message=f"Metric '{name}' has invalid expression: {exc}",
                    path=f"metrics.{name}.expression",
                )
            )
            return None

        # Use the first component measure as the base (for single-measure PoP)
        pop_base = component_names[0] if component_names else None

        return ResolvedMeasure(
            name=name,
            aggregation="",
            expression=parsed_expr,
            component_measures=component_names,
            is_expression=True,
            is_pop=True,
            pop_base_measure=pop_base,
            pop_time_dimension=pop.time_dimension,
            pop_grain=pop.grain,
            pop_offset=pop.offset,
            pop_offset_grain=pop.offset_grain,
            pop_comparison=pop.comparison,
        )

    def _get_measure_source_objects(self, ctx: _ResolutionContext, name: str) -> set[str]:
        """Extract all source data objects for a measure or metric."""
        result: set[str] = set()

        measure = ctx.model.measures.get(name)
        if measure:
            for cref in measure.columns:
                if cref.view:
                    result.add(cref.view)
            if measure.expression:
                col_refs = re.findall(r"\{\[([^\]]+)\]\.\[([^\]]+)\]\}", measure.expression)
                for obj_name, _col_name in col_refs:
                    result.add(obj_name)
            for fi in measure.filters:
                collect_measure_filter_objects(fi, result)
            return result

        metric = ctx.model.metrics.get(name)
        if metric:
            if metric.type == MetricType.CUMULATIVE and metric.measure:
                # Cumulative metric: source objects come from the referenced measure
                result.update(self._get_measure_source_objects(ctx, metric.measure))
            elif metric.expression:
                # Derived or PoP metric: parse expression for measure references
                measure_refs = re.findall(r"\{\[([^\]]+)\]\}", metric.expression)
                for ref_name in measure_refs:
                    result.update(self._get_measure_source_objects(ctx, ref_name))

        return result

    # -- base object selection -----------------------------------------------

    def _select_base_object(self, ctx: _ResolutionContext) -> str:
        """Select the base (fact) object — prefer measure source objects with most joins."""
        if ctx.result.measure_source_objects:
            best = ""
            best_joins = -1
            for obj_name in sorted(ctx.result.measure_source_objects):
                obj = ctx.model.data_objects.get(obj_name)
                n = len(obj.joins) if obj else 0
                if n > best_joins:
                    best = obj_name
                    best_joins = n
            if best:
                return best

        # Dimension-only: use JoinGraph to find the deepest ancestor
        # (possibly an intermediate fact/bridge table) that can reach
        # all required dimension objects via directed join paths.
        if len(ctx.result.required_objects) > 1:
            graph = JoinGraph(ctx.model, use_path_names=ctx.result.use_path_names or None)
            root = graph.find_common_root(ctx.result.required_objects)
            if root:
                return root

        for obj_name in sorted(ctx.result.required_objects):
            obj = ctx.model.data_objects.get(obj_name)
            if obj and obj.joins:
                return obj_name

        if ctx.result.required_objects:
            return next(iter(sorted(ctx.result.required_objects)))
        if ctx.model.data_objects:
            return next(iter(ctx.model.data_objects))
        return ""

    # -- usePathNames validation ---------------------------------------------

    def _validate_use_path_names(
        self, ctx: _ResolutionContext, use_path_names: list[UsePathName]
    ) -> None:
        """Validate usePathNames references."""
        for upn in use_path_names:
            if upn.source not in ctx.model.data_objects:
                ctx.errors.append(
                    SemanticError(
                        code="UNKNOWN_DATA_OBJECT",
                        message=f"usePathNames references unknown data object '{upn.source}'",
                        path="usePathNames",
                    )
                )
                continue
            if upn.target not in ctx.model.data_objects:
                ctx.errors.append(
                    SemanticError(
                        code="UNKNOWN_DATA_OBJECT",
                        message=f"usePathNames references unknown data object '{upn.target}'",
                        path="usePathNames",
                    )
                )
                continue
            source_obj = ctx.model.data_objects[upn.source]
            found = any(
                j.join_to == upn.target and j.secondary and j.path_name == upn.path_name
                for j in source_obj.joins
            )
            if not found:
                ctx.errors.append(
                    SemanticError(
                        code="UNKNOWN_PATH_NAME",
                        message=(
                            f"No secondary join with pathName '{upn.path_name}' "
                            f"from '{upn.source}' to '{upn.target}'"
                        ),
                        path="usePathNames",
                    )
                )

    # -- filters -------------------------------------------------------------

    def _resolve_filter_object(
        self,
        ctx: _ResolutionContext,
        obj_name: str,
        filter_path: str,
        field_label: str,
    ) -> bool:
        """Ensure *obj_name* is joined; auto-extend if reachable. Return success."""
        if obj_name in ctx.joined_objects:
            return True
        reachable = False
        if ctx.graph is not None:
            for joined_obj in list(ctx.joined_objects):
                if obj_name in ctx.graph.descendants(joined_obj):
                    reachable = True
                    break
        if not reachable:
            ctx.errors.append(
                SemanticError(
                    code="UNREACHABLE_FILTER_FIELD",
                    message=(
                        f"Filter field '{field_label}' references data object "
                        f"'{obj_name}' which is not reachable from "
                        f"the query's join graph"
                    ),
                    path=filter_path,
                )
            )
            return False
        if ctx.graph is not None:
            new_steps = ctx.graph.find_join_path(ctx.joined_objects, {obj_name})
            for step in new_steps:
                if step.to_object not in ctx.joined_objects:
                    ctx.result.join_steps.append(step)
                    ctx.joined_objects.add(step.to_object)
                    ctx.result.required_objects.add(step.to_object)
        return True

    def _resolve_filter_item(
        self, ctx: _ResolutionContext, item: QueryFilterItem, *, is_having: bool
    ) -> ResolvedFilter | None:
        """Dispatch a filter item: leaf filters resolve directly, groups recurse."""
        if not isinstance(item, QueryFilter):
            return self._resolve_filter_group(ctx, item, is_having=is_having)
        return self._resolve_filter(ctx, item, is_having=is_having)

    def _resolve_filter_group(
        self, ctx: _ResolutionContext, group: QueryFilterGroup, *, is_having: bool
    ) -> ResolvedFilter | None:
        """Resolve a filter group recursively, combining with AND/OR."""
        child_exprs: list[Expr] = []
        for child in group.filters:
            resolved = self._resolve_filter_item(ctx, child, is_having=is_having)
            if resolved:
                child_exprs.append(resolved.expression)

        if not child_exprs:
            return None

        # Combine children with the group's logic
        op = "AND" if group.logic == "and" else "OR"
        combined: Expr = child_exprs[0]
        for expr in child_exprs[1:]:
            combined = BinaryOp(left=combined, op=op, right=expr)

        # Optionally negate
        if group.negated:
            combined = UnaryOp(op="NOT", operand=combined)

        return ResolvedFilter(expression=combined, is_aggregate=is_having)

    def _resolve_filter(
        self, ctx: _ResolutionContext, qf: QueryFilter, *, is_having: bool
    ) -> ResolvedFilter | None:
        """Resolve a query filter to a physical expression.

        Filter fields can reference:
        1. A dimension name (e.g. ``"Order Priority"``)
        2. A qualified column ``"DataObject.Column"`` (e.g. ``"Orders.Order Priority"``)
        3. For HAVING filters, a measure name (e.g. ``"Revenue"``)

        If the referenced data object is reachable but not yet joined, the
        join path is auto-extended.

        Returns ``None`` after appending a ``SemanticError`` when the field
        cannot be resolved or ``build_filter_expr`` rejects the filter.
        """
        filter_path = "having" if is_having else "where"

        # 1. Try dimension name
        dim = ctx.model.dimensions.get(qf.field)
        if dim:
            obj_name = dim.view
            if not self._resolve_filter_object(ctx, obj_name, filter_path, qf.field):
                return None
            col_name = dim.column
            obj = ctx.model.data_objects.get(obj_name)
            # Map the logical column to its physical code; fall back to the
            # logical name if the column is not declared on the object.
            source = obj.columns[col_name].code if obj and col_name in obj.columns else col_name
            col_expr: Expr = ColumnRef(name=source, table=obj_name)

        # 2. HAVING: try measure name
        elif is_having and qf.field in ctx.model.measures:
            # HAVING references the measure's SELECT alias, not a table column.
            col_expr = ColumnRef(name=qf.field)

        # 3. Try qualified column: "DataObject.Column"
        elif "." in qf.field:
            # Split on the first dot only — column names may contain dots.
            parts = qf.field.split(".", 1)
            obj_name, col_name = parts[0].strip(), parts[1].strip()
            obj = ctx.model.data_objects.get(obj_name)
            if obj is None:
                ctx.errors.append(
                    SemanticError(
                        code="UNKNOWN_FILTER_FIELD",
                        message=(f"Unknown data object '{obj_name}' in filter field '{qf.field}'"),
                        path=filter_path,
                    )
                )
                return None
            if col_name not in obj.columns:
                ctx.errors.append(
                    SemanticError(
                        code="UNKNOWN_FILTER_FIELD",
                        message=(
                            f"Unknown column '{col_name}' in data object "
                            f"'{obj_name}' for filter field '{qf.field}'"
                        ),
                        path=filter_path,
                    )
                )
                return None
            if not self._resolve_filter_object(ctx, obj_name, filter_path, qf.field):
                return None
            source = obj.columns[col_name].code
            col_expr = ColumnRef(name=source, table=obj_name)

        else:
            ctx.errors.append(
                SemanticError(
                    code="UNKNOWN_FILTER_FIELD",
                    message=f"Unknown filter field '{qf.field}'",
                    path=filter_path,
                )
            )
            return None

        # Combine the column expression with the filter's operator/value;
        # build_filter_expr reports its own errors into ctx.errors.
        filter_expr = build_filter_expr(col_expr, qf, ctx.errors)
        if filter_expr is None:
            return None
        return ResolvedFilter(expression=filter_expr, is_aggregate=is_having)

    # -- order by ------------------------------------------------------------

    def _resolve_order_by_field(
        self, ctx: _ResolutionContext, field_name: str, select_count: int
    ) -> Expr | None:
        """Resolve an order-by field to its expression."""
        for dim in ctx.result.dimensions:
            if dim.name == field_name:
                return ColumnRef(name=dim.source_column, table=dim.object_name)

        for meas in ctx.result.measures:
            if meas.name == field_name:
                return meas.expression

        if field_name.isdigit():
            pos = int(field_name)
            if 1 <= pos <= select_count:
                return Literal.number(pos)
            ctx.errors.append(
                SemanticError(
                    code="INVALID_ORDER_BY_POSITION",
                    message=(
                        f"ORDER BY position {pos} is out of range "
                        f"(SELECT has {select_count} columns)"
                    ),
                    path="order_by",
                )
            )
            return None

        ctx.errors.append(
            SemanticError(
                code="UNKNOWN_ORDER_BY_FIELD",
                message=(
                    f"ORDER BY field '{field_name}' is not a dimension "
                    f"or measure in the query's SELECT"
                ),
                path="order_by",
            )
        )
        return None

resolve(query, model)

Source code in src/orionbelt/compiler/resolution.py
def resolve(self, query: QueryObject, model: SemanticModel) -> ResolvedQuery:
    """Resolve a logical *query* against *model* into a ``ResolvedQuery``.

    Runs the full resolution pipeline in order: dimensions, measures,
    base-object selection, multi-fact (CFL) detection, ``usePathNames``
    validation, join-path planning, WHERE/HAVING filters (which may
    auto-extend the join path), and ORDER BY.

    Raises:
        ResolutionError: if any ``SemanticError`` was collected during
            resolution.
    """
    ctx = _ResolutionContext(
        model=model,
        result=ResolvedQuery(
            limit=query.limit,
            offset=query.offset,
            use_path_names=list(query.use_path_names),
        ),
    )

    # Build global column lookup: col_name → (object_name, source_column)
    for obj_name, obj in model.data_objects.items():
        for col_name, col_obj in obj.columns.items():
            ctx.global_columns[col_name] = (obj_name, col_obj.code)

    # 1. Resolve dimensions
    for dim_str in query.select.dimensions:
        dim_ref = DimensionRef.parse(dim_str)
        resolved_dim = self._resolve_dimension(ctx, dim_ref)
        if resolved_dim:
            ctx.result.dimensions.append(resolved_dim)
            ctx.result.required_objects.add(resolved_dim.object_name)

    # 2. Resolve measures and track their source objects
    for measure_name in query.select.measures:
        resolved_meas = self._resolve_measure(ctx, measure_name)
        if resolved_meas:
            ctx.result.measures.append(resolved_meas)
            source_objs = self._get_measure_source_objects(ctx, measure_name)
            ctx.result.measure_source_objects.update(source_objs)
            ctx.result.required_objects.update(source_objs)

    # 3. Determine base object (the one with most joins / most measures)
    ctx.result.base_object = self._select_base_object(ctx)
    if ctx.result.base_object:
        ctx.result.required_objects.add(ctx.result.base_object)

    # Detect multi-fact: CFL is needed only when measure source objects
    # span multiple independent fact tables.
    if len(ctx.result.measure_source_objects) > 1:
        graph = JoinGraph(model, use_path_names=query.use_path_names or None)
        reachable = graph.descendants(ctx.result.base_object)
        unreachable = ctx.result.measure_source_objects - reachable - {ctx.result.base_object}
        if unreachable:
            ctx.result.requires_cfl = True

    # Dimension-only queries: when dimensions span independent branches,
    # join through intermediate bridge/fact tables (no CFL needed).
    # Add intermediate tables from the join steps to required_objects
    # so the star schema planner includes them.
    if not ctx.result.measure_source_objects and ctx.result.dimensions:
        dim_objects = {d.object_name for d in ctx.result.dimensions}
        if not dim_objects <= {ctx.result.base_object}:
            graph = JoinGraph(model, use_path_names=query.use_path_names or None)
            steps = graph.find_join_path({ctx.result.base_object}, dim_objects)
            for step in steps:
                ctx.result.required_objects.add(step.from_object)
                ctx.result.required_objects.add(step.to_object)

    # Validate dimensionsExclude constraints
    if query.dimensions_exclude:
        if query.select.measures:
            ctx.errors.append(
                SemanticError(
                    code="DIMENSIONS_EXCLUDE_WITH_MEASURES",
                    message="dimensionsExclude cannot be combined with measures",
                    path="select",
                )
            )
        elif len(ctx.result.dimensions) < 2:
            ctx.errors.append(
                SemanticError(
                    code="DIMENSIONS_EXCLUDE_INSUFFICIENT",
                    message="dimensionsExclude requires at least 2 dimensions",
                    path="select.dimensions",
                )
            )
        else:
            ctx.result.dimensions_exclude = True

    # 4. Validate usePathNames before building join graph
    self._validate_use_path_names(ctx, query.use_path_names)

    # 5. Resolve join paths
    ctx.graph = JoinGraph(model, use_path_names=query.use_path_names or None)
    if ctx.result.base_object and len(ctx.result.required_objects) > 1:
        ctx.result.join_steps = ctx.graph.find_join_path(
            {ctx.result.base_object}, ctx.result.required_objects
        )

    # Build set of all objects present in the query's join graph
    if ctx.result.base_object:
        ctx.joined_objects.add(ctx.result.base_object)
    for step in ctx.result.join_steps:
        ctx.joined_objects.add(step.to_object)

    # 6. Classify filters — filters may auto-extend the join path
    for qfi in query.where:
        resolved_filter = self._resolve_filter_item(ctx, qfi, is_having=False)
        if resolved_filter:
            ctx.result.where_filters.append(resolved_filter)

    for qfi in query.having:
        resolved_filter = self._resolve_filter_item(ctx, qfi, is_having=True)
        if resolved_filter:
            ctx.result.having_filters.append(resolved_filter)

    # 7. Resolve order by — must reference a dimension or measure in SELECT
    select_count = len(ctx.result.dimensions) + len(ctx.result.measures)
    for ob in query.order_by:
        expr = self._resolve_order_by_field(ctx, ob.field, select_count)
        if expr:
            ctx.result.order_by_exprs.append((expr, ob.direction == "desc"))

    if ctx.errors:
        raise ResolutionError(ctx.errors)

    return ctx.result

Star Schema Planner

orionbelt.compiler.star.StarSchemaPlanner

Plans star-schema queries: single fact base with dimension joins.

Source code in src/orionbelt/compiler/star.py
class StarSchemaPlanner:
    """Plans star-schema queries: single fact base with dimension joins."""

    def plan(
        self,
        resolved: ResolvedQuery,
        model: SemanticModel,
        qualify_table: Callable[[DataObject], str] | None = None,
        dialect: Dialect | None = None,
    ) -> QueryPlan:
        """Build a ``QueryPlan`` (SQL AST) for *resolved* over *model*.

        Args:
            resolved: The resolved query (dimensions, measures, join steps,
                filters, ordering, limit/offset).
            model: The semantic model providing data-object metadata.
            qualify_table: Optional override for rendering a physical table
                name; defaults to the object's ``qualified_code``.
            dialect: When given, used to render time-grain truncation for
                time-grained dimensions.

        Returns an empty plan when the base object is unknown.
        """
        builder = QueryBuilder()
        graph = JoinGraph(model, use_path_names=resolved.use_path_names or None)

        def qualify(obj: DataObject) -> str:
            return qualify_table(obj) if qualify_table else obj.qualified_code

        base_object = model.data_objects.get(resolved.base_object)
        if not base_object:
            return QueryPlan(ast=builder.build())

        base_alias = resolved.base_object

        # SELECT: dimensions (apply time grain truncation if specified)
        for dim in resolved.dimensions:
            col: Expr = ColumnRef(name=dim.source_column, table=dim.object_name)
            if dim.grain and dialect:
                col = dialect.render_time_grain(col, dim.grain)
            builder.select(AliasedExpr(expr=col, alias=dim.name))

        # SELECT: measures (aggregated) — for metrics, substitute component refs
        for measure in resolved.measures:
            if measure.component_measures:
                substituted = _substitute_measure_refs(
                    measure.expression, resolved.metric_components
                )
                builder.select(AliasedExpr(expr=substituted, alias=measure.name))
            else:
                builder.select(AliasedExpr(expr=measure.expression, alias=measure.name))

        # FROM: base fact table
        builder.from_(qualify(base_object), alias=base_alias)

        # JOINs: dimension and intermediate tables
        joined = {base_alias}
        for step in resolved.join_steps:
            # Determine which side of the step needs to be joined
            if step.to_object not in joined:
                new_object = step.to_object
            elif step.from_object not in joined:
                new_object = step.from_object
            else:
                continue  # both already joined
            obj = model.data_objects.get(new_object)
            if not obj:
                continue
            on_expr = graph.build_join_condition(step)
            builder.join(
                table=qualify(obj),
                on=on_expr,
                join_type=step.join_type,
                alias=new_object,
            )
            joined.add(new_object)

        # WHERE
        for wf in resolved.where_filters:
            builder.where(wf.expression)

        # GROUP BY (all dimension columns, with time grain if applicable)
        for dim in resolved.dimensions:
            gb_col: Expr = ColumnRef(name=dim.source_column, table=dim.object_name)
            if dim.grain and dialect:
                gb_col = dialect.render_time_grain(gb_col, dim.grain)
            builder.group_by(gb_col)

        # HAVING
        for hf in resolved.having_filters:
            builder.having(hf.expression)

        # ORDER BY (use alias for time-grained dimensions)
        # Time-grained dimensions are selected under their alias, so ordering
        # must reference the alias rather than the raw column.
        grained_cols: dict[tuple[str, str | None], str] = {
            (d.source_column, d.object_name): d.name for d in resolved.dimensions if d.grain
        }
        for expr, desc in resolved.order_by_exprs:
            if isinstance(expr, ColumnRef) and (expr.name, expr.table) in grained_cols:
                expr = ColumnRef(name=grained_cols[(expr.name, expr.table)])
            builder.order_by(expr, desc=desc)

        # LIMIT / OFFSET
        if resolved.limit is not None:
            builder.limit(resolved.limit)
        if resolved.offset is not None:
            builder.offset(resolved.offset)

        return QueryPlan(ast=builder.build())

plan(resolved, model, qualify_table=None, dialect=None)

Source code in src/orionbelt/compiler/star.py
def plan(
    self,
    resolved: ResolvedQuery,
    model: SemanticModel,
    qualify_table: Callable[[DataObject], str] | None = None,
    dialect: Dialect | None = None,
) -> QueryPlan:
    """Build a single-SELECT star-schema plan from *resolved*.

    Clauses are assembled in SQL order: SELECT (dimensions, then
    measures), FROM the base fact table, JOINs along the resolved join
    steps, WHERE, GROUP BY, HAVING, ORDER BY, LIMIT/OFFSET.

    Args:
        resolved: Fully resolved query (dimensions, measures, filters,
            join steps, ordering and paging).
        model: Semantic model used to look up data objects.
        qualify_table: Optional callback mapping a ``DataObject`` to its
            fully qualified SQL name; defaults to ``obj.qualified_code``.
        dialect: Optional SQL dialect; when present, time-grained
            dimension columns are wrapped via ``render_time_grain``.

    Returns:
        A ``QueryPlan`` wrapping the built SELECT AST.  If
        ``resolved.base_object`` is unknown to the model, an empty plan
        is returned instead of raising.
    """
    builder = QueryBuilder()
    graph = JoinGraph(model, use_path_names=resolved.use_path_names or None)

    def qualify(obj: DataObject) -> str:
        # Fall back to the object's own qualified code when no callback given.
        return qualify_table(obj) if qualify_table else obj.qualified_code

    base_object = model.data_objects.get(resolved.base_object)
    if not base_object:
        # Unknown base object: emit an empty (degenerate) plan.
        return QueryPlan(ast=builder.build())

    base_alias = resolved.base_object

    # SELECT: dimensions (apply time grain truncation if specified)
    for dim in resolved.dimensions:
        col: Expr = ColumnRef(name=dim.source_column, table=dim.object_name)
        if dim.grain and dialect:
            col = dialect.render_time_grain(col, dim.grain)
        builder.select(AliasedExpr(expr=col, alias=dim.name))

    # SELECT: measures (aggregated) — for metrics, substitute component refs
    for measure in resolved.measures:
        if measure.component_measures:
            substituted = _substitute_measure_refs(
                measure.expression, resolved.metric_components
            )
            builder.select(AliasedExpr(expr=substituted, alias=measure.name))
        else:
            builder.select(AliasedExpr(expr=measure.expression, alias=measure.name))

    # FROM: base fact table
    builder.from_(qualify(base_object), alias=base_alias)

    # JOINs: dimension and intermediate tables.  Steps may arrive in either
    # orientation, so join whichever endpoint is not yet in the query.
    joined = {base_alias}
    for step in resolved.join_steps:
        # Determine which side of the step needs to be joined
        if step.to_object not in joined:
            new_object = step.to_object
        elif step.from_object not in joined:
            new_object = step.from_object
        else:
            continue  # both already joined
        obj = model.data_objects.get(new_object)
        if not obj:
            continue
        on_expr = graph.build_join_condition(step)
        builder.join(
            table=qualify(obj),
            on=on_expr,
            join_type=step.join_type,
            alias=new_object,
        )
        joined.add(new_object)

    # WHERE
    for wf in resolved.where_filters:
        builder.where(wf.expression)

    # GROUP BY (all dimension columns, with time grain if applicable)
    for dim in resolved.dimensions:
        gb_col: Expr = ColumnRef(name=dim.source_column, table=dim.object_name)
        if dim.grain and dialect:
            gb_col = dialect.render_time_grain(gb_col, dim.grain)
        builder.group_by(gb_col)

    # HAVING
    for hf in resolved.having_filters:
        builder.having(hf.expression)

    # ORDER BY (use alias for time-grained dimensions, since the raw
    # column ref would bypass the grain truncation in the SELECT list)
    grained_cols: dict[tuple[str, str | None], str] = {
        (d.source_column, d.object_name): d.name for d in resolved.dimensions if d.grain
    }
    for expr, desc in resolved.order_by_exprs:
        if isinstance(expr, ColumnRef) and (expr.name, expr.table) in grained_cols:
            expr = ColumnRef(name=grained_cols[(expr.name, expr.table)])
        builder.order_by(expr, desc=desc)

    # LIMIT / OFFSET
    if resolved.limit is not None:
        builder.limit(resolved.limit)
    if resolved.offset is not None:
        builder.offset(resolved.offset)

    return QueryPlan(ast=builder.build())

CFL Planner

orionbelt.compiler.cfl.CFLPlanner

Plans Composite Fact Layer queries: conformed dimensions + fact stitching.

Uses a UNION ALL strategy:

1. Each fact leg SELECTs conformed dimensions + its own measures (NULL for others)
2. UNION ALL combines the legs into a single CTE
3. Outer query aggregates over the union, grouping by conformed dimensions

Source code in src/orionbelt/compiler/cfl.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
class CFLPlanner:
    """Plans Composite Fact Layer queries: conformed dimensions + fact stitching.

    Uses a UNION ALL strategy:
    1. Each fact leg SELECTs conformed dimensions + its own measures (NULL for others)
    2. UNION ALL combines the legs into a single CTE
    3. Outer query aggregates over the union, grouping by conformed dimensions
    """

    def plan(
        self,
        resolved: ResolvedQuery,
        model: SemanticModel,
        qualify_table: Callable[[DataObject], str] | None = None,
        union_by_name: bool = False,
        dialect: Dialect | None = None,
    ) -> QueryPlan:
        """Produce a query plan for a Composite Fact Layer request.

        Dispatches to one of three strategies: the EXCEPT-based
        ``dimensionsExclude`` plan, a plain star-schema plan when only a
        single fact is involved, or the multi-fact UNION ALL plan.
        """
        # Fail fast on grain/fanout problems before doing any planning work.
        self._validate_fanout(resolved, model)

        # dimensionsExclude: EXCEPT-based anti-join pattern
        if resolved.dimensions_exclude:
            return self._plan_dimensions_exclude(resolved, model, qualify_table)

        # Partition measures by the fact object each one originates from.
        leg_groups, spanning = self._group_measures_by_object(resolved, model)

        # Dimension-only CFL: no measures, but dimensions sit on independent
        # branches — derive leg groupings from connecting fact tables.
        if resolved.requires_cfl and not (leg_groups or spanning):
            leg_groups = self._group_dimensions_into_legs(resolved, model)

        if len(leg_groups) <= 1 and not spanning:
            # Only one fact (or none) is involved — the star-schema planner
            # handles that case directly.
            from orionbelt.compiler.star import StarSchemaPlanner

            return StarSchemaPlanner().plan(
                resolved, model, qualify_table=qualify_table, dialect=dialect
            )

        # Multiple facts: stitch the legs together with UNION ALL.
        return self._plan_union_all(
            resolved,
            model,
            leg_groups,
            spanning,
            qualify_table=qualify_table,
            union_by_name=union_by_name,
            dialect=dialect,
        )

    def _validate_fanout(self, resolved: ResolvedQuery, model: SemanticModel) -> None:
        """Validate that grain is compatible and no fanout will occur.

        Collects every requested dimension whose data object is unknown to
        the model and reports them all at once in a single ``FanoutError``.
        """
        problems = [
            f"Dimension '{dim.name}' references unknown data object '{dim.object_name}'"
            for dim in resolved.dimensions
            if dim.object_name not in model.data_objects
        ]
        if problems:
            raise FanoutError("; ".join(problems))

    def _group_measures_by_object(
        self,
        resolved: ResolvedQuery,
        model: SemanticModel,
    ) -> tuple[dict[str, list[ResolvedMeasure]], list[ResolvedMeasure]]:
        """Group measures by their primary source object.

        Returns ``(groups, cross_fact)`` where *cross_fact* contains
        multi-field measures whose fields span multiple objects.
        For metrics, expand their component measures into the grouping
        instead of the metric itself.  Cross-fact measures ensure every
        involved object has a leg, but are not assigned to any single
        group — their individual fields are distributed per-leg by
        ``_plan_union_all``.
        """
        groups: dict[str, list[ResolvedMeasure]] = {}
        cross_fact: list[ResolvedMeasure] = []
        # Dedupe by name: a measure referenced both directly and as a metric
        # component must only end up in one group once.
        seen: set[str] = set()

        for measure in resolved.measures:
            if measure.component_measures:
                # Metric: add each component measure to its source object
                for comp_name in measure.component_measures:
                    if comp_name in seen:
                        continue
                    seen.add(comp_name)
                    comp = resolved.metric_components.get(comp_name)
                    if comp is None:
                        continue
                    model_measure = model.measures.get(comp_name)
                    if model_measure and model_measure.columns:
                        # First column's view decides the component's home leg.
                        obj_name = model_measure.columns[0].view or resolved.base_object
                    else:
                        obj_name = resolved.base_object
                    groups.setdefault(obj_name, []).append(comp)
            else:
                if measure.name in seen:
                    continue
                seen.add(measure.name)
                model_measure = model.measures.get(measure.name)
                if not model_measure:
                    # Measure with no model definition: attribute it to the
                    # query's base object.
                    groups.setdefault(resolved.base_object, []).append(measure)
                    continue

                # Collect source objects: from explicit columns or expression AST
                field_objects: set[str]
                if model_measure.columns:
                    field_objects = {f.view for f in model_measure.columns if f.view}
                else:
                    # Expression-based measure: extract table refs from the AST
                    field_objects = set()
                    self._collect_table_refs(measure.expression, field_objects)
                if len(field_objects) > 1:
                    # Cross-fact multi-field measure: ensure each
                    # involved object has a leg, but don't assign
                    # the measure to any single group.
                    cross_fact.append(measure)
                    for obj in field_objects:
                        groups.setdefault(obj, [])
                elif field_objects:
                    obj_name = next(iter(field_objects))
                    groups.setdefault(obj_name, []).append(measure)
                else:
                    # No resolvable source object — fall back to the base.
                    groups.setdefault(resolved.base_object, []).append(measure)

        return groups, cross_fact

    @staticmethod
    def _group_dimensions_into_legs(
        resolved: ResolvedQuery,
        model: SemanticModel,
    ) -> dict[str, list[ResolvedMeasure]]:
        """Group dimensions into CFL legs for dimension-only queries.

        For each dimension, find the fact/bridge table that can reach it
        via directed join paths, and use that as the leg's key object.
        Returns empty measure lists per leg (dimension-only, no aggregates).
        """
        graph = JoinGraph(model, use_path_names=resolved.use_path_names or None)
        wanted = {d.object_name for d in resolved.dimensions}

        # Candidates: objects that have joins and can reach at least one
        # wanted dimension object (including themselves).
        candidates: list[tuple[str, set[str]]] = []
        for name, data_obj in model.data_objects.items():
            if not data_obj.joins:
                continue
            hits = wanted & (graph.descendants(name) | {name})
            if hits:
                candidates.append((name, hits))

        # Greedy set cover: widest-reaching fact first, name as tie-break.
        candidates.sort(key=lambda pair: (-len(pair[1]), pair[0]))

        legs: dict[str, list[ResolvedMeasure]] = {}
        covered: set[str] = set()
        for fact, hits in candidates:
            newly = hits - covered
            if newly:
                legs[fact] = []
                covered.update(newly)

        return legs

    @staticmethod
    def _is_multi_field(measure: ResolvedMeasure) -> bool:
        """True when the measure aggregates several field args (e.g. COUNT(a, b))."""
        expr = measure.expression
        return isinstance(expr, FunctionCall) and len(expr.args) > 1

    @staticmethod
    def _resolve_null_type_for_field(
        measure: ResolvedMeasure,
        field_idx: int,
        model: SemanticModel,
    ) -> str | None:
        """Look up the abstract type for a multi-field measure's *field_idx*-th column.

        Falls back to the measure's ``result_type`` if the column cannot be found,
        and to ``None`` when the measure has no model definition at all.
        """
        defn = model.measures.get(measure.name)
        if not defn:
            # No model definition — no type information available.
            return None
        # Prefer the column's abstract_type from its owning data object.
        if field_idx < len(defn.columns):
            col_ref = defn.columns[field_idx]
            data_obj = model.data_objects.get(col_ref.view) if col_ref.view else None
            if data_obj and col_ref.column in data_obj.columns:
                return data_obj.columns[col_ref.column].abstract_type.value
        # Column not resolvable — use the measure's declared result type.
        return defn.result_type.value

    @staticmethod
    def _multi_field_cte_alias(measure_name: str, idx: int) -> str:
        """Name of the CTE column holding field *idx* of a multi-field measure."""
        return measure_name + f"__f{idx}"

    @staticmethod
    def _unwrap_aggregation(measure: ResolvedMeasure) -> Expr:
        """Strip the outer aggregation from a measure expression.

        ``FunctionCall(SUM, [inner])`` yields ``inner``; anything that is
        not an argument-bearing ``FunctionCall`` is returned unchanged.
        """
        expr = measure.expression
        is_agg_call = isinstance(expr, FunctionCall) and bool(expr.args)
        return expr.args[0] if is_agg_call else expr

    def _build_outer_metric_expr(
        self,
        metric: ResolvedMeasure,
        resolved: ResolvedQuery,
    ) -> Expr:
        """Render a metric expression for the outer query of the CFL plan.

        Delegates to ``_substitute_outer_refs``, which rewrites each bare
        ``ColumnRef`` naming a component measure into that component's
        aggregation applied to the CTE column of the same name.
        """
        return self._substitute_outer_refs(metric.expression, resolved)

    def _substitute_outer_refs(self, expr: Expr, resolved: ResolvedQuery) -> Expr:
        """Recursively substitute measure refs with outer aggregations."""
        # Bare (table-less) column refs may name a metric component.
        if isinstance(expr, ColumnRef) and expr.table is None:
            comp = resolved.metric_components.get(expr.name)
            if comp:
                func = comp.aggregation.upper()
                use_distinct = func == "COUNT_DISTINCT"
                if use_distinct:
                    func = "COUNT"
                # Preserve DISTINCT declared on the component's own call.
                if isinstance(comp.expression, FunctionCall) and comp.expression.distinct:
                    use_distinct = True
                return FunctionCall(
                    name=func,
                    args=[ColumnRef(name=comp.name)],
                    distinct=use_distinct,
                )
        if isinstance(expr, BinaryOp):
            left = self._substitute_outer_refs(expr.left, resolved)
            right = self._substitute_outer_refs(expr.right, resolved)
            # Only rebuild the node when a child actually changed.
            if left is not expr.left or right is not expr.right:
                return BinaryOp(left=left, op=expr.op, right=right)
        return expr

    @staticmethod
    def _collect_table_refs(expr: Expr, tables: set[str]) -> None:
        """Recursively collect table names from ColumnRef nodes into *tables*."""
        recurse = CFLPlanner._collect_table_refs
        if isinstance(expr, ColumnRef):
            if expr.table:
                tables.add(expr.table)
        elif isinstance(expr, BinaryOp):
            recurse(expr.left, tables)
            recurse(expr.right, tables)
        elif isinstance(expr, UnaryOp):
            recurse(expr.operand, tables)
        elif isinstance(expr, (InList, IsNull, Between)):
            recurse(expr.expr, tables)
        elif isinstance(expr, RelativeDateRange):
            recurse(expr.column, tables)
        elif isinstance(expr, FunctionCall):
            for child in expr.args:
                recurse(child, tables)

    @staticmethod
    def _remap_cfl_order_by(expr: Expr, resolved: ResolvedQuery) -> Expr:
        """Remap ORDER BY expressions to use CTE aliases for the outer query.

        In CFL, the outer query selects from the composite CTE — original
        table-qualified refs are out of scope.  Remap dimension and measure
        expressions to their CTE alias names.
        """
        # Dimension: (source_column, object_name) → dim alias
        if isinstance(expr, ColumnRef) and expr.table is not None:
            alias = next(
                (
                    d.name
                    for d in resolved.dimensions
                    if (expr.name, expr.table) == (d.source_column, d.object_name)
                ),
                None,
            )
            if alias is not None:
                return ColumnRef(name=alias)
        # Measure: matched by object identity with the resolved expression.
        for measure in resolved.measures:
            if expr is measure.expression:
                return ColumnRef(name=measure.name)
        # Numeric position or unknown ref — pass through unchanged.
        return expr

    def _build_outer_concat_count(
        self,
        measure_name: str,
        n_fields: int,
        agg: str,
        distinct: bool,
    ) -> Expr:
        """Build ``COUNT(DISTINCT CAST(f0 AS VARCHAR) || '|' || ...)`` for the outer query."""

        def field_cast(i: int) -> Expr:
            # Each multi-field component is cast to VARCHAR before concatenation.
            return Cast(
                expr=ColumnRef(name=self._multi_field_cte_alias(measure_name, i)),
                type_name="VARCHAR",
            )

        combined: Expr = field_cast(0)
        for i in range(1, n_fields):
            # Append "'|' || field" so values can't collide across boundaries.
            sep_then_field = BinaryOp(
                left=Literal.string("|"),
                op="||",
                right=field_cast(i),
            )
            combined = BinaryOp(left=combined, op="||", right=sep_then_field)
        return FunctionCall(name=agg, args=[combined], distinct=distinct)

    def _plan_union_all(
        self,
        resolved: ResolvedQuery,
        model: SemanticModel,
        measures_by_object: dict[str, list[ResolvedMeasure]],
        cross_fact: list[ResolvedMeasure] | None = None,
        qualify_table: Callable[[DataObject], str] | None = None,
        union_by_name: bool = False,
        dialect: Dialect | None = None,
    ) -> QueryPlan:
        """UNION ALL strategy: stack fact legs with NULL padding, aggregate outside.

        When *union_by_name* is True (DuckDB, Snowflake) each leg only emits
        the columns it actually has — the database fills missing columns with
        NULL automatically via ``UNION ALL BY NAME``.

        Args:
            resolved: Fully resolved query.
            model: Semantic model for object/measure/dimension lookups.
            measures_by_object: Leg grouping from ``_group_measures_by_object``;
                one UNION leg is built per key.
            cross_fact: Multi-field measures spanning several objects; their
                individual fields are emitted on whichever leg owns them.
            qualify_table: Optional callback mapping a ``DataObject`` to its
                fully qualified SQL name.
            union_by_name: Skip NULL padding and rely on name-based union.
            dialect: Optional SQL dialect for time-grain rendering.

        Returns:
            ``QueryPlan`` whose AST aggregates over a single UNION ALL CTE,
            with per-leg explain info attached as ``cfl_legs``.
        """
        graph = JoinGraph(model, use_path_names=resolved.use_path_names or None)

        def qualify(obj: DataObject) -> str:
            # Fall back to the object's own qualified code when no callback given.
            return qualify_table(obj) if qualify_table else obj.qualified_code

        # Collect all measures across all objects + cross-fact measures
        all_measures: list[ResolvedMeasure] = []
        for measures in measures_by_object.values():
            all_measures.extend(measures)
        if cross_fact:
            all_measures.extend(cross_fact)

        # Collect data objects referenced by WHERE filters — each leg
        # must join these tables so the filter predicates are valid.
        filter_objects: set[str] = set()
        for wf in resolved.where_filters:
            self._collect_table_refs(wf.expression, filter_objects)

        # Build one SELECT per fact object group.
        # Each leg computes its own LCA (least common ancestor) as the lead
        # table — the graph-central node that can reach all dimension objects
        # and the measure's source object with minimal hops.
        union_legs: list[Select] = []
        leg_infos: list[CflLegInfo] = []
        for obj_name, measures in measures_by_object.items():
            leg_builder = QueryBuilder()
            this_measure_names = {m.name for m in measures}

            # Compute reachability from this leg's fact object upfront
            reachable = graph.descendants(obj_name) | {obj_name}

            # SELECT conformed dimensions — only emit real column refs for
            # dimensions reachable from this leg's fact; skip unreachable
            # ones when the dialect supports UNION ALL BY NAME.
            for dim in resolved.dimensions:
                if dim.object_name in reachable:
                    col: Expr = ColumnRef(name=dim.source_column, table=dim.object_name)
                    if dim.grain and dialect:
                        col = dialect.render_time_grain(col, dim.grain)
                    leg_builder.select(AliasedExpr(expr=col, alias=dim.name))
                elif not union_by_name:
                    # Typed NULL padding keeps UNION ALL column types aligned.
                    model_dim = model.dimensions.get(dim.name)
                    dim_type = model_dim.result_type.value if model_dim else None
                    col = Cast(Literal.null(), type_name=dim_type) if dim_type else Literal.null()
                    leg_builder.select(AliasedExpr(expr=col, alias=dim.name))

            # SELECT this fact's measures (raw expressions, no aggregation).
            # When union_by_name is True, skip NULL padding for other facts'
            # measures — the database fills them automatically.
            for m in all_measures:
                if self._is_multi_field(m):
                    assert isinstance(m.expression, FunctionCall)
                    # Emit each field of a multi-field measure separately;
                    # the outer query reassembles them via concat-count.
                    for i, arg in enumerate(m.expression.args):
                        alias = self._multi_field_cte_alias(m.name, i)
                        arg_table = arg.table if isinstance(arg, ColumnRef) else None
                        if arg_table == obj_name:
                            leg_builder.select(AliasedExpr(expr=arg, alias=alias))
                        elif not union_by_name:
                            null_type = self._resolve_null_type_for_field(m, i, model)
                            null_expr: Expr = (
                                Cast(Literal.null(), type_name=null_type)
                                if null_type
                                else Literal.null()
                            )
                            leg_builder.select(AliasedExpr(expr=null_expr, alias=alias))
                elif m.name in this_measure_names:
                    leg_builder.select(AliasedExpr(expr=self._unwrap_aggregation(m), alias=m.name))
                elif not union_by_name:
                    model_measure = model.measures.get(m.name)
                    null_type_name = model_measure.result_type.value if model_measure else None
                    null_expr = (
                        Cast(Literal.null(), type_name=null_type_name)
                        if null_type_name
                        else Literal.null()
                    )
                    leg_builder.select(AliasedExpr(expr=null_expr, alias=m.name))

            # Determine the common root for this leg:
            # the deepest directed ancestor that can reach all dimension
            # objects, measure's source object, and filter-referenced objects.
            # Only include dimensions reachable from this leg's fact object.
            leg_required = {
                dim.object_name for dim in resolved.dimensions if dim.object_name in reachable
            }
            leg_required.add(obj_name)
            leg_required.update(filter_objects)
            lead = graph.find_common_root(leg_required)
            lead_obj = model.data_objects.get(lead)

            # FROM: the lead (LCA) table
            if lead_obj:
                leg_builder.from_(qualify(lead_obj), alias=lead)

            # JOINs: all required objects reachable from the lead
            join_targets = leg_required - {lead}
            steps: list[JoinStep] = []
            if join_targets:
                steps = graph.find_join_path({lead}, leg_required)
                for step in steps:
                    target_object = model.data_objects.get(step.to_object)
                    if target_object:
                        on_expr = graph.build_join_condition(step)
                        leg_builder.join(
                            table=qualify(target_object),
                            on=on_expr,
                            join_type=step.join_type,
                            alias=step.to_object,
                        )

            # Capture leg info for explain
            # NOTE(review): from/to object names are concatenated with no
            # separator here — an arrow or delimiter between them looks
            # intended; confirm against the explain output format.
            leg_join_strs = (
                [f"{s.from_object}{s.to_object}" for s in steps] if join_targets else []
            )
            if lead == obj_name:
                leg_reason = (
                    f'"{lead}" is the measure source — '
                    f"all required dimension objects are reachable from it"
                )
            else:
                leg_reason = (
                    f'"{lead}" is the deepest common root that can reach '
                    f'measure source "{obj_name}" and all reachable dimension objects'
                )
            leg_infos.append(
                CflLegInfo(
                    measure_source=obj_name,
                    common_root=lead,
                    reason=leg_reason,
                    measures=[m.name for m in measures],
                    joins=leg_join_strs,
                )
            )

            # Apply WHERE filters to each leg
            for wf in resolved.where_filters:
                leg_builder.where(wf.expression)

            union_legs.append(leg_builder.build())

        # Create the UNION ALL CTE
        cte_name = "composite_01"
        union_cte = CTE(name=cte_name, query=UnionAll(queries=union_legs))

        # Build outer query: aggregate over the composite CTE
        outer_builder = QueryBuilder()

        # SELECT dimensions
        for dim in resolved.dimensions:
            outer_builder.select(
                AliasedExpr(
                    expr=ColumnRef(name=dim.name),
                    alias=dim.name,
                )
            )

        # SELECT aggregated measures and metrics
        # First, add all component measures (from UNION ALL legs)
        seen_measure_names: set[str] = set()
        for m in all_measures:
            seen_measure_names.add(m.name)
            agg = m.aggregation.upper()
            distinct = False
            if agg == "COUNT_DISTINCT":
                agg = "COUNT"
                distinct = True
            if isinstance(m.expression, FunctionCall) and m.expression.distinct:
                distinct = True

            if self._is_multi_field(m):
                # Multi-field: concat CTE columns in outer query
                assert isinstance(m.expression, FunctionCall)
                n_fields = len(m.expression.args)
                agg_expr: Expr = self._build_outer_concat_count(m.name, n_fields, agg, distinct)
            else:
                agg_expr = FunctionCall(
                    name=agg,
                    args=[ColumnRef(name=m.name)],
                    distinct=distinct,
                )
            outer_builder.select(AliasedExpr(expr=agg_expr, alias=m.name))

        # Then, add metric expressions that combine component measures
        for m in resolved.measures:
            if m.component_measures and m.name not in seen_measure_names:
                metric_expr = self._build_outer_metric_expr(m, resolved)
                outer_builder.select(AliasedExpr(expr=metric_expr, alias=m.name))

        outer_builder.from_(cte_name, alias=cte_name)

        # GROUP BY dimensions
        for dim in resolved.dimensions:
            outer_builder.group_by(ColumnRef(name=dim.name))

        # HAVING filters on the outer query
        for hf in resolved.having_filters:
            outer_builder.having(hf.expression)

        # ORDER BY and LIMIT — remap to CTE aliases
        for expr, desc in resolved.order_by_exprs:
            outer_builder.order_by(self._remap_cfl_order_by(expr, resolved), desc=desc)
        if resolved.limit is not None:
            outer_builder.limit(resolved.limit)
        if resolved.offset is not None:
            outer_builder.offset(resolved.offset)

        outer_select = outer_builder.build()

        # Attach CTE
        final = Select(
            columns=outer_select.columns,
            from_=outer_select.from_,
            joins=outer_select.joins,
            where=outer_select.where,
            group_by=outer_select.group_by,
            having=outer_select.having,
            order_by=outer_select.order_by,
            limit=outer_select.limit,
            offset=outer_select.offset,
            ctes=[union_cte],
        )

        return QueryPlan(ast=final, cfl_legs=leg_infos)

    # -- dimensionsExclude: EXCEPT-based anti-join ----------------------------

    def _plan_dimensions_exclude(
        self,
        resolved: ResolvedQuery,
        model: SemanticModel,
        qualify_table: Callable[[DataObject], str] | None = None,
    ) -> QueryPlan:
        """Plan a dimensionsExclude query using EXCEPT pattern.

        Generates:
          WITH dim_group_00 AS (SELECT DISTINCT dims FROM ...),
               dim_group_01 AS (...),
               non_combinations AS (
                 SELECT ... FROM dim_group_00 CROSS JOIN dim_group_01
                 EXCEPT
                 SELECT ... FROM fact_joins
               )
          SELECT ... FROM non_combinations ORDER BY ... LIMIT ...

        Args:
            resolved: Resolved query carrying dimensions, filters, ordering,
                and paging.
            model: Semantic model providing data objects and their joins.
            qualify_table: Optional override producing a qualified table name
                for a data object; defaults to ``obj.qualified_code``.

        Returns:
            A ``QueryPlan`` whose AST carries all generated CTEs.
        """
        graph = JoinGraph(model, use_path_names=resolved.use_path_names or None)

        def qualify(obj: DataObject) -> str:
            # Fall back to the object's own qualified code when no custom
            # qualifier was supplied by the caller.
            return qualify_table(obj) if qualify_table else obj.qualified_code

        # Partition dimensions into independent groups
        dim_groups = self._partition_dimensions(resolved, graph)

        ctes: list[CTE] = []

        # CTE per dimension group: SELECT DISTINCT via GROUP BY
        group_cte_names: list[str] = []
        for i, group_dims in enumerate(dim_groups):
            cte_name = f"dim_group_{i:02d}"
            group_cte_names.append(cte_name)
            cte_query = self._build_group_distinct_select(group_dims, model, graph, qualify)
            ctes.append(CTE(name=cte_name, query=cte_query))

        # Build "all_pairs": CROSS JOIN of all dim_group CTEs
        # (assumes at least one dimension group exists — group_cte_names[0]).
        all_pairs_builder = QueryBuilder()
        for dim in resolved.dimensions:
            all_pairs_builder.select(AliasedExpr(expr=ColumnRef(name=dim.name), alias=dim.name))
        all_pairs_builder.from_(group_cte_names[0], alias=group_cte_names[0])
        for cte_name in group_cte_names[1:]:
            # NOTE(review): reaches into the builder's private ``_joins``
            # list, presumably because the public join() API requires an ON
            # clause and CROSS JOIN has none — confirm against QueryBuilder.
            all_pairs_builder._joins.append(
                Join(join_type=JoinType.CROSS, source=cte_name, alias=cte_name)
            )
        all_pairs_select = all_pairs_builder.build()

        # Build "existing_pairs": actual combinations via fact-table joins
        existing_pairs_select = self._build_existing_pairs_select(resolved, model, graph, qualify)

        # EXCEPT CTE: all_pairs EXCEPT existing_pairs
        except_cte = CTE(
            name="non_combinations",
            query=Except(left=all_pairs_select, right=existing_pairs_select),
        )
        ctes.append(except_cte)

        # Outer query: SELECT from non_combinations with ORDER BY / LIMIT
        outer_builder = QueryBuilder()
        for dim in resolved.dimensions:
            outer_builder.select(AliasedExpr(expr=ColumnRef(name=dim.name), alias=dim.name))
        outer_builder.from_("non_combinations", alias="non_combinations")

        for expr, desc in resolved.order_by_exprs:
            outer_builder.order_by(self._remap_cfl_order_by(expr, resolved), desc=desc)
        if resolved.limit is not None:
            outer_builder.limit(resolved.limit)
        if resolved.offset is not None:
            outer_builder.offset(resolved.offset)

        # Rebuild the outer Select so the accumulated CTE list can be
        # attached (QueryBuilder.build() does not take CTEs here).
        outer = outer_builder.build()
        final = Select(
            columns=outer.columns,
            from_=outer.from_,
            joins=outer.joins,
            order_by=outer.order_by,
            limit=outer.limit,
            offset=outer.offset,
            ctes=ctes,
        )
        return QueryPlan(ast=final)

    @staticmethod
    def _partition_dimensions(
        resolved: ResolvedQuery,
        graph: JoinGraph,
    ) -> list[list[ResolvedDimension]]:
        """Partition dimensions into groups on independent branches."""
        obj_to_dims: dict[str, list[ResolvedDimension]] = {}
        for dim in resolved.dimensions:
            obj_to_dims.setdefault(dim.object_name, []).append(dim)

        # Cluster: two objects are in the same group if one is a descendant
        # of the other (i.e., connected via directed join paths).
        objects = sorted(obj_to_dims.keys())
        groups: list[set[str]] = []
        assigned: set[str] = set()

        for obj in objects:
            if obj in assigned:
                continue
            group = {obj}
            reachable = graph.descendants(obj) | {obj}
            for other in objects:
                if (
                    other != obj
                    and other not in assigned
                    and (other in reachable or obj in (graph.descendants(other) | {other}))
                ):
                    group.add(other)
            groups.append(group)
            assigned.update(group)

        # Convert to lists of ResolvedDimension
        result: list[list[ResolvedDimension]] = []
        for group_objs in groups:
            group_dims: list[ResolvedDimension] = []
            for obj in sorted(group_objs):
                group_dims.extend(obj_to_dims[obj])
            result.append(group_dims)
        return result

    @staticmethod
    def _build_group_distinct_select(
        dims: list[ResolvedDimension],
        model: SemanticModel,
        graph: JoinGraph,
        qualify: Callable[[DataObject], str],
    ) -> Select:
        """Build SELECT DISTINCT (via GROUP BY) for a group of dimensions.

        Args:
            dims: Dimensions belonging to one independent group.
            model: Semantic model with the data-object definitions.
            graph: Join graph used for root discovery and path finding.
            qualify: Maps a data object to its qualified table name.

        Returns:
            A ``Select`` yielding the distinct dimension combinations.
        """
        required_objects = {d.object_name for d in dims}

        # Find the common root that can reach all objects in this group
        if len(required_objects) > 1:
            root = graph.find_common_root(required_objects)
        else:
            root = next(iter(required_objects))

        # If root is a pure dimension table with no joins, check if a fact
        # table can reach it (needed for bridge-table traversal).
        # NOTE(review): the code below actually re-roots at the
        # alphabetically first required object rather than searching for a
        # fact table as the comment suggests — confirm intended behavior.
        root_obj = model.data_objects.get(root)
        if root_obj and not root_obj.joins and root not in required_objects:
            root = next(iter(sorted(required_objects)))
            root_obj = model.data_objects.get(root)

        builder = QueryBuilder()
        for dim in dims:
            # GROUP BY on every selected column emulates SELECT DISTINCT.
            col: Expr = ColumnRef(name=dim.source_column, table=dim.object_name)
            builder.select(AliasedExpr(expr=col, alias=dim.name))
            builder.group_by(col)

        if root_obj:
            builder.from_(qualify(root_obj), alias=root)

        # Join to reach all dimension objects from root
        all_needed = required_objects | {root}
        if len(all_needed) > 1:
            steps = graph.find_join_path({root}, all_needed)
            for step in steps:
                # Steps whose target object is unknown to the model are
                # skipped silently.
                target_obj = model.data_objects.get(step.to_object)
                if target_obj:
                    on_expr = graph.build_join_condition(step)
                    builder.join(
                        table=qualify(target_obj),
                        on=on_expr,
                        join_type=step.join_type,
                        alias=step.to_object,
                    )

        return builder.build()

    def _build_existing_pairs_select(
        self,
        resolved: ResolvedQuery,
        model: SemanticModel,
        graph: JoinGraph,
        qualify: Callable[[DataObject], str],
    ) -> Select:
        """Build SELECT for existing dimension combinations via fact-table joins.

        Uses a fact/bridge table as the base and joins through hub tables
        to reach all dimension objects on both branches.

        Args:
            resolved: Resolved query (dimensions and WHERE filters are used).
            model: Semantic model with the data-object definitions.
            graph: Join graph used for path finding and ON-clause building.
            qualify: Maps a data object to its qualified table name.

        Returns:
            A ``Select`` yielding the dimension combinations that actually
            occur in the data.
        """
        all_dim_objects = {d.object_name for d in resolved.dimensions}

        # Find fact tables that connect the dimension groups
        leg_objects = self._group_dimensions_into_legs(resolved, model)
        fact_tables = set(leg_objects.keys())

        # Use a fact table as the base (pick the one with most joins).
        # max() keeps the first maximum of the sorted list, so ties resolve
        # to the alphabetically first fact — deterministic output.
        best_fact = max(
            sorted(fact_tables),
            key=lambda f: len(model.data_objects[f].joins) if f in model.data_objects else 0,
        )
        best_fact_obj = model.data_objects.get(best_fact)

        builder = QueryBuilder()
        for dim in resolved.dimensions:
            # GROUP BY on every selected column emulates SELECT DISTINCT.
            col: Expr = ColumnRef(name=dim.source_column, table=dim.object_name)
            builder.select(AliasedExpr(expr=col, alias=dim.name))
            builder.group_by(col)

        if best_fact_obj:
            builder.from_(qualify(best_fact_obj), alias=best_fact)

        # Required: all dimension objects + all fact tables
        all_needed = all_dim_objects | fact_tables | {best_fact}
        joined: set[str] = {best_fact}
        steps = graph.find_join_path({best_fact}, all_needed)
        for step in steps:
            # Determine the actual new table to join.
            # For reversed edges, to_object may already be joined and the
            # actual new table is from_object.
            if step.to_object not in joined:
                new_table = step.to_object
            elif step.from_object not in joined:
                new_table = step.from_object
            else:
                continue  # Both already joined

            target_obj = model.data_objects.get(new_table)
            if target_obj:
                on_expr = graph.build_join_condition(step)
                builder.join(
                    table=qualify(target_obj),
                    on=on_expr,
                    join_type=step.join_type,
                    alias=new_table,
                )
                joined.add(new_table)

        # Apply WHERE filters to existing pairs
        for wf in resolved.where_filters:
            builder.where(wf.expression)

        return builder.build()

plan(resolved, model, qualify_table=None, union_by_name=False, dialect=None)

Plan a CFL query.

Source code in src/orionbelt/compiler/cfl.py
def plan(
    self,
    resolved: ResolvedQuery,
    model: SemanticModel,
    qualify_table: Callable[[DataObject], str] | None = None,
    union_by_name: bool = False,
    dialect: Dialect | None = None,
) -> QueryPlan:
    """Plan a CFL query, dispatching to the appropriate strategy."""
    self._validate_fanout(resolved, model)

    # dimensionsExclude queries get the dedicated EXCEPT-based plan.
    if resolved.dimensions_exclude:
        return self._plan_dimensions_exclude(resolved, model, qualify_table)

    # Group measures by their source object to count the fact legs.
    measures_by_object, cross_fact = self._group_measures_by_object(resolved, model)

    # Dimension-only CFL: no measures but dimensions on independent
    # branches — derive leg groupings from the connecting fact tables.
    if not measures_by_object and not cross_fact and resolved.requires_cfl:
        measures_by_object = self._group_dimensions_into_legs(resolved, model)

    single_leg = len(measures_by_object) <= 1 and not cross_fact
    if single_leg:
        # Single fact — the simpler star-schema planner suffices.
        from orionbelt.compiler.star import StarSchemaPlanner

        return StarSchemaPlanner().plan(
            resolved, model, qualify_table=qualify_table, dialect=dialect
        )

    # Multiple fact legs: fall through to the UNION ALL strategy.
    return self._plan_union_all(
        resolved,
        model,
        measures_by_object,
        cross_fact,
        qualify_table=qualify_table,
        union_by_name=union_by_name,
        dialect=dialect,
    )

Join Graph

orionbelt.compiler.graph.JoinGraph

Graph of data objects (nodes) and relationships (edges) for join path resolution.

Source code in src/orionbelt/compiler/graph.py
class JoinGraph:
    """Graph of data objects (nodes) and relationships (edges) for join path resolution.

    Two parallel networkx graphs are kept in sync:

    * ``_graph`` — undirected; used for shortest-path join routing.
    * ``_directed`` — directed (source → joinTo); used for reachability
      (descendants, common-root discovery) and cycle detection.
    """

    def __init__(
        self,
        model: SemanticModel,
        use_path_names: list[UsePathName] | None = None,
    ) -> None:
        self._graph: nx.Graph[str] = nx.Graph()
        self._directed: nx.DiGraph[str] = nx.DiGraph()
        self._model = model
        self._build(model, use_path_names)

    def _build(
        self,
        model: SemanticModel,
        use_path_names: list[UsePathName] | None = None,
    ) -> None:
        """Build the graph from the semantic model.

        Secondary joins are only included when their pathName is requested
        via *use_path_names*.  When a secondary override is active for a
        ``(source, target)`` pair, the primary join for that pair is excluded.
        """
        for name in model.data_objects:
            self._graph.add_node(name)
            self._directed.add_node(name)

        # Build a lookup: (source, target) → pathName for active overrides
        active_overrides: dict[tuple[str, str], str] = {}
        if use_path_names:
            for upn in use_path_names:
                active_overrides[(upn.source, upn.target)] = upn.path_name

        for obj_name, obj in model.data_objects.items():
            for join in obj.joins:
                # Joins pointing at objects missing from the model are
                # silently ignored.
                if join.join_to not in model.data_objects:
                    continue
                pair = (obj_name, join.join_to)

                if join.secondary:
                    # Only include if this secondary join's pathName is active
                    if pair in active_overrides and active_overrides[pair] == join.path_name:
                        self._add_edge(obj_name, join)
                else:
                    # Primary join: skip if an active override exists for this pair
                    if pair not in active_overrides:
                        self._add_edge(obj_name, join)

    def _add_edge(self, obj_name: str, join: object) -> None:
        """Add an edge to both the undirected and directed graphs."""
        # Local import — presumably to avoid a circular import with the
        # models package; the assert narrows the loose ``object`` annotation.
        from orionbelt.models.semantic import DataObjectJoin

        assert isinstance(join, DataObjectJoin)
        # ``source_object`` records the original orientation on the
        # undirected edge so find_join_path can detect reverse traversal.
        self._graph.add_edge(
            obj_name,
            join.join_to,
            columns_from=join.columns_from,
            columns_to=join.columns_to,
            cardinality=join.join_type,
            source_object=obj_name,
        )
        self._directed.add_edge(
            obj_name,
            join.join_to,
            columns_from=join.columns_from,
            columns_to=join.columns_to,
            cardinality=join.join_type,
        )

    def descendants(self, node: str) -> set[str]:
        """Return all nodes reachable from *node* via directed join paths."""
        # Unknown nodes yield an empty set instead of raising.
        if node not in self._directed:
            return set()
        return nx.descendants(self._directed, node)

    def find_common_root(self, required_objects: set[str]) -> str:
        """Find the common root for a set of required objects.

        The join graph is a DAG (joins define direction: source → joinTo).
        The common root is the **deepest** node that can reach ALL
        *required_objects* via directed join paths.  "Deepest" = smallest
        descendant set (most specific ancestor, closest to the required nodes).

        In ``returns → sales → customer``, with required ``{customer, item}``,
        the common root is ``sales`` (it can reach both).  With required
        ``{customer, item, returns}``, the common root is ``returns`` (the
        only node that can reach all three).
        """
        # Ignore required names that are not graph nodes.
        required = required_objects & set(self._directed.nodes)
        if len(required) <= 1:
            return next(iter(sorted(required))) if required else ""

        # Find all nodes that can reach ALL required nodes via directed paths
        candidates: list[tuple[str, int]] = []
        for node in self._directed.nodes:
            reachable = nx.descendants(self._directed, node) | {node}
            if required <= reachable:
                candidates.append((node, len(reachable)))

        if not candidates:
            # Fallback: no single directed ancestor covers all —
            # use undirected shortest-path center
            return self._find_center_undirected(required)

        # Pick the deepest ancestor: smallest reachable set that still covers
        # all; ties broken alphabetically for determinism.
        candidates.sort(key=lambda x: (x[1], x[0]))
        return candidates[0][0]

    def _find_center_undirected(self, required: set[str]) -> str:
        """Fallback: center of the Steiner tree in the undirected graph."""
        nodes = sorted(required)
        if len(nodes) <= 1:
            return nodes[0] if nodes else ""

        # Approximate the Steiner tree as the union of pairwise shortest
        # paths between the required nodes.
        steiner: set[str] = set()
        for i in range(len(nodes)):
            for j in range(i + 1, len(nodes)):
                try:
                    path: list[str] = nx.shortest_path(self._graph, nodes[i], nodes[j])
                    steiner.update(path)
                except nx.NetworkXNoPath:
                    pass

        if not steiner:
            return nodes[0]

        # The center minimizes the maximum distance to any required node;
        # sorted iteration keeps tie-breaking deterministic.
        best: str = nodes[0]
        best_max: int | float = len(self._graph.nodes) + 1
        for node in sorted(steiner):
            max_dist = max(nx.shortest_path_length(self._graph, node, r) for r in nodes)
            if max_dist < best_max:
                best_max = max_dist
                best = node
        return best

    def find_join_path(self, from_objects: set[str], to_objects: set[str]) -> list[JoinStep]:
        """Find a minimal join path connecting all required data objects.

        Uses shortest path for each target object from the set of source objects.
        Targets are processed in sorted order; each reached target is added
        to the source set for subsequent lookups.  Edges are emitted at most
        once (``visited_edges``), so paths may share prefixes without
        producing duplicate join steps.
        """
        steps: list[JoinStep] = []
        visited_edges: set[tuple[str, str]] = set()

        all_targets = to_objects - from_objects
        source_list = list(from_objects)

        for target in sorted(all_targets):
            # Unreachable targets are skipped silently (best_path stays None).
            best_path: list[str] | None = None
            for source in source_list:
                try:
                    path = nx.shortest_path(self._graph, source, target)
                    if best_path is None or len(path) < len(best_path):
                        best_path = path
                except nx.NetworkXNoPath:
                    continue

            if best_path is None:
                continue

            for i in range(len(best_path) - 1):
                edge = (best_path[i], best_path[i + 1])
                rev_edge = (best_path[i + 1], best_path[i])
                if edge in visited_edges or rev_edge in visited_edges:
                    continue
                visited_edges.add(edge)

                edge_data = self._graph.edges[edge]
                source_object = edge_data.get("source_object", edge[0])

                if source_object == edge[0]:
                    # Edge traversed in its declared direction.
                    step = JoinStep(
                        from_object=edge[0],
                        to_object=edge[1],
                        from_columns=edge_data["columns_from"],
                        to_columns=edge_data["columns_to"],
                        join_type=ASTJoinType.LEFT,
                        cardinality=edge_data["cardinality"],
                    )
                else:
                    # Path traverses edge in reverse direction.
                    # from_object/to_object are swapped, so columns must be
                    # swapped too to keep the ON clause correctly oriented.
                    step = JoinStep(
                        from_object=edge[1],
                        to_object=edge[0],
                        from_columns=edge_data["columns_to"],
                        to_columns=edge_data["columns_from"],
                        join_type=ASTJoinType.LEFT,
                        cardinality=edge_data["cardinality"],
                        reversed=True,
                    )
                steps.append(step)

            # Add target to sources for subsequent lookups
            source_list.append(target)

        return steps

    def build_join_condition(self, step: JoinStep) -> Expr:
        """Build the ON clause expression for a join step.

        Logical column names are resolved to physical codes via the model
        when known; unknown names pass through unchanged.  Multiple column
        pairs are AND-ed together.

        Raises:
            ValueError: if *step* declares no join columns.
        """
        conditions: list[Expr] = []
        for from_c, to_c in zip(step.from_columns, step.to_columns, strict=True):
            # Resolve to physical column names
            from_obj = self._model.data_objects.get(step.from_object)
            to_obj = self._model.data_objects.get(step.to_object)
            if from_obj and from_c in from_obj.columns:
                from_col = from_obj.columns[from_c].code
            else:
                from_col = from_c
            to_col = to_obj.columns[to_c].code if to_obj and to_c in to_obj.columns else to_c
            conditions.append(
                BinaryOp(
                    left=ColumnRef(name=from_col, table=step.from_object),
                    op="=",
                    right=ColumnRef(name=to_col, table=step.to_object),
                )
            )

        if not conditions:
            msg = f"Join from '{step.from_object}' to '{step.to_object}' has no join columns"
            raise ValueError(msg)
        # Fold multiple equality conditions into a left-deep AND chain.
        result: Expr = conditions[0]
        for cond in conditions[1:]:
            result = BinaryOp(left=result, op="AND", right=cond)
        return result

    def detect_cycles(self) -> list[list[str]]:
        """Detect cyclic join paths."""
        try:
            cycles = list(nx.simple_cycles(self._directed))
            return cycles
        except nx.NetworkXError:
            return []

    def validate_deterministic(self) -> list[SemanticError]:
        """Ensure join paths are deterministic (no ambiguity)."""
        errors: list[SemanticError] = []
        # Check for multiple edges between the same pair of nodes
        # NOTE(review): ``nx.Graph`` is not a multigraph — number_of_edges()
        # between two nodes can only be 0 or 1 here, so this check appears
        # unreachable; confirm whether ``nx.MultiGraph`` was intended.
        for u, v in self._graph.edges():
            if self._graph.number_of_edges(u, v) > 1:
                errors.append(
                    SemanticError(
                        code="AMBIGUOUS_JOIN",
                        message=f"Multiple join paths between '{u}' and '{v}'",
                        path=f"dataObjects.{u}.joins",
                    )
                )
        return errors

find_join_path(from_objects, to_objects)

Find a minimal join path connecting all required data objects.

Uses shortest path for each target object from the set of source objects.

Source code in src/orionbelt/compiler/graph.py
def find_join_path(self, from_objects: set[str], to_objects: set[str]) -> list[JoinStep]:
    """Find a minimal join path connecting all required data objects.

    Uses shortest path for each target object from the set of source objects.
    Targets are processed in sorted order; each reached target is added to
    the source set for subsequent lookups.  Edges are emitted at most once
    (``visited_edges``), so overlapping paths do not duplicate join steps.
    """
    steps: list[JoinStep] = []
    visited_edges: set[tuple[str, str]] = set()

    all_targets = to_objects - from_objects
    source_list = list(from_objects)

    for target in sorted(all_targets):
        # Unreachable targets are skipped silently (best_path stays None).
        best_path: list[str] | None = None
        for source in source_list:
            try:
                path = nx.shortest_path(self._graph, source, target)
                if best_path is None or len(path) < len(best_path):
                    best_path = path
            except nx.NetworkXNoPath:
                continue

        if best_path is None:
            continue

        for i in range(len(best_path) - 1):
            edge = (best_path[i], best_path[i + 1])
            rev_edge = (best_path[i + 1], best_path[i])
            if edge in visited_edges or rev_edge in visited_edges:
                continue
            visited_edges.add(edge)

            edge_data = self._graph.edges[edge]
            source_object = edge_data.get("source_object", edge[0])

            if source_object == edge[0]:
                # Edge traversed in its declared direction.
                step = JoinStep(
                    from_object=edge[0],
                    to_object=edge[1],
                    from_columns=edge_data["columns_from"],
                    to_columns=edge_data["columns_to"],
                    join_type=ASTJoinType.LEFT,
                    cardinality=edge_data["cardinality"],
                )
            else:
                # Path traverses edge in reverse direction.
                # from_object/to_object are swapped, so columns must be
                # swapped too to keep the ON clause correctly oriented.
                step = JoinStep(
                    from_object=edge[1],
                    to_object=edge[0],
                    from_columns=edge_data["columns_to"],
                    to_columns=edge_data["columns_from"],
                    join_type=ASTJoinType.LEFT,
                    cardinality=edge_data["cardinality"],
                    reversed=True,
                )
            steps.append(step)

        # Add target to sources for subsequent lookups
        source_list.append(target)

    return steps

build_join_condition(step)

Build the ON clause expression for a join step.

Source code in src/orionbelt/compiler/graph.py
def build_join_condition(self, step: JoinStep) -> Expr:
    """Build the ON clause expression for a join step."""
    conditions: list[Expr] = []
    for from_c, to_c in zip(step.from_columns, step.to_columns, strict=True):
        # Resolve to physical column names
        from_obj = self._model.data_objects.get(step.from_object)
        to_obj = self._model.data_objects.get(step.to_object)
        if from_obj and from_c in from_obj.columns:
            from_col = from_obj.columns[from_c].code
        else:
            from_col = from_c
        to_col = to_obj.columns[to_c].code if to_obj and to_c in to_obj.columns else to_c
        conditions.append(
            BinaryOp(
                left=ColumnRef(name=from_col, table=step.from_object),
                op="=",
                right=ColumnRef(name=to_col, table=step.to_object),
            )
        )

    if not conditions:
        msg = f"Join from '{step.from_object}' to '{step.to_object}' has no join columns"
        raise ValueError(msg)
    result: Expr = conditions[0]
    for cond in conditions[1:]:
        result = BinaryOp(left=result, op="AND", right=cond)
    return result

detect_cycles()

Detect cyclic join paths.

Source code in src/orionbelt/compiler/graph.py
def detect_cycles(self) -> list[list[str]]:
    """Detect cyclic join paths in the directed graph.

    Returns:
        A list of simple cycles (each a list of node names); empty when the
        graph has none or networkx reports an error.
    """
    try:
        found = nx.simple_cycles(self._directed)
        return list(found)
    except nx.NetworkXError:
        return []

Code Generator

orionbelt.compiler.codegen.CodeGenerator

Generates SQL from AST using a dialect.

Source code in src/orionbelt/compiler/codegen.py
class CodeGenerator:
    """Turns a query AST into SQL text via a configured dialect."""

    def __init__(self, dialect: Dialect) -> None:
        # The dialect does all the actual compilation work.
        self._sql_dialect = dialect

    @property
    def dialect(self) -> Dialect:
        """The dialect used for SQL generation."""
        return self._sql_dialect

    def generate(self, ast: Select) -> str:
        """Compile *ast* into a SQL string with the configured dialect."""
        return self._sql_dialect.compile(ast)

generate(ast)

Generate SQL string from AST using the configured dialect.

Source code in src/orionbelt/compiler/codegen.py
def generate(self, ast: Select) -> str:
    """Render *ast* to a SQL string using the configured dialect."""
    dialect = self._dialect
    return dialect.compile(ast)

Dialect Base

orionbelt.dialect.base.Dialect

Bases: ABC

Abstract base for all SQL dialects.

Provides default SQL compilation; dialects override specific methods.

Source code in src/orionbelt/dialect/base.py
(line-number gutter from the documentation renderer omitted — extraction artifact, no content)
class Dialect(ABC):
    """Abstract base for all SQL dialects.

    Provides default SQL compilation; dialects override specific methods.
    """

    # Default mapping from abstract OBML type names to generic SQL types.
    # ``_resolve_type_name`` falls back to the input string when a name is
    # not found here (i.e. it is already a concrete SQL type).
    _ABSTRACT_TYPE_MAP: dict[str, str] = {
        "string": "VARCHAR",
        "json": "VARCHAR",
        "int": "INTEGER",
        "float": "FLOAT",
        "date": "DATE",
        "time": "TIME",
        "time_tz": "TIME",
        "timestamp": "TIMESTAMP",
        "timestamp_tz": "TIMESTAMP",
        "boolean": "BOOLEAN",
    }

    def _resolve_type_name(self, type_name: str) -> str:
        """Map an abstract type name to a dialect-specific SQL type.

        Looks up ``_ABSTRACT_TYPE_MAP`` first; if *type_name* is not found
        (e.g. already a concrete SQL type like ``VARCHAR``), returns it as-is.
        """
        return self._ABSTRACT_TYPE_MAP.get(type_name, type_name)

    def format_table_ref(self, database: str, schema: str, code: str) -> str:
        """Format a fully-qualified table reference.

        Default: three-part ``database.schema.code`` (Snowflake/Databricks/Dremio).
        Postgres and ClickHouse override to two-part naming.
        All components are quoted to prevent SQL injection.
        """
        return (
            f"{self.quote_identifier(database)}"
            f".{self.quote_identifier(schema)}"
            f".{self.quote_identifier(code)}"
        )

    # -- abstract interface: every concrete dialect must provide these -------

    @property
    @abstractmethod
    def name(self) -> str: ...

    @property
    @abstractmethod
    def capabilities(self) -> DialectCapabilities: ...

    @abstractmethod
    def quote_identifier(self, name: str) -> str:
        """Quote an identifier per dialect rules."""

    @abstractmethod
    def render_time_grain(self, column: Expr, grain: TimeGrain) -> Expr:
        """Wrap a column expression for the given time grain."""

    @abstractmethod
    def render_cast(self, expr: Expr, target_type: str) -> Expr:
        """Render a CAST expression."""

    @abstractmethod
    def current_date_sql(self) -> str:
        """Return SQL for the current date."""

    @abstractmethod
    def date_add_sql(self, date_sql: str, unit: str, count: int) -> str:
        """Return SQL that adds count units to date_sql."""

    @abstractmethod
    def render_date_trunc_sql(self, column_sql: str, grain: str) -> str:
        """Return SQL string that truncates a date/timestamp to the given grain.

        String-level helper (not AST) for use in raw SQL CTEs like date_range.
        """

    @abstractmethod
    def render_date_spine_cte_sql(
        self,
        min_date: str,
        max_date: str,
        grain: str,
        offset: int,
        offset_grain: str,
    ) -> str:
        """Return the SQL body for a date spine CTE.

        Must produce two columns: ``spine_date`` and ``spine_date_prev``.
        ``spine_date_prev`` is NULL when the offset date falls before min_date.

        Parameters
        ----------
        min_date : str
            SQL expression referencing the minimum date (e.g. ``date_range.min_date``).
        max_date : str
            SQL expression referencing the maximum date.
        grain : str
            Time grain string: ``day``, ``week``, ``month``, ``quarter``, ``year``.
        offset : int
            Signed period offset (e.g. ``-1`` for previous period).
        offset_grain : str
            Grain of the offset (e.g. ``year`` for YoY).
        """

    # -- overridable default renderings --------------------------------------

    def render_string_contains(self, column: Expr, pattern: Expr) -> Expr:
        """Default: column LIKE '%' || pattern || '%'."""
        return BinaryOp(
            left=column,
            op="LIKE",
            right=BinaryOp(
                left=BinaryOp(left=Literal.string("%"), op="||", right=pattern),
                op="||",
                right=Literal.string("%"),
            ),
        )

    def _map_function_name(self, name: str) -> str:
        """Map a function name to the dialect-specific equivalent.

        Override in subclasses to remap names (e.g. ANY_VALUE → any in ClickHouse).
        """
        return name

    def _compile_median(self, args: list[Expr]) -> str:
        """Compile MEDIAN — default uses MEDIAN(col).

        Works for Snowflake, ClickHouse, Databricks, and Dremio. Postgres overrides.
        """
        # Empty arg list degrades to MEDIAN(NULL) rather than raising.
        col_sql = self.compile_expr(args[0]) if args else "NULL"
        return f"MEDIAN({col_sql})"

    def _compile_mode(self, args: list[Expr]) -> str:
        """Compile MODE — default uses MODE(col).

        Works for Snowflake and Databricks. Postgres, ClickHouse, and Dremio override.
        """
        col_sql = self.compile_expr(args[0]) if args else "NULL"
        return f"MODE({col_sql})"

    def _compile_listagg(
        self,
        args: list[Expr],
        distinct: bool,
        order_by: list[OrderByItem],
        separator: str | None,
    ) -> str:
        """Compile LISTAGG — default uses LISTAGG(col, sep) WITHIN GROUP (ORDER BY ...).

        Works for Snowflake and Dremio. Postgres, ClickHouse, and Databricks override.
        """
        sep = separator if separator is not None else ","
        col_sql = self.compile_expr(args[0]) if args else "''"
        distinct_sql = "DISTINCT " if distinct else ""
        # Double embedded single quotes so the separator literal stays closed.
        escaped_sep = sep.replace("'", "''")
        result = f"LISTAGG({distinct_sql}{col_sql}, '{escaped_sep}')"
        if order_by:
            ob = ", ".join(self.compile_order_by(o) for o in order_by)
            result += f" WITHIN GROUP (ORDER BY {ob})"
        return result

    def _compile_multi_field_count(self, args: list[Expr], distinct: bool) -> str:
        """Compile COUNT with multiple fields by concatenating with ``||``.

        Default (non-Snowflake) strategy: cast each field to VARCHAR and
        join with ``'|'`` separator so the database sees a single expression.
        Snowflake overrides this to emit native ``COUNT(col1, col2)``.
        """
        parts = [f"CAST({self.compile_expr(a)} AS VARCHAR)" for a in args]
        concat = " || '|' || ".join(parts)
        if distinct:
            return f"COUNT(DISTINCT {concat})"
        return f"COUNT({concat})"

    # -- statement compilation ------------------------------------------------

    def compile(self, ast: Select) -> str:
        """Render a complete SQL AST to a dialect-specific string."""
        return self.compile_select(ast)

    def compile_select(self, node: Select) -> str:
        """Compile a SELECT statement.

        Clauses are accumulated in standard SQL order and joined with newlines.
        """
        parts: list[str] = []

        # CTEs
        if node.ctes:
            cte_parts = []
            for cte in node.ctes:
                # RawSQL CTE bodies are embedded verbatim; set operations and
                # nested SELECTs are compiled recursively.
                if isinstance(cte.query, RawSQL):
                    cte_sql = cte.query.sql
                elif isinstance(cte.query, UnionAll):
                    cte_sql = self.compile_union_all(cte.query)
                elif isinstance(cte.query, Except):
                    cte_sql = self.compile_except(cte.query)
                else:
                    cte_sql = self.compile_select(cte.query)
                cte_parts.append(f"{self.quote_identifier(cte.name)} AS (\n{cte_sql}\n)")
            parts.append("WITH " + ",\n".join(cte_parts))

        # SELECT
        if node.columns:
            cols = ", ".join(self.compile_expr(c) for c in node.columns)
            parts.append(f"SELECT {cols}")
        else:
            parts.append("SELECT *")

        # FROM
        if node.from_:
            parts.append(f"FROM {self.compile_from(node.from_)}")

        # JOINs
        for join in node.joins:
            parts.append(self.compile_join(join))

        # WHERE
        if node.where:
            parts.append(f"WHERE {self.compile_expr(node.where)}")

        # GROUP BY
        if node.group_by:
            groups = ", ".join(self.compile_expr(g) for g in node.group_by)
            parts.append(f"GROUP BY {groups}")

        # HAVING
        if node.having:
            parts.append(f"HAVING {self.compile_expr(node.having)}")

        # ORDER BY
        if node.order_by:
            orders = ", ".join(self.compile_order_by(o) for o in node.order_by)
            parts.append(f"ORDER BY {orders}")

        # LIMIT
        if node.limit is not None:
            parts.append(f"LIMIT {node.limit}")

        # OFFSET
        if node.offset is not None:
            parts.append(f"OFFSET {node.offset}")

        return "\n".join(parts)

    def compile_from(self, node: From) -> str:
        """Compile a FROM source: subqueries are parenthesized, any other
        source is rendered via ``str``; an alias is appended when present."""
        if isinstance(node.source, Select):
            sub = self.compile_select(node.source)
            result = f"(\n{sub}\n)"
        else:
            result = str(node.source)
        if node.alias:
            result += f" AS {self.quote_identifier(node.alias)}"
        return result

    def compile_join(self, node: Join) -> str:
        """Compile a JOIN clause: join type, source (subquery or plain),
        optional alias, and optional ON predicate."""
        if isinstance(node.source, Select):
            source = f"(\n{self.compile_select(node.source)}\n)"
        else:
            source = str(node.source)
        if node.alias:
            source += f" AS {self.quote_identifier(node.alias)}"

        parts = [f"{node.join_type.value} JOIN {source}"]
        if node.on:
            parts.append(f"ON {self.compile_expr(node.on)}")
        return " ".join(parts)

    def compile_order_by(self, node: OrderByItem) -> str:
        """Compile one ORDER BY item: expression, ASC/DESC, and an optional
        NULLS FIRST/LAST modifier (tri-state ``nulls_last``: None emits
        neither)."""
        result = self.compile_expr(node.expr)
        if node.desc:
            result += " DESC"
        else:
            result += " ASC"
        if node.nulls_last is True:
            result += " NULLS LAST"
        elif node.nulls_last is False:
            result += " NULLS FIRST"
        return result

    def compile_union_all(self, node: UnionAll) -> str:
        """Compile a UNION ALL of multiple SELECT statements."""
        return "\nUNION ALL\n".join(self.compile_select(q) for q in node.queries)

    def compile_except(self, node: Except) -> str:
        """Compile an EXCEPT of two SELECT statements."""
        return self.compile_select(node.left) + "\nEXCEPT\n" + self.compile_select(node.right)

    def compile_expr(self, expr: Expr) -> str:
        """Compile an expression node to SQL string."""
        # NOTE: case order matters — the None/True/False literal patterns must
        # precede the generic ``Literal`` case (literal None/True/False match
        # by identity, so ints such as 0/1 fall through to the generic case).
        match expr:
            case Literal(value=None):
                return "NULL"
            case Literal(value=True):
                return "TRUE"
            case Literal(value=False):
                return "FALSE"
            case Literal(value=v) if isinstance(v, str):
                # Escape embedded single quotes by doubling them.
                escaped = v.replace("'", "''")
                return f"'{escaped}'"
            case Literal(value=v):
                return str(v)
            case Star(table=None):
                return "*"
            case Star(table=t) if t is not None:
                return f"{self.quote_identifier(t)}.*"
            case ColumnRef(name=name, table=None):
                return self.quote_identifier(name)
            case ColumnRef(name=name, table=table) if table is not None:
                return f"{self.quote_identifier(table)}.{self.quote_identifier(name)}"
            case AliasedExpr(expr=inner, alias=alias):
                return f"{self.compile_expr(inner)} AS {self.quote_identifier(alias)}"
            case FunctionCall(
                name=fname,
                args=args,
                distinct=distinct,
                order_by=order_by,
                separator=separator,
            ):
                # LISTAGG: dialect-specific rendering
                if fname.upper() == "LISTAGG":
                    return self._compile_listagg(args, distinct, order_by, separator)
                # MODE: dialect-specific rendering
                if fname.upper() == "MODE":
                    return self._compile_mode(args)
                # MEDIAN: dialect-specific rendering
                if fname.upper() == "MEDIAN":
                    return self._compile_median(args)
                # Multi-field COUNT: concatenate fields for portability
                # (Snowflake overrides to use native multi-arg syntax)
                if fname.upper() == "COUNT" and len(args) > 1:
                    return self._compile_multi_field_count(args, distinct)
                fname = self._map_function_name(fname)
                args_sql = ", ".join(self.compile_expr(a) for a in args)
                if distinct:
                    return f"{fname}(DISTINCT {args_sql})"
                return f"{fname}({args_sql})"
            case BinaryOp(left=left, op=op, right=right):
                return f"({self.compile_expr(left)} {op} {self.compile_expr(right)})"
            case UnaryOp(op=op, operand=operand):
                return f"({op} {self.compile_expr(operand)})"
            case IsNull(expr=inner, negated=False):
                return f"({self.compile_expr(inner)} IS NULL)"
            case IsNull(expr=inner, negated=True):
                return f"({self.compile_expr(inner)} IS NOT NULL)"
            case InList(expr=inner, values=values, negated=negated):
                vals = ", ".join(self.compile_expr(v) for v in values)
                op = "NOT IN" if negated else "IN"
                return f"({self.compile_expr(inner)} {op} ({vals}))"
            case CaseExpr(when_clauses=whens, else_clause=else_):
                parts = ["CASE"]
                for when_cond, then_val in whens:
                    parts.append(
                        f"WHEN {self.compile_expr(when_cond)} THEN {self.compile_expr(then_val)}"
                    )
                if else_ is not None:
                    parts.append(f"ELSE {self.compile_expr(else_)}")
                parts.append("END")
                return " ".join(parts)
            case Cast(expr=inner, type_name=type_name):
                resolved_type = self._resolve_type_name(type_name)
                return f"CAST({self.compile_expr(inner)} AS {resolved_type})"
            case SubqueryExpr(query=query):
                return f"(\n{self.compile_select(query)}\n)"
            case RawSQL(sql=sql):
                return sql
            case Between(expr=inner, low=low, high=high, negated=negated):
                op = "NOT BETWEEN" if negated else "BETWEEN"
                return (
                    f"({self.compile_expr(inner)} {op} "
                    f"{self.compile_expr(low)} AND {self.compile_expr(high)})"
                )
            case RelativeDateRange(
                column=column,
                unit=unit,
                count=count,
                direction=direction,
                include_current=include_current,
            ):
                return self.compile_relative_date_range(
                    column=column,
                    unit=unit,
                    count=count,
                    direction=direction,
                    include_current=include_current,
                )
            case WindowFunction(
                func_name=fname,
                args=args,
                partition_by=partition_by,
                order_by=order_by,
                frame=frame,
                distinct=distinct,
            ):
                args_sql = ", ".join(self.compile_expr(a) for a in args)
                func_sql = f"{fname}(DISTINCT {args_sql})" if distinct else f"{fname}({args_sql})"
                over_parts: list[str] = []
                if partition_by:
                    pb = ", ".join(self.compile_expr(p) for p in partition_by)
                    over_parts.append(f"PARTITION BY {pb}")
                if order_by:
                    ob = ", ".join(self.compile_order_by(o) for o in order_by)
                    over_parts.append(f"ORDER BY {ob}")
                if frame is not None:
                    over_parts.append(f"{frame.mode} BETWEEN {frame.start} AND {frame.end}")
                # An empty OVER () is emitted when no partition/order/frame is set.
                over_clause = " ".join(over_parts)
                return f"{func_sql} OVER ({over_clause})"
            case _:
                raise ValueError(f"Unknown AST node type: {type(expr).__name__}")

    def compile_relative_date_range(
        self,
        column: Expr,
        unit: str,
        count: int,
        direction: str,
        include_current: bool,
    ) -> str:
        """Compile a relative date range predicate to SQL.

        Emits a half-open interval test ``(col >= start AND col < end)``
        anchored at the dialect's current-date expression.
        """
        col_sql = self.compile_expr(column)
        base = self.current_date_sql()

        if direction == "future":
            start = base if include_current else self.date_add_sql(base, "day", 1)
            end = self.date_add_sql(start, unit, count)
        else:
            # Anything other than "future" is treated as a look-back window.
            end = self.date_add_sql(base, "day", 1) if include_current else base
            start = self.date_add_sql(end, unit, -count)

        return f"({col_sql} >= {start} AND {col_sql} < {end})"

format_table_ref(database, schema, code)

Format a fully-qualified table reference.

Default: three-part database.schema.code (Snowflake/Databricks/Dremio). Postgres and ClickHouse override to two-part naming. All components are quoted to prevent SQL injection.

Source code in src/orionbelt/dialect/base.py
def format_table_ref(self, database: str, schema: str, code: str) -> str:
    """Build the fully-qualified table reference for this dialect.

    The default is three-part ``database.schema.code`` naming
    (Snowflake/Databricks/Dremio); Postgres and ClickHouse override with
    two-part naming.  Each component goes through ``quote_identifier``
    to prevent SQL injection.
    """
    quote = self.quote_identifier
    return ".".join(quote(part) for part in (database, schema, code))

quote_identifier(name) abstractmethod

Quote an identifier per dialect rules.

Source code in src/orionbelt/dialect/base.py
@abstractmethod
def quote_identifier(self, name: str) -> str:
    """Quote an identifier per dialect rules.

    Abstract: each concrete dialect supplies its own quoting scheme.
    """

render_time_grain(column, grain) abstractmethod

Wrap a column expression for the given time grain.

Source code in src/orionbelt/dialect/base.py
@abstractmethod
def render_time_grain(self, column: Expr, grain: TimeGrain) -> Expr:
    """Wrap a column expression for the given time grain.

    Returns a new AST ``Expr`` node (not SQL text); string rendering
    happens later during expression compilation.
    """

render_cast(expr, target_type) abstractmethod

Render a CAST expression.

Source code in src/orionbelt/dialect/base.py
@abstractmethod
def render_cast(self, expr: Expr, target_type: str) -> Expr:
    """Render a CAST expression.

    Returns an AST ``Expr`` node; how ``target_type`` is mapped to a
    concrete SQL type is dialect-specific.
    """

current_date_sql() abstractmethod

Return SQL for the current date.

Source code in src/orionbelt/dialect/base.py
@abstractmethod
def current_date_sql(self) -> str:
    """Return SQL for the current date.

    String-level helper: returns raw SQL text, not an AST node.
    """

date_add_sql(date_sql, unit, count) abstractmethod

Return SQL that adds count units to date_sql.

Source code in src/orionbelt/dialect/base.py
@abstractmethod
def date_add_sql(self, date_sql: str, unit: str, count: int) -> str:
    """Return SQL that adds count units to date_sql.

    ``count`` may be negative to subtract (used for look-back ranges).
    """

render_date_trunc_sql(column_sql, grain) abstractmethod

Return SQL string that truncates a date/timestamp to the given grain.

String-level helper (not AST) for use in raw SQL CTEs like date_range.

Source code in src/orionbelt/dialect/base.py
@abstractmethod
def render_date_trunc_sql(self, column_sql: str, grain: str) -> str:
    """Return SQL string that truncates a date/timestamp to the given grain.

    String-level helper (not AST) for use in raw SQL CTEs like date_range.
    ``column_sql`` is already-rendered SQL text; ``grain`` is a time grain
    name such as ``day`` or ``month``.
    """

render_date_spine_cte_sql(min_date, max_date, grain, offset, offset_grain) abstractmethod

Return the SQL body for a date spine CTE.

Must produce two columns: spine_date and spine_date_prev. spine_date_prev is NULL when the offset date falls before min_date.

Parameters

- min_date (str): SQL expression referencing the minimum date (e.g. date_range.min_date).
- max_date (str): SQL expression referencing the maximum date.
- grain (str): Time grain string: day, week, month, quarter, year.
- offset (int): Signed period offset (e.g. -1 for previous period).
- offset_grain (str): Grain of the offset (e.g. year for YoY).

Source code in src/orionbelt/dialect/base.py
@abstractmethod
def render_date_spine_cte_sql(
    self,
    min_date: str,
    max_date: str,
    grain: str,
    offset: int,
    offset_grain: str,
) -> str:
    """Return the SQL body for a date spine CTE.

    Must produce two columns: ``spine_date`` and ``spine_date_prev``.
    ``spine_date_prev`` is NULL when the offset date falls before min_date.

    Parameters
    ----------
    min_date : str
        SQL expression referencing the minimum date (e.g. ``date_range.min_date``).
    max_date : str
        SQL expression referencing the maximum date.
    grain : str
        Time grain string: ``day``, ``week``, ``month``, ``quarter``, ``year``.
    offset : int
        Signed period offset (e.g. ``-1`` for previous period).
    offset_grain : str
        Grain of the offset (e.g. ``year`` for YoY).

    Returns
    -------
    str
        SQL text forming the body of the spine CTE.
    """

render_string_contains(column, pattern)

Default: column LIKE '%' || pattern || '%'.

Source code in src/orionbelt/dialect/base.py
def render_string_contains(self, column: Expr, pattern: Expr) -> Expr:
    """Default: column LIKE '%' || pattern || '%'."""
    return BinaryOp(
        left=column,
        op="LIKE",
        right=BinaryOp(
            left=BinaryOp(left=Literal.string("%"), op="||", right=pattern),
            op="||",
            right=Literal.string("%"),
        ),
    )

compile(ast)

Render a complete SQL AST to a dialect-specific string.

Source code in src/orionbelt/dialect/base.py
def compile(self, ast: Select) -> str:
    """Public entry point: render a full SQL AST to a dialect string.

    Thin delegation — all of the work happens in ``compile_select``.
    """
    return self.compile_select(ast)

compile_select(node)

Compile a SELECT statement.

Source code in src/orionbelt/dialect/base.py
def compile_select(self, node: Select) -> str:
    """Compile a SELECT statement.

    Clauses are accumulated in standard SQL order and joined with newlines.
    """
    parts: list[str] = []

    # CTEs
    if node.ctes:
        cte_parts = []
        for cte in node.ctes:
            # RawSQL CTE bodies are embedded verbatim; set operations and
            # nested SELECTs are compiled recursively.
            if isinstance(cte.query, RawSQL):
                cte_sql = cte.query.sql
            elif isinstance(cte.query, UnionAll):
                cte_sql = self.compile_union_all(cte.query)
            elif isinstance(cte.query, Except):
                cte_sql = self.compile_except(cte.query)
            else:
                cte_sql = self.compile_select(cte.query)
            cte_parts.append(f"{self.quote_identifier(cte.name)} AS (\n{cte_sql}\n)")
        parts.append("WITH " + ",\n".join(cte_parts))

    # SELECT
    if node.columns:
        cols = ", ".join(self.compile_expr(c) for c in node.columns)
        parts.append(f"SELECT {cols}")
    else:
        parts.append("SELECT *")

    # FROM
    if node.from_:
        parts.append(f"FROM {self.compile_from(node.from_)}")

    # JOINs
    for join in node.joins:
        parts.append(self.compile_join(join))

    # WHERE
    if node.where:
        parts.append(f"WHERE {self.compile_expr(node.where)}")

    # GROUP BY
    if node.group_by:
        groups = ", ".join(self.compile_expr(g) for g in node.group_by)
        parts.append(f"GROUP BY {groups}")

    # HAVING
    if node.having:
        parts.append(f"HAVING {self.compile_expr(node.having)}")

    # ORDER BY
    if node.order_by:
        orders = ", ".join(self.compile_order_by(o) for o in node.order_by)
        parts.append(f"ORDER BY {orders}")

    # LIMIT (checked against None so LIMIT 0 is still emitted)
    if node.limit is not None:
        parts.append(f"LIMIT {node.limit}")

    # OFFSET
    if node.offset is not None:
        parts.append(f"OFFSET {node.offset}")

    return "\n".join(parts)

compile_union_all(node)

Compile a UNION ALL of multiple SELECT statements.

Source code in src/orionbelt/dialect/base.py
def compile_union_all(self, node: UnionAll) -> str:
    """Render every branch SELECT and stitch them with UNION ALL."""
    rendered = [self.compile_select(query) for query in node.queries]
    return "\nUNION ALL\n".join(rendered)

compile_except(node)

Compile an EXCEPT of two SELECT statements.

Source code in src/orionbelt/dialect/base.py
def compile_except(self, node: Except) -> str:
    """Render both sides and join them with an EXCEPT set operation."""
    left_sql = self.compile_select(node.left)
    right_sql = self.compile_select(node.right)
    return f"{left_sql}\nEXCEPT\n{right_sql}"

compile_expr(expr)

Compile an expression node to SQL string.

Source code in src/orionbelt/dialect/base.py
def compile_expr(self, expr: Expr) -> str:
    """Compile an expression node to SQL string."""
    # NOTE: case order matters — the None/True/False literal patterns must
    # precede the generic ``Literal`` case (literal None/True/False match
    # by identity, so ints such as 0/1 fall through to the generic case).
    match expr:
        case Literal(value=None):
            return "NULL"
        case Literal(value=True):
            return "TRUE"
        case Literal(value=False):
            return "FALSE"
        case Literal(value=v) if isinstance(v, str):
            # Escape embedded single quotes by doubling them.
            escaped = v.replace("'", "''")
            return f"'{escaped}'"
        case Literal(value=v):
            return str(v)
        case Star(table=None):
            return "*"
        case Star(table=t) if t is not None:
            return f"{self.quote_identifier(t)}.*"
        case ColumnRef(name=name, table=None):
            return self.quote_identifier(name)
        case ColumnRef(name=name, table=table) if table is not None:
            return f"{self.quote_identifier(table)}.{self.quote_identifier(name)}"
        case AliasedExpr(expr=inner, alias=alias):
            return f"{self.compile_expr(inner)} AS {self.quote_identifier(alias)}"
        case FunctionCall(
            name=fname,
            args=args,
            distinct=distinct,
            order_by=order_by,
            separator=separator,
        ):
            # LISTAGG: dialect-specific rendering
            if fname.upper() == "LISTAGG":
                return self._compile_listagg(args, distinct, order_by, separator)
            # MODE: dialect-specific rendering
            if fname.upper() == "MODE":
                return self._compile_mode(args)
            # MEDIAN: dialect-specific rendering
            if fname.upper() == "MEDIAN":
                return self._compile_median(args)
            # Multi-field COUNT: concatenate fields for portability
            # (Snowflake overrides to use native multi-arg syntax)
            if fname.upper() == "COUNT" and len(args) > 1:
                return self._compile_multi_field_count(args, distinct)
            fname = self._map_function_name(fname)
            args_sql = ", ".join(self.compile_expr(a) for a in args)
            if distinct:
                return f"{fname}(DISTINCT {args_sql})"
            return f"{fname}({args_sql})"
        case BinaryOp(left=left, op=op, right=right):
            return f"({self.compile_expr(left)} {op} {self.compile_expr(right)})"
        case UnaryOp(op=op, operand=operand):
            return f"({op} {self.compile_expr(operand)})"
        case IsNull(expr=inner, negated=False):
            return f"({self.compile_expr(inner)} IS NULL)"
        case IsNull(expr=inner, negated=True):
            return f"({self.compile_expr(inner)} IS NOT NULL)"
        case InList(expr=inner, values=values, negated=negated):
            vals = ", ".join(self.compile_expr(v) for v in values)
            op = "NOT IN" if negated else "IN"
            return f"({self.compile_expr(inner)} {op} ({vals}))"
        case CaseExpr(when_clauses=whens, else_clause=else_):
            parts = ["CASE"]
            for when_cond, then_val in whens:
                parts.append(
                    f"WHEN {self.compile_expr(when_cond)} THEN {self.compile_expr(then_val)}"
                )
            if else_ is not None:
                parts.append(f"ELSE {self.compile_expr(else_)}")
            parts.append("END")
            return " ".join(parts)
        case Cast(expr=inner, type_name=type_name):
            # Abstract type names are resolved to dialect SQL types first.
            resolved_type = self._resolve_type_name(type_name)
            return f"CAST({self.compile_expr(inner)} AS {resolved_type})"
        case SubqueryExpr(query=query):
            return f"(\n{self.compile_select(query)}\n)"
        case RawSQL(sql=sql):
            return sql
        case Between(expr=inner, low=low, high=high, negated=negated):
            op = "NOT BETWEEN" if negated else "BETWEEN"
            return (
                f"({self.compile_expr(inner)} {op} "
                f"{self.compile_expr(low)} AND {self.compile_expr(high)})"
            )
        case RelativeDateRange(
            column=column,
            unit=unit,
            count=count,
            direction=direction,
            include_current=include_current,
        ):
            return self.compile_relative_date_range(
                column=column,
                unit=unit,
                count=count,
                direction=direction,
                include_current=include_current,
            )
        case WindowFunction(
            func_name=fname,
            args=args,
            partition_by=partition_by,
            order_by=order_by,
            frame=frame,
            distinct=distinct,
        ):
            args_sql = ", ".join(self.compile_expr(a) for a in args)
            func_sql = f"{fname}(DISTINCT {args_sql})" if distinct else f"{fname}({args_sql})"
            over_parts: list[str] = []
            if partition_by:
                pb = ", ".join(self.compile_expr(p) for p in partition_by)
                over_parts.append(f"PARTITION BY {pb}")
            if order_by:
                ob = ", ".join(self.compile_order_by(o) for o in order_by)
                over_parts.append(f"ORDER BY {ob}")
            if frame is not None:
                over_parts.append(f"{frame.mode} BETWEEN {frame.start} AND {frame.end}")
            # An empty OVER () is emitted when no partition/order/frame is set.
            over_clause = " ".join(over_parts)
            return f"{func_sql} OVER ({over_clause})"
        case _:
            raise ValueError(f"Unknown AST node type: {type(expr).__name__}")

compile_relative_date_range(column, unit, count, direction, include_current)

Compile a relative date range predicate to SQL.

Source code in src/orionbelt/dialect/base.py
def compile_relative_date_range(
    self,
    column: Expr,
    unit: str,
    count: int,
    direction: str,
    include_current: bool,
) -> str:
    """Compile a relative date range predicate to SQL.

    Emits a half-open interval test ``(col >= start AND col < end)``
    anchored at the dialect's current-date expression.
    """
    column_sql = self.compile_expr(column)
    today = self.current_date_sql()

    if direction == "future":
        # Forward window: optionally include today, then extend count units.
        if include_current:
            start = today
        else:
            start = self.date_add_sql(today, "day", 1)
        end = self.date_add_sql(start, unit, count)
    else:
        # Look-back window: exclusive end at tomorrow (or today), then
        # subtract count units to find the start.
        if include_current:
            end = self.date_add_sql(today, "day", 1)
        else:
            end = today
        start = self.date_add_sql(end, unit, -count)

    return f"({column_sql} >= {start} AND {column_sql} < {end})"

orionbelt.dialect.base.DialectCapabilities dataclass

Flags indicating what SQL features a dialect supports.

Source code in src/orionbelt/dialect/base.py
@dataclass
class DialectCapabilities:
    """Flags indicating what SQL features a dialect supports."""

    # Common table expressions (WITH ...); on by default.
    supports_cte: bool = True
    # QUALIFY clause support.
    supports_qualify: bool = False
    # Native array types/functions.
    supports_arrays: bool = False
    # Filters on window functions — presumably FILTER (WHERE ...); confirm per dialect.
    supports_window_filters: bool = False
    # Case-insensitive ILIKE operator.
    supports_ilike: bool = False
    # Time-travel (historical) queries.
    supports_time_travel: bool = False
    # Semi-structured data support (e.g. VARIANT/JSON types).
    supports_semi_structured: bool = False
    # UNION ALL BY NAME column matching.
    supports_union_all_by_name: bool = False
    # Aggregation function names this dialect cannot compile.
    unsupported_aggregations: list[str] = field(default_factory=list)

Dialect Registry

orionbelt.dialect.registry.DialectRegistry

Registry for SQL dialect plugins.

Source code in src/orionbelt/dialect/registry.py
class DialectRegistry:
    """Registry for SQL dialect plugins."""

    # Maps dialect name -> dialect class; shared across the process.
    _dialects: dict[str, type[Dialect]] = {}

    @classmethod
    def register(cls, dialect_class: type[Dialect]) -> type[Dialect]:
        """Register a dialect class.  Usable as a class decorator."""
        # ``name`` is an instance property, so build an instance to read it.
        cls._dialects[dialect_class().name] = dialect_class
        return dialect_class

    @classmethod
    def get(cls, name: str) -> Dialect:
        """Return a fresh instance of the named dialect.

        Raises ``UnsupportedDialectError`` when the name is unknown.
        """
        dialect_cls = cls._dialects.get(name)
        if dialect_cls is None:
            raise UnsupportedDialectError(name, available=cls.available())
        return dialect_cls()

    @classmethod
    def available(cls) -> list[str]:
        """Return the registered dialect names in sorted order."""
        return sorted(cls._dialects)

    @classmethod
    def reset(cls) -> None:
        """Drop every registered dialect (intended for tests)."""
        cls._dialects.clear()

get(name) classmethod

Get an instance of the named dialect.

Source code in src/orionbelt/dialect/registry.py
@classmethod
def get(cls, name: str) -> Dialect:
    """Get an instance of the named dialect.

    Raises ``UnsupportedDialectError`` (carrying the list of registered
    names) when *name* has not been registered.
    """
    if name not in cls._dialects:
        raise UnsupportedDialectError(name, available=cls.available())
    # A fresh instance per call; dialect instances are not cached here.
    return cls._dialects[name]()

available() classmethod

List registered dialect names.

Source code in src/orionbelt/dialect/registry.py
@classmethod
def available(cls) -> list[str]:
    """List registered dialect names.

    Returns the names in sorted order for stable display/error output.
    """
    return sorted(cls._dialects.keys())

register(dialect_class) classmethod

Register a dialect class. Can be used as a decorator.

Source code in src/orionbelt/dialect/registry.py
@classmethod
def register(cls, dialect_class: type[Dialect]) -> type[Dialect]:
    """Register a dialect class. Can be used as a decorator.

    Returns the class unchanged so it can wrap a ``class`` statement.
    """
    # Instantiate to read the name property
    instance = dialect_class()
    cls._dialects[instance.name] = dialect_class
    return dialect_class

YAML Parser

orionbelt.parser.loader.TrackedLoader

YAML loader that tracks source positions for error reporting.

Uses ruamel.yaml which preserves line/column info on every parsed node.

Source code in src/orionbelt/parser/loader.py
class TrackedLoader:
    """YAML loader that tracks source positions for error reporting.

    Uses ruamel.yaml which preserves line/column info on every parsed node.
    Input is parsed under safety limits (document size, anchors/aliases,
    nesting depth, total node count) so untrusted OBML documents cannot
    exhaust resources.
    """

    def __init__(self) -> None:
        self._yaml = YAML()
        self._yaml.preserve_quotes = True
        # Reject duplicate YAML keys (e.g. two columns with the same name).
        # Without this, ruamel.yaml silently keeps only the last value.
        self._yaml.allow_duplicate_keys = False
        # Reject deeply nested structures (mitigates stack-based DoS).
        # NOTE(review): this is a plain attribute assignment on the YAML
        # object — confirm the installed ruamel.yaml version actually
        # enforces `max_depth`; `_check_node_count` below is the
        # defense-in-depth backstop either way.
        self._yaml.max_depth = _MAX_DEPTH

    # -- safety checks -------------------------------------------------------

    @staticmethod
    def _check_yaml_safety(content: str) -> None:
        """Pre-parse safety checks on raw YAML text.

        Raises ``YAMLSafetyError`` if the content contains anchors/aliases
        (not used in OBML) or exceeds the maximum document size.
        """
        if len(content) > _MAX_DOCUMENT_SIZE:
            raise YAMLSafetyError(
                f"YAML document exceeds maximum size "
                f"({len(content):,} chars > {_MAX_DOCUMENT_SIZE:,} limit)"
            )
        # Strip full-line comments before scanning so that &name inside
        # comments (e.g. "# see R&D notes") does not cause a false positive.
        stripped = _COMMENT_LINE_RE.sub("", content)
        if _ANCHOR_RE.search(stripped):
            raise YAMLSafetyError("YAML anchors/aliases are not supported in OBML")

    @staticmethod
    def _check_node_count(data: Any, limit: int = _MAX_NODE_COUNT) -> None:
        """Post-parse defense-in-depth: reject documents with too many nodes."""
        # Iterative traversal with an explicit stack so a deep document
        # cannot overflow the Python call stack while being counted.
        count = 0
        stack: list[Any] = [data]
        while stack:
            node = stack.pop()
            count += 1
            if count > limit:
                raise YAMLSafetyError(f"YAML document exceeds maximum node count ({limit:,})")
            if isinstance(node, dict):
                stack.extend(node.values())
            elif isinstance(node, list):
                stack.extend(node)

    # -- public loading API --------------------------------------------------

    def load(self, path: Path) -> tuple[dict[str, Any], SourceMap]:
        """Load a YAML file and return parsed dict + source position map."""
        with path.open("r", encoding="utf-8") as handle:
            content = handle.read()
        # Delegate so the safety-check/parse/position-extraction pipeline
        # lives in exactly one place (previously duplicated here verbatim).
        return self.load_string(content, filename=str(path))

    def load_string(
        self, content: str, filename: str = "<string>"
    ) -> tuple[dict[str, Any], SourceMap]:
        """Load YAML from a string.

        Returns ``(plain_data, source_map)``: the document converted to
        built-in dict/list values, and a map from dotted key paths to
        1-based line/column positions in *filename*.
        """
        self._check_yaml_safety(content)
        data = self._yaml.load(content)
        if data is None:
            # Empty document (or comments only): nothing to map.
            return {}, SourceMap()
        self._check_node_count(data)
        source_map = SourceMap()
        self._extract_positions(data, filename, "", source_map)
        return self._to_plain_dict(data), source_map

    def _extract_positions(
        self,
        data: Any,
        filename: str,
        prefix: str,
        source_map: SourceMap,
    ) -> None:
        """Recursively extract source positions from ruamel.yaml nodes.

        Paths are dotted for mapping keys (``a.b``) and bracketed for
        sequence items (``a[0]``). ruamel.yaml positions are 0-based;
        SourceSpan stores them 1-based.
        """
        if isinstance(data, CommentedMap):
            for key in data:
                key_path = f"{prefix}.{key}" if prefix else str(key)
                # Try to get position for this key from ruamel.yaml's lc object
                try:
                    lc = data.lc
                    # lc.key() returns a callable in newer ruamel.yaml
                    key_positions = lc.key(key)
                    if key_positions:
                        line, col = key_positions
                        source_map.add(
                            key_path,
                            SourceSpan(file=filename, line=line + 1, column=col + 1),
                        )
                except (AttributeError, KeyError, TypeError):
                    # Fallback: use the map's own position
                    try:
                        lc = data.lc
                        source_map.add(
                            key_path,
                            SourceSpan(file=filename, line=lc.line + 1, column=lc.col + 1),
                        )
                    except (AttributeError, TypeError):
                        # No position info at all: still recurse below so
                        # child nodes get a chance to record theirs.
                        pass
                self._extract_positions(data[key], filename, key_path, source_map)
        elif isinstance(data, CommentedSeq):
            for i, item in enumerate(data):
                item_path = f"{prefix}[{i}]"
                try:
                    lc = data.lc
                    item_pos = lc.item(i)
                    if item_pos:
                        line, col = item_pos
                        source_map.add(
                            item_path,
                            SourceSpan(file=filename, line=line + 1, column=col + 1),
                        )
                except (AttributeError, KeyError, TypeError):
                    pass
                self._extract_positions(item, filename, item_path, source_map)

    def _to_plain_dict(self, data: Any) -> dict[str, Any]:
        """Convert ruamel.yaml CommentedMap/Seq to plain Python dict/list.

        Non-mapping top-level documents collapse to ``{}``.
        """
        if isinstance(data, CommentedMap):
            return {str(k): self._to_plain_value(v) for k, v in data.items()}
        if isinstance(data, dict):
            return {str(k): self._to_plain_value(v) for k, v in data.items()}
        return {}

    def _to_plain_value(self, data: Any) -> Any:
        """Recursively convert one node; scalars pass through unchanged."""
        if isinstance(data, CommentedMap):
            return {str(k): self._to_plain_value(v) for k, v in data.items()}
        if isinstance(data, CommentedSeq):
            return [self._to_plain_value(item) for item in data]
        if isinstance(data, dict):
            return {str(k): self._to_plain_value(v) for k, v in data.items()}
        if isinstance(data, list):
            return [self._to_plain_value(item) for item in data]
        return data

load(path)

Load a YAML file and return parsed dict + source position map.

Source code in src/orionbelt/parser/loader.py
def load(self, path: Path) -> tuple[dict[str, Any], SourceMap]:
    """Load a YAML file and return parsed dict + source position map.

    Same pipeline as ``load_string``: safety checks, parse, node-count
    check, then walk the tree to record source positions keyed by path.
    """
    with path.open("r", encoding="utf-8") as handle:
        content = handle.read()
    self._check_yaml_safety(content)
    data = self._yaml.load(content)
    if data is None:
        # Empty document (or comments only): nothing to map.
        return {}, SourceMap()
    self._check_node_count(data)
    source_map = SourceMap()
    self._extract_positions(data, str(path), "", source_map)
    return self._to_plain_dict(data), source_map

load_string(content, filename='<string>')

Load YAML from a string.

Source code in src/orionbelt/parser/loader.py
def load_string(
    self, content: str, filename: str = "<string>"
) -> tuple[dict[str, Any], SourceMap]:
    """Load YAML from a string.

    Returns ``(plain_data, source_map)``: the document converted to
    built-in dict/list values, plus a map of dotted key paths to
    positions attributed to *filename*.
    """
    self._check_yaml_safety(content)
    data = self._yaml.load(content)
    if data is None:
        # Empty document (or comments only): nothing to map.
        return {}, SourceMap()
    self._check_node_count(data)
    source_map = SourceMap()
    self._extract_positions(data, filename, "", source_map)
    return self._to_plain_dict(data), source_map

Reference Resolver

orionbelt.parser.resolver.ReferenceResolver

Resolves all references in a raw YAML model to a fully-typed SemanticModel.

Source code in src/orionbelt/parser/resolver.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
class ReferenceResolver:
    """Resolves all references in a raw YAML model to a fully-typed SemanticModel.

    Sections are processed in dependency order: data objects first, then
    dimensions and measures (validated against data objects), then metrics
    (validated against measures and dimensions). Parse failures within one
    entry are converted to SemanticError records so a single bad entry does
    not abort the whole resolve.
    """

    def resolve(
        self,
        raw: dict[str, Any],
        source_map: SourceMap | None = None,
    ) -> tuple[SemanticModel, ValidationResult]:
        """Resolve raw YAML dict into a validated SemanticModel.

        Returns (model, validation_result). If there are errors,
        the model may be partially populated.
        """
        errors: list[SemanticError] = []
        warnings: list[SemanticError] = []

        # Parse data objects
        data_objects: dict[str, DataObject] = {}
        raw_objects = raw.get("dataObjects", {})
        if not isinstance(raw_objects, dict):
            errors.append(
                SemanticError(
                    code="DATA_OBJECT_PARSE_ERROR",
                    message="'dataObjects' must be a YAML mapping, not a list or scalar",
                    path="dataObjects",
                )
            )
            # Continue with an empty mapping so later sections still resolve.
            raw_objects = {}
        for name, raw_obj in raw_objects.items():
            try:
                obj_columns: dict[str, DataObjectColumn] = {}
                for fname, fdata in raw_obj.get("columns", {}).items():
                    obj_columns[fname] = DataObjectColumn(
                        label=fname,
                        # `code` defaults to the column's YAML key when absent.
                        code=fdata.get("code", fname),
                        abstract_type=fdata.get("abstractType", "string"),
                        sql_type=fdata.get("sqlType"),
                        sql_precision=fdata.get("sqlPrecision"),
                        sql_scale=fdata.get("sqlScale"),
                        num_class=fdata.get("numClass"),
                        comment=fdata.get("comment"),
                        owner=fdata.get("owner"),
                        synonyms=fdata.get("synonyms", []),
                        custom_extensions=_parse_extensions(fdata),
                    )

                obj_joins: list[DataObjectJoin] = []
                for jdata in raw_obj.get("joins", []):
                    # joinType/joinTo/columnsFrom/columnsTo are required:
                    # a KeyError here is caught below and reported as a
                    # DATA_OBJECT_PARSE_ERROR for this object.
                    obj_joins.append(
                        DataObjectJoin(
                            join_type=jdata["joinType"],
                            join_to=jdata["joinTo"],
                            columns_from=jdata["columnsFrom"],
                            columns_to=jdata["columnsTo"],
                            secondary=jdata.get("secondary", False),
                            path_name=jdata.get("pathName"),
                        )
                    )

                data_objects[name] = DataObject(
                    label=name,
                    code=raw_obj.get("code", ""),
                    database=raw_obj.get("database", ""),
                    schema_name=raw_obj.get("schema", ""),
                    columns=obj_columns,
                    joins=obj_joins,
                    comment=raw_obj.get("comment"),
                    owner=raw_obj.get("owner"),
                    synonyms=raw_obj.get("synonyms", []),
                    custom_extensions=_parse_extensions(raw_obj),
                )
            except Exception as e:
                # Broad catch is deliberate: one malformed entry becomes a
                # structured error instead of aborting the whole resolve.
                span = source_map.get(f"dataObjects.{name}") if source_map else None
                errors.append(
                    SemanticError(
                        code="DATA_OBJECT_PARSE_ERROR",
                        message=f"Failed to parse data object '{name}': {e}",
                        path=f"dataObjects.{name}",
                        span=span,
                    )
                )

        # Parse dimensions
        dimensions: dict[str, Dimension] = {}
        raw_dims = raw.get("dimensions", {})
        if not isinstance(raw_dims, dict):
            errors.append(
                SemanticError(
                    code="DIMENSION_PARSE_ERROR",
                    message="'dimensions' must be a YAML mapping, not a list or scalar",
                    path="dimensions",
                )
            )
            raw_dims = {}
        for name, raw_dim in raw_dims.items():
            try:
                data_object = raw_dim.get("dataObject")
                column = raw_dim.get("column")

                # Validate the data object exists
                if data_object and data_object not in data_objects:
                    span = source_map.get(f"dimensions.{name}") if source_map else None
                    errors.append(
                        SemanticError(
                            code="UNKNOWN_DATA_OBJECT",
                            message=(
                                f"Dimension '{name}' references unknown data object '{data_object}'"
                            ),
                            path=f"dimensions.{name}",
                            span=span,
                            suggestions=_suggest_similar(data_object, list(data_objects.keys())),
                        )
                    )

                # Validate the column exists in the data object
                if (
                    data_object
                    and column
                    and data_object in data_objects
                    and column not in data_objects[data_object].columns
                ):
                    span = source_map.get(f"dimensions.{name}") if source_map else None
                    errors.append(
                        SemanticError(
                            code="UNKNOWN_COLUMN",
                            message=(
                                f"Dimension '{name}' references unknown column "
                                f"'{column}' in data object '{data_object}'"
                            ),
                            path=f"dimensions.{name}",
                            span=span,
                            suggestions=_suggest_similar(
                                column, list(data_objects[data_object].columns.keys())
                            ),
                        )
                    )

                # Note: the dimension is still registered even when the
                # references above were reported as unknown — the model may
                # be partially populated when errors are present.
                dimensions[name] = Dimension(
                    label=name,
                    # Dimension stores the data-object name in a field
                    # called `view` — presumably a historical name; confirm
                    # against the Dimension model definition.
                    view=data_object or "",
                    column=column or "",
                    result_type=raw_dim.get("resultType", "string"),
                    time_grain=raw_dim.get("timeGrain"),
                    format=raw_dim.get("format"),
                    owner=raw_dim.get("owner"),
                    synonyms=raw_dim.get("synonyms", []),
                    custom_extensions=_parse_extensions(raw_dim),
                )
            except Exception as e:
                span = source_map.get(f"dimensions.{name}") if source_map else None
                errors.append(
                    SemanticError(
                        code="DIMENSION_PARSE_ERROR",
                        message=f"Failed to parse dimension '{name}': {e}",
                        path=f"dimensions.{name}",
                        span=span,
                    )
                )

        # Parse measures
        measures: dict[str, Measure] = {}
        raw_measures = raw.get("measures", {})
        if not isinstance(raw_measures, dict):
            errors.append(
                SemanticError(
                    code="MEASURE_PARSE_ERROR",
                    message="'measures' must be a YAML mapping, not a list or scalar",
                    path="measures",
                )
            )
            raw_measures = {}
        for name, raw_meas in raw_measures.items():
            try:
                measure_columns: list[DataColumnRef] = []
                for fdata in raw_meas.get("columns", []):
                    measure_columns.append(
                        DataColumnRef(
                            view=fdata.get("dataObject"),
                            column=fdata.get("column"),
                        )
                    )

                # Resolve expression field references
                expression = raw_meas.get("expression")
                if expression:
                    self._validate_expression_refs(
                        name, expression, data_objects, errors, source_map
                    )

                # Parse measure filters (new `filters:` list or legacy `filter:` single)
                measure_filters: list[MeasureFilterItem] = []
                raw_filters = raw_meas.get("filters")
                if raw_filters and isinstance(raw_filters, list):
                    for rf in raw_filters:
                        measure_filters.append(_parse_measure_filter_item(rf))
                else:
                    # Backward compat: single `filter:` key → [filter]
                    # NOTE(review): a non-list `filters:` value (e.g. a
                    # mapping) is silently ignored here and falls through
                    # to the legacy key — confirm this is intended.
                    raw_filter = raw_meas.get("filter")
                    if raw_filter:
                        measure_filters.append(_parse_measure_filter_item(raw_filter))

                measures[name] = Measure(
                    label=name,
                    columns=measure_columns,
                    result_type=raw_meas.get("resultType", "float"),
                    aggregation=raw_meas.get("aggregation", "sum"),
                    expression=expression,
                    distinct=raw_meas.get("distinct", False),
                    total=raw_meas.get("total", False),
                    filters=measure_filters,
                    format=raw_meas.get("format"),
                    allow_fan_out=raw_meas.get("allowFanOut", False),
                    owner=raw_meas.get("owner"),
                    synonyms=raw_meas.get("synonyms", []),
                    custom_extensions=_parse_extensions(raw_meas),
                )
            except Exception as e:
                span = source_map.get(f"measures.{name}") if source_map else None
                errors.append(
                    SemanticError(
                        code="MEASURE_PARSE_ERROR",
                        message=f"Failed to parse measure '{name}': {e}",
                        path=f"measures.{name}",
                        span=span,
                    )
                )

        # Parse metrics
        metrics: dict[str, Metric] = {}
        raw_metrics = raw.get("metrics", {})
        if not isinstance(raw_metrics, dict):
            errors.append(
                SemanticError(
                    code="METRIC_PARSE_ERROR",
                    message="'metrics' must be a YAML mapping, not a list or scalar",
                    path="metrics",
                )
            )
            raw_metrics = {}
        for name, raw_metric in raw_metrics.items():
            try:
                # Three metric shapes: cumulative, period-over-period, and
                # derived (the default for any other `type` value).
                metric_type = raw_metric.get("type", "derived")

                if metric_type == MetricType.CUMULATIVE:
                    # Cumulative metric: validate measure reference exists
                    ref_measure = raw_metric.get("measure", "")
                    if ref_measure and ref_measure not in measures:
                        span = source_map.get(f"metrics.{name}.measure") if source_map else None
                        errors.append(
                            SemanticError(
                                code="UNKNOWN_MEASURE",
                                message=(
                                    f"Cumulative metric '{name}' references "
                                    f"unknown measure '{ref_measure}'"
                                ),
                                path=f"metrics.{name}.measure",
                                span=span,
                            )
                        )

                    # Validate timeDimension references a known dimension
                    cum_time_dim = raw_metric.get("timeDimension", "")
                    if cum_time_dim and cum_time_dim not in dimensions:
                        span = (
                            source_map.get(f"metrics.{name}.timeDimension") if source_map else None
                        )
                        errors.append(
                            SemanticError(
                                code="CUMULATIVE_UNKNOWN_TIME_DIMENSION",
                                message=(
                                    f"Cumulative metric '{name}' references "
                                    f"unknown time dimension '{cum_time_dim}'"
                                ),
                                path=f"metrics.{name}.timeDimension",
                                span=span,
                                suggestions=_suggest_similar(cum_time_dim, list(dimensions.keys())),
                            )
                        )

                    metrics[name] = Metric(
                        label=name,
                        type=MetricType.CUMULATIVE,
                        measure=raw_metric.get("measure"),
                        time_dimension=raw_metric.get("timeDimension"),
                        cumulative_type=raw_metric.get("cumulativeType", "sum"),
                        window=raw_metric.get("window"),
                        grain_to_date=raw_metric.get("grainToDate"),
                        description=raw_metric.get("description"),
                        format=raw_metric.get("format"),
                        owner=raw_metric.get("owner"),
                        synonyms=raw_metric.get("synonyms", []),
                        custom_extensions=_parse_extensions(raw_metric),
                    )
                elif metric_type == MetricType.PERIOD_OVER_PERIOD:
                    # Period-over-period metric: validate expression + PoP config
                    expression = raw_metric.get("expression", "")
                    self._validate_metric_expression_refs(
                        name, expression, measures, errors, source_map
                    )

                    raw_pop = raw_metric.get("periodOverPeriod")
                    if not raw_pop:
                        span = source_map.get(f"metrics.{name}") if source_map else None
                        errors.append(
                            SemanticError(
                                code="METRIC_PARSE_ERROR",
                                message=(
                                    f"Period-over-period metric '{name}' "
                                    f"requires 'periodOverPeriod' configuration"
                                ),
                                path=f"metrics.{name}",
                                span=span,
                            )
                        )
                        # Fall back to an empty config so the .get() calls
                        # below still work and defaults are used.
                        raw_pop = {}

                    # Validate time dimension reference
                    pop_time_dim = raw_pop.get("timeDimension", "")
                    if pop_time_dim and pop_time_dim not in dimensions:
                        span = (
                            source_map.get(f"metrics.{name}.periodOverPeriod")
                            if source_map
                            else None
                        )
                        errors.append(
                            SemanticError(
                                code="POP_UNKNOWN_TIME_DIMENSION",
                                message=(
                                    f"Period-over-period metric '{name}' references "
                                    f"unknown time dimension '{pop_time_dim}'"
                                ),
                                path=f"metrics.{name}.periodOverPeriod.timeDimension",
                                span=span,
                                suggestions=_suggest_similar(pop_time_dim, list(dimensions.keys())),
                            )
                        )

                    pop_config = PeriodOverPeriod(
                        time_dimension=raw_pop.get("timeDimension", ""),
                        grain=raw_pop.get("grain", "month"),
                        # Default: compare against the previous year (-1).
                        offset=raw_pop.get("offset", -1),
                        offset_grain=raw_pop.get("offsetGrain", "year"),
                        comparison=raw_pop.get("comparison", "percentChange"),
                    )

                    metrics[name] = Metric(
                        label=name,
                        type=MetricType.PERIOD_OVER_PERIOD,
                        expression=expression,
                        period_over_period=pop_config,
                        description=raw_metric.get("description"),
                        format=raw_metric.get("format"),
                        owner=raw_metric.get("owner"),
                        synonyms=raw_metric.get("synonyms", []),
                        custom_extensions=_parse_extensions(raw_metric),
                    )
                else:
                    # Derived metric (default)
                    expression = raw_metric.get("expression", "")
                    self._validate_metric_expression_refs(
                        name, expression, measures, errors, source_map
                    )

                    metrics[name] = Metric(
                        label=name,
                        expression=expression,
                        description=raw_metric.get("description"),
                        format=raw_metric.get("format"),
                        owner=raw_metric.get("owner"),
                        synonyms=raw_metric.get("synonyms", []),
                        custom_extensions=_parse_extensions(raw_metric),
                    )
            except Exception as e:
                span = source_map.get(f"metrics.{name}") if source_map else None
                errors.append(
                    SemanticError(
                        code="METRIC_PARSE_ERROR",
                        message=f"Failed to parse metric '{name}': {e}",
                        path=f"metrics.{name}",
                        span=span,
                    )
                )

        model = SemanticModel(
            version=raw.get("version", 1.0),
            data_objects=data_objects,
            dimensions=dimensions,
            measures=measures,
            metrics=metrics,
            owner=raw.get("owner"),
            custom_extensions=_parse_extensions(raw),
        )

        result = ValidationResult(
            valid=len(errors) == 0,
            errors=errors,
            warnings=warnings,
        )

        return model, result

    def _validate_expression_refs(
        self,
        measure_name: str,
        expression: str,
        data_objects: dict[str, DataObject],
        errors: list[SemanticError],
        source_map: SourceMap | None,
    ) -> None:
        """Validate {[DataObject].[Column]} references in a measure expression.

        Appends an error (in place, to *errors*) for each reference whose
        data object or column is unknown.
        """
        named_refs = re.findall(r"\{\[([^\]]+)\]\.\[([^\]]+)\]\}", expression)
        for obj_name, col_name in named_refs:
            span = source_map.get(f"measures.{measure_name}.expression") if source_map else None
            if obj_name not in data_objects:
                errors.append(
                    SemanticError(
                        code="UNKNOWN_DATA_OBJECT_IN_EXPRESSION",
                        message=(
                            f"Measure '{measure_name}' expression references unknown "
                            f"data object '{obj_name}'"
                        ),
                        path=f"measures.{measure_name}.expression",
                        span=span,
                    )
                )
            elif col_name not in data_objects[obj_name].columns:
                errors.append(
                    SemanticError(
                        code="UNKNOWN_COLUMN_IN_EXPRESSION",
                        message=(
                            f"Measure '{measure_name}' expression references unknown column "
                            f"'{col_name}' in data object '{obj_name}'"
                        ),
                        path=f"measures.{measure_name}.expression",
                        span=span,
                    )
                )

    def _validate_metric_expression_refs(
        self,
        metric_name: str,
        expression: str,
        measures: dict[str, Measure],
        errors: list[SemanticError],
        source_map: SourceMap | None,
    ) -> None:
        """Validate {[Measure Name]} references in a metric expression.

        Appends an UNKNOWN_MEASURE_REF error (with close-name suggestions)
        for each reference that does not match a parsed measure.
        """
        # Extract all {[Name]} references from the expression
        named_refs = re.findall(r"\{\[([^\]]+)\]\}", expression)
        for ref_name in named_refs:
            if ref_name not in measures:
                span = source_map.get(f"metrics.{metric_name}.expression") if source_map else None
                errors.append(
                    SemanticError(
                        code="UNKNOWN_MEASURE_REF",
                        message=(f"Metric '{metric_name}' references unknown measure '{ref_name}'"),
                        path=f"metrics.{metric_name}.expression",
                        span=span,
                        suggestions=_suggest_similar(ref_name, list(measures.keys())),
                    )
                )
resolve(raw, source_map=None)

Resolve raw YAML dict into a validated SemanticModel.

Returns (model, validation_result). If there are errors, the model may be partially populated.

Source code in src/orionbelt/parser/resolver.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
def resolve(
    self,
    raw: dict[str, Any],
    source_map: SourceMap | None = None,
) -> tuple[SemanticModel, ValidationResult]:
    """Resolve raw YAML dict into a validated SemanticModel.

    Returns (model, validation_result). If there are errors,
    the model may be partially populated.

    Each top-level section (dataObjects, dimensions, measures, metrics) is
    parsed best-effort: a failure on one entry is recorded as a
    SemanticError and parsing continues, so callers receive as complete a
    model (and as many diagnostics) as possible in one pass.
    ``source_map``, when provided, is used only to attach YAML source spans
    to the reported errors.
    """
    errors: list[SemanticError] = []
    # NOTE(review): nothing in this method appends to `warnings`; the list
    # exists so the returned ValidationResult keeps a uniform shape.
    warnings: list[SemanticError] = []

    # Parse data objects
    data_objects: dict[str, DataObject] = {}
    raw_objects = raw.get("dataObjects", {})
    if not isinstance(raw_objects, dict):
        errors.append(
            SemanticError(
                code="DATA_OBJECT_PARSE_ERROR",
                message="'dataObjects' must be a YAML mapping, not a list or scalar",
                path="dataObjects",
            )
        )
        raw_objects = {}
    for name, raw_obj in raw_objects.items():
        try:
            # Columns are keyed by label; `code` defaults to the label.
            obj_columns: dict[str, DataObjectColumn] = {}
            for fname, fdata in raw_obj.get("columns", {}).items():
                obj_columns[fname] = DataObjectColumn(
                    label=fname,
                    code=fdata.get("code", fname),
                    abstract_type=fdata.get("abstractType", "string"),
                    sql_type=fdata.get("sqlType"),
                    sql_precision=fdata.get("sqlPrecision"),
                    sql_scale=fdata.get("sqlScale"),
                    num_class=fdata.get("numClass"),
                    comment=fdata.get("comment"),
                    owner=fdata.get("owner"),
                    synonyms=fdata.get("synonyms", []),
                    custom_extensions=_parse_extensions(fdata),
                )

            obj_joins: list[DataObjectJoin] = []
            for jdata in raw_obj.get("joins", []):
                # joinType/joinTo/columnsFrom/columnsTo are mandatory keys;
                # a KeyError here is caught below as DATA_OBJECT_PARSE_ERROR.
                obj_joins.append(
                    DataObjectJoin(
                        join_type=jdata["joinType"],
                        join_to=jdata["joinTo"],
                        columns_from=jdata["columnsFrom"],
                        columns_to=jdata["columnsTo"],
                        secondary=jdata.get("secondary", False),
                        path_name=jdata.get("pathName"),
                    )
                )

            data_objects[name] = DataObject(
                label=name,
                code=raw_obj.get("code", ""),
                database=raw_obj.get("database", ""),
                schema_name=raw_obj.get("schema", ""),
                columns=obj_columns,
                joins=obj_joins,
                comment=raw_obj.get("comment"),
                owner=raw_obj.get("owner"),
                synonyms=raw_obj.get("synonyms", []),
                custom_extensions=_parse_extensions(raw_obj),
            )
        except Exception as e:
            # Record the failure and keep parsing the remaining objects.
            span = source_map.get(f"dataObjects.{name}") if source_map else None
            errors.append(
                SemanticError(
                    code="DATA_OBJECT_PARSE_ERROR",
                    message=f"Failed to parse data object '{name}': {e}",
                    path=f"dataObjects.{name}",
                    span=span,
                )
            )

    # Parse dimensions
    dimensions: dict[str, Dimension] = {}
    raw_dims = raw.get("dimensions", {})
    if not isinstance(raw_dims, dict):
        errors.append(
            SemanticError(
                code="DIMENSION_PARSE_ERROR",
                message="'dimensions' must be a YAML mapping, not a list or scalar",
                path="dimensions",
            )
        )
        raw_dims = {}
    for name, raw_dim in raw_dims.items():
        try:
            data_object = raw_dim.get("dataObject")
            column = raw_dim.get("column")

            # Validate the data object exists
            if data_object and data_object not in data_objects:
                span = source_map.get(f"dimensions.{name}") if source_map else None
                errors.append(
                    SemanticError(
                        code="UNKNOWN_DATA_OBJECT",
                        message=(
                            f"Dimension '{name}' references unknown data object '{data_object}'"
                        ),
                        path=f"dimensions.{name}",
                        span=span,
                        suggestions=_suggest_similar(data_object, list(data_objects.keys())),
                    )
                )

            # Validate the column exists in the data object
            if (
                data_object
                and column
                and data_object in data_objects
                and column not in data_objects[data_object].columns
            ):
                span = source_map.get(f"dimensions.{name}") if source_map else None
                errors.append(
                    SemanticError(
                        code="UNKNOWN_COLUMN",
                        message=(
                            f"Dimension '{name}' references unknown column "
                            f"'{column}' in data object '{data_object}'"
                        ),
                        path=f"dimensions.{name}",
                        span=span,
                        suggestions=_suggest_similar(
                            column, list(data_objects[data_object].columns.keys())
                        ),
                    )
                )

            # Even with a dangling reference the dimension is still
            # registered (with empty view/column) so later checks can run.
            dimensions[name] = Dimension(
                label=name,
                view=data_object or "",
                column=column or "",
                result_type=raw_dim.get("resultType", "string"),
                time_grain=raw_dim.get("timeGrain"),
                format=raw_dim.get("format"),
                owner=raw_dim.get("owner"),
                synonyms=raw_dim.get("synonyms", []),
                custom_extensions=_parse_extensions(raw_dim),
            )
        except Exception as e:
            span = source_map.get(f"dimensions.{name}") if source_map else None
            errors.append(
                SemanticError(
                    code="DIMENSION_PARSE_ERROR",
                    message=f"Failed to parse dimension '{name}': {e}",
                    path=f"dimensions.{name}",
                    span=span,
                )
            )

    # Parse measures
    measures: dict[str, Measure] = {}
    raw_measures = raw.get("measures", {})
    if not isinstance(raw_measures, dict):
        errors.append(
            SemanticError(
                code="MEASURE_PARSE_ERROR",
                message="'measures' must be a YAML mapping, not a list or scalar",
                path="measures",
            )
        )
        raw_measures = {}
    for name, raw_meas in raw_measures.items():
        try:
            measure_columns: list[DataColumnRef] = []
            for fdata in raw_meas.get("columns", []):
                measure_columns.append(
                    DataColumnRef(
                        view=fdata.get("dataObject"),
                        column=fdata.get("column"),
                    )
                )

            # Resolve expression field references
            expression = raw_meas.get("expression")
            if expression:
                self._validate_expression_refs(
                    name, expression, data_objects, errors, source_map
                )

            # Parse measure filters (new `filters:` list or legacy `filter:` single)
            # NOTE(review): a non-list `filters:` value falls through to the
            # legacy branch silently — confirm that is intended.
            measure_filters: list[MeasureFilterItem] = []
            raw_filters = raw_meas.get("filters")
            if raw_filters and isinstance(raw_filters, list):
                for rf in raw_filters:
                    measure_filters.append(_parse_measure_filter_item(rf))
            else:
                # Backward compat: single `filter:` key → [filter]
                raw_filter = raw_meas.get("filter")
                if raw_filter:
                    measure_filters.append(_parse_measure_filter_item(raw_filter))

            measures[name] = Measure(
                label=name,
                columns=measure_columns,
                result_type=raw_meas.get("resultType", "float"),
                aggregation=raw_meas.get("aggregation", "sum"),
                expression=expression,
                distinct=raw_meas.get("distinct", False),
                total=raw_meas.get("total", False),
                filters=measure_filters,
                format=raw_meas.get("format"),
                allow_fan_out=raw_meas.get("allowFanOut", False),
                owner=raw_meas.get("owner"),
                synonyms=raw_meas.get("synonyms", []),
                custom_extensions=_parse_extensions(raw_meas),
            )
        except Exception as e:
            span = source_map.get(f"measures.{name}") if source_map else None
            errors.append(
                SemanticError(
                    code="MEASURE_PARSE_ERROR",
                    message=f"Failed to parse measure '{name}': {e}",
                    path=f"measures.{name}",
                    span=span,
                )
            )

    # Parse metrics
    metrics: dict[str, Metric] = {}
    raw_metrics = raw.get("metrics", {})
    if not isinstance(raw_metrics, dict):
        errors.append(
            SemanticError(
                code="METRIC_PARSE_ERROR",
                message="'metrics' must be a YAML mapping, not a list or scalar",
                path="metrics",
            )
        )
        raw_metrics = {}
    for name, raw_metric in raw_metrics.items():
        try:
            # NOTE(review): the raw `type` string is compared against
            # MetricType members — relies on MetricType being
            # string-comparable (e.g. a str-backed enum); confirm in models.
            metric_type = raw_metric.get("type", "derived")

            if metric_type == MetricType.CUMULATIVE:
                # Cumulative metric: validate measure reference exists
                ref_measure = raw_metric.get("measure", "")
                if ref_measure and ref_measure not in measures:
                    span = source_map.get(f"metrics.{name}.measure") if source_map else None
                    errors.append(
                        SemanticError(
                            code="UNKNOWN_MEASURE",
                            message=(
                                f"Cumulative metric '{name}' references "
                                f"unknown measure '{ref_measure}'"
                            ),
                            path=f"metrics.{name}.measure",
                            span=span,
                        )
                    )

                # Validate timeDimension references a known dimension
                cum_time_dim = raw_metric.get("timeDimension", "")
                if cum_time_dim and cum_time_dim not in dimensions:
                    span = (
                        source_map.get(f"metrics.{name}.timeDimension") if source_map else None
                    )
                    errors.append(
                        SemanticError(
                            code="CUMULATIVE_UNKNOWN_TIME_DIMENSION",
                            message=(
                                f"Cumulative metric '{name}' references "
                                f"unknown time dimension '{cum_time_dim}'"
                            ),
                            path=f"metrics.{name}.timeDimension",
                            span=span,
                            suggestions=_suggest_similar(cum_time_dim, list(dimensions.keys())),
                        )
                    )

                metrics[name] = Metric(
                    label=name,
                    type=MetricType.CUMULATIVE,
                    measure=raw_metric.get("measure"),
                    time_dimension=raw_metric.get("timeDimension"),
                    cumulative_type=raw_metric.get("cumulativeType", "sum"),
                    window=raw_metric.get("window"),
                    grain_to_date=raw_metric.get("grainToDate"),
                    description=raw_metric.get("description"),
                    format=raw_metric.get("format"),
                    owner=raw_metric.get("owner"),
                    synonyms=raw_metric.get("synonyms", []),
                    custom_extensions=_parse_extensions(raw_metric),
                )
            elif metric_type == MetricType.PERIOD_OVER_PERIOD:
                # Period-over-period metric: validate expression + PoP config
                expression = raw_metric.get("expression", "")
                self._validate_metric_expression_refs(
                    name, expression, measures, errors, source_map
                )

                raw_pop = raw_metric.get("periodOverPeriod")
                if not raw_pop:
                    span = source_map.get(f"metrics.{name}") if source_map else None
                    errors.append(
                        SemanticError(
                            code="METRIC_PARSE_ERROR",
                            message=(
                                f"Period-over-period metric '{name}' "
                                f"requires 'periodOverPeriod' configuration"
                            ),
                            path=f"metrics.{name}",
                            span=span,
                        )
                    )
                    # Substitute an empty config so the .get() calls below
                    # still work and defaults are applied.
                    raw_pop = {}

                # Validate time dimension reference
                pop_time_dim = raw_pop.get("timeDimension", "")
                if pop_time_dim and pop_time_dim not in dimensions:
                    span = (
                        source_map.get(f"metrics.{name}.periodOverPeriod")
                        if source_map
                        else None
                    )
                    errors.append(
                        SemanticError(
                            code="POP_UNKNOWN_TIME_DIMENSION",
                            message=(
                                f"Period-over-period metric '{name}' references "
                                f"unknown time dimension '{pop_time_dim}'"
                            ),
                            path=f"metrics.{name}.periodOverPeriod.timeDimension",
                            span=span,
                            suggestions=_suggest_similar(pop_time_dim, list(dimensions.keys())),
                        )
                    )

                pop_config = PeriodOverPeriod(
                    time_dimension=raw_pop.get("timeDimension", ""),
                    grain=raw_pop.get("grain", "month"),
                    offset=raw_pop.get("offset", -1),
                    offset_grain=raw_pop.get("offsetGrain", "year"),
                    comparison=raw_pop.get("comparison", "percentChange"),
                )

                metrics[name] = Metric(
                    label=name,
                    type=MetricType.PERIOD_OVER_PERIOD,
                    expression=expression,
                    period_over_period=pop_config,
                    description=raw_metric.get("description"),
                    format=raw_metric.get("format"),
                    owner=raw_metric.get("owner"),
                    synonyms=raw_metric.get("synonyms", []),
                    custom_extensions=_parse_extensions(raw_metric),
                )
            else:
                # Derived metric (default)
                expression = raw_metric.get("expression", "")
                self._validate_metric_expression_refs(
                    name, expression, measures, errors, source_map
                )

                metrics[name] = Metric(
                    label=name,
                    expression=expression,
                    description=raw_metric.get("description"),
                    format=raw_metric.get("format"),
                    owner=raw_metric.get("owner"),
                    synonyms=raw_metric.get("synonyms", []),
                    custom_extensions=_parse_extensions(raw_metric),
                )
        except Exception as e:
            span = source_map.get(f"metrics.{name}") if source_map else None
            errors.append(
                SemanticError(
                    code="METRIC_PARSE_ERROR",
                    message=f"Failed to parse metric '{name}': {e}",
                    path=f"metrics.{name}",
                    span=span,
                )
            )

    # Assemble the (possibly partial) model from whatever parsed cleanly.
    model = SemanticModel(
        version=raw.get("version", 1.0),
        data_objects=data_objects,
        dimensions=dimensions,
        measures=measures,
        metrics=metrics,
        owner=raw.get("owner"),
        custom_extensions=_parse_extensions(raw),
    )

    result = ValidationResult(
        valid=len(errors) == 0,
        errors=errors,
        warnings=warnings,
    )

    return model, result

Semantic Validator

orionbelt.parser.validator.SemanticValidator

Validates semantic rules from spec §3.8.

Source code in src/orionbelt/parser/validator.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
class SemanticValidator:
    """Validates semantic rules from spec §3.8."""

    def validate(self, model: SemanticModel) -> list[SemanticError]:
        """Run every semantic rule check in order and return all errors found."""
        checks = (
            self._check_unique_identifiers,
            self._check_unique_column_names,
            self._check_secondary_joins,
            self._check_no_cyclic_joins,
            self._check_no_multipath_joins,
            self._check_measures_resolve,
            self._check_join_targets_exist,
            self._check_references_resolve,
            self._check_num_class_on_numeric_columns,
            self._check_measure_filter_refs,
        )
        errors: list[SemanticError] = []
        for check in checks:
            errors.extend(check(model))
        return errors

    def _check_unique_identifiers(self, model: SemanticModel) -> list[SemanticError]:
        """Ensure no duplicate names across dimensions, measures, and metrics.

        Data object names live in a separate namespace — a dimension may share
        its name with a data object (e.g. dimension "Region" on data object "Region").
        """
        errors: list[SemanticError] = []
        claimed: dict[str, str] = {}  # identifier -> kind that first claimed it

        sections = (
            ("dimension", "dimensions", model.dimensions),
            ("measure", "measures", model.measures),
            ("metric", "metrics", model.metrics),
        )
        for kind, section, mapping in sections:
            for name in mapping:
                previous = claimed.get(name)
                if previous is not None:
                    errors.append(
                        SemanticError(
                            code="DUPLICATE_IDENTIFIER",
                            message=(
                                f"{kind.title()} '{name}' conflicts with existing {previous} '{name}'"
                            ),
                            path=f"{section}.{name}",
                        )
                    )
                # Later kinds overwrite earlier ones, matching original behavior.
                claimed[name] = kind

        return errors

    def _check_unique_column_names(self, model: SemanticModel) -> list[SemanticError]:
        """Column names must be unique within each data object.

        Duplicate YAML keys are now rejected at parse time by TrackedLoader
        (``allow_duplicate_keys = False``). This validator is retained as a
        structural hook in case models are constructed programmatically.
        """
        # Parse-time rejection makes this a deliberate no-op today.
        no_errors: list[SemanticError] = []
        return no_errors

    def _check_secondary_joins(self, model: SemanticModel) -> list[SemanticError]:
        """Validate secondary join constraints.

        - Every secondary join MUST have a pathName.
        - pathName must be unique per (source, target) pair.
        """
        errors: list[SemanticError] = []
        # pathNames already seen, keyed by (source object, target object).
        seen_paths: dict[tuple[str, str], set[str]] = {}

        for obj_name, obj in model.data_objects.items():
            for idx, join in enumerate(obj.joins):
                join_path = f"dataObjects.{obj_name}.joins[{idx}]"

                if join.secondary and not join.path_name:
                    errors.append(
                        SemanticError(
                            code="SECONDARY_JOIN_MISSING_PATH_NAME",
                            message=(
                                f"Data object '{obj_name}' join[{idx}] is secondary "
                                f"but has no pathName"
                            ),
                            path=join_path,
                        )
                    )

                if not join.path_name:
                    continue
                names = seen_paths.setdefault((obj_name, join.join_to), set())
                if join.path_name in names:
                    errors.append(
                        SemanticError(
                            code="DUPLICATE_JOIN_PATH_NAME",
                            message=(
                                f"Data object '{obj_name}' join[{idx}] has duplicate "
                                f"pathName '{join.path_name}' for target '{join.join_to}'"
                            ),
                            path=join_path,
                        )
                    )
                else:
                    names.add(join.path_name)

        return errors

    def _check_no_cyclic_joins(self, model: SemanticModel) -> list[SemanticError]:
        """Detect cyclic join paths.

        Builds a directed graph over data objects from their non-secondary
        joins, then runs a depth-first search with an explicit stack so that
        very deep join chains cannot raise ``RecursionError``.
        """
        errors: list[SemanticError] = []

        # Build adjacency list from joins (skip secondary joins)
        adj: dict[str, set[str]] = {}
        for obj_name, obj in model.data_objects.items():
            if obj_name not in adj:
                adj[obj_name] = set()
            for join in obj.joins:
                if not join.secondary:
                    adj[obj_name].add(join.join_to)

        # Iterative DFS cycle detection (avoids RecursionError on large models)
        visited: set[str] = set()
        rec_stack: set[str] = set()  # nodes on the currently-active DFS path

        for start in adj:
            if start in visited:
                continue
            # Each stack frame holds (node, iterator over its neighbors) so
            # traversal can resume exactly where it left off after a descent.
            stack: list[tuple[str, list[str]]] = [(start, iter(adj.get(start, set())))]  # type: ignore[list-item]
            path: list[str] = [start]
            visited.add(start)
            rec_stack.add(start)

            while stack:
                node, neighbors = stack[-1]
                advanced = False
                for neighbor in neighbors:
                    if neighbor not in visited:
                        # Descend one level; `break` pauses this frame's iterator.
                        visited.add(neighbor)
                        rec_stack.add(neighbor)
                        path.append(neighbor)
                        stack.append((neighbor, iter(adj.get(neighbor, set()))))  # type: ignore[arg-type]
                        advanced = True
                        break
                    elif neighbor in rec_stack:
                        # Back edge into the active path => cycle found.
                        if neighbor in path:
                            cycle = path[path.index(neighbor) :] + [neighbor]
                        else:
                            # NOTE(review): rec_stack member missing from `path`
                            # shouldn't normally occur; degrade to a 2-node report.
                            cycle = [node, neighbor]
                        errors.append(
                            SemanticError(
                                code="CYCLIC_JOIN",
                                message=f"Cyclic join detected: {' -> '.join(cycle)}",
                                path=f"dataObjects.{node}.joins",
                            )
                        )
                if not advanced:
                    # Node fully explored: pop its frame and leave the active path.
                    stack.pop()
                    rec_stack.discard(node)
                    if path:
                        path.pop()

        return errors

    def _check_no_multipath_joins(self, model: SemanticModel) -> list[SemanticError]:
        """Detect multiple distinct paths between any pair of nodes in the join DAG.

        Only flags true diamonds where both paths go through intermediaries.
        A direct edge from start to target is canonical, so an additional
        indirect path (e.g. Purchases→Suppliers direct + Purchases→Products→Suppliers)
        is not ambiguous and is not flagged.

        Strategy: BFS from every source node, remembering the first parent
        that reached each node; a second, different parent reaching the same
        node reveals a second path.
        """
        errors: list[SemanticError] = []

        # Build adjacency list from joins (skip secondary joins)
        adj: dict[str, list[str]] = {}
        for obj_name, obj in model.data_objects.items():
            if obj_name not in adj:
                adj[obj_name] = []
            for join in obj.joins:
                if not join.secondary:
                    adj[obj_name].append(join.join_to)

        # (start, target) pairs already flagged, to avoid duplicate errors.
        reported: set[tuple[str, str]] = set()

        for start in adj:
            if not adj[start]:
                continue
            # BFS from start; track first parent that reached each node
            direct_neighbors: set[str] = set()
            first_parent: dict[str, str] = {}
            queue: deque[tuple[str, str]] = deque()
            for neighbor in adj[start]:
                # Self-loops are ignored here (cycle detection handles them).
                if neighbor == start:
                    continue
                direct_neighbors.add(neighbor)
                if neighbor not in first_parent:
                    first_parent[neighbor] = start
                    queue.append((neighbor, start))

            while queue:
                node, _parent = queue.popleft()
                for neighbor in adj.get(node, []):
                    if neighbor == start:
                        continue
                    if neighbor not in first_parent:
                        first_parent[neighbor] = node
                        queue.append((neighbor, node))
                    elif first_parent[neighbor] != node:
                        # A second distinct parent reached `neighbor`: diamond.
                        # Skip if target has a direct edge from start —
                        # the direct join is the canonical path.
                        if neighbor in direct_neighbors:
                            continue
                        pair = (start, neighbor)
                        if pair not in reported:
                            reported.add(pair)
                            errors.append(
                                SemanticError(
                                    code="MULTIPATH_JOIN",
                                    message=(
                                        f"Multiple join paths from '{start}' to "
                                        f"'{neighbor}' (via '{first_parent[neighbor]}' "
                                        f"and '{node}'). "
                                        f"Join paths must be unambiguous."
                                    ),
                                    path=f"dataObjects.{start}.joins",
                                )
                            )

        return errors

    def _check_measures_resolve(self, model: SemanticModel) -> list[SemanticError]:
        """Ensure measure column references resolve to actual data object columns."""
        errors: list[SemanticError] = []
        for measure_name, measure in model.measures.items():
            for idx, col_ref in enumerate(measure.columns):
                target = col_ref.view
                column = col_ref.column
                ref_path = f"measures.{measure_name}.columns[{idx}]"
                if target and target not in model.data_objects:
                    errors.append(
                        SemanticError(
                            code="UNKNOWN_DATA_OBJECT",
                            message=(
                                f"Measure '{measure_name}' column[{idx}] references "
                                f"unknown data object '{target}'"
                            ),
                            path=ref_path,
                        )
                    )
                elif target and column:
                    # Target object exists here; verify the column on it.
                    if column not in model.data_objects[target].columns:
                        errors.append(
                            SemanticError(
                                code="UNKNOWN_COLUMN",
                                message=(
                                    f"Measure '{measure_name}' column[{idx}] references "
                                    f"unknown column '{column}' in data object '{target}'"
                                ),
                                path=ref_path,
                            )
                        )
        return errors

    def _check_join_targets_exist(self, model: SemanticModel) -> list[SemanticError]:
        """Validate every join declared on every data object.

        Checks, in order: non-empty join columns, matching columnsFrom/columnsTo
        lengths, an existing join target, and — when the target exists — that each
        referenced column is present on its side of the join.
        """
        errors: list[SemanticError] = []
        for obj_name, obj in model.data_objects.items():
            for i, join in enumerate(obj.joins):
                join_path = f"dataObjects.{obj_name}.joins[{i}]"
                n_from = len(join.columns_from)
                n_to = len(join.columns_to)
                if n_from == 0 or n_to == 0:
                    errors.append(
                        SemanticError(
                            code="EMPTY_JOIN_COLUMNS",
                            message=(
                                f"Data object '{obj_name}' join[{i}] to "
                                f"'{join.join_to}' has empty join columns"
                            ),
                            path=join_path,
                        )
                    )
                elif n_from != n_to:
                    errors.append(
                        SemanticError(
                            code="JOIN_COLUMN_COUNT_MISMATCH",
                            message=(
                                f"Data object '{obj_name}' join[{i}] has "
                                f"{n_from} columnsFrom and "
                                f"{n_to} columnsTo"
                            ),
                            path=join_path,
                        )
                    )
                if join.join_to not in model.data_objects:
                    errors.append(
                        SemanticError(
                            code="UNKNOWN_JOIN_TARGET",
                            message=(
                                f"Data object '{obj_name}' join[{i}] references "
                                f"unknown data object '{join.join_to}'"
                            ),
                            path=join_path,
                        )
                    )
                    continue
                # Target exists: check both column lists against their owners.
                for col_name in (c for c in join.columns_from if c not in obj.columns):
                    errors.append(
                        SemanticError(
                            code="UNKNOWN_JOIN_COLUMN",
                            message=(
                                f"Data object '{obj_name}' join[{i}] columnsFrom "
                                f"references unknown column '{col_name}'"
                            ),
                            path=f"{join_path}.columnsFrom",
                        )
                    )
                target_obj = model.data_objects[join.join_to]
                for col_name in (c for c in join.columns_to if c not in target_obj.columns):
                    errors.append(
                        SemanticError(
                            code="UNKNOWN_JOIN_COLUMN",
                            message=(
                                f"Data object '{obj_name}' join[{i}] columnsTo "
                                f"references unknown column '{col_name}' "
                                f"in data object '{join.join_to}'"
                            ),
                            path=f"{join_path}.columnsTo",
                        )
                    )
        return errors

    def _check_references_resolve(self, model: SemanticModel) -> list[SemanticError]:
        """Verify each dimension points at a known data object and column."""
        errors: list[SemanticError] = []
        for name, dim in model.dimensions.items():
            target = dim.view
            column = dim.column
            if not target:
                continue
            if target not in model.data_objects:
                errors.append(
                    SemanticError(
                        code="UNKNOWN_DATA_OBJECT",
                        message=f"Dimension '{name}' references unknown data object '{target}'",
                        path=f"dimensions.{name}",
                    )
                )
                continue
            if column and column not in model.data_objects[target].columns:
                errors.append(
                    SemanticError(
                        code="UNKNOWN_COLUMN",
                        message=(
                            f"Dimension '{name}' references unknown column "
                            f"'{column}' in data object '{target}'"
                        ),
                        path=f"dimensions.{name}",
                    )
                )
        return errors

    # numClass is only meaningful on columns whose abstract type is numeric.
    _NUMERIC_TYPES = {DataType.INT, DataType.FLOAT}

    def _check_num_class_on_numeric_columns(self, model: SemanticModel) -> list[SemanticError]:
        """Flag columns that declare a numClass but are not int/float typed."""
        errors: list[SemanticError] = []
        for obj_name, obj in model.data_objects.items():
            for col_name, col in obj.columns.items():
                if not col.num_class or col.abstract_type in self._NUMERIC_TYPES:
                    continue
                errors.append(
                    SemanticError(
                        code="NUM_CLASS_ON_NON_NUMERIC",
                        message=(
                            f"Column '{col_name}' in data object '{obj_name}' "
                            f"has numClass '{col.num_class}' but abstractType "
                            f"'{col.abstract_type}' is not numeric (int or float)"
                        ),
                        path=f"dataObjects.{obj_name}.columns.{col_name}",
                    )
                )
        return errors

    def _check_measure_filter_refs(self, model: SemanticModel) -> list[SemanticError]:
        """Walk every measure's filter tree and validate each column reference."""
        errors: list[SemanticError] = []
        for meas_name, measure in model.measures.items():
            for filter_item in measure.filters:
                self._validate_filter_item(filter_item, model, meas_name, errors)
        return errors

    def _validate_filter_item(
        self,
        item: MeasureFilterItem,
        model: SemanticModel,
        meas_name: str,
        errors: list[SemanticError],
    ) -> None:
        """Validate one filter leaf or group, recursing into groups.

        Errors are appended to *errors* in place; items without a column
        reference are skipped silently.
        """
        if isinstance(item, MeasureFilterGroup):
            for child in item.filters:
                self._validate_filter_item(child, model, meas_name, errors)
            return
        if not isinstance(item, MeasureFilter):
            return
        ref = item.column
        if not ref or not ref.view:
            return
        obj = model.data_objects.get(ref.view)
        if not obj:
            errors.append(
                SemanticError(
                    code="UNKNOWN_FILTER_DATA_OBJECT",
                    message=(
                        f"Measure '{meas_name}' filter references unknown "
                        f"data object '{ref.view}'"
                    ),
                    path=f"measures.{meas_name}.filters",
                )
            )
        elif ref.column and ref.column not in obj.columns:
            errors.append(
                SemanticError(
                    code="UNKNOWN_FILTER_COLUMN",
                    message=(
                        f"Measure '{meas_name}' filter references unknown "
                        f"column '{ref.column}' in '{ref.view}'"
                    ),
                    path=f"measures.{meas_name}.filters",
                )
            )

validate(model)

Source code in src/orionbelt/parser/validator.py
def validate(self, model: SemanticModel) -> list[SemanticError]:
    """Run every semantic check against *model* and collect all errors.

    The check order is fixed so error output stays deterministic.
    """
    checks = (
        self._check_unique_identifiers,
        self._check_unique_column_names,
        self._check_secondary_joins,
        self._check_no_cyclic_joins,
        self._check_no_multipath_joins,
        self._check_measures_resolve,
        self._check_join_targets_exist,
        self._check_references_resolve,
        self._check_num_class_on_numeric_columns,
        self._check_measure_filter_refs,
    )
    errors: list[SemanticError] = []
    for check in checks:
        errors.extend(check(model))
    return errors

Semantic Model

orionbelt.models.semantic.SemanticModel

Bases: BaseModel

Complete semantic model parsed from OBML YAML.

Source code in src/orionbelt/models/semantic.py
class SemanticModel(BaseModel):
    """Complete semantic model parsed from OBML YAML.

    Container maps are keyed by entity name; all may be empty for a
    freshly-constructed (or failed-parse placeholder) model.
    """

    # OBML spec version the document was written against.
    version: float = 1.0
    description: str | None = None
    # default_factory (rather than a shared mutable literal) is the explicit
    # pydantic form and matches the style used for custom_extensions below.
    data_objects: dict[str, DataObject] = Field(default_factory=dict, alias="dataObjects")
    dimensions: dict[str, Dimension] = Field(default_factory=dict)
    measures: dict[str, Measure] = Field(default_factory=dict)
    metrics: dict[str, Metric] = Field(default_factory=dict)
    owner: str | None = None
    custom_extensions: list[CustomExtension] = Field(default_factory=list, alias="customExtensions")

    # Allow construction by python field names as well as YAML aliases.
    model_config = {"populate_by_name": True}

orionbelt.models.semantic.DataObject

Bases: BaseModel

A database table or view with its columns and joins.

Source code in src/orionbelt/models/semantic.py
class DataObject(BaseModel):
    """A database table or view with its columns and joins."""

    # Human-readable display name.
    label: str
    # Physical table/view name (unqualified).
    code: str
    database: str
    # Aliased because a field literally named "schema" would shadow pydantic's
    # BaseModel attribute namespace — presumably the reason; confirm.
    schema_name: str = Field(alias="schema")
    columns: dict[str, DataObjectColumn] = {}
    joins: list[DataObjectJoin] = []
    description: str | None = None
    comment: str | None = None
    owner: str | None = None
    synonyms: list[str] = Field(default_factory=list)
    custom_extensions: list[CustomExtension] = Field(default_factory=list, alias="customExtensions")

    @property
    def qualified_code(self) -> str:
        """Full qualified table reference: database.schema.code."""
        return f"{self.database}.{self.schema_name}.{self.code}"

    # Allow construction by python field names as well as YAML aliases.
    model_config = {"populate_by_name": True}

qualified_code property

Full qualified table reference: database.schema.code.

orionbelt.models.semantic.Dimension

Bases: BaseModel

A named dimension referencing a data object column.

Source code in src/orionbelt/models/semantic.py
class Dimension(BaseModel):
    """A named dimension referencing a data object column."""

    label: str
    # YAML key is "dataObject"; stored internally as `view`.
    view: str = Field(alias="dataObject")
    # Column name on the referenced data object; may be empty.
    column: str = ""
    result_type: DataType = Field(DataType.STRING, alias="resultType")
    # Presumably only meaningful for time-typed dimensions — confirm.
    time_grain: TimeGrain | None = Field(None, alias="timeGrain")
    description: str | None = None
    format: str | None = None
    owner: str | None = None
    synonyms: list[str] = Field(default_factory=list)
    custom_extensions: list[CustomExtension] = Field(default_factory=list, alias="customExtensions")

    model_config = {"populate_by_name": True}

orionbelt.models.semantic.Measure

Bases: BaseModel

An aggregation measure with optional expression template.

Source code in src/orionbelt/models/semantic.py
class Measure(BaseModel):
    """An aggregation measure with optional expression template."""

    label: str
    # Source columns the aggregation operates on.
    columns: list[DataColumnRef] = []
    result_type: DataType = Field(DataType.FLOAT, alias="resultType")
    # Aggregation function name; free-form string here — presumably validated
    # elsewhere, confirm.
    aggregation: str
    # Optional expression template applied around/instead of the raw columns.
    expression: str | None = None
    distinct: bool = False
    total: bool = False
    # Measure-level filters; may nest via MeasureFilterGroup.
    filters: list[MeasureFilterItem] = []
    description: str | None = None
    format: str | None = None
    allow_fan_out: bool = Field(False, alias="allowFanOut")
    # NOTE(review): presumably the separator for string-aggregation — confirm.
    delimiter: str | None = None
    within_group: WithinGroup | None = Field(None, alias="withinGroup")
    owner: str | None = None
    synonyms: list[str] = Field(default_factory=list)
    custom_extensions: list[CustomExtension] = Field(default_factory=list, alias="customExtensions")

    model_config = {"populate_by_name": True}

orionbelt.models.semantic.Metric

Bases: BaseModel

A metric: derived expression, cumulative window, or period-over-period comparison.

Derived (default): references measures by name using {[Measure Name]} syntax. Cumulative: applies a window function to an existing measure, ordered by a time dimension. Supports running totals, rolling windows, and grain-to-date resets. Period-over-Period: compares a measure's value against a prior time period using a synthetic date spine. Supports ratio, difference, previous value, and percent change.

Source code in src/orionbelt/models/semantic.py
class Metric(BaseModel):
    """A metric: derived expression, cumulative window, or period-over-period comparison.

    **Derived** (default): references measures by name using ``{[Measure Name]}`` syntax.
    **Cumulative**: applies a window function to an existing measure, ordered by a time
    dimension.  Supports running totals, rolling windows, and grain-to-date resets.
    **Period-over-Period**: compares a measure's value against a prior time period using
    a synthetic date spine.  Supports ratio, difference, previous value, and percent change.
    """

    label: str
    type: MetricType = MetricType.DERIVED
    # Derived metrics
    expression: str | None = None
    # Cumulative metrics
    measure: str | None = None
    time_dimension: str | None = Field(None, alias="timeDimension")
    cumulative_type: CumulativeAggType = Field(CumulativeAggType.SUM, alias="cumulativeType")
    window: int | None = None
    grain_to_date: GrainToDate | None = Field(None, alias="grainToDate")
    # Period-over-Period metrics
    period_over_period: PeriodOverPeriod | None = Field(None, alias="periodOverPeriod")
    # Common
    description: str | None = None
    format: str | None = None
    owner: str | None = None
    synonyms: list[str] = Field(default_factory=list)
    custom_extensions: list[CustomExtension] = Field(default_factory=list, alias="customExtensions")

    model_config = {"populate_by_name": True}

    @model_validator(mode="after")
    def _validate_metric_type(self) -> Metric:
        """Cross-field validation: each metric type requires/forbids specific fields."""
        if self.type == MetricType.DERIVED:
            if not self.expression:
                raise ValueError("Derived metrics require 'expression'")
        elif self.type == MetricType.CUMULATIVE:
            if not self.measure:
                raise ValueError("Cumulative metrics require 'measure'")
            if not self.time_dimension:
                raise ValueError("Cumulative metrics require 'timeDimension'")
            if self.expression:
                raise ValueError("Cumulative metrics must not have 'expression'")
            # A rolling window and a grain-to-date reset cannot both apply.
            if self.window is not None and self.grain_to_date is not None:
                raise ValueError("'window' and 'grainToDate' are mutually exclusive")
            if self.window is not None and self.window < 1:
                raise ValueError("'window' must be >= 1")
        elif self.type == MetricType.PERIOD_OVER_PERIOD:
            if not self.expression:
                raise ValueError("Period-over-period metrics require 'expression'")
            if not self.period_over_period:
                raise ValueError("Period-over-period metrics require 'periodOverPeriod'")
            if self.measure:
                raise ValueError(
                    "Period-over-period metrics must not have 'measure' "
                    "(use 'expression' to reference measures)"
                )
            if self.window is not None or self.grain_to_date is not None:
                raise ValueError(
                    "Period-over-period metrics must not have 'window' or 'grainToDate'"
                )
        return self

Query Models

orionbelt.models.query.QueryObject

Bases: BaseModel

A complete YAML analytical query.

Source code in src/orionbelt/models/query.py
class QueryObject(BaseModel):
    """A complete YAML analytical query."""

    select: QuerySelect
    # Pre-aggregation filters.
    where: list[QueryFilterItem] = []
    # Post-aggregation filters.
    having: list[QueryFilterItem] = []
    # NOTE(review): alias equals the field name, so it is a no-op — possibly
    # intended as "orderBy" to match the camelCase aliases below; confirm.
    order_by: list[QueryOrderBy] = Field([], alias="order_by")
    limit: int | None = None
    offset: int | None = None
    # Named secondary join-path selections (see UsePathName).
    use_path_names: list[UsePathName] = Field([], alias="usePathNames")
    dimensions_exclude: bool = Field(False, alias="dimensionsExclude")

    model_config = {"populate_by_name": True}

orionbelt.models.query.QuerySelect

Bases: BaseModel

The SELECT part of a query: dimensions + measures.

Source code in src/orionbelt/models/query.py
class QuerySelect(BaseModel):
    """The SELECT part of a query: dimensions + measures."""

    # Dimension names; presumably may carry ':grain' suffixes (see
    # DimensionRef.parse) — confirm against the resolver.
    dimensions: list[str] = []
    # Measure names to aggregate.
    measures: list[str] = []

orionbelt.models.query.QueryFilter

Bases: BaseModel

A filter condition in a query.

Source code in src/orionbelt/models/query.py
class QueryFilter(BaseModel):
    """A filter condition in a query."""

    field: str
    op: FilterOperator
    value: Any = None

    model_config = {"populate_by_name": True}

    @field_validator("value", mode="before")
    @classmethod
    def _validate_filter_value(cls, v: Any) -> Any:
        """Reject arbitrary nested objects — allow scalars, lists of scalars, and dicts
        (for RELATIVE filters which use ``{unit, count, direction}`` objects).
        """
        scalar_types = (str, int, float, bool)
        acceptable = (
            v is None
            or isinstance(v, scalar_types)
            or (isinstance(v, list) and all(isinstance(i, scalar_types) for i in v))
            or (isinstance(v, dict) and all(isinstance(k, str) for k in v))
        )
        if not acceptable:
            raise ValueError("Filter value must be a scalar, list of scalars, or object")
        return v

orionbelt.models.query.UsePathName

Bases: BaseModel

Selects a named secondary join path for a specific (source, target) pair.

Source code in src/orionbelt/models/query.py
class UsePathName(BaseModel):
    """Selects a named secondary join path for a specific (source, target) pair."""

    # Data object the join path starts from.
    source: str
    # Data object the join path leads to.
    target: str
    path_name: str = Field(alias="pathName")

    model_config = {"populate_by_name": True}

orionbelt.models.query.DimensionRef

Bases: BaseModel

Reference to a dimension, optionally with time grain.

Supports notation like "customer.country" or "order.order_date:month".

Source code in src/orionbelt/models/query.py
class DimensionRef(BaseModel):
    """Reference to a dimension, optionally with time grain.

    Supports notation like "customer.country" or "order.order_date:month".
    """

    name: str
    grain: TimeGrain | None = None

    @classmethod
    def parse(cls, raw: str) -> DimensionRef:
        """Parse 'name:grain' notation."""
        name, sep, grain_str = raw.rpartition(":")
        if sep:
            return cls(name=name, grain=TimeGrain(grain_str))
        return cls(name=raw)

parse(raw) classmethod

Parse 'name:grain' notation.

Source code in src/orionbelt/models/query.py
@classmethod
def parse(cls, raw: str) -> DimensionRef:
    """Parse 'name:grain' notation."""
    name, sep, grain_str = raw.rpartition(":")
    if sep:
        return cls(name=name, grain=TimeGrain(grain_str))
    return cls(name=raw)

Error Models

orionbelt.models.errors.SemanticError

Bases: BaseModel

A structured error with optional source position and suggestions.

Source code in src/orionbelt/models/errors.py
class SemanticError(BaseModel):
    """A structured error with optional source position and suggestions."""

    # Stable machine-readable identifier, e.g. "UNKNOWN_COLUMN".
    code: str
    # Human-readable explanation.
    message: str
    # Dotted model path to the offending element, e.g. "dimensions.<name>".
    path: str | None = None
    # Exact YAML location, when the loader tracked one.
    span: SourceSpan | None = None
    # Optional fix hints (e.g. near-miss names).
    suggestions: list[str] = []

orionbelt.models.errors.ValidationResult

Bases: BaseModel

Result of semantic model validation.

Source code in src/orionbelt/models/errors.py
class ValidationResult(BaseModel):
    """Result of semantic model validation."""

    # Presumably true iff `errors` is empty (warnings alone keep it valid) — confirm.
    valid: bool
    errors: list[SemanticError] = []
    warnings: list[SemanticError] = []

orionbelt.models.errors.SourceSpan

Bases: BaseModel

Points to exact location in YAML source for error reporting.

Source code in src/orionbelt/models/errors.py
class SourceSpan(BaseModel):
    """Points to exact location in YAML source for error reporting."""

    file: str
    # NOTE(review): confirm whether line/column are 0- or 1-based against the loader.
    line: int
    column: int
    # Optional end of the span; None means a point location.
    end_line: int | None = None
    end_column: int | None = None

SQL AST Nodes

orionbelt.ast.nodes.Select dataclass

A complete SELECT statement.

Source code in src/orionbelt/ast/nodes.py
@dataclass(frozen=True)
class Select:
    """A complete SELECT statement."""

    # Projection list; items may be bare expressions or AliasedExpr wrappers.
    columns: list[Expr] = field(default_factory=list)
    from_: From | None = None
    joins: list[Join] = field(default_factory=list)
    where: Expr | None = None
    group_by: list[Expr] = field(default_factory=list)
    having: Expr | None = None
    order_by: list[OrderByItem] = field(default_factory=list)
    limit: int | None = None
    offset: int | None = None
    # Common table expressions (WITH clause).
    ctes: list[CTE] = field(default_factory=list)

orionbelt.ast.nodes.ColumnRef dataclass

Reference to a column, optionally qualified by table/alias.

Source code in src/orionbelt/ast/nodes.py
@dataclass(frozen=True)
class ColumnRef:
    """Reference to a column, optionally qualified by table/alias."""

    name: str
    # Table name or alias qualifier; None means an unqualified reference.
    table: str | None = None

orionbelt.ast.nodes.FunctionCall dataclass

SQL function call, e.g. SUM(col), DATE_TRUNC('month', col).

Source code in src/orionbelt/ast/nodes.py
@dataclass(frozen=True)
class FunctionCall:
    """SQL function call, e.g. SUM(col), DATE_TRUNC('month', col)."""

    name: str
    args: list[Expr] = field(default_factory=list)
    # DISTINCT aggregate modifier, e.g. COUNT(DISTINCT x).
    distinct: bool = False
    # Presumably ordering for ordered-set/string aggregates — confirm rendering.
    order_by: list[OrderByItem] = field(default_factory=list)
    # Presumably the separator argument for string aggregation — confirm rendering.
    separator: str | None = None

orionbelt.ast.nodes.BinaryOp dataclass

Binary operation: left op right.

Source code in src/orionbelt/ast/nodes.py
@dataclass(frozen=True)
class BinaryOp:
    """Binary operation: left op right.

    ``op`` holds the raw SQL operator token.
    """

    left: Expr
    op: str  # +, -, *, /, =, <>, AND, OR, LIKE, etc.
    right: Expr

orionbelt.ast.nodes.Literal dataclass

A literal value: number, string, boolean, or NULL.

Source code in src/orionbelt/ast/nodes.py
@dataclass(frozen=True)
class Literal:
    """A literal value: number, string, boolean, or NULL."""

    value: str | int | float | bool | None

    @classmethod
    def string(cls, v: str) -> Literal:
        """Wrap a string literal."""
        return cls(v)

    @classmethod
    def number(cls, v: int | float) -> Literal:
        """Wrap a numeric literal."""
        return cls(v)

    @classmethod
    def null(cls) -> Literal:
        """The SQL NULL literal."""
        return cls(None)

    @classmethod
    def boolean(cls, v: bool) -> Literal:
        """Wrap a boolean literal."""
        return cls(v)

AST Builder

orionbelt.ast.builder.QueryBuilder

Fluent builder for ergonomic AST construction.

Source code in src/orionbelt/ast/builder.py
class QueryBuilder:
    """Fluent builder for ergonomic AST construction.

    Every mutator returns ``self`` so calls can be chained; ``build()``
    snapshots the accumulated state into a ``Select`` node.
    """

    def __init__(self) -> None:
        self._columns: list[Expr] = []
        self._from: From | None = None
        self._joins: list[Join] = []
        self._where: Expr | None = None
        self._group_by: list[Expr] = []
        self._having: Expr | None = None
        self._order_by: list[OrderByItem] = []
        self._limit: int | None = None
        self._offset: int | None = None
        self._ctes: list[CTE] = []

    @staticmethod
    def _and_combine(current: Expr | None, condition: Expr) -> Expr:
        """AND-chain *condition* onto *current*, or start a new chain."""
        if current is None:
            return condition
        return BinaryOp(left=current, op="AND", right=condition)

    def select(self, *columns: Expr) -> Self:
        """Append projection expressions."""
        self._columns += list(columns)
        return self

    def select_aliased(self, expr: Expr, alias: str) -> Self:
        """Append a projection expression with an explicit alias."""
        self._columns += [AliasedExpr(expr=expr, alias=alias)]
        return self

    def from_(self, table: str, alias: str | None = None) -> Self:
        """Set the FROM clause to a table reference."""
        self._from = From(source=table, alias=alias)
        return self

    def from_subquery(self, subquery: Select, alias: str) -> Self:
        """Set the FROM clause to an aliased subquery."""
        self._from = From(source=subquery, alias=alias)
        return self

    def join(
        self,
        table: str,
        on: Expr,
        join_type: JoinType = JoinType.LEFT,
        alias: str | None = None,
    ) -> Self:
        """Append a join (LEFT by default)."""
        new_join = Join(join_type=join_type, source=table, alias=alias, on=on)
        self._joins.append(new_join)
        return self

    def where(self, condition: Expr) -> Self:
        """Add a WHERE predicate; repeated calls are AND-combined."""
        self._where = self._and_combine(self._where, condition)
        return self

    def group_by(self, *exprs: Expr) -> Self:
        """Append GROUP BY expressions."""
        self._group_by += list(exprs)
        return self

    def having(self, condition: Expr) -> Self:
        """Add a HAVING predicate; repeated calls are AND-combined."""
        self._having = self._and_combine(self._having, condition)
        return self

    def order_by(self, expr: Expr, desc: bool = False) -> Self:
        """Append an ORDER BY item."""
        self._order_by.append(OrderByItem(expr=expr, desc=desc))
        return self

    def limit(self, n: int) -> Self:
        """Set the LIMIT value."""
        self._limit = n
        return self

    def offset(self, n: int) -> Self:
        """Set the OFFSET value."""
        self._offset = n
        return self

    def with_cte(self, name: str, query: Select) -> Self:
        """Register a named common table expression."""
        self._ctes.append(CTE(name=name, query=query))
        return self

    def build(self) -> Select:
        """Snapshot the accumulated state into an immutable ``Select``."""
        return Select(
            columns=self._columns,
            from_=self._from,
            joins=self._joins,
            where=self._where,
            group_by=self._group_by,
            having=self._having,
            order_by=self._order_by,
            limit=self._limit,
            offset=self._offset,
            ctes=self._ctes,
        )

API Schemas

orionbelt.api.schemas

API request/response Pydantic schemas.

SessionCreateRequest

Bases: BaseModel

Request body for POST /sessions.

Source code in src/orionbelt/api/schemas.py
class SessionCreateRequest(BaseModel):
    """Request body for POST /sessions."""

    # Free-form client-supplied key/value tags attached to the session.
    metadata: dict[str, str] = Field(default_factory=dict)

SessionResponse

Bases: BaseModel

Single session info.

Source code in src/orionbelt/api/schemas.py
class SessionResponse(BaseModel):
    """Single session info."""

    session_id: str
    created_at: datetime
    last_accessed_at: datetime
    # Number of models currently loaded into this session.
    model_count: int
    metadata: dict[str, str] = Field(default_factory=dict)
    expires_at: datetime = Field(description="Idle TTL deadline (refreshed on each access)")
    max_expires_at: datetime = Field(description="Absolute lifetime deadline (fixed at creation)")

SessionListResponse

Bases: BaseModel

Response for GET /sessions.

Source code in src/orionbelt/api/schemas.py
class SessionListResponse(BaseModel):
    """Response for GET /sessions."""

    # All sessions currently known to the server.
    sessions: list[SessionResponse]

ModelLoadRequest

Bases: BaseModel

Request body for POST /sessions/{session_id}/models.

Source code in src/orionbelt/api/schemas.py
class ModelLoadRequest(BaseModel):
    """Request body for POST /sessions/{session_id}/models."""

    # 5 MB cap guards against oversized uploads (mirrors ValidateRequest).
    model_yaml: str = Field(description="OBML YAML content", max_length=5_000_000)

ModelLoadResponse

Bases: BaseModel

Response for POST /sessions/{session_id}/models.

Source code in src/orionbelt/api/schemas.py
class ModelLoadResponse(BaseModel):
    """Response for POST /sessions/{session_id}/models."""

    # Short id (8-char hex) used to reference the model in later calls.
    model_id: str
    # Entity counts parsed out of the loaded model.
    data_objects: int
    dimensions: int
    measures: int
    metrics: int
    warnings: list[str] = Field(default_factory=list)

ModelSummaryResponse

Bases: BaseModel

Short model summary for listing.

Source code in src/orionbelt/api/schemas.py
class ModelSummaryResponse(BaseModel):
    """Short model summary for listing."""

    model_id: str
    # Entity counts for a quick overview of model size.
    data_objects: int
    dimensions: int
    measures: int
    metrics: int

SessionQueryRequest

Bases: BaseModel

Request body for POST /sessions/{session_id}/query/sql.

Source code in src/orionbelt/api/schemas.py
class SessionQueryRequest(BaseModel):
    """Request body for POST /sessions/{session_id}/query/sql."""

    # Which loaded model (short hex id) to compile against.
    model_id: str
    query: QueryObject
    # Target SQL dialect name.
    dialect: str = Field(default="postgres")

QueryCompileResponse

Bases: BaseModel

Response body for POST /query/sql.

Source code in src/orionbelt/api/schemas.py
class QueryCompileResponse(BaseModel):
    """Response body for POST /query/sql."""

    # The compiled SQL text.
    sql: str
    dialect: str
    # Resolution metadata for the compiled query.
    resolved: ResolvedInfoResponse
    warnings: list[str] = Field(default_factory=list)
    # NOTE(review): presumably flips to False when dialect-level SQL
    # validation rejects the output — confirm.
    sql_valid: bool = True
    explain: ExplainPlanResponse | None = None

ValidateRequest

Bases: BaseModel

Request body for POST /validate.

Source code in src/orionbelt/api/schemas.py
class ValidateRequest(BaseModel):
    """Request body for POST /validate."""

    # 5 MB cap mirrors ModelLoadRequest.model_yaml.
    model_yaml: str = Field(
        description="YAML semantic model content to validate", max_length=5_000_000
    )

ValidateResponse

Bases: BaseModel

Response body for POST /validate.

Source code in src/orionbelt/api/schemas.py
class ValidateResponse(BaseModel):
    """Response body for POST /validate."""

    # Presumably true iff `errors` is empty — confirm against the handler.
    valid: bool
    errors: list[ErrorDetail] = Field(default_factory=list)
    warnings: list[ErrorDetail] = Field(default_factory=list)

DialectListResponse

Bases: BaseModel

Response for GET /dialects.

Source code in src/orionbelt/api/schemas.py
class DialectListResponse(BaseModel):
    """Response for GET /dialects."""

    # SQL dialects this server can compile to.
    dialects: list[DialectInfo] = Field(default_factory=list)

HealthResponse

Bases: BaseModel

Health check response.

Source code in src/orionbelt/api/schemas.py
class HealthResponse(BaseModel):
    """Health check response."""

    status: str = "ok"
    # Package version string; defaults empty — presumably filled by the handler.
    version: str = ""

Settings

orionbelt.settings.Settings

Bases: BaseSettings

Configuration for OrionBelt REST API server.

Values are read from environment variables and from a .env file in the working directory. See .env.template for all options.

Source code in src/orionbelt/settings.py
class Settings(BaseSettings):
    """Configuration for OrionBelt REST API server.

    Values are read from environment variables and from a ``.env`` file
    in the working directory.  See ``.env.template`` for all options.
    """

    # Environment variables win over .env; unknown keys are ignored.
    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        extra="ignore",
    )

    # Shared
    log_level: str = "INFO"
    # Log format:
    #   "console"  — pretty-printed for local dev (default)
    #   "json"     — structured JSON for log aggregators (ELK, Datadog, etc.)
    #   "cloudrun" — JSON + disables uvicorn access logs (Cloud Run provides its own)
    log_format: str = "console"

    # REST API
    api_server_host: str = "localhost"  # bind address for the HTTP server
    api_server_port: int = 8000
    port: int | None = None  # Cloud Run injects PORT; takes precedence over api_server_port

    @property
    def effective_port(self) -> int:
        """Return the port to listen on (Cloud Run PORT takes precedence)."""
        return self.port if self.port is not None else self.api_server_port

    # Sessions
    session_ttl_seconds: int = 1800  # 30 min inactivity
    session_max_age_seconds: int = 86400  # 24 h absolute max lifetime
    session_cleanup_interval: int = 60  # seconds between cleanup sweeps
    max_sessions: int = 500  # global concurrent session cap (429 when full)
    max_models_per_session: int = 10  # max models a single session may hold
    disable_session_list: bool = False  # hide GET /sessions endpoint
    session_rate_limit: int = 10  # max POST /sessions per IP per minute
    trusted_proxy_count: int = 0  # number of trusted reverse proxies in front of the app

    # Single-model mode — pre-loaded into every new session.
    # When set, model upload/removal endpoints return 403.
    model_dir: str | None = None  # base directory for MODEL_FILE (set by Docker)
    model_file: str | None = None  # filename or absolute path to OBML YAML

    # Query execution
    query_execute: bool = False  # enable POST /v1/query/execute
    query_default_limit: int = 1000  # max rows when query has no LIMIT
    db_pool_size: int = 5  # connection pool size per dialect

    # Arrow Flight SQL server (requires ob-flight-extension)
    flight_enabled: bool = False  # start gRPC Flight server on FLIGHT_PORT (implies query_execute)
    flight_port: int = 8815
    flight_auth_mode: str = "none"  # "none" or "token"
    flight_api_token: str | None = None  # required when flight_auth_mode == "token" — confirm
    db_vendor: str = "duckdb"  # default vendor driver for Flight query execution

effective_port property

Return the port to listen on (Cloud Run PORT takes precedence).