~certify-web-dev/twisted/certify-trunk

« back to all changes in this revision

Viewing changes to twisted/news/database.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2007-01-17 14:52:35 UTC
  • mfrom: (1.1.5 upstream) (2.1.2 etch)
  • Revision ID: james.westby@ubuntu.com-20070117145235-btmig6qfmqfen0om
Tags: 2.5.0-0ubuntu1
New upstream version, compatible with python2.5.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- test-case-name: twisted.news.test.test_news -*-
 
2
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
 
3
# See LICENSE for details.
 
4
 
 
5
 
 
6
"""
 
7
News server backend implementations
 
8
 
 
9
Maintainer: U{Jp Calderone<mailto:exarkun@twistedmatrix.com>}
 
10
 
 
11
Stability: semi-stable
 
12
 
 
13
Future Plans: A PyFramer-based backend and a new backend interface that is
 
14
less NNTP specific
 
15
"""
 
16
 
 
17
 
 
18
from __future__ import nested_scopes
 
19
 
 
20
from twisted.news.nntp import NNTPError
 
21
from twisted.mail import smtp
 
22
from twisted.internet import defer
 
23
from twisted.enterprise import adbapi
 
24
from twisted.persisted import dirdbm
 
25
 
 
26
import getpass, pickle, time, socket, md5
 
27
import os
 
28
import StringIO
 
29
from zope.interface import implements, Interface
 
30
 
 
31
 
 
32
ERR_NOGROUP, ERR_NOARTICLE = range(2, 4)  # XXX - put NNTP values here (I guess?)
 
33
 
 
34
OVERVIEW_FMT = [
 
35
    'Subject', 'From', 'Date', 'Message-ID', 'References',
 
36
    'Bytes', 'Lines', 'Xref'
 
37
]
 
38
 
 
39
def hexdigest(md5): #XXX: argh. 1.5.2 doesn't have this.
 
40
    return ''.join(map(lambda x: hex(ord(x))[2:], md5.digest()))
 
41
 
 
42
class Article:
 
43
    def __init__(self, head, body):
 
44
        self.body = body
 
45
        self.headers = {}
 
46
        header = None
 
47
        for line in head.split('\r\n'):
 
48
            if line[0] in ' \t':
 
49
                i = list(self.headers[header])
 
50
                i[1] += '\r\n' + line
 
51
            else:
 
52
                i = line.split(': ', 1)
 
53
                header = i[0].lower()
 
54
            self.headers[header] = tuple(i)
 
55
 
 
56
        if not self.getHeader('Message-ID'):
 
57
            s = str(time.time()) + self.body
 
58
            id = hexdigest(md5.md5(s)) + '@' + socket.gethostname()
 
59
            self.putHeader('Message-ID', '<%s>' % id)
 
60
 
 
61
        if not self.getHeader('Bytes'):
 
62
            self.putHeader('Bytes', str(len(self.body)))
 
63
        
 
64
        if not self.getHeader('Lines'):
 
65
            self.putHeader('Lines', str(self.body.count('\n')))
 
66
        
 
67
        if not self.getHeader('Date'):
 
68
            self.putHeader('Date', time.ctime(time.time()))
 
69
 
 
70
 
 
71
    def getHeader(self, header):
 
72
        h = header.lower()
 
73
        if self.headers.has_key(h):
 
74
            return self.headers[h][1]
 
75
        else:
 
76
            return ''
 
77
 
 
78
 
 
79
    def putHeader(self, header, value):
 
80
        self.headers[header.lower()] = (header, value)
 
81
 
 
82
 
 
83
    def textHeaders(self):
 
84
        headers = []
 
85
        for i in self.headers.values():
 
86
            headers.append('%s: %s' % i)
 
87
        return '\r\n'.join(headers) + '\r\n'
 
88
    
 
89
    def overview(self):
 
90
        xover = []
 
91
        for i in OVERVIEW_FMT:
 
92
            xover.append(self.getHeader(i))
 
93
        return xover
 
94
 
 
95
 
 
96
class NewsServerError(Exception):
 
97
    pass
 
98
 
 
99
    
 
100
class INewsStorage(Interface):
 
101
    """
 
102
    An interface for storing and requesting news articles
 
103
    """
 
104
    
 
105
    def listRequest():
 
106
        """
 
107
        Returns a deferred whose callback will be passed a list of 4-tuples
 
108
        containing (name, max index, min index, flags) for each news group
 
109
        """
 
110
 
 
111
 
 
112
    def subscriptionRequest():
 
113
        """
 
114
        Returns a deferred whose callback will be passed the list of
 
115
        recommended subscription groups for new server users
 
116
        """
 
117
    
 
118
    
 
119
    def postRequest(message):
 
120
        """
 
121
        Returns a deferred whose callback will be invoked if 'message'
 
122
        is successfully posted to one or more specified groups and
 
123
        whose errback will be invoked otherwise.
 
124
        """
 
125
    
 
126
    
 
127
    def overviewRequest():
 
