~vil/pydev/upstream

« back to all changes in this revision

Viewing changes to org.python.pydev.jython/Lib/mailbox.py

  • Committer: Vladimír Lapáček
  • Date: 2006-08-30 18:38:44 UTC
  • Revision ID: vladimir.lapacek@gmail.com-20060830183844-f4d82c1239a7770a
Initial import of upstream

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#! /usr/bin/env python
 
2
 
 
3
"""Classes to handle Unix style, MMDF style, and MH style mailboxes."""
 
4
 
 
5
 
 
6
import rfc822
 
7
import os
 
8
 
 
9
__all__ = ["UnixMailbox","MmdfMailbox","MHMailbox","Maildir","BabylMailbox"]
 
10
 
 
11
class _Mailbox:
 
12
    def __init__(self, fp, factory=rfc822.Message):
 
13
        self.fp = fp
 
14
        self.seekp = 0
 
15
        self.factory = factory
 
16
 
 
17
    def next(self):
 
18
        while 1:
 
19
            self.fp.seek(self.seekp)
 
20
            try:
 
21
                self._search_start()
 
22
            except EOFError:
 
23
                self.seekp = self.fp.tell()
 
24
                return None
 
25
            start = self.fp.tell()
 
26
            self._search_end()
 
27
            self.seekp = stop = self.fp.tell()
 
28
            if start != stop:
 
29
                break
 
30
        return self.factory(_Subfile(self.fp, start, stop))
 
31
 
 
32
 
 
33
class _Subfile:
 
34
    def __init__(self, fp, start, stop):
 
35
        self.fp = fp
 
36
        self.start = start
 
37
        self.stop = stop
 
38
        self.pos = self.start
 
39
 
 
40
    def read(self, length = None):
 
41
        if self.pos >= self.stop:
 
42
            return ''
 
43
        remaining = self.stop - self.pos
 
44
        if length is None or length < 0:
 
45
            length = remaining
 
46
        elif length > remaining:
 
47
            length = remaining
 
48
        self.fp.seek(self.pos)
 
49
        data = self.fp.read(length)
 
50
        self.pos = self.fp.tell()
 
51
        return data
 
52
 
 
53
    def readline(self, length = None):
 
54
        if self.pos >= self.stop:
 
55
            return ''
 
56
        if length is None:
 
57
            length = self.stop - self.pos
 
58
        self.fp.seek(self.pos)
 
59
        data = self.fp.readline(length)
 
60
        self.pos = self.fp.tell()
 
61
        return data
 
62
 
 
63
    def readlines(self, sizehint = -1):
 
64
        lines = []
 
65
        while 1:
 
66
            line = self.readline()
 
67
            if not line:
 
68
                break
 
69
            lines.append(line)
 
70
            if sizehint >= 0:
 
71
                sizehint = sizehint - len(line)
 
72
                if sizehint <= 0:
 
73
                    break
 
74
        return lines
 
75
 
 
76
    def tell(self):
 
77
        return self.pos - self.start
 
78
 
 
79
    def seek(self, pos, whence=0):
 
80
        if whence == 0:
 
81
            self.pos = self.start + pos
 
82
        elif whence == 1:
 
83
            self.pos = self.pos + pos
 
84
        elif whence == 2:
 
85
            self.pos = self.stop + pos
 
86
 
 
87
    def close(self):
 
88
        del self.fp
 
89
 
 
90
 
 
91
class UnixMailbox(_Mailbox):
 
92
    def _search_start(self):
 
93
        while 1:
 
94
            pos = self.fp.tell()
 
95
            line = self.fp.readline()
 
96
            if not line:
 
97
                raise EOFError
 
98
            if line[:5] == 'From ' and self._isrealfromline(line):
 
99
                self.fp.seek(pos)
 
100
                return
 
101
 
 
102
    def _search_end(self):
 
103
        self.fp.readline()      # Throw away header line
 
104
        while 1:
 
105
            pos = self.fp.tell()
 
106
            line = self.fp.readline()
 
107
            if not line:
 
108
                return
 
