145
143
def __init__(self, database, uri, record_factory=None, create=False,
146
144
server_class=Server, **server_class_extras):
147
self._database_name = database
148
self._create = create
149
self._server_class = server_class
150
self._server_class_extras = server_class_extras
145
self.server_uri = uri
146
self._server = server_class(self.server_uri, **server_class_extras)
147
if database not in self._server:
149
self._server.create(database)
151
raise NoSuchDatabase(database)
152
self.db = self._server[database]
151
153
self.record_factory = record_factory or Record
152
self.server_uri = uri
156
154
self._changes_since = self.db.info()["update_seq"]
157
155
self._changes_last_used = 0 # Immediate run works.
160
def _is_reconnection_fail(ex):
161
return isinstance(ex, AttributeError) and \
162
ex.args == ("'NoneType' object has no attribute 'makefile'",)
164
def with_reconnects(self, func, *args, **kwargs):
165
for retry in (3, 2, 1, None):
167
return func(*args, **kwargs)
169
if self._is_reconnection_fail(e) and retry:
170
logging.warn("DB connection failed. Reconnecting.")
173
elif isinstance(e, socket.error):
174
logging.warn("Other socket error %s. Reconnecting.", e)
180
raise ValueError("failed to (re-)connect to couchdb server")
182
def _reconnect(self, uri=None):
183
logging.info("Connecting to %s.", self.server_uri or "discovered local port")
185
self._server = self._server_class(uri or self.server_uri,
186
**self._server_class_extras)
187
if self._database_name not in self._server:
189
self._server.create(self._database_name)
191
raise NoSuchDatabase(self._database_name)
193
self.db = self._server[self._database_name]
195
# Monkey-patch the object the user already uses. Oook!
196
new_db = self._server[self._database_name]
197
self.db.resource = new_db.resource
199
158
def _temporary_query(self, map_fun, reduce_fun=None, language='javascript',
200
159
wrapper=None, **options):
201
160
"""Pass-through to CouchDB library. Deprecated."""
202
return self.with_reconnects(self.db.query, map_fun, reduce_fun, language,
161
return self.db.query(map_fun, reduce_fun, language,
203
162
wrapper, **options)
205
164
def get_record(self, record_id):
206
165
"""Get a record from back end storage."""
208
couch_record = self.with_reconnects(self.db.__getitem__, record_id)
167
couch_record = self.db[record_id]
209
168
except ResourceNotFound:
234
193
# Do not rely on couchdb to create an ID for us.
235
194
from uuid import uuid4
236
195
record.record_id = uuid4().hex
238
self.with_reconnects(self.db.__setitem__,
239
record.record_id, record._data)
240
except ResourceConflict:
241
if record.record_revision is None and not row_is_deleted(record):
242
old_record = self.with_reconnects(self.db.__getitem__,
244
if row_is_deleted(old_record):
245
# User asked to make new record that already exists
246
# but we have marked deleted internally. Instead of
247
# complaining, pull up the previous revision ID and
248
# add that to the user's record, and re-send it.
249
record._set_record_revision(old_record.rev)
251
self.with_reconnects(self.db.__setitem__,
252
record.record_id, record._data)
258
for attachment_name in getattr(record, "_detached", []):
259
self.with_reconnects(self.db.delete_attachment,
260
record._data, attachment_name)
196
self.db[record.record_id] = record._data
262
198
for attachment_name in record.list_attachments():
263
199
data, content_type = record.attachment_data(attachment_name)
264
self.with_reconnects(self.db.put_attachment,
200
self.db.put_attachment(
265
201
record._data, data, attachment_name, content_type)
267
203
return record.record_id
279
215
# although with a single record we need to test for the
280
216
# revision, with a batch we do not, but we have to make sure
281
217
# that we did not get an error
282
batch_put_result = self.with_reconnects(
283
self.db.update, [record._data for record in batch])
218
batch_put_result = self.db.update([record._data for record in batch])
284
219
for current_tuple in batch_put_result:
285
220
success, docid, rev_or_exc = current_tuple
287
222
record = records_hash[docid]
288
223
# set the new rev
289
224
record._data["_rev"] = rev_or_exc
291
for attachment_name in getattr(record, "_detached", []):
292
self.with_reconnects(self.db.delete_attachment,
293
record._data, attachment_name)
295
225
for attachment_name in record.list_attachments():
296
226
data, content_type = record.attachment_data(attachment_name)
297
self.with_reconnects(self.db.put_attachment,
227
self.db.put_attachment(
298
228
{"_id":record.record_id, "_rev":record["_rev"]},
299
229
data, attachment_name, content_type)
300
230
# all success record have the blobs added we return result of
319
249
# as far as we know. (If they're not, we'll get a
320
250
# ResourceConflict later on, from which we can recover.)
321
251
if cached_record is None:
322
cached_record = self.with_reconnects(self.db.__getitem__,
252
cached_record = self.db[record_id]
324
253
if isinstance(cached_record, Record):
325
254
cached_record = cached_record._data
326
255
record = copy.deepcopy(cached_record)
368
self.with_reconnects(self.db.__setitem__, record_id,
297
self.db[record_id] = record
370
298
except ResourceConflict:
371
299
# We got a conflict, meaning the record has
372
300
# changed in the database since we last loaded it
373
301
# into memory. Let's get a fresh copy and try
375
record = self.with_reconnects(self.db.__getitem__,
303
record = self.db[record_id]
378
305
# If we get here, nothing remains to be done, and we can
379
306
# take a well deserved break.
382
309
def delete_record(self, record_id):
    """Delete record with given id.

    This is a soft delete: the stored document is fetched, the flag
    ``['application_annotations']['Ubuntu One']
    ['private_application_annotations']['deleted']`` is set to True
    (creating any missing intermediate dictionaries), and the document is
    written back under the same id.  The document is not removed from the
    database.

    Both the read and the write go through ``self.with_reconnects``, and
    each happens exactly once.
    """
    record = self.with_reconnects(self.db.__getitem__, record_id)
    record.setdefault('application_annotations', {}).setdefault(
            'Ubuntu One', {}).setdefault('private_application_annotations', {})[
            'deleted'] = True
    self.with_reconnects(self.db.__setitem__, record_id, record)
390
317
def record_exists(self, record_id):
391
318
"""Check if record with given id exists."""
393
record = self.with_reconnects(self.db.__getitem__, record_id)
394
except ResourceNotFound:
319
if record_id not in self.db:
321
record = self.db[record_id]
396
322
return not row_is_deleted(record)
398
324
def delete_view(self, view_name, design_doc=DEFAULT_DESIGN_DOCUMENT):
442
367
def execute_view(self, view_name, design_doc=DEFAULT_DESIGN_DOCUMENT,
444
369
"""Execute view and return results."""
446
class ReconnectingViewWrapper(object):
447
"""A view from python-couchdb is an object with attributes that
448
cause HTTP requests to be fired off when accessed. If we wish
449
to be able to reconnect on disappearance of the server, then
450
we must intercept calls from user to python-couchdb."""
452
def __init__(wrapper, obj, *args, **kwargs):
453
wrapper.obj = self.with_reconnects(obj, *args, **kwargs)
455
def ___call__(wrapper, **options):
456
return self.with_reconnects(wrapper.obj.__call__, **options)
458
def __iter__(wrapper):
459
return self.view_with_reconnects(wrapper.obj.__iter__,
462
def __len__(wrapper):
463
return self.view_with_reconnects(wrapper.obj.__len__,
466
def __getattr__(wrapper, key):
467
return self.with_reconnects(getattr, wrapper.obj, key)
469
def __getitem__(wrapper, key):
470
return ReconnectingViewWrapper(wrapper.obj.__getitem__, key)
472
370
if design_doc is None:
473
371
design_doc = view_name
475
373
view_id_fmt = "_design/%(design_doc)s/_view/%(view_name)s"
476
wrapper = ReconnectingViewWrapper(self.db.view,
477
view_id_fmt % locals(),
481
def view_with_reconnects(self, func, view, *args, **kwargs):
482
for retry in (3, 2, 1, None):
484
return func(*args, **kwargs)
486
if self._is_reconnection_fail(e) and retry:
487
logging.warn("DB connection failed. Reconnecting.")
488
self._reconnect_view(view_result=view)
490
elif isinstance(e, socket.error):
491
logging.warn("Other socket error %s. Reconnecting.", e)
497
raise ValueError("failed to (re-)connect to couchdb server")
499
def _reconnect_view(self, uri=None, view_result=None):
500
logging.info("Connecting to %s.",
501
self.server_uri or "discovered local port")
502
self._server = self._server_class(
503
uri or '/'.join(self.db.resource.uri.split('/')[:-1]),
504
**self._server_class_extras)
505
if self._database_name not in self._server:
507
self._server.create(self._database_name)
509
raise NoSuchDatabase(self._database_name)
511
self.db = self._server[self._database_name]
513
# Monkey-patch the object the user already uses. Oook!
514
view_url = urlparse.urlparse(view_result.view.resource.uri)
515
db_url = urlparse.urlparse(self.db.resource.uri)
516
view_result.view.resource.uri = urlparse.urlunparse(
517
[view_url[0], db_url[1]] + list(view_url[2:]))
374
return self.db.view(view_id_fmt % locals(), **params)
519
376
def add_view(self, view_name, map_js, reduce_js,
520
377
design_doc=DEFAULT_DESIGN_DOCUMENT):