~ubuntu-branches/ubuntu/trusty/swift/trusty-updates

« back to all changes in this revision

Viewing changes to swift/common/db.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, James Page, Chuck Short
  • Date: 2013-08-13 10:37:13 UTC
  • mfrom: (1.2.21)
  • Revision ID: package-import@ubuntu.com-20130813103713-1ctbx4zifyljs2aq
Tags: 1.9.1-0ubuntu1
[ James Page ]
* d/control: Update VCS fields for new branch locations.

[ Chuck Short ]
* New upstream release.

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
""" Database code for Swift """
17
17
 
18
18
from __future__ import with_statement
19
 
from contextlib import contextmanager
 
19
from contextlib import contextmanager, closing
20
20
import hashlib
21
21
import logging
22
22
import os
25
25
import time
26
26
import cPickle as pickle
27
27
import errno
 
28
from gettext import gettext as _
28
29
from tempfile import mkstemp
29
30
 
30
31
from eventlet import sleep, Timeout
155
156
                                              'DB file created by connect?')
156
157
        conn.row_factory = sqlite3.Row
157
158
        conn.text_factory = str
158
 
        conn.execute('PRAGMA synchronous = NORMAL')
159
 
        conn.execute('PRAGMA count_changes = OFF')
160
 
        conn.execute('PRAGMA temp_store = MEMORY')
161
 
        conn.execute('PRAGMA journal_mode = DELETE')
 
159
        with closing(conn.cursor()) as cur:
 
160
            cur.execute('PRAGMA synchronous = NORMAL')
 
161
            cur.execute('PRAGMA count_changes = OFF')
 
162
            cur.execute('PRAGMA temp_store = MEMORY')
 
163
            cur.execute('PRAGMA journal_mode = DELETE')
162
164
        conn.create_function('chexor', 3, chexor)
163
165
    except sqlite3.DatabaseError:
164
166
        import traceback
171
173
    """Encapsulates working with a database."""
172
174
 
173
175
    def __init__(self, db_file, timeout=BROKER_TIMEOUT, logger=None,
174
 
                 account=None, container=None, pending_timeout=10,
 
176
                 account=None, container=None, pending_timeout=None,
175
177
                 stale_reads_ok=False):
176
 
        """ Encapsulates working with a database. """
 
178
        """Encapsulates working with a database."""
177
179
        self.conn = None
178
180
        self.db_file = db_file
179
181
        self.pending_file = self.db_file + '.pending'
180
 
        self.pending_timeout = pending_timeout
 
182
        self.pending_timeout = pending_timeout or 10
181
183
        self.stale_reads_ok = stale_reads_ok
182
184
        self.db_dir = os.path.dirname(db_file)
183
185
        self.timeout = timeout
203
205
                                   factory=GreenDBConnection, timeout=0)
204
206
        # creating dbs implicitly does a lot of transactions, so we
205
207
        # pick fast, unsafe options here and do a big fsync at the end.
206
 
        conn.execute('PRAGMA synchronous = OFF')
207
 
        conn.execute('PRAGMA temp_store = MEMORY')
208
 
        conn.execute('PRAGMA journal_mode = MEMORY')
 
208
        with closing(conn.cursor()) as cur:
 
209
            cur.execute('PRAGMA synchronous = OFF')
 
210
            cur.execute('PRAGMA temp_store = MEMORY')
 
211
            cur.execute('PRAGMA journal_mode = MEMORY')
209
212
        conn.create_function('chexor', 3, chexor)
210
213
        conn.row_factory = sqlite3.Row
211
214
        conn.text_factory = str
292
295
        elif 'file is encrypted or is not a database' in str(exc_value):
293
296
            exc_hint = 'corrupted'
294
297
        else:
295
 
            raise exc_type(*exc_value.args), None, exc_traceback
 
298
            raise exc_type, exc_value, exc_traceback
296
299
        prefix_path = os.path.dirname(self.db_dir)
297
300
        partition_path = os.path.dirname(prefix_path)
298
301
        dbs_path = os.path.dirname(partition_path)
332
335
        except sqlite3.DatabaseError:
333
336
            try:
334
337
                conn.close()
335
 
            except:
 
338
            except Exception:
336
339
                pass
337
340
            self.possibly_quarantine(*sys.exc_info())
338
341
        except (Exception, Timeout):
414
417
        :param count: number to get
415
418
        :returns: list of objects between start and end
