@@ -155 +156 @@
                     'DB file created by connect?')
         conn.row_factory = sqlite3.Row
         conn.text_factory = str
-        conn.execute('PRAGMA synchronous = NORMAL')
-        conn.execute('PRAGMA count_changes = OFF')
-        conn.execute('PRAGMA temp_store = MEMORY')
-        conn.execute('PRAGMA journal_mode = DELETE')
+        with closing(conn.cursor()) as cur:
+            cur.execute('PRAGMA synchronous = NORMAL')
+            cur.execute('PRAGMA count_changes = OFF')
+            cur.execute('PRAGMA temp_store = MEMORY')
+            cur.execute('PRAGMA journal_mode = DELETE')
         conn.create_function('chexor', 3, chexor)
     except sqlite3.DatabaseError:
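The hunk above routes the PRAGMA statements through a cursor wrapped in contextlib.closing(), so the cursor is released as soon as the statements have run instead of lingering on the connection. A minimal standalone sketch of the same pattern (illustrative only, not part of the patch):

    import sqlite3
    from contextlib import closing

    conn = sqlite3.connect(':memory:')
    with closing(conn.cursor()) as cur:
        # the cursor is closed automatically when the with-block exits
        cur.execute('PRAGMA synchronous = NORMAL')
        cur.execute('PRAGMA temp_store = MEMORY')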
@@ -171 +173 @@
     """Encapsulates working with a database."""

     def __init__(self, db_file, timeout=BROKER_TIMEOUT, logger=None,
-                 account=None, container=None, pending_timeout=10,
+                 account=None, container=None, pending_timeout=None,
                  stale_reads_ok=False):
-        """ Encapsulates working with a database. """
+        """Encapsulates working with a database."""
         self.db_file = db_file
         self.pending_file = self.db_file + '.pending'
-        self.pending_timeout = pending_timeout
+        self.pending_timeout = pending_timeout or 10
         self.stale_reads_ok = stale_reads_ok
         self.db_dir = os.path.dirname(db_file)
         self.timeout = timeout
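With the signature change, callers that do not care about the pending timeout can pass nothing and `pending_timeout or 10` restores the old default of 10 seconds. Note that the `or` idiom also maps an explicit 0 back to 10, so a zero timeout cannot be requested this way; a small illustrative check (not from the patch):

    def pick_timeout(pending_timeout=None):
        # mirrors `self.pending_timeout = pending_timeout or 10`
        return pending_timeout or 10

    assert pick_timeout() == 10
    assert pick_timeout(30) == 30
    assert pick_timeout(0) == 10   # falsy zero falls back to the default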
@@ -203 +205 @@
                                factory=GreenDBConnection, timeout=0)
         # creating dbs implicitly does a lot of transactions, so we
         # pick fast, unsafe options here and do a big fsync at the end.
-        conn.execute('PRAGMA synchronous = OFF')
-        conn.execute('PRAGMA temp_store = MEMORY')
-        conn.execute('PRAGMA journal_mode = MEMORY')
+        with closing(conn.cursor()) as cur:
+            cur.execute('PRAGMA synchronous = OFF')
+            cur.execute('PRAGMA temp_store = MEMORY')
+            cur.execute('PRAGMA journal_mode = MEMORY')
         conn.create_function('chexor', 3, chexor)
         conn.row_factory = sqlite3.Row
         conn.text_factory = str
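The comment in this hunk carries the rationale: a brand-new DB is built with durability turned off and is only fsync'd once at the end, before it becomes visible. A rough sketch of that life cycle under assumed names (not the actual initialize() code):

    import os
    import sqlite3
    from contextlib import closing

    def create_db(tmp_path, final_path):
        conn = sqlite3.connect(tmp_path)
        with closing(conn.cursor()) as cur:
            cur.execute('PRAGMA synchronous = OFF')    # no per-transaction fsync
            cur.execute('PRAGMA temp_store = MEMORY')
            cur.execute('PRAGMA journal_mode = MEMORY')
            cur.execute('CREATE TABLE example (id INTEGER PRIMARY KEY, name TEXT)')
        conn.commit()
        conn.close()
        with open(tmp_path, 'r+b') as fp:
            os.fsync(fp.fileno())                      # one big fsync at the end
        os.rename(tmp_path, final_path)                # publish the finished DB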
@@ -292 +295 @@
         elif 'file is encrypted or is not a database' in str(exc_value):
             exc_hint = 'corrupted'
-            raise exc_type(*exc_value.args), None, exc_traceback
+            raise exc_type, exc_value, exc_traceback
         prefix_path = os.path.dirname(self.db_dir)
         partition_path = os.path.dirname(prefix_path)
         dbs_path = os.path.dirname(partition_path)
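For context, `raise exc_type, exc_value, exc_traceback` is the Python 2 three-expression raise: it re-raises the original exception object with its original traceback, rather than building a fresh exception from the old one's args. A tiny Python 2 sketch (not from the patch):

    import sys

    def do_work():
        raise ValueError('bad db')

    try:
        do_work()
    except ValueError:
        exc_type, exc_value, exc_traceback = sys.exc_info()
        # re-raise with the original traceback intact (Python 2 syntax)
        raise exc_type, exc_value, exc_traceback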
@@ -467 +466 @@
         :returns: dict containing keys: hash, id, created_at, put_timestamp,
                   delete_timestamp, count, max_row, and metadata
-            if not self.stale_reads_ok:
+        self._commit_puts_stale_ok()
         query_part1 = '''
             SELECT hash, id, created_at, put_timestamp, delete_timestamp,
                 %s_count AS count,
             curs.row_factory = dict_factory
             return curs.fetchone()

-    def _commit_puts(self):
-        pass # stub to be overridden if need be
+    def _commit_puts(self, item_list=None):
+        Scan for .pending files and commit the found records by feeding them
+        :param item_list: A list of items to commit in addition to .pending
+        if self.db_file == ':memory:' or not os.path.exists(self.pending_file):
+        if item_list is None:
+        with lock_parent_directory(self.pending_file, self.pending_timeout):
+            if not os.path.getsize(self.pending_file):
+                self.merge_items(item_list)
+            with open(self.pending_file, 'r+b') as fp:
+                for entry in fp.read().split(':'):
+                    self._commit_puts_load(item_list, entry)
+                    self.logger.exception(
+                        _('Invalid pending entry %(file)s: %(entry)s'),
+                        {'file': self.pending_file, 'entry': entry})
+                self.merge_items(item_list)
+                os.ftruncate(fp.fileno(), 0)
+                if err.errno != errno.ENOENT:
+
+    def _commit_puts_stale_ok(self):
+        Catch failures of _commit_puts() if broker is intended for
+        reading of stats, and thus does not care for pending updates.
+        if not self.stale_reads_ok:
+
+    def _commit_puts_load(self, item_list, entry):
+        Unmarshall the :param:entry and append it to :param:item_list.
+        This is implemented by a particular broker to be compatible
+        with its merge_items().
+        raise NotImplementedError

     def merge_syncs(self, sync_points, incoming=True):
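The net effect of this hunk is a template-method split: the base class now owns the generic .pending handling (locking, reading colon-separated entries, truncating the file), each broker only supplies `_commit_puts_load()` to unmarshal one entry, and read paths call `_commit_puts_stale_ok()` so a lock timeout can be ignored when stale reads are acceptable. A compressed sketch of how the three pieces relate (simplified; `_read_pending_entries` is a hypothetical stand-in, not the patch's exact body):

    class LockTimeout(Exception):
        pass

    class BaseBroker(object):
        stale_reads_ok = False

        def _read_pending_entries(self):
            # hypothetical helper standing in for the locking + file reading
            return []

        def merge_items(self, item_list):
            pass

        def _commit_puts(self, item_list=None):
            # generic half: pull entries out of .pending, let the hook decode them
            item_list = item_list if item_list is not None else []
            for entry in self._read_pending_entries():
                self._commit_puts_load(item_list, entry)
            self.merge_items(item_list)

        def _commit_puts_stale_ok(self):
            # readers that only need stats may tolerate a missed commit
            try:
                self._commit_puts()
            except LockTimeout:
                if not self.stale_reads_ok:
                    raise

        def _commit_puts_load(self, item_list, entry):
            raise NotImplementedError  # each broker unmarshals its own row format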
@@ -790 +835 @@
                     status_changed_at = ?
                 WHERE delete_timestamp < ? """, (timestamp, timestamp, timestamp))

+    def _commit_puts_load(self, item_list, entry):
+        (name, timestamp, size, content_type, etag, deleted) = \
+            pickle.loads(entry.decode('base64'))
+        item_list.append({'name': name,
+                          'created_at': timestamp,
+                          'content_type': content_type,

         Check if the DB is empty.

         :returns: True if the database has no active objects, False otherwise
-            if not self.stale_reads_ok:
+        self._commit_puts_stale_ok()
         with self.get() as conn:
             row = conn.execute(
                 'SELECT object_count from container_stat').fetchone()
             return (row[0] == 0)

-    def _commit_puts(self, item_list=None):
-        """Handles committing rows in .pending files."""
-        if self.db_file == ':memory:' or not os.path.exists(self.pending_file):
-        if item_list is None:
-        with lock_parent_directory(self.pending_file, self.pending_timeout):
-            if not os.path.getsize(self.pending_file):
-                self.merge_items(item_list)
-            with open(self.pending_file, 'r+b') as fp:
-                for entry in fp.read().split(':'):
-                    (name, timestamp, size, content_type, etag,
-                     deleted) = pickle.loads(entry.decode('base64'))
-                    item_list.append({'name': name,
-                                      'created_at': timestamp,
-                                      'content_type': content_type,
-                    self.logger.exception(
-                        _('Invalid pending entry %(file)s: %(entry)s'),
-                        {'file': self.pending_file, 'entry': entry})
-                self.merge_items(item_list)
-                os.ftruncate(fp.fileno(), 0)
-                if err.errno != errno.ENOENT:

     def reclaim(self, object_timestamp, sync_timestamp):
         Delete rows from the object table that are marked deleted and
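Both the removed inline code and the new `_commit_puts_load()` decode the same on-disk format: each .pending entry is a base64-encoded pickle of a (name, timestamp, size, content_type, etag, deleted) tuple, and entries are joined with colons. A standalone round-trip sketch of that encoding (using the base64 module rather than the str codec):

    import base64
    import pickle

    record = ('o1', '1332700000.00000', 1024, 'text/plain',
              'd41d8cd98f00b204e9800998ecf8427e', 0)

    # write side: one base64-encoded pickle per record
    entry = base64.b64encode(pickle.dumps(record, protocol=2))

    # read side: what a container _commit_puts_load() does per entry
    name, timestamp, size, content_type, etag, deleted = \
        pickle.loads(base64.b64decode(entry))
    assert name == 'o1' and size == 1024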
@@ -913 +928 @@
         if pending_size > PENDING_CAP:
             self._commit_puts([record])
-            with lock_parent_directory(
-                    self.pending_file, self.pending_timeout):
+            with lock_parent_directory(self.pending_file,
+                                       self.pending_timeout):
                 with open(self.pending_file, 'a+b') as fp:
                     # Colons aren't used in base64 encoding; so they are our
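The truncated comment above is the key point of the writer side: base64 output never contains ':', so a colon can safely separate records appended to the .pending file. A small sketch of that append path under assumed names (not the patch's put_object()):

    import base64
    import pickle

    def append_pending(pending_path, record):
        # each record becomes ':' + base64(pickle(record)); colons never occur
        # inside base64 data, so they unambiguously delimit entries
        with open(pending_path, 'a+b') as fp:
            fp.write(b':' + base64.b64encode(pickle.dumps(record, protocol=2)))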
@@ -960 +971 @@
                   reported_object_count, reported_bytes_used, hash, id,
                   x_container_sync_point1, and x_container_sync_point2.
-            if not self.stale_reads_ok:
+        self._commit_puts_stale_ok()
         with self.get() as conn:
             trailing = 'x_container_sync_point1, x_container_sync_point2'
@@ -1073 +1080 @@
         delim_force_gte = False
         (marker, end_marker, prefix, delimiter, path) = utf8encode(
             marker, end_marker, prefix, delimiter, path)
-            if not self.stale_reads_ok:
+        self._commit_puts_stale_ok()
         if path is not None:
@@ -1340 +1343 @@
                     status_changed_at = ?
                 WHERE delete_timestamp < ? """, (timestamp, timestamp, timestamp))

-    def _commit_puts(self, item_list=None):
-        """Handles committing rows in .pending files."""
-        if self.db_file == ':memory:' or not os.path.exists(self.pending_file):
-        if item_list is None:
-        with lock_parent_directory(self.pending_file, self.pending_timeout):
-            if not os.path.getsize(self.pending_file):
-                self.merge_items(item_list)
-            with open(self.pending_file, 'r+b') as fp:
-                for entry in fp.read().split(':'):
-                    (name, put_timestamp, delete_timestamp,
-                     object_count, bytes_used, deleted) = \
-                        pickle.loads(entry.decode('base64'))
-                        'put_timestamp': put_timestamp,
-                        'delete_timestamp': delete_timestamp,
-                        'object_count': object_count,
-                        'bytes_used': bytes_used,
-                        'deleted': deleted})
-                    self.logger.exception(
-                        _('Invalid pending entry %(file)s: %(entry)s'),
-                        {'file': self.pending_file, 'entry': entry})
-                self.merge_items(item_list)
-                os.ftruncate(fp.fileno(), 0)
-            except OSError, err:
-                if err.errno != errno.ENOENT:
+    def _commit_puts_load(self, item_list, entry):
+        (name, put_timestamp, delete_timestamp,
+         object_count, bytes_used, deleted) = \
+            pickle.loads(entry.decode('base64'))
+            'put_timestamp': put_timestamp,
+            'delete_timestamp': delete_timestamp,
+            'object_count': object_count,
+            'bytes_used': bytes_used,
+            'deleted': deleted})

     def empty(self):
@@ -1475 +1448 @@
                         protocol=PICKLE_PROTOCOL).encode('base64'))

-    def can_delete_db(self, cutoff):
-        Check if the accont DB can be deleted.
-        :returns: True if the account can be deleted, False otherwise
-        with self.get() as conn:
-            row = conn.execute('''
-                SELECT status, put_timestamp, delete_timestamp, container_count
-                FROM account_stat''').fetchone()
-            # The account is considered deleted if its status is marked
-            # as 'DELETED" and the delete_timestamp is older than the supplied
-            # cutoff date; or if the delete_timestamp value is greater than
-            # the put_timestamp, and there are no containers for the account
-            status_del = (row['status'] == 'DELETED')
-            deltime = float(row['delete_timestamp'])
-            past_cutoff = (deltime < cutoff)
-            time_later = (row['delete_timestamp'] > row['put_timestamp'])
-            no_containers = (row['container_count'] in (None, '', 0, '0'))
-                (status_del and past_cutoff) or (time_later and no_containers))

     def is_deleted(self):
         Check if the account DB is considered to be deleted.
         if self.db_file != ':memory:' and not os.path.exists(self.db_file):
-            if not self.stale_reads_ok:
+        self._commit_puts_stale_ok()
         with self.get() as conn:
             row = conn.execute('''
                 SELECT put_timestamp, delete_timestamp, container_count, status
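The removed can_delete_db() spelled out the deletability rule in its comment: an account can go away if it is marked DELETED and its delete_timestamp is older than the cutoff, or if it was deleted after it was last put and holds no containers. Restated as a standalone predicate (a paraphrase of the removed code, not anything this patch adds):

    def can_delete_account(status, put_timestamp, delete_timestamp,
                           container_count, cutoff):
        # deleted status and already past the reclaim cutoff ...
        status_del = (status == 'DELETED')
        past_cutoff = (float(delete_timestamp) < cutoff)
        # ... or deleted more recently than it was put, with nothing left in it
        time_later = (delete_timestamp > put_timestamp)
        no_containers = (container_count in (None, '', 0, '0'))
        return (status_del and past_cutoff) or (time_later and no_containers)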
@@ -1536 +1482 @@
                   delete_timestamp, container_count, object_count,
                   bytes_used, hash, id
-            if not self.stale_reads_ok:
+        self._commit_puts_stale_ok()
         with self.get() as conn:
             return dict(conn.execute('''
                 SELECT account, created_at, put_timestamp, delete_timestamp,
@@ -1566 +1508 @@
         (marker, end_marker, prefix, delimiter) = utf8encode(
             marker, end_marker, prefix, delimiter)
-            if not self.stale_reads_ok:
+        self._commit_puts_stale_ok()
         if delimiter and not prefix:
             orig_marker = marker