~landscape/zope3/newer-from-ztk

« back to all changes in this revision

Viewing changes to src/twisted/news/database.py

  • Committer: Thomas Hervé
  • Date: 2009-07-08 13:52:04 UTC
  • Revision ID: thomas@canonical.com-20090708135204-df5eesrthifpylf8
Remove twisted copy

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# -*- test-case-name: twisted.news.test.test_news -*-
2
 
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
3
 
# See LICENSE for details.
4
 
 
5
 
 
6
 
"""
7
 
News server backend implementations
8
 
 
9
 
Maintainer: U{Jp Calderone<mailto:exarkun@twistedmatrix.com>}
10
 
 
11
 
Stability: semi-stable
12
 
 
13
 
Future Plans: A PyFramer-based backend and a new backend interface that is
14
 
less NNTP specific
15
 
"""
16
 
 
17
 
 
18
 
from __future__ import nested_scopes
19
 
 
20
 
from twisted.news.nntp import NNTPError
21
 
from twisted.mail import smtp
22
 
from twisted.internet import defer
23
 
from twisted.enterprise import adbapi
24
 
from twisted.persisted import dirdbm
25
 
 
26
 
import getpass, pickle, time, socket, md5
27
 
import os
28
 
import StringIO
29
 
from zope.interface import implements, Interface
30
 
 
31
 
 
32
 
ERR_NOGROUP, ERR_NOARTICLE = range(2, 4)  # XXX - put NNTP values here (I guess?)
33
 
 
34
 
OVERVIEW_FMT = [
35
 
    'Subject', 'From', 'Date', 'Message-ID', 'References',
36
 
    'Bytes', 'Lines', 'Xref'
37
 
]
38
 
 
39
 
def hexdigest(md5): #XXX: argh. 1.5.2 doesn't have this.
40
 
    return ''.join(map(lambda x: hex(ord(x))[2:], md5.digest()))
41
 
 
42
 
class Article:
43
 
    def __init__(self, head, body):
44
 
        self.body = body
45
 
        self.headers = {}
46
 
        header = None
47
 
        for line in head.split('\r\n'):
48
 
            if line[0] in ' \t':
49
 
                i = list(self.headers[header])
50
 
                i[1] += '\r\n' + line
51
 
            else:
52
 
                i = line.split(': ', 1)
53
 
                header = i[0].lower()
54
 
            self.headers[header] = tuple(i)
55
 
 
56
 
        if not self.getHeader('Message-ID'):
57
 
            s = str(time.time()) + self.body
58
 
            id = hexdigest(md5.md5(s)) + '@' + socket.gethostname()
59
 
            self.putHeader('Message-ID', '<%s>' % id)
60
 
 
61
 
        if not self.getHeader('Bytes'):
62
 
            self.putHeader('Bytes', str(len(self.body)))
63
 
        
64
 
        if not self.getHeader('Lines'):
65
 
            self.putHeader('Lines', str(self.body.count('\n')))
66
 
        
67
 
        if not self.getHeader('Date'):
68
 
            self.putHeader('Date', time.ctime(time.time()))
69
 
 
70
 
 
71
 
    def getHeader(self, header):
72
 
        h = header.lower()
73
 
        if self.headers.has_key(h):
74
 
            return self.headers[h][1]
75
 
        else:
76
 
            return ''
77
 
 
78
 
 
79
 
    def putHeader(self, header, value):
80
 
        self.headers[header.lower()] = (header, value)
81
 
 
82
 
 
83
 
    def textHeaders(self):
84
 
        headers = []
85
 
        for i in self.headers.values():
86
 
            headers.append('%s: %s' % i)
87
 
        return '\r\n'.join(headers) + '\r\n'
88
 
    
89
 
    def overview(self):
90
 
        xover = []
91
 
        for i in OVERVIEW_FMT:
92
 
            xover.append(self.getHeader(i))
93
 
        return xover
94
 
 
95
 
 
96
 
class NewsServerError(Exception):
97
 
    pass
98
 
 
99
 
    
100
 
class INewsStorage(Interface):
101
 
    """
102
 
    An interface for storing and requesting news articles
103
 
    """
104
 
    
105
 
    def listRequest():
106
 
        """
107
 
        Returns a deferred whose callback will be passed a list of 4-tuples
108
 
        containing (name, max index, min index, flags) for each news group
109
 
        """
110
 
 
111
 
 
112
 
    def subscriptionRequest():
113
 
        """
114
 
        Returns a deferred whose callback will be passed the list of
115
 
        recommended subscription groups for new server users
116
 
        """
117
 
    
118
 
    
119
 
    def postRequest(message):
120
 
        """
121
 
        Returns a deferred whose callback will be invoked if 'message'
122
 
        is successfully posted to one or more specified groups and
123
 
        whose errback will be invoked otherwise.
124
 
        """
125
 
    
126
 
    
127
 
    def overviewRequest():
128
 
        """
129
 
        Returns a deferred whose callback will be passed the a list of
130
 
        headers describing this server's overview format.
131
 
        """
132
 
 
133
 
 
134
 
    def xoverRequest(group, low, high):
135
 
        """
136
 
        Returns a deferred whose callback will be passed a list of xover
137
 
        headers for the given group over the given range.  If low is None,
138
 
        the range starts at the first article.  If high is None, the range
139
 
        ends at the last article.
140
 
        """
141
 
 
142
 
 
143
 
    def xhdrRequest(group, low, high, header):
144
 
        """
145
 
        Returns a deferred whose callback will be passed a list of XHDR data
146
 
        for the given group over the given range.  If low is None,
147
 
        the range starts at the first article.  If high is None, the range
148
 
        ends at the last article.
149
 
        """
150
 
 
151
 
    
152
 
    def listGroupRequest(group):
153
 
        """
154
 
        Returns a deferred whose callback will be passed a two-tuple of
155
 
        (group name, [article indices])
156
 
        """
157
 
    
158
 
    
159
 
    def groupRequest(group):
160
 
        """
161
 
        Returns a deferred whose callback will be passed a five-tuple of
162
 
        (group name, article count, highest index, lowest index, group flags)
163
 
        """
164
 
 
165
 
    
166
 
    def articleExistsRequest(id):
167
 
        """
168
 
        Returns a deferred whose callback will be passed with a true value
169
 
        if a message with the specified Message-ID exists in the database
170
 
        and with a false value otherwise.
171
 
        """
172
 
 
173
 
 
174
 
    def articleRequest(group, index, id = None):
175
 
        """ 
176
 
        Returns a deferred whose callback will be passed a file-like object
177
 
        containing the full article text (headers and body) for the article
178
 
        of the specified index in the specified group, and whose errback
179
 
        will be invoked if the article or group does not exist.  If id is
180
 
        not None, index is ignored and the article with the given Message-ID
181
 
        will be returned instead, along with its index in the specified
182
 
        group.
183
 
        """
184
 
 
185
 
    
186
 
    def headRequest(group, index):
187
 
        """
188
 
        Returns a deferred whose callback will be passed the header for
189
 
        the article of the specified index in the specified group, and
190
 
        whose errback will be invoked if the article or group does not
191
 
        exist.
192
 
        """
193
 
 
194
 
    
195
 
    def bodyRequest(group, index):
196
 
        """
197
 
        Returns a deferred whose callback will be passed the body for
198
 
        the article of the specified index in the specified group, and
199
 
        whose errback will be invoked if the article or group does not
200
 
        exist.
201
 
        """
202
 
 
203
 
class NewsStorage:
204
 
    """
205
 
    Backwards compatibility class -- There is no reason to inherit from this,
206
 
    just implement INewsStorage instead.
207
 
    """
208
 
    def listRequest(self):
209
 
        raise NotImplementedError()
210
 
    def subscriptionRequest(self):
211
 
        raise NotImplementedError()
212
 
    def postRequest(self, message):
213
 
        raise NotImplementedError()
214
 
    def overviewRequest(self):
215
 
        return defer.succeed(OVERVIEW_FMT)
216
 
    def xoverRequest(self, group, low, high):
217
 
        raise NotImplementedError()
218
 
    def xhdrRequest(self, group, low, high, header):
219
 
        raise NotImplementedError()
220
 
    def listGroupRequest(self, group):
221
 
        raise NotImplementedError()
222
 
    def groupRequest(self, group):
223
 
        raise NotImplementedError()
224
 
    def articleExistsRequest(self, id):
225
 
        raise NotImplementedError()
226
 
    def articleRequest(self, group, index, id = None):
227
 
        raise NotImplementedError()
228
 
    def headRequest(self, group, index):
229
 
        raise NotImplementedError()
230
 
    def bodyRequest(self, group, index):
231
 
        raise NotImplementedError()
232
 
 
233
 
 
234
 
class PickleStorage:
235
 
    """A trivial NewsStorage implementation using pickles
236
 
    
237
 
    Contains numerous flaws and is generally unsuitable for any
238
 
    real applications.  Consider yourself warned!
239
 
    """
240
 
 
241
 
    implements(INewsStorage)
242
 
 
243
 
    sharedDBs = {}
244
 
 
245
 
    def __init__(self, filename, groups = None, moderators = ()):
246
 
        self.datafile = filename
247
 
        self.load(filename, groups, moderators)
248
 
 
249
 
 
250
 
    def getModerators(self, groups):
251
 
        # first see if any groups are moderated.  if so, nothing gets posted,
252
 
        # but the whole messages gets forwarded to the moderator address
253
 
        moderators = []
254
 
        for group in groups:
255
 
            moderators.append(self.db['moderators'].get(group, None))
256
 
        return filter(None, moderators)
257
 
 
258
 
 
259
 
    def notifyModerators(self, moderators, article):
260
 
        # Moderated postings go through as long as they have an Approved
