The Unscented Kalman Filter, simply the best! Python code

Jaroslaw Goslinski
10 min readJun 1, 2020

--

During the first missions in Project Apollo, the KF was implemented on analog hardware.

In almost every project of data science, we face one of the three problems: filtration, prediction, or smoothing. All of these can be solved by the use of the Kalman Filters. I guess you read about or work with Kalman Filters before, but just to clarify on that: we speak about the powerful tool of data filtering, which was invented by the Rudolf Kalman in the late ’50s, used in the Nasa’s Apollo program and extended to handle nonlinear problems in ’70s. It is also a subject of multiple articles during its way of neverending improvements. You can find a bunch of information here on medium as well. Today I’m going to explain how to work with that tool by simply giving you a working example with a minimum theory as that was shown and explain millions of times. Remember this article is more about the practice and I assume that the reader knows the basis of KF filtering, the process, idea, etc.

When a lot of improvements were introduced to the KF, also new generation was presented, called the Unscented Kalman Filter. It is the filter of the choice when it comes to filtering of nonlinear systems data. There are three big pros: the first, it makes approximation valid to the third order of Taylor series expansion, the second: it uses minimal computation in realizing the approximation via sigma points, the third: it is simple in use as it does not need Jacobian like the Extended Kalman Filter or Hessians (the Second-order Extended Kalman Filter). If you’ve seen KF in action or read articles covering estimation theory I bet you’ve also seen a nice set of graphs, like the ones below:

The confidence ellipses and mean values obtained in a process of sampling (reference), EKF and UKF approximations (source: “The Unscented Kalman Filter for Nonlinear Estimation”, Eric A. Wan and Rudolph van der Merwe)

In the figure above, the first column depicts the operation of data sampling; we have many data points describing x — the state of the system (in this case x is two dimensional), then each point goes through the nonlinear function f, yielding new data point y (also two-dimensional). Now, we can simply compute the mean and covariance of the x and y. In this case, the mean is a single data point (2-dim vector) while the covariance is a matrix of size 2x2. It is easy to draw a data point, but to draw covariance we need to find eigenvectors and eigenvalues of the covariance matrix, and then draw an ellipse which is called “confidence ellipse”. Based on the ellipse size we show how much data is covered, e.g. in case of Gaussian distribution, we may choose 2 sigma spread and cover ~95% of data. In the second and third graphs, we show how good the covariance and mean are captured by the EKF and UKF respectively. The picture above shows the main reason why the researchers improve KF. The better it approximates statistical Moments, the better estimation it provides. I think it would be beneficial to show this in a real case scenario. For this purpose, we need a model with two-dimensional state and two-dimensional output. I’ve modified the univariate nonstationary growth model (UNGM) to obtain multivariate nonstationary growth model MNGM:

The discrete MNGM

As you can see, the model is given in a discrete form, so it can be used directly in a UKF. The model is a nonlinear system, as there are two nonlinear functions, f and g, the first one takes the system input (here i) and previous state, and generates a new state. The second function takes the current state and forms the output. This model is what always is needed to compose a KF. The model is recursive as the KF. Not always f and g are nonlinear, sometimes there are linear, sometimes f is nonlinear and g is linear. But you need to form the system of equations, otherwise, there is no way to create a working estimation tool based on Kalman Filter.

We use a more general description of the model given above i.e. model with additive noise, which can be given in a state-space representation:

The state-space representation of a model

where noises are given by u,v~N(0,1).

Having this information in mind, let’s introduce the model class:

import numpy as np
class MNGM2:
# class used for generation of multivariate nonstationary growth model
def __init__(self, n, x_0):
self.n = n
self.x = np.zeros((n, 2))
self.y = np.zeros((n, 2))
self.xi = np.zeros(2)
self.yi = np.zeros(2)
self.x[0, :] = x_0[:, 0] self.u = np.random.normal(0., 1.0, (n, 2))
self.v = np.random.normal(0., 1.0, (n, 2))
def generate_data(self): for i in range(1, self.n):
self.x[i, :] = self.state(i, self.x[i-1, :]) + self.u[i, :]
self.y[i, :] = self.output(self.x[i, :]) + self.v[i, :]
def state(self, i, xp): self.xi[0] = 0.5 * xp[0] - 0.1 * xp[1] + 0.7 * (xp[0] / (1 + xp[0] ** 2)) + 2.2 * np.cos(1.2 * (i - 1))
self.xi[1] = 0.8 * xp[1] - 0.2 * xp[0] + 0.9 * (xp[1] / (1 + xp[1] ** 2)) + 2.4 * np.cos(2.2 * (i - 1))
return self.xi def output(self, xi):
self.yi[0] = (xi[0] * 2) / 9.0 + (xi[1] * 2) / 7.0
self.yi[1] = (xi[0] * 2) / 6.0 + (xi[1] * 2) / 2.0
return self.yi

