# Copyright (c) 2012 Openstack, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Download images via BitTorrent."""

# FIXME(sirp): should this use pluginlib from 5.6?
# NOTE(review): the wildcard import is what brings `configure_logging` into
# this namespace (and presumably `logging` as well) -- confirm before
# narrowing it to explicit names.
from pluginlib_nova import *

configure_logging('bittorrent')

# Fallback cache locations, used when the TORRENT_CACHE / SEED_CACHE
# environment variables are not set.
DEFAULT_TORRENT_CACHE = '/images/torrents'
DEFAULT_SEED_CACHE = '/images/seeds'

# File name of the seeder helper. It is looked up in the directory holding
# this plugin and is also the marker searched for in /proc/<pid>/cmdline when
# counting running seeders.
SEEDER_PROCESS = '_bittorrent_seeder'
42
def _make_torrent_cache():
    """Return the torrent cache directory, creating it if necessary.

    The location is read from the ``TORRENT_CACHE`` environment variable and
    falls back to ``DEFAULT_TORRENT_CACHE``.

    :returns: path of the torrent cache directory
    :raises OSError: if the directory does not exist and cannot be created
    """
    torrent_cache_path = os.environ.get(
        'TORRENT_CACHE', DEFAULT_TORRENT_CACHE)

    # Create first and inspect afterwards: an exists()-then-mkdir() sequence
    # fails when a concurrent plugin call creates the directory in between.
    # makedirs() additionally creates any missing parent directories.
    try:
        os.makedirs(torrent_cache_path)
    except OSError:
        if not os.path.isdir(torrent_cache_path):
            raise

    return torrent_cache_path
52
def _fetch_torrent_file(torrent_cache_path, image_id, torrent_base_url):
    """Make sure the .torrent file for an image is cached; return its path.

    :param torrent_cache_path: directory holding cached .torrent files
    :param image_id: image identifier; the cached file is named
                     ``<image_id>.torrent``
    :param torrent_base_url: URL prefix; the file is fetched from
                             ``<torrent_base_url>/<image_id>.torrent``
    :returns: path of the cached .torrent file
    """
    torrent_path = os.path.join(
        torrent_cache_path, image_id + '.torrent')

    # Cache hit: nothing to download.
    if os.path.exists(torrent_path):
        return torrent_path

    torrent_url = torrent_base_url + "/%s.torrent" % image_id
    logging.info("Downloading %s" % torrent_url)

    # Write contents to temporary path to ensure we don't have partially
    # completed files in the cache.
    temp_directory = tempfile.mkdtemp(dir=torrent_cache_path)
    try:
        temp_path = os.path.join(
            temp_directory, os.path.basename(torrent_path))
        temp_file = open(temp_path, 'wb')
        try:
            remote_torrent_file = urllib2.urlopen(torrent_url)
            try:
                shutil.copyfileobj(remote_torrent_file, temp_file)
            finally:
                remote_torrent_file.close()
        finally:
            # Close before renaming so that all data is flushed to disk.
            temp_file.close()

        # The temporary directory lives inside the cache directory, so this
        # rename stays on one filesystem.
        os.rename(temp_path, torrent_path)
    finally:
        # Runs on success and on failure; on failure it also discards the
        # partially written download.
        shutil.rmtree(temp_directory)

    return torrent_path
80
def _reap_old_torrent_files(torrent_cache_path, torrent_max_last_accessed):
    """Delete any torrent files that haven't been accessed recently.

    :param torrent_cache_path: directory holding cached .torrent files
    :param torrent_max_last_accessed: maximum allowed number of seconds since
                                      a file's last access; a falsy value
                                      (0, None) disables reaping
    """
    if not torrent_max_last_accessed:
        logging.debug("Reaping old torrent files disabled, skipping...")
        # Without this return a threshold of 0 would reap every cached file.
        return

    logging.debug("Preparing to reap old torrent files,"
                  " torrent_max_last_accessed=%d" % torrent_max_last_accessed)

    for fname in os.listdir(torrent_cache_path):
        torrent_path = os.path.join(torrent_cache_path, fname)
        try:
            last_accessed = time.time() - os.path.getatime(torrent_path)
        except OSError:
            # The entry may have been removed since listdir(); anything else
            # is a real error.
            if os.path.exists(torrent_path):
                raise
            continue

        if last_accessed > torrent_max_last_accessed:
            logging.debug("Reaping '%s', last_accessed=%d" % (
                torrent_path, last_accessed))
            utils.delete_if_exists(torrent_path)
