"""Utilities."""
import logging
import pickle
import numpy as np
import cv2
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Dict, List
from collections import defaultdict
from scipy.interpolate import pchip_interpolate
def get_fps_from_video(video_dir):
    """Returns the frame rate (fps) of a video file."""
    cap = cv2.VideoCapture(str(video_dir))
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    cap.release()
    return fps
def load_stim_data(main_dir: Path):
    """Loads the stimulus info from `stimulusSequence.txt` under `main_dir`.

    Returns the list of lines, or False if the file does not exist.
    """
    stim_dir = main_dir / "stimulusSequence.txt"
    try:
        with open(stim_dir) as f:
            return f.readlines()
    except FileNotFoundError:
        logging.error(f"{stim_dir} does not exist!")
        return False
def get_stim_array(
    lines: list,
    frame_rate: int,
    scale: int = 1,
    hamming_window_size: int = 0,
    time_scale: float = 1e-3,
):
    """Computes a stimulus array from the stimulus information.

    Parameters
    ----------
    lines : list
        Lines of the stimulus info file, as returned by `load_stim_data`.
    frame_rate : int
        Frame rate of the recordings.
    scale : int, optional
        Interpolation rate applied in df3dPP, by default 1 (no interpolation).
    hamming_window_size : int, optional
        Size of the filter applied in df3dPP, by default 0.
    time_scale : float, optional
        Time scale of the stimulus info (msec), by default 1e-3.

    Returns
    -------
    np.ndarray
        Array of shape (length,) of boolean values indicating the stimulus time points.
    """
    stim_duration = int(lines[0].split()[-2])
    frame_no = stim_duration * frame_rate * scale
    repeat = int(lines[-1].split()[-1])
    stim_array = np.zeros(frame_no)
    # Get only the sequence data
    start = 0
    for _ in range(repeat):
        for line_no in range(2, len(lines) - 1):
            duration = int(
                int(lines[line_no].split()[-1]) * frame_rate * time_scale * scale
            )
            stim_array[start : start + duration] = not lines[line_no].startswith("off")
            start += duration
    trim_ind = int(hamming_window_size * 0.5)
    return stim_array[trim_ind : stim_array.shape[0] - trim_ind]
def get_stim_intervals(stim_data):
    """Reads a stimulus array and returns the stimulus onset/offset indices,
    mainly for plotting purposes. Use `get_stim_array` otherwise."""
    stim_on = np.where(stim_data)[0]
    stim_start_end = [stim_on[0]]
    for ind in np.where(np.diff(stim_on) > 1)[0]:
        stim_start_end.append(stim_on[ind])
        stim_start_end.append(stim_on[ind + 1])
    if stim_on[-1] not in stim_start_end:
        stim_start_end.append(stim_on[-1])
    return stim_start_end
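# A minimal usage sketch (hypothetical path and frame rate; assumes the
# stimulusSequence.txt layout expected by get_stim_array):
#
#   lines = load_stim_data(Path("/data/fly1/trial1"))
#   if lines:
#       stim_array = get_stim_array(lines, frame_rate=100)
#       stim_intervals = get_stim_intervals(stim_array)  # onset/offset frame indices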
def calculate_body_size(
    body_template: Dict[str, np.ndarray],
    legs_list: List[str] = ["RF", "LF", "RM", "LM", "RH", "LH"],
) -> Dict[str, np.ndarray]:
    """Calculates body segment sizes from the template data."""
    if set(legs_list).difference({"RF", "LF", "RM", "LM", "RH", "LH"}):
        raise ValueError(
            f"""
            legs_list can only contain ["RF", "LF", "RM", "LM", "RH", "LH"],
            but it currently contains {legs_list}
            """
        )
body_size = {}
leg_segments = ["Coxa", "Femur", "Tibia", "Tarsus", "Claw"]
for i, segment_name in enumerate(leg_segments):
for leg in legs_list:
# If Claw, calculate the length of the entire leg
if segment_name == "Claw":
body_size[leg] = (
body_size[f"{leg}_Coxa"]
+ body_size[f"{leg}_Femur"]
+ body_size[f"{leg}_Tibia"]
+ body_size[f"{leg}_Tarsus"]
)
else:
body_size[f"{leg}_{segment_name}"] = np.linalg.norm(
body_template[f"{leg}_{segment_name}"]
- body_template[f"{leg}_{leg_segments[i+1]}"]
)
    # Assuming the right and left sides are symmetric, checking one side is enough
if "R_Antenna_base" in body_template:
body_size["Antenna"] = np.linalg.norm(
body_template["R_Antenna_base"] - body_template["R_Antenna_edge"]
)
body_size["Antenna_mid_thorax"] = np.linalg.norm(
body_template["R_Antenna_base"] - body_template["Thorax_mid"]
)
return body_size
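# Usage sketch (hypothetical template; calculate_body_size expects keys such as
# "RF_Coxa", "RF_Femur", ..., e.g. from a template like the one from_sdf returns):
#
#   body_size = calculate_body_size(body_template, legs_list=["RF", "LF"])
#   body_size["RF_Femur"]  # femur length of the right front leg
#   body_size["RF"]        # total length of the right front leg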
def fix_coxae_pos(
    points3d, right_coxa_kp="thorax_coxa_R", left_coxa_kp="thorax_coxa_L"
):
    """Calculates a fixed coxa location per side as the midpoint of the 0.3 and
    0.7 quantiles across time, which is robust to outlier frames."""
coxa_right = get_array(right_coxa_kp, points3d)
coxa_left = get_array(left_coxa_kp, points3d)
coxa_right_fixed = (
np.quantile(coxa_right, 0.3, axis=1) + np.quantile(coxa_right, 0.7, axis=1)
) * 0.5
coxa_left_fixed = (
np.quantile(coxa_left, 0.3, axis=1) + np.quantile(coxa_left, 0.7, axis=1)
) * 0.5
return {"R": coxa_right_fixed, "L": coxa_left_fixed}
def compute_length_of_segment(points3d, segment_beg, segment_end):
"""Computes the length of a segment."""
return np.sqrt(
(points3d[f"{segment_beg}_x"] - points3d[f"{segment_end}_x"]) ** 2
+ (points3d[f"{segment_beg}_y"] - points3d[f"{segment_end}_y"]) ** 2
+ (points3d[f"{segment_beg}_z"] - points3d[f"{segment_end}_z"]) ** 2
)
def leg_length_model(nmf_size: dict, leg_name: str, claw_is_ee: bool) -> float:
"""Returns the leg size from the nmf size dictionary."""
if claw_is_ee:
return nmf_size[leg_name]
return nmf_size[leg_name] - nmf_size[f"{leg_name}_Tarsus"]
def get_length_of_segments(points3d, claw_is_ee=False):
"""Returns a dictionary with segment sizes."""
segments = {
"Coxa": ("thorax_coxa", "coxa_femur"),
"Femur": ("coxa_femur", "femur_tibia"),
"Tibia": ("femur_tibia", "tibia_tarsus"),
# "Antenna": ("base_anten", "tip_anten"),
}
if claw_is_ee:
segments = {**segments, "Tarsus": ("tibia_tarsus", "claw")}
lengths = {}
for segment_name, (segment_start, segment_end) in segments.items():
for side in ["R", "L"]:
lengths[f"{side}_{segment_name}"] = compute_length_of_segment(
points3d, f"{segment_start}_{side}", f"{segment_end}_{side}"
)
return lengths
def compute_mean_length_of_segments(segment_lengths):
"""Computes mean length of each segment."""
mean_segment_length = {}
for segment, length_array in segment_lengths.items():
mean_segment_length[segment] = np.mean(length_array)
return mean_segment_length
def get_mean_length_of_segments(points3d):
    """Returns a dictionary with the mean length of each segment."""
lengths = get_length_of_segments(points3d)
return compute_mean_length_of_segments(lengths)
def get_leg_length(mean_segment_size):
"""Returns the leg size from the mean segment lengths."""
leg_length_r = np.sum(
[size for name, size in mean_segment_size.items() if "R_" in name]
)
leg_length_l = np.sum(
[size for name, size in mean_segment_size.items() if "L_" in name]
)
return {"R": leg_length_r, "L": leg_length_l}
def get_array(kp_name, kp_dict):
    """Returns the 3D points of a key point as an array of shape (3, n_frames)."""
    return np.array(
        [
            kp_dict[f"{kp_name}_x"],
            kp_dict[f"{kp_name}_y"],
            kp_dict[f"{kp_name}_z"],
        ]
    ).reshape(3, -1)  # (3, n_frames), also for scalar inputs
def get_mean_quantile(vector, quantile_diff=0.05):
    """Returns the midpoint of the (0.5 - quantile_diff) and (0.5 + quantile_diff)
    quantiles, an outlier-robust estimate of the central value."""
    return 0.5 * (
        np.quantile(vector, q=0.5 - quantile_diff)
        + np.quantile(vector, q=0.5 + quantile_diff)
    )
def dist_calc(v1, v2):
    """Returns the Euclidean distance between two 3D points."""
    return np.sqrt((v1[0] - v2[0]) ** 2 + (v1[1] - v2[1]) ** 2 + (v1[2] - v2[2]) ** 2)
def get_distance_btw_vecs(vector1, vector2):
    """Returns the row-wise Euclidean distance between two arrays of 3D points."""
    return np.linalg.norm(vector1 - vector2, axis=1)
def save_file(out_fname, data):
    """Saves data into a pickle file."""
    with open(out_fname, "wb") as f:
        pickle.dump(data, f)
def load_file(output_fname):
    """Loads data from a pickle file."""
    with open(output_fname, "rb") as f:
        pts = pickle.load(f)
    return pts
def from_anipose_to_array(
    points3d,
    claw_is_end_effector=False,
    kps=("thorax_coxa", "coxa_femur", "femur_tibia", "tibia_tarsus"),
) -> np.ndarray:
    """Converts the usual dataframe format into a 3D array of shape (N, n_key_points, 3)."""
    # Copy the key point list (a mutable default would be shared across calls)
    kps = list(kps) + ["claw"] if claw_is_end_effector else list(kps)
    key_points = [f"{kp}_{side}" for side in ["R", "L"] for kp in kps]
    frames_no = points3d.shape[0]
    position_array = np.empty(
        (frames_no, len(key_points), 3)
    )  # timestep, key points, axes
    for i, kp in enumerate(key_points):
        position_array[:, i, :] = get_array(kp, points3d).T
    return position_array
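# Usage sketch (hypothetical anipose-style dataframe with columns such as
# "thorax_coxa_R_x", "thorax_coxa_R_y", ...):
#
#   pos = from_anipose_to_array(points3d, claw_is_end_effector=True)
#   pos.shape  # (n_frames, 10, 3): 5 key points per side, right side first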
def df_to_nparray(data_frame, side, claw_is_end_effector, segment="F"):
    """Converts the usual dataframe format into a 3D array of shape (N, n_key_points, 3)."""
    if claw_is_end_effector:
        key_points = ["Coxa", "Femur", "Tibia", "Tarsus", "Claw"]
    else:
        key_points = ["Coxa", "Femur", "Tibia", "Tarsus"]
    position_array = np.empty(
        (data_frame[f"Pose_{side}{segment}_Coxa_x"].shape[0], len(key_points), 3)
    )  # timestep, key points, axes
    for i, kp in enumerate(key_points):
        position_array[:, i, :] = np.array(
            [
                data_frame[f"Pose_{side}{segment}_{kp}_x"].to_numpy(),
                data_frame[f"Pose_{side}{segment}_{kp}_y"].to_numpy(),
                data_frame[f"Pose_{side}{segment}_{kp}_z"].to_numpy(),
            ]
        ).T  # stack as (3, N) and transpose to (N, 3)
    return position_array
def dict_to_nparray_pose(pose_dict, claw_is_end_effector):
    """Converts the usual df3dPP pose dictionary into a 3D array of shape
    (N, n_key_points, 3)."""
if claw_is_end_effector:
key_points = ["Coxa", "Femur", "Tibia", "Tarsus", "Claw"]
else:
key_points = ["Coxa", "Femur", "Tibia", "Tarsus"]
position_array = np.empty(
(pose_dict["Coxa"]["raw_pos_aligned"].shape[0], len(key_points), 3)
) # timestep, key points, axes
for i, kp in enumerate(key_points):
position_array[:, i, :] = np.array(pose_dict[kp]["raw_pos_aligned"])
return position_array
def dict_to_nparray_angle(angle_dict, leg, claw_is_end_effector):
    """Converts the usual df3dPP angle dictionary into a 2D array of shape
    (N, n_dofs) for one leg."""
if claw_is_end_effector:
dofs = [
"ThC_roll",
"ThC_yaw",
"ThC_pitch",
"CTr_pitch",
"CTr_roll",
"FTi_pitch",
"TiTa_pitch",
]
else:
dofs = [
"ThC_roll",
"ThC_yaw",
"ThC_pitch",
"CTr_pitch",
"CTr_roll",
"FTi_pitch",
]
angle_array = np.empty(
(len(angle_dict[f"{leg}_leg"][dofs[0]]), len(dofs))
) # timestep, dofs
    for i, dof in enumerate(dofs):
        angle_array[:, i] = np.array(angle_dict[f"{leg}_leg"][dof])
return angle_array
def interpolate_signal(signal, original_ts, new_ts):
    """Interpolates a signal sampled at `original_ts` onto a `new_ts` time grid
    using PCHIP interpolation."""
    total_time = signal.shape[0] * original_ts
    original_x = np.arange(signal.shape[0]) * original_ts
    new_x = np.arange(0, total_time, new_ts)
    try:
        interpolated = np.array(pchip_interpolate(original_x, signal, new_x))
    except Exception:
        # PCHIP cannot handle non-finite values; zero them out and retry
        signal[np.isinf(signal)] = 0
        signal[-1] = 0
        interpolated = np.array(pchip_interpolate(original_x, signal, new_x))
    return interpolated
def interpolate_joint_angles(joint_angles_dict, **kwargs):
    """Interpolates each joint angle signal; kwargs are forwarded to `interpolate_signal`."""
    return {
        dof: interpolate_signal(signal=angles, **kwargs)
        for dof, angles in joint_angles_dict.items()
    }
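# Usage sketch (hypothetical time steps): resampling 100 Hz joint angles to 500 Hz.
#
#   angles = {"RF_ThC_pitch": np.sin(np.linspace(0, 10, 1000))}
#   angles_500hz = interpolate_joint_angles(angles, original_ts=1e-2, new_ts=2e-3)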
def from_sdf(sdf_file: str):
    """Extracts a body template and joint bounds from an SDF file.

    Parameters
    ----------
    sdf_file : str
        Path to the SDF file.

    Returns
    -------
    Tuple[Dict, Dict]
        Body template (link name -> x, y, z position) and joint bounds
        (joint name -> (lower, upper) limits).
    """
# Parse the SDF file
sdf_in = ET.parse(sdf_file)
root_in = sdf_in.getroot()
model_in = root_in.find("world").find("model")
# Get links and joints
links = model_in.findall("link")
joints = model_in.findall("joint")
# Extract the body template
body_template = {}
for link in links:
if not any(dof in link.attrib["name"] for dof in ["roll", "pitch", "yaw"]):
# Get location of the joint
joint_loc_str = link.find("pose").text
            # Convert the pose string into a numpy array
            joint_loc = np.array([float(val) for val in joint_loc_str.split()])
# Get only the x, y, z coordinates
body_template[link.attrib["name"]] = joint_loc[:3]
# Extract the joint bounds
joint_bounds = {
joint.attrib["name"]: (
float(joint.find("axis").find("limit").find("lower").text),
float(joint.find("axis").find("limit").find("upper").text),
)
for joint in joints
}
return body_template, joint_bounds
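# Usage sketch (hypothetical file and key names; assumes the model sits under
# <world><model> as parsed above):
#
#   body_template, joint_bounds = from_sdf("models/neuromechfly.sdf")
#   body_template["RF_Coxa"]      # -> np.array([x, y, z])
#   joint_bounds["joint_RFCoxa"]  # -> (lower, upper)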
def split_arrays_into_chunks(
arrays: list[np.ndarray],
approx_n_chunks_total: int,
overlap: int = 20,
min_chunk_size: int = 100,
) -> list[tuple[int, int, np.ndarray]]:
"""Splits arrays into chunks with overlap between chunks. This is useful for
processing long sequences from multiple kinematic chains in parallel.
Parameters
----------
arrays : list[np.ndarray]
List of arrays to be split into chunks. Each array should have shape
(sequence_length, ...) where `...` can be any number of dimensions.
approx_n_chunks_total : int
Approximate number of chunks to split the arrays into. The actual number of
chunks is generally different in order to make the total number of chunks a
multiple of the number of arrays (i.e. kinematic chains).
overlap : int, optional
Number of overlapping frames between chunks, by default 20.
min_chunk_size : int, optional
Minimum size of each chunk, by default 100.
Returns
-------
list[tuple[int, int, np.ndarray]]
List of tuples containing
- array_idx: int
- start_idx_within_array: int
- chunk_of_array: np.ndarray
"""
assert overlap < min_chunk_size, "Overlap must be smaller than min_chunk_size."
n_arrays = len(arrays)
assert n_arrays > 0, "Number of arrays must be positive"
assert (
approx_n_chunks_total >= n_arrays
), "Approximate number of chunks must be at least equal to the number of arrays."
n_chunks_per_array = int(approx_n_chunks_total / n_arrays) # guaranteed to be >= 1
for arr in arrays:
assert arr.shape == arrays[0].shape, "All arrays must have the same shape."
assert arrays[0].ndim >= 1, "Arrays must be at least 1-dimensional."
seq_length = arrays[0].shape[0]
# If the chunk size is too small, return the entire arrays as single chunks
if seq_length <= min_chunk_size:
return [(arr_idx, 0, arr) for arr_idx, arr in enumerate(arrays)]
# Create chunks
chunk_size = seq_length // n_chunks_per_array # last chunk is generally larger
if chunk_size < min_chunk_size:
n_chunks_per_array = seq_length // min_chunk_size
chunk_size = seq_length // n_chunks_per_array
logging.warning(
f"Requested number of chunks results in chunk size smaller than "
f"min_chunk_size. Reducing number of chunks per array to "
f"{n_chunks_per_array}."
)
chunks = []
for array_idx, arr in enumerate(arrays):
start_idx = 0
for chunk_idx in range(n_chunks_per_array):
# Determine the end index of the chunk before adding overlap
if chunk_idx == n_chunks_per_array - 1:
end_idx_before_overlap = seq_length
end_idx_after_overlap = seq_length
else:
end_idx_before_overlap = start_idx + chunk_size
end_idx_after_overlap = end_idx_before_overlap + overlap
assert end_idx_after_overlap <= seq_length, (
"Chunk end idx with overlap exceeds array length - shouldn't "
"happen because the present chunk is not the last one, and the "
"chunk after it is guaranteed to be at least `min_chunk_size` "
"long, which is guaranteed to be larger than `overlap`."
)
assert end_idx_before_overlap - start_idx >= min_chunk_size, (
"Chunk size is smaller than minimum chunk size - shouldn't happen "
"because the total array length is guaranteed to be at least "
"`min_chunk_size` long and the `n_chunks` is rounded down."
)
chunk_arr = arr[start_idx:end_idx_after_overlap]
chunks.append((array_idx, start_idx, chunk_arr))
start_idx = end_idx_before_overlap
return chunks
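# Usage sketch: splitting two 1000-frame sequences into ~6 chunks total. The
# shapes are hypothetical; exact boundaries depend on min_chunk_size and overlap.
#
#   arrays = [np.random.rand(1000, 7) for _ in range(2)]  # e.g. two legs, 7 DoFs
#   chunks = split_arrays_into_chunks(arrays, approx_n_chunks_total=6)
#   [(i, s, c.shape[0]) for i, s, c in chunks]
#   # -> [(0, 0, 353), (0, 333, 353), (0, 666, 334), (1, 0, 353), ...]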
def merge_chunks_into_arrays(
chunks: list[tuple[int, int, np.ndarray]],
n_arrays: int,
seq_length: int,
overlap: int = 20,
) -> list[np.ndarray]:
"""Merges chunks back into arrays.
Parameters
----------
chunks : list[tuple[int, int, np.ndarray]]
List of tuples containing
- array_idx: int
- start_idx_within_array: int
- chunk_of_array: np.ndarray of shape (chunk_length_with_overlap, ...)
n_arrays : int
Number of arrays (i.e. kinematic chains).
seq_length : int
Length of each array.
overlap : int, optional
Number of overlapping frames between chunks, by default 20.
Returns
-------
list[np.ndarray]
List of merged arrays, each of shape (seq_length, ...) where `...` matches the
shape of chunked arrays in the input.
"""
assert n_arrays > 0, "Number of arrays must be positive"
ref_arr = chunks[0][2]
assert ref_arr.ndim >= 1, "Arrays must be at least 1-dimensional"
for _, _, chunk_arr in chunks:
assert (
chunk_arr.shape[1:] == ref_arr.shape[1:]
), "All chunks must have the same shape except for the 0th dimension"
# Blend chunks into full arrays
merged_arrays = [
np.zeros((seq_length, *ref_arr.shape[1:]), dtype=ref_arr.dtype)
for _ in range(n_arrays)
]
for array_idx, start_idx, chunk_arr in chunks:
# The first part of the chunk overlaps with the previous chunk. The policy is:
# - For the first half of the overlap, just use data from the previous chunk
# while the current chunk is still "warming up" from the initial condition
# - For the second half of the overlap, use a linear "blend" between the
# previous chunk and the current chunk, with the weight of the current chunk
# increasing linearly from 0 to 1.
chunk_length_with_overlap = chunk_arr.shape[0]
blend_weight_curr = np.ones(chunk_length_with_overlap)
if start_idx > 0:
# Ramp up the weight of the current chunk over the overlap region
blend_weight_curr[:overlap] = np.linspace(-1, 1, overlap, endpoint=False)
blend_weight_curr = np.clip(blend_weight_curr, 0, 1)
target_slice = np.s_[start_idx : start_idx + chunk_length_with_overlap]
mask_shape = (chunk_length_with_overlap,) + (1,) * (chunk_arr.ndim - 1)
blend_weight_curr = blend_weight_curr.reshape(mask_shape)
blend_weight_prev = 1 - blend_weight_curr
if n_arrays == 1:
# special case: if dofs are already separated by leg, then we should save
# data to the only array even though array_idx might be > 0 (because they
# can be originally from all legs)
array_idx = 0
target_arr = merged_arrays[array_idx]
target_arr[target_slice] = (
target_arr[target_slice] * blend_weight_prev + chunk_arr * blend_weight_curr
)
return merged_arrays
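# Round-trip sketch: process each chunk independently (a real pipeline would run
# e.g. inverse kinematics per chunk; the identity stand-in below just shows that
# blending reconstructs the originals):
#
#   arrays = [np.random.rand(1000, 7) for _ in range(2)]
#   chunks = split_arrays_into_chunks(arrays, approx_n_chunks_total=6, overlap=20)
#   merged = merge_chunks_into_arrays(chunks, n_arrays=2, seq_length=1000, overlap=20)
#   assert all(np.allclose(m, a) for m, a in zip(merged, arrays))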