The deeperfly pipeline, step by step¶
This notebook reproduces what deeperfly run does, but one module at a time,
so every intermediate output is visible and plotted. We run on the synchronized
7-camera recording in examples/data/ (one MP4 per camera, camera_0.mp4 …
camera_6.mp4).
deeperfly run is driven entirely by one config. The packaged default
(Config.default(), the data/default_config.toml used when you pass no -c)
already points its footage sources at this recording, so we load it and let it
wire everything together:
- the detection plan (Config.detection_plan), the config-driven description of what to detect: named footage sources, reusable preprocessors (mirror/crop/resize), detector models, and the pathways (source → preprocessor → model) that run them. A separate [pose2d.output_points] table says where each pathway's output channels land in the (view, skeleton-point) grid.
- the camera rig (Config.camera_group) and skeleton (Config.skeleton).
- pose2d models + inference.detect_sequence: preprocess → heatmaps → map each pathway's peaks back into its view → a (V, T, 38, 2) 2D pose.
- pipeline.run_from_points2d: bundle adjustment → triangulation (DLT) → a saved results.PoseResult.
Two things to note about the design: there is no separate visibility mask and
no special-cased front camera. A (view, point) that no pathway writes simply
stays NaN — that is the visibility mask. The front camera is just one source
feeding two pathways (one mirrored), one reading its right legs and one its
left. We unfold all of this below and visualize each stage with Plotly, so the 3D
views and image overlays stay interactive in the rendered docs. The final cells
call run_from_points2d to show that it is the same thing in one call.
Setup¶
Import the precise pieces the CLI composes, and load the packaged default config.
N_FRAMES controls how much of the 64-frame recording we process — small enough
to run quickly, large enough for bundle adjustment to have something to chew on.
import base64
from collections import defaultdict
from pathlib import Path
import cv2
import numpy as np
import plotly.graph_objects as go
from IPython.display import HTML, display
from plotly.subplots import make_subplots
# The exact modules `deeperfly run` composes:
from deeperfly import Config, io
from deeperfly.pipeline import (
bundle_adjust_cameras,
run_from_points2d,
)
from deeperfly.pose2d import inference
from deeperfly.pose2d.pathways import (
normalized_peaks_to_original_pixels,
route_channels_to_points_in_views,
)
from deeperfly.pose2d.stream import load_models
from deeperfly.results import PoseResult
from deeperfly.triangulation import reprojection_error, triangulate
from deeperfly.visualization._palette import point_colors_rgb
# deeperfly's own rendering is headless OpenCV (no plotting deps); the plots below
# are drawn with Plotly directly for the walkthrough so the 3D views and image
# overlays stay interactive on the rendered docs site.
# Theme-neutral palette: a mid-gray for all text and axis lines reads on both the
# light and dark docs themes (no black-on-black), and the figure backgrounds are
# left transparent so the page theme shows through.
FG = "#888888"
GRID = "rgba(136, 136, 136, 0.25)"
def show(fig):
    "Style a Plotly figure for both light/dark backgrounds, then render it as self-contained HTML."
    fig.update_layout(
        paper_bgcolor="rgba(0, 0, 0, 0)",
        font_color=FG,
        title_x=0.5,
        title_xanchor="center",
        title_font_color=FG,
    )
    if (
        fig.layout.plot_bgcolor is None
    ):  # leave intentionally-colored plots (e.g. masks) alone
        fig.update_layout(plot_bgcolor="rgba(0, 0, 0, 0)")
    fig.update_xaxes(color=FG, gridcolor=GRID, zerolinecolor=GRID)
    fig.update_yaxes(color=FG, gridcolor=GRID, zerolinecolor=GRID)
    # 3D scenes: drop the opaque background panes (jarring on a dark page) but keep
    # faint gridlines for depth; no-op for figures without a scene.
    fig.update_scenes(
        xaxis=dict(showbackground=False, gridcolor=GRID, color=FG),
        yaxis=dict(showbackground=False, gridcolor=GRID, color=FG),
        zaxis=dict(showbackground=False, gridcolor=GRID, color=FG),
    )
    fig.update_annotations(font_color=FG)  # subplot titles
    display(
        HTML(
            fig.to_html(
                include_plotlyjs="cdn", full_html=False, config={"responsive": True}
            )
        )
    )
def rgb(c):
    "An RGB float triple in [0, 1] -> a Plotly 'rgb(r,g,b)' string."
    r, g, b = (int(round(255 * float(x))) for x in c[:3])
    return f"rgb({r},{g},{b})"
def _u8(img):
    "Coerce an image to uint8 (float images are assumed to be in [0, 1])."
    a = np.asarray(img)
    return a if a.dtype == np.uint8 else (np.clip(a, 0, 1) * 255).astype(np.uint8)
def img_uri(img, quality=85):
    "An (H, W, 3) image -> a compact base64 JPEG data URI (frames are photographic; overlays are vector traces on top)."
    bgr = cv2.cvtColor(np.ascontiguousarray(_u8(img)), cv2.COLOR_RGB2BGR)
    _, buf = cv2.imencode(".jpg", bgr, [cv2.IMWRITE_JPEG_QUALITY, quality])
    return "data:image/jpeg;base64," + base64.b64encode(buf).decode()
def add_image(fig, img, row, col, ncols):
    "Place an image into subplot (row, col) of an ncols-wide grid, with square top-down pixel axes."
    h, w = np.asarray(img).shape[:2]
    n = (row - 1) * ncols + col
    suffix = "" if n == 1 else str(n)
    fig.add_layout_image(
        dict(
            source=img_uri(img),
            x=0,
            y=0,
            sizex=w,
            sizey=h,
            sizing="stretch",
            layer="below",
        ),
        row=row,
        col=col,
    )
    fig.update_xaxes(visible=False, range=[0, w], constrain="domain", row=row, col=col)
    fig.update_yaxes(
        visible=False,
        range=[h, 0],
        scaleanchor=f"x{suffix}",
        scaleratio=1,
        constrain="domain",
        row=row,
        col=col,
    )
# Locate the repo root (so the notebook works regardless of the launch dir).
REPO = Path.cwd()
while not (REPO / "pyproject.toml").exists() and REPO != REPO.parent:
    REPO = REPO.parent
DATA_DIR = REPO / "examples" / "data"
# The packaged default config -- exactly what `deeperfly run` uses with no `-c`.
# It is self-contained (detection plan + cameras + skeleton + pipeline) and its
# footage sources already map onto camera_0.mp4 .. camera_6.mp4 in examples/data.
config = Config.default()
N_FRAMES = 60 # frames to process (each example video has 64)
FPS = 100.0 # `deeperfly run` detects this from the videos; we set it directly
patterns = config.source_patterns() # source name -> footage glob
first_file = DATA_DIR / next(iter(patterns.values()))
n_avail = io.open_reader(first_file).count()
print(f"repo: {REPO}")
print(f"data: {DATA_DIR} ({n_avail} frames/camera available)")
print(f"sources: {patterns}")
repo: /home/tlam/deeperfly
data: /home/tlam/deeperfly/examples/data (64 frames/camera available)
sources: {'vid_rh': 'camera_0.mp4', 'vid_rm': 'camera_1.mp4', 'vid_rf': 'camera_2.mp4', 'vid_f': 'camera_3.mp4', 'vid_lf': 'camera_4.mp4', 'vid_lm': 'camera_5.mp4', 'vid_lh': 'camera_6.mp4'}
Step 1 — the camera rig and the skeleton¶
Config.detection_plan parses the plan and resolves the views (the geometric
cameras under [cameras.*]) — the V axis of every points array. Config.skeleton
is the rig-independent description of what is tracked: 38 points (left 0..18,
right 19..37), grouped into limbs and joined by bones. Config.camera_group
builds the rig geometry from the same TOML.
The config leaves principal_point_px unset, so we hand camera_group each
camera's (H, W) and it places the principal point at the image center — exactly
what deeperfly run does from the footage (these frames are 960×480). The
bundle-adjustment options come straight off config.bundle_adjustment: the
leg-only points_to_use (point names) that drive bundle adjustment and the
fixed/shared parameters that anchor the world gauge.
plan = config.detection_plan()
skeleton = config.skeleton()
view_names = plan.view_names # the V axis order, from [cameras.*]
# The frames are 960x480 and the config leaves principal_point_px unset, so pass
# each camera's (H, W) and camera_group infers the principal point as the center.
probe = np.asarray(io.open_reader(first_file)[:1])[0]
H, W = probe.shape[:2]
image_sizes = {name: (H, W) for name in view_names}
cameras = config.camera_group(image_sizes=image_sizes)
# Bundle-adjustment options, mapped to `bundle_adjust_cameras`'s kwargs the same way
# `deeperfly run` does: the leg-only `points_to_use` (point *names*) resolved to
# the `ba_keypoints` indices that drive bundle adjustment, the `fixed`/`shared` world
# gauge, and the scipy least_squares kwargs (max_nfev, loss).
ba = config.bundle_adjustment
bundle_adjust_kwargs = {"fixed": ba.fixed, "shared": ba.shared, **ba.least_squares}
if ba.points_to_use is not None:
    point_index = {name: i for i, name in enumerate(skeleton.point_names)}
    bundle_adjust_kwargs["ba_keypoints"] = [point_index[n] for n in ba.points_to_use]
print(
f"skeleton: {skeleton.name!r} {skeleton.n_points} points, "
f"{len(skeleton.bones)} bones, {skeleton.n_limbs} limbs"
)
print(f"limbs: {skeleton.limb_names}")
print(f"views: {view_names}")
print(f"frame size (H, W) = {(H, W)}; principal point -> {[(W - 1) / 2, (H - 1) / 2]}")
for c in cameras:
    print(f" {c.name:>3}: center={np.round(c.position, 1)} focal={c.intr[0]:.0f}px")
skeleton: 'fly38' 38 points, 28 bones, 10 limbs
limbs: ('lf_leg', 'lm_leg', 'lh_leg', 'l_antenna', 'l_abdomen', 'rf_leg', 'rm_leg', 'rh_leg', 'r_antenna', 'r_abdomen')
views: ['rh', 'rm', 'rf', 'f', 'lf', 'lm', 'lh']
frame size (H, W) = (480, 960); principal point -> [479.5, 239.5]
rh: center=[-53.7 -93.1 -0. ] focal=22388px
rm: center=[ 0. -107.5 -0. ] focal=22388px
rf: center=[ 76. -76. -0.] focal=22388px
f: center=[107.5 0. 0. ] focal=22388px
lf: center=[76. 76. 0.] focal=22388px
lm: center=[ 0. 107.5 0. ] focal=22388px
lh: center=[-53.7 93.1 0. ] focal=22388px
from deeperfly.cameras import Camera
def plot_camera(camera: Camera, fig, length=None, **kwargs):
    "Add a camera to a 3D figure as an RGB axis triad at its world center."
    if length is None:
        length = np.linalg.norm(camera.tvec) * 0.2
    for axis, color in zip(camera.rmat, ("red", "green", "blue")):
        tip = camera.position + axis * length
        fig.add_scatter3d(
            x=[camera.position[0], tip[0]],
            y=[camera.position[1], tip[1]],
            z=[camera.position[2], tip[2]],
            mode="lines",
            line=dict(color=color, width=5),
            showlegend=False,
            hoverinfo="skip",
            **kwargs,
        )
# The 7 cameras orbit the world origin (where the fly sits). Plot their centers.
fig = go.Figure()
for c in cameras:
    plot_camera(c, fig, length=10, opacity=0.8)
    fig.add_scatter3d(
        x=[c.position[0]],
        y=[c.position[1]],
        z=[c.position[2] + 12],
        mode="text",
        text=[c.name],
        textfont=dict(size=12, color=FG),
        showlegend=False,
        hoverinfo="skip",
    )
fig.add_scatter3d(
x=[0],
y=[0],
z=[0],
mode="markers",
marker=dict(color=FG, symbol="x", size=4),
showlegend=False,
hoverinfo="skip",
)
fig.update_layout(
title="Camera rig: centers orbit the world origin",
height=560,
margin=dict(l=0, r=0, t=40, b=0),
scene=dict(
xaxis_title="x",
yaxis_title="y",
zaxis_title="z",
aspectmode="data",
camera=dict(eye=dict(x=1.0, y=-1.7, z=0.9)),
),
)
show(fig)
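The centered principal point used in this step can be checked by hand. The following is a minimal NumPy sketch, not deeperfly's implementation: `centered_principal_point` and `project_pinhole` are hypothetical helpers, the pixel-center convention `((W - 1) / 2, (H - 1) / 2)` is taken from the printed output above, and the focal length and distance are borrowed from the printed rig purely as sample values.

```python
import numpy as np


def centered_principal_point(height, width):
    """Principal point (cx, cy) at the image center, in pixel-center coordinates."""
    return np.array([(width - 1) / 2, (height - 1) / 2])


def project_pinhole(point_cam, focal_px, principal_point):
    """Project a 3D point given in camera coordinates (z forward) to pixels."""
    x, y, z = point_cam
    return focal_px * np.array([x / z, y / z]) + principal_point


pp = centered_principal_point(480, 960)
print(pp)  # [479.5 239.5]

# A point on the optical axis lands exactly on the principal point.
on_axis = project_pinhole(np.array([0.0, 0.0, 107.5]), 22388.0, pp)
print(on_axis)
```

Because every camera looks at the world origin, the fly should sit near the image center in each view when the principal point is left at this default.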
Step 2 — the detection plan¶
The plan keeps four counts independent rather than fusing them at "one per camera":
- sources: named footage globs, each decoded once.
- preprocessors: reusable frame-op pipelines (here just noflip and a left-right fliplr mirror).
- models: detector networks (here one stacked-hourglass deepfly2d).
- pathways: each a named source → preprocessor → model inference run.
Where a pathway's 19 output channels land is declared separately in the
[pose2d.output_points.<view>] tables, resolved into each pathway's (out_channel, view, point) mapping. The front source vid_f feeds two pathways: f_noflip
reads the right distal joints and f_fliplr (mirrored) reads the left — so the one
front image observes joints on both body sides. That front-camera bridge is
what lets bundle adjustment tie the otherwise-disjoint left and right cameras into a
single world frame (see Step 9).
from collections import Counter
print("views:", plan.view_names)
print("\nsources:")
for s in plan.sources:
    print(f" {s.name:>7} <- {s.pattern}")
print("\npreprocessors:")
for name, t in plan.preprocessors.items():
    print(f" {name:>7}: {t.to_json()}")
print("\nmodels:")
for name, spec in plan.models.items():
    print(
        f" {name}: class={spec.cls!r} input_size={spec.input_size} "
        f"n_out_channels={spec.n_out_channels}"
    )
print("\npathways (source -> preprocessor -> model -> views it fills):")
for pw in plan.pathways:
    views = sorted({plan.view_names[v] for v in pw.mapping[:, 1]})
    print(
        f" {pw.name:>10}: {pw.source:>7} --{pw.preprocessor}--> {pw.model}"
        f" -> {views} ({len(pw.mapping)} points)"
    )
src_counts = Counter(pw.source for pw in plan.pathways)
multi = {s: c for s, c in src_counts.items() if c > 1}
print(f"\nsource feeding >1 pathway (the front bridge): {multi}")
views: ['rh', 'rm', 'rf', 'f', 'lf', 'lm', 'lh']
sources:
vid_rh <- camera_0.mp4
vid_rm <- camera_1.mp4
vid_rf <- camera_2.mp4
vid_f <- camera_3.mp4
vid_lf <- camera_4.mp4
vid_lm <- camera_5.mp4
vid_lh <- camera_6.mp4
preprocessors:
flip: [{'op': 'fliplr'}]
models:
deepfly2d: class='hourglass' input_size=(256, 512) n_out_channels=19
pathways (source -> preprocessor -> model -> views it fills):
rh: vid_rh --None--> deepfly2d -> ['rh'] (19 points)
rm: vid_rm --None--> deepfly2d -> ['rm'] (19 points)
rf: vid_rf --None--> deepfly2d -> ['rf'] (16 points)
f: vid_f --None--> deepfly2d -> ['f'] (7 points)
f_flip: vid_f --flip--> deepfly2d -> ['f'] (7 points)
lf_flip: vid_lf --flip--> deepfly2d -> ['lf'] (16 points)
lm_flip: vid_lm --flip--> deepfly2d -> ['lm'] (19 points)
lh_flip: vid_lh --flip--> deepfly2d -> ['lh'] (19 points)
source feeding >1 pathway (the front bridge): {'vid_f': 2}
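To make the "four independent counts" idea concrete, here is a small sketch with a hypothetical `Pathway` dataclass. It is a stand-in for illustration, not deeperfly's own class; the pathway, source, and preprocessor names are copied from the printed plan above.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Pathway:
    """One inference run: source -> preprocessor -> model (hypothetical stand-in)."""

    name: str
    source: str
    preprocessor: Optional[str]
    model: str


# Names copied from the printed plan above.
pathways = [
    Pathway("rh", "vid_rh", None, "deepfly2d"),
    Pathway("rm", "vid_rm", None, "deepfly2d"),
    Pathway("rf", "vid_rf", None, "deepfly2d"),
    Pathway("f", "vid_f", None, "deepfly2d"),
    Pathway("f_flip", "vid_f", "flip", "deepfly2d"),
    Pathway("lf_flip", "vid_lf", "flip", "deepfly2d"),
    Pathway("lm_flip", "vid_lm", "flip", "deepfly2d"),
    Pathway("lh_flip", "vid_lh", "flip", "deepfly2d"),
]

sources = {p.source for p in pathways}
preprocessors = {p.preprocessor for p in pathways if p.preprocessor is not None}
models = {p.model for p in pathways}
print(len(sources), len(preprocessors), len(models), len(pathways))  # 7 1 1 8

# A source that feeds more than one pathway is decoded once but detected twice.
shared = {s: n for s, n in Counter(p.source for p in pathways).items() if n > 1}
print(shared)  # {'vid_f': 2}
```

Keeping the counts separate is what lets one front video serve two pathways without duplicating the footage or the model.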
Step 3 — load the detector model(s)¶
load_models loads every model the plan references into a name → LoadedModel
dict (downloading the cached DeepFly2D checkpoint on first use). A LoadedModel
owns its input contract — resize to its input_size (256×512) and subtract its
training mean — and runs the forward/decode on the GPU when one is available.
Every pathway forwards through the model named in its model key.
models = load_models(plan)
for name, m in models.items():
    print(
        f"{name}: device={m.device()} input_size={m.input_size} "
        f"channels={m.n_out_channels} mean={m.spec.mean}"
    )
deepfly2d: device=cuda:0 input_size=(256, 512) channels=19 mean=0.22
Step 4 — load synchronized frames¶
deeperfly run decodes each source once. We decode the first N_FRAMES of
each source's video with io.open_reader(...)[:N_FRAMES] — the same reader the
CLI uses — into a windows dict mapping each source name to a (T, H, W, 3)
array. For frame t, the synchronized views are windows[src][t] across sources.
# Decode the first N_FRAMES of each source's video into a (T, H, W, 3) window.
windows = {
name: np.asarray(io.open_reader(DATA_DIR / pat)[:N_FRAMES])
for name, pat in patterns.items()
}
print("windows:", {k: v.shape for k, v in windows.items()})
# Each view's footage comes from the source feeding it (plan.view_sources()).
view_src = plan.view_sources()
fig = make_subplots(
rows=2,
cols=4,
subplot_titles=[
f"{view_names[v]} <- {view_src[view_names[v]]}" if v < len(view_names) else ""
for v in range(8)
],
horizontal_spacing=0.01,
vertical_spacing=0.08,
)
for v, name in enumerate(view_names):
    add_image(fig, windows[view_src[name]][0], row=v // 4 + 1, col=v % 4 + 1, ncols=4)
fig.update_annotations(font_size=11)
fig.update_layout(
title_text="Synchronized raw frame 0 across the 7 views",
height=320,
margin=dict(l=0, r=0, t=60, b=0),
)
show(fig)
windows: {'vid_rh': (60, 480, 960, 3), 'vid_rm': (60, 480, 960, 3), 'vid_rf': (60, 480, 960, 3), 'vid_f': (60, 480, 960, 3), 'vid_lf': (60, 480, 960, 3), 'vid_lm': (60, 480, 960, 3), 'vid_lh': (60, 480, 960, 3)}
Step 5 — preprocessing per pathway¶
Each pathway orients its source frame with its preprocessor (fliplr mirrors
far-side cameras so the fly faces the trained orientation; noflip is the
identity), then the model resizes to its 256×512 input and subtracts the
training mean. We run this per pathway, so the front source vid_f appears twice:
once un-flipped (its right-leg pathway) and once mirrored (its left-leg pathway).
Below, each pathway before and after preprocessing — note how the mirror makes
every view face the same way.
def pathway_label(pw):
    "Short name for a pathway, e.g. 'f_fliplr (vid_f, fliplr)'."
    return f"{pw.name} ({pw.source}, {pw.preprocessor})"
# Preprocess one frame per PATHWAY (the front source -> two pathways, one mirrored).
preprocessed_disp = []
for pw in plan.pathways:
    model = models[pw.model]
    oriented = pw.transform.apply(windows[pw.source][:1])  # (1, H, W, 3)
    prepared = model.prepare(oriented)  # (1, 3, Hh, Ww), on the model's device
    img = prepared[0].float().cpu().numpy().transpose(1, 2, 0) + model.spec.mean
    preprocessed_disp.append(np.clip(img, 0, 1))
n_pw = len(plan.pathways)
# 4x4 grid: each pathway occupies a (raw, preprocessed) pair of stacked cells.
titles = []
for block in range(2):
    for kind in ("raw", "preprocessed"):
        for k in range(4):
            i = block * 4 + k
            titles.append(
                f"{pathway_label(plan.pathways[i])}, {kind}" if i < n_pw else ""
            )
fig = make_subplots(
    rows=4,
    cols=4,
    subplot_titles=titles,
    horizontal_spacing=0.01,
    vertical_spacing=0.06,
)
for i in range(n_pw):
    block, k = divmod(i, 4)
    add_image(
        fig, windows[plan.pathways[i].source][0], row=block * 2 + 1, col=k + 1, ncols=4
    )
    add_image(fig, preprocessed_disp[i], row=block * 2 + 2, col=k + 1, ncols=4)
fig.update_annotations(font_size=10)
fig.update_layout(
title_text="preprocess() per pathway: the front source appears twice (right + flipped left)",
height=720,
margin=dict(l=0, r=0, t=70, b=0),
)
show(fig)
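The preprocessing described in this step (optional mirror, resize to the model input, subtract the training mean) can be sketched in plain NumPy. This is an illustration under stated assumptions, not what `pw.transform.apply` or `model.prepare` actually do: `preprocess` is a hypothetical helper, the resize is nearest-neighbour for brevity (a real pipeline would likely interpolate), pixels are assumed to be scaled to [0, 1] before the mean is subtracted, and the result stays height-width-channel rather than channel-first.

```python
import numpy as np


def preprocess(frame, mirror, input_size, mean):
    """Optional left-right mirror, nearest-neighbour resize, scale to [0, 1], subtract mean."""
    if mirror:
        frame = frame[:, ::-1]  # left-right flip
    h, w = frame.shape[:2]
    out_h, out_w = input_size
    rows = np.arange(out_h) * h // out_h  # nearest-neighbour source rows
    cols = np.arange(out_w) * w // out_w  # nearest-neighbour source columns
    resized = frame[rows][:, cols]
    return resized.astype(np.float32) / 255.0 - mean


rng = np.random.default_rng(0)
frame = rng.integers(0, 256, size=(480, 960, 3), dtype=np.uint8)

# The front source would go through both variants, one per pathway.
plain = preprocess(frame, mirror=False, input_size=(256, 512), mean=0.22)
mirrored = preprocess(frame, mirror=True, input_size=(256, 512), mean=0.22)
print(plain.shape, mirrored.shape)
```

The top-left pixel of the mirrored output comes from the top-right pixel of the raw frame, which is the property the inverse mapping later has to undo.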
Step 6 — heatmaps¶
Each model's forward pass returns one heatmap per output channel (19 single-side channels) at the network's output resolution. The peak of each heatmap is the predicted joint location. We run every pathway's frame-0 input through its model and overlay the heatmaps onto each preprocessed pathway below.
heatmaps = []
for pw in plan.pathways:
    model = models[pw.model]
    oriented = pw.transform.apply(windows[pw.source][:1])
    hm = model.predict_heatmaps(model.prepare(oriented))[0]  # (J, Hh, Ww)
    heatmaps.append(hm)
heatmaps = np.stack(heatmaps) # (P, J, Hh, Ww)
print("heatmaps:", heatmaps.shape, " (pathways, channels, Hh, Ww)")
heatmaps: (8, 19, 64, 128) (pathways, channels, Hh, Ww)
def channel_colors(pathway, skeleton, n_out):
    """RGB per model output channel, by the skeleton point each maps to.

    A pathway's mapping is (out_channel, view, point) triples; color channel i
    by its target point's limb color (gray for any channel the pathway drops).
    """
    pts = point_colors_rgb(skeleton)
    out = np.full((n_out, 3), 0.6)  # unmapped channels -> gray
    for i, _v, p in pathway.mapping:
        out[i] = pts[p]
    return out
n_out = models[plan.pathways[0].model].n_out_channels
fig = make_subplots(
rows=2,
cols=4,
subplot_titles=[
pathway_label(plan.pathways[i]) if i < n_pw else "" for i in range(8)
],
horizontal_spacing=0.01,
vertical_spacing=0.1,
)
for i in range(n_pw):
    pw = plan.pathways[i]
    colors = channel_colors(pw, skeleton, n_out)
    base = preprocessed_disp[i]  # (Hp, Wp, 3) float in [0, 1]
    hp, wp = base.shape[:2]
    # Flatten the gray image (at 0.5 over black) and every colored heatmap channel
    # (each at 0.5) into one RGB image -- the same over-compositing matplotlib did
    # with stacked semi-transparent imshow layers, done once in NumPy so the docs
    # page carries a single small image per pathway instead of 19 image layers.
    canvas = base * 0.5
    for j in range(heatmaps.shape[1]):
        alpha = np.clip(heatmaps[i, j] * 2, 0, 1)
        alpha = cv2.resize(alpha, (wp, hp), interpolation=cv2.INTER_LINEAR)
        eff = (0.5 * alpha)[..., None]
        canvas = canvas * (1 - eff) + colors[j][None, None, :] * eff
    add_image(fig, canvas, row=i // 4 + 1, col=i % 4 + 1, ncols=4)
fig.update_annotations(font_size=11)
fig.update_layout(
title_text="Stacked-hourglass heatmaps (one set per pathway)",
height=320,
margin=dict(l=0, r=0, t=60, b=0),
)
show(fig)
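The statement that each heatmap's peak is the predicted joint location can be illustrated with a bare argmax decoder. This is a simplified sketch: `decode_peaks` is a hypothetical helper, it does no sub-pixel refinement, and the normalization by `size - 1` is an assumed convention rather than the one deeperfly necessarily uses.

```python
import numpy as np


def decode_peaks(heatmaps):
    """(J, H, W) heatmaps -> (J, 2) normalized (x, y) in [0, 1] and (J,) confidences."""
    n_ch, h, w = heatmaps.shape
    flat = heatmaps.reshape(n_ch, -1)
    idx = flat.argmax(axis=1)  # flat index of each channel's maximum
    rows, cols = np.unravel_index(idx, (h, w))
    conf = flat[np.arange(n_ch), idx]  # peak height doubles as confidence
    # Normalize by (size - 1) so the last pixel maps to exactly 1.0.
    xy = np.stack([cols / (w - 1), rows / (h - 1)], axis=1)
    return xy, conf


heatmaps = np.zeros((2, 64, 128))
heatmaps[0, 10, 20] = 0.9  # channel 0 peaks at row 10, column 20
heatmaps[1, 63, 127] = 0.5  # channel 1 peaks at the bottom-right corner
xy, conf = decode_peaks(heatmaps)
print(xy[1], conf)  # [1. 1.] [0.9 0.5]
```

Working in normalized coordinates keeps the decoder independent of the heatmap resolution (64×128 here, matching the printed shape above).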
Step 7 — decode peaks, map back to the view, and scatter into the skeleton¶
heatmap_to_points takes the (sub-pixel-refined) peak of each heatmap — a
normalized (x, y) plus a confidence. normalized_peaks_to_original_pixels then
inverts the pathway's preprocessing (undoing the model's resize and any
mirror/crop) to put each peak back into raw view pixels, and
route_channels_to_points_in_views writes channel i into (view, point) per
the pathway's mapping. Because the front source's two pathways both target view
f, they land on the same row, filling both halves. In the plot below the
front view now carries both leg sets; every other view carries one.
out_pts = np.full((plan.n_views, plan.n_points, 2), np.nan)
out_conf = np.zeros((plan.n_views, plan.n_points))
for i, pw in enumerate(plan.pathways):
    model = models[pw.model]
    pn, cc = inference.heatmap_to_points(heatmaps[i])  # (J, 2) in [0,1], (J,)
    src_hw = windows[pw.source].shape[1:3]  # (H, W) of the raw source frame
    raw_xy = normalized_peaks_to_original_pixels(
        pn, pw.transform, model.input_size, src_hw
    )
    route_channels_to_points_in_views(raw_xy, cc, pw.mapping, out_pts, out_conf)
front = view_names.index("f")
print("assembled 2D points:", out_pts.shape)
print(
f"front view fills left half: {np.isfinite(out_pts[front, :19]).any()}, "
f"right half: {np.isfinite(out_pts[front, 19:]).any()}"
)
colors = point_colors_rgb(skeleton)
def add_bones(fig, xy, row, col):
    "Overlay skeleton bones (grouped by limb color, NaN points left as gaps) onto subplot (row, col)."
    segs = defaultdict(lambda: ([], []))
    for a, b in skeleton.bones:
        xs, ys = segs[rgb(colors[a])]
        xs += [xy[a, 0], xy[b, 0], None]
        ys += [xy[a, 1], xy[b, 1], None]
    for color, (xs, ys) in segs.items():
        fig.add_trace(
            go.Scatter(
                x=xs,
                y=ys,
                mode="lines",
                line=dict(color=color, width=2),
                showlegend=False,
                hoverinfo="skip",
            ),
            row=row,
            col=col,
        )
fig = make_subplots(
rows=2,
cols=4,
subplot_titles=[
(view_names[v] + (" - both sides" if v == front else ""))
if v < len(view_names)
else ""
for v in range(8)
],
horizontal_spacing=0.01,
vertical_spacing=0.08,
)
for v in range(len(view_names)):
    r, c = v // 4 + 1, v % 4 + 1
    add_image(fig, windows[view_src[view_names[v]]][0], row=r, col=c, ncols=4)
    add_bones(fig, out_pts[v], row=r, col=c)
fig.update_annotations(font_size=11)
fig.update_layout(
title_text="Assembled 2D skeleton (the front view carries both leg sets)",
height=320,
margin=dict(l=0, r=0, t=60, b=0),
)
show(fig)
assembled 2D points: (7, 38, 2)
front view fills left half: True, right half: True
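The two operations in this step, inverting the preprocessing and scattering channels into the (view, point) grid, can be sketched on toy data. Both helpers below are hypothetical simplifications (a pure resize plus an optional mirror, no crop), not the signatures of `normalized_peaks_to_original_pixels` or `route_channels_to_points_in_views`.

```python
import numpy as np


def to_raw_pixels(xy_norm, mirrored, raw_hw):
    """Normalized (x, y) in the model input -> raw-frame pixels, undoing a mirror."""
    h, w = raw_hw
    xy = xy_norm * np.array([w - 1, h - 1])  # undo the resize
    if mirrored:
        xy[:, 0] = (w - 1) - xy[:, 0]  # undo the left-right flip
    return xy


def scatter(xy, mapping, out_pts):
    """Write channel i into out_pts[view, point] for each (channel, view, point) triple."""
    for ch, view, point in mapping:
        out_pts[view, point] = xy[ch]


out_pts = np.full((1, 5, 2), np.nan)  # one view, five skeleton points
peaks = np.array([[0.0, 0.0], [1.0, 1.0]])  # two channels, normalized

# The un-mirrored pathway fills points 2 and 3; the mirrored one fills points 0 and 1.
scatter(to_raw_pixels(peaks, False, (480, 960)), [(0, 0, 2), (1, 0, 3)], out_pts)
scatter(to_raw_pixels(peaks, True, (480, 960)), [(0, 0, 0), (1, 0, 1)], out_pts)
print(out_pts[0])
```

Point 4 is never written by either pathway, so it stays NaN, which is exactly how an unobserved (view, point) is represented downstream.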
Step 8 — detect the whole sequence¶
inference.detect_sequence repeats steps 5–7 for every frame in one fully-batched
pass over the windows, giving the full 2D result: pts2d of shape
(V, T, 38, 2) in pixels and conf of shape (V, T, 38). This is the array
deeperfly run feeds into the geometry pipeline (deeperfly.detect_2d wraps this
with streaming decode so memory stays bounded over long recordings).
pts2d, conf = inference.detect_sequence(plan, models, windows)
print("pts2d:", pts2d.shape, " conf:", conf.shape)
j, v = 23, view_names.index("rm") # rf claw seen by camera rm
fig = make_subplots(
rows=1,
cols=2,
column_widths=[0.55, 0.45],
horizontal_spacing=0.17, # leave a gap wide enough for the heatmap's colorbar
subplot_titles=(
"Mean detector confidence per (view, joint)",
f"{view_names[v]}: 2D track of {skeleton.point_names[j]}",
),
)
# Park the colorbar in the gap just right of the heatmap (derived from the
# subplot's own domain) so it never sits on top of the cells or the next plot.
cb_x = fig.layout.xaxis.domain[1] + 0.015
fig.add_trace(
go.Heatmap(
z=np.nanmean(conf, axis=1),
y=view_names,
zmin=0,
zmax=1,
colorscale="Viridis",
colorbar=dict(
title=dict(text="conf", side="top"),
len=0.9,
thickness=12,
x=cb_x,
xanchor="left",
),
hovertemplate="view %{y}<br>joint %{x}<br>conf %{z:.2f}<extra></extra>",
),
row=1,
col=1,
)
fig.update_yaxes(autorange="reversed", row=1, col=1) # view 0 on top, like imshow
fig.update_xaxes(title_text="joint index (0..18 left, 19..37 right)", row=1, col=1)
fig.add_trace(go.Scatter(y=pts2d[v, :, j, 0], mode="lines", name="x"), row=1, col=2)
fig.add_trace(go.Scatter(y=pts2d[v, :, j, 1], mode="lines", name="y"), row=1, col=2)
fig.update_xaxes(title_text="frame", row=1, col=2)
fig.update_yaxes(title_text="pixel", row=1, col=2)
fig.update_layout(height=380, margin=dict(l=0, r=0, t=40, b=0))
show(fig)
pts2d: (7, 60, 38, 2) conf: (7, 60, 38)
Step 9 — visibility is built into the plan¶
There is no separate visibility-masking step anymore. A (view, point) that no
pathway writes is simply left NaN by the scatter in Step 7, so the detector's
output is already masked. plan.visibility_mask() recovers which (view, point)
pairs any pathway fills, and it matches the finite entries of pts2d exactly. The
front (f) row is the only one with white cells on both halves — that
cross-side visibility is the bridge that co-registers the left and right cameras
during bundle adjustment.
mask = plan.visibility_mask() # (V, N) bool: which (view, point) any pathway fills
ever_seen = np.isfinite(pts2d).all(-1).any(axis=1) # (V, N) finite somewhere in time
print(
f"pts2d finite pattern matches the plan's visibility mask: {(ever_seen == mask).all()}"
)
fr = mask[front]
print(
f"front view 'f' sees {int(fr.sum())} points spanning both sides "
f"(left {int(fr[:19].sum())}, right {int(fr[19:].sum())})"
)
fig = go.Figure(
go.Heatmap(
z=mask.astype(int),
x=skeleton.point_names,
y=view_names,
colorscale="Greys",
reversescale=True, # filled (1) -> white, like imshow(cmap="Greys_r")
showscale=False,
xgap=1,
ygap=1,
hovertemplate="view %{y}<br>%{x}<br>filled=%{z}<extra></extra>",
)
)
fig.update_yaxes(autorange="reversed") # view 0 on top, like imshow
# 38 point names: stack them vertically so they don't overlap, and let the bottom
# margin grow to fit.
fig.update_xaxes(
tickangle=-90, tickfont_size=9, automargin=True, title_text="skeleton point"
)
fig.update_layout(
title_text="Visibility mask (white = some pathway fills this (view, point))",
height=340,
plot_bgcolor="rgb(150,150,150)", # shows through the cell gaps as gridlines
margin=dict(l=0, r=0, t=40, b=0),
)
show(fig)
pts2d finite pattern matches the plan's visibility mask: True
front view 'f' sees 14 points spanning both sides (left 7, right 7)
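The equivalence checked above, between the plan's mask and the finite pattern of the detections, follows directly from how the mask can be derived from the pathway mappings. A toy sketch, with made-up `(channel, view, point)` triples and fake detections standing in for real ones:

```python
import numpy as np

n_views, n_frames, n_points = 2, 3, 4

# Hypothetical (channel, view, point) triples for two pathways:
# view 0 sees points 0-1, view 1 sees points 1-3.
mappings = [
    np.array([[0, 0, 0], [1, 0, 1]]),
    np.array([[0, 1, 1], [1, 1, 2], [2, 1, 3]]),
]

# A (view, point) is visible if any pathway writes it.
mask = np.zeros((n_views, n_points), dtype=bool)
for mapping in mappings:
    mask[mapping[:, 1], mapping[:, 2]] = True

# Fake detections: finite only where some pathway writes, NaN elsewhere.
pts2d = np.full((n_views, n_frames, n_points, 2), np.nan)
written = np.broadcast_to(mask[:, None, :], (n_views, n_frames, n_points))
pts2d[written] = 1.0

# Same reduction as in the cell above: finite in both coordinates, in any frame.
ever_seen = np.isfinite(pts2d).all(-1).any(axis=1)
print(mask.astype(int))
print((ever_seen == mask).all())
```

No separate masking pass is needed because the NaN fill already encodes the mask.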
Step 10 — bundle adjustment¶
pipeline.bundle_adjust_cameras uses the fly itself as the calibration target: it
pools the detections from all frames into one point cloud and refines the cameras
by bundle adjustment, using detector confidences as per-observation weights, a
robust loss, and a soft bone-length prior. The detector's pts2d is already NaN
where unobserved, so it goes straight in. We compare the reprojection error before
and after.
# Reprojection error with the nominal (un-refined) cameras ...
pts3d_init = triangulate(cameras, pts2d)
err_init = reprojection_error(cameras, pts3d_init, pts2d)
# ... then refine the rig by fly-as-target bundle adjustment.
cameras_ba, ba_res = bundle_adjust_cameras(
cameras, pts2d, conf, skeleton, **bundle_adjust_kwargs
)
pts3d_ba = triangulate(cameras_ba, pts2d)
err_ba = reprojection_error(cameras_ba, pts3d_ba, pts2d)
fi, fc = np.isfinite(err_init), np.isfinite(err_ba)
print(f"bundle adjustment: {ba_res.nfev} fn evals, final cost {ba_res.cost:.4g}")
print(
f"median reproj error before {np.median(err_init[fi]):.2f}px -> "
f"after {np.median(err_ba[fc]):.2f}px"
)
shift = np.linalg.norm(
np.array([c.position for c in cameras_ba])
- np.array([c.position for c in cameras]),
axis=1,
)
fig = make_subplots(
rows=1,
cols=2,
subplot_titles=(
"Reprojection error: nominal vs bundle-adjusted cameras",
"How far bundle adjustment moved each camera",
),
)
xbins = dict(start=0, end=40, size=40 / 59)
fig.add_trace(
go.Histogram(
x=err_init[fi],
xbins=xbins,
opacity=0.6,
name=f"before (med {np.median(err_init[fi]):.1f})",
),
row=1,
col=1,
)
fig.add_trace(
go.Histogram(
x=err_ba[fc],
xbins=xbins,
opacity=0.6,
name=f"after (med {np.median(err_ba[fc]):.1f})",
),
row=1,
col=1,
)
fig.update_xaxes(title_text="reprojection error (px)", row=1, col=1)
fig.update_yaxes(title_text="count", row=1, col=1)
fig.add_trace(
go.Bar(x=view_names, y=shift, marker_color="purple", showlegend=False), row=1, col=2
)
fig.update_yaxes(title_text="camera-center shift (world units)", row=1, col=2)
fig.update_layout(
barmode="overlay",
height=380,
margin=dict(l=0, r=0, t=40, b=0),
legend=dict(x=0.46, y=0.99, xanchor="right"),
)
show(fig)
bundle adjustment: 812 fn evals, final cost 3.28e+04
median reproj error before 14.42px -> after 2.60px
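The before/after comparison rests on a per-observation reprojection error: project the 3D point through the camera and measure the pixel distance to the detection. The sketch below shows that quantity for a single toy pinhole camera. `project` and `reprojection_error_px` are hypothetical helpers with made-up intrinsics, not deeperfly's `reprojection_error`, and the bundle adjustment itself is not reproduced here.

```python
import numpy as np


def project(P, pts3d):
    """Project (N, 3) world points through a 3x4 camera matrix to (N, 2) pixels."""
    homo = np.hstack([pts3d, np.ones((len(pts3d), 1))]) @ P.T
    return homo[:, :2] / homo[:, 2:3]


def reprojection_error_px(P, pts3d, pts2d):
    """Pixel distance between observed and reprojected points (NaN where unobserved)."""
    return np.linalg.norm(project(P, pts3d) - pts2d, axis=1)


# Toy intrinsics and a camera placed 10 units from the origin along its optical axis.
K = np.array([[1000.0, 0.0, 479.5], [0.0, 1000.0, 239.5], [0.0, 0.0, 1.0]])
P = K @ np.hstack([np.eye(3), [[0.0], [0.0], [10.0]]])

pts3d = np.array([[0.0, 0.0, 0.0], [1.0, 0.5, 0.0], [-1.0, 0.0, 1.0]])
observed = project(P, pts3d)
observed[0] += [3.0, 4.0]  # a 5 px detection error on the first point
observed[2] = np.nan  # the third point is not observed

err = reprojection_error_px(P, pts3d, observed)
print(err)
print(np.nanmedian(err))
```

NaN observations propagate to NaN errors, which is why the cell above filters with `np.isfinite` before taking the median.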
Step 11 — triangulation (DLT)¶
triangulation.triangulate lifts the 2D observations to 3D with the Direct
Linear Transform: each camera that sees a point contributes two linear
equations in the unknown 3D coordinates, and the point is the least-squares
solution of that stacked system (views that are NaN for the point are simply
skipped). It is a plain fit with no outlier rejection — every finite view is
trusted equally — which keeps this step easy to follow. The result is the 3D
pose sequence (T, 38, 3).
run_from_points2d also offers robust reconstructors (triangulation="ransac"
for the largest multi-view consensus set, or "greedy" to drop the
worst-reprojecting view); here we use plain "dlt".
pts3d = triangulate(cameras_ba, pts2d)
reproj = reprojection_error(cameras_ba, pts3d, pts2d)
fin = np.isfinite(reproj)
n_tri = int(np.isfinite(pts3d).all(-1).sum())
print(f"3D points: {pts3d.shape}; triangulated {n_tri} of {pts3d[..., 0].size}")
print(
f"reproj error (px): median {np.median(reproj[fin]):.2f} "
f"p95 {np.percentile(reproj[fin], 95):.2f}"
)
t = N_FRAMES // 2
P = pts3d[t]
col = point_colors_rgb(skeleton)
finite = np.isfinite(P).all(-1)
# One interactive scene -- drag to rotate / scroll to zoom replaces the old fixed views.
fig = go.Figure()
segs = defaultdict(lambda: ([], [], []))
for a, b in skeleton.bones:
    if finite[a] and finite[b]:
        xs, ys, zs = segs[rgb(col[a])]
        xs += [P[a, 0], P[b, 0], None]
        ys += [P[a, 1], P[b, 1], None]
        zs += [P[a, 2], P[b, 2], None]
for color, (xs, ys, zs) in segs.items():
    fig.add_scatter3d(
        x=xs,
        y=ys,
        z=zs,
        mode="lines",
        line=dict(color=color, width=4),
        showlegend=False,
        hoverinfo="skip",
    )
fig.add_scatter3d(
x=P[finite, 0],
y=P[finite, 1],
z=P[finite, 2],
mode="markers",
marker=dict(size=3, color=[rgb(c) for c in col[finite]]),
showlegend=False,
hoverinfo="skip",
)
fig.update_layout(
title_text=f"Triangulated 3D pose, frame {t} (DLT) — drag to rotate",
height=560,
margin=dict(l=0, r=0, t=50, b=0),
scene=dict(
aspectmode="data",
xaxis_visible=False,
yaxis_visible=False,
zaxis_visible=False,
camera=dict(eye=dict(x=0.85, y=-1.45, z=0.6)),
),
)
show(fig)
3D points: (60, 38, 3); triangulated 2280 of 2280
reproj error (px): median 2.60 p95 6.64
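The DLT description in this step (two linear equations per observing view, NaN views skipped, least-squares solve) can be written out for a single point. This is a textbook sketch of the homogeneous variant solved by SVD, with toy cameras. `triangulate_dlt` and `rot_y` are hypothetical helpers, and deeperfly's `triangulate` may set up or solve the system differently.

```python
import numpy as np


def triangulate_dlt(proj_mats, pts2d):
    """Least-squares 3D point from (V, 3, 4) camera matrices and (V, 2) pixels.

    Views whose observation is NaN are skipped; fewer than two views gives NaN.
    """
    rows = []
    for P, (u, v) in zip(proj_mats, pts2d):
        if np.isnan(u) or np.isnan(v):
            continue
        rows.append(u * P[2] - P[0])  # two linear equations per view
        rows.append(v * P[2] - P[1])
    if len(rows) < 4:
        return np.full(3, np.nan)
    _, _, vt = np.linalg.svd(np.asarray(rows))
    X = vt[-1]  # right singular vector of the smallest singular value
    return X[:3] / X[3]


def rot_y(deg):
    """Rotation matrix about the y axis."""
    a = np.deg2rad(deg)
    return np.array([[np.cos(a), 0, np.sin(a)], [0, 1, 0], [-np.sin(a), 0, np.cos(a)]])


# Three toy cameras looking at the origin from 10 units away, 60 degrees apart.
K = np.array([[1000.0, 0.0, 479.5], [0.0, 1000.0, 239.5], [0.0, 0.0, 1.0]])
t = np.array([[0.0], [0.0], [10.0]])
proj_mats = np.stack([K @ np.hstack([rot_y(d), t]) for d in (0, 60, 120)])

X_true = np.array([0.3, -0.2, 0.5])
homo = proj_mats @ np.append(X_true, 1.0)  # (V, 3) homogeneous pixels
pts2d = homo[:, :2] / homo[:, 2:3]
pts2d[2] = np.nan  # the third view does not see this point

X_hat = triangulate_dlt(proj_mats, pts2d)
print(np.round(X_hat, 6))
```

With noise-free observations the two remaining views recover the point up to floating-point error. With noisy detections every finite view pulls on the solution equally, which is the "no outlier rejection" caveat above.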
Step 12 — the same thing in one call, and save the result¶
Everything from Step 10 on — bundle-adjust → reconstruct — is exactly what
pipeline.run_from_points2d (and therefore deeperfly run) does internally. We
call it directly on the raw pts2d / conf, save the PoseResult to HDF5, and
reload it to confirm the round-trip.
result = run_from_points2d(
cameras,
skeleton,
pts2d,
conf,
do_bundle_adjust=True,
bundle_adjust_kwargs=bundle_adjust_kwargs,
triangulation="dlt",
weigh_by_confidence=False, # plain (unweighted) DLT; confidence shapes only the bundle adjustment
fps=FPS,
meta={"source": str(DATA_DIR), "n_frames_input": N_FRAMES},
)
out = REPO / "results" / "fly_pose_walkthrough.h5"
out.parent.mkdir(parents=True, exist_ok=True)
result.save(out)
re = result.reproj_error
fr = np.isfinite(re)
print(f"wrote {out}")
print(f" views x frames x points : {result.pts2d.shape}")
print(f" 3D points : {result.pts3d.shape}")
print(
f" reprojection error (px) : median {np.median(re[fr]):.2f} "
f"p95 {np.percentile(re[fr], 95):.2f}"
)
reloaded = PoseResult.load(out)
print(
f" reloaded {reloaded.n_views} views x {reloaded.n_frames} frames; "
f"has 3D = {reloaded.pts3d is not None}"
)
wrote /home/tlam/deeperfly/results/fly_pose_walkthrough.h5
 views x frames x points : (7, 60, 38, 2)
 3D points : (60, 38, 3)
 reprojection error (px) : median 2.60 p95 6.64
 reloaded 7 views x 60 frames; has 3D = True
Sanity check: reproject the bundle-adjusted 3D back onto every camera¶
If bundle adjustment and triangulation are consistent, projecting the recovered 3D pose through the bundle-adjusted cameras should land on the fly in each raw view.
t = 0
proj = np.asarray(result.cameras.project(result.pts3d[t])) # (V, 38, 2)
fig = make_subplots(
rows=2,
cols=4,
subplot_titles=[view_names[v] if v < len(view_names) else "" for v in range(8)],
horizontal_spacing=0.01,
vertical_spacing=0.08,
)
for v in range(len(view_names)):
    r, c = v // 4 + 1, v % 4 + 1
    add_image(fig, windows[view_src[view_names[v]]][t], row=r, col=c, ncols=4)
    add_bones(fig, proj[v], row=r, col=c)
fig.update_annotations(font_size=11)
fig.update_layout(
title_text="Bundle-adjusted 3D pose reprojected onto every camera (frame 0)",
height=320,
margin=dict(l=0, r=0, t=60, b=0),
)
show(fig)
Mapping back to the CLI¶
| Notebook step | CLI / library call |
|---|---|
| Steps 1–3 | Config.detection_plan, Config.camera_group, Config.skeleton, pose2d.stream.load_models |
| Steps 4–8 | inference.detect_sequence (the streaming deeperfly.detect_2d wraps it) |
| Steps 9–12 | pipeline.run_from_points2d (bundle-adjust → reconstruct → save) |
deeperfly run drives all of this from one config. deeperfly init writes a
self-contained config.toml (the detection plan — [[sources]] /
[[pose2d.preprocessors]] / [[pose2d.models]] / [[pose2d.pathways]] with [pose2d.output_points]
mappings — plus the [cameras.*] rig, the [skeleton], and the [pipeline]).
The default config's sources already map to this recording's camera_0.mp4 …
camera_6.mp4, so the whole notebook collapses to:
deeperfly init config.toml # then edit the plan/[cameras] for your own rig
deeperfly run examples/data -c config.toml -o out/ # detect -> 3D -> overlay videos
deeperfly inspect out/results.h5 # inspect the saved result
run writes out/results.h5 (the PoseResult) plus one MP4 per
[[visualization.videos]] entry — by default out/pose2d.mp4 (the
raw 2D detections) and out/pose3d.mp4 (the triangulated skeleton reprojected
into every view). Enabled stages reuse their cached outputs while their config is
unchanged, so editing the triangulation or the videos and re-running recomputes
only the affected stages — the slow 2D detection is reused. Visibility is now
intrinsic to the plan (a (view, point) no pathway writes stays NaN), and the
front camera is just one source feeding two pathways — the manual cells above
unfold exactly what detect_sequence does internally.
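The cache reuse described above ("enabled stages reuse their cached outputs while their config is unchanged") can be pictured as fingerprinting each stage's config. The sketch below is one plausible way to do that, not deeperfly's actual mechanism: `stage_key`, `stages_to_rerun`, and the stage and config names are all hypothetical, and a real implementation would also need a rule for invalidating stages downstream of a changed one.

```python
import hashlib
import json


def stage_key(stage_config):
    """Stable fingerprint of a stage's config (key order does not matter)."""
    blob = json.dumps(stage_config, sort_keys=True).encode()
    return hashlib.sha256(blob).hexdigest()


def stages_to_rerun(cached_keys, configs):
    """Names of stages whose fingerprint differs from the cached run."""
    return [
        name for name, cfg in configs.items() if cached_keys.get(name) != stage_key(cfg)
    ]


configs = {
    "pose2d": {"model": "deepfly2d"},
    "triangulation": {"method": "dlt"},
}
cached = {name: stage_key(cfg) for name, cfg in configs.items()}

configs["triangulation"] = {"method": "ransac"}  # edit only the triangulation
print(stages_to_rerun(cached, configs))  # ['triangulation']
```

In this toy run the 2D detection fingerprint is unchanged, so only the triangulation would be recomputed.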