Running SeqIKPy in parallel

Running SeqIKPy in parallel#

In this tutorial, we will speed up processing by running inverse and forward kinematics in parallel.

Note that depending on how the Jupyter server is set up, running parallel jobs from Jupyter Notebook can induce additional overhead. In your production pipeline, you should run the jobs in a plain Python script.

We start by importing the necessary packages:

import numpy as np
from time import time
from pathlib import Path

import seqikpy
from seqikpy.kinematic_chain import KinematicChainSeq
from seqikpy.leg_inverse_kinematics import LegInvKinSeq
from seqikpy.utils import load_file, calculate_body_size
from seqikpy.body_config import neuromechfly_body_config

Next, we define our NeuroMechFly body configuration:

_TEMPLATE_NMF_LOCOMOTION = {
    "RF_Coxa": np.array([0.35, -0.27, 0.400]),
    "RF_Femur": np.array([0.35, -0.27, -0.025]),
    "RF_Tibia": np.array([0.35, -0.27, -0.731]),
    "RF_Tarsus": np.array([0.35, -0.27, -1.249]),
    "RF_Claw": np.array([0.35, -0.27, -1.912]),
    "LF_Coxa": np.array([0.35, 0.27, 0.400]),
    "LF_Femur": np.array([0.35, 0.27, -0.025]),
    "LF_Tibia": np.array([0.35, 0.27, -0.731]),
    "LF_Tarsus": np.array([0.35, 0.27, -1.249]),
    "LF_Claw": np.array([0.35, 0.27, -1.912]),
    "RM_Coxa": np.array([0, -0.125, 0]),
    "RM_Femur": np.array([0, -0.125, -0.182]),
    "RM_Tibia": np.array([0, -0.125, -0.965]),
    "RM_Tarsus": np.array([0, -0.125, -1.633]),
    "RM_Claw": np.array([0, -0.125, -2.328]),
    "LM_Coxa": np.array([0, 0.125, 0]),
    "LM_Femur": np.array([0, 0.125, -0.182]),
    "LM_Tibia": np.array([0, 0.125, -0.965]),
    "LM_Tarsus": np.array([0, 0.125, -1.633]),
    "LM_Claw": np.array([0, 0.125, -2.328]),
    "RH_Coxa": np.array([-0.215, -0.087, -0.073]),
    "RH_Femur": np.array([-0.215, -0.087, -0.272]),
    "RH_Tibia": np.array([-0.215, -0.087, -1.108]),
    "RH_Tarsus": np.array([-0.215, -0.087, -1.793]),
    "RH_Claw": np.array([-0.215, -0.087, -2.588]),
    "LH_Coxa": np.array([-0.215, 0.087, -0.073]),
    "LH_Femur": np.array([-0.215, 0.087, -0.272]),
    "LH_Tibia": np.array([-0.215, 0.087, -1.108]),
    "LH_Tarsus": np.array([-0.215, 0.087, -1.793]),
    "LH_Claw": np.array([-0.215, 0.087, -2.588]),
}

