Skip to article frontmatterSkip to article content

EKF-SLAM (using Incremental Fixed-Lag Smoother)

Open In Colab

This notebook demonstrates 2D Simultaneous Localization and Mapping (SLAM) using an EKF, although it is implemented using GTSAM’s IncrementalFixedLagSmoother, just using a lag of 1.

Scenario: A robot moves in a circular path, receiving noisy odometry and bearing-range measurements to landmarks.

Approach: We use a fixed-lag smoother which maintains and optimizes only a recent window of variables (defined by the SMOOTHER_LAG). Variables older than the lag are marginalized out, keeping the computational cost bounded, making it suitable for online applications. By default we set the lag to 1 here, which makes this an extended Kalman filter. But feel free to change the lag and see the fixed-lag smoother results.

1. Setup and Imports

# Install GTSAM and Plotly from pip if running in Google Colab
try:
    import google.colab
    %pip install --quiet gtsam-develop plotly
except ImportError:
    pass # Not in Colab
import numpy as np
from tqdm.notebook import tqdm # Progress bar
import math

import gtsam
from gtsam.symbol_shorthand import X, L # Symbols for poses and landmarks

# Import the IncrementalFixedLagSmoother
from gtsam import IncrementalFixedLagSmoother

# Helper modules
import simulation
from gtsam_plotly import SlamFrameData, create_slam_animation

2. Simulation and Smoother Parameters

Define parameters for the simulation environment, robot motion, noise models, and the fixed-lag smoother.

# World parameters
NUM_LANDMARKS = 15
WORLD_SIZE = 10.0 # Environment bounds [-WORLD_SIZE/2, WORLD_SIZE/2]

# Robot parameters
ROBOT_RADIUS = 3.0
ROBOT_ANGULAR_VEL = np.deg2rad(20.0) # Radians per step
NUM_STEPS = 50
DT = 1.0 # Time step duration

# Noise parameters (GTSAM Noise Models)
PRIOR_NOISE = gtsam.noiseModel.Diagonal.Sigmas(np.array([0.1, 0.1, np.deg2rad(1.0)]))
ODOMETRY_NOISE = gtsam.noiseModel.Diagonal.Sigmas(np.array([0.1, 0.05, np.deg2rad(2.0)]))
MEASUREMENT_NOISE = gtsam.noiseModel.Diagonal.Sigmas(np.array([np.deg2rad(2.0), 0.2]))

# Sensor parameters
MAX_SENSOR_RANGE = 5.0

# --- Fixed-Lag Smoother Parameters ---
# Define the length of the smoothing window in seconds.
# A lag of 1*DT (e.g., 1.0 second) means the smoother maintains the state
# estimate for the current time step only, behaving like a filter.
# Larger lags incorporate more past information for smoothing.
SMOOTHER_LAG = 0.99 * DT

3. Generate Ground Truth Data

Create the true environment, robot path, and simulate noisy sensor readings using the simulation module.

landmarks_gt_dict, poses_gt, odometry_measurements, measurements_sim, landmarks_gt_array = \
    simulation.generate_simulation_data(
        num_landmarks=NUM_LANDMARKS,
        world_size=WORLD_SIZE,
        robot_radius=ROBOT_RADIUS,
        robot_angular_vel=ROBOT_ANGULAR_VEL,
        num_steps=NUM_STEPS,
        dt=DT,
        odometry_noise_model=ODOMETRY_NOISE,
        measurement_noise_model=MEASUREMENT_NOISE,
        max_sensor_range=MAX_SENSOR_RANGE,
        X=X, # Pass symbol functions
        L=L
    )
Simulation Generated: 15 landmarks.
Simulation Generated: 51 ground truth poses and 50 odometry measurements.
Simulation Generated: 210 bearing-range measurements.

4. Fixed-Lag Smoother SLAM Implementation

Initialize Smoother and Helper Functions

We create the IncrementalFixedLagSmoother with the specified lag. We also initialize the first state (pose X(0) at time 0.0) and add it to the smoother using its update method. The update method requires factors, initial values (theta), and timestamps for the new variables being added.