98
def _download(torrent_path, save_as_path, torrent_listen_port_start,
              torrent_listen_port_end, torrent_download_stall_cutoff):
    """Download the payload described by a .torrent file.

    Polls the torrent until it reports itself as a seed, and aborts if the
    reported progress stops changing for too long.

    :param torrent_path: path of the .torrent metadata file
    :param save_as_path: directory handed to libtorrent as the save location
    :param torrent_listen_port_start: first port of the listen range
    :param torrent_listen_port_end: last port of the listen range
    :param torrent_download_stall_cutoff: seconds without progress after
                                          which the download is abandoned
    :raises Exception: if the download stalls for longer than the cutoff
    """
    session = libtorrent.session()
    session.listen_on(torrent_listen_port_start, torrent_listen_port_end)

    # Read the metadata through an explicitly closed handle.
    torrent_file = open(torrent_path, 'rb')
    try:
        metadata = libtorrent.bdecode(torrent_file.read())
    finally:
        torrent_file.close()
    info = libtorrent.torrent_info(metadata)

    # NOTE(review): only the storage_mode keyword is certain here; the
    # positional (info, save_path) form should be confirmed against the
    # installed libtorrent binding.
    torrent = session.add_torrent(
        info, save_as_path,
        storage_mode=libtorrent.storage_mode_t.storage_mode_sparse)

    try:
        last_progress = 0
        last_progress_updated = time.time()

        while not torrent.is_seed():
            s = torrent.status()

            progress = s.progress * 100

            if progress != last_progress:
                last_progress = progress
                last_progress_updated = time.time()

            stall_duration = time.time() - last_progress_updated
            if stall_duration > torrent_download_stall_cutoff:
                logging.error(
                    "Download stalled: stall_duration=%d,"
                    " torrent_download_stall_cutoff=%d" % (
                        stall_duration, torrent_download_stall_cutoff))
                raise Exception("Bittorrent download stall detected, bailing!")

            logging.debug(
                '%.2f%% complete (down: %.1f kb/s up: %.1f kB/s peers: %d)'
                ' %s %s' % (progress, s.download_rate / 1000,
                            s.upload_rate / 1000, s.num_peers, s.state,
                            torrent_path))

            # Poll once per second instead of spinning.
            time.sleep(1)
    finally:
        # Detach the torrent on success, stall, or unexpected error.
        session.remove_torrent(torrent)

    logging.debug("Download of '%s' finished" % torrent_path)
142
def _should_seed(seed_path, torrent_seed_duration, torrent_seed_chance,
                 torrent_max_seeder_processes_per_host):
    """Decide whether this host should seed the image it just downloaded.

    :param seed_path: where the seed would be stored in the seed cache
    :param torrent_seed_duration: seeding time in seconds; a falsy value
                                  disables seeding
    :param torrent_seed_chance: threshold compared against a random number in
                                [0.0, 1.0); seeding is skipped when the
                                random number is greater
    :param torrent_max_seeder_processes_per_host: upper bound on concurrently
                                running seeder processes; a negative value
                                means no limit
    :returns: True if seeding should start, False otherwise
    """
    if not torrent_seed_duration:
        logging.debug("Seeding disabled, skipping...")
        return False

    if os.path.exists(seed_path):
        logging.debug("Seed is already present, skipping....")
        return False

    rand = random.random()
    if rand > torrent_seed_chance:
        logging.debug("%.2f > %.2f, seeding randomly skipping..." % (
            rand, torrent_seed_chance))
        return False

    # Checked last because counting seeders means walking /proc.
    num_active_seeders = len(list(_active_seeder_processes()))
    if (torrent_max_seeder_processes_per_host >= 0 and
            num_active_seeders >= torrent_max_seeder_processes_per_host):
        logging.debug("max number of seeder processes for this host reached"
                      " (%d), skipping..." %
                      torrent_max_seeder_processes_per_host)
        return False

    return True
