~ubuntu-branches/ubuntu/trusty/swift/trusty-updates

« back to all changes in this revision

Viewing changes to swift/common/ring/ring.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Soren Hansen, Chuck Short
  • Date: 2012-09-07 19:02:36 UTC
  • mfrom: (1.2.12)
  • Revision ID: package-import@ubuntu.com-20120907190236-fqrmbzm7v6zivs8d
Tags: 1.7.0-0ubuntu1
[ Soren Hansen ]
* Update debian/watch to account for symbolically named tarballs and
  use newer URL.
* Run unit tests at build time.
* Fix Launchpad URLs in debian/watch.

[ Chuck Short ]
* New upstream release
* debian/control: Add python-mock as a build dep
* debian/rules: Don't fail if testsuite fails.

Show diffs side-by-side

added added

removed removed

Lines of Context:
13
13
# See the License for the specific language governing permissions and
14
14
# limitations under the License.
15
15
 
 
16
import array
16
17
import cPickle as pickle
17
18
from collections import defaultdict
18
19
from gzip import GzipFile
19
 
from os.path import getmtime
20
 
from struct import unpack_from
 
20
from os.path import getmtime, join as pathjoin
 
21
import struct
21
22
from time import time
22
23
import os
23
24
from io import BufferedReader
25
26
from swift.common.utils import hash_path, validate_configuration
26
27
from swift.common.ring.utils import tiers_for_dev
27
28
 
 
29
try:
 
30
    import simplejson as json
 
31
except ImportError:
 
32
    import json
 
33
 
28
34
 
29
35
class RingData(object):
30
36
    """Partitioned consistent hashing ring data (used for serialization)."""
34
40
        self._replica2part2dev_id = replica2part2dev_id
35
41
        self._part_shift = part_shift
36
42
 
 
43
    @classmethod
 
44
    def deserialize_v1(cls, gz_file):
 
45
        json_len, = struct.unpack('!I', gz_file.read(4))
 
46
        ring_dict = json.loads(gz_file.read(json_len))
 
47
        ring_dict['replica2part2dev_id'] = []
 
48
        partition_count = 1 << (32 - ring_dict['part_shift'])
 
49
        for x in xrange(ring_dict['replica_count']):
 
50
            ring_dict['replica2part2dev_id'].append(
 
51
                array.array('H', gz_file.read(2 * partition_count)))
 
52
        return ring_dict
 
53
 
 
54
    @classmethod
 
55
    def load(cls, filename):
 
56
        """
 
57
        Load ring data from a file.
 
58
 
 
59
        :param filename: Path to a file serialized by the save() method.
 
60
        :returns: A RingData instance containing the loaded data.
 
61
        """
 
62
        gz_file = GzipFile(filename, 'rb')
 
63
        # Python 2.6 GzipFile doesn't support BufferedIO
 
64
        if hasattr(gz_file, '_checkReadable'):
 
65
            gz_file = BufferedReader(gz_file)
 
66
 
 
67
        # See if the file is in the new format
 
68
        magic = gz_file.read(4)
 
69
        if magic == 'R1NG':
 
70
            version, = struct.unpack('!H', gz_file.read(2))
 
71
            if version == 1:
 
72
                ring_data = cls.deserialize_v1(gz_file)
 
73
            else:
 
74
                raise Exception('Unknown ring format version %d' % version)
 
75
        else:
 
76
            # Assume old-style pickled ring
 
77
            gz_file.seek(0)
 
78
            ring_data = pickle.load(gz_file)
 
79
        if not hasattr(ring_data, 'devs'):
 
80
            ring_data = RingData(ring_data['replica2part2dev_id'],
 
81
                                 ring_data['devs'], ring_data['part_shift'])
 
82
        return ring_data
 
83
 
 
84
    def serialize_v1(self, file_obj):
 
85
        # Write out new-style serialization magic and version:
 
86
        file_obj.write(struct.pack('!4sH', 'R1NG', 1))
 
87
        ring = self.to_dict()
 
88
        json_text = json.dumps(
 
89
            {'devs': ring['devs'], 'part_shift': ring['part_shift'],
 
90
             'replica_count': len(ring['replica2part2dev_id'])})
 
91
        json_len = len(json_text)
 
