3
# This is a save process which also buffers outgoing I/O between
4
# rounds, so that external viewers never see anything that hasn't
5
# been committed at the backup
9
import optparse, os, re, select, signal, sys, time
10
from xen.remus import save, vm
11
from xen.xend import XendOptions
12
from xen.remus import netlink, qdisc, util
14
class CfgException(Exception):
    """Raised for invalid command-line configuration, such as a missing domain."""
# NOTE(review): lossy extract of the command-line configuration class. Each
# code line is preceded by its original line number, indentation is stripped,
# and many lines (class/method headers, some option arguments, option
# handling) are missing. Restore from the canonical source before editing.
#
# Constructor fragment: initial destination and option-parser setup.
21
# Destination host for the checkpoint stream; starts as the local machine.
self.host = 'localhost'
22
# Destination port starts as xend's configured relocation port; a -p/--port
# option is defined below as well.
self.port = XendOptions.instance().get_xend_relocation_port()
28
# Usage: <prog> [options] domain [destination]
parser = optparse.OptionParser()
29
parser.usage = '%prog [options] domain [destination]'
30
# -i/--interval: checkpoint period in milliseconds (parsed as an int).
parser.add_option('-i', '--interval', dest='interval', type='int',
32
help='checkpoint every MS milliseconds')
33
# -p/--port: destination port for the checkpoint stream.
parser.add_option('-p', '--port', dest='port', type='int',
34
help='send stream to port PORT', metavar='PORT')
35
# --no-net: disable network buffering (benchmarking only).
parser.add_option('', '--no-net', dest='nonet', action='store_true',
36
help='run without net buffering (benchmark option)')
37
# --timer: force a pause at each checkpoint interval (experimental).
parser.add_option('', '--timer', dest='timer', action='store_true',
38
help='force pause at checkpoint interval (experimental)')
39
# --no-backup: keep the backup from starting (benchmarking); the rest of
# this option's definition is missing from the extract.
parser.add_option('', '--no-backup', dest='nobackup',
41
help='prevent backup from starting up (benchmark '
46
# Fragment of a method that prints the option parser's help text.
self.parser.print_help()
49
# Fragment of argument parsing: parsed options are copied onto the instance.
opts, args = self.parser.parse_args()
52
self.interval = opts.interval
56
# NOTE(review): presumably reached when --no-net was given; the guarding
# condition is missing from the extract.
self.netbuffer = False
61
# NOTE(review): presumably raised when no domain argument was supplied; the
# guarding condition is missing from the extract.
raise CfgException('Missing domain')
66
class ReplicatedDiskException(Exception):
    """Raised when a disk is not a replicated (tap:remus) disk."""
# Interface for devices whose behaviour is tied to the checkpoint cycle;
# ReplicatedDisk and Netbuffer subclass it.
# NOTE(review): lossy extract — line numbers are interleaved, indentation is
# stripped, and the `def` lines of two hooks are missing.
68
class BufferedDevice(object):
69
'Base class for buffered devices'
71
# Hook: runs after the guest has been suspended for a checkpoint.
def postsuspend(self):
72
'called after guest has suspended'
76
# NOTE(review): the `def` line for this hook is missing from the extract;
# per its docstring it runs before the guest is resumed.
'called before guest resumes'
80
# NOTE(review): the `def` line for this hook is missing from the extract;
# per its docstring it runs once the backup acknowledges the checkpoint.
'called when backup has acknowledged checkpoint reception'
# NOTE(review): lossy extract — original line numbers are interleaved,
# indentation is stripped, and lines are missing (docstring quotes, the
# error-message argument, guard bodies, and at least one method header).
83
class ReplicatedDisk(BufferedDevice):
85
Send a checkpoint message to a replicated disk while the domain
86
is paused between epochs.
88
# Directory containing the per-disk control FIFOs.
FIFODIR = '/var/run/tap'
90
def __init__(self, disk):
91
# make sure the disk is replicated (tap:remus or tap:tapdisk:remus), then
# open its control FIFO and the matching '.msg' reply FIFO
95
if not disk.uname.startswith('tap:remus:') and not disk.uname.startswith('tap:tapdisk:remus:'):
96
raise ReplicatedDiskException('Disk is not replicated: %s' %
98
# FIFO name = the 'remus...' part of the uname that precedes a '|', with
# ':' replaced by '_'.
# NOTE(review): re.match returns None when the uname contains no '|', which
# would surface as AttributeError rather than ReplicatedDiskException.
fifo = re.match("tap:.*(remus.*)\|", disk.uname).group(1).replace(':', '_')
99
absfifo = os.path.join(self.FIFODIR, fifo)
100
absmsgfifo = absfifo + '.msg'
102
# Set to True only after the FIFOs have been opened.
self.installed = False
103
self.ctlfd = open(absfifo, 'w+b')
104
self.msgfd = open(absmsgfifo, 'r+b')
110
#self.ctlfd.write('buffer')
112
self.installed = True
119
# Ask the disk to flush by writing 'flush' to the control FIFO.
def postsuspend(self):
120
# NOTE(review): the body of this guard is missing from the extract;
# presumably an early return when the FIFOs were never opened.
if not self.installed:
123
os.write(self.ctlfd.fileno(), 'flush')
126
# NOTE(review): the preceding lines are missing; this read may belong to a
# separate hook. It reads up to 4 bytes of reply from the message FIFO.
msg = os.read(self.msgfd.fileno(), 4)
128
# Printed for a reply the code does not recognise (the check itself is
# missing from the extract).
print 'Unknown message: %s' % msg
130
class NetbufferException(Exception):
    """Raised when network output buffering cannot be set up on a device."""
# NOTE(review): lossy extract — original line numbers are interleaved,
# indentation is stripped, and lines are missing (docstring quotes, guard
# bodies, several method headers, the imqdev assignment and return value).
132
# Holds a domain's outgoing network traffic in a 'queue' qdisc on an IMQ
# device between checkpoints, releasing it once the backup acknowledges.
class Netbuffer(BufferedDevice):
134
Buffer a protected domain's network output between rounds so that
135
nothing is issued that a failover might not know about.
137
# shared rtnetlink handle
140
def __init__(self, domid):
141
self.installed = False
144
# NOTE(review): the preceding lines are missing; this may be guarded so the
# shared handle is only created once — confirm against the full source.
self.rth = netlink.rtnl()
146
# Redirect the domain's traffic to an IMQ device and look that device up.
self.devname = self._startimq(domid)
147
dev = self.rth.getlink(self.devname)
149
# Reached when the IMQ device could not be found (guard line missing).
raise NetbufferException('could not find device %s' % self.devname)
150
# Interface index, root handle and queue qdisc used by every request below.
self.dev = dev['index']
151
self.handle = qdisc.TC_H_ROOT
152
self.q = qdisc.QueueQdisc()
157
# Send TC_QUEUE_CHECKPOINT to the queue qdisc once the guest is suspended.
def postsuspend(self):
158
if not self.installed:
161
self._sendqmsg(qdisc.TC_QUEUE_CHECKPOINT)
164
# NOTE(review): `def` line missing; per its docstring this hook runs after
# the backup acknowledges the checkpoint, and it sends TC_QUEUE_RELEASE
# (presumably releasing the buffered output).
'''Called when checkpoint has been acknowledged by
166
self._sendqmsg(qdisc.TC_QUEUE_RELEASE)
168
# Send a "change" request carrying `action` to the root queue qdisc on the
# device over rtnetlink.
def _sendqmsg(self, action):
169
self.q.action = action
170
req = qdisc.changerequest(self.dev, self.handle, self.q)
171
self.rth.talk(req.pack())
174
# NOTE(review): `def` line missing; this installs the queue qdisc as the
# device's root qdisc. An existing 'queue' qdisc is adopted (presumably with
# an early return that is missing); any kind other than 'pfifo_fast' is
# refused rather than replaced.
q = self.rth.getqdisc(self.dev)
176
if q['kind'] == 'queue':
177
self.installed = True
179
if q['kind'] != 'pfifo_fast':
180
raise NetbufferException('there is already a queueing '
181
'discipline on %s' % self.devname)
183
print 'installing buffer on %s' % self.devname
184
req = qdisc.addrequest(self.dev, self.handle, self.q)
185
self.rth.talk(req.pack())
186
self.installed = True
190
# NOTE(review): `def` line missing; this deletes the root qdisc on the
# device and marks the buffer as uninstalled.
req = qdisc.delrequest(self.dev, self.handle)
191
self.rth.talk(req.pack())
192
self.installed = False
194
# Set up IMQ redirection for the domain's first interface (vif<domid>.0).
def _startimq(self, domid):
195
# stopgap hack to set up IMQ for an interface. Wrong in many ways.
196
imqebt = '/usr/lib/xen/bin/imqebt'
# NOTE(review): `imqdev` is assigned on a line missing from the extract.
198
vid = 'vif%d.0' % domid
199
# Load the sch_queue, imq and ebt_imq kernel modules.
for mod in ['sch_queue', 'imq', 'ebt_imq']:
200
util.runcmd(['modprobe', mod])
201
util.runcmd("ip link set %s up" % (imqdev))
202
# NOTE(review): presumably ebtables syntax, in which case -F flushes the
# entire FORWARD chain, discarding unrelated bridge rules — confirm.
util.runcmd("%s -F FORWARD" % (imqebt))
203
# Redirect traffic entering from the domain's vif to the IMQ device.
util.runcmd("%s -A FORWARD -i %s -j imq --todev %s" % (imqebt, vid, imqdev))
207
class SignalException(Exception):
    """Carries a received signal number out of a signal handler as an exception."""
# NOTE(review): lossy extract of the main checkpointing routine — original
# line numbers are interleaved, indentation is stripped, and many lines
# (function headers, `try` statements, guard bodies, `bufs` setup, the hook
# `def` lines) are missing. Restore from the canonical source before editing.
210
# A bare function object used as a mutable namespace shared by the nested
# hooks (closure.starttime, closure.cmd).
closure = lambda: None
213
# SIGTERM handler: turn the signal into a SignalException so the checkpoint
# loop's `except SignalException` can handle it.
def sigexception(signo, frame):
214
raise SignalException(signo)
217
# I am not sure what the best way to die is. xm destroy is another option,
218
# or we could attempt to trigger some instant reboot.
220
# NOTE(review): the interface name 'eth2' is hard-coded here and below.
print util.runcmd(['sudo', 'ifdown', 'eth2'])
221
# dangling imq0 handle on vif locks up the system
224
print util.runcmd(['sudo', 'xm', 'destroy', cfg.domid])
225
print util.runcmd(['sudo', 'ifup', 'eth2'])
228
"""Get a command to execute while running.
230
s: die prior to postsuspend hook
231
s2: die after postsuspend hook
232
r: die prior to preresume hook
233
r2: die after preresume hook
234
c: die prior to commit hook
235
c2: die after commit hook
237
# Zero timeout: poll stdin without blocking the checkpoint loop.
r, w, x = select.select([sys.stdin], [], [], 0)
238
if sys.stdin not in r:
241
cmd = sys.stdin.readline().strip()
242
if cmd not in ('s', 's2', 'r', 'r2', 'c', 'c2'):
243
print "unknown command: %s" % cmd
246
signal.signal(signal.SIGTERM, sigexception)
248
dom = vm.VM(cfg.domid)
253
# disks must commit before network can be released
254
for disk in dom.disks:
256
bufs.append(ReplicatedDisk(disk))
257
# ReplicatedDisk rejects disks that are not replicated; the handler body is
# missing from the extract.
except ReplicatedDiskException, e:
263
# NOTE(review): presumably only when network buffering is enabled
# (cfg.netbuffer) — the guard is missing from the extract.
bufs.append(Netbuffer(dom.domid))
265
# Checkpoint stream to the backup at (cfg.host, cfg.port).
fd = save.MigrationSocket((cfg.host, cfg.port))
268
'Begin external checkpointing after domain has paused'
270
# when not using a timer thread, sleep until now + interval
271
# Start of this epoch; read again after commit to pace checkpoints.
closure.starttime = time.time()
273
if closure.cmd == 's':
279
if closure.cmd == 's2':
283
'Complete external checkpointing before domain resumes'
284
if closure.cmd == 'r':
290
if closure.cmd == 'r2':
294
'commit network buffer'
295
if closure.cmd == 'c':
298
print >> sys.stderr, "PROF: flushed memory at %0.6f" % (time.time())
303
if closure.cmd == 'c2':
306
# Since the domain is running at this point, it's a good time to
307
# check for control channel commands
311
# Pace checkpoints: elapsed is in milliseconds; sleep for whatever remains
# of cfg.interval (also milliseconds) since starttime.
endtime = time.time()
312
elapsed = (endtime - closure.starttime) * 1000
314
if elapsed < cfg.interval:
315
time.sleep((cfg.interval - elapsed) / 1000.0)
317
# False ends checkpointing
321
interval = cfg.interval
327
checkpointer = save.Saver(cfg.domid, fd, postsuspend, preresume, commit,
332
# Checkpointing ends on a checkpoint error, Ctrl-C, or SIGTERM (raised as
# SignalException by sigexception); most handler bodies are missing.
except save.CheckpointError, e:
335
except KeyboardInterrupt:
337
except SignalException:
338
print '*** signalled ***'
344
# lame attempt to kill backup if protection is stopped deliberately.
345
# It would be much better to move this into the heartbeat "protocol".
346
# NOTE(review): os.getlogin() needs a controlling terminal and can raise
# OSError when run without one (e.g. from a daemon); consider
# pwd.getpwuid(os.getuid()) or $SUDO_USER instead.
print util.runcmd(['sudo', '-u', os.getlogin(), 'ssh', cfg.host, 'sudo', 'xm', 'destroy', dom.name])
# NOTE(review): fragment of the script's top-level error handling; the
# enclosing `try` and the handler bodies are missing from the extract.
353
# Configuration errors, e.g. the 'Missing domain' CfgException.
except CfgException, inst:
360
# Errors of type vm.VMException from the xen.remus.vm module.
except vm.VMException, inst: