~dongpo-deng/sahana-eden/test

« back to all changes in this revision

Viewing changes to cron/daemonX.py

  • Committer: Deng Dongpo
  • Date: 2010-08-01 09:29:44 UTC
  • Revision ID: dongpo@dhcp-21193.iis.sinica.edu.tw-20100801092944-8t9obt4xtl7otesb
initial

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- coding: utf-8 -*-
 
2
 
 
3
# DaemonX
 
4
# author: Hasanat Kazmi <hasanatkazmi@gmail.com>
 
5
# This script requires jsonrpc, install it from json-rpc.org/wiki/python-json-rpc
 
6
# written using python 2.5, should work on 2.6
 
7
 
 
8
import os, os.path
 
9
import sys
 
10
import logging, logging.handlers
 
11
from ConfigParser import RawConfigParser
 
12
import pickle
 
13
import shutil
 
14
# Change this if application name changes!
 
15
#import applications.sahana.modules.Zeroconf
 
16
import Zeroconf
 
17
from struct import *
 
18
import socket
 
19
import time
 
20
try:
 
21
    from jsonrpc import ServiceProxy, JSONRPCException
 
22
except ImportError:
 
23
    print 'JSONRPC module not available within the running Python - this needs installing for AutoSync!'
 
24
 
 
25
class LogManager(object):
 
26
    """ Manages error logs """
 
27
    def __init__(self, logLevel = logging.DEBUG):
 
28
        self.logfolder = "synclogs"
 
29
        self.logfilename = "log.out"
 
30
        self.level = logLevel
 
31
        
 
32
        scriptfolder = sys.path[0]
 
33
        if not os.path.isdir(os.path.join(scriptfolder, self.logfolder)):
 
34
            os.mkdir(os.path.join(scriptfolder, self.logfolder)) # mkdir is only available for Windows & Unix
 
35
 
 
36
        handler = logging.handlers.RotatingFileHandler(os.path.join(scriptfolder, self.logfolder, self.logfilename), maxBytes=128*1024*8)#128kb
 
37
 
 
38
        formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
 
39
        handler.setFormatter(formatter)
 
40
 
 
41
        self.handler = handler
 
42
 
 
43
    def log(self, caller, severity, message):
 
44
        """
 
45
        caller is name of the class which is logging
 
46
        severity is type of log entry like warning or error
 
47
        message is log message
 
48
        """
 
49
        if severity not in [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL]:
 
50
            raise Exception("severity must be one of these: logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL")
 
51
        my_logger = logging.getLogger(str(caller))
 
52
        my_logger.setLevel(self.level)
 
53
        my_logger.addHandler(self.handler)
 
54
        my_logger.log(severity, message)
 
55
        
 
56
class Config(object):
 
57
    """ (not to confuse with zeroConf) It saves configurations for the system  """
 
58
    def __init__(self):
 
59
        items = {}
 
60
        items['localhost'] = "127.0.0.1" # this should be dealt with different approach
 
61
        items['localhost-port'] = 8000 # this too
 
62
        items['servername'] = 'sahana'
 
63
        self.items = items
 
64
 
 
65
    def getConfFromServer(self, server):
 
66
        "gets configurations by calling server (localhost)"
 
67
        if not isinstance(server, RpcManager):
 
68
            raise
 
69
        # get config from server
 
70
        try:
 
71
            settings = server.execute('getconf')
 
72
            serveritems = {}
 
73
            serveritems['uuid'] = settings['uuid']
 
74
            serveritems['webservice_port'] = settings['webservice_port']
 
75
            
 
76
            zeroconfproperties = {}
 
77
            zeroconfproperties['description'] = settings['zeroconf_description']
 
78
            zeroconfproperties['uuid'] = settings['uuid']
 
79
            zeroconfproperties['serverport'] = self.items['localhost-port']
 
80
            zeroconfproperties['zeroconfig_port'] = settings['webservice_port']
 
81
            serveritems['zeroconfproperties'] = zeroconfproperties
 
82
            
 
83
            self.items.update(serveritems)
 
84
        except:
 
85
            print "DEBUG: seems, server is down or you are not authenticated? are you being recognized as 127.0.0.1"
 
86
            raise Exception("invalid settings called from server, or server may be down")
 
87
 
 
88
class DumpManager(object):
 
89
    """ All data dump related operations are performed by this """
 