261
 
        # header, regardless of what the value is
262
 
        article.putHeader('To', ', '.join(moderators))
263
 
        return smtp.sendEmail(
264
 
            'twisted@' + socket.gethostname(),
265
 
            moderators,
266
 
            article.body,
267
 
            dict(article.headers.values())
268
 
        )
269
 
 
270
 
 
271
 
    def listRequest(self):
272
 
        "Returns a list of 4-tuples: (name, max index, min index, flags)"
273
 
        l = self.db['groups']
274
 
        r = []
275
 
        for i in l:
276
 
            if len(self.db[i].keys()):
277
 
                low = min(self.db[i].keys())
278
 
                high = max(self.db[i].keys()) + 1
279
 
            else:
280
 
                low = high = 0
281
 
            if self.db['moderators'].has_key(i):
282
 
                flags = 'm'
283
 
            else:
284
 
                flags = 'y'
285
 
            r.append((i, high, low, flags))
286
 
        return defer.succeed(r)
287
 
 
288
 
    def subscriptionRequest(self):
289
 
        return defer.succeed(['alt.test'])
290
 
 
291
 
    def postRequest(self, message):
292
 
        cleave = message.find('\r\n\r\n')
293
 
        headers, article = message[:cleave], message[cleave + 4:]
294
 
 
295
 
        a = Article(headers, article)
296
 
        groups = a.getHeader('Newsgroups').split()
297
 
        xref = []
298
 
 
299
 
        # Check moderated status
300
 
        moderators = self.getModerators(groups)
301
 
        if moderators and not a.getHeader('Approved'):
302
 
            return self.notifyModerators(moderators, a)
303
 
 
304
 
        for group in groups:
305
 
            if self.db.has_key(group):
306
 
                if len(self.db[group].keys()):
307
 
                    index = max(self.db[group].keys()) + 1
308
 
                else:
309
 
                    index = 1
310
 
                xref.append((group, str(index)))
311
 
                self.db[group][index] = a
312
 
 
313
 
        if len(xref) == 0:
314
 
            return defer.fail(None)
315
 
 
316
 
        a.putHeader('Xref', '%s %s' % (
317
 
            socket.gethostname().split()[0],
318
 
            ''.join(map(lambda x: ':'.join(x), xref))
319
 
        ))
320
 
 
321
 
        self.flush()
322
 
        return defer.succeed(None)
323
 
 
324
 
 
325
 
    def overviewRequest(self):
326
 
        return defer.succeed(OVERVIEW_FMT)
327
 
 
328
 
 
329
 
    def xoverRequest(self, group, low, high):
330
 
        if not self.db.has_key(group):
331
 
            return defer.succeed([])
332
 
        r = []
333
 
        for i in self.db[group].keys():
334
 
            if (low is None or i >= low) and (high is None or i <= high):
335
 
                r.append([str(i)] + self.db[group][i].overview())
336
 
        return defer.succeed(r)
337
 
 
338
 
 
339
 
    def xhdrRequest(self, group, low, high, header):
340
 
        if not self.db.has_key(group):
341
 
            return defer.succeed([])
342
 
        r = []
343
 
        for i in self.db[group].keys():
344
 
            if low is None or i >= low and high is None or i <= high:
345
 
                r.append((i, self.db[group][i].getHeader(header)))
346
 
        return defer.succeed(r)
347
 
 
348
 
 
349
 
    def listGroupRequest(self, group):
350
 
        if self.db.has_key(group):
351
 
            return defer.succeed((group, self.db[group].keys()))
352
 
        else:
353
 
            return defer.fail(None)
354
 
 
355
 
    def groupRequest(self, group):
356
 
        if self.db.has_key(group):
357
 
            if len(self.db[group].keys()):
358
 
                num = len(self.db[group].keys())
359
 
                low = min(self.db[group].keys())
360
 
                high = max(self.db[group].keys())
361
 
            else:
362
 
                num = low = high = 0
363
 
            flags = 'y'
364
 
            return defer.succeed((group, num, high, low, flags))
365
 
        else:
366
 
            return defer.fail(ERR_NOGROUP)
367
 
 
368
 
 
369
 
    def articleExistsRequest(self, id):
370
 
        for g in self.db.values():
371
 
            for a in g.values():
372
 
                if a.getHeader('Message-ID') == id:
373
 
                    return defer.succeed(1)
374
 
        return defer.succeed(0)
375
 
 
376
 
 
377
 
    def articleRequest(self, group, index, id = None):
378
 
        if id is not None:
379
 
            raise NotImplementedError
380
 
 
381
 
        if self.db.has_key(group):
382
 
            if self.db[group].has_key(index):
383
 
                a = self.db[group][index]
384
 
                return defer.succeed((
385
 
                    index,
386
 
                    a.getHeader('Message-ID'),
387
 
                    StringIO.StringIO(a.textHeaders() + '\r\n' + a.body)
388
 
                ))
389
 
            else:
390
 
                return defer.fail(ERR_NOARTICLE)
