# dblite.py module contributed by Ralf W. Grosse-Kunstleve.
# Extended for Unicode by Steven Knight.
12
# Keep a handle on the builtin open(): this module defines its own
# open() function further down, which would otherwise hide the builtin
# for the code in this file.
_open = __builtin__.open # avoid name clash
14
# Module-level switches; both default to 0.
#
# NOTE(review): the statement that tests keep_all_files is not visible in
# this part of the file -- confirm what a true value enables before
# relying on it.
keep_all_files = 0

# Consulted when an existing database file cannot be unpickled:
#   0 -- the unpickling error is re-raised,
#   1 -- corruption_warning() is called with the file name.
# Any other value takes neither of those two paths (the self-test code
# sets it to 2); presumably the corrupt contents are then dropped
# silently -- confirm.
ignore_corrupt_dbfiles = 0
17
def corruption_warning(filename):
    """Print a warning that names a corrupt database file.

    Writes a single line to standard output: the fixed warning text, one
    space, then *filename*.

    The message is formatted first and handed to ``print`` as one
    parenthesised value.  The Python 2 print statement and the Python 3
    print function both read that form the same way, so the output is
    unchanged on Python 2 and the line no longer fails to parse on
    Python 3.
    """
    prefix = "Warning: Discarding corrupt database:"
    # Formatting through an explicit tuple keeps a tuple-valued filename
    # from being mistaken for the list of format arguments.
    print("%s %s" % (prefix, filename))
20
if hasattr(types, 'UnicodeType'):
23
return t is types.StringType or t is types.UnicodeType
26
return type(s) is types.StringType
31
def unicode(s):
    """Return *s* unchanged.

    NOTE(review): the statements that decide whether this definition is
    executed are not visible in this part of the file; presumably it is a
    stand-in for interpreters without a builtin ``unicode`` -- confirm.
    """
    return s
33
# File-name extension of database files.  The dblite constructor compares
# it with the extension of the name it is given and appends it when
# building the database file name.
dblite_suffix = ".dblite"
38
def __init__(self, file_base_name, flag, mode):
39
assert flag in (None, "r", "w", "c", "n")
40
if (flag is None): flag = "r"
41
base, ext = os.path.splitext(file_base_name)
42
if ext == dblite_suffix:
43
# There's already a suffix on the file name, don't add one.
44
self._file_name = file_base_name
45
self._tmp_name = base + tmp_suffix
47
self._file_name = file_base_name + dblite_suffix
48
self._tmp_name = file_base_name + tmp_suffix
52
self._needs_sync = 00000
53
if (self._flag == "n"):
54
_open(self._file_name, "wb", self._mode)
57
f = _open(self._file_name, "rb")
59
if (self._flag != "c"):
61
_open(self._file_name, "wb", self._mode)
66
self._dict = cPickle.loads(p)
67
except (cPickle.UnpicklingError, EOFError):
68
if (ignore_corrupt_dbfiles == 0): raise
69
if (ignore_corrupt_dbfiles == 1):
70
corruption_warning(self._file_name)
73
if (self._needs_sync):
77
self._check_writable()
78
f = _open(self._tmp_name, "wb", self._mode)
79
cPickle.dump(self._dict, f, 1)
81
# Windows doesn't allow renaming onto a file that already exists, so
# unlink the old file first, chmod'ing it beforehand to make sure we
# are able to.  On UNIX the chmod may fail if the file is owned by
# someone else (e.g. left over from a previous run as root).  We
# should still be able to unlink() the file as long as the directory
# is writable, though, so any OSError raised by the chmod() call is
# ignored.
87
try: os.chmod(self._file_name, 0777)
89
os.unlink(self._file_name)
90
os.rename(self._tmp_name, self._file_name)
91
self._needs_sync = 00000
95
self._file_name + "_" + str(int(time.time())))
97
def _check_writable(self):
    """Refuse modification of a database that was opened read-only.

    Returns None when ``self._flag`` is anything other than "r".

    Raises:
        IOError: ``self._flag`` is "r"; the message names
            ``self._file_name``.
    """
    read_only = self._flag == "r"
    if not read_only:
        return
    raise IOError("Read-only database: %s" % self._file_name)