90
    """
 
91
        note that date can not be processed by pickle, so web services should not export date as date object (which it isnt right at the time to writting this code)
 
92
    """
 
93
 
 
94
    def __init__(self, logger=None):
 
95
        self.logger = logger
 
96
        default_dump_folder = "dumps"
 
97
        self.default_dump_folder_path = os.path.join(sys.path[0], default_dump_folder)
 
98
 
 
99
        if not os.path.isdir(self.default_dump_folder_path):
 
100
            os.mkdir(self.default_dump_folder_path)
 
101
       
 
102
    def __next(self): # this is enumerate function, but meeting our unique requirement
 
103
        def isintfolder(i):
 
104
            try:
 
105
                int(i)
 
106
                return True
 
107
            except:
 
108
                return False
 
109
 
 
110
        dirs = os.listdir(self.default_dump_folder_path)
 
111
        dirs = [int(i) for i in dirs if os.path.isdir(os.path.join(self.default_dump_folder_path, i)) if isintfolder(i) ]
 
112
        dirs.sort()
 
113
        for i in range(len(dirs)+1):
 
114
            if not i in dirs:
 
115
                #self.lastdir = i
 
116
                #return self.lastdir
 
117
                return i
 
118
    
 
119
    def createdump(self, *data):
 
120
        """
 
121
        This function creates dumps and returns reference id
 
122
        data are any python objects
 
123
        """
 
124
        folderref = self.__next()
 
125
        folder = os.path.join(self.default_dump_folder_path, str(folderref))
 
126
        os.mkdir(folder)
 
127
        for i in range(len(data)):
 
128
            fileobj = file(os.path.join(folder, str(i)), "w")
 
129
            pickle.dump(data[i], fileobj)
 
130
            fileobj.close()
 
131
            if not self.logger == None:
 
132
                self.logger.log('DumpManager', logging.INFO, "created Dump in " + os.path.join(folder, str(i)))
 
133
 
 
134
        return folderref
 
135
 
 
136
    def createobj(self, referenceid, deletedump = True):
 
137
        "when referenceid is passed, all object related to that reference id are returned as list"
 
138
        folder = os.path.join(self.default_dump_folder_path, str(referenceid))
 
139
        def isnumfile(i):
 
140
            try:
 
141
                int(i)
 
142
                return True
 
143
            except:
 
144
                return False
 
145
        
 
146
        try:
 
147
            files = [i for i in os.listdir(folder) if os.path.isfile(os.path.join(folder, i))]
 
148
        except:
 
149
            try:
 
150
                int(referenceid)
 
151
            except:
 
152
                raise Exception("referenceid must be an integer")
 
153
            raise Exception("Invalid reference id, no such file")
 
154
        files.sort()
 
155
        toreturn = [pickle.load(file(os.path.join(folder, i), "r")) for i in files]
 
156
        if not self.logger == None:
 
157
            self.logger.log('DumpManager', logging.INFO, "read Dumps from " + folder)
 
158
 
 
159
        if deletedump:
 
160
            shutil.rmtree(folder)
 
161
            if not self.logger == None:
 
162
                self.logger.log('DumpManager', logging.INFO, "deleted Dump folder: " + folder)
 
163
 
 
164
        print toreturn
 
165
 
 
166
    def cleardump():
 
167
        shutil.rmtree(self.default_dump_folder_path)
 
168
        os.mkdir(self.default_dump_folder_path)
 
169
        if not self.logger == None:
 
170
            self.logger.log('DumpManager', logging.INFO, "cleared all dump: " + self.default_dump_folder_path)
 
171
 
 
172
class ZeroConfExpose(object):
 
173
    """ exposes the service for zeroConf """
 
174
    def __init__(self, conf, logger = None):
 
175
        self.conf = conf
 
176
        self.logger = logger
 
177
 
 
178
        server = Zeroconf.Zeroconf()
 
179
        # Get local IP address
 
180
        local_ip = socket.gethostbyname(socket.gethostname())
 
181
        local_ip = socket.inet_aton(local_ip)
 
182
 
 
183
        svc1 = Zeroconf.ServiceInfo('_sahana._tcp.local.',
 
184
                                      self.conf.items['uuid'] + '._sahana._tcp.local.',
 
185
                                      address = local_ip,
 
186
                                      port = self.conf.items['webservice_port'],
 
187
                                      weight = 0,
 
188
                                      priority=0,
 
189
                                      properties = self.conf.items['zeroconfproperties']
 
190
                                     )
 
