1
# -*- test-case-name: twisted.news.test.test_news -*-
2
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
3
# See LICENSE for details.
7
News server backend implementations
9
Maintainer: U{Jp Calderone<mailto:exarkun@twistedmatrix.com>}
11
Stability: semi-stable
13
Future Plans: A PyFramer-based backend and a new backend interface that is
18
from __future__ import nested_scopes
20
from twisted.news.nntp import NNTPError
21
from twisted.mail import smtp
22
from twisted.internet import defer
23
from twisted.enterprise import adbapi
24
from twisted.persisted import dirdbm
26
import getpass, pickle, time, socket, md5
29
from zope.interface import implements, Interface
32
ERR_NOGROUP, ERR_NOARTICLE = range(2, 4) # XXX - put NNTP values here (I guess?)
35
'Subject', 'From', 'Date', 'Message-ID', 'References',
36
'Bytes', 'Lines', 'Xref'
39
def hexdigest(md5):
    """
    Return the digest of a hash object as a hexadecimal string.

    Each digest byte is rendered as exactly two hex digits.  The previous
    hand-rolled C{hex(ord(x))[2:]} formatting dropped the leading zero of
    any byte below 0x10, producing variable-length output in which
    different digests could format to the same string.

    @param md5: a hash object (such as the result of C{md5.md5(...)})
        providing C{hexdigest()}.
    @return: the hex-encoded digest.
    """
    return md5.hexdigest()
43
def __init__(self, head, body):
47
for line in head.split('\r\n'):
49
i = list(self.headers[header])
52
i = line.split(': ', 1)
54
self.headers[header] = tuple(i)
56
if not self.getHeader('Message-ID'):
57
s = str(time.time()) + self.body
58
id = hexdigest(md5.md5(s)) + '@' + socket.gethostname()
59
self.putHeader('Message-ID', '<%s>' % id)
61
if not self.getHeader('Bytes'):
62
self.putHeader('Bytes', str(len(self.body)))
64
if not self.getHeader('Lines'):
65
self.putHeader('Lines', str(self.body.count('\n')))
67
if not self.getHeader('Date'):
68
self.putHeader('Date', time.ctime(time.time()))
71
def getHeader(self, header):
73
if self.headers.has_key(h):
74
return self.headers[h][1]
79
def putHeader(self, header, value):
    """
    Store a header, replacing any existing one whose name differs only
    in case.

    @param header: the header name as it should be written out.
    @param value: the header value.
    """
    # Entries are keyed by the lower-cased name; the original spelling is
    # kept alongside the value.
    key = header.lower()
    self.headers[key] = (header, value)
83
def textHeaders(self):
85
for i in self.headers.values():
86
headers.append('%s: %s' % i)
87
return '\r\n'.join(headers) + '\r\n'
91
for i in OVERVIEW_FMT:
92
xover.append(self.getHeader(i))
96
class NewsServerError(Exception):
100
class INewsStorage(Interface):
102
An interface for storing and requesting news articles
107
Returns a deferred whose callback will be passed a list of 4-tuples
108
containing (name, max index, min index, flags) for each news group
112
def subscriptionRequest():
114
Returns a deferred whose callback will be passed the list of
115
recommended subscription groups for new server users
119
def postRequest(message):
121
Returns a deferred whose callback will be invoked if 'message'
122
is successfully posted to one or more specified groups and
123
whose errback will be invoked otherwise.
127
def overviewRequest():
129
Returns a deferred whose callback will be passed a list of
130
headers describing this server's overview format.
134
def xoverRequest(group, low, high):
136
Returns a deferred whose callback will be passed a list of xover
137
headers for the given group over the given range. If low is None,
138
the range starts at the first article. If high is None, the range
139
ends at the last article.
143
def xhdrRequest(group, low, high, header):
145
Returns a deferred whose callback will be passed a list of XHDR data
146
for the given group over the given range. If low is None,
147
the range starts at the first article. If high is None, the range
148
ends at the last article.
152
def listGroupRequest(group):
154
Returns a deferred whose callback will be passed a two-tuple of
155
(group name, [article indices])
159
def groupRequest(group):
161
Returns a deferred whose callback will be passed a five-tuple of
162
(group name, article count, highest index, lowest index, group flags)
166
def articleExistsRequest(id):
168
Returns a deferred whose callback will be passed a true value
169
if a message with the specified Message-ID exists in the database
170
and with a false value otherwise.
174
def articleRequest(group, index, id = None):
176
Returns a deferred whose callback will be passed a file-like object
177
containing the full article text (headers and body) for the article
178
of the specified index in the specified group, and whose errback
179
will be invoked if the article or group does not exist. If id is
180
not None, index is ignored and the article with the given Message-ID
181
will be returned instead, along with its index in the specified
186
def headRequest(group, index):
188
Returns a deferred whose callback will be passed the header for
189
the article of the specified index in the specified group, and
190
whose errback will be invoked if the article or group does not
195
def bodyRequest(group, index):
197
Returns a deferred whose callback will be passed the body for
198
the article of the specified index in the specified group, and
199
whose errback will be invoked if the article or group does not
205
Backwards compatibility class -- There is no reason to inherit from this,
206
just implement INewsStorage instead.
208
def listRequest(self):
    """
    Placeholder on the backwards-compatibility class; always raises
    NotImplementedError.
    """
    raise NotImplementedError()
