1
"""Tests suite for fame io submodule.
4
:contact: mattknox_ca_at_hotmail_dot_com
5
:version: $Id: test_fame.py 2578 2007-01-17 19:25:10Z mattknox_ca $
7
__author__ = "Matt Knox ($Author: mattknox_ca $)"
9
__revision__ = "$Revision: 2578 $"
10
__date__ = '$Date: 2007-01-17 14:25:10 -0500 (Wed, 17 Jan 2007) $'
14
from numpy import bool_, complex_, float_, int_, object_
15
import numpy.core.fromnumeric as fromnumeric
16
import numpy.core.numeric as numeric
17
from numpy.testing import NumpyTest, NumpyTestCase
18
from numpy.testing.utils import build_err_msg
20
from timeseries.io import fame
21
from timeseries.io.fame import const
22
from timeseries import const as _c
23
from timeseries import Report
24
import timeseries as ts
25
import maskedarray as ma
29
from maskedarray import masked_array, masked, nomask
31
import maskedarray.testutils
32
from maskedarray.testutils import assert_equal, assert_array_equal, approx, assert_mask_equal
34
# setup all the data to be used for reading and writing
35
data = {'dates':{}, 'darrays':{}, 'freqs':{}, 'cser':{}, 'tser':{}, 'scalars':{}}
37
data['dates']['a'] = ts.Date(freq='A', year=2004)
38
data['dates']['q'] = ts.Date(freq='Q', year=2004, quarter=1)
39
data['dates']['m'] = ts.Date(freq='M', year=2004, month=1)
40
data['dates']['w'] = ts.Date(freq='W', year=2004, month=1, day=1)
41
data['dates']['b'] = ts.Date(freq='b', year=2004, month=1, day=1)
42
data['dates']['d'] = ts.Date(freq='d', year=2004, month=1, day=1)
43
data['dates']['h'] = ts.Date(freq='h', year=2004, month=1, day=1, hour=0)
44
data['dates']['t'] = ts.Date(freq='t', year=2004, month=1, day=1, hour=0, minute=0)
45
data['dates']['s'] = ts.Date(freq='s', year=2004, month=1, day=1, hour=0, minute=0, second=0)
47
for freq in data['dates']:
48
data['darrays'][freq] = ts.date_array(start_date=data['dates'][freq], length=10)
49
data['cser']['date_'+freq] = data['darrays'][freq]
51
data['cser']['bool'] = [True, False, True, False, True, True]
52
data['cser']['int32'] = np.arange(6).astype(np.int32)
53
data['cser']['int64'] = np.arange(6).astype(np.int64)
54
data['cser']['float32'] = np.arange(6).astype(np.float32)
55
data['cser']['float64'] = np.arange(6).astype(np.float64)
56
data['cser']['str'] = ["asdf", "aasssssssss", "zzzzzzzzzzzz", "", "blah"]
58
for x in data['cser']:
59
data['cser'][x] = ma.masked_array(data['cser'][x])
60
data['tser'][x] = ts.time_series(data['cser'][x], start_date=data['dates']['a'])
62
for freq in data['dates']:
63
data['freqs'][freq] = ts.time_series(np.arange(20).astype(np.float32), start_date=data['dates'][freq])
65
# test writing for all data types as time series and as case series
66
for x in data['tser']:
67
data['tser'][x][1] = ma.masked
68
data['cser'][x][1] = ma.masked
70
# series for testing appending data to an existing series
71
appendTSer = ts.time_series(np.arange(10, 15).astype(np.float32), freq='A', start_date=ts.Date(freq='A', year=2007))
72
appendCSer = np.arange(10, 15).astype(np.float32)
74
# series for testing writing over a specified range
75
rangeTSer = ts.time_series(np.arange(20).astype(np.float32), freq='A', start_date=ts.Date(freq='A', year=2004))
76
rangeCSer = np.arange(20).astype(np.float32)
78
data['scalars']['int32'] = np.int32(5)
79
data['scalars']['int64'] = np.int64(5)
80
data['scalars']['float32'] = np.float32(5)
81
data['scalars']['float64'] = np.float64(5)
82
data['scalars']['pyInt'] = 5
83
data['scalars']['pyFloat'] = 5234.6323
84
data['scalars']['string'] = "mystring"
85
data['scalars']['namelist'] = ["mystring", "$asdf","gggggggg"]
86
data['scalars']['boolean'] = True
87
for f in data['dates']:
88
data['scalars']['date_'+f] = data['dates'][f]
90
_desc = "my desc\nline 2"
91
_doc = "my doc\nline 2"
93
class test_write(NumpyTestCase):
96
self.db = fame.FameDb("testdb.db",'o')
99
"execute all the tests. Order is important here"
101
tests = ["_test_write_scalars", "_test_read_scalars",
102
"_test_dict_scalars", "_test_write_freqs_tser",
103
"_test_read_freqs_tser", "_test_write_dtypes_tser",
104
"_test_read_dtypes_tser", "_test_read_range_tser",
105
"_test_write_append_tser", "_test_read_append_tser",
106
"_test_write_range_tser", "_test_verify_write_range_tser",
107
"_test_write_empty_tser", "_test_read_empty_tser",
108
"_test_overwrite_tser", "_test_assume_exists_tser",
109
"_test_dict_tser", "_test_write_dtypes_cser",
110
"_test_read_dtypes_cser", "_test_read_range_cser",
111
"_test_write_append_cser", "_test_read_append_cser",
112
"_test_write_range_cser", "_test_verify_write_range_cser",
113
"_test_write_empty_cser", "_test_read_empty_cser",
114
"_test_overwrite_cser", "_test_assume_exists_cser",
115
"_test_dict_cser", "_test_whats",
116
"_test_exists", "_test_delete",
117
"_test_wildlist", "_test_restore",
118
"_test_db_attribs", "_test_initialize_obj_and_post",
119
"_test_copy_rename", "_test_obj_attribs",
127
def _test_write_scalars(self):
128
"test writing all types of scalar values"
129
for s in data['scalars']:
130
self.db.write_scalar('$scalar_'+s, data['scalars'][s])
132
def _test_dict_scalars(self):
133
"test writing multiple scalars at once using write_scalar_dict"
134
self.db.write_scalar_dict({'$scalar_1':data['scalars']['float32'],
135
'$scalar_2':data['scalars']['float32']})
136
result = self.db.read(['$scalar_1', '$scalar_2'])
137
assert_equal(result['$scalar_1'], data['scalars']['float32'])
138
assert_equal(result['$scalar_2'], data['scalars']['float32'])
140
def _test_read_scalars(self):
141
"read scalars of every data type"
142
for s in data['scalars']:
143
sclr = self.db.read('$scalar_'+s)
144
orig = data['scalars'][s]
147
assert_equal(sclr, orig.astype(np.float32))
148
elif s in ('pyInt', 'pyFloat', 'int64'):
149
assert_equal(sclr, np.float64(orig))
150
elif s == 'namelist':
151
assert_equal(sclr, [x.upper() for x in orig])
153
assert_equal(sclr, orig)
155
def _test_write_freqs_tser(self):
156
"test writing time series for all frequencies"
157
for x in data['freqs']:
158
self.db.write_tser('$freq_'+x, data['freqs'][x])
160
def _test_read_freqs_tser(self):
161
"""read series at every frequency and ensure they are the
162
same as what was written"""
163
for x in data['freqs']:
164
ser = self.db.read('$freq_'+x)
165
assert_mask_equal(ser.mask, data['freqs'][x].mask)
166
assert((ser == data['freqs'][x]).all())
168
def _test_write_dtypes_tser(self):
169
"test writing for all dtypes for time series"
170
for x in data['tser']:
171
self.db.write_tser('$tser_'+x, data['tser'][x])
173
def _test_read_dtypes_tser(self):
174
"read time series of every data type"
175
for x in data['tser']:
176
ser = self.db.read('$tser_'+x)
177
if str(ser.dtype)[:5] == 'float' and str(data['tser'][x].dtype)[:3] == 'int':
178
ser = ser.astype(data['tser'][x].dtype)
180
assert_mask_equal(ser.mask, data['tser'][x].mask)
181
assert((ser == data['tser'][x]).all())
183
def _test_read_range_tser(self):
184
"test reading a time series over specified ranges"
185
src = data['tser']['float32']
186
s1 = src.start_date+2
187
s2 = src.start_date-2
191
dateList = [(s1, e1),
196
for s, e in dateList:
197
res = ts.adjust_endpoints(src, start_date=s, end_date=e)
198
ser = self.db.read('$tser_float32', start_date=s, end_date=e)
199
assert_array_equal(res, ser)
202
def _test_write_append_tser(self):
203
"test appending data to an existing time series"
204
self.db.write_tser('$appendTSer', data['tser']['float32'])
205
self.db.write_tser('$appendTSer', appendTSer)
207
def _test_read_append_tser(self):
208
"test reading of appended time series"
209
result = ts.adjust_endpoints(data['tser']['float32'],
210
start_date=data['tser']['float32'].start_date,
211
end_date=appendTSer.end_date)
212
result[appendTSer.start_date:appendTSer.end_date+1] = appendTSer
214
ser = self.db.read('$appendTSer')
216
assert_array_equal(result, ser)
219
def _test_write_range_tser(self):
220
"test writing a time series over a specified range"
221
self.db.write_tser('$rangeTSer', rangeTSer,
222
start_date=ts.Date(freq='A', year=2008),
223
end_date=ts.Date(freq='A', year=2012))
225
def _test_verify_write_range_tser(self):
226
"verify that _test_write_range_write_tser worked as expected"
228
ser = self.db.read('$rangeTSer')
230
sDate = ts.Date(freq='A', year=2008)
231
eDate = ts.Date(freq='A', year=2012)
233
assert_array_equal(ser, rangeTSer[sDate:eDate+1])
235
def _test_write_empty_tser(self):
236
"test writing a time series with no data"
237
emptySer = ts.time_series([], freq='A')
238
self.db.write_tser('$emptyTSer', emptySer)
240
def _test_read_empty_tser(self):
241
"test reading a time series with no data"
242
ser = self.db.read('$emptyTSer')
243
assert(ser.start_date is None)
245
def _test_overwrite_tser(self):
246
"test overwriting a time series"
247
self.db.write_tser('$tser_float32', data['tser']['bool'], overwrite=True)
248
ser = self.db.read('$tser_float32')
249
assert_array_equal(ser, data['tser']['bool'])
251
def _test_assume_exists_tser(self):
252
"check to see if the assume_exists flag works for write_tser"
255
self.db.write_tser('$doesNotExist', appendTSer, assume_exists=True)
260
def _test_dict_tser(self):
261
"test writing multiple time series at once using write_tser_dict"
262
self.db.write_tser_dict({'$tser_1':data['tser']['float32'],
263
'$tser_2':data['tser']['float32']})
264
result = self.db.read(['$tser_1', '$tser_2'])
265
assert_array_equal(result['$tser_1'], data['tser']['float32'])
266
assert_array_equal(result['$tser_2'], data['tser']['float32'])
268
def _test_write_dtypes_cser(self):
269
"test writing for all dtypes for case series"""
270
for x in data['cser']:
271
self.db.write_cser('$cser_'+x, data['cser'][x])
273
def _test_read_dtypes_cser(self):
274
"read case series of every data type"
275
for x in data['cser']:
276
ser = self.db.read('$cser_'+x)
277
if str(ser.dtype)[:5] == 'float' and str(data['cser'][x].dtype)[:3] == 'int':
278
ser = ser.astype(data['cser'][x].dtype)
280
assert_mask_equal(ser.mask, data['cser'][x].mask)
281
assert((ser == data['cser'][x]).all())
283
def _test_read_range_cser(self):
284
"test reading case series over specified ranges"
285
src = data['cser']['float32']
291
caseList = [(s1, e1),
296
for s, e in caseList:
298
res = ma.array([0]*size , np.float32, mask=[1]*size )
300
if e < src.size: _e = size
301
else: _e = size - max(e-size, 0, size - src.size)
303
res[0:_e] = src[s-1:min(e, src.size)]
304
ser = self.db.read('$cser_float32', start_case=s, end_case=e)
306
assert_array_equal(res, ser)
308
def _test_write_append_cser(self):
309
"test appending to an existing case series"
310
self.db.write_cser('$appendCSer', data['cser']['float32'])
311
self.db.write_cser('$appendCSer', appendCSer, zero_represents=4)
313
def _test_read_append_cser(self):
314
"test reading of appended case series"
316
result = ma.concatenate([data['cser']['float32'][:3], appendCSer])
317
ser = self.db.read('$appendCSer')
318
assert_array_equal(result, ser)
320
def _test_write_range_cser(self):
321
"test writing over a specified range"
322
self.db.write_cser('$rangeCSer', rangeCSer,
323
start_case=5, end_case=9)
325
def _test_verify_write_range_cser(self):
326
"verify that _test_write_range_write_cser worked as expected"
328
ser = self.db.read('$rangeCSer')
329
result = ma.arange(9).astype(np.float32)
330
result[:4] = ma.masked
332
assert_array_equal(ser, result)
334
def _test_write_empty_cser(self):
335
"test writing a case series with no data"
336
self.db.write_cser('$emptyCSer', ma.array([]))
338
def _test_read_empty_cser(self):
339
"test reading a case series with no data"
340
ser = self.db.read('$emptyCSer')
341
assert_equal(ser.size, 0)
343
def _test_overwrite_cser(self):
344
"test overwriting a case series"
345
self.db.write_cser('$cser_float32', data['cser']['bool'], overwrite=True)
346
ser = self.db.read('$cser_float32')
347
assert_array_equal(ser, data['cser']['bool'])
349
def _test_assume_exists_cser(self):
350
"check to see if the assume_exists flag works for write_cser"
353
self.db.write_cser('$doesNotExist', appendCSer, assume_exists=True)
358
def _test_dict_cser(self):
359
"test writing multiple case series at once using write_cser_dict"
360
self.db.write_cser_dict({'$cser_1':data['cser']['float32'],
361
'$cser_2':data['cser']['float32']})
362
result = self.db.read(['$cser_1', '$cser_2'])
363
assert_array_equal(result['$cser_1'], data['cser']['float32'])
364
assert_array_equal(result['$cser_2'], data['cser']['float32'])
366
def _test_whats(self):
368
# just make sure it doesn't crash for now
369
what_dict = self.db._whats('$tser_float32')
371
def _test_exists(self):
373
assert(self.db.obj_exists('$cser_float32'))
374
assert(not self.db.obj_exists('$fake_series'))
376
def _test_delete(self):
378
assert(self.db.obj_exists('$cser_1'))
379
assert(self.db.obj_exists('$cser_2'))
380
self.db.delete_obj(['$cser_1', '$cser_2'])
381
assert(not self.db.obj_exists('$cser_1'))
382
assert(not self.db.obj_exists('$cser_2'))
383
self.db.delete_obj('$cser_1', must_exist=False)
385
def _test_wildlist(self):
386
"test wildlist method"
387
wl1 = self.db.wildlist("$cser_?")
388
wl2 = self.db.wildlist("$cser_?", wildonly=True)
389
res1 = sorted(["$CSER_"+x.upper() for x in list(data['cser'])])
390
res2 = sorted([x.upper() for x in list(data['cser'])])
391
assert_equal(wl1, res1)
392
assert_equal(wl2, res2)
394
def _test_restore(self):
395
"test restore method"
397
self.db = fame.FameDb("testdb.db",'s')
399
self.db.delete_obj('$tser_float32')
400
assert(not self.db.obj_exists('$tser_float32'))
402
assert(self.db.obj_exists('$tser_float32'))
404
def _test_db_attribs(self):
405
"test setting and retrieving database attributes"
406
self.db.set_db_desc(_desc)
407
self.db.set_db_doc(_doc)
409
created = self.db.db_created()
410
modified = self.db.db_modified()
411
desc = self.db.db_desc()
412
doc = self.db.db_doc()
414
assert(abs(ts.thisday('s') - created) < 100)
415
assert(abs(ts.thisday('s') - modified) < 100)
416
assert_equal(desc, _desc)
417
assert_equal(doc, _doc)
419
assert(self.db.db_is_open())
421
assert(not self.db.db_is_open())
422
self.db = fame.FameDb("testdb.db",'s')
424
def _test_initialize_obj_and_post(self):
425
"""test initializing an object and posting of database"""
426
self.db.initialize_obj("$postobj", ts.thisday('B'))
427
exist_script = "from timeseries.io import fame;"
428
exist_script += "db = fame.FameDb('testdb.db', 'r');"
429
exist_script += "print db.obj_exists('$postobj');"
431
proc = os.popen('python -c "'+exist_script+'"')
432
exists = proc.readlines()[0].strip('\n')
435
assert_equal(exists, "False")
439
proc = os.popen('python -c "'+exist_script+'"')
440
exists = proc.readlines()[0].strip('\n')
443
assert_equal(exists, "True")
445
def _test_copy_rename(self):
446
"test copying and renaming an object"
447
db2 = fame.FameDb("testdb2.db", 'o')
448
self.db.copy_obj(db2, "$tser_float32", "$copied_obj")
449
orig_obj = self.db.read("$tser_float32")
450
copied_obj = db2.read("$copied_obj")
451
assert_array_equal(orig_obj, copied_obj)
453
db2.rename_obj("$copied_obj", "$renamed_obj")
454
assert(db2.obj_exists("$renamed_obj"))
455
assert(not db2.obj_exists("$copied_obj"))
459
def _test_obj_attribs(self):
460
"test getting and setting object attributes"
461
assert_equal(self.db.obj_freq("$freq_b"), data['freqs']['b'].freq)
463
assert_equal(self.db.obj_start_date("$freq_b"),
464
data['freqs']['b'].start_date)
465
assert_equal(self.db.obj_end_date("$freq_b"),
466
data['freqs']['b'].end_date)
468
created = self.db.obj_created("$freq_b")
469
modified = self.db.obj_modified("$freq_b")
471
assert(abs(ts.thisday('s') - created) < 100)
472
assert(abs(ts.thisday('s') - modified) < 100)
474
self.db.set_obj_desc("$freq_b", _desc)
475
self.db.set_obj_doc("$freq_b", _doc)
477
desc = self.db.obj_desc("$freq_b")
478
doc = self.db.obj_doc("$freq_b")
480
assert_equal(desc, _desc)
481
assert_equal(doc, _doc)
483
self.db.set_obj_basis("$freq_b", const.HBSDAY)
484
assert_equal(self.db.obj_basis("$freq_b"), const.HBSDAY)
485
self.db.set_obj_basis("$freq_b", const.HBSBUS)
486
assert_equal(self.db.obj_basis("$freq_b"), const.HBSBUS)
488
self.db.set_obj_observed("$freq_b", "END")
489
assert_equal(self.db.obj_observed("$freq_b"), "ENDING")
491
self.db.set_obj_observed("$freq_b", "MAX")
492
assert_equal(self.db.obj_observed("$freq_b"), "MAXIMUM")
494
self.db.set_obj_observed("$freq_b", "AVERAGE")
495
assert_equal(self.db.obj_observed("$freq_b"), "AVERAGED")
497
def _test_misc_funcs(self):
498
"test FAME functions that aren't database methods"
499
assert_equal(fame.license_expires().freq, _c.FR_DAY)
501
# just test that this doesn't crash for now
502
fame.set_option("DBSIZE", "LARGE")
509
###############################################################################
510
#------------------------------------------------------------------------------
511
if __name__ == "__main__":