"""Module to calculate inverse kinematics from 3D pose based on IKPy."""
import logging
import warnings
from collections import defaultdict
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Dict, Tuple, Union, Optional
import numpy as np
from tqdm import trange
from joblib import Parallel, delayed
from ikpy.chain import Chain
from seqikpy.utils import save_file, split_arrays_into_chunks, merge_chunks_into_arrays
from seqikpy.body_config import neuromechfly_body_config
from seqikpy.kinematic_chain import (
KinematicChainBase,
KinematicChainSeq,
KinematicChainGeneric,
)
_logger = logging.getLogger(__name__)
[docs]
class LegInvKinBase(ABC):
"""Abstract class to calculate inverse kinematics for leg joints.
Parameters
----------
aligned_pos : Dict[str, np.ndarray]
Aligned pose from the AlignPose class.
Should have the following structure
>>> pose_data_dict = {
"<side><segment>_leg": np.ndarray[N_frames,N_key_points,3],
}
kinematic_chain : KinematicChainBase
Kinematic chain of the leg.
initial_angles : Dict[str, np.ndarray], optional
Initial angles of DOFs.
If not provided, the default values from body_config.py will be used.
"""
def __init__(
self,
aligned_pos: Dict[str, np.ndarray],
kinematic_chain_class: KinematicChainBase,
initial_angles: Optional[Dict[str, np.ndarray]] = None,
) -> None:
self.aligned_pos = aligned_pos
self.kinematic_chain_class = kinematic_chain_class
if initial_angles is not None:
self.initial_angles = initial_angles
else:
self.initial_angles = neuromechfly_body_config.initial_angles_rad
[docs]
def calculate_ik(
self,
kinematic_chain: Chain,
target_pos: np.ndarray,
initial_angles: np.ndarray = None,
) -> np.ndarray:
"""Calculates the joint angles in the leg chain."""
# don"t take the last and first ones into account
return kinematic_chain.inverse_kinematics(
target_position=target_pos, initial_position=initial_angles
)
[docs]
def calculate_fk(
self, kinematic_chain: Chain, joint_angles: np.ndarray
) -> np.ndarray:
"""Calculates the forward kinematics from the joint dof angles."""
fk = kinematic_chain.forward_kinematics(joint_angles, full_kinematics=True)
end_effector_positions = np.zeros((len(kinematic_chain.links), 3))
for link in range(len(kinematic_chain.links)):
end_effector_positions[link, :] = fk[link][:3, 3]
return end_effector_positions
[docs]
def get_scale_factor(self, vector: np.ndarray, length: float) -> float:
"""Gets scale the ratio between two vectors."""
vector_diff = np.linalg.norm(np.diff(vector, axis=0), axis=1)
norm_sum = np.sum(vector_diff)
return length / norm_sum
[docs]
@abstractmethod
def calculate_ik_stage(
self,
end_effector_pos: np.ndarray,
origin: np.ndarray,
initial_angles: np.ndarray,
segment_name: str,
**kwargs,
) -> np.ndarray:
"""For a given trial pose data, calculates the inverse kinematics.
Parameters
----------
end_effector_pos : np.ndarray
3D array containing the position of the end effector pos
origin : np.ndarray
Origin of the kinematic chain, i.e., Thorax-Coxa joint
initial_angles : np.ndarray
Initial angles for the optimization seed
segment_name : str
Leg side, i.e., RF, LF, RM, LM, RH, LH
Returns
-------
np.ndarray
Array containing the cartesian coordinates of the joint positions.
The joint angles are saved in a class attribute
"""
[docs]
@abstractmethod
def run_ik_and_fk(
self, export_path: Union[Path, str] = None, **kwargs
) -> Tuple[Dict[str, np.ndarray], Dict[str, np.ndarray]]:
"""Runs inverse and forward kinematics for leg joints.
Parameters
----------
export_path : Union[Path, str], optional
Path where the results will be saved,
if None, nothing is saved, by default None
Returns
-------
Tuple[Dict[str,np.ndarray], Dict[str,np.ndarray]]
Two dictionaries containing joint angles and forward
kinematics, respectively.
"""
[docs]
class LegInvKinSeq(LegInvKinBase):
"""Class to calculate inverse kinematics for leg joints in a
sequential manner. This method finds the optimal joint angles
within the bounds to match each koint as closely as possible.
Parameters
----------
aligned_pos : Dict[str, np.ndarray]
Aligned pose from the AlignPose class.
Should have the following structure
>>> pose_data_dict = {
"<side><segment>_leg": np.ndarray[N_frames,N_key_points,3],
}
kinematic_chain : KinematicChainSeq
Kinematic chain of the leg.
initial_angles : Dict[str, np.ndarray], optional
Initial angles of DOFs.
If not provided, the default values from body_config.py will be used.
Examples
--------
>>> from pathlib import Path
>>> from seqikpy.kinematic_chain import KinematicChainSeq
>>> from seqikpy.leg_inverse_kinematics import LegInvKinSeq
>>> from seqikpy.body_config import neuromechfly_body_config
>>> from seqikpy.utils import load_file
>>> DATA_PATH = Path("../data/anipose_220525_aJO_Fly001_001/pose-3d")
>>> f_path = DATA_PATH / "pose3d_aligned.pkl"
>>> aligned_pos = load_file(f_path)
>>> seq_ik = LegInvKinSeq(
aligned_pos=aligned_pos,
kinematic_chain_class=KinematicChainSeq(
bounds_dof=neuromechfly_body_config.dof_bounds_rad,
legs_list=["RF", "LF"],
body_size=None,
),
initial_angles=neuromechfly_body_config.initial_angles_rad
)
>>> leg_joint_angles, forward_kinematics = seq_ik.run_ik_and_fk(
export_path=DATA_PATH,
hide_progress_bar=False
)
"""
def __init__(
self,
aligned_pos: Dict[str, np.ndarray],
kinematic_chain_class: KinematicChainSeq,
initial_angles: Optional[Dict[str, np.ndarray]] = None,
) -> None:
super().__init__(aligned_pos, kinematic_chain_class, initial_angles)
# Create an empty dict for joint angles
self.joint_angles_dict = {}
@staticmethod
def _process_single_leg_sequence(
segment_name,
segment_array,
stages,
kinematic_chain_class,
initial_angles_dict,
hide_progress_bar,
):
"""Process a single leg through all stages."""
leg_name = segment_name.split("_")[0]
if leg_name not in kinematic_chain_class.body_size:
return segment_name, None, {}
# Create a temporary instance to process this leg
temp_instance = LegInvKinSeq.__new__(LegInvKinSeq)
temp_instance.kinematic_chain_class = kinematic_chain_class
temp_instance.initial_angles = initial_angles_dict
temp_instance.joint_angles_dict = {}
origin = segment_array[:, 0, :]
forward_kinematics_result = None
for stage in stages:
end_effector_pos = segment_array[:, stage, :]
initial_angles = initial_angles_dict[leg_name][f"stage_{stage}"]
forward_kinematics_result = temp_instance.calculate_ik_stage(
end_effector_pos=end_effector_pos,
origin=origin,
initial_angles=initial_angles,
stage=stage,
segment_name=leg_name,
hide_progress_bar=hide_progress_bar,
)
return segment_name, forward_kinematics_result, temp_instance.joint_angles_dict
[docs]
def calculate_ik_stage(
self,
end_effector_pos: np.ndarray,
origin: np.ndarray,
initial_angles: np.ndarray,
segment_name: str,
**kwargs,
) -> np.ndarray:
"""For a given trial pose data, calculates the inverse kinematics.
Parameters
----------
end_effector_pos : np.ndarray
3D array containing the position of the end effector pos
origin : np.ndarray
Origin of the kinematic chain, i.e., Thorax-Coxa joint
initial_angles : np.ndarray
Initial angles for the optimization seed
segment_name (kwargs) : str
Leg side, i.e., RF, LF, RM, LM, RH, LH
stage (kwargs) : int
Stage in the sequential calculation, should be between 1 and 4
Returns
-------
np.ndarray
Array containing the cartesian coordinates of the joint positions.
The joint angles are saved in a class attribute
"""
stage = kwargs.get("stage", 1)
hide_progress_bar = kwargs.get("hide_progress_bar", False)
if not segment_name in ["RF", "LF", "RM", "LM", "RH", "LH"]:
raise ValueError(f"Segment name ({segment_name}) is not valid.")
if not 1 <= stage <= 4:
raise ValueError(f"Stage ({stage}) should be between 1 and 4.")
frames_no = end_effector_pos.shape[0]
end_effector_pos_diff = end_effector_pos - origin
# Joint angles shape: (number of frames, number of DOFs)
joint_angles = np.empty((frames_no, len(initial_angles)))
# Forward kinematics shape: (number of frames, number of
# joints in the kinematics chain, axes-x,y,z)
forward_kinematics = np.empty((frames_no, len(initial_angles), 3))
# if the origin is a vector, convert it to a matrix
if origin.size == 3:
origin = np.tile(origin, (frames_no, 1))
# Get the kinematic chain for Stage 1
if stage == 1:
kinematic_chain = self.kinematic_chain_class.create_leg_chain(
stage=stage,
leg_name=segment_name,
)
# Start the inverse kinematics calculation
for t in trange(
frames_no,
disable=hide_progress_bar,
desc=f"{segment_name} stage {stage}",
):
# Get the kinematic chain for the other stages
if stage in [2, 3, 4]:
kinematic_chain = self.kinematic_chain_class.create_leg_chain(
leg_name=segment_name,
stage=stage,
angles=self.joint_angles_dict,
t=t,
)
# from IPython import embed; embed()
# For the first frame, use the given initial angles, for the rest
# use the calculated joint angles from the previous time step
initial_angles = initial_angles if t == 0 else joint_angles[t - 1, :]
# Calculate the inverse kinematics
joint_angles[t, :] = self.calculate_ik(
kinematic_chain, end_effector_pos_diff[t, :], initial_angles
)
# Calculate the forward kinematics for the last stage only
if stage == 4:
forward_kinematics[t, :] = (
self.calculate_fk(kinematic_chain, joint_angles[t, :])
+ origin[t, :]
)
# Link names
link_names = [link.name for link in kinematic_chain.links]
# Store the joint angles based on the stage number
# Stage 1: Thorax-Coxa pitch and yaw
if stage == 1:
self.joint_angles_dict[f"Angle_{segment_name}_ThC_yaw"] = joint_angles[
:, link_names.index(f"{segment_name}_ThC_yaw")
]
self.joint_angles_dict[f"Angle_{segment_name}_ThC_pitch"] = joint_angles[
:, link_names.index(f"{segment_name}_ThC_pitch")
]
_logger.debug("Stage 1 is completed!")
# Stage 2: Thorax-Coxa roll, Coxa-Trochanter pitch
elif stage == 2:
self.joint_angles_dict[f"Angle_{segment_name}_ThC_roll"] = joint_angles[
:, link_names.index(f"{segment_name}_ThC_roll")
]
self.joint_angles_dict[f"Angle_{segment_name}_CTr_pitch"] = joint_angles[
:, link_names.index(f"{segment_name}_CTr_pitch")
]
_logger.debug("Stage 2 is completed!")
# Stage 3: Coxa-Trochanter roll, Femur-Tibia pitch
elif stage == 3:
self.joint_angles_dict[f"Angle_{segment_name}_CTr_roll"] = joint_angles[
:, link_names.index(f"{segment_name}_CTr_roll")
]
self.joint_angles_dict[f"Angle_{segment_name}_FTi_pitch"] = joint_angles[
:, link_names.index(f"{segment_name}_FTi_pitch")
]
_logger.debug("Stage 3 is completed!")
# Stage 4: Tibia-Tarsus pitch
elif stage == 4:
self.joint_angles_dict[f"Angle_{segment_name}_TiTa_pitch"] = joint_angles[
:, link_names.index(f"{segment_name}_TiTa_pitch")
]
_logger.debug("Stage 4 is completed!")
return forward_kinematics
[docs]
def run_ik_and_fk(
self,
export_path: Union[Path, str] = None,
stages: Optional[list[int]] = None,
n_workers: int = 1,
parallel_over_time: bool = True,
chunk_overlap: int = 20,
avg_workloads_per_worker: int = 2,
min_chunk_size: int = 100,
hide_progress_bar: bool = False,
) -> Tuple[Dict[str, np.ndarray], Dict[str, np.ndarray]]:
"""Runs inverse and forward kinematics for leg joints.
Parameters
----------
export_path : Union[Path, str], optional
Path where the results will be saved,
if None, nothing is saved, by default None
stages : List[int], optional
Stages to run the inverse kinematics. Default is all stages
([1, 2, 3, 4]).
n_workers : int, optional
Number of parallel jobs for leg processing. -1 uses all cores,
1 disables parallelization, by default 1.
parallel_over_time : bool, optional
Ignored unless n_workers > 1. This flag determines whether to
parallelize over time and legs or legs only. By default True.
chunk_overlap : int, optional
Ignored unless n_workers > 1 and parallel_over_time is True.
This is the number of overlapping frames between time series
chunks. By default 20.
avg_workloads_per_worker : int, optional
Ignored unless n_workers > 1 and parallel_over_time is True.
This is the rough number of workloads to be assigned to each
worker. Larger numbers lead to load balancing at the cost of
higher overhead. This number is approximate - scheduler will
change it for better rounding. By default 2.
min_chunk_size : int, optional
Ignored unless n_workers > 1 and parallel_over_time is True.
Minimum chunk size (in frames) for each time series payload.
If the time series is shorter than this size, the number of
workers will be reduced accordingly. By default 100.
hide_progress_bar : bool, optional
Hide the progress bar, by default False.
Returns
-------
Tuple[Dict[str,np.ndarray], Dict[str,np.ndarray]]
Two dictionaries containing joint angles and forward
kinematics, respectively.
"""
if stages is None:
stages = [1, 2, 3, 4]
if len(stages) == 0:
raise ValueError("At least one stage should be provided.")
if max(stages) > 4 or not all(np.diff(stages) == 1):
raise ValueError(
"Maximum stage number is 4 and the list should be strictly incremental."
)
# Get parallel processing parameters
_logger.info("Computing joint angles and forward kinematics...")
# Filter leg segments
leg_segments = [
(name, array) # each array has shape (N_frames, N_key_points, 3)
for name, array in self.aligned_pos.items()
if "leg" in name.lower()
]
if n_workers == 1 or len(leg_segments) <= 1:
# Serial processing
forward_kinematics_dict = self._run_ik_and_fk_serial(
leg_segments, stages, hide_progress_bar
)
elif not parallel_over_time:
# Parallel processing over kinematic chains (legs) only
forward_kinematics_dict = self._run_ik_fk_parallel_over_legs(
leg_segments, stages, n_workers, hide_progress_bar
)
else:
# Parallel processing over legs and time (time series is split into chunks)
forward_kinematics_dict = self._run_ik_fk_parallel_over_legs_and_time(
leg_segments,
stages,
n_workers,
avg_workloads_per_worker,
chunk_overlap,
min_chunk_size,
hide_progress_bar,
)
_logger.debug("Joint angles and forward kinematics are computed.")
if export_path is not None:
save_file(
Path(export_path) / "forward_kinematics.pkl",
forward_kinematics_dict,
)
save_file(
Path(export_path) / "leg_joint_angles.pkl",
self.joint_angles_dict,
)
_logger.info(
"Joint angles and forward kinematics are saved at %s",
export_path,
)
return self.joint_angles_dict, forward_kinematics_dict
def _run_ik_and_fk_serial(
self,
leg_segments: list[tuple[str, np.ndarray]],
stages: list[int],
hide_progress_bar: bool,
):
forward_kinematics_dict = {}
for segment_name, segment_array in leg_segments:
res = self._process_single_leg_sequence(
segment_name,
segment_array,
stages,
self.kinematic_chain_class,
self.initial_angles,
hide_progress_bar,
)
result_name, result_fk, local_joint_angles = res
if result_fk is not None:
forward_kinematics_dict[result_name] = result_fk
self.joint_angles_dict.update(local_joint_angles)
return forward_kinematics_dict
def _run_ik_fk_parallel_over_legs(
self,
leg_segments: list[tuple[str, np.ndarray]],
stages: list[int],
n_workers: int,
hide_progress_bar: bool,
):
forward_kinematics_dict = {}
results = Parallel(n_jobs=n_workers)(
delayed(self._process_single_leg_sequence)(
segment_name,
segment_array,
stages,
self.kinematic_chain_class,
self.initial_angles,
hide_progress_bar,
)
for segment_name, segment_array in leg_segments
)
# Collect results
for segment_name, result_fk, local_joint_angles in results:
if result_fk is not None:
forward_kinematics_dict[segment_name] = result_fk
self.joint_angles_dict.update(local_joint_angles)
return forward_kinematics_dict
def _run_ik_fk_parallel_over_legs_and_time(
self,
leg_segments: list[tuple[str, np.ndarray]],
stages: list[int],
n_workers: int,
avg_workloads_per_worker: int,
chunk_overlap: int,
min_chunk_size: int,
hide_progress_bar: bool,
):
seq_length = leg_segments[0][1].shape[0]
forward_kinematics_dict = {}
# Figure out number of frames per payload
parallel_executor = Parallel(n_jobs=n_workers)
n_workers_effective = parallel_executor._effective_n_jobs()
input_chunks = split_arrays_into_chunks(
[array for name, array in leg_segments],
approx_n_chunks_total=n_workers_effective * avg_workloads_per_worker,
overlap=chunk_overlap,
min_chunk_size=min_chunk_size,
)
# Define processing function for each payload
def process_chunk(array_idx, chunk_arr):
segment_name = leg_segments[array_idx][0]
return self._process_single_leg_sequence(
segment_name,
chunk_arr,
stages,
self.kinematic_chain_class,
self.initial_angles,
hide_progress_bar,
)
# Execute in parallel
results_by_chunk = parallel_executor(
delayed(process_chunk)(array_idx, chunk_arr)
for array_idx, start_idx, chunk_arr in input_chunks
)
# Get results
result_chunks_fk = []
result_chunks_joint_angles = defaultdict(list)
for chunk_idx in range(len(input_chunks)):
array_idx, start_idx, _ = input_chunks[chunk_idx]
_, result_fk_chunk, local_joint_angles = results_by_chunk[chunk_idx]
result_chunks_fk.append((array_idx, start_idx, result_fk_chunk))
for key, arr in local_joint_angles.items():
result_chunks_joint_angles[key].append((array_idx, start_idx, arr))
# Merge output chunks and store in dicts
output_arrays_fk = merge_chunks_into_arrays(
result_chunks_fk,
n_arrays=len(leg_segments),
seq_length=seq_length,
overlap=chunk_overlap,
)
for i, (segment_name, _) in enumerate(leg_segments):
forward_kinematics_dict[segment_name] = output_arrays_fk[i]
for key, arr_chunks in result_chunks_joint_angles.items():
self.joint_angles_dict[key] = merge_chunks_into_arrays(
arr_chunks,
n_arrays=1,
seq_length=seq_length,
overlap=chunk_overlap,
)[0]
return forward_kinematics_dict
[docs]
class LegInvKinGeneric(LegInvKinBase):
"""Class to calculate inverse kinematics for leg joints in a
generic manner to only match the given end effector.
Parameters
----------
aligned_pos : Dict[str, np.ndarray]
Aligned pose from the AlignPose class.
Should have the following structure
>>> pose_data_dict = {
"<side><segment>_leg": np.ndarray[N_frames,N_key_points,3],
}
kinematic_chain : KinematicChainGeneric
Kinematic chain of the leg.
initial_angles : Dict[str, np.ndarray], optional
Initial angles of DOFs.
If not provided, the default values from body_config.py will be used.
Examples
--------
>>> from pathlib import Path
>>> from seqikpy.kinematic_chain import KinematicChainGeneric
>>> from seqikpy.leg_inverse_kinematics import LegInvKinGeneric
>>> from seqikpy.body_config import neuromechfly_body_config
>>> from seqikpy.utils import load_file
>>> DATA_PATH = Path("../data/anipose_220525_aJO_Fly001_001/pose-3d")
>>> f_path = DATA_PATH / "pose3d_aligned.pkl"
>>> aligned_pos = load_file(f_path)
>>> seq_ik = LegInvKinGeneric(
aligned_pos=aligned_pos,
kinematic_chain_class=KinematicChainGeneric(
bounds_dof=neuromechfly_body_config.dof_bounds_rad,
legs_list=["RF", "LF"],
body_size=None,
),
initial_angles=neuromechfly_body_config.initial_angles_rad
)
>>> leg_joint_angles, forward_kinematics = gen_ik.run_ik_and_fk(
export_path=DATA_PATH,
hide_progress_bar=False
)
"""
def __init__(
self,
aligned_pos: Dict[str, np.ndarray],
kinematic_chain_class: KinematicChainGeneric,
initial_angles: Optional[Dict[str, np.ndarray]] = None,
) -> None:
super().__init__(aligned_pos, kinematic_chain_class, initial_angles)
# Create an empty dict for joint angles
self.joint_angles_dict = {}
@staticmethod
def _process_single_leg_generic(
segment_name,
segment_array,
kinematic_chain_class,
initial_angles_dict,
hide_progress_bar,
):
"""Process a single leg for generic inverse kinematics."""
leg_name = segment_name.split("_")[0]
if leg_name not in kinematic_chain_class.body_size:
return segment_name, None, {}
# Create a temporary instance to process this leg
temp_instance = LegInvKinGeneric.__new__(LegInvKinGeneric)
temp_instance.kinematic_chain_class = kinematic_chain_class
temp_instance.initial_angles = initial_angles_dict
temp_instance.joint_angles_dict = {}
origin = segment_array[:, 0, :]
end_effector_pos = segment_array[:, -1, :]
initial_angles = initial_angles_dict[leg_name]["stage_4"]
forward_kinematics_result = temp_instance.calculate_ik_stage(
end_effector_pos=end_effector_pos,
origin=origin,
initial_angles=initial_angles,
segment_name=leg_name,
hide_progress_bar=hide_progress_bar,
)
return segment_name, forward_kinematics_result, temp_instance.joint_angles_dict
[docs]
def calculate_ik_stage(
self,
end_effector_pos: np.ndarray,
origin: np.ndarray,
initial_angles: np.ndarray,
segment_name: str,
**kwargs,
) -> np.ndarray:
"""For a given trial pose data, calculates the inverse kinematics.
Parameters
----------
end_effector_pos : np.ndarray
3D array containing the position of the end effector pos
origin : np.ndarray
Origin of the kinematic chain, i.e., Thorax-Coxa joint
initial_angles : np.ndarray
Initial angles for the optimization seed
segment_name : str
Leg side, i.e., RF, LF, RM, LM, RH, LH
Returns
-------
np.ndarray
Array containing the cartesian coordinates of the joint positions.
The joint angles are saved in a class attribute
"""
hide_progress_bar = kwargs.get("hide_progress_bar", False)
if not segment_name in ["RF", "LF", "RM", "LM", "RH", "LH"]:
raise ValueError(f"Segment name ({segment_name}) is not valid.")
frames_no = end_effector_pos.shape[0]
end_effector_pos_diff = end_effector_pos - origin
# Joint angles shape: (number of frames, number of DOFs)
joint_angles = np.empty((frames_no, len(initial_angles)))
# Forward kinematics shape: (number of frames, number of
# joints in the kinematics chain, axes-x,y,z)
forward_kinematics = np.empty((frames_no, len(initial_angles), 3))
# if the origin is a vector, convert it to a matrix
if origin.size == 3:
origin = np.tile(origin, (frames_no, 1))
# Generic IK, the kinematic chain is the entire leg
kinematic_chain = self.kinematic_chain_class.create_leg_chain(
leg_name=segment_name,
)
# Start the inverse kinematics calculation
for t in trange(
frames_no,
disable=hide_progress_bar,
desc=f"Calculating IK {segment_name}",
):
# For the first frame, use the given initial angles, for the rest
# use the calculated joint angles from the previous time step
initial_angles = initial_angles if t == 0 else joint_angles[t - 1, :]
# Calculate the inverse kinematics
joint_angles[t, :] = self.calculate_ik(
kinematic_chain, end_effector_pos_diff[t, :], initial_angles
)
forward_kinematics[t, :] = (
self.calculate_fk(kinematic_chain, joint_angles[t, :]) + origin[t, :]
)
# Link names
link_names = [link.name for link in kinematic_chain.links]
# Store the joint angles based on the stage number
for link_name in link_names:
if "Base" in link_name or "Claw" in link_name:
continue
self.joint_angles_dict[f"Angle_{link_name}"] = joint_angles[
:, link_names.index(link_name)
]
return forward_kinematics
[docs]
def run_ik_and_fk(
self,
export_path: Union[Path, str] = None,
n_workers: int = 1,
hide_progress_bar=False,
) -> Tuple[Dict[str, np.ndarray], Dict[str, np.ndarray]]:
"""Runs inverse and forward kinematics for leg joints.
Parameters
----------
export_path : Union[Path, str], optional
Path where the results will be saved,
if None, nothing is saved, by default None
n_workers : int, optional
Number of parallel jobs for leg processing. -1 uses all cores,
1 disables parallelization, by default 1.
hide_progress_bar : bool, optional
Hide the progress bar, by default False.
Returns
-------
Tuple[Dict[str,np.ndarray], Dict[str,np.ndarray]]
Two dictionaries containing joint angles and forward
kinematics, respectively.
"""
forward_kinematics_dict = {}
_logger.info("Computing joint angles and forward kinematics...")
# Filter leg segments
leg_segments = [
(name, array)
for name, array in self.aligned_pos.items()
if "leg" in name.lower()
]
if n_workers == 1 or len(leg_segments) <= 1:
# Sequential processing
for segment_name, segment_array in leg_segments:
res = self._process_single_leg_generic(
segment_name,
segment_array,
self.kinematic_chain_class,
self.initial_angles,
hide_progress_bar,
)
result_name, result_fk, local_joint_angles = res
if result_fk is not None:
forward_kinematics_dict[result_name] = result_fk
self.joint_angles_dict.update(local_joint_angles)
else:
# Parallel processing of legs
results = Parallel(n_jobs=n_workers)(
delayed(self._process_single_leg_generic)(
segment_name,
segment_array,
self.kinematic_chain_class,
self.initial_angles,
hide_progress_bar,
)
for segment_name, segment_array in leg_segments
)
# Collect results
for segment_name, result_fk, local_joint_angles in results:
if result_fk is not None:
forward_kinematics_dict[segment_name] = result_fk
self.joint_angles_dict.update(local_joint_angles)
_logger.debug("Joint angles and forward kinematics are computed.")
if export_path is not None:
save_file(
Path(export_path) / "forward_kinematics.pkl",
forward_kinematics_dict,
)
save_file(
Path(export_path) / "leg_joint_angles.pkl",
self.joint_angles_dict,
)
_logger.info(
"Joint angles and forward kinematics are saved at %s",
export_path,
)
return self.joint_angles_dict, forward_kinematics_dict