Simple linear Kalman filter on a moving 2D point using factor graphs in GTSAM. This example manually creates all of the needed data structures to show how the Kalman filter works under the hood using factor graphs, but uses a loop to handle the repetitive prediction and update steps.
Author: Matt Kielo. Based on the C++ example by Frank Dellaert and Stephen Williams
import gtsam
import numpy as np
from gtsam import Point2, noiseModel
from gtsam.symbol_shorthand import X
The code below basically implements the SRIF (Square-root Information filter version of the EKF) with Cholesky factorization.
# Setup containers for linearization points
linearization_points = gtsam.Values()
# Initialize state x0 at origin
x_initial = Point2(0, 0)
p_initial = noiseModel.Isotropic.Sigma(2, 0.1)
# Add x0 to linearization points
linearization_points.insert(X(0), x_initial)
# Initial factor graph with prior on X(0)
gfg = gtsam.GaussianFactorGraph()
ordering = gtsam.Ordering()
ordering.push_back(X(0))
gfg.add(X(0), p_initial.R(), np.zeros(2), noiseModel.Unit.Create(2))
# Common parameters for all steps
motion_delta = Point2(1, 0) # Always move 1 unit to the right
process_noise = noiseModel.Isotropic.Sigma(2, 0.1)
measurement_noise = noiseModel.Isotropic.Sigma(2, 0.25)
# Current state and conditional
current_x = X(0)
current_conditional = None
current_result = None
# Run three predict-update cycles
for step in range(1, 4):
# =====================================================================
# Prediction step
# =====================================================================
next_x = X(step)
# Create new graph with prior from previous step if not the first step
if step > 1:
gfg = gtsam.GaussianFactorGraph()
gfg.add(
current_x,
current_conditional.R(),
current_conditional.d() - current_conditional.R() @ current_result.at(current_x),
current_conditional.get_model()
)
# Add next state to ordering and create motion model
ordering = gtsam.Ordering()
ordering.push_back(current_x)
ordering.push_back(next_x)
# Create motion factor and add to graph
motion_factor = gtsam.BetweenFactorPoint2(current_x, next_x, motion_delta, process_noise)
# Add next state to linearization points if this is the first step
if step == 1:
linearization_points.insert(next_x, x_initial)
else:
linearization_points.insert(next_x,
linearization_points.atPoint2(current_x))
# Add linearized factor to graph
gfg.push_back(motion_factor.linearize(linearization_points))
# Solve for prediction
prediction_bayes_net = gfg.eliminateSequential(ordering)
next_conditional = prediction_bayes_net.back()
prediction_result = prediction_bayes_net.optimize()
# Extract and store predicted state
next_predict = linearization_points.atPoint2(next_x) + Point2(prediction_result.at(next_x))
print(f"X{step} Predict:", next_predict)
linearization_points.update(next_x, next_predict)
# =====================================================================
# Update step
# =====================================================================
# Create new graph with prior from prediction
gfg = gtsam.GaussianFactorGraph()
gfg.add(
next_x,
next_conditional.R(),
next_conditional.d() - next_conditional.R() @ prediction_result.at(next_x),
next_conditional.get_model()
)
# Create ordering for update
ordering = gtsam.Ordering()
ordering.push_back(next_x)
# Create measurement at correct position
measurement = Point2(float(step), 0.0)
meas_factor = gtsam.PriorFactorPoint2(next_x, measurement, measurement_noise)
# Add measurement factor to graph
gfg.push_back(meas_factor.linearize(linearization_points))
# Solve for update
update_bayes_net = gfg.eliminateSequential(ordering)
current_conditional = update_bayes_net.back()
current_result = update_bayes_net.optimize()
# Extract and store updated state
next_update = linearization_points.atPoint2(next_x) + Point2(current_result.at(next_x))
print(f"X{step} Update:", next_update)
linearization_points.update(next_x, next_update)
# Move to next state
current_x = next_x
final_R = current_conditional.R()
final_information = final_R.transpose() @ final_R
final_covariance = np.linalg.inv(final_information)
print("\nElaborate Final Covariance (after update):\n", final_covariance)
X1 Predict: [1. 0.]
X1 Update: [1. 0.]
X2 Predict: [2. 0.]
X2 Update: [2. 0.]
X3 Predict: [3. 0.]
X3 Update: [3. 0.]
Elaborate Final Covariance (after update):
[[0.0193 0. ]
[0. 0.0193]]