#!/usr/bin/python
"""
04 Jul 2011
"""
__author__ = "Francois-Jose Serra"
__email__ = "francois@barrabin.org"
__licence__ = "GPLv3"
__version__ = "0.13"
from scipy.stats import chisqprob
from math import log, exp
from cPickle import dump, load
from os.path import isfile, exists
from sys import stdout, stderr
from numpy import arange
from warnings import warn
from ecolopy_dev.utils import shannon_entropy
from ecolopy_dev.models import *
[docs]class Community (object):
'''
Main community class.
:argument data: containing species count, can be given in one of:
* python list, each element being a species count
* text file containing one species count per line
* pickle file containing community object
:argument None j_tot: is the size of the metacommunity, if not defined
j_tot = J * 3
:returns: an Community object
**Examples:**
::
# python list
abd = Community([1, 1, 2, 3, 4, 8, 12])
# text file
abd = Community("bci.txt", j_tot=256987)
# finally a saved Community object
abd = Community("abundance_bci_first_try.pik")
'''
def __init__ (self, data, j_tot=None):
"""
creation of Community object
"""
self.__models = {}
if type (data) != list:
self.data_path = data
data = self._parse_infile ()
if not data is None:
self.abund = [x for x in sorted (data [:])]
self.J = sum (self.abund)
self.S = len (self.abund)
self.j_tot = j_tot if j_tot else self.J * 3
self.shannon = shannon_entropy (self.abund, self.J)
self.__current_model = None
def __str__(self):
"""
for print
"""
return """Community (object)
Number of individuals (J) : %d
Number of species (S) : %d
Shannon's index (shannon) : %.4f
Metacommunity size (j_tot): %d
Models computed : %s
Model loaded : %s
""" % (self.J, self.S, self.shannon, self.j_tot,
', '.join (self.__models.keys()),
(self.__current_model.__class__.__name__ + \
'\n' + ' ' * 8 + ('\n' + ' ' * 8).join(
[l for l in str(self.__current_model).split('\n')[2:]]
)) if self.__current_model else "None")
[docs] def rsa_ascii (self, width=100, height=50, pch='o'):
"""
Draw Relative Species Abundances curve (ASCII format).
:argument 100 width: width in term of characters
:argument 100 height: height in term of characters
:argument o pch: dot character
:returns: string corresponding to plot
"""
dots = sorted([float(x) for x in self.abund], reverse=True)
rabd = []
for dot in dots:
rabd.append (log (100 * dot / float(self.J)))
y_arange = sorted(arange(min(rabd), max(rabd),
abs (min(rabd) - max(rabd))/height),
reverse=True)
x_arange = sorted(arange(0, float(self.S), float(self.S) / width))
yval = 0
xval = 0
spaces = ''
graph = '\n(%) Relative\nAbundances\n\n'
graph += '{0:<7.4f}+'.format (exp(max(rabd)))
for i, dot in enumerate (rabd):
if dot < y_arange[yval]:
graph += '\n'
if not (yval)%5 and yval != 0:
graph += '{0:<7.4f}+'.format (exp(y_arange[yval]))
else:
graph += ' '*7 + '|'
graph += spaces
while dot < y_arange[yval]:
yval += 1
if i > x_arange[xval]:
graph += pch
spaces += ' '
if xval + 1 >= width:
break
xval += 1
graph += '\n'
graph += ' 1/inf ' + ''.join(
['+' if not x%5 else '-' for x in xrange(width+1)]) + '\n'
graph += ' '*7 + ''.join(
['{0:<5}'.format(int(x_arange[x])) for x in xrange(0,width,5)]
) + str( int(x_arange[-1])) + '\n\n'
graph += ' '*7 + '{0:^{1}}'.format('Species rank', width)
return graph
[docs] def draw_rsa(self, outfile=None, filetype=None, size=(15, 15)):
"""
Draw Relative Species Abundances curve.
:argument None outfile: path were image will be saved, if none, plot
will be shown using matplotlib GUI
:argument None filetype: pdf or png
:argument (15,15) size: size in inches of the drawing
"""
import pylab
try:
import matplotlib.ticker as ticker
except ImportError:
warn("WARNING: matplotlib not found, try rsa_ascii() instead")
pylab.grid(alpha=0.4)
yval = []
xval = []
for rank, abd in enumerate(sorted(self.abund, reverse=True,
key=float)):
xval.append (rank + 1)
yval.append (100 * float(abd) / float(self.J))
if len(yval) == 1:
raise Exception ('list of abundances is too short man')
pylab.plot(xval, yval, marker='o', ms=5, color='grey', lw=3,
mfc='black', ls='-')
# title legend...
max_x = len (xval)
pylab.yscale('log')
pylab.xticks(range (0, max_x+1, 5), range (0, max_x+1, 5), rotation=90)
pylab.title('Ranked Species Abundance')
pylab.xlabel('Species rank number (rank according to abundance)')
pylab.ylabel('Relative abundance percentage of each species')
pylab.xlim(1, len(xval) + 1)
pylab.ylim(log(1.0 / float(self.J)), max(yval) * 1115.5)
axe = pylab.gca()
axe.yaxis.set_major_formatter(ticker.FormatStrFormatter('%.4f'))
fig = pylab.gcf()
dpi = fig.get_dpi()
fig.set_size_inches (size)
if outfile:
if filetype == 'pdf':
fig.savefig(outfile, dpi=dpi+30, filetype='pdf')
elif filetype == 'png':
fig.savefig(outfile, dpi=dpi+30, filetype='png')
pylab.close()
else:
pylab.show()
[docs] def lrt (self, model_1, model_2, dgf=1):
'''
likelihood-ratio-test between two neutral models
:argument model_1: string representing simplest model, usually Ewens
:argument model_2: string representing most complex model, usually
Etienne
:argument 1 dgf: number of degrees of freedom (1 when comparing Etienne
and Ewens)
:returns: p-value of rejection of alternative model
eg: usually ewens, etienne. And if pval < 0.05 than use etienne
'''
return chisqprob(2 * (float (self.get_model(model_1).lnL) - \
float (self.get_model(model_2).lnL)), df=dgf)
[docs] def get_current_model_name (self):
"""
:returns: current model name
"""
return self.__current_model.name
[docs] def iter_models (self):
"""
iterate over pre computed models
:returns: model object
"""
for model in self.__models.values():
yield model
[docs] def fit_model (self, name="ewens", **kwargs):
"""
Fit Community to model.
Extra arguments can be pssed depending on the model to fit. See doc of
its corresponding optimize function.
:argument Ewens name: name of the model, between Etienne, Ewens or
Log-normal
:return: model
"""
if name == 'etienne':
self.__models['etienne'] = EtienneModel(self, **kwargs)
elif name == 'ewens':
self.__models['ewens'] = EwensModel(self, **kwargs)
elif name == 'lognormal':
self.__models['lognormal'] = LognormalModel(self, **kwargs)
[docs] def set_model (self, model):
"""
add one model computed externally to the computed models of current
Community object
:argument model: model object
"""
name = model.__class__.__name__.replace('Model', '').lower()
self.__models[name] = model
[docs] def get_model (self, name):
"""
:argument name: name of a computed model
:returns: a EcologicalModel object corresponding to one of the computed
models
"""
if name in self.__models:
return self.__models[name]
else:
return None
[docs] def set_current_model (self, name):
"""
set on model as default/current model.
:argument name: model name of precomputed model
"""
self.__current_model = self.get_model(name)
for key in ['theta', 'I', 'm']:
try:
self.__dict__[key] = self.__current_model._parameters[key]
except KeyError:
self.__dict__[key] = None
self.__dict__['lnL'] = self.__current_model.lnL
[docs] def test_neutrality (self, model='ewens', gens=100, full=False, fix_s=False,
tries=1000, method='shannon', verbose=False):
"""
test for neutrality comparing Shannon entropy
if (Hobs > Hrand_neut) then evenness of observed data is
higher then neutral
:argument ewens model: model name otherwise, current model is used
:argument 100 gens: number of random neutral distributions to generate
:argument False full: also return list of compared values (H or lnL) for
simulated abundances
:argument False fix_s: decide whether to fix or not the number of
species for the generation of random neutral communities
:argument False tries: in case S is fixed, determines the number of
tries in order to obtain the exact same number of species as original
community. In case The number of tries is exceeded, an ERROR message is
displayed, and p-value returned is 1.0.
:argument shannon method: can be either 'Shannon' for comparing
Shannon's entropies or 'loglike' for comparing log-likelihood values
(Etienne 2007). Last method is much more computationally expensive, as
likelihood of neutral distribution must be calculated.
:returns: p_value if full=True also returns Shannon entropy (or
likelihoods if method='loglike') of all random neutral abundances
generated
"""
fast_shannon = lambda abund: sum ([-spe * log(spe) for spe in abund])
pval = 0
inds = self.S if 'LognormalModel' in repr(model) else self.J
model = self.get_model(model)
if not model:
warn("WARNING: Model '%s' not found.\n" % model)
return None
neut_h = []
for _ in xrange (gens):
if verbose:
stdout.write ("\r Generating random neutral abundances %s/%s" \
% (_ + 1, gens))
stdout.flush ()
if fix_s:
for _ in xrange (tries):
tmp = model.random_community(inds)
if len(tmp) == self.S:
break
else:
stderr.write('ERROR: Unable to obtain S by simulation')
if full:
return float('nan'), []
return float('nan')
else:
tmp = model.random_community(inds)
l_tmp = sum (tmp)
if method == 'shannon':
neut_h.append ((fast_shannon(tmp) + l_tmp*log(l_tmp))/l_tmp)
pval += neut_h[-1] < self.shannon
elif method == 'loglike':
tmp = Community(tmp)
tmp._get_kda(verbose=False)
neut_h.append(tmp.etienne_likelihood([model.theta, model.m]))
pval += neut_h[-1] < model.lnL
if verbose:
stdout.write ('\n')
if full:
return float (pval)/gens, neut_h
return float (pval)/gens
def _parse_infile (self):
'''
TODO: use other columns
parse infile and return list of abundances
infile can contain several columns
'''
abundances = []
if exists(self.data_path):
lines = open (self.data_path).readlines()
else:
lines = self.data_path.split('\n')
if lines[0].strip() == '(dp1':
self.load_community (self.data_path)
return None
for line in lines:
if line:
abundances.append (int (line.strip().split('\t')[0]))
return abundances
[docs] def dump_community (self, outfile, force=False):
'''
save params and kda with pickle
force option is for writing pickle with no consideration
if existing
:argument outfile: path of the outfile
:argument False force: overwrite existing outfile
'''
if isfile (outfile) and not force:
self.load_community (outfile)
# changed, kda no longer here
#try:
# self.__models['KDA'] = self._kda[:]
#except TypeError:
# self.__models['KDA'] = None
self.__models['ABUND'] = self.abund[:]
dump (self.__models, open (outfile, 'w'))
del (self.__models['ABUND'])
[docs] def load_community (self, infile):
'''
load params and kda with pickle from infile
WARNING: do not overwrite values of params.
:argument infile: path of the outfile
'''
old_params = load (open (infile))
old_params.update(self.__models)
self.__models = old_params
self.abund = self.__models['ABUND']
del (self.__models['ABUND'])
#if 'KDA' in self.__models:
# self._kda = self.__models['KDA']
# del (self.__models['KDA'])
#else:
# self._kda = None
[docs] def generate_random_neutral_distribution(self, model=None, J=None):
"""
return distribution of abundance, according to a given model
and a given community size.
if none of them are given, values of current Community are used.
:argument None model: model name (default current model)
:argument None J: size of wanted community (default size of the
community of current abundance)
:returns: random neutral distribution of abundances
"""
if not model:
try:
model = self.__current_model
except:
return
else:
model = self.get_model(model)
if model is None:
raise Exception ('Need first to optimize this model,\n ' + \
' Unable to generate random distribution.')
if not J:
J = self.S if 'LognormalModel' in repr(model) else self.J
return model.random_community(J)