We use three functions/methods: generate_data — generates all vectors of data with noise added, state, and output — these are an f and g functions, respectively. The latter ones are used in the UKF.

The UKF is a well-known algorithm, and all things that work under its hood have been developed for many decades. Yes, there are many variations of the UKF (e.g. Square Root Unscented Kalman Filter), but nearly all of them aim at computation efficiency improvement, not the core idea which is the use of sigma points.

The UKF algorithm

In the UKF given above, there are f and g functions used recursively. The algorithm is divided into three parts init, time update, and measurement update. The measurement update phase is being realized only in the last two equations so the division given above is not sticking to the original one. The code below is my python implementation of the algorithm:

import numpy as np
import scipy
import scipy.linalg # SciPy Linear Algebra Library
from src.MNGM2 import MNGM2

class UKF:

def __init__(self, n, m):

self.n = n
self.m = m

# UKF params
self.kappa = 0.0
self.alfa = 0.001
self.beta = 2.0
self.lambda_ = (self.n + self.kappa) * self.alfa * self.alfa - self.n
self.gamma = np.sqrt(self.n + self.lambda_)
self.W0m = self.lambda_ / (self.n + self.lambda_)
self.W0c = self.lambda_ / (self.n + self.lambda_) + (1.0 - self.alfa * self.alfa + self.beta)
self.W = 1.0 / (2.0 * (self.n + self.lambda_))

# all vectors used in the UKF process
self.x_apriori = np.zeros((self.n,), dtype=float)
self.x_aposteriori = np.zeros((self.n,), dtype=float)
self.x_P = np.zeros((self.n,), dtype=float)
self.y_P = np.zeros((self.m,), dtype=float)
self.in_ = np.zeros((self.m,), dtype=float)
self.y = np.zeros((self.m,), dtype=float)

# covarince matrices used in the process

self.P_apriori = np.zeros((self.n, self.n), dtype=float)
self.P_aprioriP = np.zeros((self.n, self.n), dtype=float)

self.P_aposteriori = np.zeros((self.n, self.n), dtype=float)

# square root product of a given covariances s
self.sP_apriori = np.zeros((self.n, self.n), dtype=float)
self.sP_aposteriori = np.zeros((self.n, self.n), dtype=float)

# clear sigma points
self.y_sigma = np.zeros((self.m, (2 * self.n + 1)), dtype=float)
self.x_sigma = np.zeros((self.n, (2 * self.n + 1)), dtype=float)

# sigma points after passing through the function f/h
self.x_sigma_f = np.zeros((self.n, (2 * self.n + 1)), dtype=float)

# cross covariances
self.P_xy = np.zeros((self.n, self.m), dtype=float)
self.P_xyP = np.zeros((self.n, self.m), dtype=float)

self.P_y = np.zeros((self.m, self.m), dtype=float)
self.oP_y = np.zeros((self.m, self.m), dtype=float)
self.P_y_P = np.zeros((self.m, self.m), dtype=float)
self.K = np.zeros((self.n, self.m), dtype=float)
self.K_0 = np.zeros((self.n, self.m), dtype=float)
self.K_UKF_T = np.zeros((self.m, self.n), dtype=float)

self.Q = np.zeros((self.n, self.n), dtype=float)
self.R = np.zeros((self.m, self.m), dtype=float)

self.Rs = 0
self.Qs = 0

self.mngm = 0



def resetUKF(self, _Q, _R, x_0):
# Q - filter process noise covraiance
# R - measurement noise covariance,
# P - init covariance noise

self.mngm = MNGM2(self.n, x_0)
# init of all vectors and matrices where the first dim := n
self.y = np.zeros((self.m,))
self.y_P = np.zeros((self.m,))
self.P_y = np.zeros((self.m, self.m))
self.P_y_P = np.zeros((self.m, self.m))
self.P_xy = np.zeros((self.n, self.m))
self.P_xyP = np.zeros((self.n, self.m))

