~ubuntu-branches/ubuntu/precise/desktopcouch/precise

« back to all changes in this revision

Viewing changes to desktopcouch/records/server_base.py

  • Committer: Bazaar Package Importer
  • Author(s): Ken VanDine
  • Date: 2010-04-19 12:52:22 UTC
  • mfrom: (12.1.9 lucid)
  • Revision ID: james.westby@ubuntu.com-20100419125222-zsh9lrm15h84951j
* debian/patches/lp_522538.patch
  - Handle reconnects if the server isn't running (LP: #522538)

Show diffs side-by-side

added

removed

Lines of Context:
22
22
"""The Desktop Couch Records API."""
23
23
 
24
24
import httplib2, urlparse, cgi, copy
25
 
from time import time, sleep
26
 
import socket
 
25
from time import time
27
26
 
28
27
# please keep desktopcouch python 2.5 compatible for now
29
28
 
43
42
from couchdb.client import ResourceNotFound, ResourceConflict, uri as couchdburi
44
43
from couchdb.design import ViewDefinition
45
44
from record import Record
46
 
import logging
47
45
 
48
46
#DEFAULT_DESIGN_DOCUMENT = "design"
49
47
DEFAULT_DESIGN_DOCUMENT = None  # each view in its own eponymous design doc.
144
142
 
145
143
    def __init__(self, database, uri, record_factory=None, create=False,
146
144
                 server_class=Server, **server_class_extras):
147
 
        self._database_name = database
148
 
        self._create = create
149
 
        self._server_class = server_class
150
 
        self._server_class_extras = server_class_extras
 
145
        self.server_uri = uri
 
146
        self._server = server_class(self.server_uri, **server_class_extras)
 
147
        if database not in self._server:
 
148
            if create:
 
149
                self._server.create(database)
 
150
            else:
 
151
                raise NoSuchDatabase(database)
 
152
        self.db = self._server[database]
151
153
        self.record_factory = record_factory or Record
152
 
        self.server_uri = uri
153
 
        self._server = None
154
 
        self.db = None
155
 
        self._reconnect()
156
154
        self._changes_since = self.db.info()["update_seq"]
157
155
        self._changes_last_used = 0  # Immediate run works.
158
156
 
159
 
    @staticmethod
160
 
    def _is_reconnection_fail(ex):
161
 
        return isinstance(ex, AttributeError) and \
162
 
                ex.args == ("'NoneType' object has no attribute 'makefile'",)
163
 
 
164
 
    def with_reconnects(self, func, *args, **kwargs):
165
 
        for retry in (3, 2, 1, None):
166
 
            try:
167
 
                return func(*args, **kwargs)
168
 
            except Exception, e:
169
 
                if self._is_reconnection_fail(e) and retry:
170
 
                    logging.warn("DB connection failed.  Reconnecting.")
171
 
                    self._reconnect()
172
 
                    continue
173
 
                elif isinstance(e, socket.error):
174
 
                    logging.warn("Other socket error %s.  Reconnecting.", e)
175
 
                    sleep(0.3)  
176
 
                    self._reconnect()
177
 
                    continue
178
 
                else:
179
 
                    raise
180
 
        raise ValueError("failed to (re-)connect to couchdb server")
181
 
 
182
 
    def _reconnect(self, uri=None):
183
 
        logging.info("Connecting to %s.", self.server_uri or "discovered local port")
184
 
 
185
 
        self._server = self._server_class(uri or self.server_uri,
186
 
                **self._server_class_extras)
187
 
        if self._database_name not in self._server:
188
 
            if self._create:
189
 
                self._server.create(self._database_name)
190
 
            else:
191
 
                raise NoSuchDatabase(self._database_name)
192
 
        if self.db is None:
193
 
            self.db = self._server[self._database_name]
194
 
        else:
195
 
            # Monkey-patch the object the user already uses.  Oook!
196
 
            new_db = self._server[self._database_name]
197
 
            self.db.resource = new_db.resource
198
157
 
199
158
    def _temporary_query(self, map_fun, reduce_fun=None, language='javascript',
200
159
            wrapper=None, **options):