416
419
        """
417
 
        try:
418
 
            self._commit_puts()
419
 
        except LockTimeout:
420
 
            if not self.stale_reads_ok:
421
 
                raise
 
420
        self._commit_puts_stale_ok()
422
421
        with self.get() as conn:
423
422
            curs = conn.execute('''
424
423
                SELECT * FROM %s WHERE ROWID > ? ORDER BY ROWID ASC LIMIT ?
467
466
        :returns: dict containing keys: hash, id, created_at, put_timestamp,
468
467
            delete_timestamp, count, max_row, and metadata
469
468
        """
470
 
        try:
471
 
            self._commit_puts()
472
 
        except LockTimeout:
473
 
            if not self.stale_reads_ok:
474
 
                raise
 
469
        self._commit_puts_stale_ok()
475
470
        query_part1 = '''
476
471
            SELECT hash, id, created_at, put_timestamp, delete_timestamp,
477
472
                %s_count AS count,
493
488
            curs.row_factory = dict_factory
494
489
            return curs.fetchone()
495
490
 
496
 
    def _commit_puts(self):
497
 
        pass    # stub to be overridden if need be
 
491
    def _commit_puts(self, item_list=None):
 
492
        """
 
493
        Scan for .pending files and commit the found records by feeding them
 
494
        to merge_items().
 
495
 
 
496
        :param item_list: A list of items to commit in addition to .pending
 
497
        """
 
498
        if self.db_file == ':memory:' or not os.path.exists(self.pending_file):
 
499
            return
 
500
        if item_list is None:
 
501
            item_list = []
 
502
        with lock_parent_directory(self.pending_file, self.pending_timeout):
 
503
            self._preallocate()
 
504
            if not os.path.getsize(self.pending_file):
 
505
                if item_list:
 
506
                    self.merge_items(item_list)
 
507
                return
 
508
            with open(self.pending_file, 'r+b') as fp:
 
509
                for entry in fp.read().split(':'):
 
510
                    if entry:
 
511
                        try:
 
512
                            self._commit_puts_load(item_list, entry)
 
513
                        except Exception:
 
514
                            self.logger.exception(
 
515
                                _('Invalid pending entry %(file)s: %(entry)s'),
 
516
                                {'file': self.pending_file, 'entry': entry})
 
517
                if item_list:
 
518
                    self.merge_items(item_list)
 
519
                try:
 
520
                    os.ftruncate(fp.fileno(), 0)
 
521
                except OSError, err:
 
522
                    if err.errno != errno.ENOENT:
 
523
                        raise
 
524
 
 
525
    def _commit_puts_stale_ok(self):
 
526
        """
 
527
        Catch failures of _commit_puts() if broker is intended for
 
528
        reading of stats, and thus does not care for pending updates.
 
529
        """
 
530
        try:
 
531
            self._commit_puts()
 
532
        except LockTimeout:
 
533
            if not self.stale_reads_ok:
 
534
                raise
 
535
 
 
536
    def _commit_puts_load(self, item_list, entry):
 
537
        """
 
538
        Unmarshall the :param:entry and append it to :param:item_list.
 
539
        This is implemented by a particular broker to be compatible
 
540
        with its merge_items().
 
541
        """
 
542
        raise NotImplementedError
498
543
 
499
544
    def merge_syncs(self, sync_points, incoming=True):
500
545
        """
790
835
                status_changed_at = ?
791
836
            WHERE delete_timestamp < ? """, (timestamp, timestamp, timestamp))
792
837
 
 
838
    def _commit_puts_load(self, item_list, entry):
 
839
        (name, timestamp, size, content_type, etag, deleted) = \
 
840
            pickle.loads(entry.decode('base64'))
 
841
        item_list.append({'name': name,
 
842
                          'created_at': timestamp,
 
843
                          'size': size,
 
844
                          'content_type': content_type,
 
845
                          'etag': etag,
 
846
                          'deleted': deleted})
 
847
 
793
848
    def empty(self):
794
849
        """
795
850
        Check if the DB is empty.
796
851
 
797
852
        :returns: True if the database has no active objects, False otherwise
798
853
        """
799
 
        try:
800
 
            self._commit_puts()
801
 
        except LockTimeout:
802
 
            if not self.stale_reads_ok:
803
 
                raise
 
854
        self._commit_puts_stale_ok()
804
855
        with self.get() as conn:
805
856
            row = conn.execute(
806
857
                'SELECT object_count from container_stat').fetchone()
807
858
            return (row[0] == 0)
808
859
 
809
 
    def _commit_puts(self, item_list=None):
810
 
        """Handles committing rows in .pending files."""
811
 
        if self.db_file == ':memory:' or not os.path.exists(self.pending_file):
812
 
            return
813
 
        if item_list is None:
814
 
            item_list = []
815
 
        with lock_parent_directory(self.pending_file, self.pending_timeout):
816
 
            self._preallocate()
817
 
            if not os.path.getsize(self.pending_file):
818
 
                if item_list:
819
 
                    self.merge_items(item_list)
820
 
                return
821
 
            with open(self.pending_file, 'r+b') as fp:
822
 
                for entry in fp.read().split(':'):
823
 
                    if entry:
824
 
                        try:
825
 
                            (name, timestamp, size, content_type, etag,
826
 
                                deleted) = pickle.loads(entry.decode('base64'))
827
 
                            item_list.append({'name': name,
828
 
                                              'created_at': timestamp,
829
 
                                              'size': size,
830
 
                                              'content_type': content_type,
831
 
                                              'etag': etag,
832
 
                                              'deleted': deleted})
833
 
                        except Exception:
834
 
                            self.logger.exception(
835
 
                                _('Invalid pending entry %(file)s: %(entry)s'),
836
 
                                {'file': self.pending_file, 'entry': entry})
837
 
                if item_list:
838
 
                    self.merge_items(item_list)
839
 
                try:
840
 
                    os.ftruncate(fp.fileno(), 0)
841
 
                except OSError, err:
842
 
                    if err.errno != errno.ENOENT:
843
 
                        raise
844
 
 
845
860
    def reclaim(self, object_timestamp, sync_timestamp):
846
861
        """
847
862
        Delete rows from the object table that are marked deleted and
913
928
        if pending_size > PENDING_CAP:
914
929
            self._commit_puts([record])
915
930
        else:
916
 
            with lock_parent_directory(
917
 
                    self.pending_file, self.pending_timeout):
 
931
            with lock_parent_directory(self.pending_file,
 
932
                                       self.pending_timeout):
918
933
                with open(self.pending_file, 'a+b') as fp:
919
934
                    # Colons aren't used in base64 encoding; so they are our
920
935
                    # delimiter
932
947
        """
933
948
        if self.db_file != ':memory:' and not os.path.exists(self.db_file):
934
949
            return True
935
 
        try:
936
 
            self._commit_puts()
937
 
        except LockTimeout:
938
 
            if not self.stale_reads_ok:
939
 
                raise
 
950
        self._commit_puts_stale_ok()
940
951
        with self.get() as conn:
941
952
            row = conn.execute('''
