Skip to article frontmatterSkip to article content

Gal3 Autonomous Surface Vessel IMU EKF

This notebook rewrites parts of Gal3ImuExample to make it work with the data found in gazebo_data.csv

This tutorial shows how to use GTSAM’s Gal3-specific EKF (Gal3ImuEKF) with synthetic IMU data and occasional position measurements. You’ll learn:

  • How the predict step integrates IMU into a Gal3 increment and propagates covariance.

  • How to write a position measurement in the EKF’s local coordinates (the correct H = [0, 0, R, 0]).

  • How to interpret the results with ±2σ uncertainty bands for yaw/pitch/roll, velocity, and position.

We simulate a simple motion and run the EKF end-to-end, then visualize estimates vs. ground truth along with uncertainty.

Authors: Derek Benham & Frank Dellaert

Open In Colab

Source
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots

import gtsam
from gtsam import Gal3, Point3, Pose3, Rot3


# Helper to display yaw/pitch/roll in degrees
def rot3_ypr_rad(R: Rot3):
    """Return yaw, pitch, roll (degrees) for a Rot3."""
    return np.degrees(R.ypr())

Background and setup

State: Gal3 X = (R, v, p, t) with rotation R ∈ SO(3), velocity v ∈ R^3, position p ∈ R^3, time t ∈ R.

  • EKF local coordinates are [δθ, δv_body, δp_body, δt], i.e., position/velocity increments are expressed in the body frame.

  • For a world-position measurement z ≈ p_world, the linearized measurement Jacobian is H = [0, 0, R, 0].

We’ll now define a simple scenario that provides ground truth and noise-free IMU to drive the predict step.

Source
# Single function to convert CSV into as similar as input used in NavStateImuExample
def parse_data(filename):
    # read csv file as np
    data_np = np.genfromtxt(filename, delimiter=",", dtype=None, skip_header=1)  # type: ignore
    start = data_np[1, 7:]
    imu_meas = [
        (row[1:4], row[4:7]) for row in data_np[1:]
    ]  # skipping first row because of gt initialization
    data_gt = data_np[:, 7:]

    dts = np.diff(
        data_np[:, 0]
    )  # I found that not all dt values are the same, therefore I'm using an array.
    position = start[:3]
    quat = start[3:7]  # x,y,z,w
    velocity_body = start[7:10]

    rot = Rot3.Quaternion(quat[3], quat[0], quat[1], quat[2])

    pose = Pose3(rot, Point3(*position))
    velocity_world = rot.rotate(velocity_body)
    X0 = Gal3(rot, pose.translation(), velocity_world, 0.0)

    pos_true = data_gt[:, 0:3]
    quat_true = data_gt[:, 3:7]
    vel_true_body = data_gt[:, 7:10]
    vel_true_world = np.empty_like(vel_true_body)
    ypr_true = np.empty((vel_true_body.shape[0], 3))
    for i in range(vel_true_body.shape[0]):
        rot = Rot3.Quaternion(
            quat_true[i, 3], quat_true[i, 0], quat_true[i, 1], quat_true[i, 2]
        )
        vel_true_world[i] = rot.rotate(vel_true_body[i])
        ypr_true[i] = rot3_ypr_rad(rot)

    return X0, imu_meas, pos_true, ypr_true, vel_true_world, dts
filename = gtsam.findExampleDataFile("gazebo_ASV.csv")
if filename is None:
    raise RuntimeError(
        "Could not find example data file 'gazebo_ASV.csv'. Make sure GTSAM is installed correctly."
    )

X0, imu_meas, pos_true, ypr_true, vel_true, dts = parse_data(filename)
N = dts.shape[0]