128
        """
 
129
        Returns a deferred whose callback will be passed the a list of
 
130
        headers describing this server's overview format.
 
131
        """
 
132
 
 
133
 
 
134
    def xoverRequest(group, low, high):
 
135
        """
 
136
        Returns a deferred whose callback will be passed a list of xover
 
137
        headers for the given group over the given range.  If low is None,
 
138
        the range starts at the first article.  If high is None, the range
 
139
        ends at the last article.
 
140
        """
 
141
 
 
142
 
 
143
    def xhdrRequest(group, low, high, header):
 
144
        """
 
145
        Returns a deferred whose callback will be passed a list of XHDR data
 
146
        for the given group over the given range.  If low is None,
 
147
        the range starts at the first article.  If high is None, the range
 
148
        ends at the last article.
 
149
        """
 
150
 
 
151
    
 
152
    def listGroupRequest(group):
 
153
        """
 
154
        Returns a deferred whose callback will be passed a two-tuple of
 
155
        (group name, [article indices])
 
156
        """
 
157
    
 
158
    
 
159
    def groupRequest(group):
 
160
        """
 
161
        Returns a deferred whose callback will be passed a five-tuple of
 
162
        (group name, article count, highest index, lowest index, group flags)
 
163
        """
 
164
 
 
165
    
 
166
    def articleExistsRequest(id):
 
167
        """
 
168
        Returns a deferred whose callback will be passed with a true value
 
169
        if a message with the specified Message-ID exists in the database
 
170
        and with a false value otherwise.
 
171
        """
 
172
 
 
173
 
 
174
    def articleRequest(group, index, id = None):
 
175
        """ 
 
176
        Returns a deferred whose callback will be passed a file-like object
 
177
        containing the full article text (headers and body) for the article
 
178
        of the specified index in the specified group, and whose errback
 
179
        will be invoked if the article or group does not exist.  If id is
 
180
        not None, index is ignored and the article with the given Message-ID
 
181
        will be returned instead, along with its index in the specified
 
182
        group.
 
183
        """
 
184
 
 
185
    
 
186
    def headRequest(group, index):
 
187
        """
 
188
        Returns a deferred whose callback will be passed the header for
 
189
        the article of the specified index in the specified group, and
 
190
        whose errback will be invoked if the article or group does not
 
191
        exist.
 
192
        """
 
193
 
 
194
    
 
195
    def bodyRequest(group, index):
 
196
        """
 
197
        Returns a deferred whose callback will be passed the body for
 
198
        the article of the specified index in the specified group, and
 
199
        whose errback will be invoked if the article or group does not
 
200
        exist.
 
201
        """
 
202
 
 
203
class NewsStorage:
 
204
    """
 
205
    Backwards compatibility class -- There is no reason to inherit from this,
 
206
    just implement INewsStorage instead.
 
207
    """
 
208
    def listRequest(self):
 
209
        raise NotImplementedError()
 
210
    def subscriptionRequest(self):
 
211
        raise NotImplementedError()
 
212
    def postRequest(self, message):
 
213
        raise NotImplementedError()
 
214
    def overviewRequest(self):
 
215
        return defer.succeed(OVERVIEW_FMT)
 
216
    def xoverRequest(self, group, low, high):
 
217
        raise NotImplementedError()
 
218
    def xhdrRequest(self, group, low, high, header):
 
219
        raise NotImplementedError()
 
220
    def listGroupRequest(self, group):
 
221
        raise NotImplementedError()
 
222
    def groupRequest(self, group):
 
223
        raise NotImplementedError()
 
224
    def articleExistsRequest(self, id):
 
225
        raise NotImplementedError()
 
226
    def articleRequest(self, group, index, id = None):
 
227
        raise NotImplementedError()
 
228
    def headRequest(self, group, index):
 
229
        raise NotImplementedError()
 
230
    def bodyRequest(self, group, index):
 
231
        raise NotImplementedError()
 
232
 
 
233
 
 
234
class PickleStorage:
 
235
    """A trivial NewsStorage implementation using pickles
 
236
    
 
237
    Contains numerous flaws and is generally unsuitable for any
 
238
    real applications.  Consider yourself warned!
 
239
    """
 
240
 
 
241
    implements(INewsStorage)
 
242
 
 
243
    sharedDBs = {}
 
244
 
 
245
    def __init__(self, filename, groups = None, moderators = ()):
 
246
        self.datafile = filename
 
247
        self.load(filename, groups, moderators)
 
248
 
 
249
 
 
250
    def getModerators(self, groups):
 
251
        # first see if any groups are moderated.  if so, nothing gets posted,
 
252
        # but the whole messages gets forwarded to the moderator address
 
253
        moderators = []
 
254
        for group in groups:
 
255
            moderators.append(self.db['moderators'].get(group, None))
 
256
        return filter(None, moderators)
 
257
 
 
258
 
 
259
    def notifyModerators(self, moderators, article):
 
260
        # Moderated postings go through as long as they have an Approved
 
261
        # header, regardless of what the value is
 
262
        article.putHeader('To', ', '.join(moderators))
 
263
        return smtp.sendEmail(
 
264
            'twisted@' + socket.gethostname(),
 
265
            moderators,
 
266
            article.body,
 
267
            dict(article.headers.values())
 
268
        )
 
269
 
 
270
 
 
271
    def listRequest(self):
 
272
        "Returns a list of 4-tuples: (name, max index, min index, flags)"
 
273
        l = self.db['groups']
 
274
        r = []
 
275
        for i in l:
 
276
            if len(self.db[i].keys()):
 
277
                low = min(self.db[i].keys())
 
278
                high = max(self.db[i].keys()) + 1
 
279
            else:
 
280
                low = high = 0
 
281
            if self.db['moderators'].has_key(i):
 
282
                flags = 'm'
 
283
            else:
 
284
                flags = 'y'
 
285
            r.append((i, high, low, flags))
 
286
        return defer.succeed(r)
 
287
 
 
288
    def subscriptionRequest(self):
 
289
        return defer.succeed(['alt.test'])
 
290
 
 
291
    def postRequest(self, message):
 
292
        cleave = message.find('\r\n\r\n')
 
293
        headers, article = message[:cleave], message[cleave + 4:]
 
294
 
 
295
        a = Article(headers, article)
 
296
        groups = a.getHeader('Newsgroups').split()
 
297
        xref = []
 
298
 
 
299
        # Check moderated status
 
300
        moderators = self.getModerators(groups)
 
301
        if moderators and not a.getHeader('Approved'):
 
302
            return self.notifyModerators(moderators, a)
 
303
 
 
304
        for group in groups:
 
305
            if self.db.has_key(group):
 
306
                if len(self.db[group].keys()):
 
307
                    index = max(self.db[group].keys()) + 1
 
308
                else:
 
309
                    index = 1
 
310
                xref.append((group, str(index)))
 
311
                self.db[group][index] = a
 
312
 
 
313
        if len(xref) == 0:
 
314
            return defer.fail(None)
 
315
 
 
316
        a.putHeader('Xref', '%s %s' % (
 
317
            socket.gethostname().split()[0],
 
318
            ''.join(map(lambda x: ':'.join(x), xref))
 
319
        ))
 
320
 
 
321
        self.flush()
 
322
        return defer.succeed(None)
 
323
 
 
324
 
 
325
    def overviewRequest(self):
 
326
        return defer.succeed(OVERVIEW_FMT)
 
327
 
 
328
 
 
329
    def xoverRequest(self, group, low, high):
 
330
        if not self.db.has_key(group):
 
331
            return defer.succeed([])
 
332
        r = []
 
333
        for i in self.db[group].keys():
 
334
            if (low is None or i >= low) and (high is None or i <= high):
 
335
                r.append([str(i)] + self.db[group][i].overview())
 
336
        return defer.succeed(r)
 
337
 
 
338
 
 
339
    def xhdrRequest(self, group, low, high, header):
 
340
        if not self.db.has_key(group):
 
341
            return defer.succeed([])
 
342
        r = []
 
343
        for i in self.db[group].keys():
 
344
            if low is None or i >= low and high is None or i <= high:
 
345
                r.append((i, self.db[group][i].getHeader(header)))
 
346
        return defer.succeed(r)
 
347
 
 
348
 
 
349
    def listGroupRequest(self, group):
 
350
        if self.db.has_key(group):
 
351
            return defer.succeed((group, self.db[group].keys()))
 
352
        else:
 
353
            return defer.fail(None)
 
354
 
 
355
    def groupRequest(self, group):
 
356
        if self.db.has_key(group):
 
357
            if len(self.db[group].keys()):
 
358
                num = len(self.db[group].keys())
 
359
                low = min(self.db[group].keys())
 
360
                high = max(self.db[group].keys())
 
361
            else:
 
362
                num = low = high = 0
 
363
            flags = 'y'
 
364
            return defer.succeed((group, num, high, low, flags))
 
365
        else:
 
366
            return defer.fail(ERR_NOGROUP)
 
367
 
 
368
 
 
369
    def articleExistsRequest(self, id):
 
370
        for g in self.db.values():
 
371
            for a in g.values():
 
372
                if a.getHeader('Message-ID') == id:
 
373
                    return defer.succeed(1)
 
374
        return defer.succeed(0)
 
375
 
 
376
 
 
377
    def articleRequest(self, group, index, id = None):
 
378
        if id is not None:
 
379
            raise NotImplementedError
 
380
 
 
381
        if self.db.has_key(group):
 
382
            if self.db[group].has_key(index):
 
383
                a = self.db[group][index]
 
384
                return defer.succeed((
 
385
                    index,
 
386
                    a.getHeader('Message-ID'),
 
387
                    StringIO.StringIO(a.textHeaders() + '\r\n' + a.body)
 
388
                ))
 