191
        if not self.logger == None:
 
192
            self.logger.log('ZeroConfExpose', logging.INFO, "exposed ZeroConf service")
 
193
        server.registerService(svc1)
 
194
 
 
195
 
 
196
class ZeroConfSearch(object):
 
197
    """ searches for other servers, if a server is found, gets its information, and puts in Server object. It passes server object to SyncManager """
 
198
 
 
199
    class MyListener(object):
 
200
        def __init__(self, conf, servermanager, localrpc, logger=None):
 
201
            self.conf = conf
 
202
            self.servermanager = servermanager
 
203
            self.localrpc = localrpc
 
204
            self.logger = logger
 
205
 
 
206
        def removeService(self, server, type, name):
 
207
            if not self.logger == None:
 
208
                self.logger.log('ZeroConfSearch', logging.INFO, "Service "+ repr(name)+ " removed")
 
209
        
 
210
        def addService(self, server, type, name):
 
211
            if not self.logger == None:
 
212
                self.logger.log('ZeroConfSearch', logging.INFO, "Service "+ repr(name)+ " added")
 
213
            # Request more information about the service
 
214
            info = server.getServiceInfo(type, name)
 
215
            print "addService called"
 
216
            if not info is None:
 
217
                "printing for debugging purposes"
 
218
                # IP address is as unsigned short, network byte order
 
219
                r = unpack('4c', info.getAddress())
 
220
                r = [ord(i) for i in r]
 
221
                r = reduce(lambda x, y: str(x)+"."+str(y) , r)
 
222
                print "Found a server:"
 
223
                print "address: ", r
 
224
                print "port:", info.getPort()
 
225
                print "properties: ", info.getProperties()
 
226
                self.servermanager.addServer(r, info.getPort(), info.getProperties())
 
227
                
 
228
    def __init__(self, conf, servermanager, localrpc, logger = None):
 
229
        self.conf = conf
 
230
        self.servermanager = servermanager
 
231
        self.localrpc = localrpc
 
232
        self.logger = logger
 
233
        
 
234
        server = Zeroconf.Zeroconf()
 
235
        listener = self.MyListener(self.conf, self.servermanager, self.localrpc, logger = self.logger)
 
236
        browser = Zeroconf.ServiceBrowser(server, "_sahana._tcp.local.", listener)
 
237
 
 
238
class ServerManager(object):
 
239
    """ gets server object as parameter, it calls local server json rpc service through RpcManager. When it gets data from rpcManager, it calls local server for data, if it gets error, it logs that in error log using errorLogManager or if it gets successful, it calls another dumpManager's instance and stores data and gets reference id, then foreign server's postdata through rpcManager  """
 
240
    def __init__(self, conf, dumpmanager, localrpc, logger=None):
 
241
        self.conf = conf
 
242
        self.localrpc = localrpc
 
243
        self.logger = logger
 
244
        self.dumpmanager = dumpmanager
 
245
        self.starttime = None
 
246
        self.cue = []
 
247
        
 
248
    def addServer(self, adress, port, properties):
 
249
        """server is URI of the server to be called"""
 
250
        if adress is "127.0.0.1": #local server shouldn't be added
 
251
            returnnode = []
 
252
        node.append( adress )
 
253
        node.append( port )
 
254
        node.append( properties )
 
255
        self.cue.append(node)
 
256
        print "DEBUG: server " + adress + " added"
 
257
        print "CUE" + str(self.cue)
 
258
        #log it
 
259
        
 
260
    def process(self):
 
261
        """ it does all json transactions between servers """
 
262
        while True:
 
263
            # For each element in self.cue, call that server and exchange data with local server
 
264
            self.starttime = int(time.time())
 
265
            for i in self.cue:
 
266
                try:
 
267
                    url = "http://"+i[0]+":" + str(i[1]) + i[2]['rpc_service_url']
 
268
                    foreignrpc = RpcManager(self.conf, self.logger, server = url)
 
269
                    foreignrpc.open_server()
 
270
                    
 
271
                    cred = self.localrpc.execute("getAuthCred", i[2]['uuid'])
 
272
                    dataforeign = foreignrpc.execute("getdata", self.conf.uuid, cred[0], cred[1])
 
273
                    ref1 = self.dumpmanager.createdump(dataforeign)
 
274
                    self.localrpc.execute("putdata", "","", dataforeign)
 
