2
# -*- coding: UTF-8 -*-
4
# +-----------------------------------------------------------------------------+
6
# +-----------------------------------------------------------------------------+
7
# | Copyright (c) A. Bashi <sourcecontact@gmail.com> |
9
# | This program is free software; you can redistribute it and/or |
10
# | modify it under the terms of the GNU General Public License |
11
# | as published by the Free Software Foundation; either version 2 |
12
# | of the License, or (at your option) any later version. |
14
# | This program is distributed in the hope that it will be useful, |
15
# | but WITHOUT ANY WARRANTY; without even the implied warranty of |
16
# | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
17
# | GNU General Public License for more details. |
19
# | You should have received a copy of the GNU General Public License |
20
# | along with this program; if not, write to the Free Software |
21
# | Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. |
22
# +-----------------------------------------------------------------------------+
37
from heapq import heappush, heappop
39
sys.path.append(os.path.dirname(__file__))
40
from timevaultConfig import *
41
import timevaultDBusServer
42
import timevaultWatcher
45
class TimeVault(timevaultDBusServer.Server):
47
if os.access(DEFAULT_LOCK_FILE, os.R_OK):
48
fdLock = open(DEFAULT_LOCK_FILE, "r")
50
for line in Exec('ps -p %d -o pid --no-heading' % int(pid)):
51
Debug(0, "Found pid %d" % int(line))
52
if line.strip()==pid.strip():
53
Debug(0, "Another TimeVault server instance is already running\n")
57
fdLock = open(DEFAULT_LOCK_FILE, "w")
58
fdLock.write("%d" % os.getpid())
61
timevaultDBusServer.Server.__init__(self)
62
self.committingTimer = None
64
self.commands = ParseCommandLineOptions()
66
filename = self.commands['configFile']
70
self.started = int(time.time())
71
self.cfg = Configuration(filename)
73
self.pendingFiles = []
75
self.pathDict = {} # [path]=(absTime)
76
self.timeDict = {} # [absTime] = {path1:absTime, ...]
79
self.notificationTime = 3000
80
self.lastBalloonNotification = 0
81
self.lastStateNotification = 0
82
self.snapshotter = None
85
self.watcher = timevaultWatcher.Watcher(self.INotifyCallback, self.cfg)
86
self.watcher.watchesRegisteredCallback = self.ReportWatchedDirs
88
self.Reload(reloadConfiguration=False)
89
self.SetState(TVS_IDLE, 'Initializing', TVI_UNCONFIGURED)
92
gobject.timeout_add(self.cfg.settings['bipTimer'], self.Bip)
95
if os.access(DEFAULT_LOCK_FILE, os.R_OK):
96
os.remove(DEFAULT_LOCK_FILE)
98
def Reload(self, reloadConfiguration=True):
# (Re)apply the configuration held in self.cfg to the running server.
# Visible steps: stamp self.startedWatching, reset self.state and the
# notification settings, clear self.reloadScheduled, force-add the config
# file itself to the watcher, open the catalog and create the snapshotter
# if none exists, read ticInc, add each configured backup path to the
# watcher, and finally call OnNotifyConfigurationChange().
# NOTE(review): statements between the visible lines are not shown in this
# view (including whatever reloadConfiguration triggers), so the exact
# nesting of the steps above must be confirmed against the full source.
99
if reloadConfiguration:
102
self.startedWatching = time.time()
104
self.state = [('Launched', TVI_UNCONFIGURED), ('', ''), ('', ''), ('', '')]
105
self.snapshotNotificationCount = self.cfg.settings['snapshotNotificationCount']
106
self.reloadScheduled = None
109
self.watcher.Add(self.cfg.filename, force=True)
110
if self.cfg.settings['configValid']:
112
self.db = timevaultDB.Catalog(self.cfg.GetDirCatalog()+DB_FILENAME)
113
if not self.snapshotter:
114
self.snapshotter = timevaultSnap.Sync(self.cfg, self.db)
117
self.ticInc = self.cfg.settings['ticInc']
118
for path in self.cfg.settings['backup']:
119
if not self.watcher.Add(path):
123
Debug(D_NORM, "Removing bad dir: '%s'\n" % path)
124
# NOTE(review): this deletes a key from settings['backup'] while the
# for-loop above appears to iterate that same mapping; if so, the loop
# should run over a copy of the keys -- confirm.
del self.cfg.settings['backup'][path]
126
self.SetState(TVS_WARN, '', '')
128
self.SetState(TVS_WARN, "Unconfigured", TVI_UNCONFIGURED)
131
self.OnNotifyConfigurationChange()
134
def ReportWatchedDirs(self, finished=False):
# Report the watcher's current directory count through the status channel.
# Visible steps: call Bench() with self.startedWatching and the count, set
# the TVS_IDLE status to "Watching N directories", and set a TVS_WARN
# status when the count exceeds self.watcher.maxUserWatches.
# NOTE(review): the lines that use `finished` and that guard the first two
# calls are not visible in this view -- confirm the conditions.
136
Bench("FinishedWatchingDirs", self.startedWatching, self.watcher.watchedDirs)
139
self.SetState(TVS_IDLE, "Watching %d directories" % self.watcher.watchedDirs, TVI_NORM)
140
if self.watcher.watchedDirs>self.watcher.maxUserWatches:
141
self.SetState(TVS_WARN, "Too many directories (%d>%d):\n- Some changes to be lost" % (self.watcher.watchedDirs, self.watcher.maxUserWatches), TVI_ERROR)
143
def RelativeT(self, t):
    """Return timestamp *t* as whole seconds relative to server start.

    *t* is truncated with int() and self.started is subtracted from it, so
    the result is negative for timestamps that precede start-up.
    """
    return int(t) - self.started
147
Debug(D_NORM, ". %d\n" % self.RelativeT(time.time()))
149
self.DumpDictionaries()
154
def LoadPendingFiles(self):
# Re-read the rows of the `pending` table and put them back on the
# in-memory schedule. Each (tm, path, event) row whose path is not already
# in self.pathDict is passed to ScheduleSnap(..., saveToPendingQueue=False).
# NOTE(review): `queued` and the handling of excluded paths are assigned in
# lines not visible here; `pushout` is only logged in the visible lines and
# the visible ScheduleSnap call still passes the stored `tm` -- confirm
# where the push-out is applied.
158
now = int(time.time())
161
rescheduleAggressiveness = self.cfg.settings['rescheduleAggressiveness']
162
self.db.Exec("SELECT tm,path,event FROM pending")
163
for tm,path,event in self.db.Result():
166
if self.cfg.Excluded(path):
169
pushout = int(queued/rescheduleAggressiveness)*self.ticInc
175
if not self.pathDict.has_key(path):
176
Debug(D_VERB, "Restoring: '%s' [pushed %d]\n" % (path, pushout))
177
self.ScheduleSnap(path, event, tm, saveToPendingQueue=False)
180
Debug(D_NORM, "Unknown wierdness on (%d,%s,%s)\n" % (tm,path,event), printTraceback=True)
183
Bench("LoadPendingFiles", now, queued)
185
def OnTimedProcessPendingFileQueue(self):
# Take one (file, event) entry off self.pendingFiles, snapshot it with a
# timestamp of RoundUp(now), and delete every processed path from the
# pending table.
# NOTE(review): `absTime` is not assigned in any visible line of this
# method and `paths` is not used afterwards -- check whether that lookup is
# dead code or raises. The empty-queue handling and the return value are
# also not visible here.
186
L = len(self.pendingFiles)
188
self.SetState(TVS_INFO, '', '')
191
now = int(time.time())
192
file, event = self.pendingFiles.pop()
194
paths = self.timeDict[absTime]
195
snapshotsProcessed, snapshotsTaken, snapshotsFailed = self.snapshotter.TakeSnap({file: event}, self.RoundUp(now))
196
for path,event in snapshotsProcessed:
197
self.DelPendingFile(path)
201
def OnTimedProcessPendingDirQueue(self):
# Take one (path, event) entry off self.pendingDirs, list that directory,
# and record entries not yet in self.pathDict in the pending table via
# AddPendingFile(). One visible branch hands over to LoadPendingFiles
# through gobject.idle_add.
# NOTE(review): `F`, the loop over `files`, the try/except structure and
# the return value are in lines not visible here -- confirm the flow.
202
L = len(self.pendingDirs)
204
self.SetState(TVS_INFO, '', '')
206
gobject.idle_add(self.LoadPendingFiles)
209
now = int(time.time())
210
path, event = self.pendingDirs.pop()
211
files = os.listdir(path)
214
Debug(D_NORM, "Queuing %s:'%s' [%d files]\n" % (event, path, F))
216
file = os.path.join(path, file)
217
if os.path.isdir(file):
219
if self.cfg.Excluded(file):
223
# NOTE(review): this logs `path` (the parent directory) although the
# entry being examined is `file` -- confirm which was intended.
Debug(D_VERB, "Queuing '%s' for crawling\n" % (path))
224
if not self.pathDict.has_key(file):
225
self.AddPendingFile(file, event, now)
227
# NOTE(review): unlike every other Debug call in view, this one passes no
# level as first argument, and `tm` is not assigned in any visible line of
# this method -- this error path looks like it would itself fail; confirm.
Debug("Unknown wierdness on (%d,%s,%s)\n" % (tm,path,event))
229
Bench("Queue", now, F)
232
def SpiderDirs(self, path, event):
# Walk the tree rooted at `path` with os.walk and append (root, event) to
# self.pendingDirs for every visited root that is a directory.
# NOTE(review): the statement executed when self.cfg.Excluded(root) is
# true is not visible in this view (presumably it skips the directory) --
# confirm.
233
for root, dirs, files in os.walk(path):
234
if self.cfg.Excluded(root):
236
if os.path.isdir(root):
237
self.pendingDirs.append((root,event))
239
def TakeMeta(self, path):
    """Queue the directory tree under *path* with event code 'M'.

    Spiders the tree into the pending-directory queue, publishes an
    informational status, and asks the gobject main loop to start
    draining the queue through OnTimedProcessPendingDirQueue when idle.
    """
    self.SpiderDirs(path, 'M')
    self.SetState(TVS_INFO, 'Spidering new directories to retrieve metadata', TVI_PENDING)
    gobject.idle_add(self.OnTimedProcessPendingDirQueue)
