13
13
# See the License for the specific language governing permissions and
14
14
# limitations under the License.
16
17
import cPickle as pickle
17
18
from collections import defaultdict
18
19
from gzip import GzipFile
19
from os.path import getmtime
20
from struct import unpack_from
20
from os.path import getmtime, join as pathjoin
21
22
from time import time
23
24
from io import BufferedReader
34
40
self._replica2part2dev_id = replica2part2dev_id
35
41
self._part_shift = part_shift
44
def deserialize_v1(cls, gz_file):
    """
    Deserialize a version-1 ring payload from an open file object.

    The stream must already be positioned just past the magic string and
    version number. What follows is a 4-byte network-order length, that
    many bytes of JSON metadata, and then one array of unsigned 16-bit
    device ids per replica, each holding one entry per partition.

    :param gz_file: Readable binary file-like object.
    :returns: A dict of the JSON metadata with an added
              'replica2part2dev_id' key holding a list of
              ``array.array('H')`` objects, one per replica.
    """
    json_len, = struct.unpack('!I', gz_file.read(4))
    ring_dict = json.loads(gz_file.read(json_len))
    partition_count = 1 << (32 - ring_dict['part_shift'])
    # Every 'H' item is two bytes wide, hence 2 * partition_count bytes
    # per replica row.
    ring_dict['replica2part2dev_id'] = [
        array.array('H', gz_file.read(2 * partition_count))
        for _ in range(ring_dict['replica_count'])]
    # The caller (load) consumes this dict, so it has to be returned.
    return ring_dict
55
def load(cls, filename):
    """
    Load ring data from a file.

    Files that start with the 'R1NG' magic are read in the versioned
    format; anything else is treated as an old-style pickled ring.

    :param filename: Path to a file serialized by the save() method.
    :returns: A RingData instance containing the loaded data.
    :raises Exception: if the file has the ring magic but a format
                       version other than 1.
    """
    reader = GzipFile(filename, 'rb')
    # Python 2.6 GzipFile doesn't support BufferedIO
    if hasattr(reader, '_checkReadable'):
        reader = BufferedReader(reader)

    try:
        # See if the file is in the new format
        magic = reader.read(4)
        if magic == b'R1NG':
            version, = struct.unpack('!H', reader.read(2))
            if version != 1:
                raise Exception('Unknown ring format version %d' % version)
            ring_data = cls.deserialize_v1(reader)
        else:
            # Assume old-style pickled ring. The four bytes consumed while
            # probing for the magic belong to the pickle, so rewind first.
            reader.seek(0)
            # NOTE(review): unpickling runs arbitrary code from the file;
            # only load ring files from a trusted source.
            ring_data = pickle.load(reader)
    finally:
        # Closing the wrapper also closes the underlying gzip stream.
        reader.close()

    # Both the v1 deserializer and very old pickles hand back a plain
    # dict rather than an object exposing 'devs'; wrap those.
    if not hasattr(ring_data, 'devs'):
        ring_data = RingData(ring_data['replica2part2dev_id'],
                             ring_data['devs'], ring_data['part_shift'])
    return ring_data
84
def serialize_v1(self, file_obj):
    """
    Write this ring to ``file_obj`` in the version-1 format.

    Layout, in order: the 4-byte magic 'R1NG' and a 2-byte version number
    (1), a 4-byte length of the JSON metadata, the JSON metadata itself
    ('devs', 'part_shift' and 'replica_count'), and finally the raw
    contents of each replica's partition-to-device array.

    :param file_obj: Writable binary file-like object.
    """
    # Snapshot the ring state; the keys read below are the ones
    # to_dict() produces.
    ring = self.to_dict()

    # Write out new-style serialization magic and version:
    file_obj.write(struct.pack('!4sH', b'R1NG', 1))

    json_text = json.dumps(
        {'devs': ring['devs'], 'part_shift': ring['part_shift'],
         'replica_count': len(ring['replica2part2dev_id'])})
    # json.dumps returns text on Python 3; the stream and the length
    # prefix both need bytes. On Python 2 it is already a byte string.
    if not isinstance(json_text, bytes):
        json_text = json_text.encode('utf-8')
    file_obj.write(struct.pack('!I', len(json_text)))
    file_obj.write(json_text)

    for part2dev_id in ring['replica2part2dev_id']:
        # array.tostring() no longer exists on newer Pythons; prefer
        # tobytes() where available and fall back otherwise.
        dump = getattr(part2dev_id, 'tobytes', None) or part2dev_id.tostring
        file_obj.write(dump())