169
def _seed(torrent_path, seed_cache_path, torrent_seed_duration,
          torrent_listen_port_start, torrent_listen_port_end):
    """Launch the seeder helper for a downloaded image.

    The helper named by ``SEEDER_PROCESS`` is looked up in the directory that
    contains this plugin file.

    :param torrent_path: path of the .torrent metadata file
    :param seed_cache_path: directory holding the data to seed
    :param torrent_seed_duration: how long to seed, in seconds
    :param torrent_listen_port_start: first port of the listen range
    :param torrent_listen_port_end: last port of the listen range
    """
    plugin_path = os.path.dirname(inspect.getabsfile(inspect.currentframe()))
    seeder_path = os.path.join(plugin_path, SEEDER_PROCESS)

    # NOTE(review): the command is a single space-separated string, so paths
    # containing whitespace would be split -- confirm how
    # utils.make_subprocess tokenizes its argument.
    seed_cmd = "%s %s %s %d %d %d" % (
        seeder_path, torrent_path, seed_cache_path, torrent_seed_duration,
        torrent_listen_port_start, torrent_listen_port_end)

    seed_proc = utils.make_subprocess(seed_cmd)
    utils.finish_subprocess(seed_proc, seed_cmd)
181
def _seed_if_needed(seed_cache_path, tarball_path, torrent_path,
                    torrent_seed_duration, torrent_seed_chance,
                    torrent_listen_port_start, torrent_listen_port_end,
                    torrent_max_seeder_processes_per_host):
    """Seed the downloaded tarball, or discard it if no seeding is wanted.

    When ``_should_seed`` agrees, the tarball is moved into the seed cache
    and the seeder helper is started; otherwise the tarball is deleted.

    :param seed_cache_path: directory holding seeded tarballs
    :param tarball_path: path of the downloaded tarball
    :param torrent_path: path of the .torrent metadata file
    :param torrent_seed_duration: how long to seed, in seconds
    :param torrent_seed_chance: see ``_should_seed``
    :param torrent_listen_port_start: first port of the listen range
    :param torrent_listen_port_end: last port of the listen range
    :param torrent_max_seeder_processes_per_host: see ``_should_seed``
    """
    seed_filename = os.path.basename(tarball_path)
    seed_path = os.path.join(seed_cache_path, seed_filename)

    if _should_seed(seed_path, torrent_seed_duration, torrent_seed_chance,
                    torrent_max_seeder_processes_per_host):
        logging.debug("Preparing to seed '%s' for %d secs" % (
            seed_path, torrent_seed_duration))
        utils._rename(tarball_path, seed_path)

        # Daemonize and seed the image
        _seed(torrent_path, seed_cache_path, torrent_seed_duration,
              torrent_listen_port_start, torrent_listen_port_end)
    else:
        # Not seeding: the tarball has already been extracted by the caller
        # and is no longer needed.
        utils.delete_if_exists(tarball_path)
201
def _extract_tarball(tarball_path, staging_path):
    """Extract the tarball into the staging directory.

    :param tarball_path: path of the tarball to unpack
    :param staging_path: directory receiving the extracted contents
    """
    tarball_fileobj = open(tarball_path, 'rb')
    try:
        utils.extract_tarball(tarball_fileobj, staging_path)
    finally:
        # Close the handle even if extraction fails.
        tarball_fileobj.close()
210
def _active_seeder_processes():
    """Yields command-line of active seeder processes.

    Roughly equivalent to performing ps | grep _bittorrent_seeder

    :raises IOError: if a /proc/<pid>/cmdline file cannot be read for any
                     reason other than the process having exited
    """
    # Local import: the exception instance is fetched via sys.exc_info() so
    # that the handler needs neither the `except X, e` nor the
    # `except X as e` spelling.
    import sys

    pids = [pid for pid in os.listdir('/proc') if pid.isdigit()]
    for pid in pids:
        cmdline_path = os.path.join('/proc', pid, 'cmdline')
        try:
            cmdline_file = open(cmdline_path, 'rb')
            try:
                cmdline = cmdline_file.read()
            finally:
                cmdline_file.close()
        except IOError:
            e = sys.exc_info()[1]
            if e.errno != errno.ENOENT:
                raise
            # The process exited between listdir() and open(). Skip it rather
            # than testing a stale `cmdline` left over from the previous
            # iteration.
            continue

        if SEEDER_PROCESS in cmdline:
            yield cmdline
227
def _reap_finished_seeds(seed_cache_path):
    """Delete any cached seeds where the seeder process has died.

    A cached seed is kept only if its file name appears in the command line
    of a running seeder process.

    :param seed_cache_path: directory holding seeded files
    """
    logging.debug("Preparing to reap finished seeds")

    # Start by assuming every cached seed has lost its seeder ...
    missing = {}
    for fname in os.listdir(seed_cache_path):
        seed_path = os.path.join(seed_cache_path, fname)
        missing[seed_path] = None

    if not missing:
        # Empty cache: nothing to reap, so skip the /proc scan.
        return

    # ... then drop the ones a running seeder still references.
    for cmdline in _active_seeder_processes():
        # Iterate over a snapshot: entries are deleted inside the loop.
        for seed_path in list(missing):
            seed_filename = os.path.basename(seed_path)
            if seed_filename in cmdline:
                del missing[seed_path]

    for seed_path in missing:
        logging.debug("Reaping cached seed '%s'" % seed_path)
        utils.delete_if_exists(seed_path)
246
def _make_seed_cache():
    """Return the seed cache directory, creating it if necessary.

    The location is read from the ``SEED_CACHE`` environment variable and
    falls back to ``DEFAULT_SEED_CACHE``.

    :returns: path of the seed cache directory
    :raises OSError: if the directory does not exist and cannot be created
    """
    seed_cache_path = os.environ.get('SEED_CACHE', DEFAULT_SEED_CACHE)

    # Create first and inspect afterwards: an exists()-then-mkdir() sequence
    # fails when a concurrent plugin call creates the directory in between.
    # makedirs() additionally creates any missing parent directories.
    try:
        os.makedirs(seed_cache_path)
    except OSError:
        if not os.path.isdir(seed_cache_path):
            raise

    return seed_cache_path
253
def download_vhd(session, image_id, torrent_base_url, torrent_seed_duration,
                 torrent_seed_chance, torrent_max_last_accessed,
                 torrent_listen_port_start, torrent_listen_port_end,
                 torrent_download_stall_cutoff, uuid_stack, sr_path,
                 torrent_max_seeder_processes_per_host):
    """Download an image from BitTorrent, unbundle it, and then deposit the
    VHDs into the storage repository

    :param session: not used by this function itself
    :param image_id: identifier of the image to download
    :param torrent_base_url: URL prefix the .torrent file is fetched from
    :param torrent_seed_duration: how long to seed afterwards, in seconds;
                                  falsy disables seeding
    :param torrent_seed_chance: see ``_should_seed``
    :param torrent_max_last_accessed: age threshold for reaping cached
                                      .torrent files; falsy disables reaping
    :param torrent_listen_port_start: first port of the listen range
    :param torrent_listen_port_end: last port of the listen range
    :param torrent_download_stall_cutoff: seconds without progress after
                                          which the download is abandoned
    :param uuid_stack: passed through to ``utils.import_vhds``
    :param sr_path: path of the storage repository
    :param torrent_max_seeder_processes_per_host: see ``_should_seed``
    :returns: the value returned by ``utils.import_vhds``
    """
    seed_cache_path = _make_seed_cache()
    torrent_cache_path = _make_torrent_cache()

    # Housekeeping: drop leftovers of earlier runs before adding new files.
    _reap_finished_seeds(seed_cache_path)
    _reap_old_torrent_files(torrent_cache_path, torrent_max_last_accessed)

    torrent_path = _fetch_torrent_file(
        torrent_cache_path, image_id, torrent_base_url)

    staging_path = utils.make_staging_area(sr_path)
    try:
        # NOTE(review): assumes the downloaded tarball is named like the
        # .torrent file without its '.torrent' suffix -- confirm.
        tarball_filename = os.path.basename(torrent_path).replace(
            '.torrent', '')
        tarball_path = os.path.join(staging_path, tarball_filename)

        # Download tarball into staging area
        _download(torrent_path, staging_path, torrent_listen_port_start,
                  torrent_listen_port_end, torrent_download_stall_cutoff)

        # Extract the tarball into the staging area
        _extract_tarball(tarball_path, staging_path)

        # Move the VHDs from the staging area into the storage repository
        vdi_list = utils.import_vhds(sr_path, staging_path, uuid_stack)

        # Seed image for others in the swarm
        _seed_if_needed(seed_cache_path, tarball_path, torrent_path,
                        torrent_seed_duration, torrent_seed_chance,
                        torrent_listen_port_start, torrent_listen_port_end,
                        torrent_max_seeder_processes_per_host)
    finally:
        # Clean up the staging area whether or not the steps above succeeded.
        utils.cleanup_staging_area(staging_path)

    return vdi_list
298
if __name__ == '__main__':
    # Hand download_vhd to the plugin framework when the file is run as a
    # script.
    utils.register_plugin_calls(download_vhd)