From 99d762f274c6cbb89ef7e336519e7faea2c5495f Mon Sep 17 00:00:00 2001 From: hass-nation Date: Mon, 29 Jun 2026 20:17:32 +0100 Subject: [PATCH] Add cubature RTS smoother Add a Rauch-Tung-Striebel smoother to CubatureKalmanFilter, including support for both flattened and column-vector state histories, and regression tests against the linear Kalman RTS smoother. The implementation follows Arasaratnam and Haykin (2011), 'Cubature Kalman Smoothers', Automatica 47(10):2245-2250. Co-Authored-By: OpenAI Codex --- filterpy/kalman/CubatureKalmanFilter.py | 110 ++++++++++++++++++++++++ filterpy/kalman/tests/test_ckf.py | 95 +++++++++++++++++++- 2 files changed, 201 insertions(+), 4 deletions(-) diff --git a/filterpy/kalman/CubatureKalmanFilter.py b/filterpy/kalman/CubatureKalmanFilter.py index bae2a1a5..2d0f5e6d 100644 --- a/filterpy/kalman/CubatureKalmanFilter.py +++ b/filterpy/kalman/CubatureKalmanFilter.py @@ -389,6 +389,116 @@ def update(self, z, R=None, hx_args=()): self._likelihood = None self._mahalanobis = None + def rts_smoother(self, Xs, Ps, Qs=None, dts=None, fx_args=()): + """ + Runs the Rauch-Tung-Striebel smoother on filtered CKF means and + covariances. The usual input would come from storing the outputs of + repeated calls to ``predict()`` and ``update()``. + + Parameters + ---------- + Xs : numpy.array + Filtered state means. Accepted shapes are ``(n, dim_x)`` and + ``(n, dim_x, 1)``. + + Ps : numpy.array + Filtered state covariances of shape ``(n, dim_x, dim_x)``. + + Qs : list-like collection of numpy.array, optional + Process noise covariance at each time step. If not provided, + ``self.Q`` is used for every step. + + dts : float or array-like of float, optional + Time step for each prediction. If a scalar is provided, the same + step is used for all smoothing recursions. If omitted, + ``self._dt`` is used. + + fx_args : tuple, optional, default (,) + Optional arguments passed into ``fx()`` after the state variable + and time step. + + Returns + ------- + x : numpy.ndarray + Smoothed state means with the same dimensionality convention as + ``Xs``. + + P : numpy.ndarray + Smoothed state covariances. + + K : numpy.ndarray + Smoother gain at each step. + + References + ---------- + .. [1] Arasaratnam, I., Haykin, S. "Cubature Kalman Smoothers." + Automatica, 47(10), 2245-2250, 2011. + """ + + Xs = np.asarray(Xs) + Ps = np.asarray(Ps) + + if len(Xs) != len(Ps): + raise ValueError('Xs and Ps must have the same length') + + if Xs.ndim == 3 and Xs.shape[2] == 1: + reshape_output = True + xs = Xs[:, :, 0].copy() + elif Xs.ndim == 2: + reshape_output = False + xs = Xs.copy() + else: + raise ValueError('Xs must have shape (n, dim_x) or (n, dim_x, 1)') + + if Ps.ndim != 3 or Ps.shape[1] != Ps.shape[2]: + raise ValueError('Ps must have shape (n, dim_x, dim_x)') + + n, dim_x = xs.shape + + if dts is None: + dts = [self._dt] * n + elif isscalar(dts): + dts = [dts] * n + + if Qs is None: + Qs = [self.Q] * n + else: + Qs = np.asarray(Qs) + if Qs.ndim == 2: + Qs = [Qs] * n + + if not isinstance(fx_args, tuple): + fx_args = (fx_args,) + + ps = Ps.copy() + Ks = zeros((n, dim_x, dim_x)) + sigmas_f = zeros((self._num_sigmas, dim_x)) + + for k in range(n - 2, -1, -1): + sigmas = spherical_radial_sigmas(xs[k], ps[k]) + for i in range(self._num_sigmas): + sigmas_f[i] = self.fx(sigmas[i], dts[k], *fx_args) + + xb, Pb = ckf_transform(sigmas_f, Qs[k]) + xb = xb[:, 0] + + Pxb = np.zeros((dim_x, dim_x)) + for i in range(self._num_sigmas): + dx = self.residual_x(sigmas[i], xs[k]) + dy = self.residual_x(sigmas_f[i], xb) + Pxb += np.outer(dx, dy) + Pxb /= self._num_sigmas + + K = dot(Pxb, inv(Pb)) + xs[k] += dot(K, self.residual_x(xs[k + 1], xb)) + ps[k] += dot(K, ps[k + 1] - Pb).dot(K.T) + Ks[k] = K + + if reshape_output: + xs = xs[:, :, None] + + return xs, ps, Ks + @property def log_likelihood(self): """ diff --git a/filterpy/kalman/tests/test_ckf.py b/filterpy/kalman/tests/test_ckf.py index 6fd4c396..10a229cf 100644 --- a/filterpy/kalman/tests/test_ckf.py +++ b/filterpy/kalman/tests/test_ckf.py @@ -5,12 +5,14 @@ @author: rlabbe """ -from filterpy.common import Saver from filterpy.kalman import CubatureKalmanFilter as CKF from filterpy.kalman import UnscentedKalmanFilter as UKF from filterpy.kalman import MerweScaledSigmaPoints +from filterpy.kalman import KalmanFilter +from filterpy.common import Q_discrete_white_noise import numpy as np from numpy.random import randn +import pytest from pytest import approx from scipy.spatial.distance import mahalanobis as scipy_mahalanobis @@ -44,7 +46,6 @@ def hx(x): kf.R *= 0.05 kf.Q = np.array([[0., 0], [0., .001]]) - s = Saver(kf) for i in range(50): z = np.array([[i+randn()*0.1]]) ckf.predict() @@ -53,14 +54,100 @@ def hx(x): kf.update(z[0]) assert abs(ckf.x[0] - kf.x[0]) < 1e-10 assert abs(ckf.x[1] - kf.x[1]) < 1e-10 - s.save() # test mahalanobis a = np.zeros(kf.y.shape) maha = scipy_mahalanobis(a, kf.y, kf.SI) assert kf.mahalanobis == approx(maha) - s.to_array() + +def _run_linear_ckf_history(): + def fx(x, dt): + F = np.array([[1., dt], + [0., 1.]]) + return np.dot(F, x) + + def hx(x): + return x[0:1] + + dt = 0.1 + sig_t = 0.1 + sig_o = 0.2 + + np.random.seed(100) + zs = [i + 1 + np.random.normal(scale=sig_o) for i in range(50)] + + ckf = CKF(dim_x=2, dim_z=1, dt=dt, hx=hx, fx=fx) + ckf.x = np.array([[0.], [1.]]) + ckf.R = np.array([[sig_o**2]]) + ckf.Q = Q_discrete_white_noise(dim=2, dt=dt, var=sig_t**2) + + xs, ps = [], [] + for z in zs: + ckf.predict() + ckf.update(np.array([[z]])) + xs.append(ckf.x.copy()) + ps.append(ckf.P.copy()) + + return zs, ckf, np.asarray(xs), np.asarray(ps) + + +def test_ckf_rts_smoother_linear_matches_kf(): + def fx(x, dt): + F = np.array([[1., dt], + [0., 1.]]) + return np.dot(F, x) + + H = np.array([[1., 0.]]) + dt = 0.1 + sig_t = 0.1 + sig_o = 0.2 + + zs, ckf, xs, ps = _run_linear_ckf_history() + x_ckf, p_ckf, k_ckf = ckf.rts_smoother(xs, ps) + + kf = KalmanFilter(dim_x=2, dim_z=1) + kf.x = np.array([[0.], [1.]]) + kf.F = np.array([[1., dt], + [0., 1.]]) + kf.H = H + kf.R = np.array([[sig_o**2]]) + kf.Q = Q_discrete_white_noise(dim=2, dt=dt, var=sig_t**2) + + mu_kf, cov_kf, _, _ = kf.batch_filter(zs) + x_kf, p_kf, k_kf, _ = kf.rts_smoother(mu_kf, cov_kf) + + assert np.allclose(x_ckf[:, 0, 0], x_kf[:, 0, 0], atol=1e-4) + assert np.allclose(x_ckf[:, 1, 0], x_kf[:, 1, 0], atol=1e-4) + assert np.allclose(p_ckf, p_kf, atol=1e-5) + assert np.allclose(k_ckf[:-1], k_kf[:-1], atol=2e-4) + + +def test_ckf_rts_smoother_flat_and_column_histories_match(): + _, ckf, xs, ps = _run_linear_ckf_history() + + x_col, p_col, k_col = ckf.rts_smoother(xs, ps) + x_flat, p_flat, k_flat = ckf.rts_smoother(xs[:, :, 0], ps) + x_q, p_q, k_q = ckf.rts_smoother( + xs, + ps, + Qs=[ckf.Q] * len(xs), + dts=[ckf._dt] * len(xs), + ) + + assert np.allclose(x_col[:, :, 0], x_flat) + assert np.allclose(p_col, p_flat) + assert np.allclose(k_col, k_flat) + assert np.allclose(x_col, x_q) + assert np.allclose(p_col, p_q) + assert np.allclose(k_col, k_q) + + +def test_ckf_rts_smoother_length_mismatch(): + _, ckf, xs, ps = _run_linear_ckf_history() + + with pytest.raises(ValueError, match="same length"): + ckf.rts_smoother(xs[:-1], ps) if __name__ == "__main__":