389
            else:
 
390
                return defer.fail(ERR_NOARTICLE)
 
391
        else:
 
392
            return defer.fail(ERR_NOGROUP)
 
393
                
 
394
    
 
395
    def headRequest(self, group, index):
 
396
        if self.db.has_key(group):
 
397
            if self.db[group].has_key(index):
 
398
                a = self.db[group][index]
 
399
                return defer.succeed((index, a.getHeader('Message-ID'), a.textHeaders()))
 
400
            else:
 
401
                return defer.fail(ERR_NOARTICLE)
 
402
        else:
 
403
            return defer.fail(ERR_NOGROUP)
 
404
 
 
405
 
 
406
    def bodyRequest(self, group, index):
 
407
        if self.db.has_key(group):
 
408
            if self.db[group].has_key(index):
 
409
                a = self.db[group][index]
 
410
                return defer.succeed((index, a.getHeader('Message-ID'), StringIO.StringIO(a.body)))
 
411
            else:
 
412
                return defer.fail(ERR_NOARTICLE)
 
413
        else:
 
414
            return defer.fail(ERR_NOGROUP)
 
415
 
 
416
 
 
417
    def flush(self):
 
418
        pickle.dump(self.db, open(self.datafile, 'w'))
 
419
 
 
420
 
 
421
    def load(self, filename, groups = None, moderators = ()):
 
422
        if PickleStorage.sharedDBs.has_key(filename):
 
423
            self.db = PickleStorage.sharedDBs[filename]
 
424
        else:
 
425
            try:
 
426
                self.db = pickle.load(open(filename))
 
427
                PickleStorage.sharedDBs[filename] = self.db
 
428
            except IOError, e:
 
429
                self.db = PickleStorage.sharedDBs[filename] = {}
 
430
                self.db['groups'] = groups
 
431
                if groups is not None:
 
432
                    for i in groups:
 
433
                        self.db[i] = {}
 
434
                self.db['moderators'] = dict(moderators)
 
435
                self.flush()
 
436
 
 
437
 
 
438
class Group:
 
439
    name = None
 
440
    flags = ''
 
441
    minArticle = 1
 
442
    maxArticle = 0
 
443
    articles = None
 
444
    
 
445
    def __init__(self, name, flags = 'y'):
 
446
        self.name = name
 
447
        self.flags = flags
 
448
        self.articles = {}
 
449
 
 
450
 
 
451
class NewsShelf:
 
452
    """
 
453
    A NewStorage implementation using Twisted's dirdbm persistence module.
 
454
    """
 
455
    
 
456
    implements(INewsStorage)    
 
457
    
 
458
    def __init__(self, mailhost, path):
 
459
        self.path = path
 
460
        self.mailhost = mailhost
 
461
 
 
462
        if not os.path.exists(path):
 
463
            os.mkdir(path)
 
464
 
 
465
        self.dbm = dirdbm.Shelf(os.path.join(path, "newsshelf"))
 
466
        if not len(self.dbm.keys()):
 
467
            self.initialize()
 
468
 
 
469
 
 
470
    def initialize(self):
 
471
        # A dictionary of group name/Group instance items
 
472
        self.dbm['groups'] = dirdbm.Shelf(os.path.join(self.path, 'groups'))
 
473
 
 
474
        # A dictionary of group name/email address
 
475
        self.dbm['moderators'] = dirdbm.Shelf(os.path.join(self.path, 'moderators'))
 
476
 
 
477
        # A list of group names
 
478
        self.dbm['subscriptions'] = []
 
479
 
 
480
        # A dictionary of MessageID strings/xref lists
 
481
        self.dbm['Message-IDs'] = dirdbm.Shelf(os.path.join(self.path, 'Message-IDs'))
 
482
 
 
483
 
 
484
    def addGroup(self, name, flags):
 
485
        self.dbm['groups'][name] = Group(name, flags)
 
486
 
 
487
 
 
488
    def addSubscription(self, name):
 
489
        self.dbm['subscriptions'] = self.dbm['subscriptions'] + [name]
 
490
 
 
491
 
 
492
    def addModerator(self, group, email):
 
493
        self.dbm['moderators'][group] = email
 
494
 
 
495
 
 
496
    def listRequest(self):
 
497
        result = []
 
498
        for g in self.dbm['groups'].values():
 
499
            result.append((g.name, g.maxArticle, g.minArticle, g.flags))
 
500
        return defer.succeed(result)
 
501
 
 
502
 
 
503
    def subscriptionRequest(self):
 
504
        return defer.succeed(self.dbm['subscriptions'])
 
505
    
 
506
    
 
507
    def getModerator(self, groups):
 
508
        # first see if any groups are moderated.  if so, nothing gets posted,
 
509
        # but the whole messages gets forwarded to the moderator address
 
510
        for group in groups:
 
511
            try:
 
512
                return self.dbm['moderators'][group]
 
513
            except KeyError:
 
514
                pass
 
515
        return None
 