210
def subscriptionRequest(self):
    """
    Placeholder on the backwards-compatibility class; always raises
    NotImplementedError.
    """
    raise NotImplementedError()
212
def postRequest(self, message):
    """
    Placeholder on the backwards-compatibility class; always raises
    NotImplementedError.

    @param message: ignored.
    """
    raise NotImplementedError()
214
def overviewRequest(self):
    """
    Return a Deferred that has already succeeded with the module-level
    OVERVIEW_FMT list.
    """
    overviewFormat = OVERVIEW_FMT
    return defer.succeed(overviewFormat)
216
def xoverRequest(self, group, low, high):
    """
    Placeholder on the backwards-compatibility class; always raises
    NotImplementedError.

    @param group: ignored.
    @param low: ignored.
    @param high: ignored.
    """
    raise NotImplementedError()
218
def xhdrRequest(self, group, low, high, header):
    """
    Placeholder on the backwards-compatibility class; always raises
    NotImplementedError.

    @param group: ignored.
    @param low: ignored.
    @param high: ignored.
    @param header: ignored.
    """
    raise NotImplementedError()
220
def listGroupRequest(self, group):
    """
    Placeholder on the backwards-compatibility class; always raises
    NotImplementedError.

    @param group: ignored.
    """
    raise NotImplementedError()
222
def groupRequest(self, group):
    """
    Placeholder on the backwards-compatibility class; always raises
    NotImplementedError.

    @param group: ignored.
    """
    raise NotImplementedError()
224
def articleExistsRequest(self, id):
    """
    Placeholder on the backwards-compatibility class; always raises
    NotImplementedError.

    @param id: ignored.
    """
    raise NotImplementedError()
226
def articleRequest(self, group, index, id = None):
    """
    Placeholder on the backwards-compatibility class; always raises
    NotImplementedError.

    @param group: ignored.
    @param index: ignored.
    @param id: ignored.
    """
    raise NotImplementedError()
228
def headRequest(self, group, index):
    """
    Placeholder on the backwards-compatibility class; always raises
    NotImplementedError.

    @param group: ignored.
    @param index: ignored.
    """
    raise NotImplementedError()
230
def bodyRequest(self, group, index):
    """
    Placeholder on the backwards-compatibility class; always raises
    NotImplementedError.

    @param group: ignored.
    @param index: ignored.
    """
    raise NotImplementedError()