self.K = np.zeros((self.n, self.m))
self.K_0 = np.zeros((self.n, self.m))
self.K_UKF_T = np.zeros((self.m, self.n))
self.y_sigma = np.zeros((self.m, (2 * self.n + 1)))
self.x_sigma = np.zeros((self.n, (2 * self.n + 1)))
self.x_sigma_f = np.zeros((self.n, (2 * self.n + 1)))

self.P_apriori = np.zeros((self.n, self.n))
self.P_aprioriP = np.zeros((self.n, self.n))
self.P_aposteriori = np.zeros((self.n, self.n))

self.x_apriori = x_0[:, 0]
self.x_aposteriori = x_0[:, 0]
self.x_P = np.zeros((self.n,))

for i in range(0, self.n):
self.P_apriori[i, i] = _Q
self.P_aposteriori[i, i] = _Q

self.setCovariances(_Q, _R)

def setCovariances(self, _Q, _R):

self.Q = np.zeros((self.n, self.n))
self.R = np.zeros((self.m, self.m))

for i in range(self.n):
self.Q[i, i] = _Q

for i in range(self.m):
self.R[i, i] = _R

def sigma_points(self, vect_X, matrix_S):
# vect_X - state vector
# sigma points are drawn from P
self.x_sigma[:, 0] = vect_X # the first column

for k in range(1, self.n+1):
self.x_sigma[:, k] = vect_X + self.gamma * matrix_S[:, k - 1]
self.x_sigma[:, self.n + k] = vect_X - self.gamma * matrix_S[:, k - 1]

def y_UKF_calc(self):
# finding the y = h(x, ...)
# the input is x_sigma, which is using h(...) then we find y_sigma_UKF from which we get to the y_UKF
for k in range(2 * self.n + 1):
#for i in range(self.m):
xi = self.x_sigma_f[:, k]
self.y_sigma[:, k] = self.mngm.output(xi)

# y_UKF
self.y= self.W0m * self.y_sigma[:, 0]

for k in range(1, 2 * self.n + 1):
self.y = self.y + self.W * self.y_sigma[:, k]

def state(self, w):
# w - input vector data,
for j in range(2 * self.n + 1):
xp = self.x_sigma[:, j]
self.x_sigma_f[:, j] = self.mngm.state(w, xp)

def squareRoot(self, in_):
out_ = scipy.linalg.cholesky(in_, lower=False)
return out_

def timeUpdate(self, w):

self.sP_aposteriori = self.squareRoot(self.P_aposteriori)

self.sigma_points(self.x_aposteriori, self.sP_aposteriori)

self.state(w)

# apriori state:
self.x_apriori= self.W0m * self.x_sigma_f[:, 0]
for k in range(1, 2 * self.n + 1):
self.x_apriori = self.x_apriori + self.W * self.x_sigma_f[:, k]


#apriori covariance matrix:
self.P_apriori = np.zeros((self.n, self.n))

for k in range(2 * self.n + 1):
#for i in range(self.n):
self.x_P = self.x_sigma_f[:, k]

self.x_P = self.x_P - self.x_apriori
self.P_aprioriP = np.dot(np.expand_dims(self.x_P, axis=1), np.transpose(np.expand_dims(self.x_P, axis=1)))

if k == 0:
self.P_aprioriP = np.dot(self.W0c, self.P_aprioriP)
else:
self.P_aprioriP = np.dot(self.W, self.P_aprioriP)
self.P_apriori = self.P_apriori + self.P_aprioriP

self.P_apriori = self.P_apriori + self.Q

#new sigma points can be computed here but it is not necessery
#self.sP_apriori = self.squareRoot(self.P_apriori)
#self.sigma_points(self.x_apriori, self.sP_apriori)

self.y_UKF_calc()


def measurementUpdate(self, z):
# cov matrix oytpu/output
self.P_y = np.zeros((self.m, self.m))

for k in range(2 * self.n + 1):
#for i in range(self.n):
self.y_P = self.y_sigma[:, k]

self.y_P = self.y_P - self.y
self.P_y_P = np.dot(np.expand_dims(self.y_P, axis=1), np.transpose(np.expand_dims(self.y_P, axis=1)))