516
 
 
517
 
 
518
    def notifyModerator(self, moderator, article):
 
519
        # Moderated postings go through as long as they have an Approved
 
520
        # header, regardless of what the value is
 
521
        print 'To is ', moderator
 
522
        article.putHeader('To', moderator)
 
523
        return smtp.sendEmail(
 
524
            self.mailhost,
 
525
            'twisted-news@' + socket.gethostname(),
 
526
            moderator,
 
527
            article.body,
 
528
            dict(article.headers.values())
 
529
        )
 
530
 
 
531
 
 
532
    def postRequest(self, message):
 
533
        cleave = message.find('\r\n\r\n')
 
534
        headers, article = message[:cleave], message[cleave + 4:]
 
535
        
 
536
        article = Article(headers, article)
 
537
        groups = article.getHeader('Newsgroups').split()
 
538
        xref = []
 
539
        
 
540
        # Check for moderated status
 
541
        moderator = self.getModerator(groups)
 
542
        if moderator and not article.getHeader('Approved'):
 
543
            return self.notifyModerator(moderator, article)
 
544
        
 
545
        
 
546
        for group in groups:
 
547
            try:
 
548
                g = self.dbm['groups'][group]
 
549
            except KeyError:
 
550
                pass
 
551
            else:
 
552
                index = g.maxArticle + 1
 
553
                g.maxArticle += 1
 
554
                g.articles[index] = article
 
555
                xref.append((group, str(index)))
 
556
                self.dbm['groups'][group] = g
 
557
 
 
558
        if not xref:
 
559
            return defer.fail(NewsServerError("No groups carried: " + ' '.join(groups)))
 
560
 
 
561
        article.putHeader('Xref', '%s %s' % (socket.gethostname().split()[0], ' '.join(map(lambda x: ':'.join(x), xref))))
 
562
        self.dbm['Message-IDs'][article.getHeader('Message-ID')] = xref
 
563
        return defer.succeed(None)
 
564
 
 
565
 
 
566
    def overviewRequest(self):
 
567
        return defer.succeed(OVERVIEW_FMT)
 
568
 
 
569
 
 
570
    def xoverRequest(self, group, low, high):
 
571
        if not self.dbm['groups'].has_key(group):
 
572
            return defer.succeed([])
 
573
        
 
574
        if low is None:
 
575
            low = 0
 
576
        if high is None:
 
577
            high = self.dbm['groups'][group].maxArticle
 
578
        r = []
 
579
        for i in range(low, high + 1):
 
580
            if self.dbm['groups'][group].articles.has_key(i):
 
581
                r.append([str(i)] + self.dbm['groups'][group].articles[i].overview())
 
582
        return defer.succeed(r)
 
583
 
 
584
 
 
585
    def xhdrRequest(self, group, low, high, header):
 
586
        if group not in self.dbm['groups']:
 
587
            return defer.succeed([])
 
588
        
 
589
        if low is None:
 
590
            low = 0
 
591
        if high is None:
 
592
            high = self.dbm['groups'][group].maxArticle
 
593
        r = []
 
594
        for i in range(low, high + 1):
 
595
            if self.dbm['groups'][group].articles.has_key(i):
 
596
                r.append((i, self.dbm['groups'][group].articles[i].getHeader(header)))
 
597
        return defer.succeed(r)
 
598
 
 
599
 
 
600
    def listGroupRequest(self, group):
 
601
        if self.dbm['groups'].has_key(group):
 
602
            return defer.succeed((group, self.dbm['groups'][group].articles.keys()))
 
603
        return defer.fail(NewsServerError("No such group: " + group))
 
604
 
 
605
 
 
606
    def groupRequest(self, group):
 
607
        try:
 
608
            g = self.dbm['groups'][group]
 
609
        except KeyError:
 
610
            return defer.fail(NewsServerError("No such group: " + group))
 
611
        else:
 
612
            flags = g.flags
 
613
            low = g.minArticle
 
614
            high = g.maxArticle
 
615
            num = high - low + 1
 
616
            return defer.succeed((group, num, high, low, flags))
 
617
 
 
618
 
 
619
    def articleExistsRequest(self, id):
 
620
        return defer.succeed(id in self.dbm['Message-IDs'])
 
621
    
 
622
    
 
623
    def articleRequest(self, group, index, id = None):
 
624
        if id is not None:
 
625
            try:
 
626
                xref = self.dbm['Message-IDs'][id]
 
627
            except KeyError:
 
628
                return defer.fail(NewsServerError("No such article: " + id))
 
629
            else:
 
630
                group, index = xref[0]
 
631
                index = int(index)
 
632
        
 
633
        try:
 
634
            a = self.dbm['groups'][group].articles[index]
 
635
        except KeyError:
 
636
            return defer.fail(NewsServerError("No such group: " + group))
 
637
        else:
 
638
            return defer.succeed((
 
639
                index,
 
640
                a.getHeader('Message-ID'),
 
641
                StringIO.StringIO(a.textHeaders() + '\r\n' + a.body)
 
642
            ))
 