275
                    
 
276
                    datalocal = self.localrpc.execute("getdata", i[2]['uuid'], "", "")
 
277
                    ref2 = self.dumpmanager.createdump(datalocal)
 
278
                    foreignrpc.execute("putdata", cred[0], cred[1], datalocal)
 
279
                    self.dumpmanager.cleardump()
 
280
                except Exception, e:
 
281
                    if not self.logger == None:
 
282
                        self.logger.log('ServerManager', logging.ERROR, "error occured while processing: " + str(e) )
 
283
            
 
284
            #pausetime = 30 * 60 # 30 mins
 
285
            pausetime = 1 * 60 # for debugging set to 1 mins
 
286
            if int(time.time()) - self.starttime < pausetime:
 
287
                time.sleep(pausetime - time.time() + self.starttime)
 
288
            
 
289
        
 
290
class RpcManager(object):
 
291
    """ all RPC related activities are performed through this """
 
292
    def __init__(self, conf, logger=None, server = None):
 
293
        if not isinstance(conf, Config):
 
294
            raise
 
295
        self.server = server
 
296
        self.conf = conf
 
297
        self.logger = logger
 
298
    
 
299
        self.open_server()
 
300
    
 
301
    def open_server(self):
 
302
        if self.server == None:
 
303
            try:
 
304
                url = "http://" + self.conf.items['localhost'] + ":" + str(self.conf.items['localhost-port']) + "/" + self.conf.items['servername'] + "/sync/call/jsonrpc"
 
305
            except:
 
306
                raise Exception("Configuration file seems to be corrupt. Delete it, system will regenerate new for you, you can edit it later on")
 
307
            try:
 
308
                self.server = ServiceProxy(url)
 
309
            except:
 
310
                raise Exception("Server proxy couldn't be resolved: " + url)
 
311
 
 
312
    def execute(self, function, *args):
 
313
        try:
 
314
            toreturn = eval('self.server.' + str(function))(*args)
 
315
            message = "function " + str(function) + " sucessful with arguments: " + str(*args)
 
316
            if not self.logger == None:
 
317
                self.logger.log('RpcManager', logging.INFO, message)
 
318
        except:
 
319
            message = "function " + str(function)+" failed with arguments: " + str(*args)
 
320
            if not self.logger == None:
 
321
                self.logger.log('RpcManager', logging.ERROR, message)
 
322
            raise Exception(message)
 
323
        return toreturn
 
324
 
 
325
 
 
326
class init(object):
 
327
    """ Manages execution """
 
328
    def __init__(self):
 
329
        "if dumps folder has files in it, it means there were were partial sync attempts, that need to be solved"
 
330
        logger = LogManager()
 
331
        c = Config()
 
332
        r = RpcManager(c, logger=logger) #this instance deals with local server
 
333
        while True:
 
334
            try:
 
335
                c.getConfFromServer(r)
 
336
                break
 
337
            except Exception, e:
 
338
                print "Couldn't call local server. " + str(e)
 
339
                # Try it after 10 sec
 
340
                time.sleep(10)
 
341
        #print "Printing config settings from server"
 
342
        #print c.items
 
343
        d = DumpManager(logger = logger)
 
344
        #ref = d.createdump(2,3,[1,2,3], "23")
 
345
        #d.createobj(ref)
 
346
        sm = ServerManager(c, d, r, logger=logger)
 
347
 
 
348
        SyncByIp(r, sm, logger, c)
 
349
        #ze = ZeroConfExpose(c, logger=logger)
 
350
        zs = ZeroConfSearch(c, sm, r, logger=logger)
 
351
        sm.process() # this is non returning
 
352
 
 
353
class SyncByIp(object):
 
354
    """ The class sync with IPs stored in database """
 
355
    def __init__(self, localserver, serverManager, logger = LogManager(), c = Config()):
 
356
        #r = RpcManager(c, logger=logger) #this instance deals with local server
 
357
        content = localserver.execute('getAllServers')
 
358
 
 
359
        for i in content:
 
360
            if not i['ip'] is None and not i['ip'] is '':
 
361
                print i
 
362
                ip = i['ip'][:i['ip'].find(':')]
 
363
                port = i['ip'][i['ip'].find(':')+1:]
 
364
                properties={'discription':'This IP was manually entered by user'}
 
365
                serverManager.addServer(ip, port, properties)
 
366
 
 
367
if __name__ == '__main__':
 
368
    init()
 
 
b'\\ No newline at end of file'