import math
import numpy as np
import scipy.linalg
import scipy.special
import matplotlib.pyplot as plt
import time
from caching import cached
#TODO: gradient and hessian information not currently used except for grad log likelihood for the NIW distribution. Consider removing extra info.
[docs]class ExponentialFamilyDistribution:
r""" Models a distribution in the exponential family of the form:
:math:`f(x | \nu) = h(x) \exp( \nu \cdot T(x) - A(\nu) )`
Parameters to be defined in subclasses:
* h is the base measure
* nu (:math:`\nu`) are the parameters
* T(x) are the sufficient statistics of the data
* A is the log partition function
"""
#TODO: base measure assumed to be scalar. Needs to be fixed for generality.
[docs] def log_base_measure(self,x, ret_ll_gr_hs = (True,False,False) ):
"""
Log of the base measure. To be implemented by subclasses.
:arg x: sufficient statistics of the data.
"""
pass
[docs] def log_partition(self,nu, ret_ll_gr_hs = (True,False,False) ):
"""
Log of the partition function and derivatives with respect to sufficient statistics. To be implemented by subclasses.
:arg nu: parameters of the distribution
:arg ret_ll_gr_hs: what to return: log likelihood, gradient, hessian
"""
pass
#TODO: derivatives not implemented. Consider removing
[docs] def ll(self,xs,nus,ret_ll_gr_hs = (True,False,False) ):
"""
Log likelihood (and derivatives, optionally) of data under distribution.
:arg xs: sufficient statistics of data
:arg nus: parameters of distribution
"""
return ((np.einsum('ci,di->dc',nus,xs)
+ self.log_base_measure(xs)[0]
- self.log_partition(nus)[0][np.newaxis,:] ),)
[docs]class Gaussian(ExponentialFamilyDistribution):
r"""Multivariate Gaussian distribution with density:
:math:`f(x | \mu, \Sigma) = |2 \pi \Sigma|^{-1/2} \exp(-(x-\mu)^T \Sigma^{-1} (x - \mu)/2)`
Natural parameters:
:math:`\nu = [\Sigma^{-1} \mu, -\Sigma^{-1}/2]`
Sufficient statistics of data:
:math:`T(x) = [x, x \cdot x^T]`
:arg d: dimension.
"""
def __init__(self,d):
self.dim = d
self.lbm = math.log(2*np.pi)* (-d/2.0)
self.inv = np.linalg.inv
self.slogdet = np.linalg.slogdet
[docs] def sufficient_stats(self,x):
r""" Sufficient statistics of data.
:arg x: data
"""
tmp = (x[:,np.newaxis,:]*x[:,:,np.newaxis]).reshape(x.shape[0],-1)
return np.hstack((x,tmp))
[docs] def sufficient_stats_dim(self):
"""
Dimension of sufficient statistics.
"""
d = self.dim
return d + d*d
[docs] def log_base_measure(self,x,ret_ll_gr_hs = (True,True,True)):
r""" Log base measure.
"""
return (self.lbm, 0.0,0.0)
[docs] def usual2nat(self,mus, Sgs):
r"""Convert usual parameters to natural parameters.
"""
nu2 = np.array(map(self.inv,Sgs))
nu1 = np.einsum('nij,nj->ni',nu2,mus)
nu = np.hstack((nu1,-.5*nu2.reshape(nu2.shape[0],-1)))
return nu
[docs] def nat2usual(self,nus):
"""Convert natural parameters to usual parameters"""
d = self.dim
nu1 = nus[:,:d]
nu2 = nus[:,d:].reshape((-1,d,d))
Sgs = np.array(map(self.inv,-2.0*nu2))
mus = np.einsum('nij,nj->ni',Sgs,nu1)
return mus,Sgs
[docs] def log_partition(self,nus):
# todo: implement gradient and hessian
d = self.dim
nu1 = nus[:,:d]
nu2 = nus[:,d:].reshape((-1,d,d))
inv = np.array(map(self.inv,nu2))
# TODO: einsum wrong:
t1 = -.25* np.einsum('ti,tij,tj->t',nu1,inv,nu1)
t2 = -.5*np.array(map(self.slogdet,-2*nu2))[:,1]
return (t1+t2,)
[docs]class NIW(ExponentialFamilyDistribution):
r""" Normal Inverse Wishart distribution defined by:
:math:`f(\mu,\Sigma|\mu_0,\Psi,k) = \text{Gaussian}(\mu|\mu_0,\Sigma/k) \cdot \text{Inverse-Wishart}(\Sigma|\Psi,\nu-d-2)`
where :math:`\mu, \mu_0 \in R^d, \Sigma, \Psi \in R^{d \times d}, k \in R, \nu > 2d+1 \in R`
This is an exponential family conjugate prior for the Gaussian.
:arg d: dimension
"""
def __init__(self,d):
self.dim = d
self.lbm = math.log(2*np.pi)* (-d/2.0)
[docs] def sufficient_stats_dim(self):
d = self.dim
return d+d*d +2
[docs] def log_base_measure(self,x,ret_ll_gr_hs = (True,True,True)):
return (self.lbm, 0.0,0.0)
[docs] def log_partition(self,nu, ret_ll_gr_hs= (True,False,False),
no_k_grad=False ):
# todo: implement hessian
rt = ret_ll_gr_hs
ll = None
gr = None
hs = None
d = self.dim
l1 = nu[:,:d]
l2 = nu[:,d:-2].reshape(-1,d,d)
l3 = nu[:,-2]
l4 = nu[:,-1]
nu = (l4-d-2).reshape(-1)
psi = (l2 -
l1[:,:,np.newaxis]*l1[:,np.newaxis,:]/l3[:,np.newaxis,np.newaxis])
if not no_k_grad:
ld = np.array(map(np.linalg.slogdet,psi))[:,1]
if rt[0]:
if not nu.size==1:
lmg = scipy.special.multigammaln(.5*nu,d)
else:
lmg = scipy.special.multigammaln(.5*nu[0],d)
al = -.5*d*np.log(l3) + .5*nu*(d * np.log(2) - ld ) + lmg
ll = al
if rt[1]:
inv = np.array(map(np.linalg.inv,psi))
g1 = (nu/l3)[:,np.newaxis]* np.einsum('nij,nj->ni',inv,l1)
g2 = -.5*nu[:,np.newaxis] *inv.reshape(l2.shape[0],-1)
g3 = ( -.5 * d/l3
- .5/l3 * (g1*l1).sum(1) )[:,np.newaxis]
if not no_k_grad:
g4 = ( + .5 *d*np.log(2) - .5*ld + .5*self.multipsi(.5*nu,d)
)[:,np.newaxis]
else:
g4 = np.zeros((nu.shape[0],1))
gr = np.hstack((g1,g2,g3,g4))
if rt[2]:
# not implemented
pass
return (ll,gr,hs)
[docs] def multipsi(self,a,d):
res = np.zeros(a.shape)
for i in range(d):
res += scipy.special.psi(a - .5*i)
return res
[docs] def sufficient_stats(self,mus,Sgs):
Sgi = np.array(map(np.linalg.inv,Sgs))
nu1 = np.einsum('nij,nj->ni',Sgi,mus)
nu = np.hstack((nu1,-.5*Sgi.reshape(Sgi.shape[0],-1)))
t1 = -.5* (Sgi*mus[:,:,np.newaxis]*mus[:,np.newaxis,:]).sum(1).sum(1)
#t1 = -.5* np.einsum('ti,tij,tj->t',mus,Sgi,mus)
t2 = -.5*np.array(map(np.linalg.slogdet,Sgs))[:,1]
return np.hstack((nu, t1[:,np.newaxis],t2[:,np.newaxis]))
[docs] def usual2nat(self,mu0,Psi,k,nu):
l3 = k.reshape(-1,1)
l4 = (nu+2+self.dim).reshape(-1,1)
l1 = mu0*l3
l2 = Psi + l1[:,:,np.newaxis]*l1[:,np.newaxis,:]/k[:,np.newaxis,np.newaxis]
return np.hstack((l1,l2.reshape(l2.shape[0],-1),l3,l4 ))
@cached
def nat2usual(self,tau,lgh=(True,False,False)):
d = self.dim
n,e = tau.shape
l1 = tau[:,:d]
l2 = tau[:,d:-2].reshape(-1,d,d)
l3 = tau[:,-2]
l4 = tau[:,-1]
k = l3
nu = l4 - 2 - d
mu = l1/l3[:,np.newaxis]
Psi = -l1[:,:,np.newaxis]*l1[:,np.newaxis,:]/l3[:,np.newaxis,np.newaxis]
Psi += l2
vs = (mu,Psi,k,nu)
gr = (None,None,None,None)
if lgh[1]:
l1g = np.zeros((n,d,e))
l1g[:,:,:d]=np.eye(d)
l2g = np.zeros((n,d*d,e))
l2g[:,:,d:d*(d+1)]=np.eye(d*d)
l2g = l2g.reshape(n,d,d,e)
kg = np.zeros((n,e))
kg[:,-2]=1.0
nug = np.zeros((n,e))
nug[:,-1]=1.0
wx = np.newaxis
mug = l1g/k[:,wx,wx] - kg[:,wx,:]*l1[:,:,wx]/(k*k)[:,wx,wx]
t2 = (l1[:,:,wx,wx]*l1g[:,wx,:,:] + l1[:,wx,:,wx]*l1g[:,:,wx,:] )/l3[:,wx,wx,wx]
t3 = kg[:,wx,wx,:]*(l1[:,:,wx]*l1[:,wx,:]/(l3*l3)[:,wx,wx])[:,:,:,wx]
Psig = (l2g - t2 + t3)
gr = (mug,Psig,kg,nug)
return vs,gr,None
[docs]class ConjugatePair:
"""
Conjugate prior-evidence pair of distributions in the exponential family. Conjugacy means that the posterior has the same for as the prior with updated parameters.
:arg evidence_distr: Evidence distribution. Must be an instance of :class:`ExponentialFamilyDistribution`
:arg prior_distr: Prior distribution. Must be an instance of :class:`ExponentialFamilyDistribution`
:arg prior_param: Prior parameters.
"""
def __init__(self,evidence_distr,prior_distr, prior_param):
self.evidence = evidence_distr
self.prior = prior_distr
self.prior_param = prior_param
[docs] def sufficient_stats(self,data):
pass
[docs] def posterior_ll(self,x,nu, ret_ll_gr_hs=(True,False,False),
usual_x=False):
""" Log likelihood (and derivatives) of data under posterior predictive distribution.
:arg x: sufficient statistics of data
:arg nu: prior parameters
"""
if usual_x:
x = self.sufficient_stats(x)
ll,gr,hs = (None,None,None)
n = x.shape[0]
k,d = nu.shape
nu_p = (nu[np.newaxis,:,:] + x[:,np.newaxis,:])
llk, grk, hsk = self.prior.log_partition(nu_p.reshape((-1,d)),
ret_ll_gr_hs)
if ret_ll_gr_hs[0]:
t1 = self.evidence.log_base_measure(x)[0]
t2 = self.prior.log_partition(nu)[0]
t3 = llk.reshape((n,k))
ll = t1 - t2[np.newaxis,:] + t3
if ret_ll_gr_hs[1]:
gr = grk.reshape((n,k,d))
gr += self.evidence.log_base_measure(x)[1]
if ret_ll_gr_hs[2]:
hs = hsk.reshape((n,k,d,d))
hs += self.evidence.log_base_measure(x)[2]
return (ll,gr,hs)
[docs] def sufficient_stats_dim(self):
return self.prior.sufficient_stats_dim()
[docs]class GaussianNIW(ConjugatePair):
"""Gaussian, Normal-Inverse-Wishart conjugate pair.
The predictive posterior is a multivariate t-distribution.
:arg d: dimension
"""
def __init__(self,d):
# old version used 2*d+1+2, this seems to work well
# 2*d + 1 was also a good choice but can't remember why
ConjugatePair.__init__(self,
Gaussian(d),
NIW(d),
np.concatenate([np.zeros(d*d + d), np.array([0, 2*d+1])])
)
[docs] def sufficient_stats(self,data):
x = self.evidence.sufficient_stats(data)
x1 = np.insert(x,x.shape[1],1,axis=1)
x1 = np.insert(x1,x1.shape[1],1,axis=1)
return x1
@cached
def posterior_ll_cache(self,nu,rt):
p = self.prior.dim
(mu, psi,k,nu0) = self.prior.nat2usual(nu)[0]
nu = nu0-p+1
nu0 = nu0+ 1
psi = psi*((k+1)/k/nu)[:,np.newaxis,np.newaxis]
sgi = np.array(map(np.linalg.inv,psi))
if rt:
ll0 = ( scipy.special.gammaln( .5*(nu0))
- scipy.special.gammaln( .5*(nu))
- .5*np.array(map(np.linalg.slogdet,psi))[:,1]
- .5*p*np.log(nu)
) - .5*p*np.log(np.pi)
else:
ll0 = None
return ll0,nu,nu0,mu,sgi
@cached
def posterior_ll(self,x,nu,ret_ll_gr_hs=(True,False,False),usual_x=False):
#cases not optimized for here
if not usual_x or ret_ll_gr_hs[2]:
return ConjugatePair.posterior_ll(self,x,nu,
ret_ll_gr_hs=ret_ll_gr_hs,usual_x=usual_x)
rt = ret_ll_gr_hs
ll0,nu,nu0,mu,sgi = self.posterior_ll_cache(nu,rt[0])
dx = x[:,np.newaxis,:] - mu[np.newaxis,:,:]
gr = np.einsum('kij,nkj->nki', sgi,dx)
al = 1 + np.einsum('nki,nki->nk', gr,dx)/nu
if rt[0]:
ll = ll0 - .5*(nu0)*np.log(al)
else:
ll = None
if rt[1]:
gr = -((nu0)/al/nu)[:,:,np.newaxis]*gr
else:
gr = None
hs = None
return (ll,gr,hs)
[docs] def marginal(self, nu,slc):
#TODO: write a test for this
d = self.prior.dim
ds = len(slc)
slice_distr = GaussianNIW(ds)
l1 = nu[:,:d]
l2 = nu[:,d:-2].reshape(-1,d,d)
l3 = nu[:,-2:-1]
l4 = nu[:,-1:] # should sub from this one
l1 = l1[:,slc]
l2 = l2[:,slc,:][:,:,slc]
l4 = l4 - (d-ds) #TODO: either *2 or *1. not sure which
nus = np.hstack([l1,l2.reshape(l2.shape[0],-1), l3, l4])
return slice_distr,nus
[docs] def plot(self, nu, szs, slc,n = 100,):
nuE = self.prior.nat2usual(nu[szs>0,:])[0]
d = self.prior.dim
mus, Sgs, k, nu = nuE
# plot the mode of the distribution
Sgs=Sgs/(k + slc.size + 1)[:,np.newaxis,np.newaxis]
szs /= szs.sum()
for mu, Sg,sz in zip(mus[:,slc],Sgs[:,slc,:][:,:,slc],szs):
w,V = np.linalg.eig(Sg)
V = np.array(np.matrix(V)*np.matrix(np.diag(np.sqrt(w))))
sn = np.sin(np.linspace(0,2*np.pi,n))
cs = np.cos(np.linspace(0,2*np.pi,n))
x = V[:,1]*cs[:,np.newaxis] + V[:,0]*sn[:,np.newaxis]
x += mu
plt.plot(x[:,1],x[:,0],linewidth=sz*10)
@cached
def conditionals_cache(self,nus,i1,i2):
mu,Psi,n,nu = self.prior.nat2usual(nus)[0]
my,mx = mu[:,i1],mu[:,i2]
A,B,D = Psi[:,i1,:][:,:,i1], Psi[:,i1,:][:,:,i2], Psi[:,i2,:][:,:,i2]
Di = np.array(map(np.linalg.inv,D))
P = np.einsum('njk,nkl->njl',B,Di)
mygx = my - np.einsum('nij,nj->ni',P,mx)
Li = A-np.einsum('nik,nlk->nil',P,B)
V1 = Li/(nu - len(i1) -1)[:,np.newaxis,np.newaxis]
V2 = Di
return mygx,mx,P,V1,V2,n
@cached
def conditionals_cache_bare(self,nus,i1,i2):
mu,Psi,n,nu = self.prior.nat2usual(nus)[0]
my,mx = mu[:,i1],mu[:,i2]
A,B,D = Psi[:,i1,:][:,:,i1], Psi[:,i1,:][:,:,i2], Psi[:,i2,:][:,:,i2]
Di = np.array(map(np.linalg.inv,D))
P = np.einsum('njk,nkl->njl',B,Di)
#mygx = my - np.einsum('nij,nj->ni',P,mx)
Li = A-np.einsum('nik,nlk->nil',P,B)
return my,mx,P,Li,Di,n,nu
@cached
def conditional_expectation(self,x,nu,iy,ix,
ret_ll_gr_hs=(True,True,False) ):
mygx,mx,P,V1,V2,n = self.conditionals_cache(nu,iy,ix)
m = mygx + np.einsum('kij,nj->nki',P,x)
return (m,P[np.newaxis,:,:,:],None)
[docs] def conditional_variance(self,x,nu,iy,ix,
ret_ll_gr_hs=(True,True,False),full_var=True ):
mygx,mx,P,V1,V2,n = self.conditionals_cache(nu,iy,ix)
df = x[:,np.newaxis,:]-mx[np.newaxis,:,:]
cf = np.einsum('nkj,nkj->nk',np.einsum('nki,kij->nkj',df, V2),df )
if full_var:
V =V1[np.newaxis,:,:,:]*(1.0+ 1.0/n + cf)[:,:,np.newaxis,np.newaxis]
else:
V = V1[np.newaxis,:,:,:]*(1.0/n + cf)[:,:,np.newaxis,np.newaxis]
cfg = 2*np.einsum('nki,kij->nkj',df,V2)
gr = V1[np.newaxis,:,:,:,np.newaxis] * cfg[:,:,np.newaxis,np.newaxis,:]
return V,gr,None
@cached
def conditional(self,x,nu,iy,ix,lgh = (True,True,False)):
my,mx,P,Psi,Lxx,n,nu = self.conditionals_cache_bare(nu,iy,ix)
df = x[:,np.newaxis,:]-mx[np.newaxis,:,:]
nb = 1.0/(np.einsum('nki,kij,nkj->nk',df,Lxx,df) + 1.0/n)
nub = np.tile(nu[np.newaxis,:],[nb.shape[0],1])
mub = my + np.einsum('kij,nkj->nki',P,df)
d = len(iy)
wx = np.newaxis
l3 = nb
l4 = (nub+2+d)
l1 = mub*nb[:,:,wx]
l2 = Psi[wx,:,:,:] + mub[:,:,:,wx]*mub[:,:,wx,:]*nb[:,:,wx,wx]
tau = np.dstack((l1,l2.reshape(l1.shape[0],l1.shape[1],-1),l3,l4))
# gradients
if lgh[1]:
l3g = - 2*(nb*nb)[:,:,np.newaxis]* np.einsum('nki,kij->nkj',df,Lxx)
l4g = np.zeros(l3g.shape)
l1g = nb[:,:,wx,wx]*P[wx,:,:,:] + l3g[:,:,wx,:]*mub[:,:,:,wx]
l2g = (mub[:,:,:,wx,wx]*mub[:,:,wx,:,wx]*l3g[:,:,wx,wx,:]
+ P[wx,:,:,wx,:]*mub[:,:,wx,:,wx]*l3[:,:,wx,wx,wx]
+ mub[:,:,:,wx,wx]*P[wx,:,wx,:,:]*l3[:,:,wx,wx,wx]
)
taug = np.concatenate((l1g,
l2g.reshape(l1g.shape[0],l1g.shape[1],-1,l1g.shape[3]),
l3g[:,:,wx,:],l4g[:,:,wx,:]),axis=2)
else:
taug = None
distr = GaussianNIW(d)
return distr,tau,taug,None