643
    
 
644
    
 
645
    def headRequest(self, group, index, id = None):
 
646
        if id is not None:
 
647
            try:
 
648
                xref = self.dbm['Message-IDs'][id]
 
649
            except KeyError:
 
650
                return defer.fail(NewsServerError("No such article: " + id))
 
651
            else:
 
652
                group, index = xref[0]
 
653
                index = int(index)
 
654
        
 
655
        try:
 
656
            a = self.dbm['groups'][group].articles[index]
 
657
        except KeyError:
 
658
            return defer.fail(NewsServerError("No such group: " + group))
 
659
        else:
 
660
            return defer.succeed((index, a.getHeader('Message-ID'), a.textHeaders()))
 
661
 
 
662
 
 
663
    def bodyRequest(self, group, index, id = None):
 
664
        if id is not None:
 
665
            try:
 
666
                xref = self.dbm['Message-IDs'][id]
 
667
            except KeyError:
 
668
                return defer.fail(NewsServerError("No such article: " + id))
 
669
            else:
 
670
                group, index = xref[0]
 
671
                index = int(index)
 
672
        
 
673
        try:
 
674
            a = self.dbm['groups'][group].articles[index]
 
675
        except KeyError:
 
676
            return defer.fail(NewsServerError("No such group: " + group))
 
677
        else:
 
678
            return defer.succeed((index, a.getHeader('Message-ID'), StringIO.StringIO(a.body)))
 
679
 
 
680
 
 
681
class NewsStorageAugmentation:
 
682
    """
 
683
    A NewsStorage implementation using Twisted's asynchronous DB-API
 
684
    """
 
685
 
 
686
    implements(INewsStorage)
 
687
 
 
688
    schema = """
 
689
 
 
690
    CREATE TABLE groups (
 
691
        group_id      SERIAL,
 
692
        name          VARCHAR(80) NOT NULL,
 
693
        
 
694
        flags         INTEGER DEFAULT 0 NOT NULL
 
695
    );
 
696
 
 
697
    CREATE UNIQUE INDEX group_id_index ON groups (group_id);
 
698
    CREATE UNIQUE INDEX name_id_index ON groups (name);
 
699
 
 
700
    CREATE TABLE articles (
 
701
        article_id    SERIAL,
 
702
        message_id    TEXT,
 
703
        
 
704
        header        TEXT,
 
705
        body          TEXT
 
706
    );
 
707
 
 
708
    CREATE UNIQUE INDEX article_id_index ON articles (article_id);
 
709
    CREATE UNIQUE INDEX article_message_index ON articles (message_id);
 
710
 
 
711
    CREATE TABLE postings (
 
712
        group_id      INTEGER,
 
713
        article_id    INTEGER,
 
714
        article_index INTEGER NOT NULL
 
715
    );
 
716
 
 
717
    CREATE UNIQUE INDEX posting_article_index ON postings (article_id);
 
718
 
 
719
    CREATE TABLE subscriptions (
 
720
        group_id    INTEGER
 
721
    );
 
722
    
 
723
    CREATE TABLE overview (
 
724
        header      TEXT
 
725
    );
 
726
    """
 
727
    
 
728
    def __init__(self, info):
 
729
        self.info = info
 
730
        self.dbpool = adbapi.ConnectionPool(**self.info)
 
731
        
 
732
 
 
733
    def __setstate__(self, state):
 
734
        self.__dict__ = state
 
735
        self.info['password'] = getpass.getpass('Database password for %s: ' % (self.info['user'],))
 
736
        self.dbpool = adbapi.ConnectionPool(**self.info)
 
737
        del self.info['password']
 
738
 
 
739
 
 
740
    def listRequest(self):
 
741
        # COALESCE may not be totally portable
 
742
        # it is shorthand for
 
743
        # CASE WHEN (first parameter) IS NOT NULL then (first parameter) ELSE (second parameter) END
 
744
        sql = """
 
745
            SELECT groups.name,
 
746
                COALESCE(MAX(postings.article_index), 0),
 
747
                COALESCE(MIN(postings.article_index), 0),
 
748
                groups.flags
 
749
            FROM groups LEFT OUTER JOIN postings
 
750
            ON postings.group_id = groups.group_id
 
751
            GROUP BY groups.name, groups.flags
 
752
            ORDER BY groups.name
 
753
        """
 
754
        return self.dbpool.runQuery(sql)
 
755
 
 
756
 
 
757
    def subscriptionRequest(self):
 
758
        sql = """
 
759
            SELECT groups.name FROM groups,subscriptions WHERE groups.group_id = subscriptions.group_id
 
760
        """
 
761
        return self.dbpool.runQuery(sql)
 
762
 
 
763
 
 
764
    def postRequest(self, message):
 
765
        cleave = message.find('\r\n\r\n')
 
766
        headers, article = message[:cleave], message[cleave + 4:]
 