# Helper for graphviz visualization
WRITER = gtsam.GraphvizFormatting()
WRITER.binaryEdges = True

def make_dot(graph, estimate):
    # Visualize the factor graph currently managed by the smoother
    WRITER.boxes = {key for key in estimate.keys() if gtsam.symbolChr(key) == ord('l')}
    return graph.dot(estimate, writer=WRITER)

# --- Smoother Initialization ---
print(f"Initializing IncrementalFixedLagSmoother with lag = {SMOOTHER_LAG} seconds...")
# Use ISAM2 parameters if specific settings are needed, otherwise defaults are used.
isam2_params = gtsam.ISAM2Params()
smoother = IncrementalFixedLagSmoother(SMOOTHER_LAG, isam2_params)

# Variables to store results for animation
history = []

# --- Initial Step (k=0) ---
initial_pose_key = X(0)
initial_time = 0.0
initial_pose = poses_gt[0] # Start at ground truth (can add noise if desired)

# Create containers for the first update
initial_factors = gtsam.NonlinearFactorGraph()
initial_values = gtsam.Values()
# The KeyTimestampMap maps variable keys (size_t) to their timestamps (double)
initial_timestamps = {}

# Add prior factor for the first pose
initial_factors.add(gtsam.PriorFactorPose2(initial_pose_key, initial_pose, PRIOR_NOISE))

# Add the initial pose estimate to Values
initial_values.insert(initial_pose_key, initial_pose)

# Add the timestamp for the initial pose
initial_timestamps[initial_pose_key] = initial_time

# Update the smoother with the initial state
print("Performing initial smoother update...")
smoother.update(initial_factors, initial_values, initial_timestamps)
print("Initial update complete.")

# Store initial state for animation
current_estimate = smoother.calculateEstimate()
current_graph = smoother.getFactors() # Get factors currently managed by smoother
# ISAM can serve as marginals object:
current_marginals = smoother.getISAM2()

history.append(SlamFrameData(0, current_estimate, current_marginals, make_dot(current_graph, current_estimate)))
Initializing IncrementalFixedLagSmoother with lag = 0.99 seconds...
Performing initial smoother update...
Initial update complete.

Main Iterative Loop

At each step k, we process the odometry measurement from X(k) to X(k+1) and all landmark measurements taken at pose X(k+1).

  1. Prepare Data: Collect new factors (NonlinearFactorGraph), initial estimates for new variables (Values), and timestamps for new variables (KeyTimestampMap) for the current step.
  2. Predict: Calculate an initial estimate for the new pose X(k+1) based on the previous estimate X(k) (retrieved from the smoother) and the odometry measurement.
  3. Initialize Landmarks: If a landmark is observed for the first time, calculate an initial estimate based on the predicted pose and the measurement, and add it to the new_values and new_timestamps.
  4. Update Smoother: Call smoother.update() with the collected factors, values, and timestamps. This incorporates the new information, performs optimization (iSAM2), and marginalizes old variables.
  5. Store Results: Retrieve the current estimate and factor graph from the smoother for visualization.
print(f"Running Incremental Fixed-Lag Smoother SLAM loop ({NUM_STEPS} steps)...")