_INITIAL_ANGLES_LOCOMOTION_RAD = {
    "RF": {
        # Base ThC yaw pitch CTr pitch
        "stage_1": np.array([0.0, 0.45, -0.07, -2.14]),
        # BaseΒ ThC yaw pitch roll CTr pitch CTr roll
        "stage_2": np.array([0.0, 0.45, -0.07, -0.32, -2.14, 1.4]),
        # Base ThC yaw pitch roll CTr pitch CTr roll FTi pitch
        "stage_3": np.array([0.0, 0.45, -0.07, -0.32, -2.14, -1.25, 1.48, 0.0]),
        # Base ThC yaw pitch roll CTr pitch CTr roll FTi pitch TiTa pitch
        "stage_4": np.array([0.0, 0.45, -0.07, -0.32, -2.14, -1.25, 1.48, 0.0, 0.0]),
    },
    "LF": {
        "stage_1": np.array([0.0, -0.45, -0.07, -2.14]),
        "stage_2": np.array([0.0, -0.45, -0.07, 0.32, -2.14, 1.4]),
        "stage_3": np.array([0.0, -0.45, -0.07, 0.32, -2.14, 1.25, 1.48, 0.0]),
        "stage_4": np.array([0.0, -0.45, -0.07, 0.32, -2.14, 1.25, 1.48, 0.0, 0.0]),
    },
    "RM": {
        "stage_1": np.array([0.0, 0.45, 0.37, -2.14]),
        "stage_2": np.array([0.0, 0.45, 0.37, -0.32, -2.14, 1.4]),
        "stage_3": np.array([0.0, 0.45, 0.37, -0.32, -2.14, -1.25, 1.48, 0.0]),
        "stage_4": np.array([0.0, 0.45, 0.37, -0.32, -2.14, -1.25, 1.48, 0.0, 0.0]),
    },
    "LM": {
        "stage_1": np.array([0.0, -0.45, 0.37, -2.14]),
        "stage_2": np.array([0.0, -0.45, 0.37, 0.32, -2.14, 1.4]),
        "stage_3": np.array([0.0, -0.45, 0.37, 0.32, -2.14, 1.25, 1.48, 0.0]),
        "stage_4": np.array([0.0, -0.45, 0.37, 0.32, -2.14, 1.25, 1.48, 0.0, 0.0]),
    },
    "RH": {
        "stage_1": np.array([0.0, 0.45, 0.07, -2.14]),
        "stage_2": np.array([0.0, 0.45, 0.07, -0.32, -2.14, 1.4]),
        "stage_3": np.array([0.0, 0.45, 0.07, -0.32, -2.14, -1.25, 1.48, 0.0]),
        "stage_4": np.array([0.0, 0.45, 0.07, -0.32, -2.14, -1.25, 1.48, 0.0, 0.0]),
    },
    "LH": {
        "stage_1": np.array([0.0, -0.45, 0.07, -2.14]),
        "stage_2": np.array([0.0, -0.45, 0.07, 0.32, -2.14, 1.4]),
        "stage_3": np.array([0.0, -0.45, 0.07, 0.32, -2.14, 1.25, 1.48, 0.0]),
        "stage_4": np.array([0.0, -0.45, 0.07, 0.32, -2.14, 1.25, 1.48, 0.0, 0.0]),
    },
}


_BOUNDS_LOCOMOTION_DEG = {
    "RF_ThC_yaw": (-180, 180),
    "RF_ThC_pitch": (-90, 90),
    "RF_ThC_roll": (-180, 180),
    "RF_CTr_pitch": (-180, 180),
    "RF_FTi_pitch": (-180, 180),
    "RF_CTr_roll": (-180, 180),
    "RF_TiTa_pitch": (-180, 0),
    "RM_ThC_yaw": (-50, 50),
    "RM_ThC_pitch": (-180, 180),
    "RM_ThC_roll": (-180, 0),
    "RM_CTr_pitch": (-180, 180),
    "RM_FTi_pitch": (-180, 180),
    "RM_CTr_roll": (-180, 180),
    "RM_TiTa_pitch": (-180, 0),
    "RH_ThC_yaw": (-50, 50),
    "RH_ThC_pitch": (-50, 50),
    "RH_ThC_roll": (-180, 0),
    "RH_CTr_pitch": (-180, 0),
    "RH_FTi_pitch": (-180, 180),
    "RH_CTr_roll": (-180, 180),
    "RH_TiTa_pitch": (-180, 0),
    "LF_ThC_yaw": (-180, 180),
    "LF_ThC_pitch": (-90, 90),
    "LF_ThC_roll": (-180, 180),
    "LF_CTr_pitch": (-180, 180),
    "LF_FTi_pitch": (-180, 180),
    "LF_CTr_roll": (-180, 180),
    "LF_TiTa_pitch": (-180, 0),
    "LM_ThC_yaw": (-50, 50),
    "LM_ThC_pitch": (-180, 180),
    "LM_ThC_roll": (0, 180),
    "LM_CTr_pitch": (-180, 180),
    "LM_FTi_pitch": (-180, 180),
    "LM_CTr_roll": (-180, 180),
    "LM_TiTa_pitch": (-180, 0),
    "LH_ThC_yaw": (-50, 50),
    "LH_ThC_pitch": (-50, 50),
    "LH_ThC_roll": (0, 180),
    "LH_CTr_pitch": (-180, 0),
    "LH_FTi_pitch": (-180, 180),
    "LH_CTr_roll": (-180, 180),
    "LH_TiTa_pitch": (-180, 0),
}

body_config = neuromechfly_body_config.deepcopy()
body_config.template = _TEMPLATE_NMF_LOCOMOTION
body_config.initial_angles_rad = _INITIAL_ANGLES_LOCOMOTION_RAD
body_config.set_dof_bounds_in_deg(_BOUNDS_LOCOMOTION_DEG)

Let’s load some toy data (these are already aligned).

data_dir = (
    Path(seqikpy.__path__[0]).parent / "data/df3d_pose_result__210902_PR_Fly1"
)
aligned_pose_data = load_file(data_dir / "pose3d_aligned.pkl")

