Ask AI

You are viewing an unreleased or outdated version of the documentation

Source code for dagster._core.definitions.metadata.source_code

import inspect
import os
from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, List, Optional, Sequence, Union

import dagster._check as check
from dagster._annotations import experimental
from dagster._model import DagsterModel
from dagster._serdes import whitelist_for_serdes

from .metadata_set import (
    NamespacedMetadataSet as NamespacedMetadataSet,
    TableMetadataSet as TableMetadataSet,
)
from .metadata_value import MetadataValue

if TYPE_CHECKING:
    from dagster._core.definitions.assets import AssetsDefinition, SourceAsset
    from dagster._core.definitions.cacheable_assets import CacheableAssetsDefinition

# NOTE(review): presumably the default metadata/label key for an asset's source-file
# reference (inferred from the name only). It is not referenced anywhere in this part of
# the module, so it is likely imported elsewhere — confirm before renaming or removing.
DEFAULT_SOURCE_FILE_KEY = "asset_definition"


@experimental
@whitelist_for_serdes
class LocalFileCodeReference(DagsterModel):
    """Represents a local file source location.

    Attributes:
        file_path (str): Path to the file on the local filesystem. References built by
            ``local_source_path_from_fn`` always use an absolute path.
        line_number (Optional[int]): Line in ``file_path`` where the referenced definition
            starts, if known. When present it becomes a ``#L<line>`` anchor on conversion
            to a URL reference.
        label (Optional[str]): Optional display label; carried over unchanged when the
            reference is converted to a ``UrlCodeReference``.
    """

    # NOTE(review): the class is registered via whitelist_for_serdes, so these field
    # declarations presumably define its serialized shape — keep names/types stable.
    file_path: str
    line_number: Optional[int] = None
    label: Optional[str] = None


@experimental
@whitelist_for_serdes
class UrlCodeReference(DagsterModel):
    """Represents a source location which points at a URL, for example
    in source control.

    Attributes:
        url (str): The URL of the source location, e.g. a link to a file in a hosted
            repository, optionally ending in a ``#L<line>`` anchor.
        label (Optional[str]): Optional display label for this reference.
    """

    # NOTE(review): serdes-whitelisted; field declarations presumably define the
    # serialized shape — keep them stable.
    url: str
    label: Optional[str] = None


@experimental
@whitelist_for_serdes
class CodeReferencesMetadataValue(DagsterModel, MetadataValue["CodeReferencesMetadataValue"]):
    """Metadata value type which represents source locations (locally or otherwise)
    of the asset in question. For example, the file path and line number where the
    asset is defined.

    Attributes:
        code_references (List[Union[LocalFileCodeReference, UrlCodeReference]]):
            A list of code references for the asset, such as local file locations or
            URLs pointing into source control.
    """

    code_references: List[Union[LocalFileCodeReference, UrlCodeReference]]

    @property
    def value(self) -> "CodeReferencesMetadataValue":
        """The wrapped metadata value; this type is its own value, so returns ``self``."""
        return self


def local_source_path_from_fn(fn: Callable[..., Any]) -> Optional["LocalFileCodeReference"]:
    """Build a local code reference pointing at the definition of ``fn``.

    Args:
        fn (Callable[..., Any]): The function whose source location should be recorded.

    Returns:
        Optional[LocalFileCodeReference]: A reference holding the absolute path of the file
        that defines ``fn`` and the line on which its definition starts, or ``None`` when
        :func:`inspect.getsourcefile` cannot locate a source file for ``fn`` (for example a
        function compiled from a string, or one whose file no longer exists).

    Raises:
        Exceptions from :mod:`inspect` propagate, e.g. ``TypeError`` for objects it cannot
        introspect (such as built-ins) and ``OSError`` when the source lines cannot be read.
    """
    source_file = inspect.getsourcefile(fn)
    # getsourcefile returns None when no source file can be located. Report that as
    # "no reference" (the Optional return, which callers check) instead of crashing
    # inside os.path.join.
    if source_file is None:
        return None

    # abspath resolves a relative path against the current working directory.
    origin_file = os.path.abspath(source_file)
    origin_line = inspect.getsourcelines(fn)[1]

    return LocalFileCodeReference(file_path=origin_file, line_number=origin_line)


class CodeReferencesMetadataSet(NamespacedMetadataSet):
    """Metadata entries that apply to asset definitions and which specify the location where
    source code for the asset can be found.
    """

    # The asset's code references, if any. This module reads it via
    # ``CodeReferencesMetadataSet.extract(metadata)`` and writes it by unpacking an instance
    # (``**CodeReferencesMetadataSet(...)``) into an asset's metadata dict.
    code_references: Optional[CodeReferencesMetadataValue] = None

    @classmethod
    def namespace(cls) -> str:
        # NOTE(review): presumably used by NamespacedMetadataSet to prefix the entry keys
        # (e.g. "dagster/code_references") — confirm against the base class.
        return "dagster"


def _with_code_source_single_definition(
    assets_def: Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"],
) -> Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"]:
    """Attach a local code reference to every asset key of a single definition.

    Only ``AssetsDefinition`` objects are annotated; anything else is handed back as-is.
    For op-backed assets the reference points at the user's (decorated) compute function,
    for graph-backed assets at the graph's composition function. The new reference is
    appended after any code references already present in each key's metadata.

    Args:
        assets_def: The definition to annotate.

    Returns:
        The input unchanged when it is not an ``AssetsDefinition`` or no function can be
        located; otherwise the result of ``assets_def.map_asset_specs`` with each spec's
        metadata replaced by the merged metadata.
    """
    from dagster._core.definitions.assets import AssetsDefinition

    # SourceAssets have no op definition to point at; cacheable assets are not handled yet.
    if not isinstance(assets_def, AssetsDefinition):
        return assets_def

    updated_metadata = dict(assets_def.metadata_by_key)

    from dagster._core.definitions.decorators.op_decorator import DecoratedOpFunction
    from dagster._core.definitions.graph_definition import GraphDefinition
    from dagster._core.definitions.op_definition import OpDefinition

    node = assets_def.node_def
    if isinstance(node, OpDefinition):
        compute_fn = node.compute_fn
        # Unwrap the decorator wrapper so the reference lands on the user's own function.
        target_fn = (
            compute_fn.decorated_fn if isinstance(compute_fn, DecoratedOpFunction) else compute_fn
        )
    elif isinstance(node, GraphDefinition):
        # Graph-backed assets point at the composition fn, e.g. the @graph_asset function.
        target_fn = node.composition_fn
    else:
        target_fn = None

    if not target_fn:
        return assets_def

    local_ref = local_source_path_from_fn(target_fn)

    if local_ref:
        for asset_key in assets_def.keys:
            current = updated_metadata.get(asset_key, {})
            prior = CodeReferencesMetadataSet.extract(current).code_references
            merged: List[Union[LocalFileCodeReference, UrlCodeReference]] = list(
                prior.code_references if prior else []
            )
            merged.append(local_ref)
            updated_metadata[asset_key] = {
                **current,
                **CodeReferencesMetadataSet(
                    code_references=CodeReferencesMetadataValue(code_references=merged)
                ),
            }

    return assets_def.map_asset_specs(
        lambda spec: spec._replace(metadata=updated_metadata[spec.key])
    )


@experimental
class FilePathMapping(ABC):
    """Base class which defines a file path mapping function. These functions are used to map local file paths
    to their corresponding paths in a source control repository.

    In many cases where a source control repository is reproduced exactly on a local machine, the included
    AnchorBasedFilePathMapping class can be used to specify a direct mapping between the local file paths and the
    repository paths. However, in cases where the repository structure differs from the local structure, a custom
    mapping function can be provided to handle these cases.
    """

    @abstractmethod
    def convert_to_source_control_path(self, local_path: Path) -> str:
        """Map a local file path to the corresponding path in the source control repository.

        Args:
            local_path (Path): Path to a file on the local filesystem.

        Returns:
            str: The file's path in the repository. ``convert_local_path_to_git_path``
            appends this to a base URL, so it is expected to be relative to the
            repository root.
        """
        ...


@experimental
@dataclass
class AnchorBasedFilePathMapping(FilePathMapping):
    """Specifies the mapping between local file paths and their corresponding paths in a source control repository,
    using a specific file "anchor" as a reference point. All other paths are calculated relative to this anchor file.

    For example, if the chosen anchor file is `/Users/dagster/Documents/python_modules/my_module/my-module/__init__.py`
    locally, and `python_modules/my_module/my-module/__init__.py` in a source control repository, in order to map a
    different file `/Users/dagster/Documents/python_modules/my_module/my-module/my_asset.py` to the repository path,
    the mapping function will position the file in the repository relative to the anchor file's position in the repository,
    resulting in `python_modules/my_module/my-module/my_asset.py`.

    The returned repository path always uses forward slashes, regardless of the local operating system,
    because it is intended to be embedded in source control URLs.

    Args:
        local_file_anchor (Path): The path to a local file that is present in the repository.
        file_anchor_path_in_repository (str): The path to the anchor file in the repository.

    Example:
        .. code-block:: python

            mapping_fn = AnchorBasedFilePathMapping(
                local_file_anchor=Path(__file__),
                file_anchor_path_in_repository="python_modules/my_module/my-module/__init__.py",
            )
    """

    local_file_anchor: Path
    file_anchor_path_in_repository: str

    def convert_to_source_control_path(self, local_path: Path) -> str:
        """Map ``local_path`` to its location in the repository, relative to the anchor.

        Args:
            local_path (Path): Local path of a file in the same checkout as the anchor file.

        Returns:
            str: The repository path of ``local_path``, using ``/`` separators.
        """
        # relpath is taken from the anchor *file* (not its directory), so for any other
        # file it begins with a ".." that cancels the anchor's file name once the result
        # is joined onto the anchor's repository path and normalized.
        path_from_anchor_to_target = os.path.relpath(local_path, self.local_file_anchor)
        repo_path = os.path.normpath(
            os.path.join(self.file_anchor_path_in_repository, path_from_anchor_to_target)
        )
        # normpath emits the OS separator (backslashes on Windows); repository paths end
        # up in URLs, so always use forward slashes. No-op on POSIX.
        return repo_path.replace(os.sep, "/")


