~ubuntu-branches/ubuntu/natty/nova/natty

« back to all changes in this revision

Viewing changes to nova/virt/vmwareapi/io_util.py

  • Committer: Bazaar Package Importer
  • Author(s): Chuck Short, Chuck Short, Soren Hansen
  • Date: 2011-03-31 11:25:10 UTC
  • mfrom: (1.1.14 upstream)
  • Revision ID: james.westby@ubuntu.com-20110331112510-j8imyjc43jpxbe5g
Tags: 2011.2~bzr925-0ubuntu1
[Chuck Short]
* New upstream release.

[Soren Hansen]
* Make the build fail if the test suite does. The test that used to
  fail on the buildd's has been complete rewritten. (LP: #712481)
* Specify that we need Sphinx > 1.0 to build.
* Remove refresh_bzr_branches target from debian/rules. It is not used
  anymore.
* Clean up after doc builds on debian/rules clean.
* Add a nova-ajax-console-proxy package.
* Add Recommends: ajaxterm to nova-compute, so that nova-ajax-console-
  proxy will have something to connect to.
* Stop depending on aoetools. iscsi is the default nowadays (and has
  been for a while).
* Move dependency on open-iscsi from nova-volume to nova-compute.
  They're client tools, so that's where they belong.
* Add a build-depends on python-suds.
* Add logrote config for nova-ajax-console-proxy.
* Add upstart job for nova-ajax-console-proxy.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
 
2
 
 
3
# Copyright (c) 2011 Citrix Systems, Inc.
 
4
# Copyright 2011 OpenStack LLC.
 
5
#
 
6
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
 
7
#    not use this file except in compliance with the License. You may obtain
 
8
#    a copy of the License at
 
9
#
 
10
#         http://www.apache.org/licenses/LICENSE-2.0
 
11
#
 
12
#    Unless required by applicable law or agreed to in writing, software
 
13
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 
14
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 
15
#    License for the specific language governing permissions and limitations
 
16
#    under the License.
 
17
 
 
18
"""
 
19
Utility classes for defining the time saving transfer of data from the reader
 
20
to the write using a LightQueue as a Pipe between the reader and the writer.
 
21
"""
 
22
 
 
23
from eventlet import event
 
24
from eventlet import greenthread
 
25
from eventlet.queue import LightQueue
 
26
 
 
27
from glance import client
 
28
 
 
29
from nova import exception
 
30
from nova import log as logging
 
31
 
 
32
LOG = logging.getLogger("nova.virt.vmwareapi.io_util")
 
33
 
 
34
IO_THREAD_SLEEP_TIME = .01
 
35
GLANCE_POLL_INTERVAL = 5
 
36
 
 
37
 
 
38
class ThreadSafePipe(LightQueue):
 
39
    """The pipe to hold the data which the reader writes to and the writer
 
40
    reads from."""
 
41
 
 
42
    def __init__(self, maxsize, transfer_size):
 
43
        LightQueue.__init__(self, maxsize)
 
44
        self.transfer_size = transfer_size
 
45
        self.transferred = 0
 
46
 
 
47
    def read(self, chunk_size):
 
48
        """Read data from the pipe. Chunksize if ignored for we have ensured
 
49
        that the data chunks written to the pipe by readers is the same as the
 
50
        chunks asked for by the Writer."""
 
51
        if self.transferred < self.transfer_size:
 
52
            data_item = self.get()
 
53
            self.transferred += len(data_item)
 
54
            return data_item
 
55
        else:
 
56
            return ""
 
57
 
 
58
    def write(self, data):
 
59
        """Put a data item in the pipe."""
 
60
        self.put(data)
 
61
 
 
62
    def close(self):
 
63
        """A place-holder to maintain consistency."""
 
64
        pass
 
65
 
 
66
 
 
67
class GlanceWriteThread(object):
 
68
    """Ensures that image data is written to in the glance client and that
 
69
    it is in correct ('active')state."""
 
70
 
 
71
    def __init__(self, input, glance_client, image_id, image_meta={}):
 
72
        self.input = input
 
73
        self.glance_client = glance_client
 
74
        self.image_id = image_id
 
75
        self.image_meta = image_meta
 
76
        self._running = False
 
77
 
 
78
    def start(self):
 
79
        self.done = event.Event()
 
80
 
 
81
        def _inner():
 
82
            """Function to do the image data transfer through an update
 
83
            and thereon checks if the state is 'active'."""
 
84
            self.glance_client.update_image(self.image_id,
 
85
                                            image_meta=self.image_meta,
 
86
                                            image_data=self.input)
 
87
            self._running = True
 
88
            while self._running:
 
89
                try:
 
90
                    image_status = \
 
91
                        self.glance_client.get_image_meta(self.image_id).get(
 
92
                                                                "status")
 
93
                    if image_status == "active":
 
94
                        self.stop()
 
95
                        self.done.send(True)
 
96
                    # If the state is killed, then raise an exception.
 
97
                    elif image_status == "killed":
 
98
                        self.stop()
 
99
                        exc_msg = _("Glance image %s is in killed state") %\
 
100
                                    self.image_id
 
101
                        LOG.exception(exc_msg)
 
102
                        self.done.send_exception(exception.Error(exc_msg))
 
103
                    elif image_status in ["saving", "queued"]:
 
104
                        greenthread.sleep(GLANCE_POLL_INTERVAL)
 
105
                    else:
 
106
                        self.stop()
 
107
                        exc_msg = _("Glance image "
 
108
                                    "%(image_id)s is in unknown state "
 
109
                                    "- %(state)s") % {
 
110
                                            "image_id": self.image_id,
 
111
                                            "state": image_status}
 
112
                        LOG.exception(exc_msg)
 
113
                        self.done.send_exception(exception.Error(exc_msg))
 
114
                except Exception, exc:
 
115
                    self.stop()
 
116
                    self.done.send_exception(exc)
 
117
 
 
118
        greenthread.spawn(_inner)
 
119
        return self.done
 
120
 
 
121
    def stop(self):
 
122
        self._running = False
 
123
 
 
124
    def wait(self):
 
125
        return self.done.wait()
 
126
 
 
127
    def close(self):
 
128
        pass
 
129
 
 
130
 
 
131
class IOThread(object):
 
132
    """Class that reads chunks from the input file and writes them to the
 
133
    output file till the transfer is completely done."""
 
134
 
 
135
    def __init__(self, input, output):
 
136
        self.input = input
 
137
        self.output = output
 
138
        self._running = False
 
139
        self.got_exception = False
 
140
 
 
141
    def start(self):
 
142
        self.done = event.Event()
 
143
 
 
144
        def _inner():
 
145
            """Read data from the input and write the same to the output
 
146
            until the transfer completes."""
 
147
            self._running = True
 
148
            while self._running:
 
149
                try:
 
150
                    data = self.input.read(None)
 
151
                    if not data:
 
152
                        self.stop()
 
153
                        self.done.send(True)
 
154
                    self.output.write(data)
 
155
                    greenthread.sleep(IO_THREAD_SLEEP_TIME)
 
156
                except Exception, exc:
 
157
                    self.stop()
 
158
                    LOG.exception(exc)
 
159
                    self.done.send_exception(exc)
 
160
 
 
161
        greenthread.spawn(_inner)
 
162
        return self.done
 
163
 
 
164
    def stop(self):
 
165
        self._running = False
 
166
 
 
167
    def wait(self):
 
168
        return self.done.wait()