391
 
        else:
392
 
            return defer.fail(ERR_NOGROUP)
393
 
                
394
 
    
395
 
    def headRequest(self, group, index):
396
 
        if self.db.has_key(group):
397
 
            if self.db[group].has_key(index):
398
 
                a = self.db[group][index]
399
 
                return defer.succeed((index, a.getHeader('Message-ID'), a.textHeaders()))
400
 
            else:
401
 
                return defer.fail(ERR_NOARTICLE)
402
 
        else:
403
 
            return defer.fail(ERR_NOGROUP)
404
 
 
405
 
 
406
 
    def bodyRequest(self, group, index):
407
 
        if self.db.has_key(group):
408
 
            if self.db[group].has_key(index):
409
 
                a = self.db[group][index]
410
 
                return defer.succeed((index, a.getHeader('Message-ID'), StringIO.StringIO(a.body)))
411
 
            else:
412
 
                return defer.fail(ERR_NOARTICLE)
413
 
        else:
414
 
            return defer.fail(ERR_NOGROUP)
415
 
 
416
 
 
417
 
    def flush(self):
418
 
        pickle.dump(self.db, open(self.datafile, 'w'))
419
 
 
420
 
 
421
 
    def load(self, filename, groups = None, moderators = ()):
422
 
        if PickleStorage.sharedDBs.has_key(filename):
423
 
            self.db = PickleStorage.sharedDBs[filename]
424
 
        else:
425
 
            try:
426
 
                self.db = pickle.load(open(filename))
427
 
                PickleStorage.sharedDBs[filename] = self.db
428
 
            except IOError, e:
429
 
                self.db = PickleStorage.sharedDBs[filename] = {}
430
 
                self.db['groups'] = groups
431
 
                if groups is not None:
432
 
                    for i in groups:
433
 
                        self.db[i] = {}
434
 
                self.db['moderators'] = dict(moderators)
435
 
                self.flush()
436
 
 
437
 
 
438
 
class Group:
439
 
    name = None
440
 
    flags = ''
441
 
    minArticle = 1
442
 
    maxArticle = 0
443
 
    articles = None
444
 
    
445
 
    def __init__(self, name, flags = 'y'):
446
 
        self.name = name
447
 
        self.flags = flags
448
 
        self.articles = {}
449
 
 
450
 
 
451
 
class NewsShelf:
452
 
    """
453
 
    A NewStorage implementation using Twisted's dirdbm persistence module.
454
 
    """
455
 
    
456
 
    implements(INewsStorage)    
457
 
    
458
 
    def __init__(self, mailhost, path):
459
 
        self.path = path
460
 
        self.mailhost = mailhost
461
 
 
462
 
        if not os.path.exists(path):
463
 
            os.mkdir(path)
464
 
 
465
 
        self.dbm = dirdbm.Shelf(os.path.join(path, "newsshelf"))
466
 
        if not len(self.dbm.keys()):
467
 
            self.initialize()
468
 
 
469
 
 
470
 
    def initialize(self):
471
 
        # A dictionary of group name/Group instance items
472
 
        self.dbm['groups'] = dirdbm.Shelf(os.path.join(self.path, 'groups'))
473
 
 
474
 
        # A dictionary of group name/email address
475
 
        self.dbm['moderators'] = dirdbm.Shelf(os.path.join(self.path, 'moderators'))
476
 
 
477
 
        # A list of group names
478
 
        self.dbm['subscriptions'] = []
479
 
 
480
 
        # A dictionary of MessageID strings/xref lists
481
 
        self.dbm['Message-IDs'] = dirdbm.Shelf(os.path.join(self.path, 'Message-IDs'))
482
 
 
483
 
 
484
 
    def addGroup(self, name, flags):
485
 
        self.dbm['groups'][name] = Group(name, flags)
486
 
 
487
 
 
488
 
    def addSubscription(self, name):
489
 
        self.dbm['subscriptions'] = self.dbm['subscriptions'] + [name]
490
 
 
491
 
 
492
 
    def addModerator(self, group, email):
493
 
        self.dbm['moderators'][group] = email
494
 
 
495
 
 
496
 
    def listRequest(self):
497
 
        result = []
498
 
        for g in self.dbm['groups'].values():
499
 
            result.append((g.name, g.maxArticle, g.minArticle, g.flags))
500
 
        return defer.succeed(result)
501
 
 
502
 
 
503
 
    def subscriptionRequest(self):
504
 
        return defer.succeed(self.dbm['subscriptions'])
505
 
    
506
 
    
507
 
    def getModerator(self, groups):
508
 
        # first see if any groups are moderated.  if so, nothing gets posted,
509
 
        # but the whole messages gets forwarded to the moderator address
510
 
        for group in groups:
511
 
            try:
512
 
                return self.dbm['moderators'][group]
513
 
            except KeyError:
514
 
                pass
515
 
        return None
516
 
 
517
 
 
518
 
    def notifyModerator(self, moderator, article):
