~ubuntu-branches/ubuntu/utopic/xen/utopic

« back to all changes in this revision

Viewing changes to tools/remus/remus

  • Committer: Bazaar Package Importer
  • Author(s): Bastian Blank
  • Date: 2010-05-06 15:47:38 UTC
  • mto: (1.3.1) (15.1.1 sid) (4.1.1 experimental)
  • mto: This revision was merged to the branch mainline in revision 3.
  • Revision ID: james.westby@ubuntu.com-20100506154738-agoz0rlafrh1fnq7
Tags: upstream-4.0.0
ImportĀ upstreamĀ versionĀ 4.0.0

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/env python
 
2
#
 
3
# This is a save process which also buffers outgoing I/O between
 
4
# rounds, so that external viewers never see anything that hasn't
 
5
# been committed at the backup
 
6
#
 
7
# TODO: fencing.
 
8
 
 
9
import optparse, os, re, select, signal, sys, time
 
10
from xen.remus import save, vm
 
11
from xen.xend import XendOptions
 
12
from xen.remus import netlink, qdisc, util
 
13
 
 
14
class CfgException(Exception): pass
 
15
 
 
16
class Cfg(object):
 
17
    def __init__(self):
 
18
        # must be set
 
19
        self.domid = 0
 
20
 
 
21
        self.host = 'localhost'
 
22
        self.port = XendOptions.instance().get_xend_relocation_port()
 
23
        self.interval = 200
 
24
        self.netbuffer = True
 
25
        self.nobackup = False
 
26
        self.timer = False
 
27
 
 
28
        parser = optparse.OptionParser()
 
29
        parser.usage = '%prog [options] domain [destination]'
 
30
        parser.add_option('-i', '--interval', dest='interval', type='int',
 
31
                          metavar='MS',
 
32
                          help='checkpoint every MS milliseconds')
 
33
        parser.add_option('-p', '--port', dest='port', type='int',
 
34
                          help='send stream to port PORT', metavar='PORT')
 
35
        parser.add_option('', '--no-net', dest='nonet', action='store_true',
 
36
                          help='run without net buffering (benchmark option)')
 
37
        parser.add_option('', '--timer', dest='timer', action='store_true',
 
38
                          help='force pause at checkpoint interval (experimental)')
 
39
        parser.add_option('', '--no-backup', dest='nobackup',
 
40
                          action='store_true',
 
41
                          help='prevent backup from starting up (benchmark '
 
42
                          'option)')
 
43
        self.parser = parser
 
44
 
 
45
    def usage(self):
 
46
        self.parser.print_help()
 
47
 
 
48
    def getargs(self):
 
49
        opts, args = self.parser.parse_args()
 
50
 
 
51
        if opts.interval:
 
52
            self.interval = opts.interval
 
53
        if opts.port:
 
54
            self.port = opts.port
 
55
        if opts.nonet:
 
56
            self.netbuffer = False
 
57
        if opts.timer:
 
58
            self.timer = True
 
59
 
 
60
        if not args:
 
61
            raise CfgException('Missing domain')
 
62
        self.domid = args[0]
 
63
        if (len(args) > 1):
 
64
            self.host = args[1]
 
65
 
 
66
class ReplicatedDiskException(Exception): pass
 
67
 
 
68
class BufferedDevice(object):
 
69
    'Base class for buffered devices'
 
70
 
 
71
    def postsuspend(self):
 
72
        'called after guest has suspended'
 
73
        pass
 
74
 
 
75
    def preresume(self):
 
76
        'called before guest resumes'
 
77
        pass
 
78
 
 
79
    def commit(self):
 
80
        'called when backup has acknowledged checkpoint reception'
 
81
        pass
 
82
 
 
83
class ReplicatedDisk(BufferedDevice):
 
84
    """
 
85
    Send a checkpoint message to a replicated disk while the domain
 
86
    is paused between epochs.
 
87
    """
 
88
    FIFODIR = '/var/run/tap'
 
89
 
 
90
    def __init__(self, disk):
 
91
        # look up disk, make sure it is tap:buffer, and set up socket
 
92
        # to request commits.
 
93
        self.ctlfd = None
 
94
 
 
95
        if not disk.uname.startswith('tap:remus:') and not disk.uname.startswith('tap:tapdisk:remus:'):
 
96
            raise ReplicatedDiskException('Disk is not replicated: %s' %
 
97
                                        str(disk))
 
98
        fifo = re.match("tap:.*(remus.*)\|", disk.uname).group(1).replace(':', '_')
 
99
        absfifo = os.path.join(self.FIFODIR, fifo)
 
100
        absmsgfifo = absfifo + '.msg'
 
101
 
 
102
        self.installed = False
 
103
        self.ctlfd = open(absfifo, 'w+b')
 
104
        self.msgfd = open(absmsgfifo, 'r+b')
 
105
 
 
106
    def __del__(self):
 