101
def __getitem__(self, key):
    """Return the value stored under *key* in ``self._dict``.

    Any lookup error raised by ``self._dict`` (KeyError for a plain dict)
    propagates to the caller.
    """
    entries = self._dict
    return entries[key]
104
def __setitem__(self, key, value):
    """Store *value* under *key* and mark the database as modified.

    ``self._check_writable()`` runs first, so its error for a database
    opened with flag "r" propagates before anything is stored.  Both
    *key* and *value* must be accepted by ``is_string()``.

    Raises:
        TypeError: *key* or *value* is rejected by ``is_string()``; the
            message shows the offending object and its type.

    The exceptions are raised with call syntax and the flag is set with a
    plain ``1``; both behave exactly like the former ``raise TypeError, msg``
    statement and ``0001`` literal on Python 2, but also parse on Python 3.
    """
    self._check_writable()
    if not is_string(key):
        raise TypeError("key `%s' must be a string but is %s" % (key, type(key)))
    if not is_string(value):
        raise TypeError("value `%s' must be a string but is %s" % (value, type(value)))
    self._dict[key] = value
    # Record that the in-memory contents now differ from what was loaded.
    self._needs_sync = 1
114
return self._dict.keys()
116
def has_key(self, key):
    """Return whether *key* is present in ``self._dict``.

    Gives the same answer as the ``in`` operator on this object.
    """
    entries = self._dict
    return key in entries
119
def __contains__(self, key):
    """Support ``key in db`` by testing membership in ``self._dict``."""
    entries = self._dict
    return key in entries
123
return self._dict.iterkeys()
128
return len(self._dict)
130
def open(file, flag=None, mode=0666):
131
return dblite(file, flag, mode)
134
db = open("tmp", "n")
137
assert db["foo"] == "bar"
138
db[unicode("ufoo")] = unicode("ubar")
139
assert db[unicode("ufoo")] == unicode("ubar")
141
db = open("tmp", "c")
142
assert len(db) == 2, len(db)
143
assert db["foo"] == "bar"
145
assert db["bar"] == "foo"
146
db[unicode("ubar")] = unicode("ufoo")
147
assert db[unicode("ubar")] == unicode("ufoo")
149
db = open("tmp", "r")
150
assert len(db) == 4, len(db)
151
assert db["foo"] == "bar"
152
assert db["bar"] == "foo"
153
assert db[unicode("ufoo")] == unicode("ubar")
154
assert db[unicode("ubar")] == unicode("ufoo")
158
assert str(e) == "Read-only database: tmp.dblite"
160
raise RuntimeError, "IOError expected."
161
db = open("tmp", "w")
168
assert str(e) == "key `(1, 2)' must be a string but is <type 'tuple'>", str(e)
170
raise RuntimeError, "TypeError exception expected"
174
assert str(e) == "value `[1, 2]' must be a string but is <type 'list'>", str(e)
176
raise RuntimeError, "TypeError exception expected"
177
db = open("tmp", "r")
179
db = open("tmp", "n")
181
_open("tmp.dblite", "w")
182
db = open("tmp", "r")
183
_open("tmp.dblite", "w").write("x")
185
db = open("tmp", "r")
186
except cPickle.UnpicklingError:
189
raise RuntimeError, "cPickle exception expected."
190
global ignore_corrupt_dbfiles
191
ignore_corrupt_dbfiles = 2
192
db = open("tmp", "r")
194
os.unlink("tmp.dblite")
196
db = open("tmp", "w")
198
assert str(e) == "[Errno 2] No such file or directory: 'tmp.dblite'", str(e)
200
raise RuntimeError, "IOError expected."
203
if (__name__ == "__main__"):