109
            if line[:5] == 'From ' and self._isrealfromline(line):
 
110
                self.fp.seek(pos)
 
111
                return
 
112
 
 
113
    # An overridable mechanism to test for From-line-ness.  You can either
 
114
    # specify a different regular expression or define a whole new
 
115
    # _isrealfromline() method.  Note that this only gets called for lines
 
116
    # starting with the 5 characters "From ".
 
117
    #
 
118
    # BAW: According to
 
119
    #http://home.netscape.com/eng/mozilla/2.0/relnotes/demo/content-length.html
 
120
    # the only portable, reliable way to find message delimiters in a BSD (i.e
 
121
    # Unix mailbox) style folder is to search for "\n\nFrom .*\n", or at the
 
122
    # beginning of the file, "^From .*\n".  While _fromlinepattern below seems
 
123
    # like a good idea, in practice, there are too many variations for more
 
124
    # strict parsing of the line to be completely accurate.
 
125
    #
 
126
    # _strict_isrealfromline() is the old version which tries to do stricter
 
127
    # parsing of the From_ line.  _portable_isrealfromline() simply returns
 
128
    # true, since it's never called if the line doesn't already start with
 
129
    # "From ".
 
130
    #
 
131
    # This algorithm, and the way it interacts with _search_start() and
 
132
    # _search_end() may not be completely correct, because it doesn't check
 
133
    # that the two characters preceding "From " are \n\n or the beginning of
 
134
    # the file.  Fixing this would require a more extensive rewrite than is
 
135
    # necessary.  For convenience, we've added a StrictUnixMailbox class which
 
136
    # uses the older, more strict _fromlinepattern regular expression.
 
137
 
 
138
    _fromlinepattern = r"From \s*[^\s]+\s+\w\w\w\s+\w\w\w\s+\d?\d\s+" \
 
139
                       r"\d?\d:\d\d(:\d\d)?(\s+[^\s]+)?\s+\d\d\d\d\s*$"
 
140
    _regexp = None
 
141
 
 
142
    def _strict_isrealfromline(self, line):
 
143
        if not self._regexp:
 
144
            import re
 
145
            self._regexp = re.compile(self._fromlinepattern)
 
146
        return self._regexp.match(line)
 
147
 
 
148
    def _portable_isrealfromline(self, line):
 
149
        return 1
 
150
 
 
151
    _isrealfromline = _strict_isrealfromline
 
152
 
 
153
 
 
154
class PortableUnixMailbox(UnixMailbox):
 
155
    _isrealfromline = UnixMailbox._portable_isrealfromline
 
156
 
 
157
 
 
158
class MmdfMailbox(_Mailbox):
 
159
    def _search_start(self):
 
160
        while 1:
 
161
            line = self.fp.readline()
 
162
            if not line:
 
163
                raise EOFError
 
164
            if line[:5] == '\001\001\001\001\n':
 
165
                return
 
166
 
 
167
    def _search_end(self):
 
168
        while 1:
 
169
            pos = self.fp.tell()
 
170
            line = self.fp.readline()
 
171
            if not line:
 
172
                return
 
173
            if line == '\001\001\001\001\n':
 
174
                self.fp.seek(pos)
 
175
                return
 
176
 
 
177
 
 
178
class MHMailbox:
 
179
    def __init__(self, dirname, factory=rfc822.Message):
 
180
        import re
 
181
        pat = re.compile('^[1-9][0-9]*$')
 
182
        self.dirname = dirname
 
183
        # the three following lines could be combined into:
 
184
        # list = map(long, filter(pat.match, os.listdir(self.dirname)))
 
185
        list = os.listdir(self.dirname)
 
186
        list = filter(pat.match, list)
 
187
        list = map(long, list)
 
188
        list.sort()
 
189
        # This only works in Python 1.6 or later;
 
190
        # before that str() added 'L':
 
191
        self.boxes = map(str, list)
 
192
        self.factory = factory
 
193
 
 
194
    def next(self):
 
195
        if not self.boxes:
 
196
            return None
 