107
        self.uninstall()
 
108
 
 
109
    def setup(self):
 
110
        #self.ctlfd.write('buffer')
 
111
        #self.ctlfd.flush()
 
112
        self.installed = True
 
113
 
 
114
    def uninstall(self):
 
115
        if self.ctlfd:
 
116
            self.ctlfd.close()
 
117
            self.ctlfd = None
 
118
 
 
119
    def postsuspend(self):
 
120
        if not self.installed:
 
121
            self.setup()
 
122
 
 
123
        os.write(self.ctlfd.fileno(), 'flush')
 
124
 
 
125
    def commit(self):
 
126
        msg = os.read(self.msgfd.fileno(), 4)
 
127
        if msg != 'done':
 
128
            print 'Unknown message: %s' % msg
 
129
 
 
130
class NetbufferException(Exception): pass
 
131
 
 
132
class Netbuffer(BufferedDevice):
 
133
    """
 
134
    Buffer a protected domain's network output between rounds so that
 
135
    nothing is issued that a failover might not know about.
 
136
    """
 
137
    # shared rtnetlink handle
 
138
    rth = None
 
139
 
 
140
    def __init__(self, domid):
 
141
        self.installed = False
 
142
 
 
143
        if not self.rth:
 
144
            self.rth = netlink.rtnl()
 
145
 
 
146
        self.devname = self._startimq(domid)
 
147
        dev = self.rth.getlink(self.devname)
 
148
        if not dev:
 
149
            raise NetbufferException('could not find device %s' % self.devname)
 
150
        self.dev = dev['index']
 
151
        self.handle = qdisc.TC_H_ROOT
 
152
        self.q = qdisc.QueueQdisc()
 
153
 
 
154
    def __del__(self):
 
155
        self.uninstall()
 
156
 
 
157
    def postsuspend(self):
 
158
        if not self.installed:
 
159
            self._setup()
 
160
 
 
161
        self._sendqmsg(qdisc.TC_QUEUE_CHECKPOINT)
 
162
 
 
163
    def commit(self):
 
164
        '''Called when checkpoint has been acknowledged by
 
165
        the backup'''
 
166
        self._sendqmsg(qdisc.TC_QUEUE_RELEASE)
 
167
 
 
168
    def _sendqmsg(self, action):
 
169
        self.q.action = action
 
170
        req = qdisc.changerequest(self.dev, self.handle, self.q)
 
171
        self.rth.talk(req.pack())
 
172
 
 
173
    def _setup(self):
 
174
        q = self.rth.getqdisc(self.dev)
 
175
        if q:
 
176
            if q['kind'] == 'queue':
 
177
                self.installed = True
 
178
                return
 
179
            if q['kind'] != 'pfifo_fast':
 
180
                raise NetbufferException('there is already a queueing '
 
181
                                         'discipline on %s' % self.devname)
 
182
 
 
183
        print 'installing buffer on %s' % self.devname
 
184
        req = qdisc.addrequest(self.dev, self.handle, self.q)
 
185
        self.rth.talk(req.pack())
 
186
        self.installed = True
 
187
 
 
188
    def uninstall(self):
 
189
        if self.installed:
 
190
            req = qdisc.delrequest(self.dev, self.handle)
 
191
            self.rth.talk(req.pack())
 
192
            self.installed = False
 
193
 
 
194
    def _startimq(self, domid):
 
195
        # stopgap hack to set up IMQ for an interface. Wrong in many ways.
 
196
        imqebt = '/usr/lib/xen/bin/imqebt'
 
197
        imqdev = 'imq0'
 
198
        vid = 'vif%d.0' % domid
 
199
        for mod in ['sch_queue', 'imq', 'ebt_imq']:
 
200
            util.runcmd(['modprobe', mod])
 
201
        util.runcmd("ip link set %s up" % (imqdev))
 
202
        util.runcmd("%s -F FORWARD" % (imqebt))
 
203
        util.runcmd("%s -A FORWARD -i %s -j imq --todev %s" % (imqebt, vid, imqdev))
 
204
 
 
205
        return imqdev
 
206
 
 
207
class SignalException(Exception): pass
 
208
 
 
209
def run(cfg):
 
210
    closure = lambda: None
 
211
    closure.cmd = None
 
212
 
 
213
    def sigexception(signo, frame):
 
214
        raise SignalException(signo)
 
215
 
 
216
    def die():
 
217
        # I am not sure what the best way to die is. xm destroy is another option,
 
218
        # or we could attempt to trigger some instant reboot.
 
219
        print "dying..."
 
220
        print util.runcmd(['sudo', 'ifdown', 'eth2'])
 
221
        # dangling imq0 handle on vif locks up the system
 
222
        for buf in bufs:
 
223
            buf.uninstall()
 
224
        print util.runcmd(['sudo', 'xm', 'destroy', cfg.domid])
 
