Skip to content

Library API reference

The complete public API, generated from the source docstrings. Everything here is importable from the top level (from deeperfly import ...). For task-oriented examples see the library guide; for the array and coordinate conventions these functions share, see Conventions & glossary.

Configuration

Config

A loaded, validated deeperfly run configuration.

Construct via `Config.from_toml` (from a TOML file) or `Config.from_dict` (from an already-parsed mapping). The parsed mapping is stored in `data`; `text` holds the original TOML text when the config was read from a file (`None` when it was built from a dict) and is used to snapshot the config byte-for-byte.

Source code in src/deeperfly/config.py
class Config:
    """A loaded, validated deeperfly run configuration.

    Construct via :meth:`from_toml` (from a TOML file) or :meth:`from_dict` (a
    parsed mapping). The parsed mapping is :attr:`data`; :attr:`text` is the original
    TOML text when read from a file (``None`` for a dict), used to snapshot the config
    byte-for-byte. :attr:`source` is the path the config was read from, when any.
    """

    def __init__(
        self, data: dict, *, text: str | None = None, source: Path | None = None
    ):
        self.data = data
        self.text = text
        self.source = source

    # -- construction --------------------------------------------------------

    @classmethod
    def from_toml(cls, path: str | Path) -> "Config":
        """Load a config from a TOML file (preserving its text for snapshots).

        The file is decoded as UTF-8 -- the encoding the TOML specification
        mandates -- rather than the platform's locale encoding, so a config
        with non-ASCII content loads identically on every platform.

        Parameters
        ----------
        path
            Path to a config TOML file.

        Returns
        -------
        Config
            The loaded, validated config.
        """
        p = Path(path)
        text = p.read_text(encoding="utf-8")
        return cls(tomllib.loads(text), text=text, source=p)

    @classmethod
    def from_dict(cls, data: dict) -> "Config":
        """Wrap an already-parsed mapping (library use, tests).

        Parameters
        ----------
        data
            A parsed config mapping.

        Returns
        -------
        Config
            The validated config wrapping ``data`` (no source text).
        """
        return cls(data)

    @classmethod
    def default(cls) -> "Config":
        """The packaged default config (:data:`DEFAULT_CONFIG_PATH`)."""
        return cls.from_toml(DEFAULT_CONFIG_PATH)

    @classmethod
    def read_for_run(cls, cli_config: str | None, outdir: Path) -> "Config":
        """Pick the config for one run: ``-c`` wins, then the ``outdir`` snapshot.

        An explicit ``-c`` always drives the run (and refreshes the snapshot --
        see :meth:`save_snapshot`). Without ``-c``, the snapshot a previous run
        left in ``<outdir>/config.toml`` is reused -- so both "pass a new ``-c``"
        and "edit the snapshot and re-run" work; either way the per-stage
        fingerprints (:mod:`deeperfly.pipeline.fingerprint`) recompute exactly
        the stages whose parameters changed. With neither, the packaged default
        is used.

        Parameters
        ----------
        cli_config
            The ``-c`` config path, or ``None``.
        outdir
            The run's output directory, which may already hold a ``config.toml``
            snapshot.

        Returns
        -------
        Config
            The config that drives this run.
        """
        snapshot = Path(outdir) / "config.toml"
        if cli_config:
            path = Path(cli_config)
            log.info("using config %s (from -c)", path)
        elif snapshot.exists():
            path = snapshot
            log.info("using config %s (snapshot in the output dir)", path)
        else:
            path = DEFAULT_CONFIG_PATH
            log.info("using config %s (packaged default; pass -c to override)", path)
        return cls.from_toml(path)

    # -- typed per-stage subgroups ------------------------------------------

    @property
    def pose2d(self) -> Pose2dParams:
        """Typed ``pose2d`` parameters (``_POSE2D_PLAN_KEYS`` is passed as ``ignore``)."""
        return _params(self.data, ("pose2d",), Pose2dParams, ignore=_POSE2D_PLAN_KEYS)

    @property
    def triangulation(self) -> TriangulationParams:
        """Typed ``triangulation`` parameters."""
        return _params(self.data, ("triangulation",), TriangulationParams)

    @property
    def pictorial(self) -> PictorialParams:
        """Typed parameters read from the ``pictorial_structures`` table."""
        return _params(self.data, ("pictorial_structures",), PictorialParams)

    @property
    def io(self) -> IoParams:
        """Typed I/O parameters read from the ``io.image`` table.

        Only a ``workers`` key that is actually present is forwarded (as
        ``image_workers``); it is coerced to ``int`` and a value of ``0``
        becomes ``None``. With no ``workers`` key, :class:`IoParams` is built
        with its own defaults.
        """
        im = _dig(self.data, ("io", "image"))
        present: dict = {}
        if "workers" in im:
            # ``0 or None`` -> None: a zero worker count is stored as "unset".
            present["image_workers"] = int(im["workers"]) or None
        return IoParams(**present)

    @property
    def bundle_adjustment(self) -> BundleAdjustmentParams:
        """Typed ``bundle_adjustment`` parameters.

        The named keys (``points_to_use``, ``fixed``, ``shared``,
        ``weigh_by_confidence``, ``max_frames``, ``frame_sampling``) are popped
        from a copy of the table and coerced; every remaining key is passed
        through untouched as ``least_squares``.
        """
        # Work on a copy so popping the named keys never mutates ``self.data``.
        ba = dict(_dig(self.data, ("bundle_adjustment",)))
        points_to_use = ba.pop("points_to_use", None)
        fixed = ba.pop("fixed", [])
        shared = ba.pop("shared", [])
        weigh_by_confidence = ba.pop("weigh_by_confidence", True)
        max_frames = ba.pop("max_frames", 100)
        frame_sampling = ba.pop("frame_sampling", "even")
        return BundleAdjustmentParams(
            points_to_use=None
            if points_to_use is None
            else [str(p) for p in points_to_use],
            fixed=list(fixed),
            shared=[list(s) for s in shared],
            weigh_by_confidence=bool(weigh_by_confidence),
            max_frames=None if max_frames is None else int(max_frames),
            frame_sampling=str(frame_sampling),
            least_squares=ba,  # leftover flat keys -> scipy.optimize.least_squares
        )

    # -- pipeline orchestration ---------------------------------------------

    def stage_flags(self) -> dict[str, bool]:
        """Which stages are enabled, from the ``[pipeline].do_<stage>`` booleans.

        Returns
        -------
        dict of str to bool
            ``stage_name -> enabled`` for every stage in :data:`STAGES`, each
            defaulting to :data:`STAGE_DEFAULTS`.
        """
        pipe = self.data.get("pipeline", {})
        return {n: bool(pipe.get(f"do_{n}", STAGE_DEFAULTS[n])) for n in STAGES}

    # -- structured sections: the domain objects their parsers build --------

    @property
    def visualization(self) -> dict:
        """The raw ``[visualization]`` table (consumed by :attr:`videos`)."""
        return self.data.get("visualization", {})

    @property
    def videos(self) -> "list[VideoSpec]":
        """The output-video specs (``[[visualization.videos]]``)."""
        from .visualization.compose import read_video_specs

        return read_video_specs(self)

    def camera_group(self, image_sizes=None) -> "CameraGroup":
        """The configured camera rig (``[cameras.*]``).

        Parameters
        ----------
        image_sizes
            Optional ``camera_name -> (height, width)`` used to infer principal
            points when a camera omits ``principal_point_px``.

        Returns
        -------
        CameraGroup
            The configured rig.
        """
        from .cameras import CameraGroup

        return CameraGroup.from_config(self, image_sizes=image_sizes)

    def skeleton(self) -> "Skeleton":
        """The configured skeleton (``[skeleton]``), or the default fly skeleton."""
        from .skeleton import Skeleton

        return Skeleton.from_config(self) if "skeleton" in self.data else Skeleton.fly()

    def frame_transforms(self) -> "dict[str, FrameTransform]":
        """Per-camera frame preprocessing (the ``[cameras.<name>]`` ``preprocess`` lists)."""
        from .preprocessing import parse_frame_transforms

        return parse_frame_transforms(self)

    def detection_plan(self) -> "DetectionPlan":
        """The 2D detection plan (``[[sources]]`` + ``[[pose2d.preprocessors]]``/``[[pose2d.models]]``/``[[pose2d.pathways]]``).

        Returns
        -------
        DetectionPlan
            The parsed, validated plan mapping footage sources through
            preprocessors and models into the skeleton (see
            :class:`deeperfly.pose2d.pathways.DetectionPlan`).
        """
        from .pose2d.pathways import DetectionPlan

        return DetectionPlan.from_config(self)

    def camera_table(self) -> tuple[dict, dict]:
        """Split ``[cameras]`` into the shared defaults and the per-camera specs.

        Returns
        -------
        defaults, cameras : dict
            The ``[cameras.defaults]`` spec and the real per-camera specs (keyed
            by name, with ``defaults`` excluded).
        """
        # Shallow copy: popping ``defaults`` must not alter ``self.data``.
        cams = dict(self.data.get("cameras", {}))
        defaults = cams.pop("defaults", {})
        return defaults, cams

    def source_patterns(self) -> dict[str, str]:
        """Map each footage source to its glob (``[[sources]]`` ``name`` -> ``filename``).

        Read directly from the ``[[sources]]`` table (without building the whole
        detection plan) so recording discovery stays cheap. A source with no
        ``filename`` key uses its own name as the glob pattern.

        Returns
        -------
        dict of str to str
            ``source_name -> footage glob`` in config order.

        Raises
        ------
        ValueError
            If a source entry has no string ``name``.
        """
        out: dict[str, str] = {}
        # ``or []`` also covers a ``sources`` key that is present but falsy.
        for s in self.data.get("sources", []) or []:
            name = s.get("name")
            if not isinstance(name, str):
                raise ValueError(
                    f"[[sources]] entry needs a string 'name', got {name!r}"
                )
            out[name] = s.get("filename", name)
        return out

    # -- snapshot round-trip -------------------------------------------------

    def snapshot_text(self) -> str:
        """The exact TOML text to snapshot.

        Returns
        -------
        str
            The original file text.

        Raises
        ------
        ValueError
            If this config was built from a dict (no source text to snapshot).
        """
        if self.text is None:
            raise ValueError(
                "this Config was built from a dict; it has no source text to snapshot"
            )
        return self.text

    def save_snapshot(self, outdir: Path) -> None:
        """Snapshot the run config into ``<outdir>/config.toml`` for reproducibility.

        A no-op rewrite when the config already came from there (see
        :meth:`read_for_run`); otherwise it records the ``-c``/default config that
        drives this run, so a later run without ``-c`` reuses the very same config.
        The snapshot is written as UTF-8, matching how :meth:`from_toml` reads it
        back.

        Parameters
        ----------
        outdir
            The run's output directory; the snapshot is written to
            ``<outdir>/config.toml``.

        Raises
        ------
        ValueError
            If this config has no source text (see :meth:`snapshot_text`).
        """
        (Path(outdir) / "config.toml").write_text(
            self.snapshot_text(), encoding="utf-8"
        )

visualization property

visualization: dict

The raw `[visualization]` table (consumed by the `videos` property).

videos property

videos: 'list[VideoSpec]'

The output-video specs ([[visualization.videos]]).

from_toml classmethod

from_toml(path: str | Path) -> 'Config'

Load a config from a TOML file (preserving its text for snapshots).

Parameters:

Name Type Description Default
path str | Path

Path to a config TOML file.

required

Returns:

Type Description
Config

The loaded, validated config.

Source code in src/deeperfly/config.py
@classmethod
def from_toml(cls, path: str | Path) -> "Config":
    """Load a config from a TOML file (preserving its text for snapshots).

    Parameters
    ----------
    path
        Path to a config TOML file.

    Returns
    -------
    Config
        The loaded, validated config.
    """
    p = Path(path)
    text = p.read_text()
    return cls(tomllib.loads(text), text=text, source=p)

from_dict classmethod

from_dict(data: dict) -> 'Config'

Wrap an already-parsed mapping (library use, tests).

Parameters:

Name Type Description Default
data dict

A parsed config mapping.

required

Returns:

Type Description
Config

The validated config wrapping data (no source text).

Source code in src/deeperfly/config.py
@classmethod
def from_dict(cls, data: dict) -> "Config":
    """Build a config directly from a mapping that has already been parsed.

    Intended for library callers and tests that have no TOML file at hand.

    Parameters
    ----------
    data
        The parsed config mapping to wrap.

    Returns
    -------
    Config
        A config wrapping ``data``; no source text or path is passed along.
    """
    wrapped = cls(data)
    return wrapped

default classmethod

default() -> 'Config'

The packaged default config, loaded from `DEFAULT_CONFIG_PATH`.

Source code in src/deeperfly/config.py
@classmethod
def default(cls) -> "Config":
    """Load the config that ships with the package (:data:`DEFAULT_CONFIG_PATH`)."""
    packaged = DEFAULT_CONFIG_PATH
    return cls.from_toml(packaged)

read_for_run classmethod

read_for_run(
    cli_config: str | None, outdir: Path
) -> "Config"

Pick the config for one run: -c wins, then the outdir snapshot.

An explicit `-c` always drives the run (and refreshes the snapshot; see `save_snapshot`). Without `-c`, the snapshot a previous run left in `<outdir>/config.toml` is reused, so both "pass a new `-c`" and "edit the snapshot and re-run" work. Either way, the per-stage fingerprints (`deeperfly.pipeline.fingerprint`) recompute exactly the stages whose parameters changed. With neither a `-c` nor a snapshot, the packaged default is used.

Parameters:

Name Type Description Default
cli_config str | None

The -c config path, or None.

required
outdir Path

The run's output directory, which may already hold a config.toml snapshot.

required

Returns:

Type Description
Config

The config that drives this run.

Source code in src/deeperfly/config.py
@classmethod
def read_for_run(cls, cli_config: str | None, outdir: Path) -> "Config":
    """Choose and load the config for a single run.

    Precedence, highest first:

    1. an explicit ``-c`` path (``cli_config``);
    2. the ``<outdir>/config.toml`` snapshot, when that file exists;
    3. the packaged default (:data:`DEFAULT_CONFIG_PATH`).

    The choice is logged at INFO level before the file is loaded.

    Parameters
    ----------
    cli_config
        The ``-c`` config path, or ``None`` (any falsy value counts as "not
        given").
    outdir
        The run's output directory, which may already hold a ``config.toml``
        snapshot.

    Returns
    -------
    Config
        The config that drives this run.
    """
    previous = Path(outdir) / "config.toml"

    if cli_config:
        chosen = Path(cli_config)
        log.info("using config %s (from -c)", chosen)
        return cls.from_toml(chosen)

    if previous.exists():
        log.info("using config %s (snapshot in the output dir)", previous)
        return cls.from_toml(previous)

    chosen = DEFAULT_CONFIG_PATH
    log.info("using config %s (packaged default; pass -c to override)", chosen)
    return cls.from_toml(chosen)

stage_flags

stage_flags() -> dict[str, bool]

Which stages are enabled, from the [pipeline].do_<stage> booleans.

Returns:

Type Description
dict of str to bool

`stage_name -> enabled` for every stage in `STAGES`, each defaulting to its entry in `STAGE_DEFAULTS`.

Source code in src/deeperfly/config.py
def stage_flags(self) -> dict[str, bool]:
    """Report, per stage, whether it is switched on.

    Each stage ``<name>`` in :data:`STAGES` is looked up as ``do_<name>`` in the
    ``pipeline`` table; a missing key falls back to ``STAGE_DEFAULTS[<name>]``.
    Values are coerced with ``bool``.

    Returns
    -------
    dict of str to bool
        ``stage_name -> enabled``, in :data:`STAGES` order.
    """
    pipeline_table = self.data.get("pipeline", {})
    flags: dict[str, bool] = {}
    for stage in STAGES:
        fallback = STAGE_DEFAULTS[stage]
        flags[stage] = bool(pipeline_table.get(f"do_{stage}", fallback))
    return flags

camera_group

camera_group(image_sizes=None) -> 'CameraGroup'

The configured camera rig ([cameras.*]).

Parameters:

Name Type Description Default
image_sizes

Optional camera_name -> (height, width) used to infer principal points when a camera omits principal_point_px.

None

Returns:

Type Description
CameraGroup

The configured rig.

Source code in src/deeperfly/config.py
def camera_group(self, image_sizes=None) -> "CameraGroup":
    """Build the camera rig described by this config (``[cameras.*]``).

    Delegates to ``CameraGroup.from_config``; the import is done inside the
    function body rather than at module level.

    Parameters
    ----------
    image_sizes
        Optional ``camera_name -> (height, width)`` used to infer principal
        points when a camera omits ``principal_point_px``.

    Returns
    -------
    CameraGroup
        The configured rig.
    """
    from .cameras import CameraGroup

    rig = CameraGroup.from_config(self, image_sizes=image_sizes)
    return rig

skeleton

skeleton() -> 'Skeleton'

The configured skeleton ([skeleton]), or the default fly skeleton.

Source code in src/deeperfly/config.py
def skeleton(self) -> "Skeleton":
    """Return the skeleton from ``[skeleton]``, falling back to ``Skeleton.fly()``."""
    from .skeleton import Skeleton

    if "skeleton" in self.data:
        return Skeleton.from_config(self)
    return Skeleton.fly()

frame_transforms

frame_transforms() -> 'dict[str, FrameTransform]'

Per-camera frame preprocessing (the [cameras.<name>] preprocess lists).

Source code in src/deeperfly/config.py
def frame_transforms(self) -> "dict[str, FrameTransform]":
    """Parse the per-camera ``preprocess`` lists (``[cameras.<name>]``) into transforms."""
    from .preprocessing import parse_frame_transforms

    transforms = parse_frame_transforms(self)
    return transforms

detection_plan

detection_plan() -> 'DetectionPlan'

The 2D detection plan ([[sources]] + [[pose2d.preprocessors]]/[[pose2d.models]]/[[pose2d.pathways]]).

Returns:

Type Description
DetectionPlan

The parsed, validated plan mapping footage sources through preprocessors and models into the skeleton (see `deeperfly.pose2d.pathways.DetectionPlan`).

Source code in src/deeperfly/config.py
def detection_plan(self) -> "DetectionPlan":
    """Build the 2D detection plan from this config.

    The plan is parsed from ``[[sources]]`` together with
    ``[[pose2d.preprocessors]]``, ``[[pose2d.models]]`` and
    ``[[pose2d.pathways]]``; the work is delegated to
    ``DetectionPlan.from_config``.

    Returns
    -------
    DetectionPlan
        The parsed, validated plan mapping footage sources through
        preprocessors and models into the skeleton (see
        :class:`deeperfly.pose2d.pathways.DetectionPlan`).
    """
    from .pose2d.pathways import DetectionPlan

    plan = DetectionPlan.from_config(self)
    return plan

camera_table

camera_table() -> tuple[dict, dict]

Split [cameras] into the shared defaults and the per-camera specs.

Returns:

Type Description
defaults, cameras : dict

The [cameras.defaults] spec and the real per-camera specs (keyed by name, with defaults excluded).

Source code in src/deeperfly/config.py
def camera_table(self) -> tuple[dict, dict]:
    """Separate the ``[cameras]`` table into shared defaults and named cameras.

    The table is shallow-copied first, so removing the ``defaults`` entry
    leaves the config's own data untouched. A missing ``[cameras]`` table or a
    missing ``defaults`` entry yields an empty dict for that part.

    Returns
    -------
    defaults, cameras : dict
        The ``[cameras.defaults]`` spec, and every other ``[cameras.<name>]``
        spec keyed by camera name.
    """
    per_camera = dict(self.data.get("cameras", {}))
    shared = per_camera.pop("defaults", {})
    return shared, per_camera

source_patterns

source_patterns() -> dict[str, str]

Map each footage source to its glob ([[sources]] name -> filename).

Read directly from the [[sources]] table (without building the whole detection plan) so recording discovery stays cheap. A source with no filename key uses its own name as the glob pattern.

Returns:

Type Description
dict of str to str

source_name -> footage glob in config order.

Raises:

Type Description
ValueError

If a source entry has no string name.

Source code in src/deeperfly/config.py
def source_patterns(self) -> dict[str, str]:
    """Build the ``source name -> footage glob`` mapping from ``[[sources]]``.

    The ``sources`` list is read as-is, without constructing the detection
    plan, which keeps recording discovery cheap. An entry lacking a
    ``filename`` key falls back to its own ``name`` as the pattern. A missing
    or falsy ``sources`` value produces an empty mapping.

    Returns
    -------
    dict of str to str
        ``source_name -> footage glob`` in config order.

    Raises
    ------
    ValueError
        If a source entry has no string ``name``.
    """
    patterns: dict[str, str] = {}
    entries = self.data.get("sources", []) or []
    for entry in entries:
        source_name = entry.get("name")
        if isinstance(source_name, str):
            patterns[source_name] = entry.get("filename", source_name)
            continue
        raise ValueError(
            f"[[sources]] entry needs a string 'name', got {source_name!r}"
        )
    return patterns

snapshot_text

snapshot_text() -> str

The exact TOML text to snapshot.

Returns:

Type Description
str

The original file text.

Raises:

Type Description
ValueError

If this config was built from a dict (no source text to snapshot).

Source code in src/deeperfly/config.py
def snapshot_text(self) -> str:
    """Return the TOML text that should be written as the snapshot.

    Returns
    -------
    str
        The text stored on this config (the original file text).

    Raises
    ------
    ValueError
        If no text is stored, i.e. the config was built from a dict.
    """
    original = self.text
    if original is not None:
        return original
    raise ValueError(
        "this Config was built from a dict; it has no source text to snapshot"
    )

save_snapshot

save_snapshot(outdir: Path) -> None

Snapshot the run config into <outdir>/config.toml for reproducibility.

A no-op rewrite when the config already came from there (see `read_for_run`); otherwise it records the `-c` or default config that drives this run, so that a later run without `-c` reuses the very same config.

Parameters:

Name Type Description Default
outdir Path

The run's output directory; the snapshot is written to <outdir>/config.toml.

required
Source code in src/deeperfly/config.py
def save_snapshot(self, outdir: Path) -> None:
    """Snapshot the run config into ``<outdir>/config.toml`` for reproducibility.

    A no-op rewrite when the config already came from there (see
    :meth:`read_for_run`); otherwise it records the ``-c``/default config that
    drives this run, so a later run without ``-c`` reuses the very same config.

    The snapshot is written as UTF-8 -- the encoding the TOML specification
    mandates -- rather than in the platform's locale encoding, so non-ASCII
    content survives the round trip on every platform.

    Parameters
    ----------
    outdir
        The run's output directory; the snapshot is written to
        ``<outdir>/config.toml``.
    """
    (Path(outdir) / "config.toml").write_text(
        self.snapshot_text(), encoding="utf-8"
    )

Cameras

Camera dataclass

A single camera: extrinsics, intrinsics, and lens distortion.

intr is always the 4-vector [fx, fy, cx, cy] (so every camera in a group has the same intrinsic layout); dist holds OpenCV-ordered distortion coefficients (possibly empty).

Source code in src/deeperfly/cameras.py
@dataclass
class Camera:
    """One camera, described by its pose, its intrinsics and its lens distortion.

    ``intr`` is always the 4-vector ``[fx, fy, cx, cy]``, so all cameras in a
    group share one intrinsic layout. ``dist`` carries the distortion
    coefficients in OpenCV order and may be empty. ``rvec``/``tvec`` are the
    extrinsics and ``name`` is an optional label.
    """

    rvec: Float[np.ndarray, "3"]
    tvec: Float[np.ndarray, "3"]
    intr: Float[np.ndarray, "4"]
    dist: Float[np.ndarray, "K"]
    name: str | None = None

    @classmethod
    def from_spec(
        cls,
        spec: dict,
        name: str | None = None,
        image_size: tuple[int, int] | None = None,
        transform: "FrameTransform | None" = None,
    ) -> Camera:
        """Construct a camera from a config spec dict (see :func:`resolve_extrinsics`).

        Parameters
        ----------
        spec
            Camera spec dict (extrinsics + intrinsics keys; intrinsics in
            raw-footage pixels).
        name
            Optional camera name stored on the result.
        image_size
            Optional raw-footage ``(height, width)`` pair used to infer the
            principal point (image center) when the spec omits
            ``principal_point_px``, and to anchor ``transform``'s affine.
        transform
            The camera's preprocess transform; when given and non-identity, the
            spec's raw-frame intrinsics are mapped through it (see
            :func:`_parse_intrinsics`).

        Returns
        -------
        Camera
            The constructed camera.
        """
        # Extrinsics are resolved before intrinsics, both from the same spec.
        rotation, translation = resolve_extrinsics(spec)
        intrinsics, distortion = _parse_intrinsics(
            spec, image_size=image_size, transform=transform
        )
        return cls(
            rvec=rotation,
            tvec=translation,
            intr=intrinsics,
            dist=distortion,
            name=name,
        )

    @property
    def rmat(self) -> Float[np.ndarray, "3 3"]:
        """``rvec`` converted by ``rvec_to_rmat``, as a NumPy array."""
        rotation = rvec_to_rmat(self.rvec)
        return np.asarray(rotation)

    @property
    def kmat(self) -> Float[np.ndarray, "3 3"]:
        """``intr`` converted by ``intr_to_kmat``, as a NumPy array."""
        intrinsic_matrix = intr_to_kmat(self.intr)
        return np.asarray(intrinsic_matrix)

    @property
    def position(self) -> Float[np.ndarray, "3"]:
        """Camera center in world coordinates, ``-R.T @ tvec``."""
        rotation = self.rmat
        return -rotation.T @ self.tvec

    def project(
        self, pts3d: Float[np.ndarray, "*pts 3"]
    ) -> Float[np.ndarray, "*pts 2"]:
        """Map world points onto this camera's image plane.

        Parameters
        ----------
        pts3d
            World points of shape ``(*pts, 3)``.

        Returns
        -------
        np.ndarray
            Image points of shape ``(*pts, 2)``.
        """
        world = np.asarray(pts3d)
        # Every camera parameter gets a leading axis of length one for
        # ``project_full``; the matching leading entry of the result is taken.
        batched = [
            param[None] for param in (self.rvec, self.tvec, self.intr, self.dist)
        ]
        projected = project_full(world, *batched)
        return np.asarray(projected)[0]

    def backproject_ray(
        self, pixel: Float[np.ndarray, "2"]
    ) -> tuple[Float[np.ndarray, "3"], Float[np.ndarray, "3"]]:
        """Compute the world-frame viewing ray through an image ``pixel``.

        This inverts :meth:`project`: the returned ``(origin, direction)`` is
        such that every world point ``origin + s * direction`` projects back
        onto ``pixel``, with ``origin`` being the camera center. See
        :func:`deeperfly.geometry.backproject_ray_one`.

        Parameters
        ----------
        pixel
            Image point of shape ``(2,)`` in pixels.

        Returns
        -------
        origin, direction : np.ndarray
            The camera center and the (unnormalized) world-frame ray direction,
            each of shape ``(3,)``.
        """
        ray_args = (
            jnp.asarray(pixel, dtype=float),
            jnp.asarray(self.rvec),
            jnp.asarray(self.tvec),
            jnp.asarray(self.intr),
            jnp.asarray(self.dist),
        )
        ray_origin, ray_direction = backproject_ray_one(*ray_args)
        return np.asarray(ray_origin), np.asarray(ray_direction)

position property

position: Float[ndarray, '3']

Camera center in world coordinates, -R.T @ tvec.

from_spec classmethod

from_spec(
    spec: dict,
    name: str | None = None,
    image_size: tuple[int, int] | None = None,
    transform: "FrameTransform | None" = None,
) -> Camera

Build a camera from a config dict (see `resolve_extrinsics`).

Parameters:

Name Type Description Default
spec dict

Camera spec dict (extrinsics + intrinsics keys; intrinsics in raw-footage pixels).

required
name str | None

Optional camera name stored on the result.

None
image_size tuple[int, int] | None

Optional raw-footage (height, width) pair used to infer the principal point (image center) when the spec omits principal_point_px, and to anchor transform's affine.

None
transform 'FrameTransform | None'

The camera's preprocess transform; when given and non-identity, the spec's raw-frame intrinsics are mapped through it (see `_parse_intrinsics`).

None

Returns:

Type Description
Camera

The constructed camera.

Source code in src/deeperfly/cameras.py
@classmethod
def from_spec(
    cls,
    spec: dict,
    name: str | None = None,
    image_size: tuple[int, int] | None = None,
    transform: "FrameTransform | None" = None,
) -> Camera:
    """Construct a camera from a config spec dict (see :func:`resolve_extrinsics`).

    Parameters
    ----------
    spec
        Camera spec dict (extrinsics + intrinsics keys; intrinsics in
        raw-footage pixels).
    name
        Optional camera name stored on the result.
    image_size
        Optional raw-footage ``(height, width)`` pair used to infer the
        principal point (image center) when the spec omits
        ``principal_point_px``, and to anchor ``transform``'s affine.
    transform
        The camera's preprocess transform; when given and non-identity, the
        spec's raw-frame intrinsics are mapped through it (see
        :func:`_parse_intrinsics`).

    Returns
    -------
    Camera
        The constructed camera.
    """
    # Extrinsics are resolved before intrinsics, both from the same spec.
    rotation, translation = resolve_extrinsics(spec)
    intrinsics, distortion = _parse_intrinsics(
        spec, image_size=image_size, transform=transform
    )
    return cls(
        rvec=rotation,
        tvec=translation,
        intr=intrinsics,
        dist=distortion,
        name=name,
    )

project

project(
    pts3d: Float[ndarray, "*pts 3"],
) -> Float[np.ndarray, "*pts 2"]

Project world points to this camera's image plane.

Parameters:

Name Type Description Default
pts3d Float[ndarray, '*pts 3']

World points of shape (*pts, 3).

required

Returns:

Type Description
ndarray

Image points of shape (*pts, 2).

Source code in src/deeperfly/cameras.py
def project(
    self, pts3d: Float[np.ndarray, "*pts 3"]
) -> Float[np.ndarray, "*pts 2"]:
    """Map world points onto this camera's image plane.

    Parameters
    ----------
    pts3d
        World points of shape ``(*pts, 3)``.

    Returns
    -------
    np.ndarray
        Image points of shape ``(*pts, 2)``.
    """
    world = np.asarray(pts3d)
    # Every camera parameter gets a leading axis of length one for
    # ``project_full``; the matching leading entry of the result is taken.
    batched = [
        param[None] for param in (self.rvec, self.tvec, self.intr, self.dist)
    ]
    projected = project_full(world, *batched)
    return np.asarray(projected)[0]

backproject_ray

backproject_ray(
    pixel: Float[ndarray, "2"],
) -> tuple[Float[np.ndarray, "3"], Float[np.ndarray, "3"]]

The world-frame viewing ray of an image pixel through this camera.

Inverse of `project`: returns `(origin, direction)` such that every world point `origin + s * direction` projects back onto `pixel`. `origin` is the camera center. See `deeperfly.geometry.backproject_ray_one`.

Parameters:

Name Type Description Default
pixel Float[ndarray, '2']

Image point of shape (2,) in pixels.

required

Returns:

Type Description
origin, direction : np.ndarray

The camera center and the (unnormalized) world-frame ray direction, each of shape (3,).

Source code in src/deeperfly/cameras.py
def backproject_ray(
    self, pixel: Float[np.ndarray, "2"]
) -> tuple[Float[np.ndarray, "3"], Float[np.ndarray, "3"]]:
    """Compute the world-frame viewing ray through an image ``pixel``.

    This inverts :meth:`project`: the returned ``(origin, direction)`` is such
    that every world point ``origin + s * direction`` projects back onto
    ``pixel``, with ``origin`` being the camera center. See
    :func:`deeperfly.geometry.backproject_ray_one`.

    Parameters
    ----------
    pixel
        Image point of shape ``(2,)`` in pixels.

    Returns
    -------
    origin, direction : np.ndarray
        The camera center and the (unnormalized) world-frame ray direction,
        each of shape ``(3,)``.
    """
    ray_args = (
        jnp.asarray(pixel, dtype=float),
        jnp.asarray(self.rvec),
        jnp.asarray(self.tvec),
        jnp.asarray(self.intr),
        jnp.asarray(self.dist),
    )
    ray_origin, ray_direction = backproject_ray_one(*ray_args)
    return np.asarray(ray_origin), np.asarray(ray_direction)

CameraGroup

An ordered collection of named `Camera` objects.

Source code in src/deeperfly/cameras.py
class CameraGroup:
    """A named, ordered rig of :class:`Camera` objects.

    Cameras are kept in a name-keyed mapping: ``len`` counts them, indexing
    looks one up by name, and iteration yields the cameras themselves (not
    their names) in mapping order.
    """

    def __init__(self, cameras: dict[str, Camera]):
        # Shallow-copy the mapping so the group owns its own dict.
        self.cameras = dict(cameras)

    def __len__(self) -> int:
        return len(self.cameras)

    def __getitem__(self, name: str) -> Camera:
        return self.cameras[name]

    def __iter__(self):
        return iter(self.cameras.values())

    @property
    def names(self) -> list[str]:
        """Camera names, in group order."""
        return [*self.cameras]

    # -- construction --------------------------------------------------------

    @classmethod
    def from_config(
        cls,
        config: "Config",
        image_sizes: dict[str, tuple[int, int]] | None = None,
    ) -> CameraGroup:
        """Assemble the rig described by a config.

        The camera table is read from ``[cameras.defaults]`` and
        ``[cameras.<name>]``, with per-camera keys taking precedence over the
        defaults. Each camera is a geometric *view*: its intrinsics describe
        its source's raw footage frame, the frame a pathway maps its
        detections back into (see :mod:`deeperfly.pose2d.pathways`).
        Detector-input geometry (mirror, crop, resize) lives in the pathways,
        not on the view.

        Parameters
        ----------
        config
            A :class:`~deeperfly.config.Config`.
        image_sizes
            Maps a view name to its source's raw footage ``(height, width)``,
            used to infer that view's principal point (image center) when
            neither the camera spec nor ``[cameras.defaults]`` specifies
            ``principal_point_px``.

        Returns
        -------
        CameraGroup
            The configured rig.

        Raises
        ------
        ValueError
            If the config defines no cameras.
        """
        defaults, specs = config.camera_table()
        sizes = image_sizes if image_sizes else {}
        rig = {}
        for view, overrides in specs.items():
            # Later keys win, so the per-camera table overrides the defaults.
            merged = {**defaults, **overrides}
            rig[view] = Camera.from_spec(
                _rig_keys(merged),
                name=view,
                image_size=sizes.get(view),
            )
        if not rig:
            raise ValueError("config defines no cameras")
        return cls(rig)

    @classmethod
    def from_arrays(
        cls,
        names: list[str],
        rvecs: Float[np.ndarray, "V 3"],
        tvecs: Float[np.ndarray, "V 3"],
        intrs: Float[np.ndarray, "V 4"],
        dists: Float[np.ndarray, "V K"],
    ) -> CameraGroup:
        """Assemble a rig from stacked per-camera arrays (e.g. BA output).

        Row ``i`` of every array is paired with ``names[i]``.

        Parameters
        ----------
        names
            Camera names, in order, labelling the leading axis of the arrays.
        rvecs, tvecs
            Stacked extrinsics of shape ``(V, 3)``.
        intrs
            Stacked packed intrinsics of shape ``(V, 4)``.
        dists
            Stacked distortion coefficients of shape ``(V, K)``.

        Returns
        -------
        CameraGroup
            The rig assembled from the arrays.
        """
        rvecs = np.asarray(rvecs)
        tvecs = np.asarray(tvecs)
        intrs = np.asarray(intrs)
        dists = np.asarray(dists)
        rig = {}
        for row, label in enumerate(names):
            rig[label] = Camera(
                rvec=rvecs[row],
                tvec=tvecs[row],
                intr=intrs[row],
                dist=dists[row],
                name=label,
            )
        return cls(rig)

    # -- stacked parameter views --------------------------------------------

    @property
    def rvecs(self) -> Float[np.ndarray, "V 3"]:
        """Every camera's ``rvec``, stacked along a new leading axis."""
        return np.stack([cam.rvec for cam in self], axis=0)

    @property
    def tvecs(self) -> Float[np.ndarray, "V 3"]:
        """Every camera's ``tvec``, stacked along a new leading axis."""
        return np.stack([cam.tvec for cam in self], axis=0)

    @property
    def intrs(self) -> Float[np.ndarray, "V 4"]:
        """Every camera's ``intr``, stacked along a new leading axis."""
        return np.stack([cam.intr for cam in self], axis=0)

    @property
    def dists(self) -> Float[np.ndarray, "V K"]:
        """Per-camera distortion, zero-padded to the group-wide max length."""
        coeffs = [cam.dist for cam in self]
        # ``default=0`` keeps an empty group from raising: it yields a (0, 0) array.
        width = max((d.size for d in coeffs), default=0)
        padded = np.zeros((len(self), width))
        for row, d in zip(padded, coeffs):
            # ``row`` is a view into ``padded``; the tail past ``d.size`` stays zero.
            row[: d.size] = d
        return padded

    # -- geometry ------------------------------------------------------------

    def project(
        self, pts3d: Float[np.ndarray, "*pts 3"]
    ) -> Float[np.ndarray, "V *pts 2"]:
        """Project world points into the image of every camera in the group.

        Parameters
        ----------
        pts3d
            World points of shape ``(*pts, 3)``.

        Returns
        -------
        np.ndarray
            Image points of shape ``(V, *pts, 2)``.
        """
        world = np.asarray(pts3d)
        projected = project_full(
            world, self.rvecs, self.tvecs, self.intrs, self.dists
        )
        return np.asarray(projected)

    def triangulate(
        self,
        pts2d: Float[np.ndarray, "V *pts 2"],
        weights: Float[np.ndarray, "V *pts"] | None = None,
    ) -> Float[np.ndarray, "*pts 3"]:
        """Recover 3D points from per-view 2D observations using this rig.

        Parameters
        ----------
        pts2d
            2D observations of shape ``(V, *pts, 2)``, NaN for missing.
        weights
            Optional per-(view, point) weights of shape ``(V, *pts)`` for a
            confidence-weighted DLT; ``None`` (default) is plain DLT. See
            :func:`deeperfly.geometry.triangulate_dlt`.

        Returns
        -------
        np.ndarray
            Triangulated points of shape ``(*pts, 3)`` (NaN below two views).
        """
        # Append each camera's translation as an extra last column of its
        # rotation matrix, then left-multiply by the intrinsic matrix to get
        # the per-view matrices handed to the DLT solver.
        rotations = np.asarray(rvec_to_rmat(self.rvecs))
        translations = self.tvecs[..., np.newaxis]
        extrinsics = np.concatenate((rotations, translations), axis=-1)
        projections = np.asarray(intr_to_kmat(self.intrs)) @ extrinsics
        if weights is not None:
            weights = np.asarray(weights)
        observed = np.asarray(pts2d)
        return np.asarray(triangulate_dlt(observed, projections, weights))

dists property

dists: Float[ndarray, 'V K']

Per-camera distortion, zero-padded to the group-wide max length.

from_config classmethod

from_config(
    config: "Config",
    image_sizes: dict[str, tuple[int, int]] | None = None,
) -> CameraGroup

Build a group from a config.

Reads [cameras.defaults] and [cameras.<name>]; per-camera keys override the defaults. A camera here is a geometric view: its intrinsics describe its source's raw footage frame, the frame a pathway maps its detections back into (see :mod:deeperfly.pose2d.pathways). Detector-input geometry (mirror, crop, resize) lives in the pathways, not on the view.

Parameters:

Name Type Description Default
config 'Config'

A :class:~deeperfly.config.Config.

required
image_sizes dict[str, tuple[int, int]] | None

Maps a view name to its source's raw footage (height, width), used to infer that view's principal point (image center) when neither the camera spec nor [cameras.defaults] specifies principal_point_px.

None

Returns:

Type Description
CameraGroup

The configured rig.

Raises:

Type Description
ValueError

If the config defines no cameras.

Source code in src/deeperfly/cameras.py
@classmethod
def from_config(
    cls,
    config: "Config",
    image_sizes: dict[str, tuple[int, int]] | None = None,
) -> CameraGroup:
    """Assemble the rig described by a config.

    The camera table is read from ``[cameras.defaults]`` and
    ``[cameras.<name>]``, with per-camera keys taking precedence over the
    defaults. Each camera is a geometric *view*: its intrinsics describe its
    source's raw footage frame, the frame a pathway maps its detections back
    into (see :mod:`deeperfly.pose2d.pathways`). Detector-input geometry
    (mirror, crop, resize) lives in the pathways, not on the view.

    Parameters
    ----------
    config
        A :class:`~deeperfly.config.Config`.
    image_sizes
        Maps a view name to its source's raw footage ``(height, width)``,
        used to infer that view's principal point (image center) when
        neither the camera spec nor ``[cameras.defaults]`` specifies
        ``principal_point_px``.

    Returns
    -------
    CameraGroup
        The configured rig.

    Raises
    ------
    ValueError
        If the config defines no cameras.
    """
    defaults, specs = config.camera_table()
    sizes = image_sizes if image_sizes else {}
    rig = {}
    for view, overrides in specs.items():
        # Later keys win, so the per-camera table overrides the defaults.
        merged = {**defaults, **overrides}
        rig[view] = Camera.from_spec(
            _rig_keys(merged),
            name=view,
            image_size=sizes.get(view),
        )
    if not rig:
        raise ValueError("config defines no cameras")
    return cls(rig)

from_arrays classmethod

from_arrays(
    names: list[str],
    rvecs: Float[ndarray, "V 3"],
    tvecs: Float[ndarray, "V 3"],
    intrs: Float[ndarray, "V 4"],
    dists: Float[ndarray, "V K"],
) -> CameraGroup

Build a group from stacked per-camera arrays (e.g. BA output).

Parameters:

Name Type Description Default
names list[str]

Camera names, in order, labelling the leading axis of the arrays.

required
rvecs Float[ndarray, 'V 3']

Stacked extrinsics of shape (V, 3).

required
tvecs Float[ndarray, 'V 3']

Stacked extrinsics of shape (V, 3).

required
intrs Float[ndarray, 'V 4']

Stacked packed intrinsics of shape (V, 4).

required
dists Float[ndarray, 'V K']

Stacked distortion coefficients of shape (V, K).

required

Returns:

Type Description
CameraGroup

The rig assembled from the arrays.

Source code in src/deeperfly/cameras.py
@classmethod
def from_arrays(
    cls,
    names: list[str],
    rvecs: Float[np.ndarray, "V 3"],
    tvecs: Float[np.ndarray, "V 3"],
    intrs: Float[np.ndarray, "V 4"],
    dists: Float[np.ndarray, "V K"],
) -> CameraGroup:
    """Assemble a rig from stacked per-camera arrays (e.g. BA output).

    Row ``i`` of every array is paired with ``names[i]``.

    Parameters
    ----------
    names
        Camera names, in order, labelling the leading axis of the arrays.
    rvecs, tvecs
        Stacked extrinsics of shape ``(V, 3)``.
    intrs
        Stacked packed intrinsics of shape ``(V, 4)``.
    dists
        Stacked distortion coefficients of shape ``(V, K)``.

    Returns
    -------
    CameraGroup
        The rig assembled from the arrays.
    """
    rvecs = np.asarray(rvecs)
    tvecs = np.asarray(tvecs)
    intrs = np.asarray(intrs)
    dists = np.asarray(dists)
    rig = {}
    for row, label in enumerate(names):
        rig[label] = Camera(
            rvec=rvecs[row],
            tvec=tvecs[row],
            intr=intrs[row],
            dist=dists[row],
            name=label,
        )
    return cls(rig)

project

project(
    pts3d: Float[ndarray, "*pts 3"],
) -> Float[np.ndarray, "V *pts 2"]

Project world points through every camera.

Parameters:

Name Type Description Default
pts3d Float[ndarray, '*pts 3']

World points of shape (*pts, 3).

required

Returns:

Type Description
ndarray

Image points of shape (V, *pts, 2).

Source code in src/deeperfly/cameras.py
def project(
    self, pts3d: Float[np.ndarray, "*pts 3"]
) -> Float[np.ndarray, "V *pts 2"]:
    """Project world points into the image of every camera in the group.

    Parameters
    ----------
    pts3d
        World points of shape ``(*pts, 3)``.

    Returns
    -------
    np.ndarray
        Image points of shape ``(V, *pts, 2)``.
    """
    world = np.asarray(pts3d)
    # The stacked per-camera parameters carry the leading view axis ``V``.
    projected = project_full(
        world, self.rvecs, self.tvecs, self.intrs, self.dists
    )
    return np.asarray(projected)

triangulate

triangulate(
    pts2d: Float[ndarray, "V *pts 2"],
    weights: Float[ndarray, "V *pts"] | None = None,
) -> Float[np.ndarray, "*pts 3"]

Triangulate 3D points from 2D observations and this group's cameras.

Parameters:

Name Type Description Default
pts2d Float[ndarray, 'V *pts 2']

2D observations of shape (V, *pts, 2), NaN for missing.

required
weights Float[ndarray, 'V *pts'] | None

Optional per-(view, point) weights of shape (V, *pts) for a confidence-weighted DLT; None (default) is plain DLT. See :func:deeperfly.geometry.triangulate_dlt.

None

Returns:

Type Description
ndarray

Triangulated points of shape (*pts, 3) (NaN below two views).

Source code in src/deeperfly/cameras.py
def triangulate(
    self,
    pts2d: Float[np.ndarray, "V *pts 2"],
    weights: Float[np.ndarray, "V *pts"] | None = None,
) -> Float[np.ndarray, "*pts 3"]:
    """Recover 3D points from per-view 2D observations using this rig.

    Parameters
    ----------
    pts2d
        2D observations of shape ``(V, *pts, 2)``, NaN for missing.
    weights
        Optional per-(view, point) weights of shape ``(V, *pts)`` for a
        confidence-weighted DLT; ``None`` (default) is plain DLT. See
        :func:`deeperfly.geometry.triangulate_dlt`.

    Returns
    -------
    np.ndarray
        Triangulated points of shape ``(*pts, 3)`` (NaN below two views).
    """
    # Append each camera's translation as an extra last column of its rotation
    # matrix, then left-multiply by the intrinsic matrix to get the per-view
    # matrices handed to the DLT solver.
    rotations = np.asarray(rvec_to_rmat(self.rvecs))
    translations = self.tvecs[..., np.newaxis]
    extrinsics = np.concatenate((rotations, translations), axis=-1)
    projections = np.asarray(intr_to_kmat(self.intrs)) @ extrinsics
    if weights is not None:
        weights = np.asarray(weights)
    observed = np.asarray(pts2d)
    return np.asarray(triangulate_dlt(observed, projections, weights))

Skeleton

Skeleton dataclass

An ordered set of tracked points with limb/bone structure (per-view visibility is not stored here).

Attributes:

Name Type Description
name str

Identifier for the skeleton (e.g. "fly38").

point_names tuple[str, ...]

Human-readable name per tracked point, in order (length n_points).

limb_names, limb_id, bones

Limb structure derived from the config's limb_points mapping (see :func:_parse_limb_points): the limb names (length n_limbs), each point's limb index (shape (n_points,)), and the within-view 2D edges as point-index pairs (shape (n_bones, 2)).

palette dict[str, str]

Mapping limb_name -> hex color for plotting. Limbs absent from the mapping fall back to a default colormap in the visualization helpers.