767
        article = Article(headers, article)
 
768
        return self.dbpool.runInteraction(self._doPost, article)
 
769
 
 
770
 
 
771
    def _doPost(self, transaction, article):
 
772
        # Get the group ids
 
773
        groups = article.getHeader('Newsgroups').split()
 
774
        if not len(groups):
 
775
            raise NNTPError('Missing Newsgroups header')
 
776
 
 
777
        sql = """
 
778
            SELECT name, group_id FROM groups
 
779
            WHERE name IN (%s)
 
780
        """ % (', '.join([("'%s'" % (adbapi.safe(group),)) for group in groups]),)
 
781
        
 
782
        transaction.execute(sql)
 
783
        result = transaction.fetchall()
 
784
        
 
785
        # No relevant groups, bye bye!
 
786
        if not len(result):
 
787
            raise NNTPError('None of groups in Newsgroup header carried')
 
788
        
 
789
        # Got some groups, now find the indices this article will have in each
 
790
        sql = """
 
791
            SELECT groups.group_id, COALESCE(MAX(postings.article_index), 0) + 1
 
792
            FROM groups LEFT OUTER JOIN postings
 
793
            ON postings.group_id = groups.group_id
 
794
            WHERE groups.group_id IN (%s)
 
795
            GROUP BY groups.group_id
 
796
        """ % (', '.join([("%d" % (id,)) for (group, id) in result]),)
 
797
 
 
798
        transaction.execute(sql)
 
799
        indices = transaction.fetchall()
 
800
 
 
801
        if not len(indices):
 
802
            raise NNTPError('Internal server error - no indices found')
 
803
        
 
804
        # Associate indices with group names
 
805
        gidToName = dict([(b, a) for (a, b) in result])
 
806
        gidToIndex = dict(indices)
 
807
        
 
808
        nameIndex = []
 
809
        for i in gidToName:
 
810
            nameIndex.append((gidToName[i], gidToIndex[i]))
 
811
        
 
812
        # Build xrefs
 
813
        xrefs = socket.gethostname().split()[0]
 
814
        xrefs = xrefs + ' ' + ' '.join([('%s:%d' % (group, id)) for (group, id) in nameIndex])
 
815
        article.putHeader('Xref', xrefs)
 
816
        
 
817
        # Hey!  The article is ready to be posted!  God damn f'in finally.
 
818
        sql = """
 
819
            INSERT INTO articles (message_id, header, body)
 
820
            VALUES ('%s', '%s', '%s')
 
821
        """ % (
 
822
            adbapi.safe(article.getHeader('Message-ID')),
 
823
            adbapi.safe(article.textHeaders()),
 
824
            adbapi.safe(article.body)
 
825
        )
 
826
        
 
827
        transaction.execute(sql)
 
828
        
 
829
        # Now update the posting to reflect the groups to which this belongs
 
830
        for gid in gidToName:
 
831
            sql = """
 
832
                INSERT INTO postings (group_id, article_id, article_index)
 
833
                VALUES (%d, (SELECT last_value FROM articles_article_id_seq), %d)
 
834
            """ % (gid, gidToIndex[gid])
 
835
            transaction.execute(sql)
 
836
        
 
837
        return len(nameIndex)
 
838
 
 
839
 
 
840
    def overviewRequest(self):
 
841
        sql = """
 
842
            SELECT header FROM overview
 
843
        """
 
844
        return self.dbpool.runQuery(sql).addCallback(lambda result: [header[0] for header in result])
 
845
 
 
846
 
 
847
    def xoverRequest(self, group, low, high):
 
848
        sql = """
 
849
            SELECT postings.article_index, articles.header
 
850
            FROM articles,postings,groups
 
851
            WHERE postings.group_id = groups.group_id
 
852
            AND groups.name = '%s'
 
853
            AND postings.article_id = articles.article_id
 
854
            %s
 
855
            %s
 
856
        """ % (
 
857
            adbapi.safe(group),
 
858
            low is not None and "AND postings.article_index >= %d" % (low,) or "",
 
859
            high is not None and "AND postings.article_index <= %d" % (high,) or ""
 
860
        )
 
861
 
 
862
        return self.dbpool.runQuery(sql).addCallback(
 
863
            lambda results: [
 
864
                [id] + Article(header, None).overview() for (id, header) in results
 
865
            ]
 
866
        )
 
867
 
 
868
 
 
869
    def xhdrRequest(self, group, low, high, header):
 
870
        sql = """
 
871
            SELECT articles.header
 
872
            FROM groups,postings,articles
 
873
            WHERE groups.name = '%s' AND postings.group_id = groups.group_id
 
874
            AND postings.article_index >= %d
 
875
            AND postings.article_index <= %d
 
876
        """ % (adbapi.safe(group), low, high)
 
877
 
 
878
        return self.dbpool.runQuery(sql).addCallback(
 
879
            lambda results: [
 
880
                (i, Article(h, None).getHeader(h)) for (i, h) in results
 
881
            ]
 
882
        )
 
