#!/usr/bin/python
"""
21 Sep 2011
"""
__author__ = "Francois-Jose Serra"
__email__ = "francois@barrabin.org"
__licence__ = "GPLv3"
__version__ = "0.13"
from ecolopy_dev.models.untb_model import UNTBModel
from ecolopy_dev.utils import table_mpfr, table, factorial_div
from ecolopy_dev.utils import power_polyn, lpoch
from ecolopy_dev.utils import pre_get_stirlings, stirling, mul_polyn
from warnings import warn
try:
from gmpy2 import log, lngamma, exp, gamma, mpfr
except ImportError:
warn("WARNING: GMPY2 library not found, using numpy")
from numpy import log, exp, float128 as mpfr
from scipy.special import gamma, gammaln as lngamma
from scipy.optimize import fmin, fmin_slsqp, fmin_tnc, fmin_l_bfgs_b
from sys import stdout
from random import random
[docs]class EtienneModel(UNTBModel):
"""
Class representing Ecological models
:argument name: name of the class, can be either ewens, etienne or lognorm
:returns: EcologicalModel object
"""
def __init__(self, community, **kwargs):
if not 'kda' in kwargs:
self._kda = None
super(EtienneModel, self).__init__(community, **kwargs)
self.optimize(**kwargs)
[docs] def random_community (self, inds=None, theta=None, immig=None):
'''
generates random distribution according to J, theta and I
:argument inds: number of individuals in community (J)
:argument theta: corresponding to the model
:argument immig: immigration rate (I)
:returns: distribution of abundance (list)
'''
theta = float (theta) if theta else self.theta
inds = inds or self.community.J
immig = float (immig) if immig else self.I
mcnum = [0] * int (inds)
locnum = [0] * int (inds)
mcnum[0] = 1
new = nxt = -1
for ind in xrange (inds):
if random () > immig / (ind + immig):
locnum [ind] = locnum [int (random () * ind)]
else:
new += 1
if random () <= theta / (theta + new):
nxt += 1
mcnum[new] = nxt + 1
else:
mcnum[new] = mcnum[int (random () * (new))]
locnum[ind] = mcnum[new]
return table(locnum, new + 1)
[docs] def likelihood(self, params):
'''
log-likelihood function
:argument params: a list of 2 parameters:
* theta = params[0]
* m = params[1]
:returns: log likelihood of given theta and I
'''
kda = self._kda
theta = params[0]
immig = float(params[1]) / (1 - params[1]) * (self.community.J - 1)
log_immig = log(immig)
theta_s = theta + self.community.S
poch1 = exp(self._factor + log(theta) * self.community.S - \
lpoch(immig, self.community.J) + \
log_immig * self.community.S + lngamma(theta))
gam_theta_s = gamma(theta_s)
lik = mpfr(0.0)
for abd in xrange(self.community.J - self.community.S):
lik += poch1 * exp(kda[abd] + abd * log_immig) / gam_theta_s
gam_theta_s *= theta_s + abd
return -log(lik)
[docs] def optimize(self, method='fmin', start=None, verbose=True):
'''
Main function to optimize theta and I using etienne likelihood function
using Scipy package, values that are closest to the one proposed
by Tetame, are raised by fmin function.
:argument fmin method: optimization strategy, can be one of fmin,
slsqp, l_bfgs_b or tnc (see scipy.optimize documentation)
:argument (community.S,0.5) start: tupple for startin values of theta
and m
:argument True verbose: displays running status
'''
# define bounds
bounds = [(0, len (self.community.abund)), (1e-49, 1-1e-49)]
all_ok = True
err = ''
# define starting values
start = start or self.community.S/2, 0.5
# compute kda
if not self._kda:
if verbose:
print "\nGetting K(D,A) according to Etienne 2005 formula:"
self._get_kda (verbose=verbose)
# function minimization
if method == 'fmin':
theta, mut = fmin (self.likelihood, start,
full_output=False, disp=0)
elif method == 'slsqp':
(theta, mut), _, _, err, _ = fmin_slsqp(self.likelihood, start,
iter=1000, iprint=0,
bounds=bounds,
full_output=True)
if err != 0:
all_ok = False
elif method == 'l_bfgs_b':
theta, mut, _, err = fmin_l_bfgs_b(self.likelihood, start,
maxfun=1000, bounds=bounds,
iprint=-1, approx_grad=True)
if err['warnflag'] != 0:
all_ok = False
elif method == 'tnc':
theta, mut, _, err = fmin_tnc(self.likelihood, start, maxfun=1000,
messages=0, bounds=bounds,
approx_grad=True)
if err != 1:
all_ok = False
self._parameters['I'] = mut * (self.community.J - 1) / (1 - mut)
self._lnL = self.likelihood ([theta, mut])
self._parameters['m'] = mut
self._parameters['theta'] = theta
if not all_ok:
raise Exception ('Optimization failed\n', err)
def _get_kda (self, verbose=True):
'''
compute K(D,A) according to etienne formula
'''
specabund = [sorted(list(set(self.community.abund))),
table_mpfr(self.community.abund)]
sdiff = len(specabund[1])
polyn = []
# compute all stirling numbers taking advantage of recurrence function
needed = {0: True}
for i in xrange(sdiff):
for k in xrange(1, specabund[0][i] + 1):
needed[int(specabund[0][i])] = True
if verbose:
stdout.write(' Getting some stirling numbers...\n')
pre_get_stirlings(max(specabund[0]), needed, verbose=verbose)
for i in xrange(sdiff):
if verbose:
stdout.write("\r Computing K(D,A) at species %s out of %s" \
% (i+1, sdiff))
stdout.flush ()
sai = specabund[0][i]
polyn1 = [stirling(sai, k) * factorial_div(k, sai) \
for k in xrange(1, sai+1)]
polyn1 = power_polyn(polyn1, specabund[1][i]) \
if polyn1 else [mpfr(1.)]
polyn = mul_polyn(polyn, polyn1)
self._kda = [log(i) for i in polyn]
if verbose:
stdout.write('\n')