942
953
                SELECT put_timestamp, delete_timestamp, object_count
960
971
                  reported_object_count, reported_bytes_used, hash, id,
961
972
                  x_container_sync_point1, and x_container_sync_point2.
962
973
        """
963
 
        try:
964
 
            self._commit_puts()
965
 
        except LockTimeout:
966
 
            if not self.stale_reads_ok:
967
 
                raise
 
974
        self._commit_puts_stale_ok()
968
975
        with self.get() as conn:
969
976
            data = None
970
977
            trailing = 'x_container_sync_point1, x_container_sync_point2'
1073
1080
        delim_force_gte = False
1074
1081
        (marker, end_marker, prefix, delimiter, path) = utf8encode(
1075
1082
            marker, end_marker, prefix, delimiter, path)
1076
 
        try:
1077
 
            self._commit_puts()
1078
 
        except LockTimeout:
1079
 
            if not self.stale_reads_ok:
1080
 
                raise
 
1083
        self._commit_puts_stale_ok()
1081
1084
        if path is not None:
1082
1085
            prefix = path
1083
1086
            if path:
1340
1343
                status_changed_at = ?
1341
1344
            WHERE delete_timestamp < ? """, (timestamp, timestamp, timestamp))
1342
1345
 
1343
 
    def _commit_puts(self, item_list=None):
1344
 
        """Handles committing rows in .pending files."""
1345
 
        if self.db_file == ':memory:' or not os.path.exists(self.pending_file):
1346
 
            return
1347
 
        if item_list is None:
1348
 
            item_list = []
1349
 
        with lock_parent_directory(self.pending_file, self.pending_timeout):
1350
 
            self._preallocate()
1351
 
            if not os.path.getsize(self.pending_file):
1352
 
                if item_list:
1353
 
                    self.merge_items(item_list)
1354
 
                return
1355
 
            with open(self.pending_file, 'r+b') as fp:
1356
 
                for entry in fp.read().split(':'):
1357
 
                    if entry:
1358
 
                        try:
1359
 
                            (name, put_timestamp, delete_timestamp,
1360
 
                             object_count, bytes_used, deleted) = \
1361
 
                                pickle.loads(entry.decode('base64'))
1362
 
                            item_list.append(
1363
 
                                {'name': name,
1364
 
                                 'put_timestamp': put_timestamp,
1365
 
                                 'delete_timestamp': delete_timestamp,
1366
 
                                 'object_count': object_count,
1367
 
                                 'bytes_used': bytes_used,
1368
 
                                 'deleted': deleted})