883
 
 
884
 
 
885
    def listGroupRequest(self, group):
 
886
        sql = """
 
887
            SELECT postings.article_index FROM postings,groups
 
888
            WHERE postings.group_id = groups.group_id
 
889
            AND groups.name = '%s'
 
890
        """ % (adbapi.safe(group),)
 
891
        
 
892
        return self.dbpool.runQuery(sql).addCallback(
 
893
            lambda results, group = group: (group, [res[0] for res in results])
 
894
        )
 
895
 
 
896
 
 
897
    def groupRequest(self, group): 
 
898
        sql = """
 
899
            SELECT groups.name,
 
900
                COUNT(postings.article_index),
 
901
                COALESCE(MAX(postings.article_index), 0),
 
902
                COALESCE(MIN(postings.article_index), 0),
 
903
                groups.flags
 
904
            FROM groups LEFT OUTER JOIN postings
 
905
            ON postings.group_id = groups.group_id
 
906
            WHERE groups.name = '%s'
 
907
            GROUP BY groups.name, groups.flags
 
908
        """ % (adbapi.safe(group),)
 
909
        
 
910
        return self.dbpool.runQuery(sql).addCallback(
 
911
            lambda results: tuple(results[0])
 
912
        )
 
913
 
 
914
 
 
915
    def articleExistsRequest(self, id):
 
916
        sql = """
 
917
            SELECT COUNT(message_id) FROM articles
 
918
            WHERE message_id = '%s'
 
919
        """ % (adbapi.safe(id),)
 
920
        
 
921
        return self.dbpool.runQuery(sql).addCallback(
 
922
            lambda result: bool(result[0][0])
 
923
        )
 
924
 
 
925
 
 
926
    def articleRequest(self, group, index, id = None):
 
927
        if id is not None:
 
928
            sql = """
 
929
                SELECT postings.article_index, articles.message_id, articles.header, articles.body
 
930
                FROM groups,postings LEFT OUTER JOIN articles
 
931
                ON articles.message_id = '%s'
 
932
                WHERE groups.name = '%s'
 
933
                AND groups.group_id = postings.group_id
 
934
            """ % (adbapi.safe(id), adbapi.safe(group))
 
935
        else:
 
936
            sql = """ 
 
937
                SELECT postings.article_index, articles.message_id, articles.header, articles.body
 
938
                FROM groups,articles LEFT OUTER JOIN postings
 
939
                ON postings.article_id = articles.article_id
 
940
                WHERE postings.article_index = %d
 
941
                AND postings.group_id = groups.group_id
 
942
                AND groups.name = '%s'
 
943
            """ % (index, adbapi.safe(group))
 
944
 
 
945
        return self.dbpool.runQuery(sql).addCallback(
 
946
            lambda result: (
 
947
                result[0][0],
 
948
                result[0][1],
 
949
                StringIO.StringIO(result[0][2] + '\r\n' + result[0][3])
 
950
            )
 
951
        )
 
952
 
 
953
 
 
954
    def headRequest(self, group, index):
 
955
        sql = """
 
956
            SELECT postings.article_index, articles.message_id, articles.header
 
957
            FROM groups,articles LEFT OUTER JOIN postings
 
958
            ON postings.article_id = articles.article_id
 
959
            WHERE postings.article_index = %d
 
960
            AND postings.group_id = groups.group_id
 
961
            AND groups.name = '%s'
 
962
        """ % (index, adbapi.safe(group))
 
963
        
 
964
        return self.dbpool.runQuery(sql).addCallback(lambda result: result[0])
 
965
 
 
966
 
 
967
    def bodyRequest(self, group, index):
 
968
        sql = """
 
969
            SELECT postings.article_index, articles.message_id, articles.body
 
970
            FROM groups,articles LEFT OUTER JOIN postings
 
971
            ON postings.article_id = articles.article_id
 
972
            WHERE postings.article_index = %d
 
973
            AND postings.group_id = groups.group_id
 
974
            AND groups.name = '%s'
 
975
        """ % (index, adbapi.safe(group))
 
976
        
 
977
        return self.dbpool.runQuery(sql).addCallback(
 
978
            lambda result: result[0]
 
979
        ).addCallback(
 
980
            lambda (index, id, body): (index, id, StringIO.StringIO(body))
 
981
        )
 
982
 
 
983
####
 
984
#### XXX - make these static methods some day
 
985
####
 
986
def makeGroupSQL(groups):
 
987
    res = ''
 
988
    for g in groups:
 
989
        res = res + """\n    INSERT INTO groups (name) VALUES ('%s');\n""" % (adbapi.safe(g),)
 
990
    return res
 
991
 
 
992
 
 
993
def makeOverviewSQL():
 
994
    res = ''
 
995
    for o in OVERVIEW_FMT:
 
996
        res = res + """\n    INSERT INTO overview (header) VALUES ('%s');\n""" % (adbapi.safe(o),)
 
997
    return res