As in the previous tutorials, we will now create instances of KinematicChainSeq and LegInvKinSeq from the data:

seq_len = aligned_pose_data[list(aligned_pose_data.keys())[0]].shape[0]
legs = [f"{side}{pos}" for side in "RL" for pos in ["F", "M", "H"]]

kin_chain = KinematicChainSeq(
    bounds_dof=body_config.dof_bounds_rad,
    body_size=calculate_body_size(body_config.template, legs),
    legs_list=legs,
)

seq_ik = LegInvKinSeq(
    aligned_pos=aligned_pose_data,
    kinematic_chain_class=kin_chain,
    initial_angles=body_config.initial_angles_rad,
)

Similar to the previous tutorials, we can now run inverse and forward kinematics from the main process. Let’s time this operation:

start_time = time()
serial_output = seq_ik.run_ik_and_fk()
wall_time = time() - start_time
print(f"Serial processing: {seq_len}x6 frames processed in {wall_time:.2f}s")
RF stage 1: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 100/100 [00:00<00:00, 191.46it/s]
RF stage 2: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 100/100 [00:01<00:00, 69.28it/s]
RF stage 3: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 100/100 [00:01<00:00, 53.39it/s]
RF stage 4: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 100/100 [00:01<00:00, 52.62it/s]
RM stage 1: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 100/100 [00:00<00:00, 250.63it/s]
RM stage 2: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 100/100 [00:01<00:00, 72.30it/s]
RM stage 3: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 100/100 [00:01<00:00, 54.40it/s]
RM stage 4: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 100/100 [00:01<00:00, 54.17it/s]
RH stage 1: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 100/100 [00:00<00:00, 340.79it/s]
RH stage 2: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 100/100 [00:01<00:00, 66.08it/s]
RH stage 3: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 100/100 [00:01<00:00, 53.19it/s]
RH stage 4: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 100/100 [00:01<00:00, 51.25it/s]
LF stage 1: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 100/100 [00:00<00:00, 179.64it/s]
LF stage 2: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 100/100 [00:01<00:00, 69.96it/s]
LF stage 3: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 100/100 [00:01<00:00, 53.31it/s]
LF stage 4: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 100/100 [00:01<00:00, 51.50it/s]
LM stage 1: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 100/100 [00:00<00:00, 195.01it/s]
LM stage 2: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 100/100 [00:01<00:00, 66.71it/s]
LM stage 3: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 100/100 [00:01<00:00, 52.99it/s]
LM stage 4: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 100/100 [00:02<00:00, 49.58it/s]
LH stage 1: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 100/100 [00:00<00:00, 274.26it/s]
LH stage 2: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 100/100 [00:01<00:00, 62.50it/s]
LH stage 3: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 100/100 [00:01<00:00, 54.46it/s]
LH stage 4: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 100/100 [00:02<00:00, 49.24it/s]
Serial processing: 100x6 frames processed in 34.49s

As we can see in the log, the six legs are processed sequentially. However, the six kinematic chains are completely independent. This means that we can process each of them in their respective process, so they can run together using different CPU cores.

To demonstrate the performance gain, we will use a longer sequence of behavior data (if the job is too short, the additional overhead introduced by parallelization can make parallelization not worth it). For this, we will write a generate_input_sequence function that repeats the data a couple of times:

def generate_input_sequence(n_repeats):
    """Artificially repeat data to increase sequence length for benchmarking"""
    repeated_pose_data = {}
    for key, arr in aligned_pose_data.items():
        arr_back = arr[::-1, ...]
        repeated_pose_data[key] = np.concatenate(
            [arr, arr_back] * (n_repeats // 2) + [arr] * (n_repeats % 2), axis=0
        )
    return repeated_pose_data

Let’s put the initalization of the KinematicChainSeq and LegInvKinSeq in a wrapper. This helps us deal with different sequence lengths:

def create_kinchain_and_seqik_objects(n_repeats):
    repeated_pose_data = generate_input_sequence(n_repeats)
    seq_len = repeated_pose_data[list(repeated_pose_data.keys())[0]].shape[0]
    legs = [f"{side}{pos}" for side in "RL" for pos in ["F", "M", "H"]]
    kin_chain = KinematicChainSeq(
        bounds_dof=body_config.dof_bounds_rad,
        body_size=calculate_body_size(body_config.template, legs),
        legs_list=legs,
    )
    seq_ik = LegInvKinSeq(
        aligned_pos=repeated_pose_data,
        kinematic_chain_class=kin_chain,
        initial_angles=body_config.initial_angles_rad,
    )
    return kin_chain, seq_ik, seq_len


kin_chain_x6, seq_ik_x6, seq_len_x6 = create_kinchain_and_seqik_objects(n_repeats=6)

We can re-run inverse and forward kinematics again in parallel. We can do this simply by setting n_workers to 6. Since we are only parallelizing over legs for now, we will set parallel_over_time flag to False. We will disable the progress bar to avoid having too much information from all the worker processes.

start_time = time()
parallel_legs_output = seq_ik_x6.run_ik_and_fk(
    n_workers=6, parallel_over_time=False, hide_progress_bar=True
)
wall_time = time() - start_time
print(
    f"Parallel processing over legs: {seq_len_x6}x6 frames processed in {wall_time:.2f}s"
)
Parallel processing over legs: 600x6 frames processed in 38.32s

Over roughly the same amount of time, we now have processed 6x as much data. Let’s check if the results are identical:

example_dof = "Angle_RF_ThC_pitch"
serial_angles_ts = serial_output[0][example_dof]
parallel_angles_ts = parallel_legs_output[0][example_dof]
# trim off extra frames from parallel output that were added for benchmarking
parallel_angles_ts = parallel_angles_ts[:len(serial_angles_ts)]
print("Results are identical:", np.allclose(serial_angles_ts, parallel_angles_ts))
Results are identical: True

We can also parallelize over time by splitting the sequence into smaller chunks. This will not lead to identical results because for each frame, the initial joint angles are taken from the previous loop. Therefore, starting the optimization half way into the sequence changes the initial conditions and technically changes the outcome of the inverse kinematics computation. Therefore, we include a small overlap between each two adjacent chunks. In the first half of the overlap, the output from the previous chunk will be used while the new chunk β€œwarms up.” In the second half of the overlap, the final result is a linear β€œblend” of the results from the two chunks, with the weight of the latter chunk increasing linearly from 0 to 1. With this configuration, the discrepancy in the final results is negligible.

To enable parallelization over time, simply set parallel_over_time to True and use more workers.

n_workers = 12
kin_chain_x12, seq_ik_x12, seq_len_x12 = create_kinchain_and_seqik_objects(n_repeats=12)

start_time = time()
parallel_time_output = seq_ik_x12.run_ik_and_fk(
    n_workers=12, parallel_over_time=True, hide_progress_bar=True
)
wall_time = time() - start_time
print(
    f"Parallel processing over time and legs: "
    f"{seq_len_x12}x12 frames processed in {wall_time:.2f}s"
)
Parallel processing over time and legs: 1200x12 frames processed in 64.10s

Note that parallelizing over time is only helpful if your CPU has more cores than the number of kinematic chains you have. The computer that I’m writing this tutorial on has 8 CPU cores (16 threads), so we expect to see some speedup, but not a lot. If you’re running on a better computer (e.g. on a cluster), you will see a clearer difference. We’ve done a Weak Scaling Test on a 36-core (72-thread) Intel Xeon Platinum 8360Y CPU, and observed a ~32x speedup using 36 workers, and a ~58x speedup using 72 workers. The effective processing time is around 2ms per frame.

Let’s check if the results are close enough

legs_parallel_angles_ts = parallel_legs_output[0][example_dof]
time_parallel_angles_ts = parallel_time_output[0][example_dof]
time_parallel_angles_ts = time_parallel_angles_ts[: len(legs_parallel_angles_ts)]

soft_max_min_vals = np.nanpercentile(legs_parallel_angles_ts, [5, 95])
range_ = soft_max_min_vals[1] - soft_max_min_vals[0]
diff_norm = np.abs(legs_parallel_angles_ts - time_parallel_angles_ts) / range_
diff_norm_nonan = diff_norm[~np.isnan(diff_norm)]
max_diff_norm = diff_norm_nonan.max()
nonzero_mask = ~np.isclose(diff_norm_nonan, 0)
nonzero_diff_frac = nonzero_mask.sum() / nonzero_mask.size
nonzero_diff_mean = np.mean(diff_norm_nonan[nonzero_mask])
print(
    f"Parallelizing over time:\n"
    f"  Max normalized difference: {max_diff_norm*100:.6f}%\n"
    f"  Fraction of values with non-zero difference: {nonzero_diff_frac*100:.2f}%\n"
    f"  Mean difference among different frames: {nonzero_diff_mean*100:.6f}%"
)
Parallelizing over time:
  Max normalized difference: 0.000275%
  Fraction of values with non-zero difference: 1.17%
  Mean difference among different frames: 0.000046%