44
from zope.interface import implementer
46
40
from mailman.config import config
47
41
from mailman.email.message import Message
48
42
from mailman.interfaces.configuration import ConfigurationUpdatedEvent
49
43
from mailman.interfaces.switchboard import ISwitchboard
50
44
from mailman.utilities.filesystem import makedirs
51
45
from mailman.utilities.string import expand
54
# 20 bytes of all bits set, maximum hashlib.sha1 digest value.
55
shamax = 0xffffffffffffffffffffffffffffffffffffffffL
46
from six.moves import cPickle
47
from zope.interface import implementer
50
# 20 bytes of all bits set, maximum hashlib.sha1 digest value. We do it this
51
# way for Python 2/3 compatibility.
52
shamax = int('0xffffffffffffffffffffffffffffffffffffffff', 16)
56
53
# Small increment to add to time in case two entries have the same time. This
57
54
# prevents skipping one of two entries with the same time until the next pass.
92
89
self.queue_directory = queue_directory
93
90
# If configured to, create the directory if it doesn't yet exist.
94
91
if config.create_paths:
95
makedirs(self.queue_directory, 0770)
92
makedirs(self.queue_directory, 0o770)
96
93
# Fast track for no slices
112
109
# of parallel runner processes.
113
110
data = _metadata.copy()
114
111
data.update(_kws)
115
listname = data.get('listname', '--nolist--')
112
list_id = data.get('listid', '--nolist--')
116
113
# Get some data for the input to the sha hash.
114
now = repr(time.time())
118
115
if data.get('_plaintext'):
120
117
msgsave = cPickle.dumps(str(_msg), protocol)
122
119
protocol = pickle.HIGHEST_PROTOCOL
123
120
msgsave = cPickle.dumps(_msg, protocol)
124
# listname is unicode but the input to the hash function must be an
125
# 8-bit string (eventually, a bytes object).
126
hashfood = msgsave + listname.encode('utf-8') + repr(now)
121
# The list-id field is a string but the input to the hash function must
123
hashfood = msgsave + list_id.encode('utf-8') + now.encode('utf-8')
127
124
# Encode the current time into the file name for FIFO sorting. The
128
125
# file name consists of two parts separated by a '+': the received
129
126
# time for this message (i.e. when it first showed up on this system)
130
127
# and the sha hex digest.
131
filebase = repr(now) + '+' + hashlib.sha1(hashfood).hexdigest()
128
filebase = now + '+' + hashlib.sha1(hashfood).hexdigest()
132
129
filename = os.path.join(self.queue_directory, filebase + '.pck')
133
130
tmpfile = filename + '.tmp'
134
131
# Always add the metadata schema version number
135
132
data['version'] = config.QFILE_SCHEMA_VERSION
136
133
# Filter out volatile entries. Use .keys() so that we can mutate the
137
134
# dictionary during the iteration.
138
for k in data.keys():
139
136
if k.startswith('_'):
141
138
# We have to tell the dequeue() method whether to parse the message
143
140
data['_parsemsg'] = (protocol == 0)
144
141
# Write to the pickle file the message object and metadata.
145
with open(tmpfile, 'w') as fp:
142
with open(tmpfile, 'wb') as fp:
146
143
fp.write(msgsave)
147
144
cPickle.dump(data, fp, protocol)
156
153
filename = os.path.join(self.queue_directory, filebase + '.pck')
157
154
backfile = os.path.join(self.queue_directory, filebase + '.bak')
158
155
# Read the message object and metadata.
159
with open(filename) as fp:
156
with open(filename, 'rb') as fp:
160
157
# Move the file to the backup file name for processing. If this
161
158
# process crashes uncleanly the .bak file will be used to
162
159
# re-instate the .pck file in order to try again.
207
204
# Throw out any files which don't match our bitrange. BAW: test
208
205
# performance and end-cases of this algorithm. MAS: both
209
206
# comparisons need to be <= to get complete range.
210
if lower is None or (lower <= long(digest, 16) <= upper):
207
if lower is None or (lower <= int(digest, 16) <= upper):
211
208
key = float(when)
212
209
while key in times:
214
211
times[key] = filebase
216
return [times[key] for key in sorted(times)]
213
return [times[k] for k in sorted(times)]
218
215
def recover_backup_files(self):
219
216
"""See `ISwitchboard`."""
228
225
dst = os.path.join(self.queue_directory, filebase + '.pck')
229
226
with open(src, 'rb+') as fp:
231
msg = cPickle.load(fp)
228
# Throw away the message object.
232
230
data_pos = fp.tell()
233
231
data = cPickle.load(fp)
234
232
except Exception as error: