~ubuntu-branches/ubuntu/quantal/ceph/quantal

« back to all changes in this revision

Viewing changes to src/pybind/rados.py

  • Committer: Package Import Robot
  • Author(s): Laszlo Boszormenyi (GCS)
  • Date: 2012-02-05 10:07:38 UTC
  • mfrom: (0.3.7)
  • mto: This revision was merged to the branch mainline in revision 10.
  • Revision ID: package-import@ubuntu.com-20120205100738-skhl2b81vg7yrsjz
Tags: upstream-0.41
ImportĀ upstreamĀ versionĀ 0.41

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
"""librados Python ctypes wrapper
2
2
Copyright 2011, Hannu Valtonen <hannu.valtonen@ormod.com>
3
3
"""
4
 
from ctypes import CDLL, c_char_p, c_size_t, c_void_p, c_int, \
5
 
    create_string_buffer, byref, Structure, c_uint64, c_ubyte, pointer
 
4
from ctypes import CDLL, c_char_p, c_size_t, c_void_p, c_int, c_long, \
 
5
    create_string_buffer, byref, Structure, c_uint64, c_ubyte, pointer, \
 
6
    CFUNCTYPE
 
7
import threading
6
8
import ctypes
7
9
import errno
8
10
import time
 
11
from datetime import datetime
9
12
 
10
13
ANONYMOUS_AUID = 0xffffffffffffffff
11
14
ADMIN_AUID = 0
207
210
                            self.cluster, c_char_p(pool_name))
208
211
            else:
209
212
                ret = self.librados.rados_pool_create_with_all(
210
 
                            self.cluster, c_char_p(pool_name), c_ubyte(auid),
 
213
                            self.cluster, c_char_p(pool_name), c_uint64(auid),
211
214
                            c_ubyte(crush_rule))
212
215
        elif (crush_rule == None):
213
216
            ret = self.librados.rados_pool_create_with_auid(
214
 
                        self.cluster, c_char_p(pool_name), c_ubyte(auid))
 
217
                        self.cluster, c_char_p(pool_name), c_uint64(auid))
215
218
        else:
216
219
            ret = self.librados.rados_pool_create_with_crush_rule(
217
220
                        self.cluster, c_char_p(pool_name), c_ubyte(crush_rule))
226
229
        if ret < 0:
227
230
            raise make_ex(ret, "error deleting pool '%s'" % pool_name)
228
231
 
 
232
    def list_pools(self):
 
233
        self.require_state("connected")
 
234
        size = c_size_t(512)
 
235
        while True:
 
236
            c_names = create_string_buffer(size.value)
 
237
            ret = self.librados.rados_pool_list(self.cluster,
 
238
                                                byref(c_names), size)
 
239
            if ret > size.value:
 
240
                size = c_size_t(ret)
 
241
            else:
 
242
                break
 
243
        return filter(lambda name: name != '', c_names.raw.split('\0'))
 
244
 
229
245
    def open_ioctx(self, ioctx_name):
230
246
        self.require_state("connected")
231
247
        if not isinstance(ioctx_name, str):
252
268
 
253
269
    def next(self):
254
270
        key = c_char_p()
255
 
        ret = self.ioctx.librados.rados_objects_list_next(self.ctx, byref(key))
 
271
        locator = c_char_p()
 
272
        ret = self.ioctx.librados.rados_objects_list_next(self.ctx, byref(key),
 
273
                                                          byref(locator))
256
274
        if ret < 0:
257
275
            raise StopIteration()
258
 
        return Object(self.ioctx, key.value)
 
276
        return Object(self.ioctx, key.value, locator.value)
259
277
 
260
278
    def __del__(self):
261
279
        self.ioctx.librados.rados_objects_list_close(self.ctx)
340
358
 
341
359
    def get_timestamp(self):
342
360
        snap_time = c_long(0)
343
 
        ret = rados_ioctx_snap_get_stamp(self.ioctx.io, self.snap_id,
344
 
                                        byref(snap_time))
 