1369
 
                        except Exception:
1370
 
                            self.logger.exception(
1371
 
                                _('Invalid pending entry %(file)s: %(entry)s'),
1372
 
                                {'file': self.pending_file, 'entry': entry})
1373
 
                if item_list:
1374
 
                    self.merge_items(item_list)
1375
 
                try:
1376
 
                    os.ftruncate(fp.fileno(), 0)
1377
 
                except OSError, err:
1378
 
                    if err.errno != errno.ENOENT:
1379
 
                        raise
 
1346
    def _commit_puts_load(self, item_list, entry):
 
1347
        (name, put_timestamp, delete_timestamp,
 
1348
         object_count, bytes_used, deleted) = \
 
1349
            pickle.loads(entry.decode('base64'))
 
1350
        item_list.append(
 
1351
            {'name': name,
 
1352
             'put_timestamp': put_timestamp,
 
1353
             'delete_timestamp': delete_timestamp,
 
1354
             'object_count': object_count,
 
1355
             'bytes_used': bytes_used,
 
1356
             'deleted': deleted})
1380
1357
 
1381
1358
    def empty(self):
1382
1359
        """
1384
1361
 
1385
1362
        :returns: True if the database has no active containers.
1386
1363
        """
1387
 
        try:
1388
 
            self._commit_puts()
1389
 
        except LockTimeout:
1390
 
            if not self.stale_reads_ok:
1391
 
                raise
 
1364
        self._commit_puts_stale_ok()
1392
1365
        with self.get() as conn:
1393
1366
            row = conn.execute(
1394
1367
                'SELECT container_count from account_stat').fetchone()
1475
1448
                        protocol=PICKLE_PROTOCOL).encode('base64'))
1476
1449
                    fp.flush()
1477
1450
 
1478
 
    def can_delete_db(self, cutoff):
1479
 
        """
1480
 
        Check if the account DB can be deleted.
1481
 
 
1482
 
        :returns: True if the account can be deleted, False otherwise
1483
 
        """
1484
 
        self._commit_puts()
1485
 
        with self.get() as conn:
1486
 
            row = conn.execute('''
1487
 
                SELECT status, put_timestamp, delete_timestamp, container_count
1488
 
                FROM account_stat''').fetchone()
1489
 
            # The account is considered deleted if its status is marked
1490
 
            # as 'DELETED' and the delete_timestamp is older than the supplied
1491
 
            # cutoff date; or if the delete_timestamp value is greater than
1492
 
            # the put_timestamp, and there are no containers for the account
1493
 
            status_del = (row['status'] == 'DELETED')
1494
 
            deltime = float(row['delete_timestamp'])
1495
 
            past_cutoff = (deltime < cutoff)
1496
 
            time_later = (row['delete_timestamp'] > row['put_timestamp'])
1497
 
            no_containers = (row['container_count'] in (None, '', 0, '0'))
1498
 
            return (
1499
 
                (status_del and past_cutoff) or (time_later and no_containers))
1500
 
 
1501
1451
    def is_deleted(self):
1502
1452
        """
1503
1453
        Check if the account DB is considered to be deleted.
1507
1457
        """
1508
1458
        if self.db_file != ':memory:' and not os.path.exists(self.db_file):
1509
1459
            return True
1510
 
        try:
1511
 
            self._commit_puts()
1512
 
        except LockTimeout:
1513
 
            if not self.stale_reads_ok:
1514
 
                raise
 
1460
        self._commit_puts_stale_ok()
1515
1461
        with self.get() as conn:
1516
1462
            row = conn.execute('''
1517
1463
                SELECT put_timestamp, delete_timestamp, container_count, status
1536
1482
                  delete_timestamp, container_count, object_count,
1537
1483
                  bytes_used, hash, id
1538
1484
        """
1539
 
        try:
1540
 
            self._commit_puts()
1541
 
        except LockTimeout:
1542
 
            if not self.stale_reads_ok:
1543
 
                raise
 
1485
        self._commit_puts_stale_ok()
1544
1486
        with self.get() as conn:
1545
1487
            return dict(conn.execute('''
1546
1488
                SELECT account, created_at,  put_timestamp, delete_timestamp,
1565
1507
        """
1566
1508
        (marker, end_marker, prefix, delimiter) = utf8encode(
1567
1509
            marker, end_marker, prefix, delimiter)
1568
 
        try:
1569
 
            self._commit_puts()
1570
 
        except LockTimeout:
1571
 
            if not self.stale_reads_ok:
1572
 
                raise
 
1510
        self._commit_puts_stale_ok()
1573
1511
        if delimiter and not prefix:
1574
1512
            prefix = ''
1575
1513
        orig_marker = marker