197
        fn = self.boxes[0]
 
198
        del self.boxes[0]
 
199
        fp = open(os.path.join(self.dirname, fn))
 
200
        return self.factory(fp)
 
201
 
 
202
 
 
203
class Maildir:
 
204
    # Qmail directory mailbox
 
205
 
 
206
    def __init__(self, dirname, factory=rfc822.Message):
 
207
        self.dirname = dirname
 
208
        self.factory = factory
 
209
 
 
210
        # check for new mail
 
211
        newdir = os.path.join(self.dirname, 'new')
 
212
        boxes = [os.path.join(newdir, f)
 
213
                 for f in os.listdir(newdir) if f[0] != '.']
 
214
 
 
215
        # Now check for current mail in this maildir
 
216
        curdir = os.path.join(self.dirname, 'cur')
 
217
        boxes += [os.path.join(curdir, f)
 
218
                  for f in os.listdir(curdir) if f[0] != '.']
 
219
 
 
220
        self.boxes = boxes
 
221
 
 
222
    def next(self):
 
223
        if not self.boxes:
 
224
            return None
 
225
        fn = self.boxes[0]
 
226
        del self.boxes[0]
 
227
        fp = open(fn)
 
228
        return self.factory(fp)
 
229
 
 
230
 
 
231
class BabylMailbox(_Mailbox):
 
232
    def _search_start(self):
 
233
        while 1:
 
234
            line = self.fp.readline()
 
235
            if not line:
 
236
                raise EOFError
 
237
            if line == '*** EOOH ***\n':
 
238
                return
 
239
 
 
240
    def _search_end(self):
 
241
        while 1:
 
242
            pos = self.fp.tell()
 
243
            line = self.fp.readline()
 
244
            if not line:
 
245
                return
 
246
            if line == '\037\014\n':
 
247
                self.fp.seek(pos)
 
248
                return
 
249
 
 
250
 
 
251
def _test():
 
252
    import time
 
253
    import sys
 
254
    import os
 
255
 
 
256
    args = sys.argv[1:]
 
257
    if not args:
 
258
        for key in 'MAILDIR', 'MAIL', 'LOGNAME', 'USER':
 
259
            if os.environ.has_key(key):
 
260
                mbox = os.environ[key]
 
261
                break
 
262
        else:
 
263
            print "$MAIL, $LOGNAME nor $USER set -- who are you?"
 
264
            return
 
265
    else:
 
266
        mbox = args[0]
 
267
    if mbox[:1] == '+':
 
268
        mbox = os.environ['HOME'] + '/Mail/' + mbox[1:]
 
269
    elif not '/' in mbox:
 
270
        mbox = '/usr/mail/' + mbox
 
271
    if os.path.isdir(mbox):
 
272
        if os.path.isdir(os.path.join(mbox, 'cur')):
 
273
            mb = Maildir(mbox)
 
274
        else:
 
275
            mb = MHMailbox(mbox)
 
276
    else:
 
277
        fp = open(mbox, 'r')
 
278
        mb = UnixMailbox(fp)
 
279
 
 
280
    msgs = []
 
281
    while 1:
 
282
        msg = mb.next()
 
283
        if msg is None:
 
284
            break
 
285
        msgs.append(msg)
 
286
        if len(args) <= 1:
 
287
            msg.fp = None
 
288
    if len(args) > 1:
 
289
        num = int(args[1])
 
290
        print 'Message %d body:'%num
 
291
        msg = msgs[num-1]
 
292
        msg.rewindbody()
 
293
        sys.stdout.write(msg.fp.read())
 
294
    else:
 
295
        print 'Mailbox',mbox,'has',len(msgs),'messages:'
 
296
        for msg in msgs:
 
297
            f = msg.getheader('from') or ""
 
298
            s = msg.getheader('subject') or ""
 
299
            d = msg.getheader('date') or ""
 
300
            print '-%20.20s   %20.20s   %-30.30s'%(f, d[5:], s)
 
301
 
 
302
 
 
303
if __name__ == '__main__':
 
304
    _test()