235
"""A trivial NewsStorage implementation using pickles
237
Contains numerous flaws and is generally unsuitable for any
238
real applications. Consider yourself warned!
241
implements(INewsStorage)
245
def __init__(self, filename, groups = None, moderators = ()):
    """
    Record the data file name and hand all three arguments to C{load}.

    @param filename: stored as C{self.datafile} and passed to C{load}.
    @param groups: passed through to C{load} unchanged.
    @param moderators: passed through to C{load} unchanged.
    """
    self.datafile = filename
    self.load(self.datafile, groups, moderators)
250
def getModerators(self, groups):
251
# first see if any groups are moderated. if so, nothing gets posted,
252
# but the whole message gets forwarded to the moderator address
255
moderators.append(self.db['moderators'].get(group, None))
256
return filter(None, moderators)
259
def notifyModerators(self, moderators, article):
260
# Moderated postings go through as long as they have an Approved
261
# header, regardless of what the value is
262
article.putHeader('To', ', '.join(moderators))
263
return smtp.sendEmail(
264
'twisted@' + socket.gethostname(),
267
dict(article.headers.values())
271
def listRequest(self):
272
"Returns a list of 4-tuples: (name, max index, min index, flags)"
273
l = self.db['groups']
276
if len(self.db[i].keys()):
277
low = min(self.db[i].keys())
278
high = max(self.db[i].keys()) + 1
281
if self.db['moderators'].has_key(i):
285
r.append((i, high, low, flags))
286
return defer.succeed(r)
288
def subscriptionRequest(self):
    """
    Return a Deferred that has already succeeded with the fixed
    one-element list C{['alt.test']}.
    """
    recommended = ['alt.test']
    return defer.succeed(recommended)
291
def postRequest(self, message):
292
cleave = message.find('\r\n\r\n')
293
headers, article = message[:cleave], message[cleave + 4:]
295
a = Article(headers, article)
296
groups = a.getHeader('Newsgroups').split()
299
# Check moderated status
300
moderators = self.getModerators(groups)
301
if moderators and not a.getHeader('Approved'):
302
return self.notifyModerators(moderators, a)
305
if self.db.has_key(group):
306
if len(self.db[group].keys()):
307
index = max(self.db[group].keys()) + 1
310
xref.append((group, str(index)))
311
self.db[group][index] = a
314
return defer.fail(None)
316
a.putHeader('Xref', '%s %s' % (
317
socket.gethostname().split()[0],
318
''.join(map(lambda x: ':'.join(x), xref))
322
return defer.succeed(None)
325
def overviewRequest(self):
    """
    Return a Deferred that has already succeeded with the module-level
    OVERVIEW_FMT list.
    """
    overviewFormat = OVERVIEW_FMT
    return defer.succeed(overviewFormat)
329
def xoverRequest(self, group, low, high):
330
if not self.db.has_key(group):
331
return defer.succeed([])
333
for i in self.db[group].keys():
334
if (low is None or i >= low) and (high is None or i <= high):
335
r.append([str(i)] + self.db[group][i].overview())
336
return defer.succeed(r)
339
def xhdrRequest(self, group, low, high, header):
    """
    Collect one header from every article of a group within an index range.

    @param group: name of the group to search.
    @param low: lowest article index to include, or None for no lower bound.
    @param high: highest article index to include, or None for no upper
        bound.
    @param header: name of the header to read from each article.
    @return: a Deferred that has already succeeded with a list of
        (index, header value) tuples; the list is empty when the group is
        not a key of the database.
    """
    if not self.db.has_key(group):
        return defer.succeed([])
    r = []
    for i in self.db[group].keys():
        # Each bound needs its own parentheses: "and" binds tighter than
        # "or", so the unparenthesised form let a None low bound (or any
        # index at or below high) bypass the other bound entirely.
        if (low is None or i >= low) and (high is None or i <= high):
            r.append((i, self.db[group][i].getHeader(header)))
    return defer.succeed(r)
349
def listGroupRequest(self, group):
350
if self.db.has_key(group):
351
return defer.succeed((group, self.db[group].keys()))
353
return defer.fail(None)
355
def groupRequest(self, group):
356
if self.db.has_key(group):
357
if len(self.db[group].keys()):
358
num = len(self.db[group].keys())
359
low = min(self.db[group].keys())
360
high = max(self.db[group].keys())
364
return defer.succeed((group, num, high, low, flags))
366
return defer.fail(ERR_NOGROUP)
369
def articleExistsRequest(self, id):
370
for g in self.db.values():
372
if a.getHeader('Message-ID') == id:
373
return defer.succeed(1)
374
return defer.succeed(0)
377
def articleRequest(self, group, index, id = None):
379
raise NotImplementedError
381
if self.db.has_key(group):
382
if self.db[group].has_key(index):
383
a = self.db[group][index]
384
return defer.succeed((
386
a.getHeader('Message-ID'),
387
StringIO.StringIO(a.textHeaders() + '\r\n' + a.body)
390
return defer.fail(ERR_NOARTICLE)
392
return defer.fail(ERR_NOGROUP)
395
def headRequest(self, group, index):
396
if self.db.has_key(group):
397
if self.db[group].has_key(index):
398
a = self.db[group][index]
399
return defer.succeed((index, a.getHeader('Message-ID'), a.textHeaders()))
401
return defer.fail(ERR_NOARTICLE)
403
return defer.fail(ERR_NOGROUP)
406
def bodyRequest(self, group, index):
407
if self.db.has_key(group):
408
if self.db[group].has_key(index):
409
a = self.db[group][index]
410
return defer.succeed((index, a.getHeader('Message-ID'), StringIO.StringIO(a.body)))
412
return defer.fail(ERR_NOARTICLE)
414
return defer.fail(ERR_NOGROUP)
418
pickle.dump(self.db, open(self.datafile, 'w'))
421
def load(self, filename, groups = None, moderators = ()):
422
if PickleStorage.sharedDBs.has_key(filename):
423
self.db = PickleStorage.sharedDBs[filename]
426
self.db = pickle.load(open(filename))
427
PickleStorage.sharedDBs[filename] = self.db
429
self.db = PickleStorage.sharedDBs[filename] = {}
430
self.db['groups'] = groups
431
if groups is not None:
434
self.db['moderators'] = dict(moderators)
445
def __init__(self, name, flags = 'y'):
453
A NewsStorage implementation using Twisted's dirdbm persistence module.
456
implements(INewsStorage)
458
def __init__(self, mailhost, path):
460
self.mailhost = mailhost
462
if not os.path.exists(path):
465
self.dbm = dirdbm.Shelf(os.path.join(path, "newsshelf"))
466
if not len(self.dbm.keys()):
470
def initialize(self):
    """
    Create the four top-level entries of C{self.dbm}: three sub-shelves
    stored under C{self.path} and one plain list.
    """
    base = self.path

    # Group name -> Group instance
    self.dbm['groups'] = dirdbm.Shelf(os.path.join(base, 'groups'))

    # Group name -> email address
    self.dbm['moderators'] = dirdbm.Shelf(os.path.join(base, 'moderators'))

    # List of group names
    self.dbm['subscriptions'] = []

    # Message-ID string -> xref list
    self.dbm['Message-IDs'] = dirdbm.Shelf(os.path.join(base, 'Message-IDs'))
484
def addGroup(self, name, flags):
    """
    Register a new group under C{self.dbm['groups']}.

    @param name: the group name, used as the key.
    @param flags: passed to the Group constructor along with the name.
    """
    newGroup = Group(name, flags)
    self.dbm['groups'][name] = newGroup
488
def addSubscription(self, name):
    """
    Append a group name to the stored subscription list.

    @param name: the group name to add.
    """
    # Build a new list and assign it back instead of appending in place.
    current = self.dbm['subscriptions']
    self.dbm['subscriptions'] = current + [name]
492
def addModerator(self, group, email):
    """
    Record the moderator address for a group, replacing any previous one.

    @param group: the group name, used as the key.
    @param email: the value stored for that group.
    """
    moderators = self.dbm['moderators']
    moderators[group] = email
496
def listRequest(self):
498
for g in self.dbm['groups'].values():
499
result.append((g.name, g.maxArticle, g.minArticle, g.flags))
500
return defer.succeed(result)
503
def subscriptionRequest(self):
    """
    Return a Deferred that has already succeeded with the value stored
    under C{self.dbm['subscriptions']}.
    """
    subscriptions = self.dbm['subscriptions']
    return defer.succeed(subscriptions)
507
def getModerator(self, groups):
508
# first see if any groups are moderated. if so, nothing gets posted,
509
# but the whole message gets forwarded to the moderator address
512
return self.dbm['moderators'][group]
518
def notifyModerator(self, moderator, article):
519
# Moderated postings go through as long as they have an Approved
520
# header, regardless of what the value is
521
print 'To is ', moderator
522
article.putHeader('To', moderator)
523
return smtp.sendEmail(
525
'twisted-news@' + socket.gethostname(),
528
dict(article.headers.values())
532
def postRequest(self, message):
533
cleave = message.find('\r\n\r\n')
534
headers, article = message[:cleave], message[cleave + 4:]
536
article = Article(headers, article)
537
groups = article.getHeader('Newsgroups').split()
540
# Check for moderated status
541
moderator = self.getModerator(groups)
542
if moderator and not article.getHeader('Approved'):
543
return self.notifyModerator(moderator, article)
548
g = self.dbm['groups'][group]
552
index = g.maxArticle + 1
554
g.articles[index] = article
555
xref.append((group, str(index)))
556
self.dbm['groups'][group] = g
559
return defer.fail(NewsServerError("No groups carried: " + ' '.join(groups)))
561
article.putHeader('Xref', '%s %s' % (socket.gethostname().split()[0], ' '.join(map(lambda x: ':'.join(x), xref))))
562
self.dbm['Message-IDs'][article.getHeader('Message-ID')] = xref
563
return defer.succeed(None)
566
def overviewRequest(self):
    """
    Return a Deferred that has already succeeded with the module-level
    OVERVIEW_FMT list.
    """
    overviewFormat = OVERVIEW_FMT
    return defer.succeed(overviewFormat)
570
def xoverRequest(self, group, low, high):
571
if not self.dbm['groups'].has_key(group):
572
return defer.succeed([])
577
high = self.dbm['groups'][group].maxArticle
579
for i in range(low, high + 1):
580
if self.dbm['groups'][group].articles.has_key(i):
581
r.append([str(i)] + self.dbm['groups'][group].articles[i].overview())
582
return defer.succeed(r)
585
def xhdrRequest(self, group, low, high, header):
586
if group not in self.dbm['groups']:
587
return defer.succeed([])
592
high = self.dbm['groups'][group].maxArticle
594
for i in range(low, high + 1):
595
if self.dbm['groups'][group].articles.has_key(i):
596
r.append((i, self.dbm['groups'][group].articles[i].getHeader(header)))
597
return defer.succeed(r)
600
def listGroupRequest(self, group):
    """
    Look up the article indices of one group.

    @param group: the group name.
    @return: a Deferred that has already succeeded with
        (group, keys of the group's C{articles} mapping), or one that has
        already failed with NewsServerError when the group is not stored.
    """
    if not self.dbm['groups'].has_key(group):
        return defer.fail(NewsServerError("No such group: " + group))
    indices = self.dbm['groups'][group].articles.keys()
    return defer.succeed((group, indices))
606
def groupRequest(self, group):
608
g = self.dbm['groups'][group]
610
return defer.fail(NewsServerError("No such group: " + group))
616
return defer.succeed((group, num, high, low, flags))
619
def articleExistsRequest(self, id):
    """
    Report whether a Message-ID is stored.

    @param id: the Message-ID to look for.
    @return: a Deferred that has already succeeded with the result of the
        membership test against C{self.dbm['Message-IDs']}.
    """
    known = id in self.dbm['Message-IDs']
    return defer.succeed(known)
623
def articleRequest(self, group, index, id = None):
626
xref = self.dbm['Message-IDs'][id]
628
return defer.fail(NewsServerError("No such article: " + id))
630
group, index = xref[0]
634
a = self.dbm['groups'][group].articles[index]
636
return defer.fail(NewsServerError("No such group: " + group))
638
return defer.succeed((
640
a.getHeader('Message-ID'),
641
StringIO.StringIO(a.textHeaders() + '\r\n' + a.body)
645
def headRequest(self, group, index, id = None):
648
xref = self.dbm['Message-IDs'][id]
650
return defer.fail(NewsServerError("No such article: " + id))
652
group, index = xref[0]
656
a = self.dbm['groups'][group].articles[index]
658
return defer.fail(NewsServerError("No such group: " + group))
660
return defer.succeed((index, a.getHeader('Message-ID'), a.textHeaders()))
663
def bodyRequest(self, group, index, id = None):
666
xref = self.dbm['Message-IDs'][id]
668
return defer.fail(NewsServerError("No such article: " + id))
670
group, index = xref[0]
674
a = self.dbm['groups'][group].articles[index]
676
return defer.fail(NewsServerError("No such group: " + group))
678
return defer.succeed((index, a.getHeader('Message-ID'), StringIO.StringIO(a.body)))
681
class NewsStorageAugmentation:
683
A NewsStorage implementation using Twisted's asynchronous DB-API
686
implements(INewsStorage)
690
CREATE TABLE groups (
692
name VARCHAR(80) NOT NULL,
694
flags INTEGER DEFAULT 0 NOT NULL
697
CREATE UNIQUE INDEX group_id_index ON groups (group_id);
698
CREATE UNIQUE INDEX name_id_index ON groups (name);
700
CREATE TABLE articles (
708
CREATE UNIQUE INDEX article_id_index ON articles (article_id);
709
CREATE UNIQUE INDEX article_message_index ON articles (message_id);
711
CREATE TABLE postings (
714
article_index INTEGER NOT NULL
717
CREATE UNIQUE INDEX posting_article_index ON postings (article_id);
719
CREATE TABLE subscriptions (
723
CREATE TABLE overview (
728
def __init__(self, info):
730
self.dbpool = adbapi.ConnectionPool(**self.info)
733
def __setstate__(self, state):
    """
    Restore instance state and rebuild the connection pool.

    The database password is requested interactively through getpass and
    is kept in C{self.info} only while the pool is being constructed.

    @param state: the dictionary that replaces C{self.__dict__}.
    """
    self.__dict__ = state
    self.info['password'] = getpass.getpass('Database password for %s: ' % (self.info['user'],))
    try:
        self.dbpool = adbapi.ConnectionPool(**self.info)
    finally:
        # Remove the password even when pool construction raises, so it is
        # never left behind in instance state.
        del self.info['password']
740
def listRequest(self):
741
# COALESCE may not be totally portable
742
# it is shorthand for
743
# CASE WHEN (first parameter) IS NOT NULL then (first parameter) ELSE (second parameter) END
746
COALESCE(MAX(postings.article_index), 0),
747
COALESCE(MIN(postings.article_index), 0),
749
FROM groups LEFT OUTER JOIN postings
750
ON postings.group_id = groups.group_id
751
GROUP BY groups.name, groups.flags
754
return self.dbpool.runQuery(sql)
757
def subscriptionRequest(self):
759
SELECT groups.name FROM groups,subscriptions WHERE groups.group_id = subscriptions.group_id
761
return self.dbpool.runQuery(sql)
764
def postRequest(self, message):
    """
    Split a raw message at its first blank line and post it inside a
    database interaction.

    @param message: the full article text; headers and body are separated
        by the first CRLF CRLF sequence.
    @return: whatever C{self.dbpool.runInteraction} returns for
        C{self._doPost} called with the parsed Article.
    """
    # NOTE(review): find() returns -1 when no blank line is present; the
    # slices below do not special-case that.
    boundary = message.find('\r\n\r\n')
    head = message[:boundary]
    body = message[boundary + 4:]
    return self.dbpool.runInteraction(self._doPost, Article(head, body))
771
def _doPost(self, transaction, article):
773
groups = article.getHeader('Newsgroups').split()
775
raise NNTPError('Missing Newsgroups header')
778
SELECT name, group_id FROM groups
780
""" % (', '.join([("'%s'" % (adbapi.safe(group),)) for group in groups]),)
782
transaction.execute(sql)
783
result = transaction.fetchall()
785
# No relevant groups, bye bye!
787
raise NNTPError('None of groups in Newsgroup header carried')
789
# Got some groups, now find the indices this article will have in each
791
SELECT groups.group_id, COALESCE(MAX(postings.article_index), 0) + 1
792
FROM groups LEFT OUTER JOIN postings
793
ON postings.group_id = groups.group_id
794
WHERE groups.group_id IN (%s)
795
GROUP BY groups.group_id
796
""" % (', '.join([("%d" % (id,)) for (group, id) in result]),)
798
transaction.execute(sql)
799
indices = transaction.fetchall()
802
raise NNTPError('Internal server error - no indices found')
804
# Associate indices with group names
805
gidToName = dict([(b, a) for (a, b) in result])
806
gidToIndex = dict(indices)
810
nameIndex.append((gidToName[i], gidToIndex[i]))
813
xrefs = socket.gethostname().split()[0]
814
xrefs = xrefs + ' ' + ' '.join([('%s:%d' % (group, id)) for (group, id) in nameIndex])
815
article.putHeader('Xref', xrefs)
817
# The article is finally ready to be posted.
819
INSERT INTO articles (message_id, header, body)
820
VALUES ('%s', '%s', '%s')
822
adbapi.safe(article.getHeader('Message-ID')),
823
adbapi.safe(article.textHeaders()),
824
adbapi.safe(article.body)
827
transaction.execute(sql)
829
# Now update the posting to reflect the groups to which this belongs
830
for gid in gidToName:
832
INSERT INTO postings (group_id, article_id, article_index)
833
VALUES (%d, (SELECT last_value FROM articles_article_id_seq), %d)
834
""" % (gid, gidToIndex[gid])
835
transaction.execute(sql)
837
return len(nameIndex)
840
def overviewRequest(self):
842
SELECT header FROM overview
844
return self.dbpool.runQuery(sql).addCallback(lambda result: [header[0] for header in result])
847
def xoverRequest(self, group, low, high):
849
SELECT postings.article_index, articles.header
850
FROM articles,postings,groups
851
WHERE postings.group_id = groups.group_id
852
AND groups.name = '%s'
853
AND postings.article_id = articles.article_id
858
low is not None and "AND postings.article_index >= %d" % (low,) or "",
859
high is not None and "AND postings.article_index <= %d" % (high,) or ""
862
return self.dbpool.runQuery(sql).addCallback(
864
[id] + Article(header, None).overview() for (id, header) in results
869
def xhdrRequest(self, group, low, high, header):
871
SELECT articles.header
872
FROM groups,postings,articles
873
WHERE groups.name = '%s' AND postings.group_id = groups.group_id
874
AND postings.article_index >= %d
875
AND postings.article_index <= %d
876
""" % (adbapi.safe(group), low, high)
878
return self.dbpool.runQuery(sql).addCallback(
880
(i, Article(h, None).getHeader(h)) for (i, h) in results
885
def listGroupRequest(self, group):
887
SELECT postings.article_index FROM postings,groups
888
WHERE postings.group_id = groups.group_id
889
AND groups.name = '%s'
890
""" % (adbapi.safe(group),)
892
return self.dbpool.runQuery(sql).addCallback(
893
lambda results, group = group: (group, [res[0] for res in results])
897
def groupRequest(self, group):
900
COUNT(postings.article_index),
901
COALESCE(MAX(postings.article_index), 0),
902
COALESCE(MIN(postings.article_index), 0),
904
FROM groups LEFT OUTER JOIN postings
905
ON postings.group_id = groups.group_id
906
WHERE groups.name = '%s'
907
GROUP BY groups.name, groups.flags
908
""" % (adbapi.safe(group),)
910
return self.dbpool.runQuery(sql).addCallback(
911
lambda results: tuple(results[0])
915
def articleExistsRequest(self, id):
917
SELECT COUNT(message_id) FROM articles
918
WHERE message_id = '%s'
919
""" % (adbapi.safe(id),)
921
return self.dbpool.runQuery(sql).addCallback(
922
lambda result: bool(result[0][0])
926
def articleRequest(self, group, index, id = None):
929
SELECT postings.article_index, articles.message_id, articles.header, articles.body
930
FROM groups,postings LEFT OUTER JOIN articles
931
ON articles.message_id = '%s'
932
WHERE groups.name = '%s'
933
AND groups.group_id = postings.group_id
934
""" % (adbapi.safe(id), adbapi.safe(group))
937
SELECT postings.article_index, articles.message_id, articles.header, articles.body
938
FROM groups,articles LEFT OUTER JOIN postings
939
ON postings.article_id = articles.article_id
940
WHERE postings.article_index = %d
941
AND postings.group_id = groups.group_id
942
AND groups.name = '%s'
943
""" % (index, adbapi.safe(group))
945
return self.dbpool.runQuery(sql).addCallback(
949
StringIO.StringIO(result[0][2] + '\r\n' + result[0][3])
954
def headRequest(self, group, index):
956
SELECT postings.article_index, articles.message_id, articles.header
957
FROM groups,articles LEFT OUTER JOIN postings
958
ON postings.article_id = articles.article_id
959
WHERE postings.article_index = %d
960
AND postings.group_id = groups.group_id
961
AND groups.name = '%s'
962
""" % (index, adbapi.safe(group))
964
return self.dbpool.runQuery(sql).addCallback(lambda result: result[0])
967
def bodyRequest(self, group, index):
969
SELECT postings.article_index, articles.message_id, articles.body
970
FROM groups,articles LEFT OUTER JOIN postings
971
ON postings.article_id = articles.article_id
972
WHERE postings.article_index = %d
973
AND postings.group_id = groups.group_id
974
AND groups.name = '%s'
975
""" % (index, adbapi.safe(group))
977
return self.dbpool.runQuery(sql).addCallback(
978
lambda result: result[0]
980
lambda (index, id, body): (index, id, StringIO.StringIO(body))
984
#### XXX - make these static methods some day
986
def makeGroupSQL(groups):
989
res = res + """\n INSERT INTO groups (name) VALUES ('%s');\n""" % (adbapi.safe(g),)
993
def makeOverviewSQL():
995
for o in OVERVIEW_FMT:
996
res = res + """\n INSERT INTO overview (header) VALUES ('%s');\n""" % (adbapi.safe(o),)