
Bayesian Online Changepoint Detection: Principles & Code

程序员文章站 2022-04-19 13:21:49
"""
Reference: https://github.com/gwgundersen/bocd/blob/master/bocd.py
"""
"""============================================================================
Python implementation of Bayesian online changepoint detection for a normal
model with unknown mean parameter. For details, see Adams & MacKay 2007:
    "Bayesian Online Changepoint Detection"
    https://arxiv.org/abs/0710.3742
This code implements the figure in the following blog post:
    http://gregorygundersen.com/blog/2019/08/13/bocd/
Author: Gregory Gundersen
============================================================================"""

import matplotlib.pyplot as plt
from   matplotlib.colors import LogNorm
import numpy as np
from   scipy.stats import norm


# -----------------------------------------------------------------------------

def bocd(data, model, hazard):
    """Return run length posterior using Algorithm 1 in Adams & MacKay 2007.
    """
    # 1. Initialize lower triangular matrix representing the posterior as a
    # function of time. Model parameters are initialized in the model class.
    T = len(data)  # Derive T from the data rather than relying on a global.
    R = np.zeros((T + 1, T + 1))
    R[0, 0] = 1
    message = np.array([1])

    for t in range(1, T + 1):

        # 2. Observe new datum.
        x = data[t - 1]

        # 3. Evaluate predictive probabilities.
        pis = model.pred_prob(t, x)

        # 4. Calculate growth probabilities.
        growth_probs = pis * message * (1 - hazard)

        # 5. Calculate changepoint probabilities.
        cp_prob = np.sum(pis * message * hazard)

        # 6. Combine both into the new joint distribution over run lengths.
        new_joint = np.append(cp_prob, growth_probs)

        # 7. Calculate evidence and determine run length distribution.
        evidence = np.sum(new_joint)
        R[t, :t + 1] = new_joint / evidence

        # 8. Update sufficient statistics.
        model.update_statistics(t, x)

        # Setup message passing.
        message = new_joint

    return R


# -----------------------------------------------------------------------------

# Implementation of a Gaussian model with known precision. See Kevin Murphy's
# "Conjugate Bayesian analysis of the Gaussian distribution" for a complete
# derivation of the model:
#
#     https://www.cs.ubc.ca/~murphyk/Papers/bayesGauss.pdf
#
class NormalKnownPrecision:

    def __init__(self, mean0, prec0):
        """Initialize model parameters.
        """
        self.mean0 = mean0
        self.prec0 = prec0
        self.mean_params = np.array([mean0])
        self.prec_params = np.array([prec0])

    def pred_prob(self, t, x):
        """Compute predictive probabilities.
        """
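        # The posterior predictive has variance 1 / tau + 1 (posterior variance
        # plus unit observation variance). `norm.pdf` takes a standard
        # deviation as its scale, so take the square root.
        d = lambda x, mu, tau: norm.pdf(x, mu, np.sqrt(1 / tau + 1))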
        return np.array([d(x, self.mean_params[i], self.prec_params[i])
                         for i in range(t)])

    def update_statistics(self, t, x):
        """Update sufficient statistics.
        """
        # Conjugate update for unit observation precision: each posterior mean
        # is a precision-weighted average of the previous mean and `x`.
        new_prec_params = self.prec_params + 1
        new_mean_params = ((self.mean_params * self.prec_params + x)
                           / new_prec_params)
        self.mean_params = np.append([self.mean0], new_mean_params)
        self.prec_params = np.append([self.prec0], new_prec_params)


# -----------------------------------------------------------------------------

def generate_data(mean0, prec0, T, cp_prob):
    """Generate partitioned data of T observations according to constant
    changepoint probability `cp_prob` with hyperpriors `mean0` and `prec0`.
    """
    means = [0]
    data = []
    cpts = []
    for t in range(0, T):
        if np.random.random() < cp_prob:
            # `np.random.normal` takes a standard deviation, not a variance.
            mean = np.random.normal(mean0, np.sqrt(1 / prec0))
            means.append(mean)
            cpts.append(t)
        data.append(np.random.normal(means[-1], 1))
    return data, cpts


# -----------------------------------------------------------------------------

def plot_posterior(T, data, R, cpts):
    """Plot data, run length posterior, and groundtruth changepoints.
    """
    fig, axes = plt.subplots(2, 1, figsize=(20, 10))
    ax1, ax2 = axes

    ax1.scatter(range(0, T), data)
    ax1.plot(range(0, T), data)
    ax1.set_xlim([0, T])
    ax1.margins(0)

    # Named `log_norm` to avoid shadowing the imported `scipy.stats.norm`.
    log_norm = LogNorm(vmin=0.0001, vmax=1)
    ax2.imshow(np.rot90(R), aspect='auto', cmap='gray_r', norm=log_norm)
    ax2.set_xlim([0, T])
    # This just reverses the y-tick marks.
    ticks = list(range(0, T+1, 50))
    ax2.set_yticks(ticks)
    ax2.set_yticklabels(ticks[::-1])
    ax2.margins(0)

    for cpt in cpts:
        ax1.axvline(cpt, c='r', ls='dotted')
        ax2.axvline(cpt, c='r', ls='dotted')

    plt.tight_layout()
    plt.show()


# -----------------------------------------------------------------------------

if __name__ == '__main__':
    T = 300         # Number of observations.
    cp_prob = 1/50  # Constant prior on changepoint probability.
    mean0 = 0       # Prior on Gaussian mean.
    prec0 = 0.2     # Prior on Gaussian precision.

    data, cpts = generate_data(mean0, prec0, T, cp_prob)
    model = NormalKnownPrecision(mean0, prec0)
    R = bocd(data=data, model=model, hazard=cp_prob)
    # The model becomes numerically unstable for large `T` because the mass is
    # distributed across a support whose size is increasing.
    for row in R:
        assert np.isclose(np.sum(row), 1)
    plot_posterior(T, data, R, cpts)
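
The comment near the end of the script notes that the recursion becomes numerically unstable for large `T`. One common remedy is to run the same recursion in log space. The block below is a minimal sketch of that idea, not part of the referenced code: the function name `bocd_log` is hypothetical, it inlines a Gaussian model with unit observation variance, it assumes the precision-weighted conjugate mean update, and it passes the normalized posterior forward as the message (which leaves the normalized result unchanged).

```python
import numpy as np
from scipy.special import logsumexp
from scipy.stats import norm


def bocd_log(data, mean0, prec0, hazard):
    """Return the log run length posterior (hypothetical log-space variant).

    Assumes Gaussian observations with unit variance and a Gaussian prior
    on the mean with mean `mean0` and precision `prec0`.
    """
    T = len(data)
    log_R = np.full((T + 1, T + 1), -np.inf)
    log_R[0, 0] = 0.0                     # log(1): run length 0 at time 0
    log_message = np.array([0.0])
    mean_params = np.array([mean0], dtype=float)
    prec_params = np.array([prec0], dtype=float)
    log_h, log_1mh = np.log(hazard), np.log(1 - hazard)

    for t in range(1, T + 1):
        x = data[t - 1]

        # Predictive log-density for each run length; scale is a std. dev.
        log_pis = norm.logpdf(x, mean_params, np.sqrt(1 / prec_params + 1))

        # Growth and changepoint terms, with products turned into sums.
        log_growth = log_pis + log_message + log_1mh
        log_cp = logsumexp(log_pis + log_message + log_h)
        new_log_joint = np.append(log_cp, log_growth)

        # Normalize by the log evidence instead of dividing.
        log_R[t, :t + 1] = new_log_joint - logsumexp(new_log_joint)

        # Precision-weighted conjugate update (an assumption of this sketch).
        new_prec = prec_params + 1
        new_mean = (mean_params * prec_params + x) / new_prec
        mean_params = np.append(mean0, new_mean)
        prec_params = np.append(prec0, new_prec)

        # Passing the normalized posterior keeps the message well scaled.
        log_message = log_R[t, :t + 1]

    return log_R
```

`np.exp(log_R)` recovers a matrix with the same layout as `R`, so it could be handed to `plot_posterior` unchanged, and `log_R[t].argmax()` gives the most probable run length at time `t`.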

References:

  1. Bayesian Online Changepoint Detection
  2. Github_1 (BOCD)

Article URL: https://blog.csdn.net/weixin_41888257/article/details/107161481