5
""" the collective is a network of gozerbots connected by use of the bots
6
builtin webserver. nodes are urls pointing to these webservers .. this plugin
7
provides the client side of the collective, to join the collective the
8
webserver must be enabled and open for other connections (see WEBSERVER)
11
# module-level plugin metadata
# NOTE(review): __depend__ and __gendocfirst__ are presumably read by the
# plugin loader / doc generator .. confirm against the gozerbot core
__copyright__ = 'this file is in the public domain'
__depend__ = ['webserver']
__gendocfirst__ = ['coll-disable', 'coll-enable', 'coll-boot']
15
from gozerbot.generic import rlog, geturl, toenc, fromenc, waitforqueue, \
16
strippedtxt, handle_exception, now, geturl2
17
from gozerbot.rsslist import rsslist
18
from gozerbot.datadir import datadir
19
from gozerbot.pdod import Pdod
20
from gozerbot.persiststate import PersistState
21
from gozerbot.commands import cmnds
22
from gozerbot.plugins import plugins
23
from gozerbot.thr import start_new_thread
24
from gozerbot.plughelp import plughelp
25
from gozerbot.examples import examples
26
from gozerbot.dol import Dol
27
from gozerbot.config import config
28
from gozerbot.periodical import periodical
29
from gozerplugs.plugs.webserver import cfg as webcfg
30
from xml.sax.saxutils import unescape
31
from gozerbot.contrib.simplejson import loads
32
import Queue, time, socket, re, os
40
# one-line help text for this plugin
plughelp.add('collective', 'query other gozerbots')

# plugin settings live in a PersistState stored under the bot's data dir
coll = PersistState(datadir + os.sep + 'collective')

# declare every setting with its initial value, in the original order
# NOTE(review): define() presumably only supplies a value when the key has not
# been stored before .. confirm against gozerbot.persiststate
for _key, _initial in (
        ('enable', 0),
        ('nodes', []),
        ('names', {}),
        ('active', []),
        ('wait', 5),
):
    coll.define(_key, _initial)
# keep the loop helpers out of the module namespace
del _key, _initial
49
# matches an inline " wait <digits>" option, case-insensitively; group 1 holds
# the digit string.  written as a raw string so that \d reaches the regex
# engine as-is instead of being an invalid string-literal escape.
waitre = re.compile(r' wait (\d+)', re.I)
53
return str(uuid.uuid3(uuid.NAMESPACE_URL, toenc(url)))
57
class CollNode(object):
59
def __init__(self, name, url):
63
self.seen = time.time()
64
self.regtime = time.time()
69
return "name=%s url=<%s> seen=%s" % (self.name, self.url, now())
71
def cmnd(self, what, queue=None):
72
if what.startswith('coll'):
75
what = re.sub('\s+', '+', what)
76
data = geturl('%s/json?%s' % (self.url, what))
77
result = loads(fromenc(data))
79
# ValueError: No JSON object could be decoded
80
result = ['decode error', ]
85
rlog(10, 'collective', "can't fetch %s data: %s" % (self.url, \
87
return ['fetch error', ]
89
queue.put((self.name, result))
95
pongtxt = fromenc(geturl("%s/ping" % self.url))
102
class Collective(object):
106
self.state = Pdod(datadir + os.sep + 'coll.state')
107
self.startup = Pdod(datadir + os.sep + 'coll.startup')
108
if not self.state.has_key('ignore'):
109
self.state['ignore'] = []
110
if not self.state.has_key('urls'):
111
self.state['urls'] = {}
112
if not self.state.has_key('names'):
113
self.state['names'] = {}
114
if not self.startup.has_key('start'):
115
self.startup['start'] = {}
117
def add(self, name, url):
119
if self.nodes.has_key(id):
120
return self.nodes[id]
121
self.nodes[id] = CollNode(name, url)
122
self.state.set('urls', url, id)
123
self.state.set('names', name, id)
124
self.persist(name, url)
125
rlog(-10, 'collective', 'added %s node (%s) <%s>' % (name, id, url))
126
return self.nodes[id]
129
for name, url in self.startup['start'].iteritems():
131
start_new_thread(self.joinall, ())
133
def persist(self, name, url):
134
self.startup.set('start', name, url)
139
return self.nodes[id]
143
def byname(self, name):
144
id = self.state.get('names', name)
148
def remove(self, id):
149
target = self.get(id)
152
del self.state['urls'][target.url]
153
del self.state['names'][target.name]
156
def unpersist(self, id):
161
del self.startup['start'][node.name]
166
def ignore(self, id):
    """ put node <id> on the ignore list (at most once) """
    ignored = self.state['ignore']
    # unignore() uses list.remove(), which drops a single occurrence only ..
    # a duplicate entry would keep the node ignored after one unignore
    if id not in ignored:
        ignored.append(id)
169
def unignore(self, id):
    """ take node <id> off the ignore list

    raises ValueError (from list.remove) when <id> is not on the list.
    """
    # use self.state[...] like ignore() does instead of reaching into the
    # .data attribute of the state object
    self.state['ignore'].remove(id)
176
def cmndall(self, what, wait=5):
178
queue = Queue.Queue()
179
for id, node in self.nodes.iteritems():
180
start_new_thread(node.cmnd, (what, queue))
181
result = waitforqueue(queue, wait, maxitems=size())
183
total[res[0]] = res[1]
187
return self.nodes.values()
189
def getname(self, url):
191
return self.get(id).name
193
def join(self, url=None):
196
result = geturl("%s/join?%s" % (url, \
197
webcfg.get('webport')))
199
result = geturl('http://gozerbot.org:8088/join?%s' % \
200
webcfg.get('webport'))
201
except Exception, ex:
203
result = result.strip()
204
if 'added' in result:
205
rlog(1, 'collective', "%s joined => %s" % (url, result))
207
rlog(1, 'collective', "can't join %s => %s" % (url, str(result)))
211
for node in self.nodes.values():
212
start_new_thread(self.join, (node.url, ))
214
def boot(self, node=None):
218
self.sync('http://gozerbot.org:8088')
219
rlog(10, 'collective', 'booted %s nodes' % self.size())
224
for node in self.nodes.values():
225
t = start_new_thread(self.sync, (node.url, False))
232
def sync(self, url, throw=True):
233
""" sync cacne with node """
235
result = fromenc(geturl('%s/nodes' % url))
236
except Exception, ex:
243
rss = rsslist(result)
260
return len(self.nodes)
264
for node in self.nodes.values():
265
res.append(str(node))
269
return self.state['names'].keys()
272
""" return nodes in xml format """
275
for name, node in self.nodes.iteritems():
277
result += " <coll>\n"
278
result += " <url>%s</url>\n" % node.url
279
result += " <id>%s</id>\n" % node.id
280
result += " <name>%s</name>\n" % node.name
281
result += " </coll>\n"
290
""" init the collective plugin """
291
if not coll['enable']:
296
except Exception, ex:
297
rlog(10, 'collective', 'error booting: %s' % str(ex))
299
rlog(10, 'collective', 'total of %s nodes' % size())
303
""" shutdown the collective plugin """
307
""" return number of active collective nodes """
310
def handle_collping(bot, ievent):
311
""" do a ping on a collective node """
312
if not coll['enable']:
313
ievent.reply('collective is not enabled')
316
name = ievent.args[0]
318
ievent.missing('<name>')
320
id = colls.state['names'][ievent.rest]
324
ievent.reply('%s is alive' % name)
326
ievent.reply('%s is not alive' % name)
328
cmnds.add('coll-ping', handle_collping, 'OPER')
329
examples.add('coll-ping', 'ping a collective node', 'coll-ping gozerbot')
331
def handle_colllist(bot, ievent):
    """ coll-list .. list all nodes in cache

    replies with the result of colls.list(); when the collective is not
    enabled only the 'not enabled' notice is sent.
    """
    if not coll['enable']:
        ievent.reply('collective is not enabled')
        # stop here .. otherwise the node list would be sent anyway
        return
    ievent.reply("collective nodes: ", colls.list(), dot=' \002||\002 ')
338
cmnds.add('coll-list', handle_colllist, 'OPER')
339
examples.add('coll-list', 'list nodes cache', 'coll-list')
341
def handle_collenable(bot, ievent):
342
""" coll-enable .. enable the collective """
343
ievent.reply('enabling collective')
346
plugins.reload('gozerplugs.plugs', 'collective')
350
cmnds.add('coll-enable', handle_collenable, 'OPER')
351
examples.add('coll-enable', 'enable the collective', 'coll-enable')
353
def handle_colldisable(bot, ievent):
354
""" coll-disable .. disable the collective """
357
plugins.reload('gozerplugs.plugs', 'collective')
358
ievent.reply('collective disabled')
360
cmnds.add('coll-disable', handle_colldisable, 'OPER')
361
examples.add('coll-disable', 'disable the collective', 'coll-disable')
363
def handle_collsync(bot, ievent):
364
""" coll-sync <node> .. sync nodes cache with node """
365
if not coll['enable']:
366
ievent.reply('collective is not enabled')
371
ievent.missing('<url>')
374
result = colls.sync(url)
376
ievent.reply('ERROR: %s' % str(ex))
378
ievent.reply('%s nodes added' % result)
380
cmnds.add('coll-sync', handle_collsync, 'OPER', allowqueue=False)
381
examples.add('coll-sync', 'coll-sync <url> .. sync with provided node', \
382
'coll-sync http://gozerbot.org:8088')
384
def handle_coll(bot, ievent):
385
""" coll <cmnd> .. execute <cmnd> on nodes """
386
if not coll['enable']:
387
ievent.reply('collective is not enabled')
390
ievent.missing('<command>')
392
starttime = time.time()
393
command = ievent.rest
394
waitres = re.search(waitre, command)
396
wait = waitres.group(1)
400
ievent.reply('wait needs to be an integer')
402
command = re.sub(waitre, '', command)
405
result = colls.cmndall(command + ' chan %s' % ievent.channel, wait)
407
reply = '%s out of %s (%s) => ' % (len(result), size(), time.time() \
409
ievent.reply(reply, result, dot='\002||\002')
411
ievent.reply('no results found')
413
cmnds.add('coll', handle_coll, ['USER', 'WEB'], allowqueue=False)
414
examples.add('coll', 'coll <cmnd> .. execute command in the collective', \
417
def handle_collexec(bot, ievent):
418
""" coll <nodename> <cmnd> .. execute <cmnd> on node """
419
if not coll['enable']:
420
ievent.reply('collective is not enabled')
423
(name, command) = ievent.rest.split(' ', 1)
425
ievent.missing('<nodename> <command>')
427
node = colls.byname(name)
429
ievent.reply('no node %s found' % name)
431
waitres = re.search(waitre, command)
433
wait = waitres.group(1)
437
ievent.reply('wait needs to be an integer')
439
command = re.sub(waitre, '', command)
442
starttime = time.time()
443
queue = Queue.Queue()
444
command = command + ' chan %s' % ievent.channel
445
start_new_thread(node.cmnd, (command, queue))
446
res = waitforqueue(queue, wait+2, maxitems=1)
451
ievent.reply("(%s) => [%s] " % (time.time() - starttime, name), \
454
ievent.reply('no results found')
456
# register handle_collexec under the name 'coll-exec'
cmnds.add('coll-exec', handle_collexec, ['USER', 'WEB'], allowqueue=False)
# the help text leads with the usage line, so it has to name coll-exec (the
# registered command and the one used in the example), not coll
examples.add('coll-exec', 'coll-exec <nodename> <cmnd> .. execute command '
             'in the collective', 'coll-exec gozerbot lq')
460
def handle_colladdnode(bot, ievent):
461
""" coll-addnode <name> <host:port> .. add node to cache """
462
if not coll['enable']:
463
ievent.reply('collective is not enabled')
466
(name, url) = ievent.args
468
ievent.missing('<name> <url>')
471
colls.persist(name, url)
472
ievent.reply('%s added' % name)
474
cmnds.add('coll-add', handle_colladdnode, 'OPER')
475
examples.add('coll-add', 'coll-add <name> <url> .. add a node to cache and \
476
persist it', 'coll-add gozerbot http://gozerbot.org:8088')
478
def handle_collgetnode(bot, ievent):
    """ coll-getnode <name> .. show node of <name>

    replies with str(node) for the node colls.byname() returns, or with
    'no such node' when there is none.
    """
    if not coll['enable']:
        ievent.reply('collective is not enabled')
        return
    try:
        name = ievent.args[0]
    except IndexError:
        # no argument given
        ievent.missing('<name>')
        return
    node = colls.byname(name)
    if not node:
        ievent.reply('no such node')
        # without this return the reply below would also be sent for a
        # node that does not exist
        return
    ievent.reply(str(node))
494
cmnds.add('coll-getnode', handle_collgetnode, 'OPER')
495
examples.add('coll-getnode', 'coll-getnode <name> .. get node of <name>', \
496
'coll-getnode gozerbot')
498
def handle_collnames(bot, ievent):
    """ coll-names .. show names with nodes in cache

    replies with the result of colls.names(); when the collective is not
    enabled only the 'not enabled' notice is sent.
    """
    if not coll['enable']:
        ievent.reply('collective is not enabled')
        # stop here .. otherwise the name list would be sent anyway
        return
    ievent.reply("collective node names: ", colls.names(), dot=True)
505
cmnds.add('coll-names', handle_collnames, 'OPER')
506
examples.add('coll-names', 'show all node names', 'coll-names')
508
def handle_collboot(bot, ievent):
509
""" boot the collective node cache """
510
if not coll['enable']:
511
ievent.reply('collective is not enabled')
516
url = 'http://gozerbot.org:8088'
518
bootnr = colls.boot(url)
520
ievent.reply('ERROR: %s' % str(ex))
523
ievent.reply('collective added %s nodes' % bootnr)
525
ievent.reply("no new nodes added from %s" % url)
527
cmnds.add('coll-boot', handle_collboot, 'OPER')
528
examples.add('coll-boot', 'sync collective list with provided host', \
529
'1) coll-boot 2) coll-boot http://localhost:8888')
531
def handle_collfullboot(bot, ievent):
532
""" coll-fullboot .. boot from all nodes in cache """
533
if not coll['enable']:
534
ievent.reply('collective is not enabled')
537
teller = colls.fullboot()
539
ievent.reply('ERROR: %s' % str(ex))
541
ievent.reply('%s nodes checked .. current %s nodes in list' % (teller, \
544
cmnds.add('coll-fullboot', handle_collfullboot, 'OPER')
545
examples.add('coll-fullboot', 'do a boot on every node in the collective \
546
list', 'coll-fullboot')
548
def handle_collremove(bot, ievent):
549
if not coll['enable']:
550
ievent.reply('collective is not enabled')
553
ievent.missing('<name>')
557
id = colls.state['names'][ievent.rest]
562
except Exception, ex:
563
ievent.reply('error removing %s: %s' % (ievent.rest, str(ex)))
566
ievent.reply('%s node removed' % ievent.rest)
568
ievent.reply('error removing %s node' % ievent.rest)
570
cmnds.add('coll-remove', handle_collremove, 'OPER')
571
examples.add('coll-remove', 'remove node with <name> from collective' , \
572
'coll-remove gozerbot')
574
def handle_colljoin(bot, ievent):
575
if not coll['enable']:
576
ievent.reply('collective is not enabled')
579
ievent.missing('<name>')
582
id = colls.state['names'][ievent.rest]
584
result = colls.join(node.url)
585
except Exception, ex:
586
ievent.reply('error joining %s: %s' % (ievent.rest, str(ex)))
590
cmnds.add('coll-join', handle_colljoin, 'OPER')
591
examples.add('coll-join', 'join node with <name>' , 'coll-join gozerbot')