Source code for seqikpy.utils

""" Utilities. """

from pathlib import Path
import logging
from typing import Dict, List
import pickle
import numpy as np
import cv2
from scipy.interpolate import pchip_interpolate

import xml
import xml.etree.ElementTree as ET
import numpy as np


[docs] def get_fps_from_video(video_dir): """Finds the fps of a video.""" cap = cv2.VideoCapture(str(video_dir)) fps = int(cap.get(cv2.CAP_PROP_FPS)) return fps
[docs] def load_stim_data(main_dir: Path): """Loads the stimulus info from txt.""" stim_dir = main_dir / "stimulusSequence.txt" try: with open(stim_dir) as f: lines = f.readlines() return lines except FileNotFoundError: logging.error(f"{stim_dir} does not exist!!") return False
[docs] def get_stim_array( lines: list, frame_rate: int, scale: int = 1, hamming_window_size: int = 0, time_scale: int = 1e-3, ): """Computes a stimulus array from the stimulus information. Parameters ---------- main_dir : str Path of the file containing the stim info. frame_rate : int Frame rate of the recordings. scale : int, optional Interpolation rate applied in df3dPP, by default 1 (no interpolation!) hamming_window_size : int, optional Size of the filter applied in df3dPP, by default 28 time_scale : int, optional Time scale of the stimulus info (msec), by default 1e-3 Returns ------- np.ndarray np.array(length, ) of boolean values indicating the stimulus time points. """ stim_duration = int(lines[0].split()[-2]) frame_no = stim_duration * frame_rate * scale repeat = int(lines[-1].split()[-1]) stim_array = np.zeros(frame_no) # Get only the sequence data start = 0 for repeat_no in range(repeat): for line_no in range(2, len(lines) - 1): duration = int( int(lines[line_no].split()[-1]) * frame_rate * time_scale * scale ) stim_array[start : start + duration] = ( False if lines[line_no].startswith("off") else True ) start += duration trim_ind = int(hamming_window_size * 0.5) return stim_array[trim_ind : stim_array.shape[0] - trim_ind]
[docs] def get_stim_intervals(stim_data): """Reads stimulus array and returns the stim intervals for plotting purposes. Use get_stim_array otherwise.""" stim_on = np.where(stim_data)[0] stim_start_end = [stim_on[0]] for ind in list(np.where(np.diff(stim_on) > 1)[0]): stim_start_end.append(stim_on[ind]) stim_start_end.append(stim_on[ind + 1]) if not stim_on[-1] in stim_start_end: stim_start_end.append(stim_on[-1]) return stim_start_end
[docs] def calculate_body_size( body_template: Dict[str, np.ndarray], legs_list: List[str] = ["RF", "LF", "RM", "LM", "RH", "LH"], ) -> Dict[str, np.ndarray]: """Calculates body segment sizes from the template data.""" if set(legs_list).difference(set(["RF", "LF", "RM", "LM", "RH", "LH"])): raise NameError( f""" legs_list could only contain ["RF", "LF", "RM", "LM", "RH", "LH"], currently, it contains {legs_list} """ ) body_size = {} leg_segments = ["Coxa", "Femur", "Tibia", "Tarsus", "Claw"] for i, segment_name in enumerate(leg_segments): for leg in legs_list: # If Claw, calculate the length of the entire leg if segment_name == "Claw": body_size[leg] = ( body_size[f"{leg}_Coxa"] + body_size[f"{leg}_Femur"] + body_size[f"{leg}_Tibia"] + body_size[f"{leg}_Tarsus"] ) else: body_size[f"{leg}_{segment_name}"] = np.linalg.norm( body_template[f"{leg}_{segment_name}"] - body_template[f"{leg}_{leg_segments[i+1]}"] ) # Assuming right and left hand-side are symmetric, checking for one side is enough if "R_Antenna_base" in body_template: body_size["Antenna"] = np.linalg.norm( body_template["R_Antenna_base"] - body_template["R_Antenna_edge"] ) body_size["Antenna_mid_thorax"] = np.linalg.norm( body_template["R_Antenna_base"] - body_template["Thorax_mid"] ) return body_size
[docs] def drop_level_dlc(data_frame): """Converts DLC type dataframe into one level df.""" data_frame.columns = data_frame.columns.droplevel() data_frame.columns = ["_".join(col) for col in data_frame.columns.values] return data_frame
[docs] def fix_coxae_pos( points3d, right_coxa_kp="thorax_coxa_R", left_coxa_kp="thorax_coxa_L" ): """Calculates the fixed coxae location based on the quantiles.""" coxa_right = get_array(right_coxa_kp, points3d) coxa_left = get_array(left_coxa_kp, points3d) coxa_right_fixed = ( np.quantile(coxa_right, 0.3, axis=1) + np.quantile(coxa_right, 0.7, axis=1) ) * 0.5 coxa_left_fixed = ( np.quantile(coxa_left, 0.3, axis=1) + np.quantile(coxa_left, 0.7, axis=1) ) * 0.5 return {"R": coxa_right_fixed, "L": coxa_left_fixed}
[docs] def compute_length_of_segment(points3d, segment_beg, segment_end): """Computes the length of a segment.""" return np.sqrt( (points3d[f"{segment_beg}_x"] - points3d[f"{segment_end}_x"]) ** 2 + (points3d[f"{segment_beg}_y"] - points3d[f"{segment_end}_y"]) ** 2 + (points3d[f"{segment_beg}_z"] - points3d[f"{segment_end}_z"]) ** 2 )
[docs] def leg_length_model(nmf_size: dict, leg_name: str, claw_is_ee: bool) -> float: """Returns the leg size from the nmf size dictionary.""" if claw_is_ee: return nmf_size[leg_name] return nmf_size[leg_name] - nmf_size[f"{leg_name}_Tarsus"]
[docs] def get_length_of_segments(points3d, claw_is_ee=False): """Returns a dictionary with segment sizes.""" segments = { "Coxa": ("thorax_coxa", "coxa_femur"), "Femur": ("coxa_femur", "femur_tibia"), "Tibia": ("femur_tibia", "tibia_tarsus"), # "Antenna": ("base_anten", "tip_anten"), } if claw_is_ee: segments = {**segments, "Tarsus": ("tibia_tarsus", "claw")} lengths = {} for segment_name, (segment_start, segment_end) in segments.items(): for side in ["R", "L"]: lengths[f"{side}_{segment_name}"] = compute_length_of_segment( points3d, f"{segment_start}_{side}", f"{segment_end}_{side}" ) return lengths
[docs] def compute_mean_length_of_segments(segment_lengths): """Computes mean length of each segment.""" mean_segment_length = {} for segment, length_array in segment_lengths.items(): mean_segment_length[segment] = np.mean(length_array) return mean_segment_length
[docs] def get_mean_length_of_segments(points3d): """Gets a dictionary with segment lengths.""" lengths = get_length_of_segments(points3d) return compute_mean_length_of_segments(lengths)
[docs] def get_leg_length(mean_segment_size): """Returns the leg size from the mean segment lengths.""" leg_length_r = np.sum( [size for name, size in mean_segment_size.items() if "R_" in name] ) leg_length_l = np.sum( [size for name, size in mean_segment_size.items() if "L_" in name] ) return {"R": leg_length_r, "L": leg_length_l}
[docs] def get_array(kp_name, kp_dict): """Returns 3D points of a key point in an array format.""" return np.array( [ kp_dict[f"{kp_name}_x"], kp_dict[f"{kp_name}_y"], kp_dict[f"{kp_name}_z"], ] ).reshape(-1, 3)
[docs] def get_mean_quantile(vector, quantile_diff=0.05): return 0.5 * ( np.quantile(vector, q=0.5 - quantile_diff) + np.quantile(vector, q=0.5 + quantile_diff) )
[docs] def dist_calc(v1, v2): return np.sqrt((v1[0] - v2[0]) ** 2 + (v1[1] - v2[1]) ** 2 + (v1[2] - v2[2]) ** 2)
[docs] def get_distance_btw_vecs(vector1, vector2): return np.linalg.norm(vector1 - vector2, axis=1)
[docs] def save_file(out_fname, data): """Save file.""" with open(out_fname, "wb") as f: pickle.dump(data, f)
[docs] def load_file(output_fname): """Load file.""" with open(output_fname, "rb") as f: pts = pickle.load(f) return pts
[docs] def from_anipose_to_array( points3d, claw_is_end_effector=False, kps=["thorax_coxa", "coxa_femur", "femur_tibia", "tibia_tarsus"], ) -> np.ndarray: """Convert usual dataframe format into a three dimensional array of size (N,KeyPoints,3).""" if claw_is_end_effector: kps += ["claw"] key_points = [f"{kp}_{side}" for side in ["R", "L"] for kp in kps] else: key_points = [f"{kp}_{side}" for side in ["R", "L"] for kp in kps] frames_no = points3d.shape[0] position_array = np.empty( (frames_no, len(key_points), 3) ) # timestep, key points, axes for i, kp in enumerate(key_points): position_array[:, i, :] = get_array(kp, points3d).T return position_array
[docs] def df_to_nparray(data_frame, side, claw_is_end_effector, segment="F"): """Convert usual dataframe format into a three dimensional array.""" if claw_is_end_effector: key_points = ["Coxa", "Femur", "Tibia", "Tarsus", "Claw"] else: key_points = ["Coxa", "Femur", "Tibia", "Tarsus"] position_array = np.empty( (data_frame[f"Pose_{side}F_Coxa_x"].shape[0], len(key_points), 3) ) # timestep, key points, axes for i, kp in enumerate(key_points): position_array[:, i, :] = np.array( [ data_frame[f"Pose_{side}{segment}_{kp}_x"].to_numpy(), data_frame[f"Pose_{side}{segment}_{kp}_y"].to_numpy(), data_frame[f"Pose_{side}{segment}_{kp}_z"].to_numpy(), ] ) return position_array
[docs] def dict_to_nparray_pose(pose_dict, claw_is_end_effector): """Convert usual df3dPP dictionary format into a three dimensional array.""" if claw_is_end_effector: key_points = ["Coxa", "Femur", "Tibia", "Tarsus", "Claw"] else: key_points = ["Coxa", "Femur", "Tibia", "Tarsus"] position_array = np.empty( (pose_dict["Coxa"]["raw_pos_aligned"].shape[0], len(key_points), 3) ) # timestep, key points, axes for i, kp in enumerate(key_points): position_array[:, i, :] = np.array(pose_dict[kp]["raw_pos_aligned"]) return position_array
[docs] def dict_to_nparray_angle(angle_dict, leg, claw_is_end_effector): """Convert usual df3dPP dictionary format into a three dimensional array.""" if claw_is_end_effector: dofs = [ "ThC_roll", "ThC_yaw", "ThC_pitch", "CTr_pitch", "CTr_roll", "FTi_pitch", "TiTa_pitch", ] else: dofs = [ "ThC_roll", "ThC_yaw", "ThC_pitch", "CTr_pitch", "CTr_roll", "FTi_pitch", ] angle_array = np.empty( (len(angle_dict[f"{leg}_leg"][dofs[0]]), len(dofs)) ) # timestep, dofs for i, kp in enumerate(dofs): angle_array[ :, i, ] = np.array(angle_dict[f"{leg}_leg"][kp]) return angle_array
[docs] def interpolate_signal(signal, original_ts, new_ts): """Interpolates signals.""" total_time = signal.shape[0] * original_ts original_x = np.arange(0, total_time, original_ts) new_x = np.arange(0, total_time, new_ts) try: interpolated = np.array(pchip_interpolate(original_x, signal, new_x)) except BaseException: signal[np.isinf(signal)] = 0 signal[-1] = 0 interpolated = np.array(pchip_interpolate(original_x, signal, new_x)) return interpolated
[docs] def interpolate_joint_angles(joint_angles_dict, **kwargs): """Interpolates joint angles.""" interpolated_joint_angles = {} for dof in joint_angles_dict: interpolated_joint_angles[dof] = interpolate_signal( signal=joint_angles_dict[dof], **kwargs ) return interpolated_joint_angles
[docs] def from_sdf(sdf_file: str): """Extracts a body template and joint bounds from an SDF file. Parameters ---------- sdf_file : str Path to the SDF file. Returns ------- Dict, Dict Body template and joint bounds. """ # Parse the SDF file sdf_in = ET.parse(sdf_file) root_in = sdf_in.getroot() model_in = root_in.find("world").find("model") # Get links and joints links = model_in.findall("link") joints = model_in.findall("joint") # Extract the body template body_template = {} for link in links: if not any(dof in link.attrib["name"] for dof in ["roll", "pitch", "yaw"]): # Get location of the joint joint_loc_str = link.find("pose").text # Convert string into a numpy array joint_loc = np.array([float(val) for val in joint_loc_str.split(" ")]) # Get only the x, y, z coordinates body_template[link.attrib["name"]] = joint_loc[:3] # Extract the joint bounds joint_bounds = { joint.attrib["name"]: ( float(joint.find("axis").find("limit").find("lower").text), float(joint.find("axis").find("limit").find("upper").text), ) for joint in joints } return body_template, joint_bounds