244
def TakeBaseline(self, path):
    """Queue the directory tree under *path* with event code 'B'.

    Spiders the tree into the pending-directory queue, publishes an
    informational status, and asks the gobject main loop to start
    draining the queue through OnTimedProcessPendingDirQueue when idle.
    """
    self.SpiderDirs(path, 'B')
    self.SetState(TVS_INFO, 'Spidering new directories to create baseline', TVI_PENDING)
    gobject.idle_add(self.OnTimedProcessPendingDirQueue)
249
def Popup(self, summary, markup):
    """Forward a notification to OnPopup with the configured display time.

    *summary* and *markup* are passed through unchanged; the third
    argument is self.notificationTime.
    """
    self.OnPopup(summary, markup, self.notificationTime)
252
def GetScheduleTimes(self):
# Collect [time, number-of-paths] pairs for every entry of self.schedule
# that still has a timeslot in self.timeDict.
# NOTE(review): the initialisation of `schedule` and the return statement
# are not visible in this view -- presumably the list is returned; confirm.
254
for t in self.schedule:
255
if self.timeDict.has_key(t):
256
schedule.append([t, len(self.timeDict[t])])
259
def GetScheduleFiles(self, tm):
# Collect [path, value] string pairs for the timeslot `tm` of
# self.timeDict, when that timeslot exists.
# NOTE(review): the initialisation of `schedule` and the return statement
# are not visible in this view -- presumably the list is returned; confirm.
261
if self.timeDict.has_key(tm):
262
for path in self.timeDict[tm]:
263
schedule.append([str(path), str(self.timeDict[tm][path])])
268
L = len(self.pathDict)
274
self.state[TVS_INFO] = ('%d file%s scheduled' % (L, plurality), TVI_PENDING)
276
self.state[TVS_INFO] = ('', '')
278
self.OnState(self.state)
280
def DumpDictionaries(self):
# Build a multi-line text dump of self.timeDict, self.pathDict and
# self.schedule, with times shown relative to server start (RelativeT).
# NOTE(review): what is finally done with `msg` (and any guard before the
# dump is built) is in lines not visible here -- confirm.
283
msg = "======= DumpDictionaries at t=%d =======\n" % self.RelativeT(time.time())
284
msg += "self.timeDict:\n"
285
for t in self.timeDict:
286
msg += "\t%3d: %s\n" % (self.RelativeT(t), self.timeDict[t])
287
msg += "self.pathDict:\n"
288
# Here `t` is a path (a pathDict key), not a time.
for t in self.pathDict:
289
msg += "\t%3s: %s\n" % (t, self.RelativeT(self.pathDict[t]))
290
msg += "self.schedule: "
291
for t in self.schedule:
292
msg += "%3d " % (self.RelativeT(t))
293
msg += "\n=========================================\n"
296
def PossiblySetState(self, level, text, icon):
# Rate-limited variant of SetState: measures the time since
# self.lastStateNotification, and on the visible path records `now` as the
# new notification time before delegating to SetState.
# NOTE(review): the assignment of `now` and the threshold test on `dT` are
# in lines not visible here -- confirm the limit that is applied.
298
dT = now-self.lastStateNotification
302
self.lastStateNotification = now
303
self.SetState(level, text, icon)
305
def SetState(self, level, text, icon):
    """Store (text, icon) for one status level and publish the full state.

    *level* is used as an index into self.state; after the update the
    whole state list is handed to self.OnState().
    """
    self.state[level] = (text, icon)
    self.OnState(self.state)