print(f"Number of IMU measurements: {len(imu_meas)}")
print(f"Number of time steps (N): {N}")
print(f"Shape of position ground truth: {pos_true.shape}")
print(f"Shape of velocity ground truth: {vel_true.shape}")
print(f"Shape of yaw/pitch/roll ground truth: {ypr_true.shape}")
print(f"Min/Max delta t: {np.min(dts):.5f} -> {np.max(dts):.5f}")
ypr_deg = np.round(np.degrees(X0.attitude().ypr()), 2)
pos = np.round(X0.position(), 2)
vel = np.round(X0.velocity(), 2)
print(
    f"Initial Gal3: ypr (deg) = [{ypr_deg[0]:.2f}, {ypr_deg[1]:.2f}, {ypr_deg[2]:.2f}], "
    f"vel = [{vel[0]:.2f}, {vel[1]:.2f}, {vel[2]:.2f}] "
    f"pos = [{pos[0]:.2f}, {pos[1]:.2f}, {pos[2]:.2f}], "
    f"time = {X0.time():.2f}]",
    flush=True,
)
Number of IMU measurements: 988
Number of time steps (N): 988
Shape of position ground truth: (989, 3)
Shape of velocity ground truth: (989, 3)
Shape of yaw/pitch/roll ground truth: (989, 3)
Min/Max delta t: 0.01000 -> 0.03000
Initial Gal3: ypr (deg) = [-176.68, -0.56, 0.14], vel = [-2.04, -0.14, 0.26] pos = [220.43, -1.49, 0.03], time = 0.00]

Simulation issues

It appears that my simulation data is not perfect and that there are some dropped messages. See the chart below for all occurrences of time steps that are greater than 0.01

Source
# Find the maximum delta
max_delta = np.max(dts)

# Create a Plotly figure
fig = go.Figure()

# Add a line plot for deltas
fig.add_trace(
    go.Scatter(y=dts, mode="lines", name="Delta (s)", line=dict(color="blue"))
)

# Update layout for the plot
fig.update_layout(
    title="Delta between Consecutive Timestamps",
    xaxis_title="Index",
    yaxis_title="Delta (s)",
    template="plotly_white",
)

# Show the plot
fig.show()

# Find the indices where the delta is greater than 0.01
large_deltas_idx = np.where(dts > 0.010001)[0]  # In some cases dt is 0.0100000000001
print(f"Indices of deltas greater than 0.01: {large_deltas_idx}")

print(f"The maximum delta between consecutive timestamps is: {max_delta}")
Loading...

Create Parameters

params = gtsam.PreintegrationParams.MakeSharedU(9.81)  # gravity in m/s^2

# # IMU covariances (tune as needed)
params.setAccelerometerCovariance(np.diag([1e-3] * 3))  # type: ignore
params.setIntegrationCovariance(np.diag([1e-3] * 3))  #  type: ignore
params.setGyroscopeCovariance(np.diag([1e-4] * 3))  # type: ignore

Run the EKF

We integrate IMU in the predict step and occasionally update with a position measurement. The position Jacobian must be H = [0, R, 0] because EKF local coordinates use body-frame increments for p and v.

# Initialize EKF with ground-truth state and modest covariance
P0 = np.eye(10) * 0.1  # std ~ 0.316 on each component
P0[9, 9] = 1e-9  # tiny variance for time
ekf = gtsam.Gal3ImuEKF(X0, P0, params)  # type: ignore
times = np.concatenate(([0.0], np.cumsum(dts)))

# Storage for plotting
ypr_est = []
vel_est = []
pos_est = []
time_est = []

# Std-dev storage from EKF covariance (for ±2σ bands)
rot_std_deg_list = []  # yaw/pitch/roll std in degrees
pos_std_list = []  # x/y/z std in meters
vel_std_list = []  # vx/vy/vz std in m/s
time_std_list = []  # time std in seconds

