35
49
waitre = re.compile(' wait (\d+)', re.I)
53
return str(uuid.uuid3(uuid.NAMESPACE_URL, toenc(url)))
57
class CollNode(object):
59
def __init__(self, name, url):
63
self.seen = time.time()
64
self.regtime = time.time()
69
return "name=%s url=<%s> seen=%s" % (self.name, self.url, now())
71
def cmnd(self, what, queue=None):
72
if what.startswith('coll'):
75
what = re.sub('\s+', '+', what)
76
data = geturl('%s/json?%s' % (self.url, what))
77
result = loads(fromenc(data))
79
# ValueError: No JSON object could be decoded
80
result = ['decode error', ]
85
rlog(10, 'collective', "can't fetch %s data: %s" % (self.url, \
87
return ['fetch error', ]
89
queue.put((self.name, result))
95
pongtxt = fromenc(geturl("%s/ping" % self.url))
102
class Collective(object):
106
self.state = Pdod(datadir + os.sep + 'coll.state')
107
self.startup = Pdod(datadir + os.sep + 'coll.startup')
108
if not self.state.has_key('ignore'):
109
self.state['ignore'] = []
110
if not self.state.has_key('urls'):
111
self.state['urls'] = {}
112
if not self.state.has_key('names'):
113
self.state['names'] = {}
114
if not self.startup.has_key('start'):
115
self.startup['start'] = {}
117
def add(self, name, url):
119
if self.nodes.has_key(id):
120
return self.nodes[id]
121
self.nodes[id] = CollNode(name, url)
122
self.state.set('urls', url, id)
123
self.state.set('names', name, id)
124
self.persist(name, url)
125
rlog(-10, 'collective', 'added %s node (%s) <%s>' % (name, id, url))
126
return self.nodes[id]
129
for name, url in self.startup['start'].iteritems():
131
start_new_thread(self.joinall, ())
133
def persist(self, name, url):
134
self.startup.set('start', name, url)
139
return self.nodes[id]
143
def byname(self, name):
144
id = self.state.get('names', name)
148
def remove(self, id):
149
target = self.get(id)
152
del self.state['urls'][target.url]
153
del self.state['names'][target.name]
156
def unpersist(self, id):
161
del self.startup['start'][node.name]
166
def ignore(self, id):
167
self.state['ignore'].append(id)
169
def unignore(self, id):
170
self.state.data['ignore'].remove(id)
176
def cmndall(self, what, wait=5):
178
queue = Queue.Queue()
179
for id, node in self.nodes.iteritems():
180
start_new_thread(node.cmnd, (what, queue))
181
result = waitforqueue(queue, wait, maxitems=size())
183
total[res[0]] = res[1]
187
return self.nodes.values()
189
def getname(self, url):
191
return self.get(id).name
193
def join(self, url=None):
196
result = geturl("%s/join?%s" % (url, \
197
webcfg.get('webport')))
199
result = geturl('http://gozerbot.org:8088/join?%s' % \
200
webcfg.get('webport'))
201
except Exception, ex:
203
result = result.strip()
204
if 'added' in result:
205
rlog(1, 'collective', "%s joined => %s" % (url, result))
207
rlog(1, 'collective', "can't join %s => %s" % (url, str(result)))
211
for node in self.nodes.values():
212
start_new_thread(self.join, (node.url, ))
214
def boot(self, node=None):
218
self.sync('http://gozerbot.org:8088')
219
rlog(10, 'collective', 'booted %s nodes' % self.size())
224
for node in self.nodes.values():
225
t = start_new_thread(self.sync, (node.url, False))
232
def sync(self, url, throw=True):
233
""" sync cacne with node """
235
result = fromenc(geturl('%s/nodes' % url))
236
except Exception, ex:
243
rss = rsslist(result)
260
return len(self.nodes)
264
for node in self.nodes.values():
265
res.append(str(node))
269
return self.state['names'].keys()
272
""" return nodes in xml format """
275
for name, node in self.nodes.iteritems():
277
result += " <coll>\n"
278
result += " <url>%s</url>\n" % node.url
279
result += " <id>%s</id>\n" % node.id
280
result += " <name>%s</name>\n" % node.name
281
result += " </coll>\n"
38
290
""" init the collective plugin """
39
291
if not coll['enable']:
41
periodical.addjob(900, 0, checkone)
42
start_new_thread(checkone, ())
296
except Exception, ex:
297
rlog(10, 'collective', 'error booting: %s' % str(ex))
299
rlog(10, 'collective', 'total of %s nodes' % size())
46
303
""" shutdown the collective plugin """
50
307
""" return number of active collective nodes """
51
return len(coll['active'])
54
""" do one active check """
58
for i in coll['nodes']:
59
start_new_thread(gotpong, (i, q))
60
result = waitforqueue(q, 4)
61
for i in coll['nodes']:
64
if not i in coll['active']:
66
coll['active'].remove(i)
68
rlog(0, 'collective', 'removed %s from active list' % i)
72
if not i in coll['active']:
73
coll['active'].append(i)
75
rlog(0, 'collective', '%s added to active list' % i)
76
return (removed, added)
79
""" do ping of a collective node """
81
start_new_thread(gotpong, (node, q))
82
return waitforqueue(q, 4)
84
def gotpong(node, queue):
85
""" check if node returns pong """
87
pongtxt = fromenc(geturl("http://%s/ping&how=direct" % node))
92
queue.put_nowait(node)
95
""" return cached nodes cache """
99
""" return active nodes """
100
return coll['active']
103
""" return name of node or None """
104
if coll['names'].has_key(node):
105
return coll['names'][node]
107
def getnodefromname(name):
108
for i, j in coll['names'].iteritems():
112
def checkactive(node):
114
coll['active'].append(node)
117
""" add a node to cache """
118
(host, port) = node.split(':')
120
ipnr = socket.gethostbyname(host)
122
node = "%s:%s" % (ipnr, port)
125
if node not in coll['nodes']:
126
rlog(0, 'collective', 'adding node %s (%s)' % (str(node), \
128
coll['nodes'].append(node)
134
""" sync cacne with node """
136
result = fromenc(geturl('http://%s/nodes&how=noescape' % node))
138
rlog(10, 'collective', "can't fetch %s data" % node)
141
rss = rsslist(result)
151
if not coll['names'].has_key(node):
152
coll['names'][node] = i['name']
155
start_new_thread(checkactive, (node, ))
162
def colldispatchone(node, what, queue):
163
""" dispatch command on remote node .. place result in queue """
164
colldispatch(node, what, queue)
167
def colldispatch(node, what, queue):
168
""" dispatch command on remote node .. place result in queue """
169
if what.startswith('coll'):
172
what = re.sub('\s+', '+', what)
173
result = fromenc(geturl('http://%s/dispatch?%s&how=direct' % \
176
rlog(10, 'collective', "can't fetch %s data" % node)
181
res = result.split('\n\n')
182
queue.put((name, res))
184
def colldispatchall(what, wait=5):
185
""" coll dispatch on all nodes """
186
queue = Queue.Queue()
187
for i in coll['active']:
188
start_new_thread(colldispatch, (i, what, queue))
189
for i in range(wait*10):
190
if queue.qsize() == len(coll['active']):
197
""" sync cache with main server or server provided """
200
if config['collboot']:
201
got = syncnode(config['collboot'])
203
got = syncnode('irc.gozerbot.org:8088')
207
nrnodes = len(coll['nodes'])
208
rlog(10, 'collective', 'booted %s nodes' % nrnodes)
214
""" return nodes in xml format """
217
for i in coll['nodes']:
219
result += " <coll>\n"
220
result += " <node>%s</node>\n" % i
223
result += " <name>%s</name>\n" % name
224
result += " </coll>\n"
230
310
def handle_collping(bot, ievent):
231
311
""" do a ping on a collective node """
403
463
ievent.reply('collective is not enabled')
406
(name, node) = ievent.args
466
(name, url) = ievent.args
407
467
except ValueError:
408
ievent.missing('<name> <host:port>')
411
ievent.missing('<name> <host:port>')
413
(host, port) = node.split(':')
415
ipnr = socket.gethostbyname(host)
417
node = "%s:%s" % (ipnr, port)
419
ievent.reply("can't find ipnr for %s" % host)
421
coll['names'][node] = name
468
ievent.missing('<name> <url>')
471
colls.persist(name, url)
424
472
ievent.reply('%s added' % name)
426
cmnds.add('coll-addnode', handle_colladdnode, 'OPER')
427
examples.add('coll-addnode', 'coll-addnode <name> <node> .. add a node to \
428
cache .. node is in host:port format', 'coll-addnode gozerbot \
431
def handle_colldelnode(bot, ievent):
432
""" coll-delnode <name> .. delete node from cache """
433
if not coll['enable']:
434
ievent.reply('collective is not enabled')
437
what = ievent.args[0]
439
ievent.missing('<name>')
442
for i, j in coll['names'].iteritems():
446
ievent.reply("can't find node %s" % what)
450
coll['nodes'].remove(i)
458
coll['active'].remove(i)
462
ievent.reply('%s deleted' % what)
464
cmnds.add('coll-delnode', handle_colldelnode, 'OPER')
465
examples.add('coll-delnode', 'coll-delnode <name> .. remove node from \
466
collective list', 'coll-delnode gozerbot')
474
cmnds.add('coll-add', handle_colladdnode, 'OPER')
475
examples.add('coll-add', 'coll-add <name> <url> .. add a node to cache and \
476
persist it', 'coll-add gozerbot http://gozerbot.org:8088')
468
478
def handle_collgetnode(bot, ievent):
469
479
""" coll-getnode .. show node of <name> """
475
485
except IndexError:
476
486
ievent.missing('<name>')
478
for i, j in coll['names'].iteritems():
482
ievent.reply('no node named %s found' % name)
489
node = colls.byname(name)
491
ievent.reply('no such node')
492
ievent.reply(str(node))
484
494
cmnds.add('coll-getnode', handle_collgetnode, 'OPER')
485
495
examples.add('coll-getnode', 'coll-getnode <name> .. get node of <name>', \
486
496
'coll-getnode gozerbot')
488
def handle_collsetname(bot, ievent):
489
""" coll-setname <node> <name> .. set name of node """
490
if not coll['enable']:
491
ievent.reply('collective is not enabled')
494
(what, name) = ievent.args
496
ievent.missing('<node> <name>')
498
coll['names'][what] = name
500
ievent.reply('%s added' % name)
502
cmnds.add('coll-setname', handle_collsetname, 'OPER')
503
examples.add('coll-setname', 'set name of collective node', 'coll-setname \
504
gozerbot gozerbot.org:8088')
506
498
def handle_collnames(bot, ievent):
507
499
""" coll-names .. show names with nodes in cache """
508
500
if not coll['enable']:
509
501
ievent.reply('collective is not enabled')
511
ievent.reply("collective node names: ", coll['names'].values(), \
503
ievent.reply("collective node names: ", colls.names(), dot=True)
514
505
cmnds.add('coll-names', handle_collnames, 'OPER')
515
506
examples.add('coll-names', 'show all node names', 'coll-names')
520
511
ievent.reply('collective is not enabled')
523
server = ievent.args[0]
524
515
except IndexError:
525
server = 'gozerbot.org:8088'
526
bootnr = boot(server)
516
url = 'http://gozerbot.org:8088'
518
bootnr = colls.boot(url)
520
ievent.reply('ERROR: %s' % str(ex))
528
523
ievent.reply('collective added %s nodes' % bootnr)
530
ievent.reply("no new nodes added from %s" % server)
525
ievent.reply("no new nodes added from %s" % url)
532
527
cmnds.add('coll-boot', handle_collboot, 'OPER')
533
528
examples.add('coll-boot', 'sync collective list with provided host', \
534
'1) coll-boot 2) coll-boot localhost:8888')
529
'1) coll-boot 2) coll-boot http://localhost:8888')
536
531
def handle_collfullboot(bot, ievent):
537
532
""" coll-fullboot .. boot from all nodes in cache """
538
533
if not coll['enable']:
539
534
ievent.reply('collective is not enabled')
542
snap = list(coll['nodes'])
545
t = start_new_thread(boot, (i, ))
550
ievent.reply('%s nodes checked .. current %s nodes in list' % (teller,
537
teller = colls.fullboot()
539
ievent.reply('ERROR: %s' % str(ex))
541
ievent.reply('%s nodes checked .. current %s nodes in list' % (teller, \
553
544
cmnds.add('coll-fullboot', handle_collfullboot, 'OPER')
554
545
examples.add('coll-fullboot', 'do a boot on every node in the collective \
555
546
list', 'coll-fullboot')
557
def handle_collsave(bot, ievent):
558
""" coll-save .. save collective data to disk """
559
if not coll['enable']:
560
ievent.reply('collective is not enabled')
563
ievent.reply('collective saved')
565
cmnds.add('coll-save', handle_collsave, 'OPER')
566
examples.add('coll-save', 'save collective data' , 'coll-save')
568
def handle_clean(bot , ievent):
569
""" coll-clean .. clear nodes cache """
570
if not coll['enable']:
571
ievent.reply('collective is not enabled')
578
cmnds.add('coll-clean', handle_clean, 'OPER')
579
examples.add('coll-clean', 'clean the collective list', 'coll-clean')
581
def handle_collactive(bot, ievent):
582
""" show active nodes """
583
if not coll['enable']:
584
ievent.reply('collective is not enabled')
587
for i in coll['active']:
589
res.append("%s (%s)" % (i, name))
591
ievent.reply("active nodes: ", res, dot=True)
593
ievent.reply("no nodes active")
595
cmnds.add('coll-active', handle_collactive, 'OPER')
596
examples.add('coll-active', 'show active nodes', 'coll-active')
598
def handle_collinactive(bot, ievent):
599
""" show inactive nodes """
600
if not coll['enable']:
601
ievent.reply('collective is not enabled')
604
for i in coll['nodes']:
605
if i not in coll['active']:
607
res.append("%s (%s)" % (i, name))
609
ievent.reply("inactive nodes: ", res, dot=True)
611
ievent.reply("no nodes inactive")
613
cmnds.add('coll-inactive', handle_collinactive, 'OPER')
614
examples.add('coll-inactive', 'show inactive nodes', 'coll-inactive')
616
def handle_collcheckactive(bot, ievent):
617
""" run active check """
618
if not coll['enable']:
619
ievent.reply('collective is not enabled')
621
ievent.reply('calling active checker')
623
ievent.reply('removed: %s .. added: %s' % result)
625
cmnds.add('coll-checkactive', handle_collcheckactive, 'OPER')
626
examples.add('coll-checkactive', 'run active nodes check', 'coll-checkactive')
628
def handle_collstatus(bot, ievent):
629
""" show active collective nodes """
631
if not coll['enable']:
632
ievent.reply('collective is not enabled')
635
for i in coll['active']:
637
name = coll['names'][i]
642
ievent.reply('%s active nodes: ' % len(coll['active']), there, \
645
ievent.reply('collective void')
647
cmnds.add('coll-status', handle_collstatus, 'OPER')
648
examples.add('coll-status', 'show active nodes', 'coll-status')
650
def handle_collrename(bot, ievent):
651
""" rename a collective node """
652
if not coll['enable']:
653
ievent.reply('collective is not enabled')
656
name, toname = ievent.args
658
ievent.missing('<from> <to>')
660
node = getnodefromname(name)
662
ievent.reply("can't get node for %s" % name)
665
coll['names'][node] = toname
667
ievent.reply('setting name failed')
669
ievent.reply('name %s changed to %s' % (name, toname))
671
cmnds.add('coll-rename', handle_collrename, 'OPER')
672
examples.add('coll-rename', 'rename a collective node name', \
673
'coll-rename dunk dunker')
675
548
def handle_collremove(bot, ievent):
676
""" remove a collective node """
678
name = ievent.args[0]
549
if not coll['enable']:
550
ievent.reply('collective is not enabled')
680
553
ievent.missing('<name>')
682
node = getnodefromname(name)
684
ievent.reply("can't get node for %s" % name)
687
coll['nodes'].remove(node)
688
coll['active'].remove(node)
689
del coll['names'][node]
557
id = colls.state['names'][ievent.rest]
690
562
except Exception, ex:
691
ievent.reply("failed to remove %s: %s" % (name, str(ex)))
694
ievent.reply('%s removed' % name)
563
ievent.reply('error removing %s: %s' % (ievent.rest, str(ex)))
566
ievent.reply('%s node removed' % ievent.rest)
568
ievent.reply('error removing %s node' % ievent.rest)
696
570
cmnds.add('coll-remove', handle_collremove, 'OPER')
697
examples.add('coll-remove', 'coll-remove <name> .. rmeove node <name> from \
698
the colletive lists', 'coll-remove gozerbot')
571
examples.add('coll-remove', 'remove node with <name> from collective' , \
572
'coll-remove gozerbot')
574
def handle_colljoin(bot, ievent):
575
if not coll['enable']:
576
ievent.reply('collective is not enabled')
579
ievent.missing('<name>')
582
id = colls.state['names'][ievent.rest]
584
result = colls.join(node.url)
585
except Exception, ex:
586
ievent.reply('error joining %s: %s' % (ievent.rest, str(ex)))
590
cmnds.add('coll-join', handle_colljoin, 'OPER')
591
examples.add('coll-join', 'join node with <name>' , 'coll-join gozerbot')