def convert_local_path_to_git_path(
    base_git_url: str,
    file_path_mapping: FilePathMapping,
    local_path: LocalFileCodeReference,
) -> UrlCodeReference:
    """Convert a local file code reference into a URL code reference.

    Args:
        base_git_url (str): Base URL that repository paths are appended to (for example a
            repository tree URL for a branch). A trailing ``/`` is tolerated.
        file_path_mapping (FilePathMapping): Maps the local file path to its path in the
            repository.
        local_path (LocalFileCodeReference): The local reference to convert.

    Returns:
        UrlCodeReference: A reference to ``<base_git_url>/<repository path>``. A ``#L<line>``
        anchor is appended when the local reference has a line number, and the label is
        carried over unchanged.
    """
    source_file_from_repo_root = file_path_mapping.convert_to_source_control_path(
        Path(local_path.file_path)
    )
    # A falsy line number (None, or 0) produces no line anchor.
    line_number_suffix = f"#L{local_path.line_number}" if local_path.line_number else ""
    # Join with exactly one "/" so that a trailing slash on the base URL or a leading slash
    # on the mapped repository path does not produce "//" in the link.
    url_base = base_git_url.rstrip("/")
    repo_path = source_file_from_repo_root.lstrip("/")

    return UrlCodeReference(
        url=f"{url_base}/{repo_path}{line_number_suffix}",
        label=local_path.label,
    )


def _convert_local_path_to_git_path_single_definition(
    base_git_url: str,
    file_path_mapping: FilePathMapping,
    assets_def: Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"],
) -> Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"]:
    """Rewrite the local file code references of one asset definition into URL references.

    Inputs that are not ``AssetsDefinition`` objects are returned as-is. For each asset key
    whose metadata already carries code references, every ``LocalFileCodeReference`` is
    converted with ``convert_local_path_to_git_path``; other references are kept, and the
    original order is preserved. Keys without code references are left untouched.

    Args:
        base_git_url: Base URL passed through to ``convert_local_path_to_git_path``.
        file_path_mapping: Mapping from local paths to repository paths.
        assets_def: The definition whose metadata should be rewritten.

    Returns:
        The input for non-``AssetsDefinition`` inputs; otherwise the result of
        ``assets_def.map_asset_specs`` with each spec's metadata replaced.
    """
    from dagster._core.definitions.assets import AssetsDefinition

    # SourceAssets have no op definition to point at; cacheable assets are not handled yet.
    if not isinstance(assets_def, AssetsDefinition):
        return assets_def

    updated_metadata = dict(assets_def.metadata_by_key)

    def _to_url_if_local(ref):
        # Only local file references are converted; URL references pass through.
        if isinstance(ref, LocalFileCodeReference):
            return convert_local_path_to_git_path(base_git_url, file_path_mapping, ref)
        return ref

    for asset_key in assets_def.keys:
        current = updated_metadata.get(asset_key, {})
        existing_refs = CodeReferencesMetadataSet.extract(current).code_references
        if not existing_refs:
            # Nothing to rewrite for this key.
            continue

        rewritten: List[Union[LocalFileCodeReference, UrlCodeReference]] = [
            _to_url_if_local(ref) for ref in existing_refs.code_references
        ]
        updated_metadata[asset_key] = {
            **current,
            **CodeReferencesMetadataSet(
                code_references=CodeReferencesMetadataValue(code_references=rewritten)
            ),
        }

    return assets_def.map_asset_specs(
        lambda spec: spec._replace(metadata=updated_metadata[spec.key])
    )


def _build_github_url(url: str, branch: str) -> str:
    return f"{url}/tree/{branch}"


def _build_gitlab_url(url: str, branch: str) -> str:
    return f"{url}/-/tree/{branch}"





@experimental
def with_source_code_references(
    assets_defs: Sequence[Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"]],
) -> Sequence[Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"]]:
    """Wrapper function which attaches local code reference metadata to the provided asset definitions.
    This points to the filepath and line number where the asset body is defined.

    Args:
        assets_defs (Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]):
            The asset definitions to which source code metadata should be attached.

    Returns:
        Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]: The asset
        definitions, in the same order, with source code metadata attached. Only
        ``AssetsDefinition`` objects are annotated; other definitions are passed through
        unchanged.
    """
    return [_with_code_source_single_definition(assets_def) for assets_def in assets_defs]