361
        ret = self.ioctx.librados.rados_ioctx_snap_get_stamp(
 
362
            self.ioctx.io, self.snap_id,
 
363
            byref(snap_time))
345
364
        if (ret != 0):
346
365
            raise make_ex(ret, "rados_ioctx_snap_get_stamp error")
347
 
        return date.fromtimestamp(snap_time)
 
366
        return datetime.fromtimestamp(snap_time.value)
 
367
 
 
368
class Completion(object):
 
369
    """completion object"""
 
370
    def __init__(self, ioctx, rados_comp, oncomplete, onsafe):
 
371
        self.rados_comp = rados_comp
 
372
        self.oncomplete = oncomplete
 
373
        self.onsafe = onsafe
 
374
        self.ioctx = ioctx
 
375
 
 
376
    def wait_for_safe(self):
 
377
        return self.ioctx.librados.rados_aio_is_safe(
 
378
            self.rados_comp
 
379
            )
 
380
 
 
381
    def wait_for_complete(self):
 
382
        return self.ioctx.librados.rados_aio_is_complete(
 
383
            self.rados_comp
 
384
            )
 
385
 
 
386
    def get_return_value(self):
 
387
        return self.ioctx.librados.rados_aio_get_return_value(
 
388
            self.rados_comp)
 
389
 
 
390
    def __del__(self):
 
391
        self.ioctx.librados.rados_aio_release(
 
392
            self.rados_comp
 
393
            )
348
394
 
349
395
class Ioctx(object):
350
396
    """rados.Ioctx object"""
353
399
        self.librados = librados
354
400
        self.io = io
355
401
        self.state = "open"
 
402
        self.locator_key = ""
 
403
        self.safe_cbs = {}
 
404
        self.complete_cbs = {}
 
405
        RADOS_CB = CFUNCTYPE(c_int, c_void_p, c_void_p)
 
406
        self.__aio_safe_cb_c = RADOS_CB(self.__aio_safe_cb)
 
407
        self.__aio_complete_cb_c = RADOS_CB(self.__aio_complete_cb)
 
408
        self.lock = threading.Lock()
356
409
 
357
410
    def __enter__(self):
358
411
        return self
364
417
    def __del__(self):
365
418
        self.close()
366
419
 
 
420
    def __aio_safe_cb(self, completion, _):
 
421
        cb = None
 
422
        with self.lock:
 
423
            cb = self.safe_cbs[completion]
 
424
            del self.safe_cbs[completion]
 
425
        cb.onsafe(cb)
 
426
        return 0
 
427
 
 
428
    def __aio_complete_cb(self, completion, _):
 
429
        cb = None
 
430
        with self.lock:
 
431
            cb = self.complete_cbs[completion]
 
432
            del self.complete_cbs[completion]
 
433
        cb.oncomplete(cb)
 
434
        return 0
 
435
 
 
436
    def __get_completion(self, oncomplete, onsafe):
 
437
        completion = c_void_p(0)
 
438
        complete_cb = None
 
439
        safe_cb = None
 
440
        if oncomplete:
 
441
            complete_cb = self.__aio_complete_cb_c
 
442
        if onsafe:
 
443
            safe_cb = self.__aio_safe_cb_c
 
444
        ret = self.librados.rados_aio_create_completion(
 
445
            c_void_p(0),
 
446
            complete_cb,
 
447
            safe_cb,
 
448
            byref(completion)
 
449
            )
 
450
        if ret < 0:
 
451
            raise make_ex(ret, "error getting a completion")
 
452
        with self.lock:
 
453
            completion_obj = Completion(self, completion, oncomplete, onsafe)
 
454
            if oncomplete:
 
455
                self.complete_cbs[completion.value] = completion_obj
 
456
            if onsafe:
 
457
                self.safe_cbs[completion.value] = completion_obj
 
458
        return completion_obj
 
459
 
 
460
    def aio_write(self, object_name, to_write, offset=0,
 
461
                  oncomplete=None, onsafe=None):
 
462
        completion = self.__get_completion(oncomplete, onsafe)
 
463
        ret = self.librados.rados_aio_write(
 
464
            self.io,
 
465
            c_char_p(object_name),
 
466
            completion.rados_comp,
 
467
            c_char_p(to_write),
 
468
            c_size_t(len(to_write)),
 
469
            c_uint64(offset))
 
470
        if ret < 0:
 
471
            raise make_ex(ret, "error writing object %s" % object_name)
 
472
        return completion
 
473
 
 
474
    def aio_write_full(self, object_name, to_write,
 
475
                       oncomplete=None, onsafe=None):
 
476
        completion = self.__get_completion(oncomplete, onsafe)
 
477
        ret = self.librados.rados_aio_write_full(
 
478
            self.io,
 
479
            c_char_p(object_name),
 
480
            completion.rados_comp,
 
481
            c_char_p(to_write),
 
482
            c_size_t(len(to_write)))
 
483
        if ret < 0:
 
484
            raise make_ex(ret, "error writing object %s" % object_name)
 
485
        return completion
 
486
 
 
487
    def aio_append(self, object_name, to_append, oncomplete=None, onsafe=None):
 
488
        completion = self.__get_completion(oncomplete, onsafe)
 
489
        ret = self.librados.rados_aio_append(
 
490
            self.io,
 
491
            c_char_p(object_name),
 
492
            completion.rados_comp,
 
493
            c_char_p(to_append),
 
494
            c_size_t(len(to_append)))
 
495
        if ret < 0:
 
496
            raise make_ex(ret, "error appending to object %s" % object_name)
 
497
        return completion
 
498
 
 
499
    def aio_flush(self):
 
500
        ret = self.librados.rados_aio_flush(
 
501
            self.io)
 
502
        if ret < 0:
 
503
            raise make_ex(ret, "error flushing")
 
504
 
 
505
    def aio_read(self, object_name, length, offset, oncomplete):
 
506
        """
 
507
        oncomplete will be called with the returned read value as
 
508
        well as the completion:
 
509
 
 
510
        oncomplete(completion, data_read)
 
511
        """
 
512
        buf = create_string_buffer(length)
 
513
        def oncomplete_(completion):
 
514
            return oncomplete(completion, buf.value)
 
515
        completion = self.__get_completion(oncomplete_, None)
 
516
        ret = self.librados.rados_aio_read(
 
517
            self.io,
 
518
            c_char_p(object_name),
 
519
            completion.rados_comp,
 
520
            buf,
 
521
            c_size_t(length),
 
522
            c_uint64(offset))
 
523
        if ret < 0:
 
524
            raise make_ex(ret, "error reading %s" % object_name)
 
525
        return completion
 
526
 
367
527
    def require_ioctx_open(self):
368
528
        if self.state != "open":
369
529
            raise IoctxStateError("The pool is %s" % self.state)
371
531
    def change_auid(self, auid):
372
532
        self.require_ioctx_open()
373
533
        ret = self.librados.rados_ioctx_pool_set_auid(self.io,\
374
 
                ctypes.c_int64(auid))
 
534
                ctypes.c_uint64(auid))
375
535
        if ret < 0:
376
536
            raise make_ex(ret, "error changing auid of '%s' to %lld" %\
377
537
                (self.name, auid))
380
540
        self.require_ioctx_open()
381
541
        if not isinstance(loc_key, str):
382
542
            raise TypeError('loc_key must be a string')