309
def SetSnapTime(self, absTime):
# Create the timeslot for `absTime` if it does not exist yet: push the
# time onto the self.schedule heap (unless already present), register a
# gobject timeout that calls TakeSnap(absTime) after `delay` seconds
# (the interval is passed as delay*1000), and create an empty dict in
# self.timeDict for it.
# NOTE(review): one statement between the heappush and the timeout
# registration is not visible here. `delay` is negative when absTime is
# already in the past -- confirm how that case is handled.
310
if not self.timeDict.has_key(absTime):
311
delay = int(absTime - time.time())
312
if absTime not in self.schedule:
313
heappush(self.schedule, absTime) # record it in the schedule
315
gobject.timeout_add(delay*1000, self.TakeSnap, absTime)
316
self.timeDict[absTime] = {} # Create the timeslot
318
def RefreshPendingFile(self, path, event, absTime):
# Update the stored time and event of an existing row of the `pending`
# table, keyed by path, using a parameterised statement.
# NOTE(review): the surrounding error handling (and any deferred commit)
# is in lines not visible here; the remaining comments suggest a failed
# update is currently ignored -- confirm.
320
self.db.Exec("UPDATE pending SET tm=?, event=? WHERE path=?", (absTime, event, path))
322
#Debug(D_NORM, "Cannot update (%d,%s,%s)\n" % (absTime, path, event))
324
# ToDo: File not inserted; We could correct the error, but for now we let it stick
325
#self.AddPendingFile(path, event, absTime)
327
def AddPendingFile(self, path, event, absTime):
# Insert a (tm, path, event) row into the `pending` table using a
# parameterised statement.
# NOTE(review): the surrounding error handling (and any deferred commit)
# is in lines not visible here -- confirm what happens when the insert
# fails, e.g. for a path that is already present.
329
self.db.Exec("INSERT INTO pending (tm,path,event) VALUES (?,?,?)", (absTime, path, event))
331
#Debug(D_NORM, "Cannot insert (%d,%s,%s)\n" % (absTime, path, event))
333
def DelPendingFile(self, path):
# Delete the row for `path` from the `pending` table using a
# parameterised statement.
# NOTE(review): the surrounding error handling (and any deferred commit)
# is in lines not visible here -- confirm.
335
self.db.Exec("DELETE FROM pending WHERE path=?", (path,))
337
#Debug(D_NORM, "Cannot delete (%s)\n" % (path))
344
self.committingTimer = None
347
def DeferCommit(self):
    """Schedule a database commit unless one is already scheduled.

    When self.committingTimer is unset, registers self.CommitDB with
    gobject.timeout_add using settings['deferredCommitDelay'] as the
    interval and stores the returned source id, so repeated calls while
    a timer is outstanding share a single commit.
    """
    if self.committingTimer:
        # A commit is already pending; do not stack another timer.
        return
    self.committingTimer = gobject.timeout_add(self.cfg.settings['deferredCommitDelay'], self.CommitDB)
351
def RoundUp(self, tm):
    """Round *tm* up to the nearest multiple of self.ticInc.

    The ceiling is computed with floor division on the negated value, so
    the result is never smaller than *tm*. For integer input this matches
    the previous int((tm+ticInc-1)/ticInc) form; for fractional input
    (or under true division) that form could truncate to a tic that lies
    before *tm*.

    Returns an int when ticInc is an int.
    """
    tic = self.ticInc
    # -(-a // b) is ceil(a / b) without any float rounding for integers.
    return int(-(-tm // tic)) * tic
354
def ScheduleSnap(self, path, event, absTime, saveToPendingQueue=True):
# Schedule (or reschedule) a snapshot of `path` for the tic at or after
# `absTime`.
#   path                - file to snapshot; key of self.pathDict
#   event               - event code stored in the timeslot for this path
#   absTime             - requested absolute time; rounded up with RoundUp
#   saveToPendingQueue  - when true, the change is also written to the
#                         `pending` table (update for a known path, insert
#                         for a new one)
# Bookkeeping: self.pathDict maps path -> scheduled time, and
# self.timeDict maps scheduled time -> {path: event}.
# NOTE(review): the else-branches and early returns between the visible
# lines are not shown in this view -- confirm the exact nesting.
355
# Move up to the next tic
356
Debug(D_VERB, "ScheduleSnap: %s: %d => " % (path, self.RelativeT(absTime)))
357
absTime = self.RoundUp(absTime)
358
Debug(D_VERB, "%d\n" % self.RelativeT(absTime), timeStamp=False)
361
if self.pathDict.has_key(path): # Are we rescheduling?
362
if saveToPendingQueue:
363
self.RefreshPendingFile(path,event,absTime) # Record the file change so that we can resume if interrupted
365
schedTime = self.pathDict[path]
366
if absTime == schedTime: # To the same timeslot?
367
Debug(D_VERB, "Refreshing %s\n" % path)
368
self.timeDict[absTime][path] = event # If so, update the event and
371
Debug(D_VERB, "Rescheduling %s\n" % path)
372
if self.timeDict.has_key(schedTime) and self.timeDict[schedTime].has_key(path):
373
del self.timeDict[schedTime][path] # else, delete it from the last slot
375
if saveToPendingQueue:
376
self.AddPendingFile(path,event,absTime) # Record the file change so that we can resume if interrupted
378
if not self.timeDict.has_key(absTime):
379
self.SetSnapTime(absTime) # Schedule it
381
self.pathDict[path] = absTime # Update/set the path's snap time
382
self.timeDict[absTime][path] = event # and insert the path in the correct timeslot
385
self.PossiblyNotify(absTime)
387
def PossiblyNotify(self, absTime):
# Rate-limited notification that a snapshot has been scheduled: measures
# the time since self.lastBalloonNotification, and on the visible path
# records `now` and, if settings['showNotifications'] is set, calls
# OnScheduleSnap with the time and the number of paths in that timeslot.
# NOTE(review): the assignment of `now` and the threshold test on `dT` are
# in lines not visible here -- confirm the limit that is applied.
389
dT = now-self.lastBalloonNotification
393
self.lastBalloonNotification = now
394
if self.cfg.settings['showNotifications']:
395
self.OnScheduleSnap(absTime, len(self.timeDict[absTime]))
397
def TakeSnap(self, absTime):
# Timer callback (registered by SetSnapTime) that snapshots every path in
# the timeslot `absTime`. Returns False when the timeslot no longer
# exists. Otherwise hands the timeslot's {path: event} dict to the
# snapshotter, removes each processed path from the pending table and from
# self.pathDict, drops the timeslot and its schedule entry, logs the
# counts, and optionally notifies with the first
# self.snapshotNotificationCount snapshots taken.
# NOTE(review): `now`, `firstN` and the final return value are set in
# lines not visible here -- confirm.
398
if not self.timeDict.has_key(absTime):
399
return False # Nothing to do
400
if absTime not in self.schedule:
401
Debug(D_NORM, "Error: TakeSnap scheduled for %d, not in heap\n" % (absTime))
404
Debug(D_NORM, "* TakeSnap: %d\n" % (int(time.time())-self.started))
405
# NOTE(review): this repeats the test made at the top of the method, so
# the branch looks unreachable; if it were reached, the format string
# below has no conversion specifier and "%" with an argument would raise
# TypeError.
if not self.timeDict.has_key(absTime):
406
Debug(D_NORM, "Nothing to do...\n" % (absTime))
410
paths = self.timeDict[absTime]
411
snapshotsProcessed, snapshotsTaken, snapshotsFailed = self.snapshotter.TakeSnap(paths, absTime)
412
for path,event in snapshotsProcessed:
413
self.DelPendingFile(path)
414
del self.pathDict[path]
417
del self.timeDict[absTime]
418
# NOTE(review): self.schedule is filled with heappush elsewhere;
# list.remove() on a heap list does not preserve the heap invariant. Only
# matters if the list is later consumed with heappop -- confirm.
self.schedule.remove(absTime)
421
T = len(snapshotsProcessed)
422
L = len(snapshotsTaken)
423
F = len(snapshotsFailed)
424
Debug(D_NORM, "\t%d snapshots processed, %d taken (%d failed)\n" % (T,L,F))
425
Bench("TakeSnap", now, T)
426
if L and self.cfg.settings['showNotifications']:
428
for path,event in snapshotsTaken[:self.snapshotNotificationCount]:
429
firstN.append("%s\t%s" % (path, event))
430
self.OnSnap(L, "\n".join(firstN))
434
self.DumpDictionaries()
437
def INotifyCallback(self, path, event):
# Callback handed to timevaultWatcher.Watcher; invoked with the changed
# path and an event code.
# - A change to the configuration file (re)arms a timer that calls
#   self.Reload, cancelling a previously armed one first.
# - Any other path is scheduled for a snapshot at the current time (rounded
#   up to a whole second) plus the `delay` configured for the longest
#   matching backup path.
# NOTE(review): the statements taken when the configuration is invalid or
# no snapshotter exists, and the returns separating the cases above, are
# not visible in this view -- confirm.
438
if path==self.cfg.filename:
439
Debug(D_NORM, "Config File Changed [%s:%s]: Reload Forced\n" % (event, path))
440
if self.reloadScheduled:
441
gobject.source_remove(self.reloadScheduled)
442
self.reloadScheduled = gobject.timeout_add(1000, self.Reload) # we delay a little in case the file was
443
# subscribed to from diff. locations
445
if not self.cfg.settings['configValid']:
447
if not self.snapshotter:
452
longestMatch = self.cfg.LongestPathMatch(path)
454
delay = self.cfg.settings['backup'][longestMatch]['delay']
455
absTime = int(time.time()+0.999) + delay
456
self.ScheduleSnap(path, event, absTime)
458
def UserMount(self, tm):
# Assemble a read-only unionfs view of the snapshot directories for time
# `tm`: the last entry of the directory stack is mounted at the user
# snapshot directory, and further directories are added with unionctl.
# Output lines of each command are logged through Debug.
# NOTE(review): the loop that binds `d` and any handling of an empty
# directory stack are in lines not visible here. UNION_UMOUNT is defined
# but not used in the visible lines. The paths are interpolated unquoted
# into the command strings; whether Exec runs them through a shell is not
# visible here, so paths containing spaces or shell metacharacters need
# checking.
459
UNION_MOUNT = 'mount -t unionfs -o defaults,noatime,ro,dirs=%s null %s' # path, dest
460
UNION_ADD = 'unionctl %s --add --mode ro %s' # dest, path
461
UNION_UMOUNT = 'umount %s' # dest
463
dirStack = self.cfg.GetDirectoryStack(tm)
467
dest = self.cfg.GetDirUserSnapshot(tm)
468
# 0755 is a Python 2 octal literal (rwxr-xr-x); Python 3 spells it 0o755.
self.cfg.CheckAndMake(dest, 0755)
470
base = dirStack.pop()
471
Debug(D_NORM, "%s=>%s\n" % (base,dest))
472
for line in Exec(UNION_MOUNT % (base, dest)):
473
Debug(D_NORM, line+"\n")
475
Debug(D_NORM, "%s=>%s\n" % (d,dest))
476
for line in Exec(UNION_ADD % (dest, d)):
477
Debug(D_NORM, line+"\n")
480
gobject.MainLoop().run()
482
def DeferredQuit(tv):
483
# ToDo: This only gets called when SIGTERM is used to shutdown, but start-stop-daemon
484
# can't kill with SIGTERM, hmmm
489
print('This program must be run as root')
497
tv.LoadPendingFiles()
499
Debug(D_NORM, "Terminating\n\n")
502
#tv.OnServerShutdown()
503
#gobject.timeout_add(100, DeferredQuit, tv)