92
        file_obj.write(struct.pack('!I', json_len))
 
93
        file_obj.write(json_text)
 
94
        for part2dev_id in ring['replica2part2dev_id']:
 
95
            file_obj.write(part2dev_id.tostring())
 
96
 
 
97
    def save(self, filename):
 
98
        """
 
99
        Serialize this RingData instance to disk.
 
100
 
 
101
        :param filename: File into which this instance should be serialized.
 
102
        """
 
103
        gz_file = GzipFile(filename, 'wb')
 
104
        self.serialize_v1(gz_file)
 
105
        gz_file.close()
 
106
 
37
107
    def to_dict(self):
38
108
        return {'devs': self.devs,
39
109
                'replica2part2dev_id': self._replica2part2dev_id,
44
114
    """
45
115
    Partitioned consistent hashing ring.
46
116
 
47
 
    :param pickle_gz_path: path to ring file
 
117
    :param serialized_path: path to serialized RingData instance
48
118
    :param reload_time: time interval in seconds to check for a ring change
49
119
    """
50
120
 
51
 
    def __init__(self, pickle_gz_path, reload_time=15, ring_name=None):
 
121
    def __init__(self, serialized_path, reload_time=15, ring_name=None):
52
122
        # can't use the ring unless HASH_PATH_SUFFIX is set
53
123
        validate_configuration()
54
124
        if ring_name:
55
 
            self.pickle_gz_path = os.path.join(pickle_gz_path,
56
 
                    ring_name + '.ring.gz')
 
125
            self.serialized_path = os.path.join(serialized_path,
 
126
                                                ring_name + '.ring.gz')
57
127
        else:
58
 
            self.pickle_gz_path = os.path.join(pickle_gz_path)
 
128
            self.serialized_path = os.path.join(serialized_path)
59
129
        self.reload_time = reload_time
60
130
        self._reload(force=True)
61
131
 
62
132
    def _reload(self, force=False):
63
133
        self._rtime = time() + self.reload_time
64
134
        if force or self.has_changed():
65
 
            ring_data = pickle.load(self._get_gz_file())
66
 
            if not hasattr(ring_data, 'devs'):
67
 
                ring_data = RingData(ring_data['replica2part2dev_id'],
68
 
                    ring_data['devs'], ring_data['part_shift'])
69
 
            self._mtime = getmtime(self.pickle_gz_path)
 
135
            ring_data = RingData.load(self.serialized_path)
 
136
            self._mtime = getmtime(self.serialized_path)
70
137
            self._devs = ring_data.devs
71
138
 
72
139
            self._replica2part2dev_id = ring_data._replica2part2dev_id
73
140
            self._part_shift = ring_data._part_shift
74
141
            self._rebuild_tier_data()
75
142
 
76
 
    def _get_gz_file(self):
77
 
        gz_file = GzipFile(self.pickle_gz_path, 'rb')
78
 
        if hasattr(gz_file, '_checkReadable'):
79
 
            return BufferedReader(gz_file)
80
 
        else:
81
 
            # Python 2.6 doesn't support BufferedIO
82
 
            return gz_file
83
 
 
84
143
    def _rebuild_tier_data(self):
85
144
        self.tier2devs = defaultdict(list)
86
145
        for dev in self._devs:
121
180
 
122
181
        :returns: True if the ring on disk has changed, False otherwise
123
182
        """
124
 
        return getmtime(self.pickle_gz_path) != self._mtime
 
183
        return getmtime(self.serialized_path) != self._mtime
125
184
 
126
185
    def get_part_nodes(self, part):
127
186
        """
172
231
        key = hash_path(account, container, obj, raw_digest=True)
173
232
        if time() > self._rtime:
174
233
            self._reload()
175
 
        part = unpack_from('>I', key)[0] >> self._part_shift
 
234
        part = struct.unpack_from('>I', key)[0] >> self._part_shift
176
235
        seen_ids = set()
177
236
        return part, [self._devs[r[part]] for r in self._replica2part2dev_id
178
 
                if not (r[part] in seen_ids or seen_ids.add(r[part]))]
 
237
                      if not (r[part] in seen_ids or seen_ids.add(r[part]))]
179
238
 
180
239
    def get_more_nodes(self, part):
181
240
        """