
Elaborate EKF Example

A simple linear Kalman filter on a moving 2D point, implemented with factor graphs in GTSAM. This example manually creates all of the data structures needed to show how the Kalman filter works under the hood as factor-graph elimination, but uses a loop to handle the repetitive prediction and update steps.

Author: Matt Kielo. Based on the C++ example by Frank Dellaert and Stephen Williams.


import gtsam
import numpy as np
from gtsam import Point2, noiseModel
from gtsam.symbol_shorthand import X
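
A GTSAM Gaussian noise model stores the square-root information matrix R rather than the covariance: the information matrix is R^T R and the covariance is its inverse. A minimal sketch (run after the imports above; the variable names are illustrative) makes the relationship concrete:

# Sketch: the square-root information form of a GTSAM noise model.
# For an isotropic model with sigma = 0.1 the covariance is 0.01 * I,
# the information matrix is 100 * I, and R is its Cholesky factor, 10 * I.
model = noiseModel.Isotropic.Sigma(2, 0.1)
R = model.R()                            # upper-triangular square-root information matrix
information = R.T @ R                    # information = R^T R
covariance = np.linalg.inv(information)  # covariance = (R^T R)^{-1}
print(np.allclose(covariance, 0.01 * np.eye(2)))  # True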

The code below essentially implements the SRIF (square-root information filter) variant of the EKF with Cholesky factorization: instead of propagating a covariance matrix, each step carries the upper-triangular square-root information matrix R of the current state.

# Setup containers for linearization points
linearization_points = gtsam.Values()

# Initialize state x0 at origin
x_initial = Point2(0, 0)
p_initial = noiseModel.Isotropic.Sigma(2, 0.1)

# Add x0 to linearization points
linearization_points.insert(X(0), x_initial)

# Initial factor graph with prior on X(0)
gfg = gtsam.GaussianFactorGraph()
ordering = gtsam.Ordering()
ordering.push_back(X(0))
gfg.add(X(0), p_initial.R(), np.zeros(2), noiseModel.Unit.Create(2))

# Common parameters for all steps
motion_delta = Point2(1, 0)  # Always move 1 unit to the right
process_noise = noiseModel.Isotropic.Sigma(2, 0.1)
measurement_noise = noiseModel.Isotropic.Sigma(2, 0.25)
# Current state and conditional
current_x = X(0)
current_conditional = None
current_result = None
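
As a quick sanity check (a sketch, not part of the original example; it assumes the cells above have run), eliminating this one-factor graph recovers the prior density on X(0): a zero mean delta and covariance (R^T R)^{-1} = 0.01 * I.

# Sanity-check sketch: eliminate the prior-only graph and inspect the result.
prior_bayes_net = gfg.eliminateSequential(ordering)
prior_conditional = prior_bayes_net.back()
R0 = prior_conditional.R()
print("Prior mean delta:", prior_bayes_net.optimize().at(X(0)))  # [0. 0.]
print("Prior covariance:\n", np.linalg.inv(R0.T @ R0))           # 0.01 * I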

# Run three predict-update cycles
for step in range(1, 4):
    # =====================================================================
    # Prediction step
    # =====================================================================
    next_x = X(step)
    
    # Create new graph with prior from previous step if not the first step
    if step > 1:
        gfg = gtsam.GaussianFactorGraph()
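        # The stored conditional is expressed in deltas around the previous
        # linearization point, but that point has since been moved to the
        # updated estimate. Re-center the prior by subtracting R times the
        # previous solution delta from d.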
        gfg.add(
            current_x,
            current_conditional.R(),
            current_conditional.d() - current_conditional.R() @ current_result.at(current_x),
            current_conditional.get_model()
        )
    
    # Create an elimination ordering over the current and next states
    ordering = gtsam.Ordering()
    ordering.push_back(current_x)
    ordering.push_back(next_x)
    
    # Create the motion model factor (linearized and added to the graph below)
    motion_factor = gtsam.BetweenFactorPoint2(current_x, next_x, motion_delta, process_noise)
    
    # Insert a linearization point for the next state: the initial point on
    # the first step, the current estimate otherwise
    if step == 1:
        linearization_points.insert(next_x, x_initial)
    else:
        linearization_points.insert(next_x, 
                                    linearization_points.atPoint2(current_x))
    
    # Add linearized factor to graph
    gfg.push_back(motion_factor.linearize(linearization_points))
    
    # Solve for prediction
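    # Eliminating with ordering [current_x, next_x] yields the Bayes net
    # P(current_x | next_x) P(next_x); back() returns the last conditional,
    # i.e. the marginal on next_x in square-root information form.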
    prediction_bayes_net = gfg.eliminateSequential(ordering)
    next_conditional = prediction_bayes_net.back()
    prediction_result = prediction_bayes_net.optimize()
    
    # Extract and store predicted state
    next_predict = linearization_points.atPoint2(next_x) + Point2(prediction_result.at(next_x))
    print(f"X{step} Predict:", next_predict)
    linearization_points.update(next_x, next_predict)
    
    # =====================================================================
    # Update step
    # =====================================================================
    # Create new graph with prior from prediction
    gfg = gtsam.GaussianFactorGraph()
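    # Same re-centering as in the prediction prior: d is relative to the
    # pre-prediction linearization point, so subtract R times the prediction
    # delta to express the prior at the updated linearization point.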
    gfg.add(
        next_x,
        next_conditional.R(),
        next_conditional.d() - next_conditional.R() @ prediction_result.at(next_x),
        next_conditional.get_model()
    )
    
    # Create ordering for update
    ordering = gtsam.Ordering()
    ordering.push_back(next_x)
    
    # Create measurement at correct position
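    # The measurement model is the identity, z = x + v with v ~ N(0, 0.25^2 I),
    # so the measurement enters the graph as a prior factor at the measured point.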
    measurement = Point2(float(step), 0.0)
    meas_factor = gtsam.PriorFactorPoint2(next_x, measurement, measurement_noise)
    
    # Add measurement factor to graph
    gfg.push_back(meas_factor.linearize(linearization_points))
    
    # Solve for update
    update_bayes_net = gfg.eliminateSequential(ordering)
    current_conditional = update_bayes_net.back()
    current_result = update_bayes_net.optimize()
    
    # Extract and store updated state
    next_update = linearization_points.atPoint2(next_x) + Point2(current_result.at(next_x))
    print(f"X{step} Update:", next_update)
    linearization_points.update(next_x, next_update)
    
    # Move to next state
    current_x = next_x

final_R = current_conditional.R()
final_information = final_R.transpose() @ final_R
final_covariance = np.linalg.inv(final_information)
print("\nElaborate Final Covariance (after update):\n", final_covariance)

X1 Predict: [1. 0.]
X1 Update: [1. 0.]
X2 Predict: [2. 0.]
X2 Update: [2. 0.]
X3 Predict: [3. 0.]
X3 Update: [3. 0.]

Elaborate Final Covariance (after update):
 [[0.0193 0.    ]
 [0.     0.0193]]
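
For comparison, the same linear filter can be run with GTSAM's built-in gtsam.KalmanFilter class, which hides all of the factor-graph machinery above. This is a sketch under the assumption that the KalmanFilter wrapper is available in your GTSAM build; after three predict-update cycles its covariance should match the ~0.0193 * I computed above.

# Cross-check sketch: the same filter via the gtsam.KalmanFilter class.
import numpy as np
import gtsam
from gtsam import noiseModel

kf = gtsam.KalmanFilter(2)
# Prior: mean (0, 0) with covariance (0.1)^2 * I
state = kf.init(np.zeros(2), 0.01 * np.eye(2))

F = np.eye(2)                             # identity state transition
B = np.eye(2)                             # identity control matrix
u = np.array([1.0, 0.0])                  # move 1 unit right each step
H = np.eye(2)                             # identity measurement model
Q = noiseModel.Isotropic.Sigma(2, 0.1)    # process noise
Rm = noiseModel.Isotropic.Sigma(2, 0.25)  # measurement noise

for step in range(1, 4):
    state = kf.predict(state, F, B, u, Q)
    state = kf.update(state, H, np.array([float(step), 0.0]), Rm)

print(state.mean())        # ~[3. 0.]
print(state.covariance())  # ~0.0193 * I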