if k == 0:
self.P_y_P = np.dot(self.W0c, self.P_y_P)
else:
self.P_y_P = np.dot(self.W, self.P_y_P)
self.P_y = self.P_y + self.P_y_P

self.P_y = self.P_y + self.R


# cross cov matrix input/output:
self.P_xy = np.zeros((self.n, self.m))

for k in range(2 * self.n + 1):
self.x_P = self.x_sigma_f[:, k]
self.y_P = self.y_sigma[:, k]

self.x_P = self.x_P - self.x_apriori
self.y_P = self.y_P - self.y
self.P_xyP = np.dot(np.expand_dims(self.x_P, axis=1), np.transpose(np.expand_dims(self.y_P, axis=1)))

if k == 0:
self.P_xyP = np.dot(self.W0c, self.P_xyP)
else:
self.P_xyP = np.dot(self.W, self.P_xyP)
self.P_xy = self.P_xy + self.P_xyP

# kalman gain:
self.K = np.dot(self.P_xy, np.linalg.inv(self.P_y))

# aposteriori state:
self.y_P = z - self.y
self.x_aposteriori = self.x_apriori + np.dot(self.K, self.y_P)

# cov aposteriori:
self.P_aposteriori = self.P_apriori - np.dot(np.dot(self.K, self.P_y), np.transpose(self.K))

If the code above is not readable in this block (can’t do it better on medium), don’t worry, at the end of the article you will find a link to the Github project.

Now we can generate noisy data, provide it to the UKF, and estimate the state:

def estimateState():
n = 2 #size of the state vector
m = 2 #size of the output vector

#initial x value
x_0 = np.zeros((n, 1))
x_0[0, 0] = 0.1
x_0[1, 0] = 0.1

mngm = MNGM2(500, x_0)
mngm.generate_data()

ukf = UKF(n, m)

#generated data:
dataX = mngm.x
dataY = mngm.y

size_n = dataX.shape[0]


ukf.resetUKF(0.1, 0.1, x_0)

timeUpdateInput = np.zeros((n, 1))
measurementUpdateInput = np.zeros((m, 1))

err_total = 0

est_state = np.zeros((size_n, n))
est_output = np.zeros((size_n, n))

# estimation loop
for i in range(size_n):

timeUpdateInput = i
measurementUpdateInput = dataY[i, :]

# recursively go through time update and measurement correction
ukf.timeUpdate(timeUpdateInput)
ukf.measurementUpdate(measurementUpdateInput)

err = 0
for j in range(n):
err = err + (ukf.x_aposteriori[j] - dataX[i, j])**2

est_state[i, 0] = ukf.x_aposteriori[0]
est_state[i, 1] = ukf.x_aposteriori[1]
est_output[i, 0] = ukf.y[0]
est_output[i, 1] = ukf.y[1]

est_output[i, 0] = ukf.K[0, 1]

err_total = err_total + err

#print(err)

print("total error:", err_total)

plt.plot(dataX[:, 0], 'g', label='x_1 original') # X from the orginal ungm
plt.plot(dataX[:, 1], 'b', label='x_2 original') # X from the orginal ungm
plt.plot(est_state[:, 0], 'r--', label='x_1 estimated') #estimated X
plt.plot(est_state[:, 1], 'k--', label='x_2 estimated') # estimated X
plt.legend(loc='upper right')

#plt.plot(dataY[:, 1], 'g') # Y from the orginal ungm
#plt.plot(est_output[:, 1], 'b') # estimated Y
#plt.plot(est_output[:, 0], 'b') # estimated Y

plt.show()

To run the above, type: estimateState()

Base upon the chosen settings i.e. Q and R the obtained results may vary. For the same values of variances of process and measurement noises, the following was achieved:

You can find easily the rule: the smaller the value of variance Q, the greater the confidence in the model, and: the smaller the value of variance R, the greater the confidence in the measurements. In the end, what’s matters is the variance ratio R/Q or Q/R.

The UKF can be used for various problems and models. The presented idea of ​​combining the model class in the UKF class should be treated as a framework. In the coming post, I will tell you more about the ellipse of confidence as this is closely related to the UKF and its superiority over derivate-based Kalman Filters. Stay tuned.

https://github.com/jaroslav87/UKF-MNGM.git

--

--

Jaroslaw Goslinski

AI researcher, Interested in estimation theory, sensors, robotics and machine learning