~ubuntuone-support/+junk/syncdaemon-perftest

Viewing changes to syncdaemon-perftest.py

  • Committer: Roman Yepishev
  • Date: 2012-04-02 10:28:38 UTC
  • Revision ID: roman.yepishev@canonical.com-20120402102838-cftyw8c8754a1rrj
  • Commit message: Latest version of the scripts

The diff below is shown in unified form: lines beginning with "+" were added in this revision, lines beginning with "-" were removed, and unprefixed lines are unchanged context. "@@ -OLD +NEW @@" markers give the starting line numbers in the old and new versions of the file.

@@ -16 +16 @@

from random import randint

+# Cleanup via rest api
+from u1perftest.rest import OAuthHttpClient
+
TESTDIR="syncdaemon-perftest-tmp"

class SDPerfTest(PerfTest):
    def __init__(self):
+        super(SDPerfTest, self).__init__()
+
        self.state = 'INIT'
-        self.root = None
        self.error = False

        # "namespace" for benchmarking
        self.bm_temp_files = {}
-        self.bm_files_created = 0
+        self.bm_upload_started = 0
+
+        # Register callbacks here
+        # This starts being messy
+        self._file_created_cb = self.add_temp_file_info

        DBusGMainLoop(set_as_default=True)
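
Note (illustrative, not part of this revision): DBusGMainLoop(set_as_default=True) in __init__ only has an effect once a GLib main loop is running; run(), further down in this diff, creates that loop and schedules setup() with gobject.timeout_add. A minimal sketch of that scheduling pattern, assuming the same Python 2 era gobject module the script already imports:

import gobject

def setup():
    # A zero-delay timeout fires on the first main loop iteration,
    # which is how run() defers setup() until the loop is live.
    print("setup runs once the main loop starts")
    return False  # returning False stops the timeout from repeating

def watchdog(loop):
    # Equivalent to the self.timeout * 1000 watchdog in run().
    print("timeout reached, stopping the loop")
    loop.quit()
    return False

loop = gobject.MainLoop()
gobject.timeout_add(0, setup)
gobject.timeout_add(5 * 1000, watchdog, loop)
loop.run()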
 
@@ -83 +91 @@
    def handle_sd_StatusChanged(self, status):
        self.logger.debug("State: %s, queues: %s", status['name'], status['queues'])

-        if status['name'] == u"STANDOFF" and self.state == "UPLOADING":
-            self.logger.warn("Syncdaemon disconnected while uploading. Aborting")
-            self.cleanup_files()
-            self.quit(error=True)
-
        if status['name'] == u"QUEUE_MANAGER" and \
                status['queues'] == u"IDLE":
-            self.logger.info("Syncdaemon is IDLE")
-            # we need to give some time for all the queue items to be
-            # processed
-            if self.state == 'CONNECTING':
-                # "emitting" signal, switch to UPLOADING state
-                return self.handle_CONNECTED()
-
-            if self.state == 'PRECLEANUP':
-                # files were removed, proceed
-                return self.handle_PRECLEANUP_COMPLETED()
+            self.logger.info("Syncdaemon is IDLE (internal state: %s)",
+                    self.state)

            if self.state == "UPLOADING":
                return self.handle_UPLOAD_COMPLETED()
@@ -107 +102 @@
            if self.state == "CLEANUP":
                return self.handle_CLEANUP_COMPLETED()

+    def add_temp_file_info(self, path, info):
+        self.bm_temp_files[path] = info
+
    def handle_sd_UploadStarted(self, path):
+        if not self.bm_upload_started:
+            self.bm_upload_started = time.time()
+
        self.logger.debug("Got UploadStarted notification of %s", path)

        if path not in self.bm_temp_files:
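
The handle_sd_StatusChanged and handle_sd_UploadStarted methods above are D-Bus signal handlers; the code that subscribes them is not part of this diff (it presumably lives in the PerfTest base class). A rough sketch of such wiring with dbus-python's add_signal_receiver; the interface name used here is an assumption about syncdaemon's D-Bus API, not something taken from this branch:

import dbus
import gobject
from dbus.mainloop.glib import DBusGMainLoop

# Install the GLib main loop integration before connecting to the bus.
DBusGMainLoop(set_as_default=True)
bus = dbus.SessionBus()

def on_status_changed(status):
    print("StatusChanged: %s / %s" % (status['name'], status['queues']))

def on_upload_started(path):
    print("UploadStarted: %s" % (path,))

# 'com.ubuntuone.SyncDaemon.Status' is assumed, not taken from the diff.
bus.add_signal_receiver(on_status_changed, signal_name='StatusChanged',
                        dbus_interface='com.ubuntuone.SyncDaemon.Status')
bus.add_signal_receiver(on_upload_started, signal_name='UploadStarted',
                        dbus_interface='com.ubuntuone.SyncDaemon.Status')

gobject.MainLoop().run()
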
@@ -144 +145 @@
            self.metrics.gauge('upload_speed', speed)

    def run(self, options):
-        self.logger = setup_logging(options.log)
-        self.file_count = int(options.file_count)
-
-        if options.file_size.startswith('..'):
-            self.file_size = int(options.file_size[2:])
-            self.file_size_randomize = True
-        else:
-            self.file_size = int(options.file_size)
-
-        self.track_speed = options.track_speed
-        self.track_queues = options.track_queues
-
-        statsd_info = options.statsd
-        self._setup_metrics(statsd_info, options.graph_prefix)
-
-        if options.timeout:
-            self.timeout = int(options.timeout)
+        self._parse_common_options(options)

        self.loop = gobject.MainLoop()
        gobject.timeout_add(0, self.setup)
+        self.oauth = options.oauth

        if self.timeout:
            gobject.timeout_add(self.timeout * 1000,
@@ -172 +158 @@
        self.loop.run()

    def setup(self):
+        # Performing cleanup unconditionally
+        url = "https://one.ubuntu.com/api/file_storage/"\
+                "v1/~/Ubuntu%%20One/%s" % (TESTDIR)
+        client = OAuthHttpClient()
+        c_key, c_secret, t_key, t_secret = self.oauth.split(':')
+        client.set_consumer(c_key, c_secret)
+        client.set_token(t_key, t_secret)
+
+        resp, content = client.request(url, "DELETE")
+        self.logger.info(
+            "Removing %s via REST API: %s", TESTDIR, resp['status'])
+
        # we need to make sure we don't autoconnect on startup
        if self.config_if.autoconnect_enabled():
            self.config_if.disable_autoconnect()
-            self.syncdaemon_if.disconnect()
+
+        # we unconditionally disconnect syncdaemon to prepare the files for
+        # upload.
+        self.syncdaemon_if.disconnect()

        self.root = self.syncdaemon_if.get_rootdir()
        status = self.status_if.current_status()
-
-        if status['name'] == u"QUEUE_MANAGER":
-            # sd is already running but we need a fresh one
-            self.syncdaemon_if.disconnect()
+        self.logger.info("Current SD status: %s/%s", status['name'],
+                                                     status['queues'])

        self.bm_test_started = time.time()
        self.logger.info("Starting end-to-end test")
+        # We generate files prior to connection - this way we will be in
+        # mainloop when all the reports are coming in preventing infinite
+        # upload speeds reading
+
+        self.logger.info("Creating files")
+        self._generate_files(TESTDIR)
+        self.logger.info("Files created")
+
        self.handle_CONNECT()

    def handle_CONNECT(self):
-        self.state = 'CONNECTING'
+        # We upload immediately
+        self.state = 'UPLOADING'
        self.syncdaemon_if.connect()

-    def handle_CONNECTED(self):
-        self.state = 'PRECLEANUP'
-        if os.path.exists(os.path.join(self.root, TESTDIR)):
-            self.logger.info("Found old test directory. Cleaning up")
-            self.cleanup_files()
-        else:
-            # short-circuit
-            return self.handle_PRECLEANUP_COMPLETED()
-
    def handle_TIMEOUT(self):
        self.state = "ERROR"
        self.logger.error("Test timeout, shutting down.")
@@ -208 +207 @@
        # fake result
        self.quit(error=True)

-    def handle_PRECLEANUP_COMPLETED(self):
-        self.state = 'UPLOADING'
-        self.logger.info("Creating files")
-
-        path_to_file = TESTDIR
-
-        current_path = self.root
-
-        for node in path_to_file.split('/'):
-            current_path = os.path.join(current_path, node)
-            os.mkdir(current_path)
-            self.bm_temp_files[current_path] = { 'isdir': True }
-
-        rnd = open("/dev/urandom", "r")
-
-        for file_id in range(self.file_count):
-            path = os.path.join(current_path, u"file%d.bin" % (file_id,))
-            if self.file_size_randomize:
-                size = randint(0, self.file_size)
-            else:
-                size = self.file_size
-
-            self.logger.debug("Creating %s", path)
-
-            self.bm_temp_files[path] = {
-                'uploaded': False,
-                'size': size,
-                'upload_started': None
-            }
-
-            fh = open(path, "w")
-
-            while size > 0:
-                result = rnd.read(1024)
-                fh.write(result)
-                size = size - len(result)
-
-            fh.close()
-
-        rnd.close()
-
-        self.bm_files_created = time.time()
-        self.logger.info("Files created")
-
    def cleanup_files(self):
        self.logger.info("Moving existing test folder to trash")
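
The new self._generate_files(TESTDIR) call in setup() replaces the file-creation loop removed from handle_PRECLEANUP_COMPLETED() above. The helper itself is defined elsewhere in the branch; the sketch below is a hypothetical reconstruction from that removed loop, written as a standalone function:

import os
from random import randint

def generate_files(root, testdir, file_count, file_size, randomize=False):
    # Hypothetical stand-in for SDPerfTest._generate_files(); reconstructed
    # from the loop removed from handle_PRECLEANUP_COMPLETED() above.
    temp_files = {}
    current_path = root
    for node in testdir.split('/'):
        current_path = os.path.join(current_path, node)
        os.mkdir(current_path)
        temp_files[current_path] = {'isdir': True}

    rnd = open("/dev/urandom", "rb")
    for file_id in range(file_count):
        path = os.path.join(current_path, "file%d.bin" % (file_id,))
        size = randint(0, file_size) if randomize else file_size
        temp_files[path] = {'uploaded': False, 'size': size,
                            'upload_started': None}
        fh = open(path, "wb")
        remaining = size
        while remaining > 0:
            chunk = rnd.read(min(1024, remaining))
            fh.write(chunk)
            remaining -= len(chunk)
        fh.close()
    rnd.close()
    return temp_files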
 
@@ -265 +220 @@
        rmtree(new)

    def handle_UPLOAD_COMPLETED(self):
-
-        for entry in self.bm_temp_files.values():
-            if 'uploaded' not in entry:
+        missing = 0
+        for path, info in self.bm_temp_files.items():
+            if 'uploaded' not in info:
                # this is not a file, we ignore this item
                continue

-            if not entry['uploaded']:
-                self.logger.info("QUEUE_MANAGER/IDLE but files are still being uploaded")
-                return
+            if not info['uploaded']:
+                self.logger.debug("NOT UPLOADED: %s", path)
+                missing += 1
+
+        if missing:
+            self.logger.info("QUEUE_MANAGER/IDLE but %d files"\
+                             " are still being uploaded", missing)
+            return

        # how much time did it take?
-        delta = time.time() - self.bm_files_created
+        delta = time.time() - self.bm_upload_started
        rate = self.file_count / delta
        if self.track_queues:
            self.metrics.gauge('meta_queue', rate)

-        self.logger.info("Queue completed in %d seconds (%d requests/s)",
+        self.logger.info("Queue completed in %d seconds (%.4f requests/s)",
                delta, rate)

        self.state = 'CLEANUP'
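
The self.metrics.gauge() calls go through the script's own metrics client, which is not shown in this diff and ultimately emits statsd datagrams to the --statsd host:port. For reference, a minimal sketch of the statsd gauge wire format; the send_gauge helper and the metric name are illustrative, not the branch's API:

import socket

def send_gauge(host, port, name, value):
    # statsd gauges are plain-text UDP datagrams of the form "name:value|g".
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.sendto(("%s:%s|g" % (name, value)).encode("ascii"), (host, port))
    sock.close()

send_gauge("localhost", 8125, "syncdaemon.upload_speed", 1234.5)
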
@@ -335 +295 @@
    parser.add_option("--statsd", dest="statsd",
            default=None,
            help="host:port for statsd server")
+    parser.add_option("--oauth", dest="oauth",
+            default=None,
+            help="OAuth Token in the form of a:b:c:d")

    options, args = parser.parse_args()
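
The new --oauth option carries the consumer key/secret and token key/secret as a single colon-separated string, which setup() splits and feeds to OAuthHttpClient to sign the cleanup DELETE request. The branch's OAuthHttpClient (u1perftest.rest) is not shown in this diff; as a rough modern equivalent of that call, using requests_oauthlib (a stand-in, not the library the script uses):

from requests_oauthlib import OAuth1Session

def delete_test_dir(oauth_token, testdir):
    # oauth_token is the --oauth value in the form "consumer_key:consumer_secret:token:token_secret".
    c_key, c_secret, t_key, t_secret = oauth_token.split(':')
    session = OAuth1Session(c_key, client_secret=c_secret,
                            resource_owner_key=t_key,
                            resource_owner_secret=t_secret)
    # Same URL that setup() builds; %% collapses to a literal % on formatting.
    url = ("https://one.ubuntu.com/api/file_storage/"
           "v1/~/Ubuntu%%20One/%s" % (testdir,))
    resp = session.delete(url)
    return resp.status_code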