# Simulate and filter
Xk = X0
for k, t in enumerate(times):
    if k == 0:
        # Record initial estimate and covariance
        X_est = ekf.state()
        ypr_est.append(rot3_ypr_rad(X_est.attitude()))
        vel_est.append(np.asarray(X_est.velocity()))
        pos_est.append(np.asarray(X_est.position()))
        time_est.append(X_est.time())
        P = ekf.covariance()
        std = np.sqrt(np.diag(P))
        rot_std_deg_list.append(np.rad2deg(std[0:3]))
        vel_std_list.append(std[3:6])
        pos_std_list.append(std[6:9])
        time_std_list.append(std[9])
        continue

    # Predict using noise-free IMU from scenario (at interval start)
    omega_meas = np.array(imu_meas[k-1][0])
    acc_meas = np.array(imu_meas[k-1][1])
    dt = dts[k-1]
    ekf.predict(omega_meas, acc_meas, dt)

    # Log estimate after predict
    X_est = ekf.state()
    ypr_est.append(rot3_ypr_rad(X_est.attitude()))
    vel_est.append(np.asarray(X_est.velocity()))
    pos_est.append(np.asarray(X_est.position()))
    time_est.append(X_est.time())

    # Log current covariance std-devs
    P = ekf.covariance()  # post-predict, pre-update
    std = np.sqrt(np.diag(P))
    rot_std_deg_list.append(np.rad2deg(std[0:3]))
    vel_std_list.append(std[3:6])
    pos_std_list.append(std[6:9])
    time_std_list.append(std[9])

    # Position measurement every M steps
    M = 100  # measurement cadence
    if k % M == 0 and k > 0:
        predicted_position = X_est.position()
        measured_position = pos_true[k]
        R = np.eye(3) * 1.0  # measurement covariance (tune as needed)

        # Measurement Jacobian in EKF local coords: H = [0, R, 0]
        H = np.zeros((3, 10))
        H[:, 6:9] = X_est.attitude().matrix()
        ekf.updateWithVector(predicted_position, H, measured_position, R)  # type: ignore

# Convert to arrays for plotting
ypr_est = np.unwrap(np.vstack(ypr_est), axis=0)
vel_est = np.vstack(vel_est)
pos_est = np.vstack(pos_est)
time_est = np.array(time_est)

rot_std_deg = np.vstack(rot_std_deg_list)
pos_std = np.vstack(pos_std_list)
vel_std = np.vstack(vel_std_list)
time_std = np.array(time_std_list)

Plot Yaw/Pitch/Roll (degrees)

We show estimated angles with ±2σ bands and dashed ground truth. Ideally, the truth lies mostly within the shaded bands if covariances are well tuned.

Source
# Create 3-row subplot: one for each of Yaw, Pitch, Roll
fig = make_subplots(rows=3, cols=1, shared_xaxes=True, subplot_titles=("Yaw (deg)", "Pitch (deg)", "Roll (deg)"))
color_rgb = {0: ("rgba(255,0,0,0.15)", "#ff0000"), 1: ("rgba(0,128,0,0.15)", "#008000"), 2: ("rgba(0,0,255,0.15)", "#0000ff")}
names = {0: "Yaw", 1: "Pitch", 2: "Roll"}
for i in range(3):
    fill_rgba, line_color = color_rgb[i]; mean = ypr_est[:, i]; std2p = 2.0 * rot_std_deg[:, i]; upper = mean + std2p; lower = mean - std2p; row = i + 1
    fig.add_trace(go.Scatter(x=times, y=upper, line=dict(color=line_color, width=0), showlegend=False), row=row, col=1)
    fig.add_trace(go.Scatter(x=times, y=lower, fill="tonexty", fillcolor=fill_rgba, line=dict(color=line_color, width=0), showlegend=False), row=row, col=1)
    fig.add_trace(go.Scatter(x=times, y=mean, name=f"{names[i]} est", line=dict(color=line_color, width=2)), row=row, col=1)
    fig.add_trace(go.Scatter(x=times, y=ypr_true[:, i], name=f"{names[i]} true", line=dict(color=line_color, dash="dash", width=2)), row=row, col=1)
fig.update_layout(height=900, title_text="Yaw, Pitch, Roll (deg) with ±2σ bounds", xaxis3_title="Time (s)", yaxis_title="Degrees", showlegend=True)
fig.show()
Loading...

Plot Velocity (m/s)

Velocity often benefits from frequent updates (e.g., GPS Doppler). Here we only have occasional position updates; expect velocity uncertainty to grow between updates and shrink slightly after them.