225
        print util.runcmd(['sudo', 'ifup', 'eth2'])
 
226
 
 
227
    def getcommand():
 
228
        """Get a command to execute while running.
 
229
        Commands include:
 
230
          s: die prior to postsuspend hook
 
231
          s2: die after postsuspend hook
 
232
          r: die prior to preresume hook
 
233
          r2: die after preresume hook
 
234
          c: die prior to commit hook
 
235
          c2: die after commit hook
 
236
          """
 
237
        r, w, x = select.select([sys.stdin], [], [], 0)
 
238
        if sys.stdin not in r:
 
239
            return
 
240
 
 
241
        cmd = sys.stdin.readline().strip()
 
242
        if cmd not in ('s', 's2', 'r', 'r2', 'c', 'c2'):
 
243
            print "unknown command: %s" % cmd
 
244
        closure.cmd = cmd
 
245
 
 
246
    signal.signal(signal.SIGTERM, sigexception)
 
247
 
 
248
    dom = vm.VM(cfg.domid)
 
249
 
 
250
    # set up I/O buffers
 
251
    bufs = []
 
252
 
 
253
    # disks must commit before network can be released
 
254
    for disk in dom.disks:
 
255
        try:
 
256
            bufs.append(ReplicatedDisk(disk))
 
257
        except ReplicatedDiskException, e:
 
258
            print e
 
259
            continue
 
260
 
 
261
    if cfg.netbuffer:
 
262
        for vif in dom.vifs:
 
263
            bufs.append(Netbuffer(dom.domid))
 
264
 
 
265
    fd = save.MigrationSocket((cfg.host, cfg.port))
 
266
 
 
267
    def postsuspend():
 
268
        'Begin external checkpointing after domain has paused'
 
269
        if not cfg.timer:
 
270
            # when not using a timer thread, sleep until now + interval
 
271
            closure.starttime = time.time()
 
272
 
 
273
        if closure.cmd == 's':
 
274
            die()
 
275
 
 
276
        for buf in bufs:
 
277
            buf.postsuspend()
 
278
 
 
279
        if closure.cmd == 's2':
 
280
            die()
 
281
 
 
282
    def preresume():
 
283
        'Complete external checkpointing before domain resumes'
 
284
        if closure.cmd == 'r':
 
285
            die()
 
286
 
 
287
        for buf in bufs:
 
288
            buf.preresume()
 
289
 
 
290
        if closure.cmd == 'r2':
 
291
            die()
 
292
 
 
293
    def commit():
 
294
        'commit network buffer'
 
295
        if closure.cmd == 'c':
 
296
            die()
 
297
 
 
298
        print >> sys.stderr, "PROF: flushed memory at %0.6f" % (time.time())
 
299
 
 
300
        for buf in bufs:
 
301
            buf.commit()
 
302
 
 
303
        if closure.cmd == 'c2':
 
304
            die()
 
305
 
 
306
        # Since the domain is running at this point, it's a good time to
 
307
        # check for control channel commands
 
308
        getcommand()
 
309
 
 
310
        if not cfg.timer:
 
311
            endtime = time.time()
 
312
            elapsed = (endtime - closure.starttime) * 1000
 
313
 
 
314
            if elapsed < cfg.interval:
 
315
                time.sleep((cfg.interval - elapsed) / 1000.0)
 
316
 
 
317
        # False ends checkpointing
 
318
        return True
 
319
 
 
320
    if cfg.timer:
 
321
        interval = cfg.interval
 
322
    else:
 
323
        interval = 0
 
324
 
 
325
    rc = 0
 
326
 
 
327
    checkpointer = save.Saver(cfg.domid, fd, postsuspend, preresume, commit,
 
328
                              interval)
 
329
 
 
330
    try:
 
331
        checkpointer.start()
 
332
    except save.CheckpointError, e:
 
333
        print e
 
334
        rc = 1
 
335
    except KeyboardInterrupt:
 
336
        pass
 
337
    except SignalException:
 
338
        print '*** signalled ***'
 
339
 
 
340
    for buf in bufs:
 
341
        buf.uninstall()
 
342
 
 
343
    if cfg.nobackup:
 
344
        # lame attempt to kill backup if protection is stopped deliberately.
 
345
        # It would be much better to move this into the heartbeat "protocol".
 
346
        print util.runcmd(['sudo', '-u', os.getlogin(), 'ssh', cfg.host, 'sudo', 'xm', 'destroy', dom.name])
 
347
 
 
348
    sys.exit(rc)
 
349
 
 
350
cfg = Cfg()
 
351
try:
 
352
    cfg.getargs()
 
353
except CfgException, inst:
 
354
    print str(inst)
 
355
    cfg.usage()
 
356
    sys.exit(1)
 
357
 
 
358
try:
 
359
    run(cfg)
 
360
except vm.VMException, inst:
 
361
    print str(inst)
 
362
    sys.exit(1)