~barry/mailman/lp1423756

« back to all changes in this revision

Viewing changes to src/mailman/core/switchboard.py

  • Committer: Barry Warsaw
  • Date: 2015-01-05 01:20:33 UTC
  • mfrom: (7264.4.66 py3)
  • Revision ID: barry@list.org-20150105012033-zdrw9c2odhpf22fz
Merge the Python 3 branch.

Show diffs side-by-side

added added

removed removed

Lines of Context:
24
24
dictionary is written.
25
25
"""
26
26
 
27
 
from __future__ import absolute_import, print_function, unicode_literals
28
 
 
29
 
__metaclass__ = type
30
27
__all__ = [
31
28
    'Switchboard',
32
29
    'handle_ConfigurationUpdatedEvent',
37
34
import time
38
35
import email
39
36
import pickle
40
 
import cPickle
41
37
import hashlib
42
38
import logging
43
39
 
44
 
from zope.interface import implementer
45
 
 
46
40
from mailman.config import config
47
41
from mailman.email.message import Message
48
42
from mailman.interfaces.configuration import ConfigurationUpdatedEvent
49
43
from mailman.interfaces.switchboard import ISwitchboard
50
44
from mailman.utilities.filesystem import makedirs
51
45
from mailman.utilities.string import expand
52
 
 
53
 
 
54
 
# 20 bytes of all bits set, maximum hashlib.sha.digest() value.
55
 
shamax = 0xffffffffffffffffffffffffffffffffffffffffL
 
46
from six.moves import cPickle
 
47
from zope.interface import implementer
 
48
 
 
49
 
 
50
# 20 bytes of all bits set, maximum hashlib.sha.digest() value.  We do it this
 
51
# way for Python 2/3 compatibility.
 
52
shamax = int('0xffffffffffffffffffffffffffffffffffffffff', 16)
56
53
# Small increment to add to time in case two entries have the same time.  This
57
54
# prevents skipping one of two entries with the same time until the next pass.
58
55
DELTA = .0001
92
89
        self.queue_directory = queue_directory
93
90
        # If configured to, create the directory if it doesn't yet exist.
94
91
        if config.create_paths:
95
 
            makedirs(self.queue_directory, 0770)
 
92
            makedirs(self.queue_directory, 0o770)
96
93
        # Fast track for no slices
97
94
        self._lower = None
98
95
        self._upper = None
112
109
        # of parallel runner processes.
113
110
        data = _metadata.copy()
114
111
        data.update(_kws)
115
 
        listname = data.get('listname', '--nolist--')
 
112
        list_id = data.get('listid', '--nolist--')
116
113
        # Get some data for the input to the sha hash.
117
 
        now = time.time()
 
114
        now = repr(time.time())
118
115
        if data.get('_plaintext'):
119
116
            protocol = 0
120
117
            msgsave = cPickle.dumps(str(_msg), protocol)
121
118
        else:
122
119
            protocol = pickle.HIGHEST_PROTOCOL
123
120
            msgsave = cPickle.dumps(_msg, protocol)
124
 
        # listname is unicode but the input to the hash function must be an
125
 
        # 8-bit string (eventually, a bytes object).
126
 
        hashfood = msgsave + listname.encode('utf-8') + repr(now)
 
121
        # The list-id field is a string but the input to the hash function must
 
122
        # be bytes.
 
123
        hashfood = msgsave + list_id.encode('utf-8') + now.encode('utf-8')
127
124
        # Encode the current time into the file name for FIFO sorting.  The
128
125
        # file name consists of two parts separated by a '+': the received
129
126
        # time for this message (i.e. when it first showed up on this system)
130
127
        # and the sha hex digest.
131
 
        filebase = repr(now) + '+' + hashlib.sha1(hashfood).hexdigest()
 
128
        filebase = now + '+' + hashlib.sha1(hashfood).hexdigest()
132
129
        filename = os.path.join(self.queue_directory, filebase + '.pck')
133
130
        tmpfile = filename + '.tmp'
134
131
        # Always add the metadata schema version number
135
132
        data['version'] = config.QFILE_SCHEMA_VERSION
136
133
        # Filter out volatile entries.  Use .keys() so that we can mutate the
137
134
        # dictionary during the iteration.
138
 
        for k in data.keys():
 
135
        for k in list(data):
139
136
            if k.startswith('_'):
140
137
                del data[k]
141
138
        # We have to tell the dequeue() method whether to parse the message
142
139
        # object or not.
143
140
        data['_parsemsg'] = (protocol == 0)
144
141
        # Write to the pickle file the message object and metadata.
145
 
        with open(tmpfile, 'w') as fp:
 
142
        with open(tmpfile, 'wb') as fp:
146
143
            fp.write(msgsave)
147
144
            cPickle.dump(data, fp, protocol)
148
145
            fp.flush()
156
153
        filename = os.path.join(self.queue_directory, filebase + '.pck')
157
154
        backfile = os.path.join(self.queue_directory, filebase + '.bak')
158
155
        # Read the message object and metadata.
159
 
        with open(filename) as fp:
 
156
        with open(filename, 'rb') as fp:
160
157
            # Move the file to the backup file name for processing.  If this
161
158
            # process crashes uncleanly the .bak file will be used to
162
159
            # re-instate the .pck file in order to try again.
207
204
            # Throw out any files which don't match our bitrange.  BAW: test
208
205
            # performance and end-cases of this algorithm.  MAS: both
209
206
            # comparisons need to be <= to get complete range.
210
 
            if lower is None or (lower <= long(digest, 16) <= upper):
 
207
            if lower is None or (lower <= int(digest, 16) <= upper):
211
208
                key = float(when)
212
209
                while key in times:
213
210
                    key += DELTA
214
211
                times[key] = filebase
215
212
        # FIFO sort
216
 
        return [times[key] for key in sorted(times)]
 
213
        return [times[k] for k in sorted(times)]
217
214
 
218
215
    def recover_backup_files(self):
219
216
        """See `ISwitchboard`."""
228
225
            dst = os.path.join(self.queue_directory, filebase + '.pck')
229
226
            with open(src, 'rb+') as fp:
230
227
                try:
231
 
                    msg = cPickle.load(fp)
 
228
                    # Throw away the message object.
 
229
                    cPickle.load(fp)
232
230
                    data_pos = fp.tell()
233
231
                    data = cPickle.load(fp)
234
232
                except Exception as error: