~timevaulters/timevault/0.6

« back to all changes in this revision

Viewing changes to src/timevault.py

  • Committer: A. Bashi
  • Date: 2007-07-15 00:02:48 UTC
  • Revision ID: sourcecontact@gmail.com-20070715000248-34ry2obdrs3bizsm
Initial 0.6.x release

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/python
 
2
# -*- coding: UTF-8 -*-
 
3
 
 
4
#        +-----------------------------------------------------------------------------+
 
5
#        | GPL                                                                         |
 
6
#        +-----------------------------------------------------------------------------+
 
7
#        | Copyright (c) A. Bashi <sourcecontact@gmail.com>                            |
 
8
#        |                                                                             |
 
9
#        | This program is free software; you can redistribute it and/or               |
 
10
#        | modify it under the terms of the GNU General Public License                 |
 
11
#        | as published by the Free Software Foundation; either version 2              |
 
12
#        | of the License, or (at your option) any later version.                      |
 
13
#        |                                                                             |
 
14
#        | This program is distributed in the hope that it will be useful,             |
 
15
#        | but WITHOUT ANY WARRANTY; without even the implied warranty of              |
 
16
#        | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the               |
 
17
#        | GNU General Public License for more details.                                |
 
18
#        |                                                                             |
 
19
#        | You should have received a copy of the GNU General Public License           |
 
20
#        | along with this program; if not, write to the Free Software                 |
 
21
#        | Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA. |
 
22
#        +-----------------------------------------------------------------------------+
 
23
 
 
24
import sys
 
25
import os
 
26
import os.path
 
27
import time
 
28
import fcntl
 
29
 
 
30
import gobject
 
31
import base64
 
32
 
 
33
import dbus
 
34
import dbus.service
 
35
import dbus.glib
 
36
 
 
37
from heapq import heappush, heappop
 
38
 
 
39
sys.path.append(os.path.dirname(__file__))
 
40
from timevaultConfig import *
 
41
import timevaultDBusServer
 
42
import timevaultWatcher
 
43
import timevaultSnap
 
44
 
 
45
class TimeVault(timevaultDBusServer.Server):
 
46
        def __init__(self):
 
47
                if os.access(DEFAULT_LOCK_FILE, os.R_OK):
 
48
                        fdLock = open(DEFAULT_LOCK_FILE, "r")
 
49
                        pid = fdLock.read()
 
50
                        for line in Exec('ps -p %d -o pid --no-heading' % int(pid)):
 
51
                                Debug(0, "Found pid %d" % int(line))
 
52
                                if line.strip()==pid.strip():
 
53
                                        Debug(0, "Another TimeVault server instance is already running\n")
 
54
                                        sys.exit(-1)
 
55
                        fdLock.close()
 
56
                
 
57
                fdLock = open(DEFAULT_LOCK_FILE, "w")
 
58
                fdLock.write("%d" % os.getpid())
 
59
                fdLock.close()
 
60
 
 
61
                timevaultDBusServer.Server.__init__(self)
 
62
                self.committingTimer = None
 
63
                
 
64
                self.commands = ParseCommandLineOptions()
 
65
                try:
 
66
                        filename = self.commands['configFile']
 
67
                except:
 
68
                        filename = None
 
69
                
 
70
                self.started = int(time.time())
 
71
                self.cfg = Configuration(filename)
 
72
                
 
73
                self.pendingFiles = []
 
74
                self.pendingDirs = []
 
75
                self.pathDict = {}              # [path]=(absTime)
 
76
                self.timeDict = {}              # [absTime] = {path1:absTime, ...]
 
77
                self.schedule = []
 
78
                
 
79
                self.notificationTime = 3000
 
80
                self.lastBalloonNotification = 0
 
81
                self.lastStateNotification = 0
 
82
                self.snapshotter = None
 
83
                self.db = None
 
84
                
 
85
                self.watcher = timevaultWatcher.Watcher(self.INotifyCallback, self.cfg)
 
86
                self.watcher.watchesRegisteredCallback = self.ReportWatchedDirs
 
87
                
 
88
                self.Reload(reloadConfiguration=False)
 
89
                self.SetState(TVS_IDLE, 'Initializing', TVI_UNCONFIGURED)
 
90
                
 
91
                if Verbosity()>1:
 
92
                        gobject.timeout_add(self.cfg.settings['bipTimer'], self.Bip)
 
93
        
 
94
        def __del__(self):
 
95
                if os.access(DEFAULT_LOCK_FILE, os.R_OK):
 
96
                        os.remove(DEFAULT_LOCK_FILE)
 
97
        
 
98
        def Reload(self, reloadConfiguration=True):
 
99
                if reloadConfiguration:
 
100
                        self.cfg.Reload()
 
101
                self.watcher.Clear()
 
102
                self.startedWatching = time.time()
 
103
 
 
104
                self.state = [('Launched', TVI_UNCONFIGURED), ('', ''), ('', ''), ('', '')]
 
105
                self.snapshotNotificationCount = self.cfg.settings['snapshotNotificationCount']
 
106
                self.reloadScheduled = None
 
107
                self.change = False
 
108
                
 
109
                self.watcher.Add(self.cfg.filename, force=True)
 
110
                if self.cfg.settings['configValid']:
 
111
                        if not self.db:
 
112
                                self.db = timevaultDB.Catalog(self.cfg.GetDirCatalog()+DB_FILENAME)
 
113
                        if not self.snapshotter:
 
114
                                self.snapshotter = timevaultSnap.Sync(self.cfg, self.db)
 
115
 
 
116
                        badDirs = []
 
117
                        self.ticInc = self.cfg.settings['ticInc']
 
118
                        for path in self.cfg.settings['backup']:
 
119
                                if not self.watcher.Add(path):
 
120
                                        badDirs.append(path)
 
121
                        
 
122
                        for path in badDirs:
 
123
                                Debug(D_NORM, "Removing bad dir: '%s'\n" % path)
 
124
                                del self.cfg.settings['backup'][path]
 
125
                                
 
126
                        self.SetState(TVS_WARN, '', '')
 
127
                else:
 
128
                        self.SetState(TVS_WARN, "Unconfigured", TVI_UNCONFIGURED)
 
129
                
 
130
                # Let everyone know
 
131
                self.OnNotifyConfigurationChange()
 
132
                return False
 
133
        
 
134
        def ReportWatchedDirs(self, finished=False):
 
135
                if finished:
 
136
                        Bench("FinishedWatchingDirs", self.startedWatching, self.watcher.watchedDirs)
 
137
                        
 
138
                if self.watcher:
 
139
                        self.SetState(TVS_IDLE, "Watching %d directories" % self.watcher.watchedDirs, TVI_NORM)
 
140
                        if self.watcher.watchedDirs>self.watcher.maxUserWatches:
 
141
                                self.SetState(TVS_WARN, "Too many directories (%d>%d):\n- Some changes to be lost" % (self.watcher.watchedDirs, self.watcher.maxUserWatches), TVI_ERROR)
 
142
        
 
143
        def RelativeT(self, t):
 
144
                return int(t) - self.started
 
145
        
 
146
        def Bip(self):
 
147
                Debug(D_NORM, ". %d\n" % self.RelativeT(time.time()))
 
148
                if self.change:
 
149
                        self.DumpDictionaries()
 
150
                        self.change = False
 
151
                
 
152
                return True
 
153
        
 
154
        def LoadPendingFiles(self):
 
155
                if not self.db:
 
156
                        return
 
157
                
 
158
                now = int(time.time())
 
159
                queued = 0
 
160
                
 
161
                rescheduleAggressiveness = self.cfg.settings['rescheduleAggressiveness']
 
162
                self.db.Exec("SELECT tm,path,event FROM pending")
 
163
                for tm,path,event in self.db.Result():
 
164
                        
 
165
                        try:
 
166
                                if self.cfg.Excluded(path):
 
167
                                        continue
 
168
                                        
 
169
                                pushout = int(queued/rescheduleAggressiveness)*self.ticInc
 
170
                                if tm<now:
 
171
                                        tm = now+pushout
 
172
                                else:
 
173
                                        tm = tm+pushout
 
174
                                
 
175
                                if not self.pathDict.has_key(path):
 
176
                                        Debug(D_VERB, "Restoring: '%s' [pushed %d]\n" % (path, pushout))
 
177
                                        self.ScheduleSnap(path, event, tm, saveToPendingQueue=False)
 
178
                                        queued += 1
 
179
                        except:
 
180
                                Debug(D_NORM, "Unknown wierdness on (%d,%s,%s)\n" % (tm,path,event), printTraceback=True)
 
181
                self.db.Commit()
 
182
                
 
183
                Bench("LoadPendingFiles", now, queued)
 
184
 
 
185
        def OnTimedProcessPendingFileQueue(self):
 
186
                L = len(self.pendingFiles)
 
187
                if L==0:
 
188
                        self.SetState(TVS_INFO, '', '')
 
189
                        return False
 
190
                        
 
191
                now = int(time.time())
 
192
                file, event = self.pendingFiles.pop()
 
193
                
 
194
                paths = self.timeDict[absTime]
 
195
                snapshotsProcessed, snapshotsTaken, snapshotsFailed = self.snapshotter.TakeSnap({file: event}, self.RoundUp(now))
 
196
                for path,event in snapshotsProcessed:
 
197
                        self.DelPendingFile(path)
 
198
                self.DeferCommit()
 
199
                return True
 
200
                
 
201
        def OnTimedProcessPendingDirQueue(self):
 
202
                L = len(self.pendingDirs)
 
203
                if L==0:
 
204
                        self.SetState(TVS_INFO, '', '')
 
205
                        self.db.Commit()
 
206
                        gobject.idle_add(self.LoadPendingFiles)
 
207
                        return False
 
208
                        
 
209
                now = int(time.time())
 
210
                path, event = self.pendingDirs.pop()
 
211
                files = os.listdir(path)
 
212
                F = len(files)
 
213
                
 
214
                Debug(D_NORM, "Queuing %s:'%s' [%d files]\n" % (event, path, F))
 
215
                for file in files:
 
216
                        file = os.path.join(path, file)
 
217
                        if os.path.isdir(file):
 
218
                                continue
 
219
                        if self.cfg.Excluded(file):
 
220
                                continue
 
221
                        
 
222
                        try:
 
223
                                Debug(D_VERB, "Queuing '%s' for crawling\n" % (path))
 
224
                                if not self.pathDict.has_key(file):
 
225
                                        self.AddPendingFile(file, event, now)
 
226
                        except:
 
227
                                Debug("Unknown wierdness on (%d,%s,%s)\n" % (tm,path,event))
 
228
                
 
229
                Bench("Queue", now, F)
 
230
                return True
 
231
        
 
232
        def SpiderDirs(self, path, event):
 
233
                for root, dirs, files in os.walk(path):
 
234
                        if self.cfg.Excluded(root):
 
235
                                continue
 
236
                        if os.path.isdir(root):
 
237
                                self.pendingDirs.append((root,event))
 
238
                
 
239
        def TakeMeta(self, path):
 
240
                self.SpiderDirs(path, 'M')
 
241
                self.SetState(TVS_INFO, 'Spidering new directories to retrieve metadata', TVI_PENDING)
 
242
                gobject.idle_add(self.OnTimedProcessPendingDirQueue)
 
243
                
 
244
        def TakeBaseline(self, path):
 
245
                self.SpiderDirs(path, 'B')
 
246
                self.SetState(TVS_INFO, 'Spidering new directories to create baseline', TVI_PENDING)
 
247
                gobject.idle_add(self.OnTimedProcessPendingDirQueue)
 
248
 
 
249
        def Popup(self, summary, markup):
 
250
                self.OnPopup(summary, markup, self.notificationTime)
 
251
        
 
252
        def GetScheduleTimes(self):
 
253
                schedule = []
 
254
                for t in self.schedule:
 
255
                        if self.timeDict.has_key(t):
 
256
                                schedule.append([t, len(self.timeDict[t])])
 
257
                return schedule
 
