Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions filterpy/kalman/CubatureKalmanFilter.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,116 @@ def update(self, z, R=None, hx_args=()):
self._likelihood = None
self._mahalanobis = None

def rts_smoother(self, Xs, Ps, Qs=None, dts=None, fx_args=()):
"""
Runs the Rauch-Tung-Striebel smoother on filtered CKF means and
covariances. The usual input would come from storing the outputs of
repeated calls to ``predict()`` and ``update()``.

Parameters
----------
Xs : numpy.array
Filtered state means. Accepted shapes are ``(n, dim_x)`` and
``(n, dim_x, 1)``.

Ps : numpy.array
Filtered state covariances of shape ``(n, dim_x, dim_x)``.

Qs : list-like collection of numpy.array, optional
Process noise covariance at each time step. If not provided,
``self.Q`` is used for every step.

dts : float or array-like of float, optional
Time step for each prediction. If a scalar is provided, the same
step is used for all smoothing recursions. If omitted,
``self._dt`` is used.

fx_args : tuple, optional, default (,)
Optional arguments passed into ``fx()`` after the state variable
and time step.

Returns
-------
x : numpy.ndarray
Smoothed state means with the same dimensionality convention as
``Xs``.

P : numpy.ndarray
Smoothed state covariances.

K : numpy.ndarray
Smoother gain at each step.

References
----------
.. [1] Arasaratnam, I., Haykin, S. "Cubature Kalman Smoothers."
Automatica, 47(10), 2245-2250, 2011.
"""

Xs = np.asarray(Xs)
Ps = np.asarray(Ps)

if len(Xs) != len(Ps):
raise ValueError('Xs and Ps must have the same length')

if Xs.ndim == 3 and Xs.shape[2] == 1:
reshape_output = True
xs = Xs[:, :, 0].copy()
elif Xs.ndim == 2:
reshape_output = False
xs = Xs.copy()
else:
raise ValueError('Xs must have shape (n, dim_x) or (n, dim_x, 1)')

if Ps.ndim != 3 or Ps.shape[1] != Ps.shape[2]:
raise ValueError('Ps must have shape (n, dim_x, dim_x)')

n, dim_x = xs.shape

if dts is None:
dts = [self._dt] * n
elif isscalar(dts):
dts = [dts] * n

if Qs is None:
Qs = [self.Q] * n
else:
Qs = np.asarray(Qs)
if Qs.ndim == 2:
Qs = [Qs] * n

if not isinstance(fx_args, tuple):
fx_args = (fx_args,)

ps = Ps.copy()
Ks = zeros((n, dim_x, dim_x))
sigmas_f = zeros((self._num_sigmas, dim_x))

for k in range(n - 2, -1, -1):
sigmas = spherical_radial_sigmas(xs[k], ps[k])
for i in range(self._num_sigmas):
sigmas_f[i] = self.fx(sigmas[i], dts[k], *fx_args)

xb, Pb = ckf_transform(sigmas_f, Qs[k])
xb = xb[:, 0]

Pxb = np.zeros((dim_x, dim_x))
for i in range(self._num_sigmas):
dx = self.residual_x(sigmas[i], xs[k])
dy = self.residual_x(sigmas_f[i], xb)
Pxb += np.outer(dx, dy)
Pxb /= self._num_sigmas

K = dot(Pxb, inv(Pb))
xs[k] += dot(K, self.residual_x(xs[k + 1], xb))
ps[k] += dot(K, ps[k + 1] - Pb).dot(K.T)
Ks[k] = K

if reshape_output:
xs = xs[:, :, None]

return xs, ps, Ks

@property
def log_likelihood(self):
"""
Expand Down
95 changes: 91 additions & 4 deletions filterpy/kalman/tests/test_ckf.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
@author: rlabbe
"""

from filterpy.common import Saver
from filterpy.kalman import CubatureKalmanFilter as CKF
from filterpy.kalman import UnscentedKalmanFilter as UKF
from filterpy.kalman import MerweScaledSigmaPoints
from filterpy.kalman import KalmanFilter
from filterpy.common import Q_discrete_white_noise
import numpy as np
from numpy.random import randn
import pytest
from pytest import approx
from scipy.spatial.distance import mahalanobis as scipy_mahalanobis

