Source code for dpcluster.test

import unittest
from dpcluster import *

[docs]def grad_check(f,x,eps =1e-4): dx = eps*np.random.normal(size=x.size).reshape(x.shape) fv,g ,n = f(x) f1,g_,n = f(x+.5*dx) f2,g_,n = f(x-.5*dx) for i in range(len(g.shape)-len(dx.shape)): dx = np.expand_dims(dx,1) dfp = (g*dx).sum(axis= len(g.shape)-1) dfa = (f1-f2) ind = np.logical_and(dfa==0,dfp==0 ) dfp[ind] = 1 dfa[ind] = 1 r = dfa/dfp np.testing.assert_almost_equal(r,1,6)
[docs]class Tests(unittest.TestCase): #TODO: hessians not tested
[docs] def test_gaussian(self): k = 4 d = Gaussian(k) mus = np.random.sample((10,k)) Sgs = np.random.sample((10,k,k)) Sgs = np.einsum('tki,tkj->tij',Sgs,Sgs) nu = d.usual2nat(mus, Sgs) mus_,Sgs_ = d.nat2usual(nu) np.testing.assert_array_almost_equal(mus,mus_) np.testing.assert_array_almost_equal(Sgs,Sgs_) data = np.random.sample((100,k)) xs = d.sufficient_stats(data) lls = d.ll(xs,nu)[0] Sg = Sgs[0,:,:] mu = mus[0,:] x = data[0,:] ll = (-k*.5*math.log(2*np.pi) -.5* np.linalg.slogdet(Sg)[1] -.5* ((mu-x)*scipy.linalg.solve(Sg,(mu-x))).sum() ) self.assertAlmostEqual(ll, lls[0,0])
[docs] def test_niw(self): p = 3 d = NIW(p) mus = np.random.randn(100,p) Sgs = np.random.randn(100,p,p) Sgs = np.einsum('tki,tkj->tij',Sgs,Sgs) x = d.sufficient_stats(mus, Sgs) mu0 = np.random.randn(10,p) Psi = np.random.randn(10,p,p) Psi = np.einsum('tki,tkj->tij',Psi,Psi) k = np.random.rand(10)*10 nu = p - 1 + k nus = d.usual2nat(mu0,Psi,k,nu) mu0_,Psi_,k_,nu_ = d.nat2usual(nus)[0] np.testing.assert_array_almost_equal(mu0_,mu0) np.testing.assert_array_almost_equal(Psi_,Psi) np.testing.assert_array_almost_equal(k_,k) np.testing.assert_array_almost_equal(nu_,nu) jac = d.log_partition(nus, (False,True,False))[1] eSgi = -2*jac[:,p:p*(p+1)].reshape(-1,p,p) Psii = np.array(map(np.linalg.inv, Psi)) eSgi_ = Psii*(nu)[:,np.newaxis,np.newaxis] np.testing.assert_array_almost_equal(eSgi,eSgi_) lls = d.ll(x,nus)[0] mu0 = mu0[0,:] mu = mus[0,:] Sg = Sgs[0,:,:] Psi = Psi[0,:,:] k = k[0] nu = nu[0] ll1 = (-p*.5*math.log(2*np.pi) -.5* np.linalg.slogdet(Sg/k)[1] -.5* ((mu0-mu)*scipy.linalg.solve(Sg/k,(mu0-mu))).sum() ) ll2 = (.5*nu*np.linalg.slogdet(Psi)[1] - .5*nu*p*np.log(2) - scipy.special.multigammaln(.5*nu,p) - .5*(nu+p+1)*np.linalg.slogdet(Sg)[1] - .5 * np.sum(Psi * np.linalg.inv(Sg)) ) self.assertAlmostEqual(ll1+ll2, lls[0,0] ) al = 1e-10 nu1 = al*nus[1,:] -al *nus[0,:] + .5 *nus[0,:] + .5*nus[1,:] nu2 = al*nus[0,:] -al *nus[1,:] + .5 *nus[0,:] + .5*nus[1,:] diff = (d.log_partition(nu2[np.newaxis,:])[0] - d.log_partition(nu1[np.newaxis,:])[0])[0] jac = d.log_partition(.5 *nus[0:1,:] + .5*nus[1:2,:], (False,True,False))[1] self.assertAlmostEqual(diff, (jac.reshape(-1)*(nu2-nu1)).sum()) def fmu(tau): (mu,Psi,k,nu),(mug,Psig,kg,nug),trs = d.nat2usual(tau, (True,True,False)) return mu,mug,None def fps(tau): (mu,Psi,k,nu),(mug,Psig,kg,nug),trs = d.nat2usual(tau, (True,True,False)) return Psi,Psig,None grad_check(fmu,nus,eps=1e-5) grad_check(fps,nus,eps=1e-5)
[docs] def test_gniw(self): p = 2 d = GaussianNIW(p) np.random.seed(1) x = np.random.randn(100,p) mu0 = np.random.randn(10,p) Psi = np.random.randn(10,p,p) Psi = np.einsum('tki,tkj->tij',Psi,Psi) k = np.random.rand(10)*10 nu = p - 1 + k nus = d.prior.usual2nat(mu0,Psi,k,nu) ll, gr, hs= ConjugatePair.posterior_ll(d,x,nus, (True,True,False),True) ll_,gr_,hs_= d.posterior_ll(x,nus, (True,True,False),True) np.testing.assert_array_almost_equal(ll,ll_) al = 1e-10 x1 = al*x[1,:] -al *x[0,:] + .5 *x[0,:] + .5*x[1,:] x2 = al*x[0,:] -al *x[1,:] + .5 *x[0,:] + .5*x[1,:] diff = (d.posterior_ll(x2[np.newaxis,:],nus,(True,False,False),True)[0] - d.posterior_ll(x1[np.newaxis,:],nus,(True,False,False),True)[0]) jac = d.posterior_ll(.5 *x[0:1,:] + .5*x[1:2,:],nus, (False,True,False), True)[1] diff_ = (jac *(x2-x1)[np.newaxis,np.newaxis,:]).sum(2) np.testing.assert_array_almost_equal(diff,diff_)
[docs] def gen_data(self,A, mu, n=10): xs = np.random.multivariate_normal(mu,np.eye(mu.size),size=n) ys = (np.einsum('ij,nj->ni',A,xs) + np.random.multivariate_normal( np.zeros(A.shape[0]),np.eye(A.shape[0]),size=n)) return np.hstack((ys,xs))
[docs] def setUp(self): np.random.seed(1) As = np.array([[[1,2,5],[2,2,2]], [[-4,3,-1],[2,2,2]], [[-4,3,1],[-2,-2,-2]], ]) mus = np.array([[10,0,0], [0,10,0], [0,0,10], ]) n = 120 self.nc = mus.shape[0] self.data = np.vstack([self.gen_data(A,mu,n=n) for A,mu in zip(As,mus)]) self.As=As self.mus=mus
[docs] def test_batch_vdp(self): data = self.data n,d = data.shape # can forget mus, As prob = VDP(GaussianNIW(d), k=50,w=.4) x = prob.distr.sufficient_stats(data) prob.batch_learn(x, verbose = False) print prob.cluster_sizes() np.testing.assert_almost_equal((prob.al-1)[:3], n/self.nc)
# Log likelihood of training data under model #print prob.ll(x)[0].sum()
[docs] def test_ll(self): x = self.data n,d = x.shape n/= self.nc # done generating test data # k is the max number of clusters # w is the prior parameter. prob = VDP(GaussianNIW(d), k=30,w=0.1) xt = prob.distr.sufficient_stats(x) prob.batch_learn(xt, verbose = False) ll , gr, hs = prob.ll(x,(True,True,False)) al = 1e-10 x1 = al*x[1,:] -al *x[0,:] + .5 *x[0,:] + .5*x[1,:] x2 = al*x[0,:] -al *x[1,:] + .5 *x[0,:] + .5*x[1,:] diff = (prob.ll(x2[np.newaxis,:])[0] - prob.ll(x1[np.newaxis,:])[0]) jac = prob.ll(.5 *x[0:1,:] + .5*x[1:2,:], (False,True,False))[1] diff_ = (jac *(x2-x1)[np.newaxis,np.newaxis,:]).sum(2) np.testing.assert_array_almost_equal(diff,diff_[0])
[docs] def test_resp(self): x = self.data n,d = x.shape n/= self.nc prob = VDP(GaussianNIW(d), k=30,w=0.1) xt = prob.distr.sufficient_stats(x) prob.batch_learn(xt, verbose = False) slc = (2,3,4) prob = prob.marginal(slc) x = x[:,slc] ps,gr,hs = prob.resp(x,(True,True,False)) np.testing.assert_equal( np.histogram(np.argmax(ps,1),range(self.nc+1))[0], n) # test gradient al = 1e-5 x1 = al*x[1,:] -al *x[0,:] + .5 *x[0,:] + .5*x[1,:] x2 = al*x[0,:] -al *x[1,:] + .5 *x[0,:] + .5*x[1,:] diff = (prob.resp(x2[np.newaxis,:])[0] - prob.resp(x1[np.newaxis,:])[0]) jac = prob.resp(.5 *x[0:1,:] + .5*x[1:2,:], (False,True,False))[1] diff_ = (jac *(x2-x1)[np.newaxis,np.newaxis,:]).sum(2) r = diff/diff_ np.testing.assert_array_almost_equal(r, 1, 6)
@unittest.skipUnless(__name__== '__main__', 'still in development')
[docs] def test_presp(self): x = self.data n,d = x.shape n/= self.nc prob = VDP(GaussianNIW(d), k=30,w=0.1) xt = prob.distr.sufficient_stats(x) prob.batch_learn(xt, verbose = False) slc = (2,3,4) prob = prob.marginal(slc) x = x[:,slc] ps,gr,hs = prob.pseudo_resp(x,(True,False,False)) ps_,gr,hs = prob.resp(x,(True,False,False)) print ps print ps_
@unittest.skipUnless(__name__== '__main__', 'still in development')
[docs] def test_online_vdp(self): hvdp = OnlineVDP(GaussianNIW(3), w=1e-2, k = 20, tol=1e-3, max_items = 100 ) for t in range(10): x = np.mod(np.linspace(0,2*np.pi*3,134),2*np.pi) t1 = time.time() data = np.vstack((x,np.sin(x),np.cos(x))).T hvdp.put(hvdp.distr.sufficient_stats(data)) hvdp.get_model() print time.time()-t1
[docs] def test_gniw_conditionals(self): distr = GaussianNIW(self.data.shape[1]) nus = np.vstack([distr.prior_param + distr.sufficient_stats(data).sum(0) for data in self.data.reshape(self.nc,-1,self.data.shape[1])]) nus = np.vstack((nus,nus)) nx = 200 z = np.random.normal(size = distr.prior.dim*nx ).reshape(nx,distr.prior.dim) iy = (0,1) ix = (2,3,4) x = z[:,ix] f = lambda x_: distr.conditional_expectation(x_,nus,iy,ix, (True,True,False) ) g = lambda x_: distr.conditional_variance(x_,nus,iy,ix, (True,True,False) ) h = lambda x_: distr.conditional(x_,nus,iy,ix)[1:4] grad_check(f,x) grad_check(g,x) grad_check(h,x)
[docs] def test_vdp_conditionals(self): data = self.data n,d = data.shape prob = VDP(GaussianNIW(d), k=50,w=.4) x = prob.distr.sufficient_stats(data) prob.batch_learn(x, verbose = False) nz = 200 z = np.random.normal(size = d*nz ).reshape(nz,d) iy = (0,1) ix = (2,3,4) x = z[:,ix] f = lambda x_: prob.conditional_expectation(x_,iy,ix, (True,True,False) ) g = lambda x_: prob.var_cond_exp(x_,iy,ix, (True,True,False) ) grad_check(f,x) grad_check(g,x)
@unittest.skipUnless(__name__== '__main__', 'still in development')
[docs] def test_predictor(self): data = self.data n,d = data.shape prob = VDP(GaussianNIW(d), k=50,w=.4) x = prob.distr.sufficient_stats(data) prob.batch_learn(x, verbose = False) #nz = 200 #z = np.random.normal(size = d*nz ).reshape(nz,d) z = data iy = (0,1) ix = (2,3,4) x = z[:,ix] predictor = Predictor(prob,ix,iy) g = lambda x_: predictor.predict(x_,(True,True,False) ) grad_check(g,x) predictor = PredictorKL(prob,ix,iy) g = lambda x_: predictor.predict(x_,(True,True,False) ) grad_check(g,x)
if __name__ == '__main__': single_test = 'test_predictor' if hasattr(Tests, single_test): dev_suite = unittest.TestSuite() dev_suite.addTest(Tests(single_test)) unittest.TextTestRunner().run(dev_suite) else: unittest.main()