258
 
 
259
        def GetScheduleFiles(self, tm):
 
260
                schedule = []
 
261
                if self.timeDict.has_key(tm):
 
262
                        for path in self.timeDict[tm]:
 
263
                                schedule.append([str(path), str(self.timeDict[tm][path])])
 
264
                
 
265
                return schedule
 
266
 
 
267
        def GetState(self):
 
268
                L = len(self.pathDict)
 
269
                if L:
 
270
                        if L==1:
 
271
                                plurality = ''
 
272
                        else:
 
273
                                plurality = 's'
 
274
                        self.state[TVS_INFO] = ('%d file%s scheduled' % (L, plurality), TVI_PENDING)
 
275
                else:
 
276
                        self.state[TVS_INFO] = ('', '')
 
277
                
 
278
                self.OnState(self.state)
 
279
        
 
280
        def DumpDictionaries(self):
 
281
                return
 
282
        
 
283
                msg = "======= DumpDictionaries at t=%d =======\n" % self.RelativeT(time.time())
 
284
                msg += "self.timeDict:\n"
 
285
                for t in self.timeDict:
 
286
                        msg += "\t%3d: %s\n" % (self.RelativeT(t), self.timeDict[t])
 
287
                msg += "self.pathDict:\n"
 
288
                for t in self.pathDict:
 
289
                        msg += "\t%3s: %s\n" % (t, self.RelativeT(self.pathDict[t]))
 
290
                msg += "self.schedule: "
 
291
                for t in self.schedule:
 
292
                        msg += "%3d " % (self.RelativeT(t))
 
293
                msg += "\n=========================================\n"
 
294
                Debug(D_ALL, msg)
 
295
                
 
296
        def PossiblySetState(self, level, text, icon):
 
297
                now = time.time()
 
298
                dT = now-self.lastStateNotification
 
299
                if dT<0.25:
 
300
                        return                          # Too soon
 
301
                
 
302
                self.lastStateNotification = now
 
303
                self.SetState(level, text, icon)
 
304
                
 
305
        def SetState(self, level, text, icon):
 
306
                self.state[level] = (text, icon)
 
307
                self.OnState(self.state)
 
308
                
 
309
        def SetSnapTime(self, absTime):
 
310
                if not self.timeDict.has_key(absTime):
 
311
                        delay = int(absTime - time.time())
 
312
                        if absTime not in self.schedule:
 
313
                                heappush(self.schedule, absTime)                                # record it in the schedule
 
314
                                                                                                                                # and set a timer
 
315
                                gobject.timeout_add(delay*1000, self.TakeSnap, absTime)
 
316
                        self.timeDict[absTime] = {}                                                     # Create the timeslot
 
317
 
 
318
        def RefreshPendingFile(self, path, event, absTime):
 
319
                #try:
 
320
                self.db.Exec("UPDATE pending SET tm=?, event=? WHERE path=?", (absTime, event, path))
 
321
                #except:
 
322
                        #Debug(D_NORM, "Cannot update (%d,%s,%s)\n" % (absTime, path, event))
 
323
                        
 
324
                        # ToDo: File not inserted; We could correct the error, but for now we let it stick
 
325
                        #self.AddPendingFile(path, event, absTime)
 
326
                
 
327
        def AddPendingFile(self, path, event, absTime):
 
328
                #try:
 
329
                self.db.Exec("INSERT INTO pending (tm,path,event) VALUES (?,?,?)", (absTime, path, event))
 
330
                #except:
 
331
                        #Debug(D_NORM, "Cannot insert (%d,%s,%s)\n" % (absTime, path, event))
 
332
        
 
333
        def DelPendingFile(self, path):
 
334
                #try:
 
335
                self.db.Exec("DELETE FROM pending WHERE path=?", (path,))
 
336
                #except:
 
337
                        #Debug(D_NORM, "Cannot delete (%s)\n" % (path))
 
338
        
 
339
        def CommitDB(self):
 
340
                if not self.db:
 
341
                        return
 
342
                
 
343
                self.db.Commit()
 
344
                self.committingTimer = None
 
345
                return False
 