for k in tqdm(range(NUM_STEPS)):
    # --- Prepare for Update --- 
    current_time = (k + 1) * DT

    # Create containers for the data needed for smoother.update()
    new_factors = gtsam.NonlinearFactorGraph()
    new_values = gtsam.Values() # Only initial estimates for *new* variables
    new_timestamps = {}

    # --- Odometry Factor --- 
    # Add the odometry factor connecting the previous pose to the current one.
    odom_k = odometry_measurements[k]
    new_factors.add(gtsam.BetweenFactorPose2(X(k), X(k + 1), odom_k, ODOMETRY_NOISE))

    # --- Predict Initial Estimate for New Pose --- 
    # Get the previous pose estimate *from the smoother's current solution*.
    prev_pose_estimate = current_estimate.atPose2(X(k))
    predicted_pose = prev_pose_estimate.compose(odom_k)
    # Add the initial estimate for the new pose X(k+1).
    new_values.insert(X(k + 1), predicted_pose)
    # Add the timestamp for the new pose.
    new_timestamps[X(k + 1)] = current_time

    # --- Measurement Factors & New Landmark Initialization --- 
    measurements_k1 = measurements_sim[k + 1] # Measurements taken AT pose k+1

    for lm_key, measured_bearing, measured_range in measurements_k1:
        # Add the measurement factor.
        new_factors.add(gtsam.BearingRangeFactor2D(X(k + 1), lm_key, 
                                                     measured_bearing, measured_range,
                                                     MEASUREMENT_NOISE))
        
        # --- Initialize New Landmark Estimates --- 
        if not current_estimate.exists(lm_key):
            # Calculate initial guess based on the *predicted* current pose and the measurement.
            delta_x = measured_range * math.cos(measured_bearing.theta())
            delta_y = measured_range * math.sin(measured_bearing.theta())
            lm_initial_guess = predicted_pose.transformFrom(gtsam.Point2(delta_x, delta_y))
            
            # Add the initial estimate for the new landmark.
            new_values.insert(lm_key, lm_initial_guess)
             
            # Add the timestamp for the new landmark (first time seen).
            # Important: Timestamps must be provided for all keys in `new_values`.
            new_timestamps[lm_key] = current_time
                        
    # --- Update Smoother --- 
    # Pass the new factors, initial estimates for new variables, and their timestamps.
    # The smoother internally manages adding these to the ISAM2 backend, optimizing,
    # and removing marginalized variables/factors.
    smoother_result = smoother.update(new_factors, new_values, new_timestamps)
    # Optionally, inspect smoother_result for info like iterations, error, etc.
    # print(f"Step {k+1}: Iterations={smoother_result.getIterations()}, Error={smoother_result.getError():.4f}")

    # --- Get Results for Visualization --- 
    current_estimate = smoother.calculateEstimate() # Get estimates for variables within the lag
    current_graph = smoother.getFactors() # Get factors currently managed by smoother
    # ISAM can serve as marginals object:
    current_marginals = smoother.getISAM2()

    # Store the current state for visualization
    history.append(SlamFrameData(k+1, current_estimate, current_marginals, make_dot(current_graph, current_estimate)))


print("\nIncremental Fixed-Lag Smoother SLAM finished.")
final_estimate = smoother.calculateEstimate()
print(f"Final number of poses in smoother state: {len([key for key in final_estimate.keys() if gtsam.Symbol(key).chr() == ord('x')])}")
print(f"Final number of landmarks in smoother state: {len([key for key in final_estimate.keys() if gtsam.Symbol(key).chr() == ord('l')])}")
Loading...

5. Create Plotly Animation

Visualize the results using the gtsam_plotly module. The animation shows the evolution of the robot’s path estimate and mapped landmarks within the smoother’s active window. The full ground truth path is shown for reference. If plot_full_estimated_trajectory=True, the entire estimated trajectory (including marginalized poses) is shown faintly.

fig = create_slam_animation(
    history,
    X=X,  # Pass symbol functions
    L=L,
    max_landmark_index=NUM_LANDMARKS,
    landmarks_gt_array=landmarks_gt_array,
    poses_gt=poses_gt, # Plot full ground truth for reference
    world_size=WORLD_SIZE,
)

print("Displaying animation...")
fig.show(config={'displayModeBar': False})
Loading...

6. Discussion

  • Fixed-Lag Smoothing: The IncrementalFixedLagSmoother successfully performed online SLAM, maintaining a bounded computational load by marginalizing variables older than the specified SMOOTHER_LAG.
  • smoother.update(): This core method efficiently integrated new measurements and optimized the active variable window using iSAM2.
  • Lag Parameter (SMOOTHER_LAG): This parameter controls the trade-off between computational cost and accuracy. A smaller lag (e.g., 1.0 * DT) acts like a filter, while a larger lag allows for more smoothing over recent history.
  • Visualization: The animation displays the estimates for variables currently within the smoother’s lag. The faint grey line (if enabled) shows the complete history of estimated poses, including those that have been marginalized out.