Expand Down Expand Up @@ -44,7 +46,6 @@ def hx(x):
kf.R *= 0.05
kf.Q = np.array([[0., 0], [0., .001]])

s = Saver(kf)
for i in range(50):
z = np.array([[i+randn()*0.1]])
ckf.predict()
Expand All @@ -53,14 +54,100 @@ def hx(x):
kf.update(z[0])
assert abs(ckf.x[0] - kf.x[0]) < 1e-10
assert abs(ckf.x[1] - kf.x[1]) < 1e-10
s.save()

# test mahalanobis
a = np.zeros(kf.y.shape)
maha = scipy_mahalanobis(a, kf.y, kf.SI)
assert kf.mahalanobis == approx(maha)

s.to_array()

def _run_linear_ckf_history():
def fx(x, dt):
F = np.array([[1., dt],
[0., 1.]])
return np.dot(F, x)

def hx(x):
return x[0:1]

dt = 0.1
sig_t = 0.1
sig_o = 0.2

np.random.seed(100)
zs = [i + 1 + np.random.normal(scale=sig_o) for i in range(50)]

ckf = CKF(dim_x=2, dim_z=1, dt=dt, hx=hx, fx=fx)
ckf.x = np.array([[0.], [1.]])
ckf.R = np.array([[sig_o**2]])
ckf.Q = Q_discrete_white_noise(dim=2, dt=dt, var=sig_t**2)

xs, ps = [], []
for z in zs:
ckf.predict()
ckf.update(np.array([[z]]))
xs.append(ckf.x.copy())
ps.append(ckf.P.copy())

return zs, ckf, np.asarray(xs), np.asarray(ps)


def test_ckf_rts_smoother_linear_matches_kf():
def fx(x, dt):
F = np.array([[1., dt],
[0., 1.]])
return np.dot(F, x)

H = np.array([[1., 0.]])
dt = 0.1
sig_t = 0.1
sig_o = 0.2

zs, ckf, xs, ps = _run_linear_ckf_history()
x_ckf, p_ckf, k_ckf = ckf.rts_smoother(xs, ps)

kf = KalmanFilter(dim_x=2, dim_z=1)
kf.x = np.array([[0.], [1.]])
kf.F = np.array([[1., dt],
[0., 1.]])
kf.H = H
kf.R = np.array([[sig_o**2]])
kf.Q = Q_discrete_white_noise(dim=2, dt=dt, var=sig_t**2)

mu_kf, cov_kf, _, _ = kf.batch_filter(zs)
x_kf, p_kf, k_kf, _ = kf.rts_smoother(mu_kf, cov_kf)

assert np.allclose(x_ckf[:, 0, 0], x_kf[:, 0, 0], atol=1e-4)
assert np.allclose(x_ckf[:, 1, 0], x_kf[:, 1, 0], atol=1e-4)
assert np.allclose(p_ckf, p_kf, atol=1e-5)
assert np.allclose(k_ckf[:-1], k_kf[:-1], atol=2e-4)


def test_ckf_rts_smoother_flat_and_column_histories_match():
_, ckf, xs, ps = _run_linear_ckf_history()

x_col, p_col, k_col = ckf.rts_smoother(xs, ps)
x_flat, p_flat, k_flat = ckf.rts_smoother(xs[:, :, 0], ps)
x_q, p_q, k_q = ckf.rts_smoother(
xs,
ps,
Qs=[ckf.Q] * len(xs),
dts=[ckf._dt] * len(xs),
)

assert np.allclose(x_col[:, :, 0], x_flat)
assert np.allclose(p_col, p_flat)
assert np.allclose(k_col, k_flat)
assert np.allclose(x_col, x_q)
assert np.allclose(p_col, p_q)
assert np.allclose(k_col, k_q)


def test_ckf_rts_smoother_length_mismatch():
_, ckf, xs, ps = _run_linear_ckf_history()

with pytest.raises(ValueError, match="same length"):
ckf.rts_smoother(xs[:-1], ps)


if __name__ == "__main__":
Expand Down