Which view sees which point lives in the detection plan (the pathways' (channel, view, point) mappings), not here: an unobserved (view, point) is simply NaN in the points array.
Source code in src/deeperfly/skeleton.py
@dataclass(frozen=True)
class Skeleton:
    """An ordered set of tracked points with limb/bone structure.

    Attributes
    ----------
    name
        Identifier for the skeleton (e.g. ``"fly38"``).
    point_names
        Human-readable name per tracked point, in order (length ``n_points``).
    limb_names, limb_id, bones
        Limb structure derived from the config's ``limb_points`` mapping (see
        :func:`_parse_limb_points`): the limb names (length ``n_limbs``), each
        point's limb index (shape ``(n_points,)``), and the within-view 2D edges
        as point-index pairs (shape ``(n_bones, 2)``).
    palette
        Mapping ``limb_name -> hex color`` for plotting. Limbs absent from the
        mapping fall back to a default colormap in the visualization helpers.

    Per-view visibility is deliberately not stored on the skeleton. Which view
    sees which point lives in the detection plan (the pathways'
    ``(channel, view, point)`` mappings): an unobserved ``(view, point)`` is
    simply ``NaN`` in the points array.
    """

    name: str
    point_names: tuple[str, ...]
    limb_names: tuple[str, ...]
    limb_id: Int[np.ndarray, "P"]
    bones: Int[np.ndarray, "B 2"]
    palette: dict[str, str]

    # -- construction --------------------------------------------------------

    @classmethod
    def fly(cls) -> Skeleton:
        """The default 38-point Drosophila skeleton (DeepFly3D 7-camera rig).

        Built by passing ``Config.default()`` to :meth:`from_config`.
        """
        # Imported here rather than at module level.
        from .config import Config

        default_config = Config.default()
        return cls.from_config(default_config)

    @classmethod
    def from_config(cls, config: "Config") -> Skeleton:
        """Create the skeleton described by a config's ``[skeleton]`` table.

        Parameters
        ----------
        config
            A :class:`~deeperfly.config.Config` with a ``[skeleton]`` table.

        Returns
        -------
        Skeleton
            The skeleton described by the config's ``[skeleton]`` table.

        Raises
        ------
        ValueError
            If a ``limb_points`` entry names an unknown point or an
            out-of-range point index.
        """
        table = config.data["skeleton"]
        points = tuple(table["point_names"])
        limb_names, limb_id, bones = _parse_limb_points(
            table.get("limb_points", {}), points
        )
        # Coerce both palette keys and values to plain strings.
        colors = {}
        for limb, color in table.get("limb_palette", {}).items():
            key = str(limb)
            colors[key] = str(color)
        return cls(
            name=table.get("name", "skeleton"),
            point_names=points,
            limb_names=limb_names,
            limb_id=limb_id,
            bones=bones,
            palette=colors,
        )

    # -- basic views ---------------------------------------------------------

    @property
    def n_points(self) -> int:
        """Number of tracked points."""
        return len(self.point_names)

    @property
    def n_limbs(self) -> int:
        """Number of limbs."""
        return len(self.limb_names)

    def __len__(self) -> int:
        return self.n_points

    # -- derived structure ---------------------------------------------------

    def bone_index_pairs(
        self,
    ) -> tuple[Int[np.ndarray, "B"], Int[np.ndarray, "B"]]:
        """Split ``bones`` into endpoint index arrays ``(i, j)``.

        Handy for vectorized bone-length maths.

        Returns
        -------
        i, j : np.ndarray
            The first and second endpoint index of each bone (shape ``(B,)``).
        """
        first = self.bones[:, 0]
        second = self.bones[:, 1]
        return first, second

fly classmethod

fly() -> Skeleton

The default 38-point Drosophila skeleton (DeepFly3D 7-camera rig).

Source code in src/deeperfly/skeleton.py
@classmethod
def fly(cls) -> Skeleton:
    """The default 38-point Drosophila skeleton (DeepFly3D 7-camera rig).

    Built by passing ``Config.default()`` to :meth:`from_config`.
    """
    # Imported here rather than at module level.
    from .config import Config

    default_config = Config.default()
    return cls.from_config(default_config)

from_config classmethod

from_config(config: 'Config') -> Skeleton

Build a skeleton from a config.

Parameters:

Name Type Description Default
config 'Config'

A :class:~deeperfly.config.Config with a [skeleton] table.

required

Returns:

Type Description
Skeleton

The skeleton described by the config's [skeleton] table.

Raises:

Type Description
ValueError

If a limb_points entry names an unknown point or an out-of-range point index.

Source code in src/deeperfly/skeleton.py
@classmethod
def from_config(cls, config: "Config") -> Skeleton:
    """Create the skeleton described by a config's ``[skeleton]`` table.

    Parameters
    ----------
    config
        A :class:`~deeperfly.config.Config` with a ``[skeleton]`` table.

    Returns
    -------
    Skeleton
        The skeleton described by the config's ``[skeleton]`` table.

    Raises
    ------
    ValueError
        If a ``limb_points`` entry names an unknown point or an
        out-of-range point index.
    """
    table = config.data["skeleton"]
    points = tuple(table["point_names"])
    limb_names, limb_id, bones = _parse_limb_points(
        table.get("limb_points", {}), points
    )
    # Coerce both palette keys and values to plain strings.
    colors = {}
    for limb, color in table.get("limb_palette", {}).items():
        key = str(limb)
        colors[key] = str(color)
    return cls(
        name=table.get("name", "skeleton"),
        point_names=points,
        limb_names=limb_names,
        limb_id=limb_id,
        bones=bones,
        palette=colors,
    )

bone_index_pairs

bone_index_pairs() -> tuple[
    Int[np.ndarray, "B"], Int[np.ndarray, "B"]
]

Endpoint index arrays (i, j) for vectorized bone-length maths.

Returns:

Type Description
i, j : np.ndarray

The first and second endpoint index of each bone (shape (B,)).

Source code in src/deeperfly/skeleton.py
def bone_index_pairs(
    self,
) -> tuple[Int[np.ndarray, "B"], Int[np.ndarray, "B"]]:
    """Split ``bones`` into endpoint index arrays ``(i, j)``.

    Handy for vectorized bone-length maths.

    Returns
    -------
    i, j : np.ndarray
        The first and second endpoint index of each bone (shape ``(B,)``).
    """
    first = self.bones[:, 0]
    second = self.bones[:, 1]
    return first, second

Results

PoseResult dataclass

A complete multi-view pose-estimation result for one recording.

Source code in src/deeperfly/results.py
@dataclass
class PoseResult:
    """A complete multi-view pose-estimation result for one recording.

    On construction ``pts2d`` and every optional array that is not ``None``
    (``conf``, ``pts3d``, ``reproj_error``) are converted to float NumPy arrays.
    """

    cameras: CameraGroup
    skeleton: Skeleton
    pts2d: Float[np.ndarray, "V T P 2"]
    conf: Float[np.ndarray, "V T P"] | None = None
    pts3d: Float[np.ndarray, "T P 3"] | None = None
    reproj_error: Float[np.ndarray, "V T P"] | None = None
    meta: dict = field(default_factory=dict)

    def __post_init__(self) -> None:
        # Normalize array-likes to float ndarrays; absent optionals stay None.
        self.pts2d = np.asarray(self.pts2d, dtype=float)
        if self.conf is not None:
            self.conf = np.asarray(self.conf, dtype=float)
        if self.pts3d is not None:
            self.pts3d = np.asarray(self.pts3d, dtype=float)
        if self.reproj_error is not None:
            self.reproj_error = np.asarray(self.reproj_error, dtype=float)

    @property
    def n_views(self) -> int:
        """Size of the leading (view) axis of ``pts2d``."""
        return self.pts2d.shape[0]

    @property
    def n_frames(self) -> int:
        """Size of the second (frame) axis of ``pts2d``."""
        return self.pts2d.shape[1]

    # -- serialization -------------------------------------------------------

    def save(self, path: str | Path) -> None:
        """Write the result to an HDF5 file (overwriting ``path``).

        The library one-shot: ``pts2d``/``conf`` go to ``pose2d/`` and, when a
        3D pose or a reprojection error is present, the (possibly cleaned) 2D,
        3D and reprojection error go to ``triangulation/`` -- so :meth:`load`
        round-trips the assembled view. ``pts2d`` is duplicated into both
        groups in that case (it is small next to the footage).

        Parameters
        ----------
        path
            Destination ``.h5`` path; an existing file is overwritten.
        """
        stamp = {
            "deeperfly_format_version": FORMAT_VERSION,
            "created_utc": datetime.now(timezone.utc).isoformat(),
        }
        # Entries of ``self.meta`` come last, so they win on a key clash.
        meta = {**stamp, **self.meta}
        has_points3d = self.pts3d is not None
        has_reproj = self.reproj_error is not None
        with h5py.File(path, "w") as f:
            f.attrs["meta"] = json.dumps(meta)
            _write_skeleton(f.create_group("skeleton"), self.skeleton)

            pose2d = f.create_group("pose2d")
            pose2d.create_dataset("points", data=self.pts2d)
            if self.conf is not None:
                pose2d.create_dataset("conf", data=self.conf)
            _write_cameras(pose2d.create_group("cameras"), self.cameras)

            if not (has_points3d or has_reproj):
                return
            triangulation = f.create_group("triangulation")
            triangulation.create_dataset("points", data=self.pts2d)
            if has_points3d:
                triangulation.create_dataset("points3d", data=self.pts3d)
            if has_reproj:
                triangulation.create_dataset(
                    "reproj_error", data=self.reproj_error
                )

    @classmethod
    def load(cls, path: str | Path) -> PoseResult:
        """Read the assembled :class:`PoseResult` back from an HDF5 file.

        Assembly prefers the most-derived data present: ``pts2d`` from
        triangulation, else pictorial_structures, else pose2d; ``pts3d`` /
        ``reproj_error`` from triangulation, else pictorial_structures; cameras
        from bundle_adjustment, else the pose2d config rig.

        Parameters
        ----------
        path
            Path to a ``.h5`` file written by :meth:`save` or the staged run.

        Returns
        -------
        PoseResult
            The assembled result (cameras, skeleton, points and ``meta``).

        Raises
        ------
        ValueError
            If the stored format version differs from ``FORMAT_VERSION``, or
            if no stage holds 2D points.
        """
        # Most-derived stage first; the first stage holding a dataset wins.
        stages = ("triangulation", "pictorial_structures", "pose2d")
        with h5py.File(path, "r") as f:
            meta = json.loads(f.attrs["meta"])  # type: ignore[arg-type]
            # The version is popped, so it is not part of the returned ``meta``.
            version = meta.pop("deeperfly_format_version", None)
            if version != FORMAT_VERSION:
                raise ValueError(
                    f"{path} has deeperfly format version {version!r}, expected "
                    f"{FORMAT_VERSION}; re-run the pipeline to regenerate it"
                )
            skeleton = _read_skeleton(f["skeleton"])  # type: ignore[arg-type]
            if "bundle_adjustment/cameras" in f:
                cameras_group = f["bundle_adjustment/cameras"]
            else:
                cameras_group = f["pose2d/cameras"]
            cameras = _read_cameras(cameras_group)  # type: ignore[arg-type]

            def most_derived(leaf):
                # Return dataset ``leaf`` from the first stage that has it.
                for stage in stages:
                    key = f"{stage}/{leaf}"
                    if key in f:
                        return f[key][()]  # type: ignore[index]
                return None

            pts2d = most_derived("points")
            pts3d = most_derived("points3d")
            reproj = most_derived("reproj_error")
            conf = None
            if "pose2d/conf" in f:
                conf = f["pose2d/conf"][()]  # type: ignore[index]
        if pts2d is None:
            raise ValueError(f"{path} has no 2D points (no pose2d group)")
        return cls(
            cameras=cameras,
            skeleton=skeleton,
            pts2d=pts2d,  # type: ignore[arg-type]
            conf=conf,  # type: ignore[arg-type]
            pts3d=pts3d,  # type: ignore[arg-type]
            reproj_error=reproj,  # type: ignore[arg-type]
            meta=meta,
        )

save

save(path: str | Path) -> None

Write the result to an HDF5 file (overwriting path).

The library one-shot: pts2d/conf go to pose2d/ and, when a 3D pose or a reprojection error is present, the (possibly cleaned) 2D, 3D and reprojection error go to triangulation/ -- so :meth:load round-trips the assembled view. pts2d is duplicated into both groups in that case (it is small next to the footage).

Parameters:

Name Type Description Default
path str | Path

Destination .h5 path; an existing file is overwritten.

required
Source code in src/deeperfly/results.py
def save(self, path: str | Path) -> None:
    """Write the result to an HDF5 file (overwriting ``path``).

    The library one-shot: ``pts2d``/``conf`` go to ``pose2d/`` and, when a 3D
    pose or a reprojection error is present, the (possibly cleaned) 2D, 3D and
    reprojection error go to ``triangulation/`` -- so :meth:`load` round-trips
    the assembled view. ``pts2d`` is duplicated into both groups in that case
    (it is small next to the footage).

    Parameters
    ----------
    path
        Destination ``.h5`` path; an existing file is overwritten.
    """
    stamp = {
        "deeperfly_format_version": FORMAT_VERSION,
        "created_utc": datetime.now(timezone.utc).isoformat(),
    }
    # Entries of ``self.meta`` come last, so they win on a key clash.
    meta = {**stamp, **self.meta}
    has_points3d = self.pts3d is not None
    has_reproj = self.reproj_error is not None
    with h5py.File(path, "w") as f:
        f.attrs["meta"] = json.dumps(meta)
        _write_skeleton(f.create_group("skeleton"), self.skeleton)

        pose2d = f.create_group("pose2d")
        pose2d.create_dataset("points", data=self.pts2d)
        if self.conf is not None:
            pose2d.create_dataset("conf", data=self.conf)
        _write_cameras(pose2d.create_group("cameras"), self.cameras)

        if not (has_points3d or has_reproj):
            return
        triangulation = f.create_group("triangulation")
        triangulation.create_dataset("points", data=self.pts2d)
        if has_points3d:
            triangulation.create_dataset("points3d", data=self.pts3d)
        if has_reproj:
            triangulation.create_dataset("reproj_error", data=self.reproj_error)

load classmethod

load(path: str | Path) -> PoseResult

Read the assembled :class:PoseResult back from an HDF5 file.

Assembly prefers the most-derived data present: pts2d from triangulation, else pictorial_structures, else pose2d; pts3d / reproj_error from triangulation, else pictorial_structures; cameras from bundle_adjustment, else the pose2d config rig.

Parameters:

Name Type Description Default
path str | Path

Path to a .h5 file written by :meth:save or the staged run.

required

Returns:

Type Description
PoseResult

The assembled result (cameras, skeleton, points and meta).

Source code in src/deeperfly/results.py
@classmethod
def load(cls, path: str | Path) -> PoseResult:
    """Read the assembled :class:`PoseResult` back from an HDF5 file.

    Assembly prefers the most-derived data present: ``pts2d`` from
    triangulation, else pictorial_structures, else pose2d; ``pts3d`` /
    ``reproj_error`` from triangulation, else pictorial_structures; cameras
    from bundle_adjustment, else the pose2d config rig.

    Parameters
    ----------
    path
        Path to a ``.h5`` file written by :meth:`save` or the staged run.

    Returns
    -------
    PoseResult
        The assembled result (cameras, skeleton, points and ``meta``).

    Raises
    ------
    ValueError
        If the stored format version differs from ``FORMAT_VERSION``, or if
        no stage holds 2D points.
    """
    # Most-derived stage first; the first stage holding a dataset wins.
    stages = ("triangulation", "pictorial_structures", "pose2d")
    with h5py.File(path, "r") as f:
        meta = json.loads(f.attrs["meta"])  # type: ignore[arg-type]
        # The version is popped, so it is not part of the returned ``meta``.
        version = meta.pop("deeperfly_format_version", None)
        if version != FORMAT_VERSION:
            raise ValueError(
                f"{path} has deeperfly format version {version!r}, expected "
                f"{FORMAT_VERSION}; re-run the pipeline to regenerate it"
            )
        skeleton = _read_skeleton(f["skeleton"])  # type: ignore[arg-type]
        if "bundle_adjustment/cameras" in f:
            cameras_group = f["bundle_adjustment/cameras"]
        else:
            cameras_group = f["pose2d/cameras"]
        cameras = _read_cameras(cameras_group)  # type: ignore[arg-type]

        def most_derived(leaf):
            # Return dataset ``leaf`` from the first stage that has it.
            for stage in stages:
                key = f"{stage}/{leaf}"
                if key in f:
                    return f[key][()]  # type: ignore[index]
            return None

        pts2d = most_derived("points")
        pts3d = most_derived("points3d")
        reproj = most_derived("reproj_error")
        conf = None
        if "pose2d/conf" in f:
            conf = f["pose2d/conf"][()]  # type: ignore[index]
    if pts2d is None:
        raise ValueError(f"{path} has no 2D points (no pose2d group)")
    return cls(
        cameras=cameras,
        skeleton=skeleton,
        pts2d=pts2d,  # type: ignore[arg-type]
        conf=conf,  # type: ignore[arg-type]
        pts3d=pts3d,  # type: ignore[arg-type]
        reproj_error=reproj,  # type: ignore[arg-type]
        meta=meta,
    )

Recordings

Recording dataclass

One unit of work: a camera -> footage-files map and where its results go.

sources maps a camera name to its naturally-sorted footage files (a single video, or an image sequence), already reconciled to one extension and validated to share a file and frame count with the other cameras. Empty only for a directory kept so a resume can reuse a cached result though its footage is absent (see :func:resolve_recordings).

outdir is this recording's output directory (see :func:plan_outdirs) -- the run's durable identity, holding the config snapshot and cached results.h5. The input directory is not retained; a resume re-passes the recording, which re-resolves sources the same way.

Source code in src/deeperfly/recordings.py
@dataclass(frozen=True)
class Recording:
    """One unit of work: a camera -> footage-files map and where its results go.

    ``sources`` maps a camera name to its naturally-sorted footage files (a single
    video, or an image sequence), already reconciled to one extension and validated
    to share a file and frame count with the other cameras. Empty only for a
    directory kept so a resume can reuse a cached result though its footage is
    absent (see :func:`resolve_recordings`).

    ``outdir`` is this recording's output directory (see :func:`plan_outdirs`) --
    the run's durable identity, holding the config snapshot and cached ``results.h5``.
    The input directory is not retained; a resume re-passes the recording, which
    re-resolves ``sources`` the same way.
    """

    # Camera name -> that camera's footage files (may be empty; see above).
    sources: dict[str, list[Path]]
    # Output directory for this recording's results.
    outdir: Path

resolve_recordings

resolve_recordings(
    inputs: list[Path], *, recursive: bool, config: Config
) -> list[tuple[Path, dict[str, list[Path]]]]

Expand the run inputs into the recordings to process.

inputs is one or more input arguments, each a literal path or a wildcard pattern expanded against the filesystem (:func:_expand_pattern). A recording is a directory holding footage for every configured camera, resolved to a camera -> files map by :func:find_recording (which warns and skips a malformed one). Output directories are resolved separately (:func:plan_outdirs). The behaviors:

  • A single literal path is taken as that one recording -- kept (with empty sources) even when it is not valid footage, so a resume from its cached result still works -- with a warning naming it when it is not a valid recording.
  • Several inputs and/or a wildcard run as a batch: only the valid recordings are kept (a wildcard's incidental non-recording matches are dropped silently); nothing valid is a warned error.
  • With --recursive each input is a parent directory whose subtree is walked for recordings; an empty result is an error.

De-duplicated by directory (overlapping inputs) keeping first-seen order.

Parameters:

Name Type Description Default
inputs list[Path]

One or more input arguments (literal paths or wildcard patterns).

required
recursive bool

Whether each input is a parent directory whose subtree is searched.

required
config Config

The discovery config (recognizes recording directories).

required

Returns:

Type Description
list of (Path, dict)

(recording directory, camera -> footage files) per recording.

Raises:

Type Description
SystemExit

If no valid recording can be resolved from inputs.

Source code in src/deeperfly/recordings.py
def resolve_recordings(
    inputs: list[Path], *, recursive: bool, config: Config
) -> list[tuple[Path, dict[str, list[Path]]]]:
    """Turn the ``run`` input arguments into the list of recordings to process.

    Every input is either a literal path or a wildcard pattern that
    :func:`_expand_pattern` expands against the filesystem. A directory counts as a
    *recording* when :func:`find_recording` resolves it to a ``camera -> files``
    map, i.e. it holds footage for every configured camera (a malformed one is
    warned about and skipped there). Output directories are not decided here; see
    :func:`plan_outdirs`.

    Three modes, checked in this order:

    - **Recursive** (``--recursive``): each input is a *parent* directory; the
      parent itself and its whole subtree are searched for recordings. Finding
      none is an error.
    - **Single literal path**: that path is returned as the one recording even if
      it is not valid footage (its sources are then empty and a warning names
      it), so that a resume from its cached result keeps working.
    - **Batch** (several inputs and/or a wildcard): only valid recordings are
      kept, incidental non-recording wildcard matches are dropped silently, and
      ending up with nothing valid is a warned error.

    The result is de-duplicated by directory (inputs may overlap) and keeps
    first-seen order.

    Parameters
    ----------
    inputs
        The input arguments: literal paths and/or wildcard patterns.
    recursive
        Treat each input as a parent directory and search its subtree.
    config
        The discovery config used to recognize recording directories.

    Returns
    -------
    list of (Path, dict)
        One ``(recording directory, camera -> footage files)`` pair per recording.

    Raises
    ------
    SystemExit
        If ``inputs`` yields no valid recording.
    """
    # Expand every argument to concrete paths, remembering whether each path came
    # from a wildcard (wildcard misses are tolerated more quietly than literals).
    expanded: list[tuple[Path, bool]] = []
    for raw in inputs:
        pattern = str(raw)
        matches, from_glob = _expand_pattern(pattern)
        if from_glob and not matches:
            log.warning("input pattern %r matched no paths", pattern)
        expanded.extend((match, from_glob) for match in matches)

    if recursive:
        hits: list[tuple[Path, dict[str, list[Path]]]] = []
        for parent, from_glob in expanded:
            if not parent.is_dir():
                # Only complain about a literal parent the user named explicitly;
                # a wildcard's non-directory matches are skipped without noise.
                if not from_glob:
                    log.warning(
                        "%s is not a directory -- --recursive searches a parent "
                        "directory for recordings; skipping",
                        parent.resolve(),
                    )
                continue
            # The parent itself may be a recording, so it is tested first.
            for entry in [parent, *sorted(parent.rglob("*"))]:
                if not entry.is_dir():
                    continue
                footage = find_recording(entry, config)
                if footage is not None:
                    hits.append((entry, footage))
        hits = _dedup_found(hits)
        if hits:
            return hits
        log.warning(
            "no recordings found under %s (searched recursively); a recording is "
            "a directory holding footage for every configured camera",
            [str(p) for p, _ in expanded] or [str(a) for a in inputs],
        )
        raise SystemExit("no recordings to run")

    # Non-recursive, exactly one literal path: honor it as given (resume-friendly),
    # even when it holds no valid footage, so its cached result can still be used.
    if len(expanded) == 1:
        only_path, only_from_glob = expanded[0]
        if not only_from_glob:
            footage = find_recording(only_path, config)
            if footage is None:
                log.warning(
                    "%s is not a valid recording directory -- it does not hold footage "
                    "for every configured camera (it can still resume from a cached "
                    "result in its output dir)",
                    only_path.resolve(),
                )
                footage = {}
            return [(only_path, footage)]

    # Batch mode: keep just the valid recordings, and only warn (then exit) when
    # not a single input turns out to be one.
    valid = _dedup_found(
        (path, footage)
        for path, _ in expanded
        if (footage := find_recording(path, config)) is not None
    )
    if valid:
        return valid
    log.warning(
        "none of the inputs is a valid recording directory (a directory holding "
        "footage for every configured camera)",
    )
    raise SystemExit("no valid recording directories among the inputs")

Bundle adjustment

bundle_adjust

bundle_adjust(
    cameras: CameraGroup,
    pts2d: Float[ndarray, "V N 2"],
    *,
    fixed: Sequence[str] = (),
    shared: Sequence[Sequence[str]] = (),
    pts3d: Float[ndarray, "N 3"] | None = None,
    **solver_kwargs,
) -> tuple[
    OptimizeResult, CameraGroup, Float[ndarray, "N 3"]
]

Bundle-adjust a camera group against observed 2D points.

Parameters:

Name Type Description Default
cameras CameraGroup

Initial cameras. Their stacked parameters seed the optimization.

required
pts2d Float[ndarray, 'V N 2']

Observed 2D points of shape (V, N, 2) with NaNs for missing.

required
fixed Sequence[str]

Parameter references to hold constant / tie together; see :func:deeperfly.bundle_adjustment.state.build_state.

()
shared Sequence[Sequence[str]]

Parameter references to hold constant / tie together; see :func:deeperfly.bundle_adjustment.state.build_state.

()
pts3d Float[ndarray, 'N 3'] | None

Initial 3D points; triangulated from cameras if omitted.

None
**solver_kwargs

Forwarded to the core solver (e.g. max_nfev, loss, f_scale).

{}

Returns:

Name Type Description
result OptimizeResult

The raw scipy least-squares result.

optimized_cameras CameraGroup

A camera group carrying the refined parameters.

pts3d ndarray

The refined 3D points of shape (N, 3).

Source code in src/deeperfly/bundle_adjustment/__init__.py
def bundle_adjust(
    cameras: CameraGroup,
    pts2d: Float[ndarray, "V N 2"],
    *,
    fixed: Sequence[str] = (),
    shared: Sequence[Sequence[str]] = (),
    pts3d: Float[ndarray, "N 3"] | None = None,
    **solver_kwargs,
) -> tuple[OptimizeResult, CameraGroup, Float[ndarray, "N 3"]]:
    """Refine a camera group so it best explains the observed 2D points.

    Parameters
    ----------
    cameras
        The starting cameras; their stacked parameters are the initial guess.
    pts2d
        Observations of shape ``(V, N, 2)``; a missing observation is NaN.
    fixed, shared
        Parameter references to keep constant (``fixed``) or to tie together
        (``shared``); see :func:`deeperfly.bundle_adjustment.state.build_state`.
    pts3d
        Starting 3D points. When omitted they are triangulated from ``cameras``.
    **solver_kwargs
        Passed through to the core solver (e.g. ``max_nfev``, ``loss``,
        ``f_scale``).

    Returns
    -------
    result : scipy.optimize.OptimizeResult
        The unmodified scipy least-squares result.
    optimized_cameras : CameraGroup
        A camera group built from the refined parameters.
    pts3d : np.ndarray
        The refined 3D points, shape ``(N, 3)``.
    """
    # Pack the rig's stacked parameters and the observations into solver state.
    problem = build_state(
        cameras.rvecs,
        cameras.tvecs,
        cameras.intrs,
        cameras.dists,
        pts2d,
        cameras.names,
        fixed=fixed,
        shared=shared,
        pts3d=pts3d,
    )
    result, refined = core.bundle_adjust(*problem, **solver_kwargs)

    # Re-wrap the refined arrays under the original camera names.
    optimized_cameras = CameraGroup.from_arrays(
        cameras.names,
        refined.rvecs,
        refined.tvecs,
        refined.intrs,
        refined.dists,
    )
    return result, optimized_cameras, refined.pts3d

bundle_adjust_from_config

bundle_adjust_from_config(
    config: "Config", pts2d: Float[ndarray, "V N 2"]
) -> tuple[
    OptimizeResult, CameraGroup, Float[ndarray, "N 3"]
]

Run :func:bundle_adjust driven by a TOML config.

The [bundle_adjustment] section supplies fixed / shared and the flat scipy least_squares kwargs (e.g. max_nfev / loss). The points_to_use key (which restricts the bundle-adjustment keypoints) is a pipeline-level concern handled by :func:deeperfly.pipeline.bundle_adjust_cameras, not here.

Parameters:

Name Type Description Default
config 'Config'

A :class:~deeperfly.config.Config.

required
pts2d Float[ndarray, 'V N 2']

Observed 2D points of shape (V, N, 2) with NaNs for missing.

required

Returns:

Name Type Description
result OptimizeResult

The raw scipy least-squares result.

optimized_cameras CameraGroup

A camera group carrying the refined parameters.

pts3d ndarray

The refined 3D points of shape (N, 3).

Source code in src/deeperfly/bundle_adjustment/__init__.py
def bundle_adjust_from_config(
    config: "Config",
    pts2d: Float[ndarray, "V N 2"],
) -> tuple[OptimizeResult, CameraGroup, Float[ndarray, "N 3"]]:
    """Call :func:`bundle_adjust` with cameras and options read from a config.

    The cameras are built from ``config``; ``fixed`` / ``shared`` and the flat
    scipy ``least_squares`` keyword arguments (e.g. ``max_nfev`` / ``loss``) come
    from its ``[bundle_adjustment]`` section. The ``points_to_use`` key, which
    restricts the keypoints used for bundle adjustment, is not applied here: it is
    a pipeline-level concern handled by
    :func:`deeperfly.pipeline.bundle_adjust_cameras`.

    Parameters
    ----------
    config
        A :class:`~deeperfly.config.Config`.
    pts2d
        Observations of shape ``(V, N, 2)``; a missing observation is NaN.

    Returns
    -------
    result : scipy.optimize.OptimizeResult
        The unmodified scipy least-squares result.
    optimized_cameras : CameraGroup
        A camera group built from the refined parameters.
    pts3d : np.ndarray
        The refined 3D points, shape ``(N, 3)``.
    """
    rig = CameraGroup.from_config(config)
    section = config.bundle_adjustment
    result, optimized_cameras, pts3d = bundle_adjust(
        rig,
        pts2d,
        fixed=section.fixed,
        shared=section.shared,
        **section.least_squares,
    )
    return result, optimized_cameras, pts3d

Pipeline

run_from_points2d

run_from_points2d(
    cameras: CameraGroup,
    skeleton: Skeleton,
    pts2d: Float[ndarray, "V T P 2"],
    conf: Float[ndarray, "V T P"] | None = None,
    *,
    do_bundle_adjust: bool = True,
    bundle_adjust_kwargs: dict | None = None,
    triangulation: str = "ransac",
    weigh_by_confidence: bool = False,
    do_pictorial: bool = False,
    candidates: Candidates | None = None,
    ps_kwargs: dict | None = None,
    ransac_threshold: float = 15.0,
    min_inliers: int = 2,
    reproj_threshold: float = 40.0,
    max_drops: int = 5,
    fps: float = 100.0,
    meta: dict | None = None,
) -> PoseResult

Run the full 2D-to-3D pipeline and return a :class:PoseResult.

Steps: (optional) bundle-adjust cameras -> reconstruct 3D. Unobserved points are expected to already be NaN (the detector's pathway scatter leaves them so).

Parameters:

Name Type Description Default
cameras CameraGroup

The camera rig (refined in place when do_bundle_adjust).

required
skeleton Skeleton

Skeleton used for the bone-length prior.

required
pts2d Float[ndarray, 'V T P 2']

Detector 2D observations of shape (V, T, P, 2), NaN for missing.

required
conf Float[ndarray, 'V T P'] | None

Per-observation confidences (V, T, P), or None.

None
do_bundle_adjust bool

Whether to refine the cameras with bundle adjustment first.

True
bundle_adjust_kwargs dict | None

Extra keyword arguments forwarded to :func:bundle_adjust_cameras.

None
triangulation str

Reconstruction strategy: "ransac" (default, largest multi-view consensus set; ransac_threshold / min_inliers), "greedy" (DLT dropping the worst-reprojecting view; reproj_threshold / max_drops), or "dlt" (plain least squares, no outlier handling).

'ransac'
weigh_by_confidence bool

When True and conf is given, the chosen triangulation uses a confidence-weighted DLT (each view's rows scaled by sqrt(conf)). For "ransac" this weights the candidate fits and final refit but not the consensus vote. Default False (uniform weights).

False
do_pictorial bool

When True, first run pictorial-structures peak recovery over the detector's top-K candidates (:func:deeperfly.pictorial.reconstruct, accepting ps_kwargs like temporal / lam / max_hyp), then feed its committed 2D into triangulation ("dlt" keeps the PS estimate). Bundle adjustment always uses the arg-max pts2d.

False
candidates Candidates | None

The detector's top-K candidate peaks; required when do_pictorial.

None
ps_kwargs dict | None

Extra keyword arguments forwarded to the pictorial-structures corrector.

None
ransac_threshold float

Per-strategy triangulation knobs (see triangulation above).

15.0
min_inliers int

Per-strategy triangulation knobs (see triangulation above).

2
reproj_threshold float

Per-strategy triangulation knobs (see triangulation above).

40.0
max_drops int

Per-strategy triangulation knobs (see triangulation above).

5
fps float

The recording's frame rate, recorded in the result meta.

100.0
meta dict | None

Extra key/value pairs merged into the result meta.

None

Returns:

Type Description
PoseResult

The bundle-adjusted cameras, committed 2D, triangulated 3D and diagnostics.

Raises:

Type Description
ValueError

If do_pictorial is set but no candidates are given, or triangulation is unknown.

Source code in src/deeperfly/pipeline/core.py
def run_from_points2d(
    cameras: CameraGroup,
    skeleton: Skeleton,
    pts2d: Float[np.ndarray, "V T P 2"],
    conf: Float[np.ndarray, "V T P"] | None = None,
    *,
    do_bundle_adjust: bool = True,
    bundle_adjust_kwargs: dict | None = None,
    triangulation: str = "ransac",
    weigh_by_confidence: bool = False,
    do_pictorial: bool = False,
    candidates: pictorial.Candidates | None = None,
    ps_kwargs: dict | None = None,
    ransac_threshold: float = 15.0,
    min_inliers: int = 2,
    reproj_threshold: float = 40.0,
    max_drops: int = 5,
    fps: float = 100.0,
    meta: dict | None = None,
) -> PoseResult:
    """Run the full 2D-to-3D pipeline and return a :class:`PoseResult`.

    Steps: validate the arguments -> (optional) bundle-adjust cameras ->
    (optional) pictorial-structures peak recovery -> reconstruct 3D. Unobserved
    points are expected to already be NaN (the detector's pathway scatter leaves
    them so).

    Parameters
    ----------
    cameras
        The camera rig. When ``do_bundle_adjust`` is set, the rig returned by
        :func:`bundle_adjust_cameras` replaces it for every later step and in the
        result.
    skeleton
        Skeleton used for the bone-length prior.
    pts2d
        Detector 2D observations of shape ``(V, T, P, 2)``, NaN for missing.
    conf
        Per-observation confidences ``(V, T, P)``, or ``None``.
    do_bundle_adjust
        Whether to refine the cameras with bundle adjustment first.
    bundle_adjust_kwargs
        Extra keyword arguments forwarded to :func:`bundle_adjust_cameras`.
    triangulation
        Reconstruction strategy: ``"ransac"`` (default, largest multi-view
        consensus set; ``ransac_threshold`` / ``min_inliers``), ``"greedy"`` (DLT
        dropping the worst-reprojecting view; ``reproj_threshold`` / ``max_drops``),
        or ``"dlt"`` (plain least squares, no outlier handling).
    weigh_by_confidence
        When ``True`` and ``conf`` is given, the chosen triangulation uses a
        confidence-weighted DLT (each view's rows scaled by ``sqrt(conf)``). For
        ``"ransac"`` this weights the candidate fits and final refit but not the
        consensus vote. Default ``False`` (uniform weights).
    do_pictorial
        When ``True``, first run pictorial-structures peak recovery over the
        detector's top-K ``candidates`` (:func:`deeperfly.pictorial.reconstruct`,
        accepting ``ps_kwargs`` like ``temporal`` / ``lam`` / ``max_hyp``), then
        feed its committed 2D into ``triangulation`` (``"dlt"`` keeps the PS
        estimate). Bundle adjustment always uses the arg-max ``pts2d``.
    candidates
        The detector's top-K candidate peaks; required when ``do_pictorial``.
    ps_kwargs
        Extra keyword arguments forwarded to the pictorial-structures corrector.
    ransac_threshold, min_inliers, reproj_threshold, max_drops
        Per-strategy triangulation knobs (see ``triangulation`` above).
    fps
        The recording's frame rate, recorded in the result ``meta``.
    meta
        Extra key/value pairs merged into the result ``meta`` (they override the
        ``fps`` / ``triangulation`` / ``pictorial`` entries on a key clash).

    Returns
    -------
    PoseResult
        The bundle-adjusted cameras, committed 2D, triangulated 3D and diagnostics.

    Raises
    ------
    ValueError
        If ``do_pictorial`` is set but no ``candidates`` are given, or
        ``triangulation`` is unknown. Both are checked before any computation.
    """
    # Validate every argument up front, so a bad call fails before the bundle
    # adjustment runs instead of after it.
    method = _validate_triangulation(triangulation)
    if do_pictorial and candidates is None:
        raise ValueError("do_pictorial=True requires candidates=...")

    # Unobserved (view, point) pairs are NaN (the detector's pathway scatter leaves
    # them so), which the bundle adjustment and triangulation below treat as "not seen".
    pts2d = np.asarray(pts2d, dtype=float)

    if do_bundle_adjust:
        cameras, _ = bundle_adjust_cameras(
            cameras, pts2d, conf, skeleton, **(bundle_adjust_kwargs or {})
        )

    if do_pictorial:
        # PS recovers the right peaks; only its committed per-view 2D is kept, since
        # every triangulation branch below recomputes the 3D points and the
        # reprojection error (a plain "dlt" pass reproduces the PS estimate).
        _, pts2d, _ = pictorial.reconstruct(
            cameras, skeleton, candidates, pts2d, **(ps_kwargs or {})
        )

    # Confidence weighting is opt-in and needs confidences to be present.
    weights = conf if (weigh_by_confidence and conf is not None) else None
    if method == "ransac":
        pts3d, pts2d, reproj = reconstruct_ransac(
            cameras,
            pts2d,
            threshold=ransac_threshold,
            min_inliers=min_inliers,
            weights=weights,
        )
    elif method == "greedy":
        pts3d, pts2d, reproj = reconstruct(
            cameras,
            pts2d,
            reproj_threshold=reproj_threshold,
            max_drops=max_drops,
            weights=weights,
        )
    else:  # "dlt": plain least-squares triangulation, no outlier handling
        pts3d = triangulate(cameras, pts2d, weights)
        reproj = reprojection_error(cameras, pts3d, pts2d)

    return PoseResult(
        cameras=cameras,
        skeleton=skeleton,
        pts2d=pts2d,
        conf=conf,
        pts3d=pts3d,
        reproj_error=reproj,
        meta={
            "fps": fps,
            "triangulation": method,
            "pictorial": do_pictorial,
            **(meta or {}),
        },
    )

run_recording

run_recording(
    config_path: str | None,
    outdir: Path,
    *,
    sources: dict[str, list[Path]] | None = None,
    input=None,
    overwrite: list[str] | None = None,
    progress=None,
) -> None

Run the config's enabled stages for a single recording, reusing cache.

The config is resolved against outdir (see :meth:Config.read_for_run) and its [pipeline].do_<stage> toggles decide which stages run (:meth:Config.stage_flags). An enabled stage reuses its cached result when its parameters are unchanged and its output is present; editing the config recomputes exactly the affected stages (and everything downstream). The pose2d cache always feeds downstream; a derived stage's output feeds downstream only while that stage is enabled.

A stage runs only if its input is available -- footage for pose2d, a 2D pose for bundle_adjustment / triangulation, cached candidates for pictorial_structures, a result for visualization; a stage whose input is missing is skipped with the reason logged.

Parameters:

Name Type Description Default
config_path str | None

The -c config path (or None for the snapshot/default; see :meth:Config.read_for_run).

required
outdir Path

The recording's output directory (config snapshot + cached results).

required
sources dict[str, list[Path]] | None

The recording's footage (see :func:deeperfly.recordings.camera_sources); sources is the pre-resolved map (deeperfly run), input a raw recording directory a library caller can pass instead.

None
input

The recording's footage (see :func:deeperfly.recordings.camera_sources); sources is the pre-resolved map (deeperfly run), input a raw recording directory a library caller can pass instead.

None
overwrite list[str] | None

Stage names to force-recompute (see :func:deeperfly.pipeline.overwrite_stages); config changes are detected automatically.

None
progress

Optional progress factory threaded into the detector and the compositor.

None
Source code in src/deeperfly/pipeline/run.py
def run_recording(
    config_path: str | None,
    outdir: Path,
    *,
    sources: dict[str, list[Path]] | None = None,
    input=None,
    overwrite: list[str] | None = None,
    progress=None,
) -> None:
    """Run the config's enabled stages for a single recording, reusing cache.

    The config is resolved against ``outdir`` (see :meth:`Config.read_for_run`) and
    its ``[pipeline].do_<stage>`` toggles decide which stages run
    (:meth:`Config.stage_flags`). An enabled stage reuses its cached result when
    its parameters are unchanged and its output is present; editing the config
    recomputes exactly the affected stages (and everything downstream). The
    ``pose2d`` cache always feeds downstream; a *derived* stage's output feeds
    downstream only while that stage is enabled.

    A stage runs only if its input is available -- footage for ``pose2d``, a 2D
    pose for ``bundle_adjustment`` / ``triangulation``, cached candidates for
    ``pictorial_structures``, a result for ``visualization``; a stage whose
    input is missing is skipped with the reason logged.

    Stages are visited in ``STAGES`` order. For each enabled stage the reason to
    (re)run is decided with this precedence: an explicit ``overwrite`` request,
    then an upstream stage having recomputed earlier in this run, then a failed
    cache validation (:func:`stage_valid`).

    Parameters
    ----------
    config_path
        The ``-c`` config path (or ``None`` for the snapshot/default; see
        :meth:`Config.read_for_run`).
    outdir
        The recording's output directory (config snapshot + cached results).
    sources, input
        The recording's footage (see :func:`deeperfly.recordings.camera_sources`);
        ``sources`` is the pre-resolved map (``deeperfly run``), ``input`` a raw
        recording directory a library caller can pass instead.
    overwrite
        Stage names to force-recompute (see
        :func:`deeperfly.pipeline.overwrite_stages`); config changes are
        detected automatically.
    progress
        Optional progress factory threaded into the detector and the compositor.

    Returns
    -------
    None
        Nothing is returned; the stage runners are invoked for their effects and
        completed stages are noted in the run record.
    """
    outdir = Path(outdir)
    config = Config.read_for_run(config_path, outdir)
    enabled = config.stage_flags()  # config validated at construction
    overwrite_set: set[str] = stages.overwrite_stages(overwrite)

    # Both handles point inside ``outdir``; constructing them happens before the
    # directory is created below.
    store = StageStore(outdir / "results.h5")
    record = RunRecord(outdir / "run.json")

    # Validate the footage *before* creating the output dir, so a fresh run that
    # can't read its input fails cleanly instead of leaving an empty dir behind.
    # Only pose2d decodes the recording, and only when it recomputes; a resume
    # reusing a cached 2D pose needs no footage.
    # (``stage_valid(...)[0]`` is the "ok" flag of its ``(ok, why)`` result.)
    if enabled["pose2d"] and (
        "pose2d" in overwrite_set
        or not stage_valid(
            "pose2d",
            config,
            stage_fingerprint("pose2d", config, enabled, store),
            store,
            record,
            outdir,
        )[0]
    ):
        require_input_footage(config, sources=sources, input=input)

    outdir.mkdir(parents=True, exist_ok=True)
    log.info("output directory: %s", outdir)
    config.save_snapshot(outdir)
    log.info(
        "stages: %s",
        ", ".join(f"{n}={'on' if enabled[n] else 'off'}" for n in STAGES),
    )

    # Everything a stage runner needs, bundled into one object.
    ctx = _RunContext(
        config=config,
        enabled=enabled,
        store=store,
        record=record,
        outdir=outdir,
        sources=sources,
        input=input,
        progress=progress,
    )

    # Which stages carry a result from a previous run, snapshotted before the
    # loop (record.set drops downstream entries as stages complete). A stage
    # with no prior record runs for the first time -- that is not a "recompute"
    # and warrants no reason.
    had_record = {name: record.get(name) is not None for name in STAGES}

    recomputed = False  # has any enabled stage recomputed this run? -> cascade
    for name in STAGES:
        if not enabled[name]:
            continue
        # The fingerprint this stage's cached result must match to be reusable.
        expected = stage_fingerprint(name, config, enabled, store)
        if name in overwrite_set:
            reason = "--overwrite"
        elif recomputed:
            reason = "an upstream stage recomputed (its inputs changed)"
        else:
            ok, why = stage_valid(name, config, expected, store, record, outdir)
            if ok:
                log.info(
                    "reusing cached %s (pass --overwrite %s to force a recompute)",
                    name,
                    name,
                )
                continue
            reason = why or "unknown"
        # Only explain *why* when there was a previous result to invalidate.
        if had_record[name]:
            _log_recompute(name, reason)
        else:
            log.info("running %s", name)
        # Only a truthy runner result is recorded and starts the downstream
        # cascade; a falsy result leaves the record untouched (presumably the
        # stage was skipped for missing input -- confirm against the runners).
        if _RUNNERS[name](ctx):
            record.set(name, expected)
            recomputed = True

2D detection

load_detector

load_detector(checkpoint=None, **kwargs)

Load the PyTorch detector, optionally from checkpoint (a .pth).

Parameters:

Name Type Description Default
checkpoint

Path to a .pth checkpoint, or None for a freshly initialized model.

None
**kwargs

Forwarded to :func:deeperfly.pose2d.weights.load_model (e.g. dev).

{}

Returns:

Type Description
The loaded detector model.
Source code in src/deeperfly/pose2d/detector.py
def load_detector(checkpoint=None, **kwargs):
    """Build the PyTorch detector, restoring ``checkpoint`` (a ``.pth``) if given.

    This is a thin wrapper that defers to
    :func:`deeperfly.pose2d.weights.load_model`; the ``weights`` module is
    imported only when the function is called.

    Parameters
    ----------
    checkpoint
        Path to a ``.pth`` checkpoint. ``None`` yields a freshly initialized model.
    **kwargs
        Passed straight through to :func:`deeperfly.pose2d.weights.load_model`
        (e.g. ``dev``).

    Returns
    -------
    model
        The loaded detector model, exactly as returned by ``load_model``.
    """
    from . import weights

    detector = weights.load_model(checkpoint, **kwargs)
    return detector

detect_2d

detect_2d(
    config: Config,
    plan,
    models: dict,
    *,
    sources: dict[str, list[Path]] | None = None,
    input=None,
    want_candidates,
    k,
    progress=None,
)

Stream 2D detection over decode blocks -> (pts2d, conf, candidates).

Decodes each source in one continuous forward pass (CPU), handing the detector one [pose2d] batch_size-frame block at a time and freeing it before the next, so peak frame memory is bounded by the decode buffer, not the recording length. Each block feeds every pathway on that source (the front source is decoded once, read by both its pathways). Per-block results are concatenated along time. End-of-file comes from the decoder (a short or exhausted block), so it doesn't depend on :meth:deeperfly.io.FrameReader.count being exact -- that is only the progress-bar total.

Parameters:

Name Type Description Default
config Config

The run config (I/O backends, batch size, decode buffer).

required
plan

The detection plan (:class:~deeperfly.pose2d.pathways.DetectionPlan).

required
models dict

name -> LoadedModel for every model the plan references.

required
sources dict[str, list[Path]] | None

The footage to detect over (see :func:deeperfly.recordings.source_sources).

None
input

The footage to detect over (see :func:deeperfly.recordings.source_sources).

None
want_candidates

Whether to also extract the top-K candidate peaks (for pictorial structures, which are not cached).

required
k

Number of candidate peaks per joint when want_candidates.

required
progress

Optional progress factory progress(total, description) -> (wrap, close); defaults to :func:_null_progress (no bar). The CLI injects a Rich-backed factory.

None

Returns:

Name Type Description
pts2d ndarray

Detected 2D of shape (V, T, P, 2).

conf ndarray

Per-point confidence of shape (V, T, P).

candidates Candidates or None

The top-K candidate set when want_candidates, else None.

Raises:

Type Description
SystemExit

If the detector received no frames.

Source code in src/deeperfly/pose2d/stream.py
def detect_2d(
    config: Config,
    plan,
    models: dict,
    *,
    sources: dict[str, list[Path]] | None = None,
    input=None,
    want_candidates,
    k,
    progress=None,
):
    """Stream 2D detection over decode blocks -> ``(pts2d, conf, candidates)``.

    Decodes each **source** in one continuous forward pass (CPU), handing the
    detector one ``[pose2d] batch_size``-frame block at a time and
    freeing it before the next, so peak frame memory is bounded by the decode
    buffer, not the recording length. Each block feeds every pathway on that
    source (the front source is decoded once, read by both its pathways).
    Per-block results are concatenated along time. End-of-file comes from the
    decoder (a short or exhausted block), so it doesn't depend on
    :meth:`deeperfly.io.FrameReader.count` being exact -- that is only the
    progress-bar total.

    Parameters
    ----------
    config
        The run config (I/O backends, batch size, decode buffer).
    plan
        The detection plan (:class:`~deeperfly.pose2d.pathways.DetectionPlan`).
    models
        ``name -> LoadedModel`` for every model the plan references.
    sources, input
        The footage to detect over; both are forwarded unchanged to
        :func:`deeperfly.recordings.source_sources`.
    want_candidates
        Whether to also extract the top-K candidate peaks (for pictorial
        structures, which are not cached).
    k
        Number of candidate peaks per joint when ``want_candidates``.
    progress
        Optional progress factory ``progress(total, description) -> (wrap, close)``;
        defaults to :func:`_null_progress` (no bar). The CLI injects a Rich-backed
        factory.

    Returns
    -------
    pts2d : np.ndarray
        Detected 2D of shape ``(V, T, P, 2)``.
    conf : np.ndarray
        Per-point confidence of shape ``(V, T, P)``.
    candidates : deeperfly.pictorial.Candidates or None
        The top-K candidate set when ``want_candidates``, else ``None``.

    Raises
    ------
    SystemExit
        If the detector received no frames.
    """
    from .. import io
    from ..pictorial import Candidates
    from ..recordings import source_sources
    from . import inference

    pose2d = config.pose2d
    workers = config.io.image_workers
    # Two knobs: the GPU forward batch (images/forward), and the decode buffer in
    # multiples of it. A block holds one batch of frames; the reader keeps up to
    # `depth` of them queued (>= 1 so the queue stays bounded -- 0 is unbounded).
    batch_size = pose2d.batch_size
    depth = pose2d.decode_buffer
    block = batch_size
    src_list = source_sources(config, sources=sources, input=input)
    src_names = [name for name, _ in src_list]
    src_files = [files for _, files in src_list]
    # One head reader for the first source serves the progress-bar total -- the
    # source kind is resolved once here.
    head = io.open_reader(src_files[0]) if src_files else None
    total = head.count() if head is not None else 0

    log.info(
        "streaming frames: forward batch %d, decode buffer %d batches (%d frames/source)",
        batch_size,
        depth,
        depth * batch_size,
    )

    make_progress = progress or _null_progress
    pts_parts, conf_parts, cand_xy, cand_score = [], [], [], []

    with make_progress(total, "detect 2D") as wrap:
        for window, _ in prefetch_windows(
            src_files,
            block=block,
            depth=depth,
            workers=workers,
        ):
            # Re-key the per-source frame blocks by source name for the detector.
            windows = {name: window[i] for i, name in enumerate(src_names)}
            if want_candidates:
                p, c, cand = inference.detect_candidates_sequence(
                    plan, models, windows, k=k, progress=wrap
                )
                cand_xy.append(cand.xy)
                cand_score.append(cand.score)
            else:
                p, c = inference.detect_sequence(
                    plan,
                    models,
                    windows,
                    batch_size=batch_size,
                    progress=wrap,
                )
            pts_parts.append(p)
            conf_parts.append(c)
            # Release this window's frames before the next is consumed. The
            # `windows` dict references the same per-source blocks as `window`,
            # so both names must go or the frames stay alive until `windows` is
            # rebound on the next iteration.
            del window, windows

    if not pts_parts:
        raise SystemExit("detector received no frames")
    # Per-block results are joined along axis 1 (time).
    pts2d = np.concatenate(pts_parts, axis=1)
    conf = np.concatenate(conf_parts, axis=1)
    candidates = (
        Candidates(
            xy=np.concatenate(cand_xy, axis=1), score=np.concatenate(cand_score, axis=1)
        )
        if cand_xy
        else None
    )
    return pts2d, conf, candidates

Geometry primitives

geometry

Multi-view geometry primitives with JAX.

Conventions:

  • Points carry their dimensionality in the last axis: pts3d has shape (..., 3); pts2d has shape (..., 2).
  • Camera extrinsics are an axis-angle rotation vector rvec and a translation vector tvec. A 3D world point X is mapped to camera coordinates as R(rvec) @ X + tvec.
  • Camera intrinsics intr are packed as [fx, fy, cx, cy] (or [f, cx, cy] with fx = fy = f) (see :func:intr_to_kmat); distortion coefficients dists follow OpenCV's ordering [k1, k2, p1, p2, k3, k4, k5, k6, s1, s2, s3, s4].
  • A projection matrix pmat is the 3x4 product K @ [R | t].

Functions take their primary operand (pts3d, pts2d) first and camera parameters after, in the canonical order rvecs, tvecs, intrs, dists. All are JIT- and grad-friendly.

The batched public functions are :func:jax.jit-wrapped thin :func:jax.vmap wrappers around the *_one single-observation variants, which compose with :func:jax.vmap / :func:jax.jacfwd for bundle adjustment. deeperfly installs only CPU JAX -- the tiny arrays don't benefit from a GPU.

rvec_to_rmat_one

rvec_to_rmat_one(
    rvec: Float[Array, "3"],
) -> Float[Array, "3 3"]

Rodrigues' rotation for a single rotation vector.

Single-instance variant of :func:rvec_to_rmat for use under :func:jax.vmap and :func:jax.jacfwd.

Parameters:

Name Type Description Default
rvec Float[Array, '3']

Axis-angle rotation vector of shape (3,) (direction = axis, magnitude = angle in radians).

required

Returns:

Type Description
Rotation matrix of shape ``(3, 3)``.
Source code in src/deeperfly/geometry.py
def rvec_to_rmat_one(rvec: Float[Array, "3"]) -> Float[Array, "3 3"]:
    """Rodrigues' rotation for a single rotation vector.

    Single-instance variant of :func:`rvec_to_rmat` for use under
    :func:`jax.vmap` and :func:`jax.jacfwd`. Evaluates
    ``R = (1 - b * theta^2) * I + a * skew(rvec) + b * rvec rvec^T`` with
    ``a = sin(theta) / theta`` and ``b = (1 - cos(theta)) / theta^2``; for
    ``theta^2`` below ``_SMALL_THETA_SQ`` both factors come from their Taylor
    expansions instead.

    The closed-form branch is fed a stand-in angle whenever the Taylor branch
    is selected, so neither the forward value nor the reverse-mode gradient
    passes through ``sqrt(0)`` at the identity rotation.

    Parameters
    ----------
    rvec
        Axis-angle rotation vector of shape ``(3,)`` (direction = axis,
        magnitude = angle in radians).

    Returns
    -------
    Rotation matrix of shape ``(3, 3)``.
    """
    theta_sq = jnp.dot(rvec, rvec)
    small = theta_sq < _SMALL_THETA_SQ
    # "Double where": the unselected branch of jnp.where is still differentiated,
    # and d sqrt(x)/dx is infinite at x = 0, which turns a zero cotangent into
    # NaN. Substituting 1.0 keeps the discarded branch finite; when `small` is
    # False the stand-ins equal theta_sq / theta, so the result is unchanged.
    safe_theta_sq = jnp.where(small, 1.0, theta_sq)
    safe_theta = jnp.sqrt(safe_theta_sq)
    sinc_theta = jnp.where(
        small,
        1 - theta_sq / 6 + theta_sq**2 / 120,
        jnp.sin(safe_theta) / safe_theta,
    )
    cosc_theta = jnp.where(
        small,
        0.5 - theta_sq / 24 + theta_sq**2 / 720,
        (1 - jnp.cos(safe_theta)) / safe_theta_sq,
    )
    rx, ry, rz = rvec[0], rvec[1], rvec[2]
    skew = jnp.array(
        [
            [0.0, -rz, ry],
            [rz, 0.0, -rx],
            [-ry, rx, 0.0],
        ]
    )
    outer = jnp.outer(rvec, rvec)
    # skew^2 == outer - theta_sq * I, hence the (1 - b * theta_sq) identity term.
    return (
        (1 - cosc_theta * theta_sq) * jnp.eye(3)
        + sinc_theta * skew
        + cosc_theta * outer
    )

rmat_to_rvec_one

rmat_to_rvec_one(
    rmat: Float[Array, "3 3"],
) -> Float[Array, "3"]

Rotation matrix to axis-angle vector for a single rotation.

Single-instance variant of :func:rmat_to_rvec.

Parameters:

Name Type Description Default
rmat Float[Array, '3 3']

Rotation matrix of shape (3, 3).

required

Returns:

Type Description
Axis-angle rotation vector of shape ``(3,)``.
Source code in src/deeperfly/geometry.py
def rmat_to_rvec_one(rmat: Float[Array, "3 3"]) -> Float[Array, "3"]:
    """Recover the axis-angle vector of one rotation matrix.

    Single-instance variant of :func:`rmat_to_rvec`. In the generic case the
    axis comes from the antisymmetric entries of ``rmat``; when their norm is
    below ``_NEAR_PI_SIN_THRESH`` the result is either the zero vector
    (positive cosine) or an axis rebuilt from the diagonal.

    Parameters
    ----------
    rmat
        Rotation matrix of shape ``(3, 3)``.

    Returns
    -------
    Axis-angle rotation vector of shape ``(3,)``.
    """
    (m00, m01, m02), (m10, m11, m12), (m20, m21, m22) = (
        (rmat[row, 0], rmat[row, 1], rmat[row, 2]) for row in range(3)
    )

    # Antisymmetric part: its norm is 2 * sin(angle), its direction the axis.
    skew_part = jnp.array([m21 - m12, m02 - m20, m10 - m01])
    half_skew_norm = jnp.linalg.norm(skew_part) / 2
    cos_angle = jnp.clip((m00 + m11 + m22 - 1) / 2, -1.0, 1.0)
    angle = jnp.arccos(cos_angle)
    degenerate = half_skew_norm < _NEAR_PI_SIN_THRESH

    # Generic branch; the divisor is replaced by 1 where the branch is unused.
    generic_divisor = jnp.where(degenerate, 1.0, 2.0 * half_skew_norm)
    generic = skew_part * (angle / generic_divisor)

    def half_diag_root(entry):
        # sqrt((entry + 1) / 2), clamped so rounding cannot go negative.
        return jnp.sqrt(jnp.maximum((entry + 1.0) / 2, 0.0))

    # Degenerate branch: axis magnitudes from the diagonal, signs from the
    # first-row off-diagonal entries.
    sign_y = jnp.where(m01 < 0, -1.0, 1.0)
    sign_z = jnp.where(m02 < 0, -1.0, 1.0)
    ax = half_diag_root(m00)
    ay = half_diag_root(m11) * sign_y
    az = half_diag_root(m22) * sign_z
    x_is_smallest = (jnp.abs(ax) < jnp.abs(ay)) & (jnp.abs(ax) < jnp.abs(az))
    yz_sign_mismatch = (m12 > 0) != (ay * az > 0)
    az = jnp.where(x_is_smallest & yz_sign_mismatch, -az, az)

    axis = jnp.array([ax, ay, az])
    length = jnp.linalg.norm(axis)
    safe_length = jnp.where(length == 0, 1.0, length)
    from_diagonal = axis * (angle / safe_length)

    # A vanishing sine with a positive cosine means no rotation at all.
    fallback = jnp.where(cos_angle > 0, 0.0, from_diagonal)
    return jnp.where(degenerate, fallback, generic)

distort_one

distort_one(
    xy: Float[Array, "2"], dist: Float[Array, "K"]
) -> Float[Array, "2"]

Distortion model applied to a single 2D point.

Single-instance variant of :func:distort.

Parameters:

Name Type Description Default
xy Float[Array, '2']

Normalized 2D coordinate of shape (2,).

required
dist Float[Array, 'K']

Distortion coefficients of shape (K,) with K in {0, 1, ..., 12}.

required

Returns:

Type Description
Distorted 2D coordinate of shape ``(2,)``.
Source code in src/deeperfly/geometry.py
def distort_one(
    xy: Float[Array, "2"],
    dist: Float[Array, "K"],
) -> Float[Array, "2"]:
    """Apply the lens distortion model to one normalized 2D point.

    Single-instance variant of :func:`distort`. Only the coefficients actually
    present in ``dist`` contribute: indices 0, 1, 4 form the radial numerator,
    5-7 the radial denominator, 2-3 the tangential terms, 8-9 the additive
    ``x`` terms in ``r^2`` / ``r^4`` and 10-11 the matching ``y`` terms.

    Parameters
    ----------
    xy
        Normalized 2D coordinate of shape ``(2,)``.
    dist
        Distortion coefficients of shape ``(K,)`` with ``K`` in
        ``{0, 1, ..., 12}``.

    Returns
    -------
    Distorted 2D coordinate of shape ``(2,)``.
    """
    n_coeffs = dist.shape[-1]
    if n_coeffs == 0:
        # No coefficients: the model is the identity.
        return xy

    x, y = xy[0], xy[1]
    xx, yy, cross = x * x, y * y, x * y
    r2 = xx + yy
    r4 = r2 * r2
    r6 = r4 * r2

    # Radial factor num / den; each term is added only if its coefficient exists.
    radial_num = 1.0 + dist[0] * r2
    for idx, power in ((1, r4), (4, r6)):
        if n_coeffs > idx:
            radial_num = radial_num + dist[idx] * power
    radial_den = 1.0
    for idx, power in ((5, r2), (6, r4), (7, r6)):
        if n_coeffs > idx:
            radial_den = radial_den + dist[idx] * power
    scale = radial_num / radial_den

    # Additive offsets: tangential terms first, then the r^2 / r^4 terms.
    shift_x = jnp.zeros(())
    shift_y = jnp.zeros(())
    if n_coeffs > 2:
        shift_x = 2 * dist[2] * cross
        shift_y = dist[2] * (r2 + 2 * yy)
    if n_coeffs > 3:
        shift_x = shift_x + dist[3] * (r2 + 2 * xx)
        shift_y = shift_y + 2 * dist[3] * cross
    for idx, power in ((8, r2), (9, r4)):
        if n_coeffs > idx:
            shift_x = shift_x + dist[idx] * power
    for idx, power in ((10, r2), (11, r4)):
        if n_coeffs > idx:
            shift_y = shift_y + dist[idx] * power

    return jnp.stack([x * scale + shift_x, y * scale + shift_y])

undistort_one

undistort_one(
    xy_dist: Float[Array, "2"], dist: Float[Array, "K"]
) -> Float[Array, "2"]

Invert :func:distort_one: distorted normalized coords -> undistorted.

Recovers the undistorted normalized coordinate (x, y) whose :func:distort_one image is xy_dist by OpenCV's fixed-point iteration (the same scheme as cv2.undistortPoints): starting from xy_dist, repeatedly apply x <- (x_d - tangential) * den / num with the radial factor num / den and the tangential / thin-prism terms evaluated at the current estimate. The locus of 3D points projecting to a fixed pixel is a ray through the camera center regardless of distortion (distortion is a function of the normalized direction only), so this is the step that turns a clicked pixel into a back-projection direction (see :func:backproject_ray_one). An empty dist is the identity.

Parameters:

Name Type Description Default
xy_dist Float[Array, '2']

Distorted normalized 2D coordinate of shape (2,) (i.e. intrinsics already removed: ((u - cx) / fx, (v - cy) / fy)).

required
dist Float[Array, 'K']

Distortion coefficients of shape (K,) with K in {0, 1, ..., 12} (same ordering as :func:distort_one).

required

Returns:

Type Description
Undistorted normalized 2D coordinate of shape ``(2,)``.
Source code in src/deeperfly/geometry.py
def undistort_one(
    xy_dist: Float[Array, "2"],
    dist: Float[Array, "K"],
) -> Float[Array, "2"]:
    """Invert :func:`distort_one`: distorted normalized coords -> undistorted.

    Runs a fixed number (``_UNDISTORT_ITERS``) of fixed-point steps starting
    from ``xy_dist``. Each step evaluates the radial factor ``num / den`` and
    the additive tangential / thin-prism offsets at the current estimate, then
    updates ``x <- (x_d - offset_x) * den / num`` (likewise for ``y``). An
    empty ``dist`` is the identity. The result is the normalized direction
    consumed by :func:`backproject_ray_one`.

    Parameters
    ----------
    xy_dist
        Distorted normalized 2D coordinate of shape ``(2,)`` (i.e. intrinsics
        already removed: ``((u - cx) / fx, (v - cy) / fy)``).
    dist
        Distortion coefficients of shape ``(K,)`` with ``K`` in
        ``{0, 1, ..., 12}`` (same ordering as :func:`distort_one`).

    Returns
    -------
    Undistorted normalized 2D coordinate of shape ``(2,)``.
    """
    n_coeffs = dist.shape[-1]
    if n_coeffs == 0:
        return xy_dist

    target_x, target_y = xy_dist[0], xy_dist[1]
    est_x, est_y = target_x, target_y
    for _ in range(_UNDISTORT_ITERS):
        xx, yy, cross = est_x * est_x, est_y * est_y, est_x * est_y
        r2 = xx + yy
        r4 = r2 * r2
        r6 = r4 * r2

        # Inverse of the radial factor at the current estimate.
        radial_num = 1.0 + dist[0] * r2
        for idx, power in ((1, r4), (4, r6)):
            if n_coeffs > idx:
                radial_num = radial_num + dist[idx] * power
        radial_den = 1.0
        for idx, power in ((5, r2), (6, r4), (7, r6)):
            if n_coeffs > idx:
                radial_den = radial_den + dist[idx] * power
        inverse_radial = radial_den / radial_num

        # Additive offsets at the current estimate.
        shift_x = jnp.zeros(())
        shift_y = jnp.zeros(())
        if n_coeffs > 2:
            shift_x = 2 * dist[2] * cross
            shift_y = dist[2] * (r2 + 2 * yy)
        if n_coeffs > 3:
            shift_x = shift_x + dist[3] * (r2 + 2 * xx)
            shift_y = shift_y + 2 * dist[3] * cross
        for idx, power in ((8, r2), (9, r4)):
            if n_coeffs > idx:
                shift_x = shift_x + dist[idx] * power
        for idx, power in ((10, r2), (11, r4)):
            if n_coeffs > idx:
                shift_y = shift_y + dist[idx] * power

        # Always step from the fixed distorted target, not the previous estimate.
        est_x = (target_x - shift_x) * inverse_radial
        est_y = (target_y - shift_y) * inverse_radial
    return jnp.stack([est_x, est_y])

project_full_one

project_full_one(
    pt3d: Float[Array, "3"],
    rvec: Float[Array, "3"],
    tvec: Float[Array, "3"],
    intr: Float[Array, "P"],
    dist: Float[Array, "K"],
) -> Float[Array, "2"]

Project a single 3D point through a single camera.

Single-instance variant of :func:project_full designed to be composed with :func:jax.vmap over observations and :func:jax.jacfwd over the camera parameters and the 3D point.

Argument order matches :func:project_full: operand (pt3d) first, then camera parameters in the canonical order rvec, tvec, intr, dist.

Parameters:

Name Type Description Default
pt3d Float[Array, '3']

3D world point of shape (3,).

required
rvec Float[Array, '3']

Axis-angle rotation vector of shape (3,).

required
tvec Float[Array, '3']

Translation vector of shape (3,).

required
intr Float[Array, 'P']

Packed intrinsics of shape (P,); see :func:intr_to_kmat.

required
dist Float[Array, 'K']

Distortion coefficients of shape (K,).

required

Returns:

Type Description
Projected 2D image point of shape ``(2,)``.
Source code in src/deeperfly/geometry.py
def project_full_one(
    pt3d: Float[Array, "3"],
    rvec: Float[Array, "3"],
    tvec: Float[Array, "3"],
    intr: Float[Array, "P"],
    dist: Float[Array, "K"],
) -> Float[Array, "2"]:
    """Project one 3D world point through one camera, distortion included.

    Single-instance variant of :func:`project_full`, meant to be composed
    with :func:`jax.vmap` over observations and :func:`jax.jacfwd` over the
    camera parameters and the 3D point. The steps are: rigid transform into
    the camera frame, perspective divide, :func:`distort_one`, then the
    focal-length scale and principal-point offset.

    The operand (``pt3d``) comes first, followed by the camera parameters in
    the canonical order ``rvec, tvec, intr, dist``, as in
    :func:`project_full`.

    Parameters
    ----------
    pt3d
        3D world point of shape ``(3,)``.
    rvec
        Axis-angle rotation vector of shape ``(3,)``.
    tvec
        Translation vector of shape ``(3,)``.
    intr
        Packed intrinsics of shape ``(P,)``; see :func:`intr_to_kmat`.
    dist
        Distortion coefficients of shape ``(K,)``.

    Returns
    -------
    Projected 2D image point of shape ``(2,)``.
    """
    cam_point = rvec_to_rmat_one(rvec) @ pt3d + tvec
    normalized = cam_point[:2] / cam_point[2]
    distorted = distort_one(normalized, dist)

    # Indexing from the end reads both packings: with three entries
    # intr[-3] is intr[0], so the single focal length serves both axes.
    focal_x, focal_y = intr[0], intr[-3]
    center_x, center_y = intr[-2], intr[-1]

    u = focal_x * distorted[0] + center_x
    v = focal_y * distorted[1] + center_y
    return jnp.stack([u, v])

backproject_ray_one

backproject_ray_one(
    pixel: Float[Array, "2"],
    rvec: Float[Array, "3"],
    tvec: Float[Array, "3"],
    intr: Float[Array, "P"],
    dist: Float[Array, "K"],
) -> tuple[Float[Array, "3"], Float[Array, "3"]]

Back-project an image pixel to its viewing ray in world coordinates.

Inverts :func:project_full_one: strips the intrinsics (xy_d = ((u - cx) / fx, (v - cy) / fy)), undistorts via :func:undistort_one to the normalized direction xy, lifts it to the camera-frame ray direction [x, y, 1] and rotates it into the world. The ray origin + s * direction (s >= 0) is exactly the set of world points that :func:project_full_one maps back to pixel through this camera. Pair with :func:closest_point_on_ray to move a triangulated 3D point onto the ray while staying as close as possible to its old location.

Parameters:

Name Type Description Default
pixel Float[Array, '2']

Image point of shape (2,) in pixels.

required
rvec Float[Array, '3']

Axis-angle rotation vector of shape (3,).

required
tvec Float[Array, '3']

Translation vector of shape (3,).

required
intr Float[Array, 'P']

Packed intrinsics of shape (P,); see :func:intr_to_kmat.

required
dist Float[Array, 'K']

Distortion coefficients of shape (K,).

required

Returns:

Type Description
origin, direction : Float[Array, "3"]

The camera center -R(rvec).T @ tvec and the (unnormalized) world-frame ray direction.

Source code in src/deeperfly/geometry.py
def backproject_ray_one(
    pixel: Float[Array, "2"],
    rvec: Float[Array, "3"],
    tvec: Float[Array, "3"],
    intr: Float[Array, "P"],
    dist: Float[Array, "K"],
) -> tuple[Float[Array, "3"], Float[Array, "3"]]:
    """Turn an image pixel into its viewing ray in world coordinates.

    Runs :func:`project_full_one` backwards: removes the intrinsics
    (``xy_d = ((u - cx) / fx, (v - cy) / fy)``), undistorts with
    :func:`undistort_one`, lifts the normalized coordinate to the camera-frame
    direction ``[x, y, 1]`` and rotates it into the world frame with
    ``R(rvec).T``. Points ``origin + s * direction`` with ``s >= 0`` are the
    world points this camera images at ``pixel``. Combine with
    :func:`closest_point_on_ray` to snap a 3D point onto the ray with the
    smallest possible displacement.

    Parameters
    ----------
    pixel
        Image point of shape ``(2,)`` in pixels.
    rvec
        Axis-angle rotation vector of shape ``(3,)``.
    tvec
        Translation vector of shape ``(3,)``.
    intr
        Packed intrinsics of shape ``(P,)``; see :func:`intr_to_kmat`.
    dist
        Distortion coefficients of shape ``(K,)``.

    Returns
    -------
    origin, direction : Float[Array, "3"]
        The camera center ``-R(rvec).T @ tvec`` and the (unnormalized) world-frame
        ray direction.
    """
    # Same end-relative indexing as the forward projection, so both the
    # three- and four-entry intrinsics packings are accepted.
    focal_x, focal_y = intr[0], intr[-3]
    center_x, center_y = intr[-2], intr[-1]

    normalized_distorted = jnp.stack(
        [(pixel[0] - center_x) / focal_x, (pixel[1] - center_y) / focal_y]
    )
    normalized = undistort_one(normalized_distorted, dist)
    cam_direction = jnp.stack([normalized[0], normalized[1], jnp.ones(())])

    # R maps world -> camera, so its transpose carries camera-frame vectors back.
    world_from_cam = rvec_to_rmat_one(rvec).T
    origin = -world_from_cam @ tvec
    direction = world_from_cam @ cam_direction
    return origin, direction

closest_point_on_ray

closest_point_on_ray(
    origin: Float[Array, "3"],
    direction: Float[Array, "3"],
    target: Float[Array, "3"],
) -> Float[Array, "3"]

The point on a ray nearest a target point (orthogonal projection).

Returns origin + s * direction with s = (target - origin) . direction / (direction . direction) -- the unique closest point on the infinite line through origin along direction. Used by the 3D-correction drag: direction / origin come from :func:backproject_ray_one for the pixel the user dragged to, and target is the point's pre-drag 3D position, so the result reprojects exactly onto the dragged pixel while moving the least in 3D.

Parameters:

Name Type Description Default
origin Float[Array, '3']

A point on the ray, shape (3,).

required
direction Float[Array, '3']

The ray direction, shape (3,) (need not be normalized).

required
target Float[Array, '3']

The point to approach, shape (3,).

required

Returns:

Type Description
The closest point on the ray, shape ``(3,)``.
Source code in src/deeperfly/geometry.py
def closest_point_on_ray(
    origin: Float[Array, "3"],
    direction: Float[Array, "3"],
    target: Float[Array, "3"],
) -> Float[Array, "3"]:
    """Orthogonally project a target point onto the line carrying a ray.

    Computes ``origin + s * direction`` where
    ``s = (target - origin) . direction / (direction . direction)``. ``s`` is
    not clamped, so the result is the nearest point on the infinite line
    through ``origin`` along ``direction`` and may lie behind ``origin``. In
    the 3D-correction drag, ``origin`` / ``direction`` come from
    :func:`backproject_ray_one` for the dragged-to pixel and ``target`` is the
    point's previous 3D position, so the corrected point reprojects onto that
    pixel while moving as little as possible.

    Parameters
    ----------
    origin
        A point on the ray, shape ``(3,)``.
    direction
        The ray direction, shape ``(3,)`` (need not be normalized).
    target
        The point to approach, shape ``(3,)``.

    Returns
    -------
    The closest point on the ray, shape ``(3,)``.
    """
    offset = target - origin
    along = jnp.dot(offset, direction)
    length_sq = jnp.dot(direction, direction)
    step = along / length_sq
    return origin + step * direction

intr_to_kmat

intr_to_kmat(
    intr: Float[Array, "*batch P"],
) -> Float[Array, "*batch 3 3"]

Build 3x3 camera intrinsic matrices from packed intrinsic parameters.

Parameters:

Name Type Description Default
intr Float[Array, '*batch P']

Packed intrinsic parameters of shape (..., 3) or (..., 4), whose last axis holds [f, cx, cy] (a single focal length, fx = fy = f) or [fx, fy, cx, cy], respectively.

required

Returns:

Type Description
Camera matrices ``K`` of shape ``(..., 3, 3)``.
Source code in src/deeperfly/geometry.py
@jax.jit
def intr_to_kmat(
    intr: Float[Array, "*batch P"],
) -> Float[Array, "*batch 3 3"]:
    """Expand packed intrinsics into 3x3 camera matrices.

    Parameters
    ----------
    intr
        Packed intrinsic parameters of shape ``(..., 3)`` or ``(..., 4)``,
        whose last axis holds ``[f, cx, cy]`` (one focal length used for both
        axes) or ``[fx, fy, cx, cy]``, respectively.

    Returns
    -------
    Camera matrices ``K`` of shape ``(..., 3, 3)``.
    """
    batch_shape = intr.shape[:-1]
    # End-relative indices serve both packings: for three entries
    # intr[..., -3] is intr[..., 0], i.e. the shared focal length.
    focal_x = intr[..., 0]
    focal_y = intr[..., -3]
    principal = intr[..., -2:]
    # The four writes touch disjoint entries, so their order is irrelevant.
    return (
        jnp.zeros(batch_shape + (3, 3))
        .at[..., 2, 2]
        .set(1.0)
        .at[..., :2, 2]
        .set(principal)
        .at[..., 1, 1]
        .set(focal_y)
        .at[..., 0, 0]
        .set(focal_x)
    )

rvec_to_rmat

rvec_to_rmat(
    rvec: Float[Array, "*batch 3"],
) -> Float[Array, "*batch 3 3"]

Convert axis-angle rotation vectors to rotation matrices (Rodrigues).

Implements R = I + a * W + b * W^2 with W = skew(rvec), a = sin(theta) / theta and b = (1 - cos(theta)) / theta^2. Working on the unnormalized axis avoids 0/0 at theta = 0; evaluating a and b from their Taylor expansions for small theta sidesteps the catastrophic cancellation in 1 - cos(theta), keeping the result orthogonal to machine precision even for tiny rotations. W^2 is expanded as rvec . rvec^T - theta^2 * I to avoid a batched matmul.

Parameters:

Name Type Description Default
rvec Float[Array, '*batch 3']

Axis-angle rotation vectors of shape (..., 3). The direction is the rotation axis; the magnitude is the angle in radians.

required

Returns:

Type Description
Rotation matrices of shape ``(..., 3, 3)``.
Source code in src/deeperfly/geometry.py
@jax.jit
def rvec_to_rmat(
    rvec: Float[Array, "*batch 3"],
) -> Float[Array, "*batch 3 3"]:
    """Convert axis-angle rotation vectors to rotation matrices (Rodrigues).

    Batched wrapper: flattens the leading axes, maps
    :func:`rvec_to_rmat_one` over the rotations with :func:`jax.vmap`, and
    restores the batch shape.

    The per-rotation formula is ``R = I + a * W + b * W^2`` with
    ``W = skew(rvec)``, ``a = sin(theta) / theta`` and
    ``b = (1 - cos(theta)) / theta^2``. Working on the unnormalized axis
    avoids ``0/0`` at ``theta = 0``, and ``a`` / ``b`` are taken from their
    Taylor expansions for small ``theta`` to sidestep the cancellation in
    ``1 - cos(theta)``. ``W^2`` is expanded as
    ``rvec . rvec^T - theta^2 * I`` instead of being formed by a matmul.

    Parameters
    ----------
    rvec
        Axis-angle rotation vectors of shape ``(..., 3)``. The direction is
        the rotation axis; the magnitude is the angle in radians.

    Returns
    -------
    Rotation matrices of shape ``(..., 3, 3)``.
    """
    batch_shape = rvec.shape[:-1]
    per_rotation = jax.vmap(rvec_to_rmat_one)
    stacked = per_rotation(rvec.reshape(-1, 3))
    return stacked.reshape(*batch_shape, 3, 3)

rmat_to_rvec

rmat_to_rvec(
    rmat: Float[Array, "*batch 3 3"],
) -> Float[Array, "*batch 3"]

Convert rotation matrices to axis-angle rotation vectors.

Vectorized port of OpenCV's Rodrigues (matrix -> vector). The axis is read off the antisymmetric part R - R^T in the generic case, but that part vanishes at theta = pi (R becomes symmetric), so near theta = pi the axis is instead recovered from the symmetric part (R + I) / 2 with the signs disambiguated from the off-diagonal entries.

Parameters:

Name Type Description Default
rmat Float[Array, '*batch 3 3']

Rotation matrices of shape (..., 3, 3).

required

Returns:

Type Description
Axis-angle rotation vectors of shape ``(..., 3)``.
Source code in src/deeperfly/geometry.py
@jax.jit
def rmat_to_rvec(
    rmat: Float[Array, "*batch 3 3"],
) -> Float[Array, "*batch 3"]:
    """Convert rotation matrices to axis-angle rotation vectors.

    Batched wrapper: flattens the leading axes, maps
    :func:`rmat_to_rvec_one` over the matrices with :func:`jax.vmap`, and
    restores the batch shape.

    The per-matrix routine is a port of OpenCV's ``Rodrigues`` (matrix ->
    vector). Generically the axis is read off the antisymmetric part
    ``R - R^T``; that part vanishes at ``theta = pi`` (``R`` becomes
    symmetric), so near ``theta = pi`` the axis is instead recovered from the
    symmetric part ``(R + I) / 2``, with signs disambiguated from the
    off-diagonal entries.

    Parameters
    ----------
    rmat
        Rotation matrices of shape ``(..., 3, 3)``.

    Returns
    -------
    Axis-angle rotation vectors of shape ``(..., 3)``.
    """
    batch_shape = rmat.shape[:-2]
    per_matrix = jax.vmap(rmat_to_rvec_one)
    stacked = per_matrix(rmat.reshape(-1, 3, 3))
    return stacked.reshape(*batch_shape, 3)

project_pmat

project_pmat(
    pts3d: Float[Array, "*pts 3"],
    pmats: Float[Array, "*cams 3 4"],
) -> Float[Array, "*cams *pts 2"]

Project 3D world points to 2D image points using 3x4 projection matrices.

Parameters:

Name Type Description Default
pts3d Float[Array, '*pts 3']

3D points of shape (*pts, 3).

required
pmats Float[Array, '*cams 3 4']

Projection matrices of shape (*cams, 3, 4) -- typically K @ [R | t].

required

Returns:

Type Description
2D image points of shape ``(*cams, *pts, 2)``.
Source code in src/deeperfly/geometry.py
@jax.jit
def project_pmat(
    pts3d: Float[Array, "*pts 3"],
    pmats: Float[Array, "*cams 3 4"],
) -> Float[Array, "*cams *pts 2"]:
    """Project 3D world points through 3x4 projection matrices.

    Every camera is applied to every point: the camera axes and the point
    axes are each flattened, combined in a single batched matmul, and the
    result is reshaped to ``(*cams, *pts, 2)``.

    Parameters
    ----------
    pts3d
        3D points of shape ``(*pts, 3)``.
    pmats
        Projection matrices of shape ``(*cams, 3, 4)`` -- typically
        ``K @ [R | t]``.

    Returns
    -------
    2D image points of shape ``(*cams, *pts, 2)``.
    """
    cam_shape = pmats.shape[:-2]
    pts_shape = pts3d.shape[:-1]

    cams = pmats.reshape(-1, 3, 4)
    points_as_columns = pts3d.reshape(-1, 3).T  # (3, N)

    # Split P = [M | p4]: M acts on the point, and p4 broadcasts over all N
    # points as the homogeneous offset.
    linear_part = cams[:, :, :3]
    offset_part = cams[:, :, 3:]
    homogeneous = linear_part @ points_as_columns + offset_part  # (C, 3, N)

    # Perspective divide by the third homogeneous row.
    image_xy = homogeneous[:, :2] / homogeneous[:, 2:]  # (C, 2, N)
    per_point = image_xy.transpose(0, 2, 1)  # (C, N, 2)
    return per_point.reshape((*cam_shape, *pts_shape, 2))

triangulate_dlt

triangulate_dlt(
    pts2d: Float[Array, "V *pts 2"],
    pmats: Float[Array, "V 3 4"],
    weights: Float[Array, "V *pts"] | None = None,
) -> Float[Array, "*pts 3"]

Triangulate 3D points by direct linear transformation (DLT).

For each point, stacks two rows per view of the linear system [x * p3 - p1; y * p3 - p2] @ [X, Y, Z, 1]^T = 0 (where pi is the i-th row of pmat) and solves for the homogeneous coordinates as the right-singular vector for the smallest singular value. The smallest right-singular vector of A equals the eigenvector of A^T A for the smallest eigenvalue, which is faster than SVD for the 4x4 problem. NaN observations are zeroed out; points with fewer than two valid views are returned as NaN.

Parameters:

Name Type Description Default
pts2d Float[Array, 'V *pts 2']

2D observations of shape (V, *pts, 2). NaN entries indicate missing observations.

required
pmats Float[Array, 'V 3 4']

Projection matrices of shape (V, 3, 4).

required
weights Float[Array, 'V *pts'] | None

Optional per-(view, point) weights of shape (V, *pts). Each view's two rows are scaled by sqrt(weight), so a view contributes weight times its squared algebraic error -- the same convention bundle adjustment uses for detector confidences. The DLT solves a homogeneous system, so a uniform positive weight leaves the result unchanged; None (default) is plain unweighted DLT. The NaN-zeroing above is exactly the weight = 0 case, so the two compose: a zero (or NaN) weight drops the view. Negative and non-finite weights are clamped to zero.

None

Returns:

Type Description
Triangulated 3D points of shape ``(*pts, 3)``.
Source code in src/deeperfly/geometry.py
@jax.jit
def triangulate_dlt(
    pts2d: Float[Array, "V *pts 2"],
    pmats: Float[Array, "V 3 4"],
    weights: Float[Array, "V *pts"] | None = None,
) -> Float[Array, "*pts 3"]:
    """Triangulate 3D points by direct linear transformation (DLT).

    For each point, stacks two rows per view of the linear system
    ``[x * p3 - p1; y * p3 - p2] @ [X, Y, Z, 1]^T = 0`` (where ``pi`` is the
    i-th row of ``pmat``) and solves for the homogeneous coordinates as the
    right-singular vector for the smallest singular value. The smallest
    right-singular vector of ``A`` equals the eigenvector of ``A^T A`` for the
    smallest eigenvalue, which is faster than SVD for the 4x4 problem. NaN
    observations are zeroed out; points with fewer than two valid views are
    returned as NaN.

    Parameters
    ----------
    pts2d
        2D observations of shape ``(V, *pts, 2)``. NaN entries indicate
        missing observations.
    pmats
        Projection matrices of shape ``(V, 3, 4)``.
    weights
        Optional per-(view, point) weights of shape ``(V, *pts)``. Each view's
        two rows are scaled by ``sqrt(weight)``, so a view contributes ``weight``
        times its squared algebraic error -- the same convention bundle
        adjustment uses for detector confidences. The DLT solves a homogeneous
        system, so a uniform positive weight leaves the result unchanged; ``None``
        (default) is plain unweighted DLT. The NaN-zeroing above is exactly the
        ``weight = 0`` case, so the two compose: a zero (or NaN) weight drops the
        view. Negative and non-finite weights are clamped to zero.

    Returns
    -------
    Triangulated 3D points of shape ``(*pts, 3)``.
    """
    a = jnp.einsum("v...i,vj->...vij", pts2d, pmats[:, 2]) - pmats[:, :2]
    valid = jnp.moveaxis(jnp.isfinite(pts2d).all(axis=-1), 0, -1)
    a = jnp.where(valid[..., None, None], a, 0.0)
    if weights is not None:
        w = jnp.moveaxis(jnp.asarray(weights, dtype=a.dtype), 0, -1)  # (*pts, V)
        w = jnp.where(jnp.isfinite(w), jnp.maximum(w, 0.0), 0.0)
        a = a * jnp.sqrt(w)[..., None, None]
    a = a.reshape((*a.shape[:-3], -1, 4))
    ata = jnp.einsum("...ij,...ik->...jk", a, a)
    pts3dh = jnp.linalg.eigh(ata)[1][..., :, 0]
    pts3d = pts3dh[..., :3] / pts3dh[..., 3:]
    return jnp.where((valid.sum(axis=-1) < 2)[..., None], jnp.nan, pts3d)

distort

distort(
    pts2d: Float[Array, "V *pts 2"],
    dists: Float[Array, "V K"],
) -> Float[Array, "V *pts 2"]

Apply OpenCV-style radial + tangential + thin-prism distortion.

For normalized image coordinates (x, y) with r^2 = x^2 + y^2, the distortion model with up to 12 coefficients [k1, k2, p1, p2, k3, k4, k5, k6, s1, s2, s3, s4] is

.. code-block:: text

x_d = x * (1 + k1 r^2 + k2 r^4 + k3 r^6)
        / (1 + k4 r^2 + k5 r^4 + k6 r^6)
      + 2 p1 x y + p2 (r^2 + 2 x^2) + s1 r^2 + s2 r^4
y_d = y * (...) / (...)
      + p1 (r^2 + 2 y^2) + 2 p2 x y + s3 r^2 + s4 r^4

Coefficients beyond K are taken as zero, matching cv2.projectPoints.

Parameters:

Name Type Description Default
pts2d Float[Array, 'V *pts 2']

Normalized 2D coordinates of shape (V, *pts, 2).

required
dists Float[Array, 'V K']

Distortion coefficients of shape (V, K) with K in {0, 1, ..., 12}.

required

Returns:

Type Description
Float[Array, 'V *pts 2']

Distorted 2D coordinates of shape (V, *pts, 2).

Source code in src/deeperfly/geometry.py
@jax.jit
def distort(
    pts2d: Float[Array, "V *pts 2"],
    dists: Float[Array, "V K"],
) -> Float[Array, "V *pts 2"]:
    """Distort normalized image points with the OpenCV lens model.

    The model covers radial, tangential and thin-prism terms. With
    ``r^2 = x^2 + y^2`` and the (up to 12) coefficients ordered as
    ``[k1, k2, p1, p2, k3, k4, k5, k6, s1, s2, s3, s4]``:

    .. code-block:: text

        x_d = x * (1 + k1 r^2 + k2 r^4 + k3 r^6)
                / (1 + k4 r^2 + k5 r^4 + k6 r^6)
              + 2 p1 x y + p2 (r^2 + 2 x^2) + s1 r^2 + s2 r^4
        y_d = y * (...) / (...)
              + p1 (r^2 + 2 y^2) + 2 p2 x y + s3 r^2 + s4 r^4

    Missing trailing coefficients (those beyond ``K``) count as zero, matching
    ``cv2.projectPoints``.

    Parameters
    ----------
    pts2d
        Normalized 2D coordinates of shape ``(V, *pts, 2)``.
    dists
        Distortion coefficients of shape ``(V, K)`` with ``K`` in
        ``{0, 1, ..., 12}``.

    Returns
    -------
    Distorted 2D coordinates of shape ``(V, *pts, 2)``.
    """
    n_coeffs = dists.shape[-1]
    if n_coeffs == 0:
        # No coefficients at all: the model is the identity.
        return pts2d
    # Inner map: every point of one view shares that view's coefficients.
    over_points = jax.vmap(distort_one, in_axes=(0, None))
    # Outer map: pair each view's points with its own coefficient row.
    over_views = jax.vmap(over_points, in_axes=(0, 0))
    stacked = pts2d.reshape(pts2d.shape[0], -1, 2)  # (V, N, 2)
    return over_views(stacked, dists).reshape(pts2d.shape)

project_full

project_full(
    pts3d: Float[Array, "*pts 3"],
    rvecs: Float[Array, "V 3"],
    tvecs: Float[Array, "V 3"],
    intrs: Float[Array, "V P"] | Float[Array, "P"],
    dists: Float[Array, "V K"] | Float[Array, "K"],
) -> Float[Array, "V *pts 2"]

Project 3D world points to 2D image points through full camera models.

Composes the pinhole projection X_cam = R(rvec) X + tvec, perspective division xy = X_cam[:2] / X_cam[2], distortion via :func:distort, and the affine intrinsics x_pix = fx * x + cx (and analogously for y).

Parameters:

Name Type Description Default
pts3d Float[Array, '*pts 3']

3D world points of shape (*pts, 3).

required
rvecs Float[Array, 'V 3']

Axis-angle rotation vectors of shape (V, 3).

required
tvecs Float[Array, 'V 3']

Translation vectors of shape (V, 3).

required
intrs Float[Array, 'V P'] | Float[Array, 'P']

Packed intrinsics of shape (V, P) or (P,) (shared); see :func:intr_to_kmat.

required
dists Float[Array, 'V K'] | Float[Array, 'K']

Distortion coefficients of shape (V, K) or (K,) (shared).

required

Returns:

Type Description
Float[Array, 'V *pts 2']

Projected 2D image points of shape (V, *pts, 2).

Source code in src/deeperfly/geometry.py
@jax.jit
def project_full(
    pts3d: Float[Array, "*pts 3"],
    rvecs: Float[Array, "V 3"],
    tvecs: Float[Array, "V 3"],
    intrs: Float[Array, "V P"] | Float[Array, "P"],
    dists: Float[Array, "V K"] | Float[Array, "K"],
) -> Float[Array, "V *pts 2"]:
    """Project world points into every view using the full camera model.

    Each point goes through the rigid transform ``X_cam = R(rvec) X + tvec``,
    the perspective division ``xy = X_cam[:2] / X_cam[2]``, lens distortion
    (:func:`distort`), and finally the affine intrinsics
    ``x_pix = fx * x + cx`` (likewise for ``y``).

    Parameters
    ----------
    pts3d
        3D world points of shape ``(*pts, 3)``.
    rvecs
        Axis-angle rotation vectors of shape ``(V, 3)``.
    tvecs
        Translation vectors of shape ``(V, 3)``.
    intrs
        Packed intrinsics of shape ``(V, P)``, or ``(P,)`` when all views
        share one set; see :func:`intr_to_kmat`.
    dists
        Distortion coefficients of shape ``(V, K)``, or ``(K,)`` when shared.

    Returns
    -------
    Projected 2D image points of shape ``(V, *pts, 2)``.
    """
    n_views = rvecs.shape[0]
    lead_shape = pts3d.shape[:-1]
    # Expand shared (1-D) parameters to one row per view so the view-wise
    # vmap below can treat both layouts uniformly.
    if intrs.ndim == 1:
        intrs = jnp.broadcast_to(intrs, (n_views, intrs.shape[0]))
    if dists.ndim == 1:
        dists = jnp.broadcast_to(dists, (n_views, dists.shape[0]))
    # Inner map runs over points (camera fixed); outer map runs over cameras
    # (points fixed).
    over_points = jax.vmap(project_full_one, in_axes=(0, None, None, None, None))
    over_views = jax.vmap(over_points, in_axes=(None, 0, 0, 0, 0))
    projected = over_views(pts3d.reshape(-1, 3), rvecs, tvecs, intrs, dists)
    return projected.reshape(n_views, *lead_shape, 2)

Triangulation helpers

triangulation

Skeleton-aware triangulation helpers over a :class:CameraGroup.

Thin NumPy-facing wrappers around :mod:deeperfly.geometry and :class:deeperfly.cameras.CameraGroup. The contract with the geometry layer is the NaN convention: a 2D observation of NaN means "this camera did not (or cannot) see this point", so visibility is expressed purely as NaNs -- no separate mask array travels downstream.

All functions use the view-leading layout: pts2d has shape (V, *pts, 2) ((V, P, 2) for one frame, (V, T, P, 2) for a sequence); triangulated points come back as (*pts, 3).

triangulate

triangulate(
    cameras: CameraGroup,
    pts2d: Float[ndarray, "V *pts 2"],
    weights: Float[ndarray, "V *pts"] | None = None,
) -> Float[np.ndarray, "*pts 3"]

Triangulate 3D points from 2D observations (NaN-aware DLT).

Points seen by fewer than two cameras come back as NaN. Forwards to :meth:CameraGroup.triangulate.

Parameters:

Name Type Description Default
cameras CameraGroup

The camera rig.

required
pts2d Float[ndarray, 'V *pts 2']

2D observations of shape (V, *pts, 2), NaN for missing.

required
weights Float[ndarray, 'V *pts'] | None

Optional per-(view, point) weights of shape (V, *pts) for a confidence-weighted DLT (each view's rows scaled by sqrt(weight)); None (default) is plain DLT.

None

Returns:

Type Description
ndarray

Triangulated 3D points of shape (*pts, 3).

Source code in src/deeperfly/triangulation.py
def triangulate(
    cameras: CameraGroup,
    pts2d: Float[np.ndarray, "V *pts 2"],
    weights: Float[np.ndarray, "V *pts"] | None = None,
) -> Float[np.ndarray, "*pts 3"]:
    """Triangulate 3D points from 2D observations (NaN-aware DLT).

    Points seen by fewer than two cameras come back as ``NaN``. Forwards to
    :meth:`CameraGroup.triangulate`.

    Parameters
    ----------
    cameras
        The camera rig.
    pts2d
        2D observations of shape ``(V, *pts, 2)``, NaN for missing.
    weights
        Optional per-(view, point) weights of shape ``(V, *pts)`` for a
        confidence-weighted DLT (each view's rows scaled by ``sqrt(weight)``);
        ``None`` (default) is plain DLT.

    Returns
    -------
    np.ndarray
        Triangulated 3D points of shape ``(*pts, 3)``.
    """
    return cameras.triangulate(pts2d, weights)

reprojection_error

reprojection_error(
    cameras: CameraGroup,
    pts3d: Float[ndarray, "*pts 3"],
    pts2d: Float[ndarray, "V *pts 2"],
) -> Float[np.ndarray, "V *pts"]

Per-(view, point) reprojection error in pixels.

Projects pts3d through every camera and takes the Euclidean distance to pts2d. Entries are NaN wherever the observation or the 3D point is NaN (unobserved / un-triangulated), so callers can ignore them with np.nanmean / np.nanmax.

Parameters:

Name Type Description Default
cameras CameraGroup

The camera rig.

required
pts3d Float[ndarray, '*pts 3']

Triangulated 3D points of shape (*pts, 3).

required
pts2d Float[ndarray, 'V *pts 2']

2D observations of shape (V, *pts, 2).

required

Returns:

Type Description
ndarray

Reprojection error of shape (V, *pts) in pixels (NaN where undefined).

Source code in src/deeperfly/triangulation.py
def reprojection_error(
    cameras: CameraGroup,
    pts3d: Float[np.ndarray, "*pts 3"],
    pts2d: Float[np.ndarray, "V *pts 2"],
) -> Float[np.ndarray, "V *pts"]:
    """Pixel distance between reprojected 3D points and their 2D observations.

    ``pts3d`` is projected through every camera of the rig and compared with
    ``pts2d`` by Euclidean distance. NaNs propagate: an entry is ``NaN``
    whenever either the observation or the 3D point is ``NaN``, which lets
    callers aggregate with ``np.nanmean`` / ``np.nanmax``.

    Parameters
    ----------
    cameras
        The camera rig.
    pts3d
        Triangulated 3D points of shape ``(*pts, 3)``.
    pts2d
        2D observations of shape ``(V, *pts, 2)``.

    Returns
    -------
    np.ndarray
        Reprojection error of shape ``(V, *pts)`` in pixels (NaN where undefined).
    """
    predicted = cameras.project(np.asarray(pts3d))  # (V, *pts, 2)
    residual = predicted - np.asarray(pts2d)
    return np.linalg.norm(residual, axis=-1)

triangulate_ransac

triangulate_ransac(
    cameras: CameraGroup,
    pts2d: Float[ndarray, "V *pts 2"],
    *,
    threshold: float = 15.0,
    min_inliers: int = 2,
    weights: Float[ndarray, "V *pts"] | None = None,
) -> tuple[
    Float[np.ndarray, "*pts 3"], Bool[np.ndarray, "V *pts"]
]

Robustly triangulate 3D points, rejecting gross 2D outliers (RANSAC).

A single badly mislocated detection drags a plain least-squares fit (:func:triangulate) and inflates every view's reprojection error, hiding which view was wrong. RANSAC instead searches for the largest set of mutually consistent views.

The rigs deeperfly targets have only a handful of cameras, so rather than sampling, this exhaustively enumerates all C(V, 2) two-view hypotheses (the deterministic limit of RANSAC). For each pair it triangulates a candidate and counts views reprojecting within threshold pixels (NaN views never count). The largest consensus wins (ties broken by smaller total inlier error), and the point is re-triangulated from all its inlier views. Points with fewer than min_inliers agreeing views come back NaN.

Operates per point over any leading layout ((V, P, 2), (V, T, P, 2)).

Parameters:

Name Type Description Default
cameras CameraGroup

The camera rig.

required
pts2d Float[ndarray, 'V *pts 2']

2D observations of shape (V, *pts, 2), NaN for missing.

required
threshold float

Inlier reprojection-error cutoff in pixels (the greedy :func:deeperfly.pipeline.reconstruct uses a looser 40 px to drop outliers rather than gate inliers).

15.0
min_inliers int

Minimum agreeing views required to accept a point (>= 2).

2
weights Float[ndarray, 'V *pts'] | None

Optional per-(view, point) weights of shape (V, *pts). When given, the two-view candidate fits and the final inlier refit use a confidence-weighted DLT (see :func:triangulate). Consensus scoring is deliberately left unweighted: inliers are still counted by raw reprojection error, so a confidently-mislocated detection cannot vote itself into the consensus -- which is the whole point of RANSAC.

None

Returns:

Name Type Description
pts3d ndarray

Triangulated points of shape (*pts, 3) (NaN below min_inliers).

inliers ndarray

Boolean (V, *pts) mask of the views kept per point. NaN out the original outliers with np.where(inliers[..., None], pts2d, np.nan).

Raises:

Type Description
ValueError

If min_inliers is less than 2.

Source code in src/deeperfly/triangulation.py
def triangulate_ransac(
    cameras: CameraGroup,
    pts2d: Float[np.ndarray, "V *pts 2"],
    *,
    threshold: float = 15.0,
    min_inliers: int = 2,
    weights: Float[np.ndarray, "V *pts"] | None = None,
) -> tuple[Float[np.ndarray, "*pts 3"], Bool[np.ndarray, "V *pts"]]:
    """Robustly triangulate 3D points, rejecting gross 2D outliers (RANSAC).

    A single badly mislocated detection drags a plain least-squares fit
    (:func:`triangulate`) and inflates *every* view's reprojection error, hiding
    which view was wrong. RANSAC instead searches for the largest set of mutually
    consistent views.

    The rigs deeperfly targets have only a handful of cameras, so rather than
    sampling, this **exhaustively enumerates all** ``C(V, 2)`` two-view hypotheses
    (the deterministic limit of RANSAC). For each pair it triangulates a candidate
    and counts views reprojecting within ``threshold`` pixels (NaN views never
    count). The largest consensus wins (ties broken by smaller total inlier
    error), and the point is re-triangulated from all its inlier views. Points
    with fewer than ``min_inliers`` agreeing views come back ``NaN``.

    Operates per point over any leading layout (``(V, P, 2)``, ``(V, T, P, 2)``).

    Parameters
    ----------
    cameras
        The camera rig.
    pts2d
        2D observations of shape ``(V, *pts, 2)``, NaN for missing.
    threshold
        Inlier reprojection-error cutoff in pixels (the greedy
        :func:`deeperfly.pipeline.reconstruct` uses a looser 40 px to *drop*
        outliers rather than gate inliers).
    min_inliers
        Minimum agreeing views required to accept a point (>= 2).
    weights
        Optional per-(view, point) weights of shape ``(V, *pts)``. When given,
        the two-view candidate fits and the final inlier refit use a
        confidence-weighted DLT (see :func:`triangulate`). Consensus scoring is
        deliberately left **unweighted**: inliers are still counted by raw
        reprojection error, so a confidently-mislocated detection cannot vote
        itself into the consensus -- which is the whole point of RANSAC.

    Returns
    -------
    pts3d : np.ndarray
        Triangulated points of shape ``(*pts, 3)`` (NaN below ``min_inliers``).
    inliers : np.ndarray
        Boolean ``(V, *pts)`` mask of the views kept per point. NaN out the
        original outliers with ``np.where(inliers[..., None], pts2d, np.nan)``.

    Raises
    ------
    ValueError
        If ``min_inliers`` is less than 2.
    """
    if min_inliers < 2:
        raise ValueError(f"min_inliers must be >= 2, got {min_inliers}")
    pts2d = np.asarray(pts2d, dtype=float)
    n_views = pts2d.shape[0]
    pts_shape = pts2d.shape[1:-1]

    # Running argmax over hypotheses: keep the best consensus seen so far.
    best_score = np.full(pts_shape, -np.inf)
    best_inliers = np.zeros((n_views, *pts_shape), dtype=bool)
    # Score = inlier count, minus a sub-unit penalty so ties break toward the
    # tighter fit without ever overriding a strictly larger consensus.
    err_scale = n_views * threshold + 1e-9

    for i, j in combinations(range(n_views), 2):
        sel = np.zeros(n_views, dtype=bool)
        sel[[i, j]] = True
        sel = sel.reshape((n_views, *([1] * (pts2d.ndim - 1))))
        masked = np.where(sel, pts2d, np.nan)
        # NaN out-of-pair views zero their own rows, so the candidate fit only
        # ever weights the two selected views.
        cand = triangulate(cameras, masked, weights)  # (*pts, 3); NaN if unseen
        err = reprojection_error(cameras, cand, pts2d)  # (V, *pts)
        inl = err < threshold  # NaN (unobserved / un-triangulated) -> False
        count = inl.sum(axis=0)  # (*pts)
        inlier_err = np.where(inl, err, 0.0).sum(axis=0)  # (*pts)
        score = count - inlier_err / err_scale
        take = score > best_score
        best_score = np.where(take, score, best_score)
        best_inliers = np.where(take[None], inl, best_inliers)

    refit = np.where(best_inliers[..., None], pts2d, np.nan)
    pts3d = triangulate(cameras, refit, weights)
    accept = best_inliers.sum(axis=0) >= min_inliers  # (*pts)
    pts3d = np.where(accept[..., None], pts3d, np.nan)
    return pts3d, best_inliers

Pictorial structures

pictorial

Pictorial-structures (PS) multi-view 2D->3D correction (DeepFly3D-style).

The optional, accuracy-oriented alternative to the default reprojection-outlier rejection in :func:deeperfly.pipeline.reconstruct. Where that path can only veto a bad detection, PS can recover the correct joint when the detector's arg-max landed on the wrong heatmap peak (self-occlusion, crossing legs, left/right confusion).

Following Gunel et al. (DeepFly3D, 2019):

  1. Keep the top-K candidate peaks per (view, joint), not just the arg-max (:func:extract_candidates).
  2. Per joint, build a pool of multi-view-consistent 3D hypotheses by triangulating candidate pairs across views, refitting from inlier views, and scoring by summed heatmap confidence (batched per frame in :func:solve_frame).
  3. Choose one hypothesis per joint by exact dynamic programming along each limb (:func:_chain_dp). The fly skeleton's 2D bones form a forest of simple chains, so the MAP over the bone-length-coupled model is exact -- no loopy belief propagation. An optional temporal term penalizes 3D jumps.

Everything is plain NumPy over a :class:~deeperfly.cameras.CameraGroup and :class:~deeperfly.skeleton.Skeleton. The detector forward and heatmap decode happen upstream; this module consumes only candidate peaks + bundle-adjusted cameras.

Candidates dataclass

Top-K detector peaks per (view, point) for a sequence, in image pixels.

xy is (V, T, P, K, 2) and score is (V, T, P, K); padded / invisible / sub-threshold slots are NaN (xy) and 0 (score). The arg-max (candidate index 0 along the K axis) reproduces the single-peak detection, so bundle adjustment can still use the plain 2D path while PS consumes the full candidate set.

Source code in src/deeperfly/pictorial.py
@dataclass(frozen=True)
class Candidates:
    """Per-(view, point) top-K detector peaks of a sequence, in image pixels.

    ``xy`` has shape ``(V, T, P, K, 2)`` and ``score`` has shape
    ``(V, T, P, K)``. Slots that are padded, invisible or below threshold hold
    ``NaN`` in ``xy`` and ``0`` in ``score``. The arg-max is the candidate at
    index ``0`` along the ``K`` axis and reproduces the single-peak detection,
    so bundle adjustment can still use the plain 2D path while PS consumes the
    full candidate set.
    """

    # Candidate pixel coordinates, NaN where a slot is empty.
    xy: Float[np.ndarray, "V T P K 2"]
    # Candidate scores, 0 where a slot is empty.
    score: Float[np.ndarray, "V T P K"]

    @property
    def shape(self) -> tuple[int, int, int, int]:
        """Sizes ``(V, T, P, K)``, read from ``xy`` (which must be 5-D)."""
        n_views, n_frames, n_points, n_cand, _ = self.xy.shape
        return n_views, n_frames, n_points, n_cand

    def frame(self, t: int) -> tuple[np.ndarray, np.ndarray]:
        """Slice out frame ``t``: ``xy`` as ``(V, P, K, 2)``, ``score`` as ``(V, P, K)``."""
        xy_t = self.xy[:, t]
        score_t = self.score[:, t]
        return xy_t, score_t

frame

frame(t: int) -> tuple[np.ndarray, np.ndarray]

Candidate (xy, score) for one frame: (V, P, K, 2) and (V, P, K).

Source code in src/deeperfly/pictorial.py
def frame(self, t: int) -> tuple[np.ndarray, np.ndarray]:
    """Slice out frame ``t``: ``xy`` as ``(V, P, K, 2)``, ``score`` as ``(V, P, K)``."""
    xy_t = self.xy[:, t]
    score_t = self.score[:, t]
    return xy_t, score_t

peak_candidates

peak_candidates(
    heatmaps: Float[ndarray, "*chan H_out W_out"],
    k: int = DEFAULT_K,
    *,
    radius: int = DEFAULT_PEAK_RADIUS,
    threshold: float = DEFAULT_PEAK_THRESHOLD,
    method: str = DEFAULT_SUBPIXEL,
) -> tuple[
    Float[np.ndarray, "*chan K 2"],
    Float[np.ndarray, "*chan K"],
]

Top-k local-maxima peaks per heatmap channel (normalized (x, y) + score).

A pixel is a peak if it is the maximum of its (2*radius+1) neighborhood and exceeds threshold; the strongest k are returned, score-ordered, padded with NaN / 0 when fewer exist. Each is refined to sub-pixel by method (the same :func:~deeperfly.pose2d.inference.refine_peaks the single-peak decoder uses), so candidates carry the arg-max's localization.

Parameters:

Name Type Description Default
heatmaps Float[ndarray, '*chan H_out W_out']

Heatmaps of shape (*chan, H_out, W_out).

required
k int

Number of peaks to keep per channel.

DEFAULT_K
radius int

NMS / sub-pixel-window half-width, in heatmap pixels.

DEFAULT_PEAK_RADIUS
threshold float

Ignore peaks weaker than this.

DEFAULT_PEAK_THRESHOLD
method str

Sub-pixel refinement: "argmax" | "weighted" | "taylor".

DEFAULT_SUBPIXEL

Returns:

Name Type Description
xy ndarray

Peak coordinates of shape (*chan, K, 2) normalized to [0, 1] (NaN-padded).

score ndarray

Raw peak values of shape (*chan, K) (0 where padded).

Source code in src/deeperfly/pictorial.py
def peak_candidates(
    heatmaps: Float[np.ndarray, "*chan H_out W_out"],
    k: int = DEFAULT_K,
    *,
    radius: int = DEFAULT_PEAK_RADIUS,
    threshold: float = DEFAULT_PEAK_THRESHOLD,
    method: str = DEFAULT_SUBPIXEL,
) -> tuple[Float[np.ndarray, "*chan K 2"], Float[np.ndarray, "*chan K"]]:
    """Extract the ``k`` strongest local maxima of every heatmap channel.

    A pixel qualifies as a peak when it equals the maximum of its
    ``(2*radius+1)`` square neighborhood and is strictly above ``threshold``.
    Per channel the ``k`` strongest peaks are kept, ordered by descending
    score; when fewer exist the remaining slots are padded (``NaN``
    coordinates, ``0`` score). Every kept peak is refined to sub-pixel
    precision with ``method`` through
    :func:`~deeperfly.pose2d.inference.refine_peaks`.

    Parameters
    ----------
    heatmaps
        Heatmaps of shape ``(*chan, H_out, W_out)``.
    k
        Number of peaks to keep per channel (capped at ``H_out * W_out``).
    radius
        NMS / sub-pixel-window half-width, in heatmap pixels.
    threshold
        Ignore peaks weaker than this.
    method
        Sub-pixel refinement: ``"argmax"`` | ``"weighted"`` | ``"taylor"``.

    Returns
    -------
    xy : np.ndarray
        Peak coordinates of shape ``(*chan, K, 2)``, with ``x`` divided by the
        heatmap width and ``y`` by its height (NaN-padded).
    score : np.ndarray
        Raw peak values of shape ``(*chan, K)`` (``0`` where padded).
    """
    from scipy.ndimage import maximum_filter

    from .pose2d.inference import refine_peaks

    hm = np.asarray(heatmaps, dtype=float)
    hh, ww = hm.shape[-2:]
    chan = hm.shape[:-2]

    # Non-maximum suppression over the two spatial axes only (window of 1
    # along every channel axis).
    window = 2 * radius + 1
    footprint = (1,) * (hm.ndim - 2) + (window, window)
    local_max = maximum_filter(hm, size=footprint)
    peak_mask = (hm == local_max) & (hm > threshold)
    # Non-peaks become -inf so they sort last and are recognizable afterwards.
    masked = np.where(peak_mask, hm, -np.inf).reshape(*chan, hh * ww)

    # Partial selection of the k largest, then a full sort of just those k.
    k = min(k, masked.shape[-1])
    shortlist = np.argpartition(-masked, k - 1, axis=-1)[..., :k]
    shortlist_val = np.take_along_axis(masked, shortlist, axis=-1)
    rank = np.argsort(-shortlist_val, axis=-1)  # descending score
    flat_idx = np.take_along_axis(shortlist, rank, axis=-1)
    peak_val = np.take_along_axis(shortlist_val, rank, axis=-1)

    # Flat pixel index -> (row, col); refine_peaks is fed one flattened
    # channel axis.
    rows, cols = flat_idx // ww, flat_idx % ww
    n_maps = int(np.prod(chan)) if chan else 1
    cx, cy = refine_peaks(
        hm.reshape(n_maps, hh, ww),
        rows.reshape(n_maps, k),
        cols.reshape(n_maps, k),
        method=method,
        radius=radius,
    )
    x_norm = cx.reshape(*chan, k) / ww
    y_norm = cy.reshape(*chan, k) / hh
    # Slots that were filled from -inf entries are padding, not peaks.
    found = np.isfinite(peak_val)
    xy = np.where(found[..., None], np.stack([x_norm, y_norm], axis=-1), np.nan)
    score = np.where(found, peak_val, 0.0)
    return xy, score

bone_length_targets

bone_length_targets(
    cameras: CameraGroup,
    pts2d: Float[ndarray, "V F P 2"],
    skeleton: Skeleton,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]

Median bone length per skeleton bone, from an initial triangulation.

Shared by bundle adjustment (:func:deeperfly.pipeline._bone_prior) and PS so the two agree on the anatomical prior.

Parameters:

Name Type Description Default
cameras CameraGroup

The rig used for the initial triangulation.

required
pts2d Float[ndarray, 'V F P 2']

2D observations of shape (V, F, P, 2).

required
skeleton Skeleton

Skeleton supplying the bone (edge) list.

required

Returns:

Name Type Description
i, j ndarray

Bone endpoint index arrays (the columns of :attr:Skeleton.bones).

targets ndarray

Per-bone median target length of shape (B,) (NaN for a bone never triangulated).

Source code in src/deeperfly/pictorial.py
def bone_length_targets(
    cameras: CameraGroup,
    pts2d: Float[np.ndarray, "V F P 2"],
    skeleton: Skeleton,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Estimate a target length for every skeleton bone.

    The 2D observations are triangulated once, each bone's length is measured
    in every frame, and the per-bone median over frames (ignoring NaNs) is
    taken as the target. Bundle adjustment
    (:func:`deeperfly.pipeline._bone_prior`) and PS both use this function so
    that they share the same anatomical prior.

    Parameters
    ----------
    cameras
        The rig used for the initial triangulation.
    pts2d
        2D observations of shape ``(V, F, P, 2)``.
    skeleton
        Skeleton supplying the bone (edge) list.

    Returns
    -------
    i, j : np.ndarray
        Bone endpoint index arrays, as returned by
        ``skeleton.bone_index_pairs()``.
    targets : np.ndarray
        Per-bone median target length of shape ``(B,)`` (NaN for a bone never
        triangulated).
    """
    import warnings

    from .triangulation import triangulate

    initial = triangulate(cameras, pts2d)  # (F, P, 3)
    i, j = skeleton.bone_index_pairs()
    bone_vectors = initial[:, i] - initial[:, j]  # (F, B, 3)
    per_frame = np.linalg.norm(bone_vectors, axis=-1)  # (F, B)
    # A bone that is NaN in every frame makes nanmedian emit a RuntimeWarning
    # and return NaN; that NaN target is the intended result, so only the
    # warning is silenced.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", category=RuntimeWarning)
        targets = np.nanmedian(per_frame, axis=0)  # (B,)
    return i, j, targets

skeleton_chains

skeleton_chains(skeleton: Skeleton) -> list[list[int]]

Decompose the 2D bones into ordered simple chains (paths).

Each connected component of :attr:Skeleton.bones is a path (max degree 2), returned as an ordered joint list walked from an endpoint; isolated points come back as singletons. :func:_chain_dp runs exact Viterbi over this ordering.

Parameters:

Name Type Description Default
skeleton Skeleton

Skeleton whose bones are decomposed.

required

Returns:

Type Description
list of list of int

Ordered joint-index chains (singletons for isolated points).

Source code in src/deeperfly/pictorial.py
def skeleton_chains(skeleton: Skeleton) -> list[list[int]]:
    """Decompose the 2D bones into ordered simple chains (paths).

    Each connected component of :attr:`Skeleton.bones` is expected to be a path
    (max degree 2) and is returned as an ordered joint list walked from an
    endpoint; isolated points come back as singletons. :func:`_chain_dp` runs
    exact Viterbi over this ordering.

    Repeated bone entries (in either orientation) and self-loops are ignored
    when building the adjacency, so they cannot inflate a joint's degree and
    hide a chain endpoint. If a component does branch, only one branch is
    walked and joints on the other branches are left out of the result.

    Parameters
    ----------
    skeleton
        Skeleton whose bones are decomposed. Its ``bones`` (iterable of index
        pairs) and ``n_points`` are read.

    Returns
    -------
    list of list of int
        Ordered joint-index chains (singletons for isolated points).
    """
    # Undirected adjacency in first-seen order, without duplicates/self-loops.
    adj: dict[int, list[int]] = defaultdict(list)
    for a, b in skeleton.bones:
        a, b = int(a), int(b)
        if a == b or b in adj[a]:
            continue
        adj[a].append(b)
        adj[b].append(a)

    chains: list[list[int]] = []
    seen: set[int] = set()
    for start in range(skeleton.n_points):
        if start in seen:
            continue
        # Collect the connected component (stack-based traversal).
        comp, stack = [], [start]
        while stack:
            n = stack.pop()
            if n in seen:
                continue
            seen.add(n)
            comp.append(n)
            stack.extend(m for m in adj[n] if m not in seen)
        if len(comp) == 1:
            chains.append(comp)
            continue
        # Walk the path from one endpoint (a degree-1 node). With no endpoint
        # (a cycle) fall back to the component's first node; the `walked` set
        # guarantees termination on branches / cycles.
        ends = [n for n in comp if len(adj[n]) == 1]
        cur = ends[0] if ends else comp[0]
        order, prev, walked = [cur], None, {cur}
        while True:
            nxts = [m for m in adj[cur] if m != prev and m not in walked]
            if not nxts:
                break
            prev, cur = cur, nxts[0]
            walked.add(cur)
            order.append(cur)
        chains.append(order)
    return chains

solve_frame

solve_frame(
    cameras: CameraGroup,
    skeleton: Skeleton,
    cand_xy: Float[ndarray, "V P K 2"],
    cand_score: Float[ndarray, "V P K"],
    target_map: dict[tuple[int, int], float],
    chains: list[list[int]],
    *,
    scale: float,
    max_hyp: int = DEFAULT_MAX_HYP,
    inlier_px: float = DEFAULT_INLIER_PX,
    lam: float = DEFAULT_LAMBDA,
    huber: float = DEFAULT_HUBER,
    mu: float = DEFAULT_MU,
    prev_pts3d: Float[ndarray, "P 3"] | None = None,
) -> tuple[
    Float[np.ndarray, "P 3"], Float[np.ndarray, "V P 2"]
]

Pictorial-structures correction for one multi-camera frame.

Generates per-joint 3D hypotheses, prunes them, and runs exact chain DP with the bone-length prior (and an optional temporal term against prev_pts3d).

Parameters:

Name Type Description Default
cameras CameraGroup

The bundle-adjusted rig.

required
skeleton Skeleton

Skeleton (kept for symmetry with the sequence call).

required
cand_xy Float[ndarray, 'V P K 2']

Per-frame candidates of shape (V, P, K, 2) / (V, P, K).

required
cand_score Float[ndarray, 'V P K']

Per-frame candidates of shape (V, P, K, 2) / (V, P, K).

required
target_map dict[tuple[int, int], float]

(i, j) -> target bone length for the prior.

required
chains list[list[int]]

Pre-computed skeleton chains (:func:skeleton_chains).

required
scale float

Characteristic bone length scaling the prior and NMS radius.

required
max_hyp int

Pruning and cost knobs (see the module defaults).

DEFAULT_MAX_HYP
inlier_px float

Pruning and cost knobs (see the module defaults).

DEFAULT_INLIER_PX
lam float

Pruning and cost knobs (see the module defaults).

DEFAULT_LAMBDA
huber float

Pruning and cost knobs (see the module defaults).

DEFAULT_HUBER
mu float

Pruning and cost knobs (see the module defaults).

DEFAULT_MU
prev_pts3d Float[ndarray, 'P 3'] | None

Previous frame's 3D for the temporal term, or None.

None

Returns:

Name Type Description
pts3d ndarray

Chosen 3D points of shape (P, 3) (NaN where unsolved).

obs ndarray

Per-view 2D observations PS committed to (V, P, 2) (NaN where unsupported).

Source code in src/deeperfly/pictorial.py
def solve_frame(
    cameras: CameraGroup,
    skeleton: Skeleton,
    cand_xy: Float[np.ndarray, "V P K 2"],
    cand_score: Float[np.ndarray, "V P K"],
    target_map: dict[tuple[int, int], float],
    chains: list[list[int]],
    *,
    scale: float,
    max_hyp: int = DEFAULT_MAX_HYP,
    inlier_px: float = DEFAULT_INLIER_PX,
    lam: float = DEFAULT_LAMBDA,
    huber: float = DEFAULT_HUBER,
    mu: float = DEFAULT_MU,
    prev_pts3d: Float[np.ndarray, "P 3"] | None = None,
) -> tuple[Float[np.ndarray, "P 3"], Float[np.ndarray, "V P 2"]]:
    """Correct one multi-camera frame with pictorial structures (PS).

    Runs in three stages: build per-joint 3D hypotheses from the 2D candidates,
    prune each joint's hypothesis set, then commit to one hypothesis per joint --
    chain DP under the bone-length prior for multi-joint chains, the lowest unary
    cost for single-joint chains. When ``prev_pts3d`` is given and ``mu`` is
    non-zero, a temporal penalty against the previous frame is added to the unary
    cost.

    Parameters
    ----------
    cameras
        The bundle-adjusted rig.
    skeleton
        Skeleton; not read by this function (kept for symmetry with the sequence
        call).
    cand_xy, cand_score
        Per-frame candidates of shape ``(V, P, K, 2)`` / ``(V, P, K)``.
    target_map
        ``(i, j) -> target bone length`` for the prior.
    chains
        Pre-computed skeleton chains (:func:`skeleton_chains`).
    scale
        Characteristic bone length; scales the prior, sets the NMS radius
        (``0.5 * scale``) and normalises the temporal jump.
    max_hyp, inlier_px, lam, huber, mu
        Pruning and cost knobs (see the module defaults).
    prev_pts3d
        Previous frame's 3D for the temporal term, or ``None``.

    Returns
    -------
    pts3d : np.ndarray
        Chosen 3D points of shape ``(P, 3)`` (NaN where unsolved).
    obs : np.ndarray
        Per-view 2D observations PS committed to ``(V, P, 2)`` (NaN where
        unsupported).
    """
    n_views, n_joints, _, _ = cand_xy.shape
    hyp_xyz, hyp_evidence, hyp_inliers, hyp_obs = _frame_hypotheses(
        cameras, cand_xy, cand_score, inlier_px=inlier_px
    )

    # Frame-level facts for the temporal term, decided once instead of per joint.
    use_temporal = bool(mu) and prev_pts3d is not None
    jump_unit = max(scale, 1e-9)  # keeps the normalisation finite for a ~0 scale

    positions: dict[int, np.ndarray] = {}
    costs: dict[int, np.ndarray] = {}
    survivors: dict[int, np.ndarray] = {}
    for joint in range(n_joints):
        kept = _prune_joint(
            hyp_xyz[joint],
            hyp_evidence[joint],
            hyp_inliers[joint],
            max_hyp=max_hyp,
            nms_radius=0.5 * scale,
        )
        survivors[joint] = kept
        positions[joint] = hyp_xyz[joint, kept]  # (S, 3)
        # Costs are minimised, so evidence enters with a negative sign.
        cost = -hyp_evidence[joint, kept].astype(float)
        if use_temporal and np.isfinite(prev_pts3d[joint]).all() and kept.size:
            offset = positions[joint] - prev_pts3d[joint]
            jump = np.linalg.norm(offset, axis=-1) / jump_unit
            cost = cost + mu * jump * jump
        costs[joint] = cost

    chosen: dict[int, int] = {}
    for chain in chains:
        if len(chain) != 1:
            chosen.update(
                _chain_dp(
                    chain,
                    positions,
                    costs,
                    target_map,
                    lam=lam,
                    scale=scale,
                    huber=huber,
                )
            )
            continue
        # Isolated joint: no pairwise term, so take the cheapest hypothesis
        # (if pruning left any).
        joint = chain[0]
        if costs[joint].size:
            chosen[joint] = int(np.argmin(costs[joint]))

    pts3d = np.full((n_joints, 3), np.nan)
    obs = np.full((n_views, n_joints, 2), np.nan)
    for joint, slot in chosen.items():
        # Map the index within the pruned set back to the full hypothesis list.
        source = int(survivors[joint][slot])
        pts3d[joint] = hyp_xyz[joint, source]
        obs[:, joint] = hyp_obs[:, joint, source]
    return pts3d, obs

reconstruct

reconstruct(
    cameras: CameraGroup,
    skeleton: Skeleton,
    candidates: Candidates,
    pts2d_argmax: Float[ndarray, "V T P 2"],
    *,
    bone_max_frames: int | None = 100,
    temporal: bool = False,
    max_hyp: int = DEFAULT_MAX_HYP,
    inlier_px: float = DEFAULT_INLIER_PX,
    lam: float = DEFAULT_LAMBDA,
    huber: float = DEFAULT_HUBER,
    mu: float = DEFAULT_MU,
) -> tuple[
    Float[np.ndarray, "T P 3"],
    Float[np.ndarray, "V T P 2"],
    Float[np.ndarray, "V T P"],
]

Run PS correction over a whole sequence.

The bone-length prior is estimated once from an arg-max triangulation of up to bone_max_frames frames; PS then runs per frame (optionally threading the previous frame's 3D for the temporal term). Same shapes/contract as :func:deeperfly.pipeline.reconstruct.

Parameters:

Name Type Description Default
cameras CameraGroup

The bundle-adjusted rig.

required
skeleton Skeleton

Skeleton supplying chains, visibility and the bone-length prior.

required
candidates Candidates

The detector's top-K candidate peaks for the sequence.

required
pts2d_argmax Float[ndarray, 'V T P 2']

Arg-max 2D of shape (V, T, P, 2) used to estimate the prior.

required
bone_max_frames int | None

Frames subsampled to estimate the prior (None uses all).

100
temporal bool

Whether to add the inter-frame temporal term.

False
max_hyp int

Per-frame pruning and cost knobs.

DEFAULT_MAX_HYP
inlier_px float

Per-frame pruning and cost knobs.

DEFAULT_INLIER_PX
lam float

Per-frame pruning and cost knobs.

DEFAULT_LAMBDA
huber float

Per-frame pruning and cost knobs.

DEFAULT_HUBER
mu float

Per-frame pruning and cost knobs.

DEFAULT_MU

Returns:

Name Type Description
pts3d ndarray

Corrected 3D of shape (T, P, 3).

pts2d ndarray

Committed per-view 2D of shape (V, T, P, 2).

reproj ndarray

Reprojection error of shape (V, T, P).

Source code in src/deeperfly/pictorial.py
def reconstruct(
    cameras: CameraGroup,
    skeleton: Skeleton,
    candidates: Candidates,
    pts2d_argmax: Float[np.ndarray, "V T P 2"],
    *,
    bone_max_frames: int | None = 100,
    temporal: bool = False,
    max_hyp: int = DEFAULT_MAX_HYP,
    inlier_px: float = DEFAULT_INLIER_PX,
    lam: float = DEFAULT_LAMBDA,
    huber: float = DEFAULT_HUBER,
    mu: float = DEFAULT_MU,
) -> tuple[
    Float[np.ndarray, "T P 3"], Float[np.ndarray, "V T P 2"], Float[np.ndarray, "V T P"]
]:
    """Apply PS correction to every frame of a sequence.

    A bone-length prior is estimated a single time, from an arg-max triangulation
    of at most ``bone_max_frames`` evenly spaced frames. :func:`solve_frame` is
    then called frame by frame; with ``temporal=True`` each call also receives the
    previous frame's 3D result. Same shapes/contract as
    :func:`deeperfly.pipeline.reconstruct`.

    Parameters
    ----------
    cameras
        The bundle-adjusted rig.
    skeleton
        Skeleton supplying chains, visibility and the bone-length prior.
    candidates
        The detector's top-K candidate peaks for the sequence.
    pts2d_argmax
        Arg-max 2D of shape ``(V, T, P, 2)`` used to estimate the prior.
    bone_max_frames
        Frames subsampled to estimate the prior (``None`` uses all).
    temporal
        Whether to add the inter-frame temporal term.
    max_hyp, inlier_px, lam, huber, mu
        Per-frame pruning and cost knobs, forwarded to :func:`solve_frame`.

    Returns
    -------
    pts3d : np.ndarray
        Corrected 3D of shape ``(T, P, 3)``.
    pts2d : np.ndarray
        Committed per-view 2D of shape ``(V, T, P, 2)``.
    reproj : np.ndarray
        Reprojection error of shape ``(V, T, P)``.
    """
    # NaN already marks every (view, point) the detector produced nothing for, so
    # the visibility pattern travels with the candidates -- no masking needed.
    cand_xy = candidates.xy
    cand_score = candidates.score
    n_views, n_frames, n_joints, _ = candidates.shape

    # Frames used for the anatomical prior: all of them, or an even subsample.
    if bone_max_frames is None or n_frames <= bone_max_frames:
        sel = np.arange(n_frames)
    else:
        sel = np.linspace(0, n_frames - 1, bone_max_frames).round().astype(int)
    bone_a, bone_b, targets = bone_length_targets(
        cameras, pts2d_argmax[:, sel], skeleton
    )

    # Key each finite target by its (low, high) joint pair.
    target_map: dict[tuple[int, int], float] = {}
    for a, b, length in zip(bone_a, bone_b, targets):
        if not np.isfinite(length):
            continue
        a, b = int(a), int(b)
        target_map[(min(a, b), max(a, b))] = float(length)

    if np.isfinite(targets).any():
        scale = float(np.nanmedian(targets))
    else:
        scale = 1.0  # no usable bone length at all: fall back to unit scale
    chains = skeleton_chains(skeleton)

    frame_kwargs = dict(
        scale=scale,
        max_hyp=max_hyp,
        inlier_px=inlier_px,
        lam=lam,
        huber=huber,
        mu=mu,
    )
    pts3d = np.full((n_frames, n_joints, 3), np.nan)
    pts2d = np.full((n_views, n_frames, n_joints, 2), np.nan)
    prev = None  # stays None unless the temporal term is enabled
    for frame in range(n_frames):
        x3, x2 = solve_frame(
            cameras,
            skeleton,
            cand_xy[:, frame],
            cand_score[:, frame],
            target_map,
            chains,
            prev_pts3d=prev,
            **frame_kwargs,
        )
        pts3d[frame] = x3
        pts2d[:, frame] = x2
        if temporal:
            prev = x3
    reproj = reprojection_error(cameras, pts3d, pts2d)
    return pts3d, pts2d, reproj

Frame I/O

io

Frame I/O: reader classes for video files and image sequences, plus MP4 writing.

Footage is read through a small class hierarchy rooted at :class:~deeperfly.io.base.FrameReader:

  • :class:~deeperfly.io.video.VideoReader -- frame-accurate decode of a video file to (T, H, W, 3) uint8 RGB NumPy (PyAV, in-process FFmpeg, CPU).
  • :class:~deeperfly.io.images.ImageSequenceReader -- parallel decode of an image sequence (OpenCV).

:func:open_reader resolves a source (video file, image directory/glob, or explicit footage file list) to the right reader once; callers then index (reader[:], reader[i], reader[[0,3,5]]) or stream (stream_frames / stream_blocks) against that object. :class:~deeperfly.io.video.VideoWriter encodes frames to H.264, one frame or one array at a time.

>>> from deeperfly import io
>>> reader = io.open_reader("clip.mp4")
>>> frames = reader[:]  # (T, H, W, 3) uint8, host NumPy
>>> with io.VideoWriter("out.mp4", fps=30) as writer:
...     writer.write_frames(frames)  # a batch or iterable; or write_frame()

Pose overlays and 3D reconstructions are rendered to MP4 by :mod:deeperfly.visualization.compose (the OpenCV panel compositor), which builds on these read/write primitives.

FrameReader

Bases: ABC

Reads (T, H, W, 3) uint8 RGB frames from one footage source.

The two concrete readers -- :class:~deeperfly.io.video.VideoReader (PyAV) and :class:~deeperfly.io.images.ImageSequenceReader (OpenCV) -- resolve their source kind once, at construction, rather than on every read. :func:~deeperfly.io.open_reader is the factory that picks the subclass.

All decoding runs on the CPU and yields host (T, H, W, 3) uint8 RGB NumPy.

Index with reader[key] to decode frames into an array:

  • reader[5] -- single frame, (H, W, 3)
  • reader[[0, 3, 5]] -- explicit indices (random-access), (T, H, W, 3)
  • reader[2:8:2] -- sequential slice, (T, H, W, 3)
  • reader[:] -- full decode, (T, H, W, 3)

Use :meth:stream_frames / :meth:stream_blocks for lazy forward iteration.

Readers can be used as context managers (symmetric with :class:~deeperfly.io.video.VideoWriter); :meth:close releases any held resources and is a no-op for the stateless readers, which open and close the underlying file per operation.

Source code in src/deeperfly/io/base.py
class FrameReader(ABC):
    """Reads ``(T, H, W, 3)`` uint8 RGB frames from one footage source.

    The two concrete readers -- :class:`~deeperfly.io.video.VideoReader` (PyAV) and
    :class:`~deeperfly.io.images.ImageSequenceReader` (OpenCV) -- resolve their
    source kind once, at construction, rather than on every read.
    :func:`~deeperfly.io.open_reader` is the factory that picks the subclass.

    All decoding runs on the CPU and yields host ``(T, H, W, 3)`` uint8 RGB NumPy.

    Index with ``reader[key]`` to decode frames into an array:

    - ``reader[5]`` -- single frame, ``(H, W, 3)``
    - ``reader[[0, 3, 5]]`` -- explicit indices (random-access), ``(T, H, W, 3)``
    - ``reader[2:8:2]`` -- sequential slice, ``(T, H, W, 3)``
    - ``reader[:]`` -- full decode, ``(T, H, W, 3)``

    Use :meth:`stream_frames` / :meth:`stream_blocks` for lazy forward iteration.

    Readers can be used as context managers (symmetric with
    :class:`~deeperfly.io.video.VideoWriter`); :meth:`close` releases any held
    resources and is a no-op for the stateless readers, which open and close the
    underlying file per operation.

    Subclasses must implement :meth:`__getitem__`, :meth:`stream_frames`,
    :meth:`stream_blocks` and :meth:`count`; :meth:`close` and :meth:`fps` have
    default implementations here.
    """

    def __enter__(self) -> FrameReader:
        """Enter the context; the reader itself is the managed object."""
        return self

    def __exit__(self, *exc) -> None:
        """Leave the context by calling :meth:`close`.

        Returns ``None``, so an exception raised inside the ``with`` body is not
        suppressed.
        """
        self.close()

    def close(self) -> None:
        """Release any resources held by the reader (no-op by default)."""

    # NOTE(review): the return annotations below use ``Float[...]`` although the
    # docstrings promise uint8 frames -- confirm whether an unsigned-integer
    # array annotation was intended.
    @abstractmethod
    def __getitem__(self, key: int | list[int] | slice) -> Float[np.ndarray, "..."]:
        """Decode frames into a NumPy array.

        Parameters
        ----------
        key
            - ``int`` -- single frame index; returns ``(H, W, 3)`` uint8 RGB.
            - ``list[int]`` -- explicit frame indices (random-access / seeking);
              returns ``(T, H, W, 3)`` in the requested order.
            - ``slice`` -- sequential range ``slice(start, stop, step)``;
              returns ``(T, H, W, 3)``. ``reader[:]`` decodes everything.

        Returns
        -------
        np.ndarray
            The decoded frame(s); the rank depends on the kind of ``key`` as
            listed above.
        """

    @abstractmethod
    def stream_frames(
        self,
        *,
        start: int = 0,
        stop: int | None = None,
        step: int = 1,
    ) -> Iterator[Float[np.ndarray, "H W 3"]]:
        """Yield individual ``(H, W, 3)`` uint8 RGB frames from one forward pass.

        Parameters
        ----------
        start, stop, step
            Frame range, like ``range(start, stop, step)``.

        Yields
        ------
        np.ndarray
            One ``(H, W, 3)`` uint8 RGB frame per selected index.
        """

    @abstractmethod
    def stream_blocks(
        self,
        *,
        start: int = 0,
        stop: int | None = None,
        step: int = 1,
        block_size: int = 64,
    ) -> Iterator[Float[np.ndarray, "T H W 3"]]:
        """Yield ``(T, H, W, 3)`` uint8 RGB blocks from one forward pass.

        Instead of decoding a fixed ``[start, stop)`` slice, walk the source forward
        and emit frames in groups of up to ``block_size``. A whole recording is
        therefore one linear decode -- no per-window re-open or re-seek.

        Parameters
        ----------
        start, stop, step
            Frame range, like ``range(start, stop, step)``.
        block_size
            Maximum frames per yielded block.

        Yields
        ------
        np.ndarray
            ``(T, H, W, 3)`` uint8 RGB blocks with ``T <= block_size``.
        """

    @abstractmethod
    def count(self) -> int | None:
        """Best-effort frame count -- ``None`` when unknown.

        A **hint** for a progress-bar total only: callers stream frames and detect
        end-of-file from the decoder itself, so an off-by-a-few count or ``None``
        never affects correctness.

        Returns
        -------
        int | None
            The frame count, or ``None`` when it cannot be determined.
        """

    def fps(self) -> float | None:
        """Frame rate in frames/sec, or ``None`` when unknown.

        Image sequences carry no intrinsic frame rate, so the base implementation
        returns ``None``; :class:`~deeperfly.io.video.VideoReader` overrides it.

        Returns
        -------
        float | None
            Always ``None`` in this base implementation.
        """
        return None

close

close() -> None

Release any resources held by the reader (no-op by default).

Source code in src/deeperfly/io/base.py
def close(self) -> None:
    """Release any resources held by the reader (no-op by default).

    The base implementation has no body beyond this docstring, so calling it does
    nothing and returns ``None``.
    """

stream_frames abstractmethod

stream_frames(
    *,
    start: int = 0,
    stop: int | None = None,
    step: int = 1,
) -> Iterator[Float[np.ndarray, "H W 3"]]

Yield individual (H, W, 3) uint8 RGB frames from one forward pass.

Parameters:

Name Type Description Default
start int

Frame range, like range(start, stop, step).

0
stop int | None

Frame range, like range(start, stop, step).

None
step int

Frame range, like range(start, stop, step).

1
Source code in src/deeperfly/io/base.py
@abstractmethod
def stream_frames(
    self,
    *,
    start: int = 0,
    stop: int | None = None,
    step: int = 1,
) -> Iterator[Float[np.ndarray, "H W 3"]]:
    """Yield individual ``(H, W, 3)`` uint8 RGB frames from one forward pass.

    Abstract: each concrete reader supplies the implementation. All arguments
    are keyword-only.

    Parameters
    ----------
    start, stop, step
        Frame range, like ``range(start, stop, step)``. The defaults are
        ``start=0``, ``stop=None`` and ``step=1``.

    Yields
    ------
    np.ndarray
        One ``(H, W, 3)`` uint8 RGB frame per selected index.
    """

stream_blocks abstractmethod

stream_blocks(
    *,
    start: int = 0,
    stop: int | None = None,
    step: int = 1,
    block_size: int = 64,
) -> Iterator[Float[np.ndarray, "T H W 3"]]

Yield (T, H, W, 3) uint8 RGB blocks from one forward pass.

Instead of decoding a fixed [start, stop) slice, walk the source forward and emit frames in groups of up to block_size. A whole recording is therefore one linear decode -- no per-window re-open or re-seek.

Parameters:

Name Type Description Default
start int

Frame range, like range(start, stop, step).

0
stop int | None

Frame range, like range(start, stop, step).

None
step int

Frame range, like range(start, stop, step).

1
block_size int

Maximum frames per yielded block.

64

Yields:

Type Description
ndarray

(T, H, W, 3) uint8 RGB blocks with T <= block_size.

Source code in src/deeperfly/io/base.py
@abstractmethod
def stream_blocks(
    self,
    *,
    start: int = 0,
    stop: int | None = None,
    step: int = 1,
    block_size: int = 64,
) -> Iterator[Float[np.ndarray, "T H W 3"]]:
    """Yield ``(T, H, W, 3)`` uint8 RGB blocks from one forward pass.

    Instead of decoding a fixed ``[start, stop)`` slice, walk the source forward
    and emit frames in groups of up to ``block_size``. A whole recording is
    therefore one linear decode -- no per-window re-open or re-seek.

    Abstract: each concrete reader supplies the implementation. All arguments
    are keyword-only.

    Parameters
    ----------
    start, stop, step
        Frame range, like ``range(start, stop, step)``. The defaults are
        ``start=0``, ``stop=None`` and ``step=1``.
    block_size
        Maximum frames per yielded block (default 64).

    Yields
    ------
    np.ndarray
        ``(T, H, W, 3)`` uint8 RGB blocks with ``T <= block_size``.
    """

count abstractmethod

count() -> int | None

Best-effort frame count -- None when unknown.

A hint for a progress-bar total only: callers stream frames and detect end-of-file from the decoder itself, so an off-by-a-few count or None never affects correctness.

Source code in src/deeperfly/io/base.py
@abstractmethod
def count(self) -> int | None:
    """Best-effort frame count -- ``None`` when unknown.

    A **hint** for a progress-bar total only: callers stream frames and detect
    end-of-file from the decoder itself, so an off-by-a-few count or ``None``
    never affects correctness.

    Returns
    -------
    int | None
        The frame count, or ``None`` when it cannot be determined.
    """

fps

fps() -> float | None

Frame rate in frames/sec, or None when unknown.

Image sequences carry no intrinsic frame rate, so the base implementation returns None; :class:~deeperfly.io.video.VideoReader overrides it.

Source code in src/deeperfly/io/base.py
def fps(self) -> float | None:
    """Frame rate in frames/sec, or ``None`` when unknown.

    Image sequences carry no intrinsic frame rate, so the base implementation
    returns ``None``; :class:`~deeperfly.io.video.VideoReader` overrides it.

    Returns
    -------
    float | None
        Always ``None`` in this base implementation.
    """
    return None

ImageSequenceReader

Bases: FrameReader

Reads an ordered image sequence (a directory, glob, or explicit file list).

Decode-thread count is fixed at construction. Frames are decoded in parallel across threads via OpenCV; the result is host (T, H, W, 3) uint8 RGB. Image sequences carry no frame rate, so :meth:fps is the inherited None.

Source code in src/deeperfly/io/images.py
class ImageSequenceReader(FrameReader):
    """Frame reader over an ordered list of image files.

    The files come from an explicit list (the constructor) or from a directory /
    glob (:meth:`from_pattern`). The decode-thread cap is stored at construction;
    every read decodes its files through a thread pool via OpenCV and returns host
    ``(T, H, W, 3)`` uint8 RGB. :meth:`fps` is not overridden, so the inherited
    ``None`` applies (image sequences carry no frame rate).
    """

    def __init__(
        self,
        files,
        *,
        workers: int | None = None,
    ) -> None:
        """Store the file list (each entry coerced to ``Path``) and worker cap."""
        self.files = [Path(entry) for entry in files]
        self.workers = workers

    @classmethod
    def from_pattern(
        cls,
        pattern: str | Path,
        *,
        workers: int | None = None,
    ) -> ImageSequenceReader:
        """Create a reader from a directory or glob; files are listed and name-sorted."""
        listed = list_image_files(pattern)
        return cls(listed, workers=workers)

    # -- decode (parallel per-file, CPU) -------------------------------------

    @staticmethod
    def _to_rgb_uint8(arr: np.ndarray) -> np.ndarray:
        """Normalise a decoded image to ``(H, W, 3)`` uint8.

        Grayscale input is broadcast to three channels, channels past the third
        (e.g. alpha) are dropped, and non-uint8 data is clipped to ``[0, 255]``
        before the cast.
        """
        if arr.ndim == 2:
            # (H, W) -> (H, W, 1) so the channel logic below applies uniformly.
            arr = arr[..., np.newaxis]
        if arr.shape[-1] == 1:
            # Replicate the single channel; slicing alone would leave one channel.
            arr = np.repeat(arr, 3, axis=-1)
        arr = arr[..., :3]
        if arr.dtype == np.uint8:
            return arr
        return np.clip(arr, 0, 255).astype(np.uint8)

    def _n_workers(self, n: int) -> int:
        """Thread count for ``n`` files: at least 1, at most ``n`` and the cap."""
        cap = self.workers or os.cpu_count() or 4
        return max(1, min(n, cap))

    def _decode(self, files: list[Path]) -> np.ndarray:
        """Decode ``files`` in parallel on the CPU into ``(T, H, W, 3)`` uint8 NumPy.

        Raises
        ------
        OSError
            If OpenCV returns ``None`` for any file.
        """
        import cv2

        def load(path: Path) -> np.ndarray:
            pixels = cv2.imread(str(path), cv2.IMREAD_COLOR_RGB)  # (H, W, 3) RGB
            if pixels is None:
                raise OSError(f"failed to decode image: {path} (OpenCV returned None)")
            return self._to_rgb_uint8(pixels)

        pool_size = self._n_workers(len(files))
        with ThreadPoolExecutor(max_workers=pool_size) as pool:
            # ``map`` preserves input order, so frames line up with ``files``.
            decoded = list(pool.map(load, files))
        return np.stack(decoded)

    def _read(self, key: int | list[int] | slice) -> np.ndarray:
        """Resolve ``key`` to files and decode them (dispatch for ``__getitem__``)."""
        if isinstance(key, int):
            return self._decode([self.files[key]])[0]
        if isinstance(key, list):
            if not key:
                raise ValueError("index list must be non-empty")
            return self._decode([self.files[int(i)] for i in key])
        if isinstance(key, slice):
            chosen = self.files[key]
            if not chosen:
                raise ValueError("no frames selected (check slice)")
            return self._decode(chosen)
        raise TypeError(f"invalid index type {type(key).__name__!r}")

    def __getitem__(self, key: int | list[int] | slice) -> Float[np.ndarray, "..."]:
        """Decode the frame(s) selected by an int, a list of ints, or a slice.

        Raises
        ------
        ValueError
            For an empty index list or a slice that selects no file.
        TypeError
            For any other kind of key.
        """
        frames = self._read(key)
        log.debug("read images -> %s", frames.shape)
        return frames

    def stream_frames(
        self,
        *,
        start: int = 0,
        stop: int | None = None,
        step: int = 1,
    ) -> Iterator[Float[np.ndarray, "H W 3"]]:
        """Lazily decode and yield the files of ``files[start:stop:step]`` one by one."""
        selected = self.files[start:stop:step]
        for path in selected:
            yield self._decode([path])[0]

    def stream_blocks(
        self,
        *,
        start: int = 0,
        stop: int | None = None,
        step: int = 1,
        block_size: int = 64,
    ) -> Iterator[Float[np.ndarray, "T H W 3"]]:
        """Yield ``files[start:stop:step]`` decoded in blocks of up to ``block_size``.

        Raises
        ------
        ValueError
            If ``block_size`` is below 1 (raised on first iteration, since this
            is a generator).
        """
        if block_size < 1:
            raise ValueError(f"block_size must be >= 1, got {block_size}")
        selected = self.files[start:stop:step]
        offset = 0
        while offset < len(selected):
            yield self._decode(selected[offset : offset + block_size])
            offset += block_size

    def count(self) -> int | None:
        """Number of files in the sequence (one frame per file)."""
        return len(self.files)

from_pattern classmethod

from_pattern(
    pattern: str | Path, *, workers: int | None = None
) -> ImageSequenceReader

Build a reader for a directory or glob, listing/sorting its files by name.

Source code in src/deeperfly/io/images.py
@classmethod
def from_pattern(
    cls,
    pattern: str | Path,
    *,
    workers: int | None = None,
) -> ImageSequenceReader:
    """Create a reader from a directory or glob; files are listed and name-sorted."""
    listed = list_image_files(pattern)
    return cls(listed, workers=workers)

VideoReader

Bases: FrameReader

Frame-accurate decode of a single video file via PyAV.

Sequential reads walk the file forward; indexing with a list seeks per target frame (keyframe + decode forward). count / fps read container metadata -- both cheap, no full pixel decode.

Source code in src/deeperfly/io/video.py
class VideoReader(FrameReader):
    """Frame-accurate reader for one video file, decoded with PyAV.

    Int and slice reads, and both streaming methods, walk the file forward from
    its first frame; a list of indices instead seeks per target (to the keyframe,
    then decodes forward). ``count`` / ``fps`` only read container metadata --
    cheap, no full pixel decode. The file is opened and closed inside each
    operation; only the path is stored on the instance.
    """

    def __init__(self, path: str | Path) -> None:
        """Remember ``path`` (as a ``Path``); nothing is opened yet."""
        self.path = Path(path)

    # -- decode (in-process FFmpeg, CPU) -------------------------------------

    def _decode_stream(self, *, start=0, step=1, stop=None):
        """Open the file once and yield the selected ``(H, W, 3)`` uint8 RGB frames.

        Frames are counted from 0 as they come out of the decoder; a frame is
        yielded when its index is ``>= start``, ``< stop`` (if ``stop`` is set)
        and a whole number of ``step`` past ``start``.
        """
        import av

        with av.open(str(self.path)) as container:
            for index, frame in enumerate(container.decode(video=0)):
                if index < start:
                    continue
                if stop is not None and index >= stop:
                    return
                if (index - start) % step:
                    continue
                yield frame.to_ndarray(format="rgb24")

    def _decode_range(self, start, stop, step) -> np.ndarray:
        """Stack the frames of ``range(start, stop, step)`` into ``(T, H, W, 3)``.

        Raises
        ------
        ValueError
            If the range produced no frame at all.
        """
        frames = [*self._decode_stream(start=start, step=step, stop=stop)]
        if frames:
            return np.stack(frames)
        raise ValueError(f"pyav decoded no frames from {str(self.path)!r}")

    def _decode_indices(self, indices) -> np.ndarray:
        """Random access: one keyframe seek plus forward decode per distinct target.

        A decoded frame's index is recovered from its PTS; the first frame at or
        past the target is taken. The result follows the order (and repeats) of
        ``indices``.

        Raises
        ------
        ValueError
            If no frame was found for one of the requested indices.
        """
        import av

        found: dict[int, np.ndarray] = {}
        with av.open(str(self.path)) as container:
            stream = container.streams.video[0]
            rate = stream.average_rate or stream.guessed_rate
            time_base = stream.time_base
            assert rate is not None and time_base is not None
            # Each distinct target is visited once, in ascending order.
            for target in sorted(set(indices)):
                # Target frame's PTS in time_base units; seek lands on its keyframe.
                seek_pts = int(target / rate / time_base)
                container.seek(seek_pts, stream=stream, backward=True, any_frame=False)
                for frame in container.decode(stream):
                    assert frame.pts is not None
                    frame_index = int(round(float(frame.pts * time_base * rate)))
                    if frame_index < target:
                        continue
                    found[target] = frame.to_ndarray(format="rgb24")
                    break
        try:
            ordered = [found[int(i)] for i in indices]
        except KeyError as exc:  # a seek overshot / frame missing
            raise ValueError(
                f"pyav could not seek to frame {exc} of {str(self.path)!r}"
            ) from None
        return np.stack(ordered)

    def _read(self, key: int | list[int] | slice) -> np.ndarray:
        """Route ``key`` to the matching decode path (dispatch for ``__getitem__``).

        NOTE(review): int keys and slice bounds are compared directly against the
        decoder's running frame counter, so negative values are not resolved
        relative to the end of the video -- confirm callers never pass them.
        """
        if isinstance(key, int):
            return self._decode_range(key, key + 1, 1)[0]
        if isinstance(key, list):
            wanted = [int(i) for i in key]
            if not wanted:
                raise ValueError("index list must be non-empty")
            return self._decode_indices(wanted)
        if isinstance(key, slice):
            first = key.start or 0
            stride = key.step or 1
            return self._decode_range(int(first), key.stop, int(stride))
        raise TypeError(f"invalid index type {type(key).__name__!r}")

    def __getitem__(self, key: int | list[int] | slice) -> Float[np.ndarray, "..."]:
        """Decode the frame(s) selected by an int, a list of ints, or a slice.

        Raises
        ------
        ValueError
            For an empty index list, or when no frame could be decoded.
        TypeError
            For any other kind of key.
        """
        frames = self._read(key)
        log.debug(
            "read video %s via pyav -> %s",
            self.path.name,
            frames.shape,
        )
        return frames

    def stream_frames(
        self,
        *,
        start: int = 0,
        stop: int | None = None,
        step: int = 1,
    ) -> Iterator[Float[np.ndarray, "H W 3"]]:
        """Yield the frames of ``range(start, stop, step)`` from one forward decode."""
        for frame in self._decode_stream(start=start, stop=stop, step=step):
            yield frame

    def stream_blocks(
        self,
        *,
        start: int = 0,
        stop: int | None = None,
        step: int = 1,
        block_size: int = 64,
    ) -> Iterator[Float[np.ndarray, "T H W 3"]]:
        """Yield stacked blocks of up to ``block_size`` frames from one forward decode.

        The final block may be shorter than ``block_size``.

        Raises
        ------
        ValueError
            If ``block_size`` is below 1 (raised on first iteration, since this
            is a generator).
        """
        if block_size < 1:
            raise ValueError(f"block_size must be >= 1, got {block_size}")
        pending: list[np.ndarray] = []
        for frame in self._decode_stream(start=start, stop=stop, step=step):
            pending.append(frame)
            if len(pending) < block_size:
                continue
            yield np.stack(pending)
            pending = []
        if pending:  # trailing partial block
            yield np.stack(pending)

    # -- metadata probes (container, no pixel decode) ------------------------

    def count(self) -> int | None:
        """Frame count as reported by the container header, or ``None``.

        Some containers (raw / transport streams, some MKV) omit ``nb_frames``;
        an exact count then needs a full decode, so this returns ``None`` rather
        than a ``duration * fps`` estimate. ``None`` is also returned when the
        file cannot be opened.
        """
        import av

        try:
            with av.open(str(self.path)) as container:
                reported = container.streams.video[0].frames
        except Exception:  # unreadable / unsupported container -> unknown
            return None
        if reported and reported > 0:
            return int(reported)
        return None

    def fps(self) -> float | None:
        """Average frame rate from the container header, or ``None`` if unavailable."""
        import av

        try:
            with av.open(str(self.path)) as container:
                video = container.streams.video[0]
                # Same fallback order as _decode_indices.
                rate = video.average_rate or video.guessed_rate
        except Exception:  # unreadable / unsupported container -> unknown
            return None
        if not rate:
            return None
        return float(rate)

count

count() -> int | None

Frame count from the container header, or None if it is absent.

Some containers (raw / transport streams, some MKV) omit nb_frames; an exact count then needs a full decode, so this returns None rather than a duration * fps estimate.

Source code in src/deeperfly/io/video.py
def count(self) -> int | None:
    """Frame count as reported by the container header, or ``None``.

    Some containers (raw / transport streams, some MKV) omit ``nb_frames``;
    an exact count then needs a full decode, so this returns ``None`` rather
    than a ``duration * fps`` estimate. ``None`` is also returned when the
    file cannot be opened.
    """
    import av

    try:
        with av.open(str(self.path)) as container:
            reported = container.streams.video[0].frames
    except Exception:  # unreadable / unsupported container -> unknown
        return None
    if reported and reported > 0:
        return int(reported)
    return None

fps

fps() -> float | None

Average frame rate from the container header, or None if unavailable.

Source code in src/deeperfly/io/video.py
def fps(self) -> float | None:
    """Average frame rate from the container header, or ``None`` if unavailable.

    Uses the stream's ``average_rate`` and falls back to ``guessed_rate``;
    ``None`` is returned when neither is set or the file cannot be opened.
    """
    import av

    try:
        with av.open(str(self.path)) as container:
            video = container.streams.video[0]
            # Same fallback order as _decode_indices.
            rate = video.average_rate or video.guessed_rate
    except Exception:  # unreadable / unsupported container -> unknown
        return None
    if not rate:
        return None
    return float(rate)

VideoWriter

Incremental H.264 (libx264) MP4 encoder, backed by PyAV.

Open it, feed frames, close it (or use it as a context manager). :meth:write_frame appends one (H, W, 3) frame; :meth:write_frames appends a whole (T, H, W, 3) array or any iterable of frames / blocks -- so a long clip can be encoded as it is produced, without ever holding every frame in memory:

>>> with VideoWriter("out.mp4", fps=30) as writer:
...     for frame in render():          # a (H, W, 3) frame
...         writer.write_frame(frame)

The container and stream are opened lazily on the first frame (its size sets the encode dimensions, rounded down to even for yuv420p subsampling); later frames are cropped to match. Non-uint8 input is clipped to [0, 255].

Source code in src/deeperfly/io/video.py
class VideoWriter:
    """Incremental H.264 (libx264) MP4 encoder, backed by PyAV.

    Open it, feed frames, close it (or use it as a context manager).
    :meth:`write_frame` appends one ``(H, W, 3)`` frame; :meth:`write_frames`
    appends a whole ``(T, H, W, 3)`` array or any iterable of frames / blocks -- so
    a long clip can be encoded as it is produced, without ever holding every frame
    in memory:

    >>> with VideoWriter("out.mp4", fps=30) as writer:
    ...     for frame in render():          # a (H, W, 3) frame
    ...         writer.write_frame(frame)

    The container and stream are opened lazily on the first frame (its size sets the
    encode dimensions, rounded down to even for ``yuv420p`` subsampling); later
    frames are cropped to match. Non-``uint8`` input is clipped to ``[0, 255]``.
    """

    def __init__(
        self,
        path: str | Path,
        fps: float = 30.0,
        *,
        codec: str | None = None,
        pix_fmt: str = "yuv420p",
    ) -> None:
        self.path = Path(path)
        self.fps = fps
        self.codec = codec
        self.pix_fmt = pix_fmt
        self._container: Any = None  # av.container.OutputContainer (lazy, on 1st frame)
        self._stream: Any = None  # av.video.stream.VideoStream
        self._size: tuple[int, int] | None = None  # (w, h), even

    def __enter__(self) -> VideoWriter:
        return self

    def __exit__(self, *exc) -> None:
        self.close()

    def _open(self, width: int, height: int) -> None:
        import av

        w, h = width & ~1, height & ~1  # yuv420p subsampling needs even dimensions
        rate = Fraction(self.fps).limit_denominator(1_000_000)
        self._container = av.open(str(self.path), mode="w")
        stream = self._container.add_stream(self.codec or "libx264", rate=rate)
        stream.width = w
        stream.height = h
        stream.pix_fmt = self.pix_fmt
        self._stream = stream
        self._size = (w, h)
        log.info("writing %s via pyav: %dx%d @ %g fps", self.path.name, w, h, self.fps)

    def write_frame(self, frame) -> None:
        """Append a single ``(H, W, 3)`` frame (non-``uint8`` is clipped to ``[0, 255]``).

        The first frame's size sets the encode dimensions (rounded down to even for
        ``yuv420p`` subsampling); later frames are cropped to match.

        Parameters
        ----------
        frame
            One ``(H, W, 3)`` RGB frame (NumPy, or a torch / DLPack array).
        """
        import av

        frame = to_numpy(frame)
        if frame.dtype != np.uint8:
            frame = np.clip(frame, 0, 255).astype(np.uint8)
        if self._stream is None:
            self._open(frame.shape[1], frame.shape[0])
        assert self._stream is not None and self._size is not None
        w, h = self._size
        vframe = av.VideoFrame.from_ndarray(
            np.ascontiguousarray(frame[:h, :w]), format="rgb24"
        )
        for packet in self._stream.encode(vframe):
            self._container.mux(packet)

    def write_frames(self, frames) -> None:
        """Append many frames: a ``(T, H, W, 3)`` batch, or any iterable of frames.

        Accepts a NumPy array (each frame along axis 0), a torch / DLPack batch, or
        any iterable of frames or blocks (e.g. a generator) -- so frames can be
        encoded as they arrive, without holding the whole clip in memory.

        Parameters
        ----------
        frames
            A batch, or an iterable of frames / batches (non-``uint8`` is clipped).
        """
        if isinstance(frames, np.ndarray):
            if frames.ndim == 4:
                for frame in frames:
                    self.write_frame(frame)
            else:
                self.write_frame(frames)
            return
        if hasattr(frames, "detach") or hasattr(frames, "__dlpack__"):  # torch/array
            self.write_frames(to_numpy(frames))
            return
        for item in frames:  # a list / tuple / generator of frames (or batches)
            self.write_frames(item)

    def close(self) -> None:
        """Flush the encoder and close the file (idempotent)."""
        if self._container is None:
            return
        try:
            for packet in self._stream.encode():  # flush
                self._container.mux(packet)
        finally:
            self._container.close()
            self._container = None
            self._stream = None

write_frame

write_frame(frame) -> None

Append a single (H, W, 3) frame (non-uint8 is clipped to [0, 255]).

The first frame's size sets the encode dimensions (rounded down to even for yuv420p subsampling); later frames are cropped to match.

Parameters:

Name Type Description Default
frame

One (H, W, 3) RGB frame (NumPy, or a torch / DLPack array).

required
Source code in src/deeperfly/io/video.py
def write_frame(self, frame) -> None:
    """Encode one ``(H, W, 3)`` frame and mux the resulting packets.

    Input that is not ``uint8`` is clipped to ``[0, 255]`` and cast. The first
    frame's size sets the encode dimensions (rounded down to even for
    ``yuv420p`` subsampling); later frames are cropped to match.

    Parameters
    ----------
    frame
        One ``(H, W, 3)`` RGB frame (NumPy, or a torch / DLPack array).
    """
    import av

    pixels = to_numpy(frame)
    if pixels.dtype != np.uint8:
        pixels = np.clip(pixels, 0, 255).astype(np.uint8)
    if self._stream is None:
        # Lazy open: the first frame decides the encode size.
        rows, cols = pixels.shape[0], pixels.shape[1]
        self._open(cols, rows)
    assert self._stream is not None and self._size is not None
    enc_w, enc_h = self._size
    # Crop to the encode size; the slice may be non-contiguous, so copy it.
    cropped = np.ascontiguousarray(pixels[:enc_h, :enc_w])
    video_frame = av.VideoFrame.from_ndarray(cropped, format="rgb24")
    for packet in self._stream.encode(video_frame):
        self._container.mux(packet)

write_frames

write_frames(frames) -> None

Append many frames: a (T, H, W, 3) batch, or any iterable of frames.

Accepts a NumPy array (each frame along axis 0), a torch / DLPack batch, or any iterable of frames or blocks (e.g. a generator) -- so frames can be encoded as they arrive, without holding the whole clip in memory.

Parameters:

Name Type Description Default
frames

A batch, or an iterable of frames / batches (non-uint8 is clipped).

required
Source code in src/deeperfly/io/video.py
def write_frames(self, frames) -> None:
    """Append a batch of frames, or every frame of an arbitrary iterable.

    A NumPy array with four dimensions is split along axis 0; any other NumPy
    array is written as one frame. A torch / DLPack batch is converted to NumPy
    first. Anything else is iterated and each item is handled recursively, so a
    generator of frames (or of batches) is encoded as it is produced, without
    holding the whole clip in memory.

    Parameters
    ----------
    frames
        A batch, or an iterable of frames / batches (non-``uint8`` is clipped).
    """
    if not isinstance(frames, np.ndarray):
        if hasattr(frames, "detach") or hasattr(frames, "__dlpack__"):  # torch/array
            self.write_frames(to_numpy(frames))
        else:
            for chunk in frames:  # a list / tuple / generator of frames (or batches)
                self.write_frames(chunk)
        return
    if frames.ndim != 4:
        self.write_frame(frames)
        return
    for single in frames:
        self.write_frame(single)

close

close() -> None

Flush the encoder and close the file (idempotent).

Source code in src/deeperfly/io/video.py
def close(self) -> None:
    """Flush the encoder and close the file (idempotent).

    Does nothing when no container is open. The container is closed and the
    writer reset even if flushing raises. A container that has no stream
    attached (stream setup failed after the file was opened) is closed without
    a flush instead of raising ``AttributeError``.
    """
    if self._container is None:
        return
    try:
        if self._stream is not None:
            for packet in self._stream.encode():  # flush
                self._container.mux(packet)
    finally:
        self._container.close()
        self._container = None
        self._stream = None

is_video_file

is_video_file(path: str | Path) -> bool

Whether path is an existing video file (decoded as a video, not an image directory/glob/sequence).

Source code in src/deeperfly/io/base.py
def is_video_file(path: str | Path) -> bool:
    """Whether ``path`` is an existing video file (decoded as a video, not an image
    directory/glob/sequence)."""
    p = Path(path)
    return p.is_file() and p.suffix.lower() in VIDEO_EXTS

to_numpy

to_numpy(frames) -> np.ndarray

Collapse decoded frames (NumPy / torch tensor) to a NumPy array.

Parameters:

Name Type Description Default
frames

A NumPy array or a torch tensor (or any array-like).

required

Returns:

Type Description
ndarray

The frames as a host NumPy array.

Source code in src/deeperfly/io/base.py
def to_numpy(frames) -> np.ndarray:
    """Convert decoded frames to a NumPy array.

    Parameters
    ----------
    frames
        A NumPy array (returned as-is), an object with a ``detach`` method such
        as a torch tensor (detached, moved to the CPU and converted), or any
        other array-like (passed to ``np.asarray``).

    Returns
    -------
    np.ndarray
        The frames as a host NumPy array.
    """
    if not isinstance(frames, np.ndarray):
        if hasattr(frames, "detach"):  # torch.Tensor
            detached = frames.detach()
            return detached.cpu().numpy()
        return np.asarray(frames)
    return frames

to_torch

to_torch(frames)

Hand frames to torch, zero-copy where possible.

A torch.Tensor passes through untouched, any other DLPack-capable array is wrapped via the DLPack protocol, and NumPy input (what the PyAV reader returns) is wrapped on the host via zero-copy torch.from_numpy.

Parameters:

Name Type Description Default
frames

A torch tensor, a DLPack-capable array, or a NumPy array.

required

Returns:

Type Description
Tensor

The frames as a torch tensor (zero-copy where possible).

Source code in src/deeperfly/io/base.py
def to_torch(frames):
    """Hand frames to torch, zero-copy where possible.

    A ``torch.Tensor`` passes through untouched, NumPy input (what the PyAV reader
    returns) is wrapped on the host via zero-copy ``torch.from_numpy``, and any
    other DLPack-capable array is wrapped via the DLPack protocol. Anything else
    is converted with ``to_numpy`` first.

    Parameters
    ----------
    frames
        A torch tensor, a DLPack-capable array, or a NumPy array.

    Returns
    -------
    torch.Tensor
        The frames as a torch tensor (zero-copy where possible).
    """
    import torch

    if isinstance(frames, torch.Tensor):
        return frames
    if isinstance(frames, np.ndarray):
        # Must come before the DLPack check: ndarray itself exposes __dlpack__
        # (NumPy >= 1.22), which would otherwise shadow this branch, and the
        # DLPack export can reject read-only arrays that from_numpy accepts.
        return torch.from_numpy(frames)
    if hasattr(frames, "__dlpack__"):  # DLPack-capable array
        return torch.from_dlpack(frames)
    return torch.from_numpy(to_numpy(frames))

list_image_files

list_image_files(pattern: str | Path) -> list[Path]

Sorted image files for a directory or glob pattern (by name).

Parameters:

Name Type Description Default
pattern str | Path

A directory of images, or a glob pattern.

required

Returns:

Type Description
list of Path

The matching image files, sorted by name.

Raises:

Type Description
FileNotFoundError

If nothing matches pattern.

Source code in src/deeperfly/io/images.py
def list_image_files(pattern: str | Path) -> list[Path]:
    """Sorted image files for a directory or glob pattern (by name).

    Parameters
    ----------
    pattern
        A directory of images, or a glob pattern.

    Returns
    -------
    list of Path
        The matching image files, sorted by name.

    Raises
    ------
    FileNotFoundError
        If nothing matches ``pattern``.
    """
    p = Path(pattern)
    if p.is_dir():
        files = sorted(f for f in p.iterdir() if f.suffix.lower() in IMAGE_EXTS)
    else:
        files = sorted(Path(f) for f in glob.glob(str(pattern)))
    if not files:
        raise FileNotFoundError(f"no images matched {pattern!r}")
    return files

open_reader

open_reader(
    source: str | Path | list[Path],
    *,
    workers: int | None = None,
) -> FrameReader

Open the right :class:FrameReader for a footage source.

Dispatches on source (the one place this dispatch lives):

  • a single video file (.mp4 / .avi / .mov ...) -> a :class:~deeperfly.io.video.VideoReader (PyAV);
  • a directory or glob of images -> an :class:~deeperfly.io.images.ImageSequenceReader (OpenCV, workers sets decode parallelism);
  • an explicit list of footage files -- one video file, or an ordered image sequence the caller has already resolved (deeperfly run resolves each camera's files up front, naturally sorted) -- is read in the given order without re-listing the directory.

Parameters:

Name Type Description Default
source str | Path | list[Path]

A video file, an image directory/glob, or an explicit list of footage files (one video, or an ordered image sequence).

required
workers int | None

Decode thread count for image sequences.

None

Returns:

Type Description
FrameReader

A :class:~deeperfly.io.video.VideoReader or :class:~deeperfly.io.images.ImageSequenceReader.

Raises:

Type Description
ValueError

If an explicit file list is empty.

Source code in src/deeperfly/io/__init__.py
def open_reader(
    source: str | Path | list[Path],
    *,
    workers: int | None = None,
) -> FrameReader:
    """Pick and open the :class:`FrameReader` that fits a footage source.

    This is the single dispatch point on ``source``:

    - a single video file (``.mp4`` / ``.avi`` / ``.mov`` ...) -> a
      :class:`~deeperfly.io.video.VideoReader` (PyAV);
    - a directory or glob of images -> an
      :class:`~deeperfly.io.images.ImageSequenceReader` (OpenCV, ``workers`` sets
      decode parallelism);
    - an explicit list of footage files -- one video file, or an ordered image
      sequence the caller has already resolved (``deeperfly run`` resolves each
      camera's files up front, naturally sorted) -- is read in the given order
      without re-listing the directory.

    Parameters
    ----------
    source
        A video file, an image directory/glob, or an explicit list of footage
        files (one video, or an ordered image sequence).
    workers
        Decode thread count for image sequences.

    Returns
    -------
    FrameReader
        A :class:`~deeperfly.io.video.VideoReader` or
        :class:`~deeperfly.io.images.ImageSequenceReader`.

    Raises
    ------
    ValueError
        If an explicit file list is empty.
    """
    if not isinstance(source, (list, tuple)):
        # A single path or pattern: a video file, else an image directory/glob.
        single = Path(source)
        if is_video_file(single):
            return VideoReader(single)
        return ImageSequenceReader.from_pattern(source, workers=workers)

    resolved = [Path(entry) for entry in source]
    if not resolved:
        raise ValueError("open_reader got an empty file list")
    # A camera's video footage is a single file (the resolver keeps just the
    # first when several match), so decode that one.
    first = resolved[0]
    if is_video_file(first):
        return VideoReader(first)
    return ImageSequenceReader(resolved, workers=workers)