383
 
        ret = self.librados.rados_ioctx_locator_set_key(self.io,\
 
543
        self.librados.rados_ioctx_locator_set_key(self.io,\
384
544
                c_char_p(loc_key))
385
 
        if ret < 0:
386
 
            raise make_ex(ret, "error changing locator key of '%s' to '%s'" %\
387
 
                (self.name, loc_key))
 
545
        self.locator_key = loc_key
 
546
 
 
547
    def get_locator_key(self):
 
548
        return self.locator_key
388
549
 
389
550
    def close(self):
390
551
        if self.state == "open":
435
596
        ret = self.librados.rados_read(self.io, c_char_p(key), ret_buf,
436
597
                c_size_t(length), c_uint64(offset))
437
598
        if ret < 0:
438
 
            raise make_ex("Ioctx.read(%s): failed to read %s" % (self.name, key))
 
599
            raise make_ex(ret, "Ioctx.read(%s): failed to read %s" % (self.name, key))
439
600
        return ctypes.string_at(ret_buf, ret)
440
601
 
441
602
    def get_stats(self):
570
731
        self.require_ioctx_open()
571
732
        return self.librados.rados_get_last_version(self.io)
572
733
 
 
734
def set_object_locator(func):
 
735
    def retfunc(self, *args, **kwargs):
 
736
        if self.locator_key is not None:
 
737
            old_locator = self.ioctx.get_locator_key()
 
738
            self.ioctx.set_locator_key(self.locator_key)
 
739
            retval = func(self, *args, **kwargs)
 
740
            self.ioctx.set_locator_key(old_locator)
 
741
            return retval
 
742
        else:
 
743
            return func(self, *args, **kwargs)
 
744
    return retfunc
 
745
 
573
746
class Object(object):
574
747
    """Rados object wrapper, makes the object look like a file"""
575
 
    def __init__(self, ioctx, key):
 
748
    def __init__(self, ioctx, key, locator_key=None):
576
749
        self.key = key
577
750
        self.ioctx = ioctx
578
751
        self.offset = 0
579
752
        self.state = "exists"
 
753
        self.locator_key = locator_key
580
754
 
581
755
    def __str__(self):
582
756
        return "rados.Object(ioctx=%s,key=%s)" % (str(self.ioctx), self.key)
585
759
        if self.state != "exists":
586
760
            raise ObjectStateError("The object is %s" % self.state)
587
761
 
 
762
    @set_object_locator
588
763
    def read(self, length = 1024*1024):
589
764
        self.require_object_exists()
590
765
        ret = self.ioctx.read(self.key, self.offset, length)
591
766
        self.offset += len(ret)
592
767
        return ret
593
768
 
 
769
    @set_object_locator
594
770
    def write(self, string_to_write):
595
771
        self.require_object_exists()
596
772
        ret = self.ioctx.write(self.key, string_to_write, self.offset)
597
773
        self.offset += ret
598
774
        return ret
599
775
 
 
776
    @set_object_locator
600
777
    def remove(self):
601
778
        self.require_object_exists()
602
779
        self.ioctx.remove_object(self.key)
603
780
        self.state = "removed"
604
781
 
 
782
    @set_object_locator
605
783
    def stat(self):
606
784
        self.require_object_exists()
607
785
        return self.ioctx.stat(self.key)
610
788
        self.require_object_exists()
611
789
        self.offset = position
612
790
 
 
791
    @set_object_locator
613
792
    def get_xattr(self, xattr_name):
614
793
        self.require_object_exists()
615
794
        return self.ioctx.get_xattr(self.key, xattr_name)
616
795
 
 
796
    @set_object_locator
617
797
    def get_xattrs(self, xattr_name):
618
798
        self.require_object_exists()
619
799
        return self.ioctx.get_xattrs(self.key, xattr_name)
620
800
 
 
801
    @set_object_locator
621
802
    def set_xattr(self, xattr_name, xattr_value):
622
803
        self.require_object_exists()
623
804
        return self.ioctx.set_xattr(self.key, xattr_name, xattr_value)
624
805
 
 
806
    @set_object_locator
625
807
    def rm_xattr(self, xattr_name):
626
808
        self.require_object_exists()
627
809
        return self.ioctx.rm_xattr(self.key, xattr_name)