~ubuntu-branches/ubuntu/natty/python-cogent/natty

« back to all changes in this revision

Viewing changes to cogent/maths/period.py

  • Committer: Bazaar Package Importer
  • Author(s): Steffen Moeller
  • Date: 2010-12-04 22:30:35 UTC
  • mfrom: (1.1.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20101204223035-j11kinhcrrdgg2p2
Tags: 1.5-1
* Bumped standard to 3.9.1, no changes required.
* New upstream version.
  - major additions to Cookbook
  - added AlleleFreqs attribute to ensembl Variation objects.
  - added getGeneByStableId method to genome objects.
  - added Introns attribute to Transcript objects and an Intron class.
  - added Mann-Whitney test and a Monte-Carlo version
  - exploratory and confirmatory period estimation techniques (suitable for
    symbolic and continuous data)
  - Information theoretic measures (AIC and BIC) added
  - drawing of trees with collapsed nodes
  - progress display indicator support for terminal and GUI apps
  - added parser for illumina HiSeq2000 and GAiix sequence files as 
    cogent.parse.illumina_sequence.MinimalIlluminaSequenceParser.
  - added parser to FASTQ files, one of the output options for illumina's
    workflow, also added cookbook demo.
  - added functionality for parsing of SFF files without the Roche tools in
    cogent.parse.binary_sff
  - thousand fold performance improvement to nmds
  - >10-fold performance improvements to some Table operations

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
from numpy import zeros, array, exp, pi, cos, fft, arange, power, sqrt, sum,\
 
2
                    multiply, float64, polyval
 
3
 
 
4
__author__ = "Hua Ying, Julien Epps and Gavin Huttley"
 
5
__copyright__ = "Copyright 2007-2010, The Cogent Project"
 
6
__credits__ = ["Julien Epps", "Hua Ying", "Gavin Huttley", "Peter Maxwell"]
 
7
__license__ = "GPL"
 
8
__version__ = "1.5.0"
 
9
__maintainer__ = "Gavin Huttley"
 
10
__email__ = "Gavin.Huttley@anu.edu.au"
 
11
__status__ = "Production"
 
12
 
 
13
def _goertzel_inner(x, N, period):
 
14
    coeff = 2.0 * cos(2 * pi / period)
 
15
    s_prev = 0.0
 
16
    s_prev2 = 0.0
 
17
    for n in range(N):
 
18
        s = x[n] + coeff * s_prev - s_prev2
 
19
        s_prev2 = s_prev
 
20
        s_prev = s
 
21
    pwr = sqrt(s_prev2**2 + s_prev**2 - coeff * s_prev2 * s_prev)
 
22
    return pwr
 
23
 
 
24
def _ipdft_inner(x, X, W, ulim, N): # naive python
 
25
    for p in range(ulim):
 
26
        w = 1
 
27
        for n in range(N):
 
28
            if n != 0:
 
29
                w *= W[p]
 
30
            X[p] = X[p] + x[n] * w
 
31
    return X
 
32
 
 
33
def _ipdft_inner2(x, X, W, ulim, N): # fastest python
 
34
    p = x[::-1] # reversed
 
35
    X = polyval(p, W)
 
36
    return X
 
37
 
 
38
def _autocorr_inner2(x, xc, N): # fastest python
 
39
    products = multiply.outer(x, x)
 
40
    v = [products.trace(offset=m) for m in range(-len(x)+1, len(x))]
 
41
    xc.put(xrange(xc.shape[0]), v)
 
42
 
 
43
def _autocorr_inner(x, xc, N): # naive python
 
44
    for m in range(-N+1, N):
 
45
        for n in range(N):
 
46
            if 0 <= n-m < N:
 
47
                xc[m+N-1] += (x[n]*x[n-m])
 
48
 
 
49
try:
 
50
    # try using pyrexed versions
 
51
    from _period import ipdft_inner, autocorr_inner, goertzel_inner
 
52
    # raise ImportError # for profiling
 
53
except ImportError:
 
54
    # fastest python versions
 
55
    ipdft_inner = _ipdft_inner2
 
56
    autocorr_inner = _autocorr_inner2
 
57
    goertzel_inner = _goertzel_inner
 
58
 
 
59
def goertzel(x, period):
 
60
    """returns the array(power), array(period) from series x for period
 
61
    result objects are arrays for consistency with that other period
 
62
    estimation functions"""
 
63
    calc = Goertzel(len(x), period=period)
 
64
    return calc(x)
 
65
 
 
66
class _PeriodEstimator(object):
 
67
    """parent class for period estimation"""
 
68
    def __init__(self, length, llim=None, ulim=None, period=None):
 
69
        super(_PeriodEstimator, self).__init__()
 
70
        self.length = length
 
71
        self.llim = llim or 2
 
72
        self.ulim = ulim or (length-1)
 
73
        
 
74
        if self.ulim > length:
 
75
            raise RuntimeError, 'Error: ulim > length'
 
76
        
 
77
        self.period = period
 
78
    
 
79
    def getNumStats(self):
 
80
        """returns the number of statistics computed by this calculator"""
 
81
        return 1
 
82
    
 
83
 
 
84
class AutoCorrelation(_PeriodEstimator):
 
85
    def __init__(self, length, llim=None, ulim=None, period=None):
 
86
        """class for repetitive calculation of autocorrelation for series of
 
87
        fixed length
 
88
        
 
89
        e.g. if x = [1,1,1,1], xc = [1,2,3,4,3,2,1]
 
90
        The middle element of xc corresponds to a lag (period) of 0
 
91
        xc is always symmetric for real x
 
92
        N is the length of x"""
 
93
        super(AutoCorrelation, self).__init__(length, llim, ulim, period)
 
94
        
 
95
        periods = range(-length+1, length)
 
96
        
 
97
        self.min_idx = periods.index(self.llim)
 
98
        self.max_idx = periods.index(self.ulim)
 
99
        self.periods = array(periods[self.min_idx: self.max_idx + 1])
 
100
        self.xc = zeros(2*self.length-1)
 
101
    
 
102
    def evaluate(self, x):
 
103
        x = array(x, float64)
 
104
        self.xc.fill(0.0)
 
105
        autocorr_inner(x, self.xc, self.length)
 
106
        xc = self.xc[self.min_idx: self.max_idx + 1]
 
107
        if self.period is not None:
 
108
            return xc[self.period-self.llim]
 
109
        
 
110
        return xc, self.periods
 
111
    
 
112
    __call__ = evaluate
 
113
 
 
114
def auto_corr(x, llim=None, ulim=None):
 
115
    """returns the autocorrelation of x
 
116
    e.g. if x = [1,1,1,1], xc = [1,2,3,4,3,2,1]
 
117
    The middle element of xc corresponds to a lag (period) of 0
 
118
    xc is always symmetric for real x
 
119
    N is the length of x
 
120
    """
 
121
    _autocorr = AutoCorrelation(len(x), llim=llim, ulim=ulim)
 
122
    return _autocorr(x)
 
123
 
 
124
class Ipdft(_PeriodEstimator):
 
125
    
 
126
    def __init__(self, length, llim=None, ulim=None, period=None, abs_ft_sig=True):
 
127
        """factory function for computing the integer period discrete Fourier
 
128
        transform for repeated application to signals of the same length.
 
129
    
 
130
        Argument:
 
131
            - length: the signal length
 
132
            - llim: lower limit
 
133
            - ulim: upper limit
 
134
            - period: a specific period to return the IPDFT power for
 
135
            - abs_ft_sig: if True, returns absolute value of signal
 
136
        """
 
137
        if period is not None:
 
138
            llim = period
 
139
            ulim = period
 
140
        super(Ipdft, self).__init__(length, llim, ulim, period)
 
141
        self.periods = array(range(self.llim, self.ulim+1))
 
142
        self.W = exp(-1j * 2 * pi / arange(1, self.ulim+1))
 
143
        self.X = array([0+0j] * self.length)
 
144
        self.abs_ft_sig = abs_ft_sig
 
145
    
 
146
    def evaluate(self, x):
 
147
        x = array(x, float64)
 
148
        self.X.fill(0+0j)
 
149
        self.X = ipdft_inner(x, self.X, self.W, self.ulim, self.length)
 
150
        pwr = self.X[self.llim-1:self.ulim]
 
151
        
 
152
        if self.abs_ft_sig:
 
153
            pwr = abs(pwr)
 
154
        
 
155
        if self.period is not None:
 
156
            return pwr[self.period-self.llim]
 
157
        
 
158
        return array(pwr), self.periods
 
159
    
 
160
    __call__ = evaluate
 
161
    
 
162
 
 
163
class Goertzel(_PeriodEstimator):
 
164
    """Computes the power of a signal for a specific period"""
 
165
    def __init__(self, length=None, llim=None, ulim=None, period=None, abs_ft_sig=True):
 
166
        assert period is not None, "Goertzel requires a period"
 
167
        super(Goertzel, self).__init__(length=length, period=period)
 
168
    
 
169
    def evaluate(self, x):
 
170
        x = array(x, float64)
 
171
        return _goertzel_inner(x, self.length, self.period)
 
172
    
 
173
    __call__ = evaluate
 
174
 
 
175
 
 
176
class Hybrid(_PeriodEstimator):
 
177
    """hybrid statistic and corresponding periods for signal x
 
178
    
 
179
    See Epps. EURASIP Journal on Bioinformatics and Systems Biology, 2009"""
 
180
    
 
181
    def __init__(self, length, llim=None, ulim=None, period=None, abs_ft_sig=True, return_all=False):
 
182
        """Arguments:
 
183
            - length: the length of signals to be encountered
 
184
            - period: specified period at which to return the signal
 
185
            - llim, ulim: the smallest, largest periods to evaluate
 
186
            - return_all: whether to return the hybrid, ipdft, autocorr
 
187
              statistics as a numpy array, or just the hybrid statistic
 
188
        """
 
189
        super(Hybrid, self).__init__(length, llim, ulim, period)
 
190
        self.ipdft = Ipdft(length, llim, ulim, period, abs_ft_sig)
 
191
        self.auto = AutoCorrelation(length, llim, ulim, period)
 
192
        self._return_all = return_all
 
193
    
 
194
    def getNumStats(self):
 
195
        """the number of stats computed by this calculator"""
 
196
        num = [1, 3][self._return_all]
 
197
        return num
 
198
    
 
199
    def evaluate(self, x):
 
200
        if self.period is None:
 
201
            auto_sig, auto_periods = self.auto(x)
 
202
            ft_sig, ft_periods = self.ipdft(x)
 
203
            hybrid = auto_sig * ft_sig
 
204
            if self._return_all:
 
205
                result = array([hybrid, ft_sig, auto_sig]), ft_periods
 
206
            else:
 
207
                result = hybrid, ft_periods
 
208
        else:
 
209
            auto_sig = self.auto(x)
 
210
            # ft_sig = goertzel(x, period) # performance slower than ipdft!
 
211
            ft_sig = self.ipdft(x)
 
212
            hybrid = auto_sig * ft_sig
 
213
            if self._return_all:
 
214
                result = array([abs(hybrid), ft_sig, auto_sig])
 
215
            else:
 
216
                result = abs(hybrid)
 
217
        return result
 
218
    
 
219
    __call__ = evaluate
 
220
 
 
221
 
 
222
def ipdft(x, llim=None, ulim=None, period=None):
 
223
    """returns the integer period discrete Fourier transform of the signal x
 
224
    
 
225
    Arguments:
 
226
        - x: series of symbols
 
227
        - llim: lower limit
 
228
        - ulim: upper limit
 
229
    """
 
230
    x = array(x, float64)
 
231
    ipdft_calc = Ipdft(len(x), llim, ulim, period)
 
232
    return ipdft_calc(x)
 
233
 
 
234
def hybrid(x, llim=None, ulim=None, period=None, return_all=False):
 
235
    """
 
236
    Return hybrid statistic and corresponding periods for signal x
 
237
    
 
238
    Arguments:
 
239
        - return_all: whether to return the hybrid, ipdft, autocorr
 
240
          statistics as a numpy array, or just the hybrid statistic
 
241
    
 
242
    See Epps. EURASIP Journal on Bioinformatics and Systems Biology, 2009, 9
 
243
    """
 
244
    hybrid_calc = Hybrid(len(x), llim, ulim, period, return_all=return_all)
 
245
    x = array(x, float)
 
246
    return hybrid_calc(x)
 
247
 
 
248
def dft(x, **kwargs):
 
249
    """
 
250
    Return discrete fft and corresponding periods for signal x
 
251
    """
 
252
    n = len(x) / 2 * 2
 
253
    x = array(x[:n])
 
254
    pwr = fft.rfft(x, n)[1:]
 
255
    freq = (arange(n/2+1)/(float(n)))[1:]
 
256
    pwr = list(pwr)
 
257
    periods = [1/f for f in freq]
 
258
    pwr.reverse()
 
259
    periods.reverse()
 
260
    return array(pwr), array(periods)
 
261
 
 
262
if __name__ == "__main__":
 
263
    from numpy import sin
 
264
    x = sin(2*pi/5*arange(1,9))
 
265
    print x
 
266
    print goertzel(x, 4)
 
267
    print goertzel(x, 8)
 
268
    
 
 
b'\\ No newline at end of file'