~ubuntu-branches/ubuntu/raring/nova/raring-proposed

« back to all changes in this revision

Viewing changes to plugins/xenserver/xenapi/etc/xapi.d/plugins/bittorrent

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Adam Gandelman, Chuck Short
  • Date: 2012-11-23 09:04:58 UTC
  • mfrom: (1.1.66)
  • Revision ID: package-import@ubuntu.com-20121123090458-91565o7aev1i1h71
Tags: 2013.1~g1-0ubuntu1
[ Adam Gandelman ]
* debian/control: Ensure novaclient is upgraded with nova,
  require python-keystoneclient >= 1:2.9.0. (LP: #1073289)
* debian/patches/{ubuntu/*, rbd-security.patch}: Dropped, applied
  upstream.
* debian/control: Add python-testtools to Build-Depends.

[ Chuck Short ]
* New upstream version.
* Refreshed debian/patches/avoid_setuptools_git_dependency.patch.
* debian/rules: FTBFS if missing binaries.
* debian/nova-scheudler.install: Add missing rabbit-queues and
  nova-rpc-zmq-receiver.
* Remove nova-volume since it doesnt exist anymore, transition to cinder-*.
* debian/rules: install apport hook in the right place.
* debian/patches/ubuntu-show-tests.patch: Display test failures.
* debian/control: Add depends on genisoimage
* debian/control: Suggest guestmount.
* debian/control: Suggest websockify. (LP: #1076442)
* debian/nova.conf: Disable nova-volume service.
* debian/control: Depend on xen-system-* rather than the hypervisor.
* debian/control, debian/mans/nova-conductor.8, debian/nova-conductor.init,
  debian/nova-conductor.install, debian/nova-conductor.logrotate
  debian/nova-conductor.manpages, debian/nova-conductor.postrm
  debian/nova-conductor.upstart.in: Add nova-conductor service.
* debian/control: Add python-fixtures as a build deps.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/env python
 
2
 
 
3
# Copyright (c) 2012 Openstack, LLC
 
4
#
 
5
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
 
6
#    not use this file except in compliance with the License. You may obtain
 
7
#    a copy of the License at
 
8
#
 
9
#         http://www.apache.org/licenses/LICENSE-2.0
 
10
#
 
11
#    Unless required by applicable law or agreed to in writing, software
 
12
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 
13
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 
14
#    License for the specific language governing permissions and limitations
 
15
#    under the License.
 
16
 
 
17
"""Download images via BitTorrent."""
 
18
 
 
19
import errno
 
20
import inspect
 
21
import os
 
22
import random
 
23
import shutil
 
24
import tempfile
 
25
import time
 
26
 
 
27
import libtorrent
 
28
import urllib2
 
29
import XenAPIPlugin
 
30
 
 
31
import utils
 
32
 
 
33
#FIXME(sirp): should this use pluginlib from 5.6?
 
34
from pluginlib_nova import *
 
35
configure_logging('bittorrent')
 
36
 
 
37
DEFAULT_TORRENT_CACHE = '/images/torrents'
 
38
DEFAULT_SEED_CACHE = '/images/seeds'
 
39
SEEDER_PROCESS = '_bittorrent_seeder'
 
40
 
 
41
 
 
42
def _make_torrent_cache():
 
43
    torrent_cache_path = os.environ.get(
 
44
            'TORRENT_CACHE', DEFAULT_TORRENT_CACHE)
 
45
 
 
46
    if not os.path.exists(torrent_cache_path):
 
47
        os.mkdir(torrent_cache_path)
 
48
 
 
49
    return torrent_cache_path
 
50
 
 
51
 
 
52
def _fetch_torrent_file(torrent_cache_path, image_id, torrent_base_url):
 
53
    torrent_path = os.path.join(
 
54
            torrent_cache_path, image_id + '.torrent')
 
55
 
 
56
    if not os.path.exists(torrent_path):
 
57
        torrent_url = torrent_base_url + "/%s.torrent" % image_id
 
58
        logging.info("Downloading %s" % torrent_url)
 
59
 
 
60
        # Write contents to temporary path to ensure we don't have partially
 
61
        # completed files in the cache.
 
62
        temp_directory = tempfile.mkdtemp(dir=torrent_cache_path)
 
63
        try:
 
64
            temp_path = os.path.join(
 
65
                    temp_directory, os.path.basename(torrent_path))
 
66
            temp_file = open(temp_path, 'wb')
 
67
            try:
 
68
                remote_torrent_file = urllib2.urlopen(torrent_url)
 
69
                shutil.copyfileobj(remote_torrent_file, temp_file)
 
70
            finally:
 
71
                temp_file.close()
 
72
 
 
73
            os.rename(temp_path, torrent_path)
 
74
        finally:
 
75
            shutil.rmtree(temp_directory)
 
76
 
 
77
    return torrent_path
 
78
 
 
79
 
 
80
def _reap_old_torrent_files(torrent_cache_path, torrent_max_last_accessed):
 
81
    """Delete any torrent files that haven't been accessed recently."""
 
82
    if not torrent_max_last_accessed:
 
83
        logging.debug("Reaping old torrent files disabled, skipping...")
 
84
        return
 
85
 
 
86
    logging.debug("Preparing to reap old torrent files,"
 
87
                  " torrent_max_last_accessed=%d" % torrent_max_last_accessed)
 
88
 
 
89
    for fname in os.listdir(torrent_cache_path):
 
90
        torrent_path = os.path.join(torrent_cache_path, fname)
 
91
        last_accessed = time.time() - os.path.getatime(torrent_path)
 
92
        if last_accessed > torrent_max_last_accessed:
 
93
            logging.debug("Reaping '%s', last_accessed=%d" % (
 
94
                          torrent_path, last_accessed))
 
95
            utils.delete_if_exists(torrent_path)
 
96
 
 
97
 
 
98
def _download(torrent_path, save_as_path, torrent_listen_port_start,
 
99
              torrent_listen_port_end, torrent_download_stall_cutoff):
 
100
    session = libtorrent.session()
 
101
    session.listen_on(torrent_listen_port_start, torrent_listen_port_end)
 
102
    info = libtorrent.torrent_info(
 
103
        libtorrent.bdecode(open(torrent_path, 'rb').read()))
 
104
 
 
105
    torrent = session.add_torrent(
 
106
            info, save_as_path,
 
107
            storage_mode=libtorrent.storage_mode_t.storage_mode_sparse)
 
108
 
 
109
    try:
 
110
        last_progress = 0
 
111
        last_progress_updated = time.time()
 
112
 
 
113
        while not torrent.is_seed():
 
114
            s = torrent.status()
 
115
 
 
116
            progress = s.progress * 100
 
117
 
 
118
            if progress != last_progress:
 
119
                last_progress = progress
 
120
                last_progress_updated = time.time()
 
121
 
 
122
            stall_duration = time.time() - last_progress_updated
 
123
            if stall_duration > torrent_download_stall_cutoff:
 
124
                logging.error(
 
125
                    "Download stalled: stall_duration=%d,"
 
126
                    " torrent_download_stall_cutoff=%d" % (
 
127
                    stall_duration, torrent_download_stall_cutoff))
 
128
                raise Exception("Bittorrent download stall detected, bailing!")
 
129
 
 
130
            logging.debug(
 
131
                '%.2f%% complete (down: %.1f kb/s up: %.1f kB/s peers: %d)'
 
132
                ' %s %s' % (progress, s.download_rate / 1000,
 
133
                            s.upload_rate / 1000, s.num_peers, s.state,
 
134
                            torrent_path))
 
135
            time.sleep(1)
 
136
    finally:
 
137
        session.remove_torrent(torrent)
 
138
 
 
139
    logging.debug("Download of '%s' finished" % torrent_path)
 
140
 
 
141
 
 
142
def _should_seed(seed_path, torrent_seed_duration, torrent_seed_chance,
 
143
                 torrent_max_seeder_processes_per_host):
 
144
    if not torrent_seed_duration:
 
145
        logging.debug("Seeding disabled, skipping...")
 
146
        return False
 
147
 
 
148
    if os.path.exists(seed_path):
 
149
        logging.debug("Seed is already present, skipping....")
 
150
        return False
 
151
 
 
152
    rand = random.random()
 
153
    if rand > torrent_seed_chance:
 
154
        logging.debug("%.2f > %.2f, seeding randomly skipping..." % (
 
155
                      rand, torrent_seed_chance))
 
156
        return False
 
157
 
 
158
    num_active_seeders = len(list(_active_seeder_processes()))
 
159
    if (torrent_max_seeder_processes_per_host >= 0 and
 
160
        num_active_seeders >= torrent_max_seeder_processes_per_host):
 
161
        logging.debug("max number of seeder processes for this host reached"
 
162
                      " (%d), skipping..." %
 
163
                      torrent_max_seeder_processes_per_host)
 
164
        return False
 
165
 
 
166
    return True
 
167
 
 
168
 
 
169
def _seed(torrent_path, seed_cache_path, torrent_seed_duration,
 
170
          torrent_listen_port_start, torrent_listen_port_end):
 
171
    plugin_path = os.path.dirname(inspect.getabsfile(inspect.currentframe()))
 
172
    seeder_path = os.path.join(plugin_path, SEEDER_PROCESS)
 
173
    seed_cmd = "%s %s %s %d %d %d" % (
 
174
        seeder_path, torrent_path, seed_cache_path, torrent_seed_duration,
 
175
        torrent_listen_port_start, torrent_listen_port_end)
 
176
 
 
177
    seed_proc = utils.make_subprocess(seed_cmd)
 
178
    utils.finish_subprocess(seed_proc, seed_cmd)
 
179
 
 
180
 
 
181
def _seed_if_needed(seed_cache_path, tarball_path, torrent_path,
 
182
                    torrent_seed_duration, torrent_seed_chance,
 
183
                    torrent_listen_port_start, torrent_listen_port_end,
 
184
                    torrent_max_seeder_processes_per_host):
 
185
    seed_filename = os.path.basename(tarball_path)
 
186
    seed_path = os.path.join(seed_cache_path, seed_filename)
 
187
 
 
188
    if _should_seed(seed_path, torrent_seed_duration, torrent_seed_chance,
 
189
                    torrent_max_seeder_processes_per_host):
 
190
        logging.debug("Preparing to seed '%s' for %d secs" % (
 
191
                      seed_path, torrent_seed_duration))
 
192
        utils._rename(tarball_path, seed_path)
 
193
 
 
194
        # Daemonize and seed the image
 
195
        _seed(torrent_path, seed_cache_path, torrent_seed_duration,
 
196
              torrent_listen_port_start, torrent_listen_port_end)
 
197
    else:
 
198
        utils.delete_if_exists(tarball_path)
 
199
 
 
200
 
 
201
def _extract_tarball(tarball_path, staging_path):
 
202
    """Extract the tarball into the staging directory."""
 
203
    tarball_fileobj = open(tarball_path, 'rb')
 
204
    try:
 
205
        utils.extract_tarball(tarball_fileobj, staging_path)
 
206
    finally:
 
207
        tarball_fileobj.close()
 
208
 
 
209
 
 
210
def _active_seeder_processes():
 
211
    """Yields command-line of active seeder processes.
 
212
 
 
213
    Roughly equivalent to performing ps | grep _bittorrent_seeder
 
214
    """
 
215
    pids = [pid for pid in os.listdir('/proc') if pid.isdigit()]
 
216
    for pid in pids:
 
217
        try:
 
218
            cmdline = open(os.path.join('/proc', pid, 'cmdline'), 'rb').read()
 
219
        except IOError, e:
 
220
            if e.errno != errno.ENOENT:
 
221
                raise
 
222
 
 
223
        if SEEDER_PROCESS in cmdline:
 
224
            yield cmdline
 
225
 
 
226
 
 
227
def _reap_finished_seeds(seed_cache_path):
 
228
    """Delete any cached seeds where the seeder process has died."""
 
229
    logging.debug("Preparing to reap finished seeds")
 
230
    missing = {}
 
231
    for fname in os.listdir(seed_cache_path):
 
232
        seed_path = os.path.join(seed_cache_path, fname)
 
233
        missing[seed_path] = None
 
234
 
 
235
    for cmdline in _active_seeder_processes():
 
236
        for seed_path in missing.keys():
 
237
            seed_filename = os.path.basename(seed_path)
 
238
            if seed_filename in cmdline:
 
239
                del missing[seed_path]
 
240
 
 
241
    for seed_path in missing:
 
242
        logging.debug("Reaping cached seed '%s'" % seed_path)
 
243
        utils.delete_if_exists(seed_path)
 
244
 
 
245
 
 
246
def _make_seed_cache():
 
247
    seed_cache_path = os.environ.get('SEED_CACHE', DEFAULT_SEED_CACHE)
 
248
    if not os.path.exists(seed_cache_path):
 
249
        os.mkdir(seed_cache_path)
 
250
    return seed_cache_path
 
251
 
 
252
 
 
253
def download_vhd(session, image_id, torrent_base_url, torrent_seed_duration,
 
254
                 torrent_seed_chance, torrent_max_last_accessed,
 
255
                 torrent_listen_port_start, torrent_listen_port_end,
 
256
                 torrent_download_stall_cutoff, uuid_stack, sr_path,
 
257
                 torrent_max_seeder_processes_per_host):
 
258
    """Download an image from BitTorrent, unbundle it, and then deposit the
 
259
    VHDs into the storage repository
 
260
    """
 
261
    seed_cache_path = _make_seed_cache()
 
262
    torrent_cache_path = _make_torrent_cache()
 
263
 
 
264
    # Housekeeping
 
265
    _reap_finished_seeds(seed_cache_path)
 
266
    _reap_old_torrent_files(torrent_cache_path, torrent_max_last_accessed)
 
267
 
 
268
    torrent_path = _fetch_torrent_file(
 
269
            torrent_cache_path, image_id, torrent_base_url)
 
270
 
 
271
    staging_path = utils.make_staging_area(sr_path)
 
272
    try:
 
273
        tarball_filename = os.path.basename(torrent_path).replace(
 
274
                '.torrent', '')
 
275
        tarball_path = os.path.join(staging_path, tarball_filename)
 
276
 
 
277
        # Download tarball into staging area
 
278
        _download(torrent_path, staging_path, torrent_listen_port_start,
 
279
                  torrent_listen_port_end, torrent_download_stall_cutoff)
 
280
 
 
281
        # Extract the tarball into the staging area
 
282
        _extract_tarball(tarball_path, staging_path)
 
283
 
 
284
        # Move the VHDs from the staging area into the storage repository
 
285
        vdi_list = utils.import_vhds(sr_path, staging_path, uuid_stack)
 
286
 
 
287
        # Seed image for others in the swarm
 
288
        _seed_if_needed(seed_cache_path, tarball_path, torrent_path,
 
289
                        torrent_seed_duration, torrent_seed_chance,
 
290
                        torrent_listen_port_start, torrent_listen_port_end,
 
291
                        torrent_max_seeder_processes_per_host)
 
292
    finally:
 
293
        utils.cleanup_staging_area(staging_path)
 
294
 
 
295
    return vdi_list
 
296
 
 
297
 
 
298
if __name__ == '__main__':
 
299
    utils.register_plugin_calls(download_vhd)