1
from numpy import zeros, array, exp, pi, cos, fft, arange, power, sqrt, sum,\
2
multiply, float64, polyval
4
__author__ = "Hua Ying, Julien Epps and Gavin Huttley"
5
__copyright__ = "Copyright 2007-2010, The Cogent Project"
6
__credits__ = ["Julien Epps", "Hua Ying", "Gavin Huttley", "Peter Maxwell"]
9
__maintainer__ = "Gavin Huttley"
10
__email__ = "Gavin.Huttley@anu.edu.au"
11
__status__ = "Production"
13
def _goertzel_inner(x, N, period):
14
coeff = 2.0 * cos(2 * pi / period)
18
s = x[n] + coeff * s_prev - s_prev2
21
pwr = sqrt(s_prev2**2 + s_prev**2 - coeff * s_prev2 * s_prev)
24
def _ipdft_inner(x, X, W, ulim, N): # naive python
30
X[p] = X[p] + x[n] * w
33
def _ipdft_inner2(x, X, W, ulim, N): # fastest python
34
p = x[::-1] # reversed
38
def _autocorr_inner2(x, xc, N): # fastest python
39
products = multiply.outer(x, x)
40
v = [products.trace(offset=m) for m in range(-len(x)+1, len(x))]
41
xc.put(xrange(xc.shape[0]), v)
43
def _autocorr_inner(x, xc, N): # naive python
44
for m in range(-N+1, N):
47
xc[m+N-1] += (x[n]*x[n-m])
50
# try using pyrexed versions
51
from _period import ipdft_inner, autocorr_inner, goertzel_inner
52
# raise ImportError # for profiling
54
# fastest python versions
55
ipdft_inner = _ipdft_inner2
56
autocorr_inner = _autocorr_inner2
57
goertzel_inner = _goertzel_inner
59
def goertzel(x, period):
60
"""returns the array(power), array(period) from series x for period
61
result objects are arrays for consistency with that other period
62
estimation functions"""
63
calc = Goertzel(len(x), period=period)
66
class _PeriodEstimator(object):
67
"""parent class for period estimation"""
68
def __init__(self, length, llim=None, ulim=None, period=None):
69
super(_PeriodEstimator, self).__init__()
72
self.ulim = ulim or (length-1)
74
if self.ulim > length:
75
raise RuntimeError, 'Error: ulim > length'
79
def getNumStats(self):
80
"""returns the number of statistics computed by this calculator"""
84
class AutoCorrelation(_PeriodEstimator):
85
def __init__(self, length, llim=None, ulim=None, period=None):
86
"""class for repetitive calculation of autocorrelation for series of
89
e.g. if x = [1,1,1,1], xc = [1,2,3,4,3,2,1]
90
The middle element of xc corresponds to a lag (period) of 0
91
xc is always symmetric for real x
92
N is the length of x"""
93
super(AutoCorrelation, self).__init__(length, llim, ulim, period)
95
periods = range(-length+1, length)
97
self.min_idx = periods.index(self.llim)
98
self.max_idx = periods.index(self.ulim)
99
self.periods = array(periods[self.min_idx: self.max_idx + 1])
100
self.xc = zeros(2*self.length-1)
102
def evaluate(self, x):
103
x = array(x, float64)
105
autocorr_inner(x, self.xc, self.length)
106
xc = self.xc[self.min_idx: self.max_idx + 1]
107
if self.period is not None:
108
return xc[self.period-self.llim]
110
return xc, self.periods
114
def auto_corr(x, llim=None, ulim=None):
115
"""returns the autocorrelation of x
116
e.g. if x = [1,1,1,1], xc = [1,2,3,4,3,2,1]
117
The middle element of xc corresponds to a lag (period) of 0
118
xc is always symmetric for real x
121
_autocorr = AutoCorrelation(len(x), llim=llim, ulim=ulim)
124
class Ipdft(_PeriodEstimator):
126
def __init__(self, length, llim=None, ulim=None, period=None, abs_ft_sig=True):
127
"""factory function for computing the integer period discrete Fourier
128
transform for repeated application to signals of the same length.
131
- length: the signal length
134
- period: a specific period to return the IPDFT power for
135
- abs_ft_sig: if True, returns absolute value of signal
137
if period is not None:
140
super(Ipdft, self).__init__(length, llim, ulim, period)
141
self.periods = array(range(self.llim, self.ulim+1))
142
self.W = exp(-1j * 2 * pi / arange(1, self.ulim+1))
143
self.X = array([0+0j] * self.length)
144
self.abs_ft_sig = abs_ft_sig
146
def evaluate(self, x):
147
x = array(x, float64)
149
self.X = ipdft_inner(x, self.X, self.W, self.ulim, self.length)
150
pwr = self.X[self.llim-1:self.ulim]
155
if self.period is not None:
156
return pwr[self.period-self.llim]
158
return array(pwr), self.periods
163
class Goertzel(_PeriodEstimator):
164
"""Computes the power of a signal for a specific period"""
165
def __init__(self, length=None, llim=None, ulim=None, period=None, abs_ft_sig=True):
166
assert period is not None, "Goertzel requires a period"
167
super(Goertzel, self).__init__(length=length, period=period)
169
def evaluate(self, x):
170
x = array(x, float64)
171
return _goertzel_inner(x, self.length, self.period)
176
class Hybrid(_PeriodEstimator):
177
"""hybrid statistic and corresponding periods for signal x
179
See Epps. EURASIP Journal on Bioinformatics and Systems Biology, 2009"""
181
def __init__(self, length, llim=None, ulim=None, period=None, abs_ft_sig=True, return_all=False):
183
- length: the length of signals to be encountered
184
- period: specified period at which to return the signal
185
- llim, ulim: the smallest, largest periods to evaluate
186
- return_all: whether to return the hybrid, ipdft, autocorr
187
statistics as a numpy array, or just the hybrid statistic
189
super(Hybrid, self).__init__(length, llim, ulim, period)
190
self.ipdft = Ipdft(length, llim, ulim, period, abs_ft_sig)
191
self.auto = AutoCorrelation(length, llim, ulim, period)
192
self._return_all = return_all
194
def getNumStats(self):
195
"""the number of stats computed by this calculator"""
196
num = [1, 3][self._return_all]
199
def evaluate(self, x):
200
if self.period is None:
201
auto_sig, auto_periods = self.auto(x)
202
ft_sig, ft_periods = self.ipdft(x)
203
hybrid = auto_sig * ft_sig
205
result = array([hybrid, ft_sig, auto_sig]), ft_periods
207
result = hybrid, ft_periods
209
auto_sig = self.auto(x)
210
# ft_sig = goertzel(x, period) # performance slower than ipdft!
211
ft_sig = self.ipdft(x)
212
hybrid = auto_sig * ft_sig
214
result = array([abs(hybrid), ft_sig, auto_sig])
222
def ipdft(x, llim=None, ulim=None, period=None):
223
"""returns the integer period discrete Fourier transform of the signal x
226
- x: series of symbols
230
x = array(x, float64)
231
ipdft_calc = Ipdft(len(x), llim, ulim, period)
234
def hybrid(x, llim=None, ulim=None, period=None, return_all=False):
236
Return hybrid statistic and corresponding periods for signal x
239
- return_all: whether to return the hybrid, ipdft, autocorr
240
statistics as a numpy array, or just the hybrid statistic
242
See Epps. EURASIP Journal on Bioinformatics and Systems Biology, 2009, 9
244
hybrid_calc = Hybrid(len(x), llim, ulim, period, return_all=return_all)
246
return hybrid_calc(x)
248
def dft(x, **kwargs):
250
Return discrete fft and corresponding periods for signal x
254
pwr = fft.rfft(x, n)[1:]
255
freq = (arange(n/2+1)/(float(n)))[1:]
257
periods = [1/f for f in freq]
260
return array(pwr), array(periods)
262
if __name__ == "__main__":
263
from numpy import sin
264
x = sin(2*pi/5*arange(1,9))
b'\\ No newline at end of file'