97
def save(self, filename):
    """
    Serialize this RingData instance to disk.

    The data is written gzip-compressed in the version-1 format produced
    by serialize_v1().

    :param filename: File into which this instance should be serialized.
    """
    gz_file = GzipFile(filename, 'wb')
    try:
        self.serialize_v1(gz_file)
    finally:
        # close() flushes the remaining compressed data and writes the
        # gzip trailer; without it the file on disk can be truncated.
        gz_file.close()
37
107
def to_dict(self):
    """
    Return this instance's ring state as a plain dict.

    :returns: dict with the keys 'devs', 'replica2part2dev_id' and
              'part_shift'. The values are the instance's own objects,
              not copies.
    """
    return {'devs': self.devs,
            'replica2part2dev_id': self._replica2part2dev_id,
            'part_shift': self._part_shift}
45
115
Partitioned consistent hashing ring.
47
:param pickle_gz_path: path to ring file
117
:param serialized_path: path to serialized RingData instance
48
118
:param reload_time: time interval in seconds to check for a ring change
51
def __init__(self, serialized_path, reload_time=15, ring_name=None):
    """
    Set up the ring and load it from disk immediately.

    :param serialized_path: path to serialized RingData instance; when
                            ring_name is given this is the directory
                            that contains '<ring_name>.ring.gz'
    :param reload_time: time interval in seconds to check for a ring
                        change
    :param ring_name: optional ring name used to build the file name
                      inside serialized_path
    """
    # can't use the ring unless HASH_PATH_SUFFIX is set
    validate_configuration()
    if ring_name:
        self.serialized_path = os.path.join(serialized_path,
                                            ring_name + '.ring.gz')
    else:
        # No ring name: serialized_path already names the ring file.
        self.serialized_path = os.path.join(serialized_path)
    # Older code in this file (_get_gz_file) still reads the previous
    # attribute name, so keep it pointing at the same file.
    self.pickle_gz_path = self.serialized_path
    self.reload_time = reload_time
    self._reload(force=True)
62
132
def _reload(self, force=False):
63
133
self._rtime = time() + self.reload_time
64
134
if force or self.has_changed():
65
ring_data = pickle.load(self._get_gz_file())
66
if not hasattr(ring_data, 'devs'):
67
ring_data = RingData(ring_data['replica2part2dev_id'],
68
ring_data['devs'], ring_data['part_shift'])
69
self._mtime = getmtime(self.pickle_gz_path)
135
ring_data = RingData.load(self.serialized_path)
136
self._mtime = getmtime(self.serialized_path)
70
137
self._devs = ring_data.devs
72
139
self._replica2part2dev_id = ring_data._replica2part2dev_id
73
140
self._part_shift = ring_data._part_shift
74
141
self._rebuild_tier_data()
76
def _get_gz_file(self):
77
gz_file = GzipFile(self.pickle_gz_path, 'rb')
78
if hasattr(gz_file, '_checkReadable'):
79
return BufferedReader(gz_file)
81
# Python 2.6 doesn't support BufferedIO
84
143
def _rebuild_tier_data(self):
85
144
self.tier2devs = defaultdict(list)
86
145
for dev in self._devs:
172
231
key = hash_path(account, container, obj, raw_digest=True)
173
232
if time() > self._rtime:
175
part = unpack_from('>I', key)[0] >> self._part_shift
234
part = struct.unpack_from('>I', key)[0] >> self._part_shift
177
236
return part, [self._devs[r[part]] for r in self._replica2part2dev_id
178
if not (r[part] in seen_ids or seen_ids.add(r[part]))]
237
if not (r[part] in seen_ids or seen_ids.add(r[part]))]
180
239
def get_more_nodes(self, part):