1
"""librados Python ctypes wrapper
2
Copyright 2011, Hannu Valtonen <hannu.valtonen@ormod.com>
4
from ctypes import CDLL, c_char_p, c_size_t, c_void_p, c_int, c_long, \
5
create_string_buffer, byref, Structure, c_uint64, c_ubyte, pointer, \
11
from datetime import datetime
13
ANONYMOUS_AUID = 0xffffffffffffffff
16
class Error(Exception):
19
class PermissionError(Error):
22
class ObjectNotFound(Error):
28
class ObjectExists(Error):
37
class IncompleteWriteError(Error):
40
class RadosStateError(Error):
43
class IoctxStateError(Error):
46
class ObjectStateError(Error):
49
class LogicError(Error):
52
def make_ex(ret, msg):
54
if (ret == errno.EPERM):
55
return PermissionError(msg)
56
elif (ret == errno.ENOENT):
57
return ObjectNotFound(msg)
58
elif (ret == errno.EIO):
60
elif (ret == errno.ENOSPC):
62
elif (ret == errno.EEXIST):
63
return ObjectExists(msg)
64
elif (ret == errno.ENODATA):
67
return Error(msg + (": error code %d" % ret))
69
class rados_pool_stat_t(Structure):
70
_fields_ = [("num_bytes", c_uint64),
72
("num_objects", c_uint64),
73
("num_object_clones", c_uint64),
74
("num_object_copies", c_uint64),
75
("num_objects_missing_on_primary", c_uint64),
76
("num_objects_unfound", c_uint64),
77
("num_objects_degraded", c_uint64),
79
("num_rd_kb", c_uint64),
81
("num_wr_kb", c_uint64)]
83
class rados_cluster_stat_t(Structure):
84
_fields_ = [("kb", c_uint64),
85
("kb_used", c_uint64),
86
("kb_avail", c_uint64),
87
("num_objects", c_uint64)]
89
class Version(object):
90
def __init__(self, major, minor, extra):
96
return "%d.%d.%d" % (self.major, self.minor, self.extra)
99
"""librados python wrapper"""
100
def require_state(self, *args):
104
raise RadosStateError("You cannot perform that operation on a \
105
Rados object in state %s." % (self.state))
107
def __init__(self, rados_id=None, conf=None, conffile=None):
108
self.librados = CDLL('librados.so.2')
109
self.cluster = c_void_p()
110
self.rados_id = rados_id
111
if rados_id is not None and not isinstance(rados_id, str):
112
raise TypeError('rados_id must be a string or None')
113
if conffile is not None and not isinstance(conffile, str):
114
raise TypeError('conffile must be a string or None')
115
ret = self.librados.rados_create(byref(self.cluster), c_char_p(rados_id))
117
raise Error("rados_initialize failed with error code: %d" % ret)
118
self.state = "configuring"
119
if conffile is not None:
120
# read the default conf file when '' is given
123
self.conf_read_file(conffile)
125
for key, value in conf.iteritems():
126
self.conf_set(key, value)
129
if (self.__dict__.has_key("state") and self.state != "shutdown"):
130
self.librados.rados_shutdown(self.cluster)
131
self.state = "shutdown"
137
def __exit__(self, type_, value, traceback):
148
self.librados.rados_version(byref(major), byref(minor), byref(extra))
149
return Version(major.value, minor.value, extra.value)
151
def conf_read_file(self, path=None):
152
self.require_state("configuring", "connected")
153
if path is not None and not isinstance(path, str):
154
raise TypeError('path must be a string')
155
ret = self.librados.rados_conf_read_file(self.cluster, c_char_p(path))
157
raise make_ex(ret, "error calling conf_read_file")
159
def conf_get(self, option):
160
self.require_state("configuring", "connected")
161
if not isinstance(option, str):
162
raise TypeError('option must be a string')
165
ret_buf = create_string_buffer(length)
166
ret = self.librados.rados_conf_get(self.cluster, option,
167
ret_buf, c_size_t(length))
170
elif (ret == -errno.ENAMETOOLONG):
172
elif (ret == -errno.ENOENT):
175
raise make_ex(ret, "error calling conf_get")
177
def conf_set(self, option, val):
178
self.require_state("configuring", "connected")
179
if not isinstance(option, str):
180
raise TypeError('option must be a string')
181
if not isinstance(val, str):
182
raise TypeError('val must be a string')
183
ret = self.librados.rados_conf_set(self.cluster, c_char_p(option),
186
raise make_ex(ret, "error calling conf_set")
189
self.require_state("configuring")
190
ret = self.librados.rados_connect(self.cluster)
192
raise make_ex(ret, "error calling connect")
193
self.state = "connected"
195
def get_cluster_stats(self):
196
stats = rados_cluster_stat_t()
197
ret = self.librados.rados_cluster_stat(self.cluster, byref(stats))
200
ret, "Rados.get_cluster_stats(%s): get_stats failed" % self.rados_id)
201
return {'kb': stats.kb,
202
'kb_used': stats.kb_used,
203
'kb_avail': stats.kb_avail,
204
'num_objects': stats.num_objects}
206
# Returns true if the pool exists; false otherwise.
207
def pool_exists(self, pool_name):
208
self.require_state("connected")
209
if not isinstance(pool_name, str):
210
raise TypeError('pool_name must be a string')
211
ret = self.librados.rados_pool_lookup(self.cluster, c_char_p(pool_name))
214
elif (ret == -errno.ENOENT):
217
raise make_ex(ret, "error looking up pool '%s'" % pool_name)
219
def create_pool(self, pool_name, auid=None, crush_rule=None):
220
self.require_state("connected")
221
if not isinstance(pool_name, str):
222
raise TypeError('pool_name must be a string')
223
if crush_rule is not None and not isinstance(crush_rule, str):
224
raise TypeError('cruse_rule must be a string')
226
if (crush_rule == None):
227
ret = self.librados.rados_pool_create(
228
self.cluster, c_char_p(pool_name))
230
ret = self.librados.rados_pool_create_with_all(
231
self.cluster, c_char_p(pool_name), c_uint64(auid),
233
elif (crush_rule == None):
234
ret = self.librados.rados_pool_create_with_auid(
235
self.cluster, c_char_p(pool_name), c_uint64(auid))
237
ret = self.librados.rados_pool_create_with_crush_rule(
238
self.cluster, c_char_p(pool_name), c_ubyte(crush_rule))
240
raise make_ex(ret, "error creating pool '%s'" % pool_name)
242
def delete_pool(self, pool_name):
243
self.require_state("connected")
244
if not isinstance(pool_name, str):
245
raise TypeError('pool_name must be a string')
246
ret = self.librados.rados_pool_delete(self.cluster, c_char_p(pool_name))
248
raise make_ex(ret, "error deleting pool '%s'" % pool_name)
250
def list_pools(self):
251
self.require_state("connected")
254
c_names = create_string_buffer(size.value)
255
ret = self.librados.rados_pool_list(self.cluster,
256
byref(c_names), size)
261
return filter(lambda name: name != '', c_names.raw.split('\0'))
264
self.require_state("connected")
266
fsid = create_string_buffer(fsid_len + 1)
267
ret = self.librados.rados_cluster_fsid(self.cluster,
271
raise make_ex(ret, "error getting cluster fsid")
274
def open_ioctx(self, ioctx_name):
275
self.require_state("connected")
276
if not isinstance(ioctx_name, str):
277
raise TypeError('ioctx_name must be a string')
279
ret = self.librados.rados_ioctx_create(self.cluster, c_char_p(ioctx_name), byref(ioctx))
281
raise make_ex(ret, "error opening ioctx '%s'" % ioctx_name)
282
return Ioctx(ioctx_name, self.librados, ioctx)
284
class ObjectIterator(object):
285
"""rados.Ioctx Object iterator"""
286
def __init__(self, ioctx):
288
self.ctx = c_void_p()
289
ret = self.ioctx.librados.\
290
rados_objects_list_open(self.ioctx.io, byref(self.ctx))
292
raise make_ex(ret, "error iterating over the objects in ioctx '%s'" \
301
ret = self.ioctx.librados.rados_objects_list_next(self.ctx, byref(key),
304
raise StopIteration()
305
return Object(self.ioctx, key.value, locator.value)
308
self.ioctx.librados.rados_objects_list_close(self.ctx)
310
class XattrIterator(object):
311
"""Extended attribute iterator"""
312
def __init__(self, ioctx, it, oid):
322
ret = self.ioctx.librados.\
323
rados_getxattrs_next(self.it, byref(name_), byref(val_), byref(len_))
325
raise make_ex(ret, "error iterating over the extended attributes \
327
if name_.value == None:
328
raise StopIteration()
329
name = ctypes.string_at(name_)
330
val = ctypes.string_at(val_, len_)
333
self.ioctx.librados.rados_getxattrs_end(self.it)
335
class SnapIterator(object):
336
"""Snapshot iterator"""
337
def __init__(self, ioctx):
339
# We don't know how big a buffer we need until we've called the
340
# function. So use the exponential doubling strategy.
343
self.snaps = (ctypes.c_uint64 * num_snaps)()
344
ret = self.ioctx.librados.rados_ioctx_snap_list(self.ioctx.io,
345
self.snaps, num_snaps)
349
elif (ret != -errno.ERANGE):
350
raise make_ex(ret, "error calling rados_snap_list for \
351
ioctx '%s'" % self.ioctx.name)
352
num_snaps = num_snaps * 2
359
if (self.cur_snap >= self.max_snap):
361
snap_id = self.snaps[self.cur_snap]
364
name = create_string_buffer(name_len)
365
ret = self.ioctx.librados.rados_ioctx_snap_get_name(self.ioctx.io, \
366
snap_id, byref(name), name_len)
370
elif (ret != -errno.ERANGE):
371
raise make_ex(ret, "rados_snap_get_name error")
372
name_len = name_len * 2
373
snap = Snap(self.ioctx, name.value, snap_id)
374
self.cur_snap = self.cur_snap + 1
378
"""Snapshot object"""
379
def __init__(self, ioctx, name, snap_id):
382
self.snap_id = snap_id
385
return "rados.Snap(ioctx=%s,name=%s,snap_id=%d)" \
386
% (str(self.ioctx), self.name, self.snap_id)
388
def get_timestamp(self):
389
snap_time = c_long(0)
390
ret = self.ioctx.librados.rados_ioctx_snap_get_stamp(
391
self.ioctx.io, self.snap_id,
394
raise make_ex(ret, "rados_ioctx_snap_get_stamp error")
395
return datetime.fromtimestamp(snap_time.value)
397
class Completion(object):
398
"""completion object"""
399
def __init__(self, ioctx, rados_comp, oncomplete, onsafe):
400
self.rados_comp = rados_comp
401
self.oncomplete = oncomplete
405
def wait_for_safe(self):
406
return self.ioctx.librados.rados_aio_is_safe(
410
def wait_for_complete(self):
411
return self.ioctx.librados.rados_aio_is_complete(
415
def get_return_value(self):
416
return self.ioctx.librados.rados_aio_get_return_value(
420
self.ioctx.librados.rados_aio_release(
425
"""rados.Ioctx object"""
426
def __init__(self, name, librados, io):
428
self.librados = librados
431
self.locator_key = ""
433
self.complete_cbs = {}
434
RADOS_CB = CFUNCTYPE(c_int, c_void_p, c_void_p)
435
self.__aio_safe_cb_c = RADOS_CB(self.__aio_safe_cb)
436
self.__aio_complete_cb_c = RADOS_CB(self.__aio_complete_cb)
437
self.lock = threading.Lock()
442
def __exit__(self, type_, value, traceback):
449
def __aio_safe_cb(self, completion, _):
452
cb = self.safe_cbs[completion]
453
del self.safe_cbs[completion]
457
def __aio_complete_cb(self, completion, _):
460
cb = self.complete_cbs[completion]
461
del self.complete_cbs[completion]
465
def __get_completion(self, oncomplete, onsafe):
466
completion = c_void_p(0)
470
complete_cb = self.__aio_complete_cb_c
472
safe_cb = self.__aio_safe_cb_c
473
ret = self.librados.rados_aio_create_completion(
480
raise make_ex(ret, "error getting a completion")
482
completion_obj = Completion(self, completion, oncomplete, onsafe)
484
self.complete_cbs[completion.value] = completion_obj
486
self.safe_cbs[completion.value] = completion_obj
487
return completion_obj
489
def aio_write(self, object_name, to_write, offset=0,
490
oncomplete=None, onsafe=None):
491
completion = self.__get_completion(oncomplete, onsafe)
492
ret = self.librados.rados_aio_write(
494
c_char_p(object_name),
495
completion.rados_comp,
497
c_size_t(len(to_write)),
500
raise make_ex(ret, "error writing object %s" % object_name)
503
def aio_write_full(self, object_name, to_write,
504
oncomplete=None, onsafe=None):
505
completion = self.__get_completion(oncomplete, onsafe)
506
ret = self.librados.rados_aio_write_full(
508
c_char_p(object_name),
509
completion.rados_comp,
511
c_size_t(len(to_write)))
513
raise make_ex(ret, "error writing object %s" % object_name)
516
def aio_append(self, object_name, to_append, oncomplete=None, onsafe=None):
517
completion = self.__get_completion(oncomplete, onsafe)
518
ret = self.librados.rados_aio_append(
520
c_char_p(object_name),
521
completion.rados_comp,
523
c_size_t(len(to_append)))
525
raise make_ex(ret, "error appending to object %s" % object_name)
529
ret = self.librados.rados_aio_flush(
532
raise make_ex(ret, "error flushing")
534
def aio_read(self, object_name, length, offset, oncomplete):
536
oncomplete will be called with the returned read value as
537
well as the completion:
539
oncomplete(completion, data_read)
541
buf = create_string_buffer(length)
542
def oncomplete_(completion):
543
return oncomplete(completion, buf.value)
544
completion = self.__get_completion(oncomplete_, None)
545
ret = self.librados.rados_aio_read(
547
c_char_p(object_name),
548
completion.rados_comp,
553
raise make_ex(ret, "error reading %s" % object_name)
556
def require_ioctx_open(self):
557
if self.state != "open":
558
raise IoctxStateError("The pool is %s" % self.state)
560
def change_auid(self, auid):
561
self.require_ioctx_open()
562
ret = self.librados.rados_ioctx_pool_set_auid(self.io, \
563
ctypes.c_uint64(auid))
565
raise make_ex(ret, "error changing auid of '%s' to %d" %\
568
def set_locator_key(self, loc_key):
569
self.require_ioctx_open()
570
if not isinstance(loc_key, str):
571
raise TypeError('loc_key must be a string')
572
self.librados.rados_ioctx_locator_set_key(self.io, \
574
self.locator_key = loc_key
576
def get_locator_key(self):
577
return self.locator_key
580
if self.state == "open":
581
self.require_ioctx_open()
582
self.librados.rados_ioctx_destroy(self.io)
583
self.state = "closed"
585
def write(self, key, data, offset=0):
586
self.require_ioctx_open()
587
if not isinstance(data, str):
588
raise TypeError('data must be a string')
590
ret = self.librados.rados_write(self.io, c_char_p(key),
591
c_char_p(data), c_size_t(length), c_uint64(offset))
595
raise make_ex(ret, "Ioctx.write(%s): failed to write %s" % \
598
raise IncompleteWriteError("Wrote only %d out of %d bytes" % \
601
raise LogicError("Ioctx.write(%s): rados_write \
602
returned %d, but %d was the maximum number of bytes it could have \
603
written." % (self.name, ret, length))
605
def write_full(self, key, data):
606
self.require_ioctx_open()
607
if not isinstance(key, str):
608
raise TypeError('key must be a string')
609
if not isinstance(data, str):
610
raise TypeError('data must be a string')
612
ret = self.librados.rados_write_full(self.io, c_char_p(key),
613
c_char_p(data), c_size_t(length))
617
raise make_ex(ret, "Ioctx.write(%s): failed to write_full %s" % \
620
def read(self, key, length=8192, offset=0):
621
self.require_ioctx_open()
622
if not isinstance(key, str):
623
raise TypeError('key must be a string')
624
ret_buf = create_string_buffer(length)
625
ret = self.librados.rados_read(self.io, c_char_p(key), ret_buf,
626
c_size_t(length), c_uint64(offset))
628
raise make_ex(ret, "Ioctx.read(%s): failed to read %s" % (self.name, key))
629
return ctypes.string_at(ret_buf, ret)
632
self.require_ioctx_open()
633
stats = rados_pool_stat_t()
634
ret = self.librados.rados_ioctx_pool_stat(self.io, byref(stats))
636
raise make_ex(ret, "Ioctx.get_stats(%s): get_stats failed" % self.name)
637
return {'num_bytes': stats.num_bytes,
638
'num_kb': stats.num_kb,
639
'num_objects': stats.num_objects,
640
'num_object_clones': stats.num_object_clones,
641
'num_object_copies': stats.num_object_copies,
642
"num_objects_missing_on_primary": stats.num_objects_missing_on_primary,
643
"num_objects_unfound": stats.num_objects_unfound,
644
"num_objects_degraded": stats.num_objects_degraded,
645
"num_rd": stats.num_rd,
646
"num_rd_kb": stats.num_rd_kb,
647
"num_wr": stats.num_wr,
648
"num_wr_kb": stats.num_wr_kb }
650
def remove_object(self, key):
651
self.require_ioctx_open()
652
if not isinstance(key, str):
653
raise TypeError('key must be a string')
654
ret = self.librados.rados_remove(self.io, c_char_p(key))
656
raise make_ex(ret, "Failed to remove '%s'" % key)
659
def trunc(self, key, size):
660
self.require_ioctx_open()
661
if not isinstance(key, str):
662
raise TypeError('key must be a string')
663
ret = self.librados.rados_trunc(self.io, c_char_p(key), c_size_t(size))
665
raise make_ex(ret, "Ioctx.trunc(%s): failed to truncate %s" % (self.name, key))
669
"""Stat object, returns, size/timestamp"""
670
self.require_ioctx_open()
671
if not isinstance(key, str):
672
raise TypeError('key must be a string')
676
ret = self.librados.rados_stat(self.io, c_char_p(key), pointer(psize),
679
raise make_ex(ret, "Failed to stat %r" % key)
680
return psize.value, time.localtime(pmtime.value)
682
def get_xattr(self, key, xattr_name):
683
self.require_ioctx_open()
684
if not isinstance(xattr_name, str):
685
raise TypeError('xattr_name must be a string')
687
ret_buf = create_string_buffer(ret_length)
688
ret = self.librados.rados_getxattr(self.io, c_char_p(key),
689
c_char_p(xattr_name), ret_buf, c_size_t(ret_length))
691
raise make_ex(ret, "Failed to get xattr %r" % xattr_name)
692
return ctypes.string_at(ret_buf, ret)
694
def get_xattrs(self, oid):
695
self.require_ioctx_open()
696
if not isinstance(oid, str):
697
raise TypeError('oid must be a string')
699
ret = self.librados.rados_getxattrs(self.io, oid, byref(it))
701
raise make_ex(ret, "Failed to get rados xattrs for object %r" % oid)
702
return XattrIterator(self, it, oid)
704
def set_xattr(self, key, xattr_name, xattr_value):
705
self.require_ioctx_open()
706
if not isinstance(key, str):
707
raise TypeError('key must be a string')
708
if not isinstance(xattr_name, str):
709
raise TypeError('xattr_name must be a string')
710
if not isinstance(xattr_value, str):
711
raise TypeError('xattr_value must be a string')
712
ret = self.librados.rados_setxattr(self.io, c_char_p(key),
713
c_char_p(xattr_name), c_char_p(xattr_value),
714
c_size_t(len(xattr_value)))
716
raise make_ex(ret, "Failed to set xattr %r" % xattr_name)
719
def rm_xattr(self, key, xattr_name):
720
self.require_ioctx_open()
721
if not isinstance(key, str):
722
raise TypeError('key must be a string')
723
if not isinstance(xattr_name, str):
724
raise TypeError('xattr_name must be a string')
725
ret = self.librados.rados_rmxattr(self.io, c_char_p(key), c_char_p(xattr_name))
727
raise make_ex(ret, "Failed to delete key %r xattr %r" %
731
def list_objects(self):
732
self.require_ioctx_open()
733
return ObjectIterator(self)
735
def list_snaps(self):
736
self.require_ioctx_open()
737
return SnapIterator(self)
739
def create_snap(self, snap_name):
740
self.require_ioctx_open()
741
if not isinstance(snap_name, str):
742
raise TypeError('snap_name must be a string')
743
ret = self.librados.rados_ioctx_snap_create(self.io,
746
raise make_ex(ret, "Failed to create snap %s" % snap_name)
748
def remove_snap(self, snap_name):
749
self.require_ioctx_open()
750
if not isinstance(snap_name, str):
751
raise TypeError('snap_name must be a string')
752
ret = self.librados.rados_ioctx_snap_remove(self.io,
755
raise make_ex(ret, "Failed to remove snap %s" % snap_name)
757
def lookup_snap(self, snap_name):
758
self.require_ioctx_open()
759
if not isinstance(snap_name, str):
760
raise TypeError('snap_name must be a string')
762
ret = self.librados.rados_ioctx_snap_lookup(self.io, \
763
c_char_p(snap_name), byref(snap_id))
765
raise make_ex(ret, "Failed to lookup snap %s" % snap_name)
766
return Snap(self, snap_name, snap_id)
768
def get_last_version(self):
769
self.require_ioctx_open()
770
return self.librados.rados_get_last_version(self.io)
772
def set_object_locator(func):
773
def retfunc(self, *args, **kwargs):
774
if self.locator_key is not None:
775
old_locator = self.ioctx.get_locator_key()
776
self.ioctx.set_locator_key(self.locator_key)
777
retval = func(self, *args, **kwargs)
778
self.ioctx.set_locator_key(old_locator)
781
return func(self, *args, **kwargs)
784
class Object(object):
785
"""Rados object wrapper, makes the object look like a file"""
786
def __init__(self, ioctx, key, locator_key=None):
790
self.state = "exists"
791
self.locator_key = locator_key
794
return "rados.Object(ioctx=%s,key=%s)" % (str(self.ioctx), self.key)
796
def require_object_exists(self):
797
if self.state != "exists":
798
raise ObjectStateError("The object is %s" % self.state)
801
def read(self, length = 1024*1024):
802
self.require_object_exists()
803
ret = self.ioctx.read(self.key, length, self.offset)
804
self.offset += len(ret)
808
def write(self, string_to_write):
809
self.require_object_exists()
810
ret = self.ioctx.write(self.key, string_to_write, self.offset)
816
self.require_object_exists()
817
self.ioctx.remove_object(self.key)
818
self.state = "removed"
822
self.require_object_exists()
823
return self.ioctx.stat(self.key)
825
def seek(self, position):
826
self.require_object_exists()
827
self.offset = position
830
def get_xattr(self, xattr_name):
831
self.require_object_exists()
832
return self.ioctx.get_xattr(self.key, xattr_name)
835
def get_xattrs(self, xattr_name):
836
self.require_object_exists()
837
return self.ioctx.get_xattrs(self.key, xattr_name)
840
def set_xattr(self, xattr_name, xattr_value):
841
self.require_object_exists()
842
return self.ioctx.set_xattr(self.key, xattr_name, xattr_value)
845
def rm_xattr(self, xattr_name):
846
self.require_object_exists()
847
return self.ioctx.rm_xattr(self.key, xattr_name)