Source
# 3 subplots for vx, vy, vz
fig = make_subplots(rows=3, cols=1, shared_xaxes=True, subplot_titles=("Velocity X (m/s)", "Velocity Y (m/s)", "Velocity Z (m/s)"))
color_rgb = {0: ("rgba(255,0,0,0.15)", "#ff0000"), 1: ("rgba(0,128,0,0.15)", "#008000"), 2: ("rgba(0,0,255,0.15)", "#0000ff")}
for i, comp in enumerate(["x", "y", "z"]):
    fill_rgba, line_color = color_rgb[i]; mean = vel_est[:, i]; std2p = 2.0 * vel_std[:, i]; upper = mean + std2p; lower = mean - std2p; row = i + 1
    fig.add_trace(go.Scatter(x=times, y=upper, line=dict(color=line_color, width=0), showlegend=False), row=row, col=1)
    fig.add_trace(go.Scatter(x=times, y=lower, fill="tonexty", fillcolor=fill_rgba, line=dict(color=line_color, width=0), showlegend=False), row=row, col=1)
    fig.add_trace(go.Scatter(x=times, y=mean, name=f"v{comp} est", line=dict(color=line_color, width=2)), row=row, col=1)
    fig.add_trace(go.Scatter(x=times, y=vel_true[:, i], name=f"v{comp} true", line=dict(color=line_color, dash="dash", width=2)), row=row, col=1)
fig.update_layout(height=900, title_text="Velocity Components in World Frame with ±2σ Bounds", xaxis3_title="Time (s)", yaxis_title="Velocity (m/s)", showlegend=True)
fig.show()
Loading...

Plot Position (m)

Position updates should pull the estimate toward truth and reduce the ±2σ band. If divergence occurs, check:

  • H matches EKF local coords ([0, 0, R, 0] for world-position).

  • Measurement noise R reflects your sensor.

  • Process noise (IMU) is not overconfident.

Source
# 3 subplots for px, py, pz
fig = make_subplots(rows=3, cols=1, shared_xaxes=True, subplot_titles=("Position X (m)", "Position Y (m)", "Position Z (m)"))
color_rgb = {0: ("rgba(255,0,0,0.15)", "#ff0000"), 1: ("rgba(0,128,0,0.15)", "#008000"), 2: ("rgba(0,0,255,0.15)", "#0000ff")}
for i, comp in enumerate(["x", "y", "z"]):
    fill_rgba, line_color = color_rgb[i]; mean = pos_est[:, i]; std2p = 2.0 * pos_std[:, i]; upper = mean + std2p; lower = mean - std2p; row = i + 1
    fig.add_trace(go.Scatter(x=times, y=upper, line=dict(color=line_color, width=0), showlegend=False), row=row, col=1)
    fig.add_trace(go.Scatter(x=times, y=lower, fill="tonexty", fillcolor=fill_rgba, line=dict(color=line_color, width=0), showlegend=False), row=row, col=1)
    fig.add_trace(go.Scatter(x=times, y=mean, name=f"p{comp} est", line=dict(color=line_color, width=2)), row=row, col=1)
    fig.add_trace(go.Scatter(x=times, y=pos_true[:, i], name=f"p{comp} true", line=dict(color=line_color, dash="dash", width=2)), row=row, col=1)
fig.update_layout(height=900, title_text="Position Components with ±2σ Bounds", xaxis3_title="Time (s)", yaxis_title="Position (m)", showlegend=True)
fig.show()
Loading...

Conclusion

This notebook demonstrated the use of GTSAM’s Gal3ImuEKF for fusing IMU and position measurements in an autonomous surface vessel scenario. We visualized the EKF’s estimates and uncertainty for orientation, velocity, and position, comparing them to ground truth. The results show that the EKF tracks the true state well, with uncertainty growing between updates and shrinking after position corrections. Careful tuning of process and measurement noise is essential for reliable performance. This workflow provides a foundation for integrating additional sensors and handling real-world data imperfections.