519
 
        # Moderated postings go through as long as they have an Approved
520
 
        # header, regardless of what the value is
521
 
        print 'To is ', moderator
522
 
        article.putHeader('To', moderator)
523
 
        return smtp.sendEmail(
524
 
            self.mailhost,
525
 
            'twisted-news@' + socket.gethostname(),
526
 
            moderator,
527
 
            article.body,
528
 
            dict(article.headers.values())
529
 
        )
530
 
 
531
 
 
532
 
    def postRequest(self, message):
533
 
        cleave = message.find('\r\n\r\n')
534
 
        headers, article = message[:cleave], message[cleave + 4:]
535
 
        
536
 
        article = Article(headers, article)
537
 
        groups = article.getHeader('Newsgroups').split()
538
 
        xref = []
539
 
        
540
 
        # Check for moderated status
541
 
        moderator = self.getModerator(groups)
542
 
        if moderator and not article.getHeader('Approved'):
543
 
            return self.notifyModerator(moderator, article)
544
 
        
545
 
        
546
 
        for group in groups:
547
 
            try:
548
 
                g = self.dbm['groups'][group]
549
 
            except KeyError:
550
 
                pass
551
 
            else:
552
 
                index = g.maxArticle + 1
553
 
                g.maxArticle += 1
554
 
                g.articles[index] = article
555
 
                xref.append((group, str(index)))
556
 
                self.dbm['groups'][group] = g
557
 
 
558
 
        if not xref:
559
 
            return defer.fail(NewsServerError("No groups carried: " + ' '.join(groups)))
560
 
 
561
 
        article.putHeader('Xref', '%s %s' % (socket.gethostname().split()[0], ' '.join(map(lambda x: ':'.join(x), xref))))
562
 
        self.dbm['Message-IDs'][article.getHeader('Message-ID')] = xref
563
 
        return defer.succeed(None)
564
 
 
565
 
 
566
 
    def overviewRequest(self):
567
 
        return defer.succeed(OVERVIEW_FMT)
568
 
 
569
 
 
570
 
    def xoverRequest(self, group, low, high):
571
 
        if not self.dbm['groups'].has_key(group):
572
 
            return defer.succeed([])
573
 
        
574
 
        if low is None:
575
 
            low = 0
576
 
        if high is None:
577
 
            high = self.dbm['groups'][group].maxArticle
578
 
        r = []
579
 
        for i in range(low, high + 1):
580
 
            if self.dbm['groups'][group].articles.has_key(i):
581
 
                r.append([str(i)] + self.dbm['groups'][group].articles[i].overview())
582
 
        return defer.succeed(r)
583
 
 
584
 
 
585
 
    def xhdrRequest(self, group, low, high, header):
586
 
        if group not in self.dbm['groups']:
587
 
            return defer.succeed([])
588
 
        
589
 
        if low is None:
590
 
            low = 0
591
 
        if high is None:
592
 
            high = self.dbm['groups'][group].maxArticle
593
 
        r = []
594
 
        for i in range(low, high + 1):
595
 
            if self.dbm['groups'][group].articles.has_key(i):
596
 
                r.append((i, self.dbm['groups'][group].articles[i].getHeader(header)))
597
 
        return defer.succeed(r)
598
 
 
599
 
 
600
 
    def listGroupRequest(self, group):
601
 
        if self.dbm['groups'].has_key(group):
602
 
            return defer.succeed((group, self.dbm['groups'][group].articles.keys()))
603
 
        return defer.fail(NewsServerError("No such group: " + group))
604
 
 
605
 
 
606
 
    def groupRequest(self, group):
607
 
        try:
608
 
            g = self.dbm['groups'][group]
609
 
        except KeyError:
610
 
            return defer.fail(NewsServerError("No such group: " + group))
611
 
        else:
612
 
            flags = g.flags
613
 
            low = g.minArticle
614
 
            high = g.maxArticle
615
 
            num = high - low + 1
616
 
            return defer.succeed((group, num, high, low, flags))
617
 
 
618
 
 
619
 
    def articleExistsRequest(self, id):
620
 
        return defer.succeed(id in self.dbm['Message-IDs'])
621
 
    
622
 
    
623
 
    def articleRequest(self, group, index, id = None):
624
 
        if id is not None:
625
 
            try:
626
 
                xref = self.dbm['Message-IDs'][id]
627
 
            except KeyError:
628
 
                return defer.fail(NewsServerError("No such article: " + id))
629
 
            else:
630
 
                group, index = xref[0]
631
 
                index = int(index)
632
 
        
633
 
        try:
634
 
            a = self.dbm['groups'][group].articles[index]
635
 
        except KeyError:
636
 
            return defer.fail(NewsServerError("No such group: " + group))
637
 
        else:
638
 
            return defer.succeed((
639
 
                index,
640
 
                a.getHeader('Message-ID'),
641
 
                StringIO.StringIO(a.textHeaders() + '\r\n' + a.body)
642
 
            ))
643
 
    
644
 
    
645
 
    def headRequest(self, group, index, id = None):
646
 
        if id is not None:
647
 
            try:
648
 
                xref = self.dbm['Message-IDs'][id]
649
 
            except KeyError:
650
 
                return defer.fail(NewsServerError("No such article: " + id))
651
 
            else:
652
 
                group, index = xref[0]
653
 
                index = int(index)
654
 
        
655
 
        try:
656
 
            a = self.dbm['groups'][group].articles[index]
657
 
        except KeyError:
658
 
            return defer.fail(NewsServerError("No such group: " + group))
659
 
        else:
660
 
            return defer.succeed((index, a.getHeader('Message-ID'), a.textHeaders()))
661
 
 
662
 
 
663
 
    def bodyRequest(self, group, index, id = None):
664
 
        if id is not None:
665
 
            try:
666
 
                xref = self.dbm['Message-IDs'][id]
667
 
            except KeyError:
668
 
                return defer.fail(NewsServerError("No such article: " + id))
669
 
            else:
670
 
                group, index = xref[0]
671
 
                index = int(index)
672
 
        
673
 
        try:
674
 
            a = self.dbm['groups'][group].articles[index]
675
 
        except KeyError:
676
 
            return defer.fail(NewsServerError("No such group: " + group))
677
 
        else:
678
 
            return defer.succeed((index, a.getHeader('Message-ID'), StringIO.StringIO(a.body)))
679
 
 
680
 
 
681
 
class NewsStorageAugmentation:
682
 
    """
683
 
    A NewsStorage implementation using Twisted's asynchronous DB-API
684
 
    """
685
 
 
686
 
    implements(INewsStorage)
687
 
 
688
 
    schema = """
689
 
 
690
 
    CREATE TABLE groups (
691
 
        group_id      SERIAL,
692
 
        name          VARCHAR(80) NOT NULL,
693
 
        
694
 
        flags         INTEGER DEFAULT 0 NOT NULL
695
 
    );
696
 
 
697
 
    CREATE UNIQUE INDEX group_id_index ON groups (group_id);
698
 
    CREATE UNIQUE INDEX name_id_index ON groups (name);
699
 
 
700
 
    CREATE TABLE articles (
701
 
        article_id    SERIAL,
702
 
        message_id    TEXT,
703
 
        
704
 
        header        TEXT,
705
 
        body          TEXT
706
 
    );
707
 
 
708
 
    CREATE UNIQUE INDEX article_id_index ON articles (article_id);
709
 
    CREATE UNIQUE INDEX article_message_index ON articles (message_id);
710
 
 
711
 
    CREATE TABLE postings (
712
 
        group_id      INTEGER,
713
 
        article_id    INTEGER,
714
 
        article_index INTEGER NOT NULL
715
 
    );
716
 
 
717
 
    CREATE UNIQUE INDEX posting_article_index ON postings (article_id);
718
 
 
719
 
    CREATE TABLE subscriptions (
720
 
        group_id    INTEGER
721
 
    );
722
 
    
723
 
    CREATE TABLE overview (
724
 
        header      TEXT
725
 
    );
726
 
    """
727
 
    
728
 
    def __init__(self, info):
729
 
        self.info = info
730
 
        self.dbpool = adbapi.ConnectionPool(**self.info)
731
 
        
732
 
 
733
 
    def __setstate__(self, state):
734
 
        self.__dict__ = state
735
 
        self.info['password'] = getpass.getpass('Database password for %s: ' % (self.info['user'],))
736
 
        self.dbpool = adbapi.ConnectionPool(**self.info)
737
 
        del self.info['password']
738
 
 
739
 
 
740
 
    def listRequest(self):
741
 
        # COALESCE may not be totally portable
742
 
        # it is shorthand for
743
 
        # CASE WHEN (first parameter) IS NOT NULL then (first parameter) ELSE (second parameter) END
744
 
        sql = """
745
 
            SELECT groups.name,
746
 
                COALESCE(MAX(postings.article_index), 0),
747
 
                COALESCE(MIN(postings.article_index), 0),
748
 
                groups.flags
749
 
            FROM groups LEFT OUTER JOIN postings
750
 
            ON postings.group_id = groups.group_id
751
 
            GROUP BY groups.name, groups.flags
752
 
            ORDER BY groups.name
753
 
        """
754
 
        return self.dbpool.runQuery(sql)
755
 
 
756
 
 
757
 
    def subscriptionRequest(self):
758
 
        sql = """
759
 
            SELECT groups.name FROM groups,subscriptions WHERE groups.group_id = subscriptions.group_id
760
 
        """
761
 
        return self.dbpool.runQuery(sql)
762
 
 
763
 
 
764
 
    def postRequest(self, message):
765
 
        cleave = message.find('\r\n\r\n')
766
 
        headers, article = message[:cleave], message[cleave + 4:]
767
 
        article = Article(headers, article)