346
        
 
347
        def DeferCommit(self):
 
348
                if not self.committingTimer:
 
349
                        self.committingTimer = gobject.timeout_add(self.cfg.settings['deferredCommitDelay'], self.CommitDB)
 
350
        
 
351
        def RoundUp(self, tm):
 
352
                return int((tm+self.ticInc-1)/self.ticInc) * self.ticInc
 
353
                
 
354
        def ScheduleSnap(self, path, event, absTime, saveToPendingQueue=True):
 
355
                # Move up to the next tic
 
356
                Debug(D_VERB, "ScheduleSnap: %s: %d => " % (path, self.RelativeT(absTime)))
 
357
                absTime = self.RoundUp(absTime)
 
358
                Debug(D_VERB, "%d\n" % self.RelativeT(absTime), timeStamp=False)
 
359
                
 
360
                self.DeferCommit()
 
361
                if self.pathDict.has_key(path):                                                 # Are we rescheduling?
 
362
                        if saveToPendingQueue:
 
363
                                self.RefreshPendingFile(path,event,absTime)             # Record the file change so that we can resume if interrupted
 
364
                        
 
365
                        schedTime = self.pathDict[path]
 
366
                        if absTime == schedTime:                                                        # To the same timeslot?
 
367
                                Debug(D_VERB, "Refreshing %s\n" % path)
 
368
                                self.timeDict[absTime][path] = event                    # If so, update the event and
 
369
                                return                                                                                  # we're done
 
370
                        
 
371
                        Debug(D_VERB, "Rescheduling %s\n" % path)
 
372
                        if self.timeDict.has_key(schedTime) and self.timeDict[schedTime].has_key(path):
 
373
                                del self.timeDict[schedTime][path]                              # else, delete it from the last slot
 
374
                else:
 
375
                        if saveToPendingQueue:
 
376
                                self.AddPendingFile(path,event,absTime)                 # Record the file change so that we can resume if interrupted
 
377
                        
 
378
                if not self.timeDict.has_key(absTime):
 
379
                        self.SetSnapTime(absTime)                                                       # Schedule it
 
380
                
 
381
                self.pathDict[path] = absTime                                                   # Update/set the path's snap time
 
382
                self.timeDict[absTime][path] = event                                    # and insert the path in the correct timeslot
 
383
                self.change = True
 
384
                
 
385
                self.PossiblyNotify(absTime)
 
386
                
 
387
        def PossiblyNotify(self, absTime):
 
388
                now = time.time()
 
389
                dT = now-self.lastBalloonNotification
 
390
                if dT<0.25:
 
391
                        return                          # Too soon
 
392
                
 
393
                self.lastBalloonNotification = now
 
394
                if self.cfg.settings['showNotifications']:
 
395
                        self.OnScheduleSnap(absTime, len(self.timeDict[absTime]))
 
396
        
 
397
        def TakeSnap(self, absTime):
 
398
                if not self.timeDict.has_key(absTime):
 
399
                        return False                                                                            # Nothing to do
 
400
                if absTime not in self.schedule:
 
401
                        Debug(D_NORM, "Error: TakeSnap scheduled for %d, not in heap\n" % (absTime))
 
402
                        return
 
403
                
 
404
                Debug(D_NORM, "* TakeSnap: %d\n" % (int(time.time())-self.started))
 
405
                if not self.timeDict.has_key(absTime):
 
406
                        Debug(D_NORM, "Nothing to do...\n" % (absTime))
 
407
                        return False
 
408
                
 
409
                now = time.time()
 
410
                paths = self.timeDict[absTime]
 
411
                snapshotsProcessed, snapshotsTaken, snapshotsFailed = self.snapshotter.TakeSnap(paths, absTime)
 
412
                for path,event in snapshotsProcessed:
 
413
                        self.DelPendingFile(path)
 
414
                        del self.pathDict[path]
 
415
                self.db.Commit()
 
416
        
 
417
                del self.timeDict[absTime]
 
418
                self.schedule.remove(absTime)
 
419
                
 
420
                # DBus Notification
 
421
                T = len(snapshotsProcessed)
 
422
                L = len(snapshotsTaken)
 
423
                F = len(snapshotsFailed)
 
424
                Debug(D_NORM, "\t%d snapshots processed, %d taken (%d failed)\n" % (T,L,F))
 
425
                Bench("TakeSnap", now, T)
 
426
                if L and self.cfg.settings['showNotifications']:
 
427
                        firstN = []
 
428
                        for path,event in snapshotsTaken[:self.snapshotNotificationCount]:
 
429
                                firstN.append("%s\t%s" % (path, event))
 
430
                        self.OnSnap(L, "\n".join(firstN))
 
431
                else:
 
432
                        self.GetState()
 
433
                
 
434
                self.DumpDictionaries()
 
435
                return False
 
436
        
 
437
        def INotifyCallback(self, path, event):
 
438
                if path==self.cfg.filename:
 
439
                        Debug(D_NORM, "Config File Changed [%s:%s]: Reload Forced\n" % (event, path))
 
440
                        if self.reloadScheduled:
 
441
                                gobject.source_remove(self.reloadScheduled)
 
442
                        self.reloadScheduled = gobject.timeout_add(1000, self.Reload)                           # we delay a little in case the file was 
 
443
                                                                                                                                # subscribed to from diff. locations
 
444
                
 
445
                if not self.cfg.settings['configValid']:
 
446
                        return
 
447
                if not self.snapshotter:
 
448
                        return
 
449
                if not self.db:
 
450
                        return
 
451
                
 
452
                longestMatch = self.cfg.LongestPathMatch(path)
 
453
                if longestMatch:
 
454
                        delay = self.cfg.settings['backup'][longestMatch]['delay']
 
455
                        absTime = int(time.time()+0.999) + delay
 
456
                        self.ScheduleSnap(path, event, absTime)
 
457
        
 
458
        def UserMount(self, tm):
 
459
                UNION_MOUNT = 'mount -t unionfs -o defaults,noatime,ro,dirs=%s null %s' # path, dest
 
460
                UNION_ADD = 'unionctl %s --add --mode ro %s'                            # dest, path
 
461
                UNION_UMOUNT = 'umount %s'                                                                      # dest
 
462
                
 
463
                dirStack = self.cfg.GetDirectoryStack(tm)
 
464
                if dirStack<1:
 
465
                        return False
 
466
                
 
467
                dest = self.cfg.GetDirUserSnapshot(tm)
 
468
                self.cfg.CheckAndMake(dest, 0755)
 
469
                
 
470
                base = dirStack.pop()
 
471
                Debug(D_NORM, "%s=>%s\n" % (base,dest))
 
472
                for line in Exec(UNION_MOUNT % (base, dest)):
 
473
                        Debug(D_NORM, line+"\n")
 
474
                for d in dirStack:
 
475
                        Debug(D_NORM, "%s=>%s\n" % (d,dest))
 
476
                        for line in Exec(UNION_ADD % (dest, d)):
 
477
                                Debug(D_NORM, line+"\n")
 
478
                
 
479
        def Run(self):
 
480
                gobject.MainLoop().run()
 
481
 
 
482
def DeferredQuit(tv):
        """Terminate the process with exit status 0.

        ``tv`` (the TimeVault server) is accepted but unused; the signature
        presumably exists so this can be a gobject timeout callback that is
        passed the server.
        """
        # TODO: only reached on SIGTERM-driven shutdown, yet start-stop-daemon
        # cannot stop the server with SIGTERM.
        raise SystemExit(0)
 
486
 
 
487
# The server must run as root; refuse to start otherwise.
if not os.geteuid() == 0:
        print('This program must be run as root')
        raise SystemExit(-1)

# TODO: optionally daemonize before building the server.  On an unexpected
# error, notify clients (tv.OnServerShutdown()) and quit via DeferredQuit
# scheduled from a short gobject timeout.
tv = TimeVault()
tv.LoadPendingFiles()           # resume the work recorded in the 'pending' table
tv.Run()                        # blocks in the gobject main loop
Debug(D_NORM, "Terminating\n\n")