201
160
        """Pass-through to CouchDB library.  Deprecated."""
202
 
        return self.with_reconnects(self.db.query, map_fun, reduce_fun, language,
 
161
        return self.db.query(map_fun, reduce_fun, language,
203
162
              wrapper, **options)
204
163
 
205
164
    def get_record(self, record_id):
206
165
        """Get a record from back end storage."""
207
166
        try:
208
 
            couch_record = self.with_reconnects(self.db.__getitem__, record_id)
 
167
            couch_record = self.db[record_id]
209
168
        except ResourceNotFound:
210
169
            return None
211
170
        data = {}
234
193
            # Do not rely on couchdb to create an ID for us.
235
194
            from uuid import uuid4
236
195
            record.record_id = uuid4().hex
237
 
        try:
238
 
            self.with_reconnects(self.db.__setitem__,
239
 
                    record.record_id, record._data)
240
 
        except ResourceConflict:
241
 
            if record.record_revision is None and not row_is_deleted(record):
242
 
                old_record = self.with_reconnects(self.db.__getitem__,
243
 
                    record.record_id)
244
 
                if row_is_deleted(old_record):
245
 
                    # User asked to make new record that already exists
246
 
                    # but we have marked deleted internally.  Instead of
247
 
                    # complaining, pull up the previous revision ID and
248
 
                    # add that to the user's record, and re-send it.
249
 
                    record._set_record_revision(old_record.rev)
250
 
 
251
 
                    self.with_reconnects(self.db.__setitem__,
252
 
                            record.record_id, record._data)
253
 
                else:
254
 
                    raise
255
 
            else:
256
 
                raise
257
 
 
258
 
        for attachment_name in getattr(record, "_detached", []):
259
 
            self.with_reconnects(self.db.delete_attachment,
260
 
                record._data, attachment_name)
 
196
        self.db[record.record_id] = record._data
261
197
 
262
198
        for attachment_name in record.list_attachments():
263
199
            data, content_type = record.attachment_data(attachment_name)
264
 
            self.with_reconnects(self.db.put_attachment,
 
200
            self.db.put_attachment(
265
201
                record._data, data, attachment_name, content_type)
266
202
 
267
203
        return record.record_id
279
215
        # although with a single record we need to test for the
280
216
        # revision, with a batch we do not, but we have to make sure
281
217
        # that we did not get an error
282
 
        batch_put_result = self.with_reconnects(
283
 
                self.db.update, [record._data for record in batch])
 
218
        batch_put_result = self.db.update([record._data for record in batch])
284
219
        for current_tuple in batch_put_result:
285
220
            success, docid, rev_or_exc = current_tuple
286
221
            if success:
287
222
                record = records_hash[docid]
288
223
                # set the new rev
289
224
                record._data["_rev"] = rev_or_exc
290
 
 
291
 
                for attachment_name in getattr(record, "_detached", []):
292
 
                    self.with_reconnects(self.db.delete_attachment,
293
 
                        record._data, attachment_name)
294
 
 
295
225
                for attachment_name in record.list_attachments():
296
226
                    data, content_type = record.attachment_data(attachment_name)
297
 
                    self.with_reconnects(self.db.put_attachment,
 
227
                    self.db.put_attachment(
298
228
                        {"_id":record.record_id, "_rev":record["_rev"]},
299
229
                        data, attachment_name, content_type)
300
230
        # all success record have the blobs added we return result of
319
249
        # as far as we know. (If they're not, we'll get a
320
250
        # ResourceConflict later on, from which we can recover.)
321
251
        if cached_record is None:
322
 
            cached_record = self.with_reconnects(self.db.__getitem__,
323
 
                    record_id)
 
252
            cached_record = self.db[record_id]
324
253
        if isinstance(cached_record, Record):
325
254
            cached_record = cached_record._data
326
255
        record = copy.deepcopy(cached_record)
365
294
            # it.
366
295
            if modified:
367
296
                try:
368
 
                    self.with_reconnects(self.db.__setitem__, record_id,
369
 
                            record)
 
297
                    self.db[record_id] = record
370
298
                except ResourceConflict:
371
299
                    # We got a conflict, meaning the record has
372
300
                    # changed in the database since we last loaded it
373
301
                    # into memory. Let's get a fresh copy and try
374
302
                    # again.
375
 
                    record = self.with_reconnects(self.db.__getitem__,
376
 
                            record_id)
 
303
                    record = self.db[record_id]
377
304
                    continue
378
305
            # If we get here, nothing remains to be done, and we can
379
306
            # take a well deserved break.
381
308
 
382
309
    def delete_record(self, record_id):
383
310
        """Delete record with given id"""
384
 
        record = self.with_reconnects(self.db.__getitem__, record_id)
 
311
        record = self.db[record_id]
385
312
        record.setdefault('application_annotations', {}).setdefault(
386
313
            'Ubuntu One', {}).setdefault('private_application_annotations', {})[
387
314
            'deleted'] = True
388
 
        self.with_reconnects(self.db.__setitem__, record_id, record)
 
315
        self.db[record_id] = record
389
316
 
390
317
    def record_exists(self, record_id):
391
318
        """Check if record with given id exists."""
392
 
        try:
393
 
            record = self.with_reconnects(self.db.__getitem__, record_id)
394
 
        except ResourceNotFound:
 
319
        if record_id not in self.db:
395
320
            return False
 
321
        record = self.db[record_id]
396
322
        return not row_is_deleted(record)
397
323
 
398
324
    def delete_view(self, view_name, design_doc=DEFAULT_DESIGN_DOCUMENT):
406
332
        # No atomic updates.  Only read & mutate & write.  Le sigh.
407
333
        # First, get current contents.
408
334
        try:
409
 
            view_container = self.with_reconnects(
410
 
                    self.db.__getitem__, doc_id)["views"]
 
335
            view_container = self.db[doc_id]["views"]
411
336
        except (KeyError, ResourceNotFound):
412
337
            raise KeyError
413
338
 
433
358
 
434
359
            # Remove design document.  This assumes there are only views in
435
360
            # design documents.  :(
436
 
            self.with_reconnects(self.db.__delitem__, doc_id)
 
361
            del self.db[doc_id]
437
362
 
438
363
        assert not self.view_exists(view_name, design_doc)
439
364
 
442
367
    def execute_view(self, view_name, design_doc=DEFAULT_DESIGN_DOCUMENT,
443
368
            **params):
444
369
        """Execute view and return results."""
445
 
 
446
 
        class ReconnectingViewWrapper(object):
447
 
            """A view from python-couchdb is an object with attributes that
448
 
            cause HTTP requests to be fired off when accessed.  If we wish
449
 
            to be able to reconnect on disappearance of the server, then
450
 
            we must intercept calls from user to python-couchdb."""
451
 
 
452
 
            def __init__(wrapper, obj, *args, **kwargs):
453
 
                wrapper.obj = self.with_reconnects(obj, *args, **kwargs)
454
 
 
455
 
            def ___call__(wrapper, **options):
456
 
                return self.with_reconnects(wrapper.obj.__call__, **options)
457
 
 
458
 
            def __iter__(wrapper):
459
 
                return self.view_with_reconnects(wrapper.obj.__iter__,
460
 
                                                 wrapper.obj)
461
 
 
462
 
            def __len__(wrapper):
463
 
                return self.view_with_reconnects(wrapper.obj.__len__,
464
 
                                                 wrapper.obj)
465
 
 
466
 
            def __getattr__(wrapper, key):
467
 
                return self.with_reconnects(getattr, wrapper.obj, key)
468
 
 
469
 
            def __getitem__(wrapper, key):
470
 
                return ReconnectingViewWrapper(wrapper.obj.__getitem__, key)
471
 
 
472
370
        if design_doc is None:
473
371
            design_doc = view_name
474
372
 
475
373
        view_id_fmt = "_design/%(design_doc)s/_view/%(view_name)s"
476
 
        wrapper = ReconnectingViewWrapper(self.db.view, 
477
 
                                          view_id_fmt % locals(),
478
 
                                          **params)
479
 
        return wrapper
480
 
 
481
 
    def view_with_reconnects(self, func, view, *args, **kwargs):
482
 
        for retry in (3, 2, 1, None):
483
 
            try:
484
 
                return func(*args, **kwargs)
485
 
            except Exception, e:
486
 
                if self._is_reconnection_fail(e) and retry:
487
 
                    logging.warn("DB connection failed.  Reconnecting.")
488
 
                    self._reconnect_view(view_result=view)
489
 
                    continue
490
 
                elif isinstance(e, socket.error):
491
 
                    logging.warn("Other socket error %s.  Reconnecting.", e)
492
 
                    sleep(0.3)  
493
 
                    self._reconnect()
494
 
                    continue
495
 
                else:
496
 
                    raise
497
 
        raise ValueError("failed to (re-)connect to couchdb server")
498
 
 
499
 
    def _reconnect_view(self, uri=None, view_result=None):
500
 
        logging.info("Connecting to %s.",
501
 
                     self.server_uri or "discovered local port")
502
 
        self._server = self._server_class(
503
 
            uri or '/'.join(self.db.resource.uri.split('/')[:-1]),
504
 
                **self._server_class_extras)
505
 
        if self._database_name not in self._server:
506
 
            if self._create:
507
 
                self._server.create(self._database_name)
508
 
            else:
509
 
                raise NoSuchDatabase(self._database_name)
510
 
        if self.db is None:
511
 
            self.db = self._server[self._database_name]
512
 
        else:
513
 
            # Monkey-patch the object the user already uses.  Oook!
514
 
            view_url = urlparse.urlparse(view_result.view.resource.uri)
515
 
            db_url = urlparse.urlparse(self.db.resource.uri)
516
 
            view_result.view.resource.uri = urlparse.urlunparse(
517
 
                [view_url[0], db_url[1]] + list(view_url[2:]))
 
374
        return self.db.view(view_id_fmt % locals(), **params)
518
375
 
519
376
    def add_view(self, view_name, map_js, reduce_js,
520
377
            design_doc=DEFAULT_DESIGN_DOCUMENT):
524
381
            design_doc = view_name
525
382
 
526
383
        view = ViewDefinition(design_doc, view_name, map_js, reduce_js)
527
 
        self.with_reconnects(view.sync, self.db)
 
384
        view.sync(self.db)
528
385
        assert self.view_exists(view_name, design_doc)
529
386
 
530
387
    def view_exists(self, view_name, design_doc=DEFAULT_DESIGN_DOCUMENT):
536
393
        doc_id = "_design/%(design_doc)s" % locals()
537
394
 
538
395
        try:
539
 
            view_container = \
540
 
                    self.with_reconnects(self.db.__getitem__, doc_id)["views"]
 
396
            view_container = self.db[doc_id]["views"]
541
397
            return view_name in view_container
542
398
        except (KeyError, ResourceNotFound):
543
399
            return False
548
404
        in it."""
549
405
        doc_id = "_design/%(design_doc)s" % locals()
550
406
        try:
551
 
            return list(self.with_reconnects(self.db.__getitem__, doc_id)["views"])
 
407
            return list(self.db[doc_id]["views"])
552
408
        except (KeyError, ResourceNotFound):
553
409
            return []
554
410
 
650
506
            uri = couchdburi(
651
507
                self._server.resource.uri, self.db.name, "_changes",
652
508
                since=self._changes_since)
653
 
            ## Assume server has not crashed and URI is the same.  FIXME
654
509
            resp, data = self._server.resource.http.request(uri, "GET", "", {})
655
510
            if resp["status"] != '200':
656
511
                raise IOError(
667
522
            # If exception in cb, we never update governor.
668
523
            self._changes_last_used = now
669
524
        return call_count
670
 
 
671
 
    def ensure_full_commit(self):
672
 
        """
673
 
        Make sure that CouchDb flushes all writes to the database,
674
 
        flushing all delayed commits, before going on.
675
 
        """
676
 
        self.db.resource.post(
677
 
            path='_ensure_full_commit',
678
 
            headers={'Content-Type': 'application/json'})