768
 
        return self.dbpool.runInteraction(self._doPost, article)
769
 
 
770
 
 
771
 
    def _doPost(self, transaction, article):
772
 
        # Get the group ids
773
 
        groups = article.getHeader('Newsgroups').split()
774
 
        if not len(groups):
775
 
            raise NNTPError('Missing Newsgroups header')
776
 
 
777
 
        sql = """
778
 
            SELECT name, group_id FROM groups
779
 
            WHERE name IN (%s)
780
 
        """ % (', '.join([("'%s'" % (adbapi.safe(group),)) for group in groups]),)
781
 
        
782
 
        transaction.execute(sql)
783
 
        result = transaction.fetchall()
784
 
        
785
 
        # No relevant groups, bye bye!
786
 
        if not len(result):
787
 
            raise NNTPError('None of groups in Newsgroup header carried')
788
 
        
789
 
        # Got some groups, now find the indices this article will have in each
790
 
        sql = """
791
 
            SELECT groups.group_id, COALESCE(MAX(postings.article_index), 0) + 1
792
 
            FROM groups LEFT OUTER JOIN postings
793
 
            ON postings.group_id = groups.group_id
794
 
            WHERE groups.group_id IN (%s)
795
 
            GROUP BY groups.group_id
796
 
        """ % (', '.join([("%d" % (id,)) for (group, id) in result]),)
797
 
 
798
 
        transaction.execute(sql)
799
 
        indices = transaction.fetchall()
800
 
 
801
 
        if not len(indices):
802
 
            raise NNTPError('Internal server error - no indices found')
803
 
        
804
 
        # Associate indices with group names
805
 
        gidToName = dict([(b, a) for (a, b) in result])
806
 
        gidToIndex = dict(indices)
807
 
        
808
 
        nameIndex = []
809
 
        for i in gidToName:
810
 
            nameIndex.append((gidToName[i], gidToIndex[i]))
811
 
        
812
 
        # Build xrefs
813
 
        xrefs = socket.gethostname().split()[0]
814
 
        xrefs = xrefs + ' ' + ' '.join([('%s:%d' % (group, id)) for (group, id) in nameIndex])
815
 
        article.putHeader('Xref', xrefs)
816
 
        
817
 
        # Hey!  The article is ready to be posted!  God damn f'in finally.
818
 
        sql = """
819
 
            INSERT INTO articles (message_id, header, body)
820
 
            VALUES ('%s', '%s', '%s')
821
 
        """ % (
822
 
            adbapi.safe(article.getHeader('Message-ID')),
823
 
            adbapi.safe(article.textHeaders()),
824
 
            adbapi.safe(article.body)
825
 
        )
826
 
        
827
 
        transaction.execute(sql)
828
 
        
829
 
        # Now update the posting to reflect the groups to which this belongs
830
 
        for gid in gidToName:
831
 
            sql = """
832
 
                INSERT INTO postings (group_id, article_id, article_index)
833
 
                VALUES (%d, (SELECT last_value FROM articles_article_id_seq), %d)
834
 
            """ % (gid, gidToIndex[gid])
835
 
            transaction.execute(sql)
836
 
        
837
 
        return len(nameIndex)
838
 
 
839
 
 
840
 
    def overviewRequest(self):
841
 
        sql = """
842
 
            SELECT header FROM overview
843
 
        """
844
 
        return self.dbpool.runQuery(sql).addCallback(lambda result: [header[0] for header in result])
845
 
 
846
 
 
847
 
    def xoverRequest(self, group, low, high):
848
 
        sql = """
849
 
            SELECT postings.article_index, articles.header
850
 
            FROM articles,postings,groups
851
 
            WHERE postings.group_id = groups.group_id
852
 
            AND groups.name = '%s'
853
 
            AND postings.article_id = articles.article_id
854
 
            %s
855
 
            %s
856
 
        """ % (
857
 
            adbapi.safe(group),
858
 
            low is not None and "AND postings.article_index >= %d" % (low,) or "",
859
 
            high is not None and "AND postings.article_index <= %d" % (high,) or ""
860
 
        )
861
 
 
862
 
        return self.dbpool.runQuery(sql).addCallback(
863
 
            lambda results: [
864
 
                [id] + Article(header, None).overview() for (id, header) in results
865
 
            ]
866
 
        )
867
 
 
868
 
 
869
 
    def xhdrRequest(self, group, low, high, header):
870
 
        sql = """
871
 
            SELECT articles.header
872
 
            FROM groups,postings,articles
873
 
            WHERE groups.name = '%s' AND postings.group_id = groups.group_id
874
 
            AND postings.article_index >= %d
875
 
            AND postings.article_index <= %d
876
 
        """ % (adbapi.safe(group), low, high)
877
 
 
878
 
        return self.dbpool.runQuery(sql).addCallback(
879
 
            lambda results: [
880
 
                (i, Article(h, None).getHeader(h)) for (i, h) in results
881
 
            ]
882
 
        )
883
 
 
884
 
 
885
 
    def listGroupRequest(self, group):
886
 
        sql = """
887
 
            SELECT postings.article_index FROM postings,groups
888
 
            WHERE postings.group_id = groups.group_id
889
 
            AND groups.name = '%s'
890
 
        """ % (adbapi.safe(group),)
891
 
        
892
 
        return self.dbpool.runQuery(sql).addCallback(
893
 
            lambda results, group = group: (group, [res[0] for res in results])
894
 
        )
895
 
 
896
 
 
897
 
    def groupRequest(self, group): 
898
 
        sql = """
899
 
            SELECT groups.name,
900
 
                COUNT(postings.article_index),
901
 
                COALESCE(MAX(postings.article_index), 0),
902
 
                COALESCE(MIN(postings.article_index), 0),
903
 
                groups.flags
904
 
            FROM groups LEFT OUTER JOIN postings
905
 
            ON postings.group_id = groups.group_id
906
 
            WHERE groups.name = '%s'
907
 
            GROUP BY groups.name, groups.flags
908
 
        """ % (adbapi.safe(group),)
909
 
        
910
 
        return self.dbpool.runQuery(sql).addCallback(
911
 
            lambda results: tuple(results[0])
912
 
        )
913
 
 
914
 
 
915
 
    def articleExistsRequest(self, id):
916
 
        sql = """
917
 
            SELECT COUNT(message_id) FROM articles
918
 
            WHERE message_id = '%s'
919
 
        """ % (adbapi.safe(id),)
920
 
        
921
 
        return self.dbpool.runQuery(sql).addCallback(
922
 
            lambda result: bool(result[0][0])
923
 
        )
924
 
 
925
 
 
926
 
    def articleRequest(self, group, index, id = None):
927
 
        if id is not None:
928
 
            sql = """
929
 
                SELECT postings.article_index, articles.message_id, articles.header, articles.body
930
 
                FROM groups,postings LEFT OUTER JOIN articles
931
 
                ON articles.message_id = '%s'
932
 
                WHERE groups.name = '%s'
933
 
                AND groups.group_id = postings.group_id
934
 
            """ % (adbapi.safe(id), adbapi.safe(group))
935
 
        else:
936
 
            sql = """ 
937
 
                SELECT postings.article_index, articles.message_id, articles.header, articles.body
938
 
                FROM groups,articles LEFT OUTER JOIN postings
939
 
                ON postings.article_id = articles.article_id
940
 
                WHERE postings.article_index = %d
941
 
                AND postings.group_id = groups.group_id
942
 
                AND groups.name = '%s'
943
 
            """ % (index, adbapi.safe(group))
944
 
 
945
 
        return self.dbpool.runQuery(sql).addCallback(
946
 
            lambda result: (
947
 
                result[0][0],
948
 
                result[0][1],
949
 
                StringIO.StringIO(result[0][2] + '\r\n' + result[0][3])
950
 
            )
951
 
        )
952
 
 
953
 
 
954
 
    def headRequest(self, group, index):
955
 
        sql = """
956
 
            SELECT postings.article_index, articles.message_id, articles.header
957
 
            FROM groups,articles LEFT OUTER JOIN postings
958
 
            ON postings.article_id = articles.article_id
959
 
            WHERE postings.article_index = %d
960
 
            AND postings.group_id = groups.group_id
961
 
            AND groups.name = '%s'
962
 
        """ % (index, adbapi.safe(group))
963
 
        
964
 
        return self.dbpool.runQuery(sql).addCallback(lambda result: result[0])
965
 
 
966
 
 
967
 
    def bodyRequest(self, group, index):
968
 
        sql = """
969
 
            SELECT postings.article_index, articles.message_id, articles.body
970
 
            FROM groups,articles LEFT OUTER JOIN postings
971
 
            ON postings.article_id = articles.article_id
972
 
            WHERE postings.article_index = %d
973
 
            AND postings.group_id = groups.group_id
974
 
            AND groups.name = '%s'
975
 
        """ % (index, adbapi.safe(group))
976
 
        
977
 
        return self.dbpool.runQuery(sql).addCallback(
978
 
            lambda result: result[0]
979
 
        ).addCallback(
980
 
            lambda (index, id, body): (index, id, StringIO.StringIO(body))
981
 
        )
982
 
 
983
 
####
984
 
#### XXX - make these static methods some day
985
 
####
986
 
def makeGroupSQL(groups):
987
 
    res = ''
988
 
    for g in groups:
989
 
        res = res + """\n    INSERT INTO groups (name) VALUES ('%s');\n""" % (adbapi.safe(g),)
990
 
    return res
991
 
 
992
 
 
993
 
def makeOverviewSQL():
994
 
    res = ''
995
 
    for o in OVERVIEW_FMT:
996
 
        res = res + """\n    INSERT INTO overview (header) VALUES ('%s');\n""" % (adbapi.safe(o),)
997
 
    return res