~ubuntu-branches/ubuntu/utopic/gozerbot/utopic

« back to all changes in this revision

Viewing changes to debian/gozerbot/usr/lib/python2.5/site-packages/gozerplugs/plugs/collective.py

  • Committer: Bazaar Package Importer
  • Author(s): Jeremy Malcolm
  • Date: 2009-09-14 09:00:29 UTC
  • mfrom: (1.1.4 upstream) (3.1.5 sid)
  • Revision ID: james.westby@ubuntu.com-20090914090029-uval0ekt72kmklxw
Tags: 0.9.1.3-3
Changed dependency on python-setuptools to python-pkg-resources
(Closes: #546435) 

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# plugs/collective.py
2
 
#
3
 
4
 
 
5
 
""" the collective is a network of gozerbots connected by use of the bots 
6
 
 builtin webserver. nodes are urls pointing to these webservers .. this plugin
7
 
 provides the client side of the collective, to join the collective the
8
 
 webserver must be enabled and open for other connections (see WEBSERVER)
9
 
"""
10
 
 
11
 
__copyright__ = 'this file is in the public domain'
12
 
__depend__ = ['webserver', ]
13
 
__gendocfirst__ = ['coll-disable', 'coll-enable', 'coll-boot']
14
 
 
15
 
from gozerbot.generic import rlog, geturl, toenc, fromenc, waitforqueue, \
16
 
strippedtxt, handle_exception, now, geturl2
17
 
from gozerbot.rsslist import rsslist
18
 
from gozerbot.datadir import datadir
19
 
from gozerbot.pdod import Pdod
20
 
from gozerbot.persiststate import PersistState
21
 
from gozerbot.commands import cmnds
22
 
from gozerbot.plugins import plugins
23
 
from gozerbot.thr import start_new_thread
24
 
from gozerbot.plughelp import plughelp
25
 
from gozerbot.examples import examples
26
 
from gozerbot.dol import Dol
27
 
from gozerbot.config import config
28
 
from gozerbot.periodical import periodical
29
 
from gozerplugs.plugs.webserver import cfg as webcfg
30
 
from xml.sax.saxutils import unescape
31
 
from gozerbot.contrib.simplejson import loads
32
 
import Queue, time, socket, re, os
33
 
 
34
 
gotuuid = True
35
 
try:
36
 
    import uuid
37
 
except ImportError:
38
 
    gotuuid = False
39
 
 
40
 
plughelp.add('collective', 'query other gozerbots')
41
 
 
42
 
coll = PersistState(datadir + os.sep + 'collective')
43
 
coll.define('enable', 0)
44
 
coll.define('nodes', [])
45
 
coll.define('names', {})
46
 
coll.define('active', [])
47
 
coll.define('wait', 5)
48
 
 
49
 
waitre = re.compile(' wait (\d+)', re.I)
50
 
 
51
 
def getid(url):
52
 
    if gotuuid:
53
 
        return str(uuid.uuid3(uuid.NAMESPACE_URL, toenc(url)))
54
 
    else:
55
 
        return url
56
 
 
57
 
class CollNode(object):
58
 
 
59
 
    def __init__(self, name, url):
60
 
        self.name = name
61
 
        self.id = getid(url)
62
 
        self.url = url
63
 
        self.seen = time.time()
64
 
        self.regtime = time.time()
65
 
        self.filter = []
66
 
        self.catch = []
67
 
 
68
 
    def __str__(self):
69
 
        return "name=%s url=<%s> seen=%s" % (self.name, self.url, now())
70
 
        
71
 
    def cmnd(self, what, queue=None):
72
 
        if what.startswith('coll'):
73
 
            return
74
 
        try:
75
 
            what = re.sub('\s+', '+', what)
76
 
            data = geturl('%s/json?%s' % (self.url, what))
77
 
            result = loads(fromenc(data))
78
 
        except ValueError:
79
 
            # ValueError: No JSON object could be decoded
80
 
            result = ['decode error', ]
81
 
        except IOError, ex:
82
 
            result = [str(ex), ]
83
 
        except Exception, ex:
84
 
            handle_exception()
85
 
            rlog(10, 'collective', "can't fetch %s data: %s" % (self.url, \
86
 
str(ex)))
87
 
            return ['fetch error', ]
88
 
        if queue:
89
 
            queue.put((self.name, result))
90
 
        else:
91
 
            return result
92
 
 
93
 
    def ping(self):
94
 
        try:
95
 
            pongtxt = fromenc(geturl("%s/ping" % self.url))
96
 
        except IOError, ex:
97
 
            pongtxt = str(ex)
98
 
        except Exception, ex:
99
 
            pongtxt = ""
100
 
        return pongtxt
101
 
 
102
 
class Collective(object):
103
 
 
104
 
    def __init__(self):
105
 
        self.nodes = {}
106
 
        self.state = Pdod(datadir + os.sep + 'coll.state')
107
 
        self.startup = Pdod(datadir + os.sep + 'coll.startup')
108
 
        if not self.state.has_key('ignore'):
109
 
            self.state['ignore'] = []
110
 
        if not self.state.has_key('urls'):
111
 
            self.state['urls'] = {}
112
 
        if not self.state.has_key('names'):
113
 
            self.state['names'] = {}
114
 
        if not self.startup.has_key('start'):
115
 
            self.startup['start'] = {}
116
 
 
117
 
    def add(self, name, url):
118
 
        id = getid(url)
119
 
        if self.nodes.has_key(id):
120
 
            return self.nodes[id]
121
 
        self.nodes[id] = CollNode(name, url)
122
 
        self.state.set('urls', url, id)
123
 
        self.state.set('names', name,  id)
124
 
        self.persist(name, url)
125
 
        rlog(-10, 'collective', 'added %s node (%s) <%s>' % (name, id, url))
126
 
        return self.nodes[id]
127
 
 
128
 
    def start(self):
129
 
        for name, url in self.startup['start'].iteritems():
130
 
            self.add(name, url)
131
 
        start_new_thread(self.joinall, ())
132
 
 
133
 
    def persist(self, name, url):
134
 
        self.startup.set('start', name, url)
135
 
        self.startup.save()
136
 
 
137
 
    def get(self, id):
138
 
        try:
139
 
            return self.nodes[id]
140
 
        except KeyError:
141
 
            return
142
 
 
143
 
    def byname(self, name):
144
 
        id = self.state.get('names', name)
145
 
        if id:
146
 
            return self.get(id)
147
 
 
148
 
    def remove(self, id):
149
 
        target = self.get(id)
150
 
        if not target:
151
 
            return
152
 
        del self.state['urls'][target.url]
153
 
        del self.state['names'][target.name]
154
 
        del self.nodes[id]
155
 
 
156
 
    def unpersist(self, id):
157
 
        node = self.get(id)
158
 
        if not node:
159
 
            return
160
 
        try:
161
 
            del self.startup['start'][node.name]
162
 
            self.startup.save()
163
 
        except KeyError:
164
 
            pass
165
 
            
166
 
    def ignore(self, id):
167
 
        self.state['ignore'].append(id)
168
 
 
169
 
    def unignore(self, id):
170
 
        self.state.data['ignore'].remove(id)
171
 
        self.state.save()
172
 
 
173
 
    def stop(self):
174
 
        pass
175
 
 
176
 
    def cmndall(self, what, wait=5):
177
 
        total = {}
178
 
        queue = Queue.Queue()
179
 
        for id, node in self.nodes.iteritems():
180
 
            start_new_thread(node.cmnd, (what, queue))
181
 
        result = waitforqueue(queue, wait, maxitems=size())
182
 
        for res in result:
183
 
            total[res[0]] = res[1]
184
 
        return total
185
 
 
186
 
    def getnodes(self):
187
 
        return self.nodes.values()
188
 
 
189
 
    def getname(self, url):
190
 
        id = getid(url)
191
 
        return self.get(id).name
192
 
 
193
 
    def join(self, url=None):
194
 
        try:
195
 
            if url:
196
 
                result = geturl("%s/join?%s" % (url, \
197
 
webcfg.get('webport')))
198
 
            else:
199
 
                result = geturl('http://gozerbot.org:8088/join?%s' % \
200
 
webcfg.get('webport'))
201
 
        except Exception, ex:
202
 
            result = str(ex)
203
 
        result = result.strip()
204
 
        if 'added' in result:
205
 
            rlog(1, 'collective', "%s joined =>  %s" % (url, result))
206
 
        else:
207
 
            rlog(1, 'collective', "can't join %s => %s" % (url, str(result)))
208
 
        return result
209
 
 
210
 
    def joinall(self):
211
 
        for node in self.nodes.values():
212
 
           start_new_thread(self.join, (node.url, ))
213
 
 
214
 
    def boot(self, node=None):
215
 
        if node:
216
 
            self.sync(node)
217
 
        else:
218
 
            self.sync('http://gozerbot.org:8088')
219
 
        rlog(10, 'collective', 'booted %s nodes' % self.size())
220
 
 
221
 
    def fullboot(self):
222
 
        teller = 0
223
 
        threads = []
224
 
        for node in self.nodes.values():
225
 
            t = start_new_thread(self.sync, (node.url, False))        
226
 
            threads.append(t)
227
 
            teller += 1
228
 
        for i in threads:
229
 
            i.join()
230
 
        return teller
231
 
 
232
 
    def sync(self, url, throw=True):
233
 
        """ sync cacne with node """
234
 
        try:
235
 
            result = fromenc(geturl('%s/nodes' % url))
236
 
        except Exception, ex:
237
 
            if throw:
238
 
                raise
239
 
            else:
240
 
                return 0
241
 
        if not result:
242
 
            return 0
243
 
        rss = rsslist(result)
244
 
        got = 0
245
 
        for i in rss:
246
 
            try:
247
 
                url = i['url']
248
 
                id = i['id']
249
 
                name = i['name']
250
 
            except KeyError:
251
 
                continue
252
 
            try:
253
 
                self.add(name, url)
254
 
                got += 1
255
 
            except:
256
 
                handle_exception()
257
 
        return got
258
 
 
259
 
    def size(self):
260
 
        return len(self.nodes)
261
 
 
262
 
    def list(self):
263
 
        res = []
264
 
        for node in self.nodes.values():
265
 
            res.append(str(node))
266
 
        return res
267
 
 
268
 
    def names(self): 
269
 
        return self.state['names'].keys()
270
 
 
271
 
    def nodesxml(self):
272
 
        """ return nodes in xml format """
273
 
        result = "<xml>\n"
274
 
        gotit = False
275
 
        for name, node in self.nodes.iteritems():
276
 
            gotit = True
277
 
            result += "    <coll>\n"
278
 
            result += "        <url>%s</url>\n" % node.url
279
 
            result += "        <id>%s</id>\n" % node.id
280
 
            result += "        <name>%s</name>\n" % node.name
281
 
            result += "    </coll>\n"
282
 
        if gotit:
283
 
            result += "</xml>"
284
 
            return result
285
 
        return ""
286
 
 
287
 
colls = Collective()
288
 
 
289
 
def init():
290
 
    """ init the collective plugin """
291
 
    if not coll['enable']:
292
 
        return 1
293
 
    time.sleep(5)
294
 
    try:
295
 
        colls.boot()
296
 
    except Exception, ex:
297
 
        rlog(10, 'collective', 'error booting: %s' % str(ex))
298
 
    colls.start()
299
 
    rlog(10, 'collective', 'total of %s nodes' % size())
300
 
    return 1
301
 
 
302
 
def shutdown():
303
 
    """ shutdown the collective plugin """
304
 
    return 1
305
 
 
306
 
def size():
307
 
    """ return number of active collective nodes """
308
 
    return colls.size()
309
 
 
310
 
def handle_collping(bot, ievent):
311
 
    """ do a ping on a collective node """
312
 
    if not coll['enable']:
313
 
        ievent.reply('collective is not enabled')
314
 
        return
315
 
    try:
316
 
        name = ievent.args[0]
317
 
    except IndexError:
318
 
        ievent.missing('<name>')
319
 
        return
320
 
    id = colls.state['names'][ievent.rest]
321
 
    node = colls.get(id)
322
 
    result = node.ping()
323
 
    if 'pong' in result:
324
 
        ievent.reply('%s is alive' % name)
325
 
    else:
326
 
        ievent.reply('%s is not alive' % name)
327
 
 
328
 
cmnds.add('coll-ping', handle_collping, 'OPER')
329
 
examples.add('coll-ping', 'ping a collective node', 'coll-ping gozerbot')
330
 
 
331
 
def handle_colllist(bot, ievent):
332
 
    """ coll-list .. list all nodes in cache """
333
 
    if not coll['enable']:
334
 
        ievent.reply('collective is not enabled')
335
 
        return
336
 
    ievent.reply("collective nodes: ", colls.list(), dot=' \002||\002 ')
337
 
 
338
 
cmnds.add('coll-list', handle_colllist, 'OPER')
339
 
examples.add('coll-list', 'list nodes cache', 'coll-list')
340
 
 
341
 
def handle_collenable(bot, ievent):
342
 
    """ coll-enable .. enable the collective """
343
 
    ievent.reply('enabling collective')
344
 
    coll['enable'] = 1
345
 
    coll.save()
346
 
    plugins.reload('gozerplugs.plugs', 'collective')
347
 
    time.sleep(4)
348
 
    ievent.reply('done')
349
 
 
350
 
cmnds.add('coll-enable', handle_collenable, 'OPER')
351
 
examples.add('coll-enable', 'enable the collective', 'coll-enable')
352
 
 
353
 
def handle_colldisable(bot, ievent):
354
 
    """ coll-disable .. disable the collective """
355
 
    coll['enable'] = 0
356
 
    coll.save()
357
 
    plugins.reload('gozerplugs.plugs', 'collective')
358
 
    ievent.reply('collective disabled')
359
 
 
360
 
cmnds.add('coll-disable', handle_colldisable, 'OPER')
361
 
examples.add('coll-disable', 'disable the collective', 'coll-disable')
362
 
 
363
 
def handle_collsync(bot, ievent):
364
 
    """ coll-sync <node> .. sync nodes cache with node """ 
365
 
    if not coll['enable']:
366
 
        ievent.reply('collective is not enabled')
367
 
        return
368
 
    try:
369
 
        url = ievent.args[0]
370
 
    except IndexError:
371
 
        ievent.missing('<url>')
372
 
        return
373
 
    try:
374
 
        result = colls.sync(url)
375
 
    except IOError, ex:
376
 
        ievent.reply('ERROR: %s' % str(ex))
377
 
        return
378
 
    ievent.reply('%s nodes added' % result)
379
 
 
380
 
cmnds.add('coll-sync', handle_collsync, 'OPER', allowqueue=False)
381
 
examples.add('coll-sync', 'coll-sync <url> .. sync with provided node', \
382
 
'coll-sync http://gozerbot.org:8088')
383
 
 
384
 
def handle_coll(bot, ievent):
385
 
    """ coll <cmnd> .. execute <cmnd> on nodes """
386
 
    if not coll['enable']:
387
 
        ievent.reply('collective is not enabled')
388
 
        return
389
 
    if not ievent.rest:
390
 
        ievent.missing('<command>')
391
 
        return
392
 
    starttime = time.time()
393
 
    command = ievent.rest
394
 
    waitres = re.search(waitre, command)
395
 
    if waitres:
396
 
        wait = waitres.group(1)
397
 
        try:
398
 
            wait = int(wait)
399
 
        except ValueError:
400
 
            ievent.reply('wait needs to be an integer')
401
 
            return
402
 
        command = re.sub(waitre, '', command)
403
 
    else:
404
 
        wait = 5
405
 
    result = colls.cmndall(command + ' chan %s' % ievent.channel, wait)
406
 
    if result:
407
 
        reply = '%s out of %s (%s) => ' % (len(result), size(), time.time() \
408
 
- starttime)
409
 
        ievent.reply(reply, result, dot='\002||\002')
410
 
    else:
411
 
        ievent.reply('no results found')
412
 
 
413
 
cmnds.add('coll', handle_coll, ['USER', 'WEB'], allowqueue=False)
414
 
examples.add('coll', 'coll <cmnd> .. execute command in the collective', \
415
 
'coll lq')
416
 
 
417
 
def handle_collexec(bot, ievent):
418
 
    """ coll <nodename> <cmnd> .. execute <cmnd> on node """
419
 
    if not coll['enable']:
420
 
        ievent.reply('collective is not enabled')
421
 
        return
422
 
    try:
423
 
        (name, command) = ievent.rest.split(' ', 1)
424
 
    except ValueError:
425
 
        ievent.missing('<nodename> <command>')
426
 
        return
427
 
    node = colls.byname(name)
428
 
    if not node:
429
 
        ievent.reply('no node %s found' % name)
430
 
        return
431
 
    waitres = re.search(waitre, command)
432
 
    if waitres:
433
 
        wait = waitres.group(1)
434
 
        try:
435
 
            wait = int(wait)
436
 
        except ValueError:
437
 
            ievent.reply('wait needs to be an integer')
438
 
            return
439
 
        command = re.sub(waitre, '', command)
440
 
    else:
441
 
        wait = 5
442
 
    starttime = time.time()
443
 
    queue = Queue.Queue()
444
 
    command = command + ' chan %s' % ievent.channel
445
 
    start_new_thread(node.cmnd, (command, queue))
446
 
    res = waitforqueue(queue, wait+2, maxitems=1) 
447
 
    result = []
448
 
    for i in res:
449
 
        result.append(i[1])
450
 
    if result:
451
 
        ievent.reply("(%s) => [%s] " % (time.time() - starttime, name), \
452
 
result, dot=True)
453
 
    else:
454
 
        ievent.reply('no results found')
455
 
 
456
 
cmnds.add('coll-exec', handle_collexec, ['USER', 'WEB'], allowqueue=False)
457
 
examples.add('coll-exec', 'coll <nodename> <cmnd> .. execute command in \
458
 
the collective', 'coll-exec gozerbot lq')
459
 
 
460
 
def handle_colladdnode(bot, ievent):
461
 
    """ coll-addnode <name> <host:port> .. add node to cache """
462
 
    if not coll['enable']:
463
 
        ievent.reply('collective is not enabled')
464
 
        return
465
 
    try:
466
 
        (name, url) = ievent.args
467
 
    except ValueError:
468
 
        ievent.missing('<name> <url>')
469
 
        return
470
 
    colls.add(name, url)
471
 
    colls.persist(name, url)
472
 
    ievent.reply('%s added' % name)
473
 
 
474
 
cmnds.add('coll-add', handle_colladdnode, 'OPER')
475
 
examples.add('coll-add', 'coll-add <name> <url> .. add a node to cache and \
476
 
persist it', 'coll-add gozerbot http://gozerbot.org:8088')
477
 
 
478
 
def handle_collgetnode(bot, ievent):
479
 
    """ coll-getnode .. show node of <name>  """
480
 
    if not coll['enable']:
481
 
        ievent.reply('collective is not enabled')
482
 
        return
483
 
    try:
484
 
        name = ievent.args[0]
485
 
    except IndexError:
486
 
        ievent.missing('<name>')
487
 
        return
488
 
    try:
489
 
        node = colls.byname(name)
490
 
    except KeyError:
491
 
        ievent.reply('no such node')
492
 
    ievent.reply(str(node))
493
 
 
494
 
cmnds.add('coll-getnode', handle_collgetnode, 'OPER')
495
 
examples.add('coll-getnode', 'coll-getnode <name> .. get node of <name>', \
496
 
'coll-getnode gozerbot')
497
 
 
498
 
def handle_collnames(bot, ievent):
499
 
    """ coll-names .. show names with nodes in cache """
500
 
    if not coll['enable']:
501
 
        ievent.reply('collective is not enabled')
502
 
        return
503
 
    ievent.reply("collective node names: ", colls.names(), dot=True)
504
 
 
505
 
cmnds.add('coll-names', handle_collnames, 'OPER')
506
 
examples.add('coll-names', 'show all node names', 'coll-names')
507
 
 
508
 
def handle_collboot(bot, ievent):
509
 
    """ boot the collective node cache """
510
 
    if not coll['enable']:
511
 
        ievent.reply('collective is not enabled')
512
 
        return
513
 
    try:
514
 
        url = ievent.args[0]
515
 
    except IndexError:
516
 
        url = 'http://gozerbot.org:8088'
517
 
    try:
518
 
        bootnr = colls.boot(url)
519
 
    except IOError, ex:
520
 
        ievent.reply('ERROR: %s' % str(ex))
521
 
        return
522
 
    if bootnr:
523
 
        ievent.reply('collective added %s nodes' % bootnr)
524
 
    else:
525
 
        ievent.reply("no new nodes added from %s" % url)
526
 
 
527
 
cmnds.add('coll-boot', handle_collboot, 'OPER')
528
 
examples.add('coll-boot', 'sync collective list with provided host', \
529
 
'1) coll-boot 2) coll-boot http://localhost:8888')
530
 
 
531
 
def handle_collfullboot(bot, ievent):
532
 
    """ coll-fullboot .. boot from all nodes in cache """
533
 
    if not coll['enable']:
534
 
        ievent.reply('collective is not enabled')
535
 
        return
536
 
    try:
537
 
        teller = colls.fullboot()
538
 
    except IOError, ex:
539
 
        ievent.reply('ERROR: %s' % str(ex))
540
 
        return
541
 
    ievent.reply('%s nodes checked .. current %s nodes in list' % (teller, \
542
 
size()))
543
 
 
544
 
cmnds.add('coll-fullboot', handle_collfullboot, 'OPER')
545
 
examples.add('coll-fullboot', 'do a boot on every node in the collective \
546
 
list', 'coll-fullboot')
547
 
 
548
 
def handle_collremove(bot, ievent):
549
 
    if not coll['enable']:
550
 
        ievent.reply('collective is not enabled')
551
 
        return
552
 
    if not ievent.rest:
553
 
        ievent.missing('<name>')
554
 
        return
555
 
    got = False
556
 
    try:
557
 
        id = colls.state['names'][ievent.rest]
558
 
        if id:
559
 
            colls.unpersist(id)
560
 
            colls.remove(id)
561
 
            got = True
562
 
    except Exception, ex:
563
 
        ievent.reply('error removing %s: %s' % (ievent.rest, str(ex)))
564
 
        return
565
 
    if got:
566
 
        ievent.reply('%s node removed' % ievent.rest)
567
 
    else:
568
 
        ievent.reply('error removing %s node' % ievent.rest)
569
 
 
570
 
cmnds.add('coll-remove', handle_collremove, 'OPER')
571
 
examples.add('coll-remove', 'remove node with <name> from collective' , \
572
 
'coll-remove gozerbot')
573
 
 
574
 
def handle_colljoin(bot, ievent):
575
 
    if not coll['enable']:
576
 
        ievent.reply('collective is not enabled')
577
 
        return
578
 
    if not ievent.rest:
579
 
        ievent.missing('<name>')
580
 
        return
581
 
    try:
582
 
        id = colls.state['names'][ievent.rest]
583
 
        node = colls.get(id)
584
 
        result = colls.join(node.url)
585
 
    except Exception, ex:
586
 
        ievent.reply('error joining %s: %s' % (ievent.rest, str(ex)))
587
 
        return
588
 
    ievent.reply(result)
589
 
 
590
 
cmnds.add('coll-join', handle_colljoin, 'OPER')
591
 
examples.add('coll-join', 'join node with <name>' , 'coll-join gozerbot')