2
rbd.c -- Export ceph rados objects as a Linux block device
5
based on drivers/block/osdblk.c:
7
Copyright 2009 Red Hat, Inc.
9
This program is free software; you can redistribute it and/or modify
10
it under the terms of the GNU General Public License as published by
11
the Free Software Foundation.
13
This program is distributed in the hope that it will be useful,
14
but WITHOUT ANY WARRANTY; without even the implied warranty of
15
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16
GNU General Public License for more details.
18
You should have received a copy of the GNU General Public License
19
along with this program; see the file COPYING. If not, write to
20
the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24
For usage instructions, please refer to:
26
Documentation/ABI/testing/sysfs-bus-rbd
30
#include <linux/ceph/libceph.h>
31
#include <linux/ceph/osd_client.h>
32
#include <linux/ceph/mon_client.h>
33
#include <linux/ceph/decode.h>
34
#include <linux/parser.h>
36
#include <linux/kernel.h>
37
#include <linux/device.h>
38
#include <linux/module.h>
40
#include <linux/blkdev.h>
42
#include "rbd_types.h"
44
#define DRV_NAME "rbd"
45
#define DRV_NAME_LONG "rbd (rados block device)"
47
#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
49
#define RBD_MAX_MD_NAME_LEN (96 + sizeof(RBD_SUFFIX))
50
#define RBD_MAX_POOL_NAME_LEN 64
51
#define RBD_MAX_SNAP_NAME_LEN 32
52
#define RBD_MAX_OPT_LEN 1024
54
#define RBD_SNAP_HEAD_NAME "-"
56
#define DEV_NAME_LEN 32
58
#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
61
* block device image metadata (in-memory version)
63
struct rbd_image_header {
69
struct rw_semaphore snap_rwsem;
70
struct ceph_snap_context *snapc;
71
size_t snap_names_len;
86
* an instance of the client. multiple devices may share a client.
89
struct ceph_client *client;
90
struct rbd_options *rbd_opts;
92
struct list_head node;
101
struct request *rq; /* blk layer request */
102
struct bio *bio; /* cloned bio */
103
struct page **pages; /* list of used pages */
106
struct rbd_req_coll *coll;
109
struct rbd_req_status {
116
* a collection of requests
118
struct rbd_req_coll {
122
struct rbd_req_status status[0];
129
struct list_head node;
137
int id; /* blkdev unique id */
139
int major; /* blkdev assigned major */
140
struct gendisk *disk; /* blkdev's gendisk and rq */
141
struct request_queue *q;
143
struct ceph_client *client;
144
struct rbd_client *rbd_client;
146
char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
148
spinlock_t lock; /* queue lock */
150
struct rbd_image_header header;
151
char obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
153
char obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
154
char pool_name[RBD_MAX_POOL_NAME_LEN];
157
struct ceph_osd_event *watch_event;
158
struct ceph_osd_request *watch_request;
160
char snap_name[RBD_MAX_SNAP_NAME_LEN];
161
u32 cur_snap; /* index+1 of current snapshot within snap context
165
struct list_head node;
167
/* list of snapshots */
168
struct list_head snaps;
174
static struct bus_type rbd_bus_type = {
178
static spinlock_t node_lock; /* protects client get/put */
180
static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
181
static LIST_HEAD(rbd_dev_list); /* devices */
182
static LIST_HEAD(rbd_client_list); /* clients */
184
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
185
static void rbd_dev_release(struct device *dev);
186
static ssize_t rbd_snap_add(struct device *dev,
187
struct device_attribute *attr,
190
static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
191
struct rbd_snap *snap);
194
static struct rbd_device *dev_to_rbd(struct device *dev)
196
return container_of(dev, struct rbd_device, dev);
199
static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
201
return get_device(&rbd_dev->dev);
204
static void rbd_put_dev(struct rbd_device *rbd_dev)
206
put_device(&rbd_dev->dev);
209
static int __rbd_update_snaps(struct rbd_device *rbd_dev);
211
static int rbd_open(struct block_device *bdev, fmode_t mode)
213
struct gendisk *disk = bdev->bd_disk;
214
struct rbd_device *rbd_dev = disk->private_data;
216
rbd_get_dev(rbd_dev);
218
set_device_ro(bdev, rbd_dev->read_only);
220
if ((mode & FMODE_WRITE) && rbd_dev->read_only)
226
static int rbd_release(struct gendisk *disk, fmode_t mode)
228
struct rbd_device *rbd_dev = disk->private_data;
230
rbd_put_dev(rbd_dev);
235
static const struct block_device_operations rbd_bd_ops = {
236
.owner = THIS_MODULE,
238
.release = rbd_release,
242
* Initialize an rbd client instance.
245
static struct rbd_client *rbd_client_create(struct ceph_options *opt,
246
struct rbd_options *rbd_opts)
248
struct rbd_client *rbdc;
251
dout("rbd_client_create\n");
252
rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
256
kref_init(&rbdc->kref);
257
INIT_LIST_HEAD(&rbdc->node);
259
rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
260
if (IS_ERR(rbdc->client))
262
opt = NULL; /* Now rbdc->client is responsible for opt */
264
ret = ceph_open_session(rbdc->client);
268
rbdc->rbd_opts = rbd_opts;
270
spin_lock(&node_lock);
271
list_add_tail(&rbdc->node, &rbd_client_list);
272
spin_unlock(&node_lock);
274
dout("rbd_client_create created %p\n", rbdc);
278
ceph_destroy_client(rbdc->client);
283
ceph_destroy_options(opt);
288
* Find a ceph client with specific addr and configuration.
290
static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
292
struct rbd_client *client_node;
294
if (opt->flags & CEPH_OPT_NOSHARE)
297
list_for_each_entry(client_node, &rbd_client_list, node)
298
if (ceph_compare_options(opt, client_node->client) == 0)
311
/* string args above */
314
static match_table_t rbdopt_tokens = {
315
{Opt_notify_timeout, "notify_timeout=%d"},
317
/* string args above */
321
static int parse_rbd_opts_token(char *c, void *private)
323
struct rbd_options *rbdopt = private;
324
substring_t argstr[MAX_OPT_ARGS];
325
int token, intval, ret;
327
token = match_token((char *)c, rbdopt_tokens, argstr);
331
if (token < Opt_last_int) {
332
ret = match_int(&argstr[0], &intval);
334
pr_err("bad mount option arg (not int) "
338
dout("got int token %d val %d\n", token, intval);
339
} else if (token > Opt_last_int && token < Opt_last_string) {
340
dout("got string token %d val %s\n", token,
343
dout("got token %d\n", token);
347
case Opt_notify_timeout:
348
rbdopt->notify_timeout = intval;
357
* Get a ceph client with specific addr and configuration, if one does
358
* not exist create it.
360
static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
363
struct rbd_client *rbdc;
364
struct ceph_options *opt;
366
struct rbd_options *rbd_opts;
368
rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
372
rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
374
ret = ceph_parse_options(&opt, options, mon_addr,
375
mon_addr + strlen(mon_addr), parse_rbd_opts_token, rbd_opts);
379
spin_lock(&node_lock);
380
rbdc = __rbd_client_find(opt);
382
ceph_destroy_options(opt);
384
/* using an existing client */
385
kref_get(&rbdc->kref);
386
rbd_dev->rbd_client = rbdc;
387
rbd_dev->client = rbdc->client;
388
spin_unlock(&node_lock);
391
spin_unlock(&node_lock);
393
rbdc = rbd_client_create(opt, rbd_opts);
399
rbd_dev->rbd_client = rbdc;
400
rbd_dev->client = rbdc->client;
408
* Destroy ceph client
410
static void rbd_client_release(struct kref *kref)
412
struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
414
dout("rbd_release_client %p\n", rbdc);
415
spin_lock(&node_lock);
416
list_del(&rbdc->node);
417
spin_unlock(&node_lock);
419
ceph_destroy_client(rbdc->client);
420
kfree(rbdc->rbd_opts);
425
* Drop reference to ceph client node. If it's not referenced anymore, release
428
static void rbd_put_client(struct rbd_device *rbd_dev)
430
kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
431
rbd_dev->rbd_client = NULL;
432
rbd_dev->client = NULL;
436
* Destroy requests collection
438
static void rbd_coll_release(struct kref *kref)
440
struct rbd_req_coll *coll =
441
container_of(kref, struct rbd_req_coll, kref);
443
dout("rbd_coll_release %p\n", coll);
448
* Create a new header structure, translate header format from the on-disk
451
static int rbd_header_from_disk(struct rbd_image_header *header,
452
struct rbd_image_header_ondisk *ondisk,
457
u32 snap_count = le32_to_cpu(ondisk->snap_count);
460
if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT))) {
464
init_rwsem(&header->snap_rwsem);
465
header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
466
header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
468
sizeof(struct rbd_image_snap_ondisk),
473
header->snap_names = kmalloc(header->snap_names_len,
475
if (!header->snap_names)
477
header->snap_sizes = kmalloc(snap_count * sizeof(u64),
479
if (!header->snap_sizes)
482
header->snap_names = NULL;
483
header->snap_sizes = NULL;
485
memcpy(header->block_name, ondisk->block_name,
486
sizeof(ondisk->block_name));
488
header->image_size = le64_to_cpu(ondisk->image_size);
489
header->obj_order = ondisk->options.order;
490
header->crypt_type = ondisk->options.crypt_type;
491
header->comp_type = ondisk->options.comp_type;
493
atomic_set(&header->snapc->nref, 1);
494
header->snap_seq = le64_to_cpu(ondisk->snap_seq);
495
header->snapc->num_snaps = snap_count;
496
header->total_snaps = snap_count;
499
allocated_snaps == snap_count) {
500
for (i = 0; i < snap_count; i++) {
501
header->snapc->snaps[i] =
502
le64_to_cpu(ondisk->snaps[i].id);
503
header->snap_sizes[i] =
504
le64_to_cpu(ondisk->snaps[i].image_size);
507
/* copy snapshot names */
508
memcpy(header->snap_names, &ondisk->snaps[i],
509
header->snap_names_len);
515
kfree(header->snap_names);
517
kfree(header->snapc);
521
static int snap_index(struct rbd_image_header *header, int snap_num)
523
return header->total_snaps - snap_num;
526
static u64 cur_snap_id(struct rbd_device *rbd_dev)
528
struct rbd_image_header *header = &rbd_dev->header;
530
if (!rbd_dev->cur_snap)
533
return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
536
static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
540
char *p = header->snap_names;
542
for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
543
if (strcmp(snap_name, p) == 0)
546
if (i == header->total_snaps)
549
*seq = header->snapc->snaps[i];
552
*size = header->snap_sizes[i];
557
static int rbd_header_set_snap(struct rbd_device *dev,
558
const char *snap_name,
561
struct rbd_image_header *header = &dev->header;
562
struct ceph_snap_context *snapc = header->snapc;
565
down_write(&header->snap_rwsem);
569
strcmp(snap_name, "-") == 0 ||
570
strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
571
if (header->total_snaps)
572
snapc->seq = header->snap_seq;
578
*size = header->image_size;
580
ret = snap_by_name(header, snap_name, &snapc->seq, size);
584
dev->cur_snap = header->total_snaps - ret;
590
up_write(&header->snap_rwsem);
594
static void rbd_header_free(struct rbd_image_header *header)
596
kfree(header->snapc);
597
kfree(header->snap_names);
598
kfree(header->snap_sizes);
602
* get the actual striped segment name, offset and length
604
static u64 rbd_get_segment(struct rbd_image_header *header,
605
const char *block_name,
607
char *seg_name, u64 *segofs)
609
u64 seg = ofs >> header->obj_order;
612
snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
613
"%s.%012llx", block_name, seg);
615
ofs = ofs & ((1 << header->obj_order) - 1);
616
len = min_t(u64, len, (1 << header->obj_order) - ofs);
624
static int rbd_get_num_segments(struct rbd_image_header *header,
627
u64 start_seg = ofs >> header->obj_order;
628
u64 end_seg = (ofs + len - 1) >> header->obj_order;
629
return end_seg - start_seg + 1;
633
* returns the size of an object in the image
635
static u64 rbd_obj_bytes(struct rbd_image_header *header)
637
return 1 << header->obj_order;
644
static void bio_chain_put(struct bio *chain)
650
chain = chain->bi_next;
656
* zeros a bio chain, starting at specific offset
658
static void zero_bio_chain(struct bio *chain, int start_ofs)
667
bio_for_each_segment(bv, chain, i) {
668
if (pos + bv->bv_len > start_ofs) {
669
int remainder = max(start_ofs - pos, 0);
670
buf = bvec_kmap_irq(bv, &flags);
671
memset(buf + remainder, 0,
672
bv->bv_len - remainder);
673
bvec_kunmap_irq(buf, &flags);
678
chain = chain->bi_next;
683
* bio_chain_clone - clone a chain of bios up to a certain length.
684
* might return a bio_pair that will need to be released.
686
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
687
struct bio_pair **bp,
688
int len, gfp_t gfpmask)
690
struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
694
bio_pair_release(*bp);
698
while (old_chain && (total < len)) {
699
tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
703
if (total + old_chain->bi_size > len) {
707
* this split can only happen with a single paged bio,
708
* bio_split will BUG_ON if this is not the case
710
dout("bio_chain_clone split! total=%d remaining=%d"
712
(int)total, (int)len-total,
713
(int)old_chain->bi_size);
715
/* split the bio. We'll release it either in the next
716
call, or it will have to be released outside */
717
bp = bio_split(old_chain, (len - total) / 512ULL);
721
__bio_clone(tmp, &bp->bio1);
725
__bio_clone(tmp, old_chain);
726
*next = old_chain->bi_next;
730
gfpmask &= ~__GFP_WAIT;
734
new_chain = tail = tmp;
739
old_chain = old_chain->bi_next;
741
total += tmp->bi_size;
747
tail->bi_next = NULL;
754
dout("bio_chain_clone with err\n");
755
bio_chain_put(new_chain);
760
* helpers for osd request op vectors.
762
static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
767
*ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
771
(*ops)[0].op = opcode;
773
* op extent offset and length will be set later on
774
* in calc_raw_layout()
776
(*ops)[0].payload_len = payload_len;
780
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
785
static void rbd_coll_end_req_index(struct request *rq,
786
struct rbd_req_coll *coll,
790
struct request_queue *q;
793
dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
794
coll, index, ret, len);
800
blk_end_request(rq, ret, len);
806
spin_lock_irq(q->queue_lock);
807
coll->status[index].done = 1;
808
coll->status[index].rc = ret;
809
coll->status[index].bytes = len;
810
max = min = coll->num_done;
811
while (max < coll->total && coll->status[max].done)
814
for (i = min; i<max; i++) {
815
__blk_end_request(rq, coll->status[i].rc,
816
coll->status[i].bytes);
818
kref_put(&coll->kref, rbd_coll_release);
820
spin_unlock_irq(q->queue_lock);
823
static void rbd_coll_end_req(struct rbd_request *req,
826
rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
830
* Send ceph osd request
832
static int rbd_do_request(struct request *rq,
833
struct rbd_device *dev,
834
struct ceph_snap_context *snapc,
836
const char *obj, u64 ofs, u64 len,
841
struct ceph_osd_req_op *ops,
843
struct rbd_req_coll *coll,
845
void (*rbd_cb)(struct ceph_osd_request *req,
846
struct ceph_msg *msg),
847
struct ceph_osd_request **linger_req,
850
struct ceph_osd_request *req;
851
struct ceph_file_layout *layout;
854
struct timespec mtime = CURRENT_TIME;
855
struct rbd_request *req_data;
856
struct ceph_osd_request_head *reqhead;
857
struct rbd_image_header *header = &dev->header;
859
req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
862
rbd_coll_end_req_index(rq, coll, coll_index,
868
req_data->coll = coll;
869
req_data->coll_index = coll_index;
872
dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
874
down_read(&header->snap_rwsem);
876
req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
880
GFP_NOIO, pages, bio);
882
up_read(&header->snap_rwsem);
887
req->r_callback = rbd_cb;
891
req_data->pages = pages;
894
req->r_priv = req_data;
896
reqhead = req->r_request->front.iov_base;
897
reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
899
strncpy(req->r_oid, obj, sizeof(req->r_oid));
900
req->r_oid_len = strlen(req->r_oid);
902
layout = &req->r_file_layout;
903
memset(layout, 0, sizeof(*layout));
904
layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
905
layout->fl_stripe_count = cpu_to_le32(1);
906
layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
907
layout->fl_pg_preferred = cpu_to_le32(-1);
908
layout->fl_pg_pool = cpu_to_le32(dev->poolid);
909
ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
910
ofs, &len, &bno, req, ops);
912
ceph_osdc_build_request(req, ofs, &len,
916
req->r_oid, req->r_oid_len);
917
up_read(&header->snap_rwsem);
920
ceph_osdc_set_request_linger(&dev->client->osdc, req);
924
ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
929
ret = ceph_osdc_wait_request(&dev->client->osdc, req);
931
*ver = le64_to_cpu(req->r_reassert_version.version);
932
dout("reassert_ver=%lld\n",
933
le64_to_cpu(req->r_reassert_version.version));
934
ceph_osdc_put_request(req);
939
bio_chain_put(req_data->bio);
940
ceph_osdc_put_request(req);
942
rbd_coll_end_req(req_data, ret, len);
948
* Ceph osd op callback
950
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
952
struct rbd_request *req_data = req->r_priv;
953
struct ceph_osd_reply_head *replyhead;
954
struct ceph_osd_op *op;
960
replyhead = msg->front.iov_base;
961
WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
962
op = (void *)(replyhead + 1);
963
rc = le32_to_cpu(replyhead->result);
964
bytes = le64_to_cpu(op->extent.length);
965
read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
967
dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
969
if (rc == -ENOENT && read_op) {
970
zero_bio_chain(req_data->bio, 0);
972
} else if (rc == 0 && read_op && bytes < req_data->len) {
973
zero_bio_chain(req_data->bio, bytes);
974
bytes = req_data->len;
977
rbd_coll_end_req(req_data, rc, bytes);
980
bio_chain_put(req_data->bio);
982
ceph_osdc_put_request(req);
986
/*
 * Minimal osd request callback: no result processing, it only calls
 * ceph_osdc_put_request() on @req.  @msg is not used.
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req,
			      struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
992
* Do a synchronous ceph osd operation
994
static int rbd_req_sync_op(struct rbd_device *dev,
995
struct ceph_snap_context *snapc,
999
struct ceph_osd_req_op *orig_ops,
1004
struct ceph_osd_request **linger_req,
1008
struct page **pages;
1010
struct ceph_osd_req_op *ops = orig_ops;
1013
num_pages = calc_pages_for(ofs , len);
1014
pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1016
return PTR_ERR(pages);
1019
payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1020
ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1024
if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1025
ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1031
ret = rbd_do_request(NULL, dev, snapc, snapid,
1032
obj, ofs, len, NULL,
1043
if ((flags & CEPH_OSD_FLAG_READ) && buf)
1044
ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1048
rbd_destroy_ops(ops);
1050
ceph_release_page_vector(pages, num_pages);
1055
* Do an asynchronous ceph osd operation
1057
static int rbd_do_op(struct request *rq,
1058
struct rbd_device *rbd_dev ,
1059
struct ceph_snap_context *snapc,
1061
int opcode, int flags, int num_reply,
1064
struct rbd_req_coll *coll,
1071
struct ceph_osd_req_op *ops;
1074
seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1078
seg_len = rbd_get_segment(&rbd_dev->header,
1079
rbd_dev->header.block_name,
1081
seg_name, &seg_ofs);
1083
payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1085
ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1089
/* we've taken care of segment sizes earlier when we
1090
cloned the bios. We should never have a segment
1091
truncated at this point */
1092
BUG_ON(seg_len < len);
1094
ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1095
seg_name, seg_ofs, seg_len,
1102
rbd_req_cb, 0, NULL);
1104
rbd_destroy_ops(ops);
1111
* Request async osd write
1113
static int rbd_req_write(struct request *rq,
1114
struct rbd_device *rbd_dev,
1115
struct ceph_snap_context *snapc,
1118
struct rbd_req_coll *coll,
1121
return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1123
CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1125
ofs, len, bio, coll, coll_index);
1129
* Request async osd read
1131
static int rbd_req_read(struct request *rq,
1132
struct rbd_device *rbd_dev,
1136
struct rbd_req_coll *coll,
1139
return rbd_do_op(rq, rbd_dev, NULL,
1140
(snapid ? snapid : CEPH_NOSNAP),
1144
ofs, len, bio, coll, coll_index);
1148
* Request sync osd read
1150
static int rbd_req_sync_read(struct rbd_device *dev,
1151
struct ceph_snap_context *snapc,
1158
return rbd_req_sync_op(dev, NULL,
1159
(snapid ? snapid : CEPH_NOSNAP),
1163
1, obj, ofs, len, buf, NULL, ver);
1167
* Request sync osd notify ack
1169
static int rbd_req_sync_notify_ack(struct rbd_device *dev,
1174
struct ceph_osd_req_op *ops;
1175
struct page **pages = NULL;
1178
ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1182
ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
1183
ops[0].watch.cookie = notify_id;
1184
ops[0].watch.flag = 0;
1186
ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
1193
rbd_simple_req_cb, 0, NULL);
1195
rbd_destroy_ops(ops);
1199
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1201
struct rbd_device *dev = (struct rbd_device *)data;
1207
dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1208
notify_id, (int)opcode);
1209
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1210
rc = __rbd_update_snaps(dev);
1211
mutex_unlock(&ctl_mutex);
1213
pr_warning(DRV_NAME "%d got notification but failed to update"
1214
" snaps: %d\n", dev->major, rc);
1216
rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
1220
* Request sync osd watch
1222
static int rbd_req_sync_watch(struct rbd_device *dev,
1226
struct ceph_osd_req_op *ops;
1227
struct ceph_osd_client *osdc = &dev->client->osdc;
1229
int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1233
ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1234
(void *)dev, &dev->watch_event);
1238
ops[0].watch.ver = cpu_to_le64(ver);
1239
ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1240
ops[0].watch.flag = 1;
1242
ret = rbd_req_sync_op(dev, NULL,
1245
CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1248
&dev->watch_request, NULL);
1253
rbd_destroy_ops(ops);
1257
ceph_osdc_cancel_event(dev->watch_event);
1258
dev->watch_event = NULL;
1260
rbd_destroy_ops(ops);
1265
* Request sync osd unwatch
1267
static int rbd_req_sync_unwatch(struct rbd_device *dev,
1270
struct ceph_osd_req_op *ops;
1272
int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1276
ops[0].watch.ver = 0;
1277
ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1278
ops[0].watch.flag = 0;
1280
ret = rbd_req_sync_op(dev, NULL,
1283
CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1285
1, obj, 0, 0, NULL, NULL, NULL);
1287
rbd_destroy_ops(ops);
1288
ceph_osdc_cancel_event(dev->watch_event);
1289
dev->watch_event = NULL;
1293
struct rbd_notify_info {
1294
struct rbd_device *dev;
1297
static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1299
struct rbd_device *dev = (struct rbd_device *)data;
1303
dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1304
notify_id, (int)opcode);
1308
* Request sync osd notify
1310
static int rbd_req_sync_notify(struct rbd_device *dev,
1313
struct ceph_osd_req_op *ops;
1314
struct ceph_osd_client *osdc = &dev->client->osdc;
1315
struct ceph_osd_event *event;
1316
struct rbd_notify_info info;
1317
int payload_len = sizeof(u32) + sizeof(u32);
1320
ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1326
ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1327
(void *)&info, &event);
1331
ops[0].watch.ver = 1;
1332
ops[0].watch.flag = 1;
1333
ops[0].watch.cookie = event->cookie;
1334
ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1335
ops[0].watch.timeout = 12;
1337
ret = rbd_req_sync_op(dev, NULL,
1340
CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1342
1, obj, 0, 0, NULL, NULL, NULL);
1346
ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1347
dout("ceph_osdc_wait_event returned %d\n", ret);
1348
rbd_destroy_ops(ops);
1352
ceph_osdc_cancel_event(event);
1354
rbd_destroy_ops(ops);
1359
* Request sync osd class method call
1361
static int rbd_req_sync_exec(struct rbd_device *dev,
1369
struct ceph_osd_req_op *ops;
1370
int cls_len = strlen(cls);
1371
int method_len = strlen(method);
1372
int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1373
cls_len + method_len + len);
1377
ops[0].cls.class_name = cls;
1378
ops[0].cls.class_len = (__u8)cls_len;
1379
ops[0].cls.method_name = method;
1380
ops[0].cls.method_len = (__u8)method_len;
1381
ops[0].cls.argc = 0;
1382
ops[0].cls.indata = data;
1383
ops[0].cls.indata_len = len;
1385
ret = rbd_req_sync_op(dev, NULL,
1388
CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1390
1, obj, 0, 0, NULL, NULL, ver);
1392
rbd_destroy_ops(ops);
1394
dout("cls_exec returned %d\n", ret);
1398
static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1400
struct rbd_req_coll *coll =
1401
kzalloc(sizeof(struct rbd_req_coll) +
1402
sizeof(struct rbd_req_status) * num_reqs,
1407
coll->total = num_reqs;
1408
kref_init(&coll->kref);
1413
* block device queue callback
1415
static void rbd_rq_fn(struct request_queue *q)
1417
struct rbd_device *rbd_dev = q->queuedata;
1419
struct bio_pair *bp = NULL;
1421
rq = blk_fetch_request(q);
1425
struct bio *rq_bio, *next_bio = NULL;
1427
int size, op_size = 0;
1429
int num_segs, cur_seg = 0;
1430
struct rbd_req_coll *coll;
1432
/* peek at request from block layer */
1436
dout("fetched request\n");
1438
/* filter out block requests we don't understand */
1439
if ((rq->cmd_type != REQ_TYPE_FS)) {
1440
__blk_end_request_all(rq, 0);
1444
/* deduce our operation (read, write) */
1445
do_write = (rq_data_dir(rq) == WRITE);
1447
size = blk_rq_bytes(rq);
1448
ofs = blk_rq_pos(rq) * 512ULL;
1450
if (do_write && rbd_dev->read_only) {
1451
__blk_end_request_all(rq, -EROFS);
1455
spin_unlock_irq(q->queue_lock);
1457
dout("%s 0x%x bytes at 0x%llx\n",
1458
do_write ? "write" : "read",
1459
size, blk_rq_pos(rq) * 512ULL);
1461
num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1462
coll = rbd_alloc_coll(num_segs);
1464
spin_lock_irq(q->queue_lock);
1465
__blk_end_request_all(rq, -ENOMEM);
1470
/* a bio clone to be passed down to OSD req */
1471
dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1472
op_size = rbd_get_segment(&rbd_dev->header,
1473
rbd_dev->header.block_name,
1476
kref_get(&coll->kref);
1477
bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1478
op_size, GFP_ATOMIC);
1480
rbd_coll_end_req_index(rq, coll, cur_seg,
1486
/* init OSD command: write or read */
1488
rbd_req_write(rq, rbd_dev,
1489
rbd_dev->header.snapc,
1494
rbd_req_read(rq, rbd_dev,
1495
cur_snap_id(rbd_dev),
1507
kref_put(&coll->kref, rbd_coll_release);
1510
bio_pair_release(bp);
1511
spin_lock_irq(q->queue_lock);
1513
rq = blk_fetch_request(q);
1518
* a queue callback. Makes sure that we don't create a bio that spans across
1519
* multiple osd objects. One exception would be with single-page bios,
1520
* which we handle later at bio_chain_clone
1522
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1523
struct bio_vec *bvec)
1525
struct rbd_device *rbd_dev = q->queuedata;
1526
unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1527
sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1528
unsigned int bio_sectors = bmd->bi_size >> 9;
1531
max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1532
+ bio_sectors)) << 9;
1534
max = 0; /* bio_add cannot handle a negative return */
1535
if (max <= bvec->bv_len && bio_sectors == 0)
1536
return bvec->bv_len;
1540
static void rbd_free_disk(struct rbd_device *rbd_dev)
1542
struct gendisk *disk = rbd_dev->disk;
1547
rbd_header_free(&rbd_dev->header);
1549
if (disk->flags & GENHD_FL_UP)
1552
blk_cleanup_queue(disk->queue);
1557
* reload the on-disk header
1559
static int rbd_read_header(struct rbd_device *rbd_dev,
1560
struct rbd_image_header *header)
1563
struct rbd_image_header_ondisk *dh;
1565
u64 snap_names_len = 0;
1569
int len = sizeof(*dh) +
1570
snap_count * sizeof(struct rbd_image_snap_ondisk) +
1574
dh = kmalloc(len, GFP_KERNEL);
1578
rc = rbd_req_sync_read(rbd_dev,
1580
rbd_dev->obj_md_name,
1586
rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1589
pr_warning("unrecognized header format"
1590
" for image %s", rbd_dev->obj);
1595
if (snap_count != header->total_snaps) {
1596
snap_count = header->total_snaps;
1597
snap_names_len = header->snap_names_len;
1598
rbd_header_free(header);
1604
header->obj_version = ver;
1614
static int rbd_header_add_snap(struct rbd_device *dev,
1615
const char *snap_name,
1618
int name_len = strlen(snap_name);
1624
/* we should create a snapshot only if we're pointing at the head */
1628
ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
1630
dout("created snapid=%lld\n", new_snapid);
1634
data = kmalloc(name_len + 16, gfp_flags);
1639
e = data + name_len + 16;
1641
ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1642
ceph_encode_64_safe(&p, e, new_snapid, bad);
1644
ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1645
data, p - data, &ver);
1652
dev->header.snapc->seq = new_snapid;
1659
static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1661
struct rbd_snap *snap;
1663
while (!list_empty(&rbd_dev->snaps)) {
1664
snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1665
__rbd_remove_snap_dev(rbd_dev, snap);
1670
* re-read the on-disk header and refresh the in-memory snapshot state
1672
static int __rbd_update_snaps(struct rbd_device *rbd_dev)
1675
struct rbd_image_header h;
1679
ret = rbd_read_header(rbd_dev, &h);
1684
set_capacity(rbd_dev->disk, h.image_size / 512ULL);
1686
down_write(&rbd_dev->header.snap_rwsem);
1688
snap_seq = rbd_dev->header.snapc->seq;
1689
if (rbd_dev->header.total_snaps &&
1690
rbd_dev->header.snapc->snaps[0] == snap_seq)
1691
/* pointing at the head, will need to follow that
1695
kfree(rbd_dev->header.snapc);
1696
kfree(rbd_dev->header.snap_names);
1697
kfree(rbd_dev->header.snap_sizes);
1699
rbd_dev->header.total_snaps = h.total_snaps;
1700
rbd_dev->header.snapc = h.snapc;
1701
rbd_dev->header.snap_names = h.snap_names;
1702
rbd_dev->header.snap_names_len = h.snap_names_len;
1703
rbd_dev->header.snap_sizes = h.snap_sizes;
1705
rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1707
rbd_dev->header.snapc->seq = snap_seq;
1709
ret = __rbd_init_snaps_header(rbd_dev);
1711
up_write(&rbd_dev->header.snap_rwsem);
1716
static int rbd_init_disk(struct rbd_device *rbd_dev)
1718
struct gendisk *disk;
1719
struct request_queue *q;
1723
/* contact OSD, request size info about the object being mapped */
1724
rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1728
/* no need to lock here, as rbd_dev is not registered yet */
1729
rc = __rbd_init_snaps_header(rbd_dev);
1733
rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
1737
/* create gendisk info */
1739
disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1743
snprintf(disk->disk_name, sizeof(disk->disk_name), DRV_NAME "%d",
1745
disk->major = rbd_dev->major;
1746
disk->first_minor = 0;
1747
disk->fops = &rbd_bd_ops;
1748
disk->private_data = rbd_dev;
1752
q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1756
/* set io sizes to object size */
1757
blk_queue_max_hw_sectors(q, rbd_obj_bytes(&rbd_dev->header) / 512ULL);
1758
blk_queue_max_segment_size(q, rbd_obj_bytes(&rbd_dev->header));
1759
blk_queue_io_min(q, rbd_obj_bytes(&rbd_dev->header));
1760
blk_queue_io_opt(q, rbd_obj_bytes(&rbd_dev->header));
1762
blk_queue_merge_bvec(q, rbd_merge_bvec);
1765
q->queuedata = rbd_dev;
1767
rbd_dev->disk = disk;
1770
/* finally, announce the disk to the world */
1771
set_capacity(disk, total_size / 512ULL);
1774
pr_info("%s: added with size 0x%llx\n",
1775
disk->disk_name, (unsigned long long)total_size);
1788
/*
 * sysfs show handler for the "size" attribute: prints
 * rbd_dev->header.image_size as an unsigned decimal followed by a newline.
 */
static ssize_t rbd_size_show(struct device *dev,
1789
struct device_attribute *attr, char *buf)
1791
struct rbd_device *rbd_dev = dev_to_rbd(dev);
1793
return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1796
/*
 * sysfs show handler for the "major" attribute: prints rbd_dev->major as a
 * decimal followed by a newline.
 */
static ssize_t rbd_major_show(struct device *dev,
1797
struct device_attribute *attr, char *buf)
1799
struct rbd_device *rbd_dev = dev_to_rbd(dev);
1801
return sprintf(buf, "%d\n", rbd_dev->major);
1804
/*
 * sysfs show handler for the "client_id" attribute: prints "client<id>",
 * where <id> is the value returned by ceph_client_id() for this device's
 * client.
 */
static ssize_t rbd_client_id_show(struct device *dev,
1805
struct device_attribute *attr, char *buf)
1807
struct rbd_device *rbd_dev = dev_to_rbd(dev);
1809
return sprintf(buf, "client%lld\n", ceph_client_id(rbd_dev->client));
1812
/*
 * sysfs show handler for the "pool" attribute: prints rbd_dev->pool_name
 * followed by a newline.
 */
static ssize_t rbd_pool_show(struct device *dev,
1813
struct device_attribute *attr, char *buf)
1815
struct rbd_device *rbd_dev = dev_to_rbd(dev);
1817
return sprintf(buf, "%s\n", rbd_dev->pool_name);
1820
/*
 * sysfs show handler for the "name" attribute: prints rbd_dev->obj (the
 * object name given when the device was added) followed by a newline.
 */
static ssize_t rbd_name_show(struct device *dev,
1821
struct device_attribute *attr, char *buf)
1823
struct rbd_device *rbd_dev = dev_to_rbd(dev);
1825
return sprintf(buf, "%s\n", rbd_dev->obj);
1828
/*
 * sysfs show handler for the "current_snap" attribute: prints
 * rbd_dev->snap_name followed by a newline.
 */
static ssize_t rbd_snap_show(struct device *dev,
1829
struct device_attribute *attr,
1832
struct rbd_device *rbd_dev = dev_to_rbd(dev);
1834
return sprintf(buf, "%s\n", rbd_dev->snap_name);
1837
/*
 * sysfs store handler for the "refresh" attribute: re-reads the snapshot
 * information via __rbd_update_snaps() while holding ctl_mutex.
 *
 * NOTE(review): how rc is turned into the return value is not visible in
 * this excerpt -- confirm.
 */
static ssize_t rbd_image_refresh(struct device *dev,
1838
struct device_attribute *attr,
1842
struct rbd_device *rbd_dev = dev_to_rbd(dev);
1846
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1848
rc = __rbd_update_snaps(rbd_dev);
1852
mutex_unlock(&ctl_mutex);
1856
/*
 * Per-device sysfs attributes. Those declared with S_IRUGO have only a show
 * handler (read-only); "refresh" and "create_snap" are declared with S_IWUSR
 * and have only a store handler (write-only).
 */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1857
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1858
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1859
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1860
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1861
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1862
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1863
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1865
/*
 * Table of the per-device attributes declared with DEVICE_ATTR.
 * NOTE(review): the terminating entry and closing brace are not visible in
 * this excerpt.
 */
static struct attribute *rbd_attrs[] = {
1866
&dev_attr_size.attr,
1867
&dev_attr_major.attr,
1868
&dev_attr_client_id.attr,
1869
&dev_attr_pool.attr,
1870
&dev_attr_name.attr,
1871
&dev_attr_current_snap.attr,
1872
&dev_attr_refresh.attr,
1873
&dev_attr_create_snap.attr,
1877
/*
 * Attribute group for rbd devices.
 * NOTE(review): the initializer fields are not visible in this excerpt;
 * presumably .attrs = rbd_attrs -- confirm.
 */
static struct attribute_group rbd_attr_group = {
1881
/*
 * Group list referenced by rbd_device_type.groups.
 * NOTE(review): the entries are not visible in this excerpt.
 */
static const struct attribute_group *rbd_attr_groups[] = {
1886
/*
 * Release callback installed as rbd_device_type.release.
 * NOTE(review): the body is not visible in this excerpt.
 */
static void rbd_sysfs_dev_release(struct device *dev)
1890
/*
 * Device type assigned to each rbd device in rbd_bus_add_dev(); supplies the
 * sysfs attribute groups and a type-level release callback.
 */
static struct device_type rbd_device_type = {
1892
.groups = rbd_attr_groups,
1893
.release = rbd_sysfs_dev_release,
1901
/*
 * sysfs show handler for a snapshot's "snap_size" attribute: prints
 * snap->size as a signed decimal followed by a newline.
 */
static ssize_t rbd_snap_size_show(struct device *dev,
1902
struct device_attribute *attr,
1905
/* the struct device is embedded in struct rbd_snap */
struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1907
return sprintf(buf, "%lld\n", (long long)snap->size);
1910
/*
 * sysfs show handler for a snapshot's "snap_id" attribute: prints snap->id
 * as a signed decimal followed by a newline.
 */
static ssize_t rbd_snap_id_show(struct device *dev,
1911
struct device_attribute *attr,
1914
/* the struct device is embedded in struct rbd_snap */
struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1916
return sprintf(buf, "%lld\n", (long long)snap->id);
1919
/* Read-only sysfs attributes exposed by each snapshot device. */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1920
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1922
/*
 * Table of the snapshot attributes.
 * NOTE(review): the terminating entry and closing brace are not visible in
 * this excerpt.
 */
static struct attribute *rbd_snap_attrs[] = {
1923
&dev_attr_snap_size.attr,
1924
&dev_attr_snap_id.attr,
1928
/* Attribute group wrapping rbd_snap_attrs. */
static struct attribute_group rbd_snap_attr_group = {
1929
.attrs = rbd_snap_attrs,
1932
/*
 * Release callback for a snapshot device; recovers the enclosing
 * struct rbd_snap from the embedded struct device.
 * NOTE(review): what is done with snap afterwards (presumably freeing it)
 * is not visible in this excerpt -- confirm.
 */
static void rbd_snap_dev_release(struct device *dev)
1934
struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1939
/*
 * Group list referenced by rbd_snap_device_type.groups.
 * NOTE(review): the terminating entry is not visible in this excerpt.
 */
static const struct attribute_group *rbd_snap_attr_groups[] = {
1940
&rbd_snap_attr_group,
1944
/*
 * Device type assigned to each snapshot device in rbd_register_snap_dev();
 * supplies the snapshot attribute groups and the release callback.
 */
static struct device_type rbd_snap_device_type = {
1945
.groups = rbd_snap_attr_groups,
1946
.release = rbd_snap_dev_release,
1949
/*
 * Unlink a snapshot from the list it is on and unregister its device.
 * rbd_dev is not referenced by the visible statements.
 */
static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
1950
struct rbd_snap *snap)
1952
list_del(&snap->node);
1953
device_unregister(&snap->dev);
1956
/*
 * Register the device embedded in @snap under @parent, named
 * "snap_<snapshot name>".
 *
 * NOTE(review): the return statement is not visible in this excerpt;
 * presumably the device_register() result is returned -- confirm.
 */
static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1957
struct rbd_snap *snap,
1958
struct device *parent)
1960
struct device *dev = &snap->dev;
1963
dev->type = &rbd_snap_device_type;
1964
dev->parent = parent;
1965
/* same callback as rbd_snap_device_type.release, set on the device too */
dev->release = rbd_snap_dev_release;
1966
dev_set_name(dev, "snap_%s", snap->name);
1967
ret = device_register(dev);
1972
/*
 * Allocate a struct rbd_snap for header snapshot index @i with name @name.
 * Size and id are copied from the header arrays at index @i. The snapshot
 * device is registered here only if the parent rbd device is already
 * registered.
 *
 * NOTE(review): the allocation checks, the store through @snapp and the
 * error path are not visible in this excerpt. In particular no check of the
 * kstrdup() result is visible -- confirm.
 */
static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
1973
int i, const char *name,
1974
struct rbd_snap **snapp)
1977
struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
1980
snap->name = kstrdup(name, GFP_KERNEL);
1981
snap->size = rbd_dev->header.snap_sizes[i];
1982
snap->id = rbd_dev->header.snapc->snaps[i];
1983
if (device_is_registered(&rbd_dev->dev)) {
1984
ret = rbd_register_snap_dev(rbd_dev, snap,
1998
* search for the previous snap name in a NUL-delimited string list
2000
/*
 * @name:  current position within the name list
 * @start: beginning of the name list
 *
 * NOTE(review): only the initial bounds test is visible in this excerpt;
 * the value returned in each case is not shown -- confirm.
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
2002
/* fewer than two bytes lie between start and name */
if (name < start + 2)
2015
* compare the old list of snapshots that we have to what's in the header
2016
* and update it accordingly. Note that the header holds the snapshots
2017
* in a reverse order (from newest to oldest) and we need to go from
2018
* older to new so that we don't get a duplicate snap name when
2019
* doing the process (e.g., removed snapshot and recreated a new
2020
* one with the same name).
2022
/*
 * Reconcile rbd_dev->snaps (the existing rbd_snap list) with the snapshot
 * ids and names currently held in rbd_dev->header.
 *
 * i starts at header.total_snaps and name starts one past the end of the
 * name buffer, so both the id array and the name list are walked from the
 * end towards the beginning; the existing list is walked in reverse as well.
 *
 * NOTE(review): several control-flow lines (loop headers, conditions,
 * continue/break/return) are not visible in this excerpt; the comments
 * below describe only the visible statements.
 */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2024
const char *name, *first_name;
2025
int i = rbd_dev->header.total_snaps;
2026
struct rbd_snap *snap, *old_snap = NULL;
2028
struct list_head *p, *n;
2030
first_name = rbd_dev->header.snap_names;
2031
/* one past the end of the name buffer */
name = first_name + rbd_dev->header.snap_names_len;
2033
list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2036
old_snap = list_entry(p, struct rbd_snap, node);
2039
/*
 * NOTE(review): with i == 0 this would read snaps[-1]; a guard is
 * presumably on a line not visible here -- confirm.
 */
cur_id = rbd_dev->header.snapc->snaps[i - 1];
2041
if (!i || old_snap->id < cur_id) {
2042
/* old_snap->id was skipped, thus was removed */
2043
__rbd_remove_snap_dev(rbd_dev, old_snap);
2046
if (old_snap->id == cur_id) {
2047
/* we have this snapshot already */
2049
name = rbd_prev_snap_name(name, first_name);
2053
i--, name = rbd_prev_snap_name(name, first_name)) {
2058
cur_id = rbd_dev->header.snapc->snaps[i];
2059
/* snapshot removal? handle it above */
2060
if (cur_id >= old_snap->id)
2062
/* a new snapshot */
2063
ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2067
/* note that we add it backward so using n and not p */
2068
list_add(&snap->node, n);
2072
/* we're done going over the old snap list, just add what's left */
2073
for (; i > 0; i--) {
2074
name = rbd_prev_snap_name(name, first_name);
2079
ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2082
/* list_add() inserts at the head of rbd_dev->snaps */
list_add(&snap->node, &rbd_dev->snaps);
2089
/*
 * Release callback installed on rbd_root_dev.
 * NOTE(review): the body is not visible in this excerpt.
 */
static void rbd_root_dev_release(struct device *dev)
2093
/*
 * Root device used as the parent of every rbd device (see rbd_bus_add_dev()).
 * NOTE(review): other initializer fields are not visible in this excerpt.
 */
static struct device rbd_root_dev = {
2095
.release = rbd_root_dev_release,
2098
/*
 * Register rbd_dev->dev on the rbd bus under rbd_root_dev, named by the
 * device id, then register a device for every snapshot already on
 * rbd_dev->snaps. Runs under ctl_mutex.
 *
 * NOTE(review): the error checks and labels between the two visible
 * mutex_unlock() calls are not shown; presumably one unlock belongs to the
 * success path and one to the error path -- confirm.
 */
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2102
struct rbd_snap *snap;
2104
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2105
dev = &rbd_dev->dev;
2107
dev->bus = &rbd_bus_type;
2108
dev->type = &rbd_device_type;
2109
dev->parent = &rbd_root_dev;
2110
dev->release = rbd_dev_release;
2111
dev_set_name(dev, "%d", rbd_dev->id);
2112
ret = device_register(dev);
2116
/* snapshots found before the parent was registered are registered now */
list_for_each_entry(snap, &rbd_dev->snaps, node) {
2117
ret = rbd_register_snap_dev(rbd_dev, snap,
2123
mutex_unlock(&ctl_mutex);
2126
mutex_unlock(&ctl_mutex);
2130
/*
 * Unregister the rbd device from the driver core. rbd_dev_release is the
 * release callback installed on this device by rbd_bus_add_dev().
 */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2132
device_unregister(&rbd_dev->dev);
2135
/*
 * Set up a watch on the header object (rbd_dev->obj_md_name) at the current
 * header.obj_version. While the watch request returns -ERANGE, the snapshot
 * information is refreshed under ctl_mutex and the watch is retried.
 *
 * NOTE(review): the handling of rc from __rbd_update_snaps() and the final
 * return are not visible in this excerpt -- confirm.
 */
static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2140
ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2141
rbd_dev->header.obj_version);
2142
if (ret == -ERANGE) {
2143
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2144
rc = __rbd_update_snaps(rbd_dev);
2145
mutex_unlock(&ctl_mutex);
2149
} while (ret == -ERANGE);
2154
/*
 * Store handler for the bus "add" attribute: creates a new rbd device from
 * the command string in buf.
 *
 * Visible steps: take a module reference, allocate scratch buffers and the
 * rbd_device, pick a new id (highest existing id + 1) and add the device to
 * rbd_dev_list under ctl_mutex, parse the command, obtain a ceph client,
 * resolve the pool id, register a block major, register the sysfs device,
 * set up the disk and finally the header watch.
 *
 * NOTE(review): most error checks, goto labels and return statements are
 * not visible in this excerpt; the comments on the unwind code below are
 * hedged accordingly.
 */
static ssize_t rbd_add(struct bus_type *bus,
2158
struct ceph_osd_client *osdc;
2159
struct rbd_device *rbd_dev;
2160
ssize_t rc = -ENOMEM;
2161
int irc, new_id = 0;
2162
struct list_head *tmp;
2166
/* module reference; dropped by the module_put() calls visible below and in rbd_dev_release() */
if (!try_module_get(THIS_MODULE))
2169
mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2173
options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2177
/* new rbd_device object */
2178
rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2182
/* static rbd_device initialization */
2183
spin_lock_init(&rbd_dev->lock);
2184
INIT_LIST_HEAD(&rbd_dev->node);
2185
INIT_LIST_HEAD(&rbd_dev->snaps);
2187
/* generate unique id: find highest unique id, add one */
2188
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2190
list_for_each(tmp, &rbd_dev_list) {
2191
/* NOTE(review): this declaration shadows the outer rbd_dev inside the loop */
struct rbd_device *rbd_dev;
2193
rbd_dev = list_entry(tmp, struct rbd_device, node);
2194
if (rbd_dev->id >= new_id)
2195
new_id = rbd_dev->id + 1;
2198
rbd_dev->id = new_id;
2200
/* add to global list */
2201
list_add_tail(&rbd_dev->node, &rbd_dev_list);
2203
/* parse add command */
2204
/*
 * Fields: monitor address(es), options, pool name, object name and an
 * optional snapshot name; at least the first four must be present.
 *
 * NOTE(review): the field widths equal the kmalloc() sizes of mon_dev_name
 * and options (RBD_MAX_OPT_LEN), so a maximum-length token plus its
 * terminating NUL would need RBD_MAX_OPT_LEN + 1 bytes -- possible one-byte
 * overflow, confirm. The sizes of pool_name, obj and snap_name are not
 * visible here.
 */
if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
2205
"%" __stringify(RBD_MAX_OPT_LEN) "s "
2206
"%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
2207
"%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
2208
"%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
2209
mon_dev_name, options, rbd_dev->pool_name,
2210
rbd_dev->obj, rbd_dev->snap_name) < 4) {
2215
/* no snapshot given: use "-" (cf. RBD_SNAP_HEAD_NAME) */
if (rbd_dev->snap_name[0] == 0)
2216
rbd_dev->snap_name[0] = '-';
2218
rbd_dev->obj_len = strlen(rbd_dev->obj);
2219
/* header object name is the object name plus RBD_SUFFIX */
snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
2220
rbd_dev->obj, RBD_SUFFIX);
2222
/* initialize rest of new object */
2223
snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
2224
rc = rbd_get_client(rbd_dev, mon_dev_name, options);
2228
mutex_unlock(&ctl_mutex);
2231
osdc = &rbd_dev->client->osdc;
2232
rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2234
goto err_out_client;
2235
rbd_dev->poolid = rc;
2237
/* register our block device */
2238
/* major 0 requests a dynamically allocated major, returned in irc */
irc = register_blkdev(0, rbd_dev->name);
2241
goto err_out_client;
2243
rbd_dev->major = irc;
2245
rc = rbd_bus_add_dev(rbd_dev);
2247
goto err_out_blkdev;
2249
/* set up and announce blkdev mapping */
2250
rc = rbd_init_disk(rbd_dev);
2254
rc = rbd_init_watch_dev(rbd_dev);
2261
/*
 * NOTE(review): presumably the unwind path taken once the sysfs device has
 * been registered (label not visible) -- confirm.
 */
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2262
list_del_init(&rbd_dev->node);
2263
mutex_unlock(&ctl_mutex);
2265
/* this will also clean up rest of rbd_dev stuff */
2267
rbd_bus_del_dev(rbd_dev);
2269
kfree(mon_dev_name);
2273
/*
 * NOTE(review): presumably the err_out_blkdev / err_out_client unwind
 * (labels not visible) -- confirm.
 */
unregister_blkdev(rbd_dev->major, rbd_dev->name);
2275
rbd_put_client(rbd_dev);
2276
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2278
list_del_init(&rbd_dev->node);
2279
mutex_unlock(&ctl_mutex);
2285
kfree(mon_dev_name);
2287
dout("Error adding device %s\n", buf);
2288
module_put(THIS_MODULE);
2292
/*
 * Search rbd_dev_list for the device whose id equals @id.
 *
 * NOTE(review): the return statements are not visible in this excerpt;
 * presumably the match is returned, or NULL if none -- confirm. The visible
 * caller (rbd_remove()) holds ctl_mutex around this call.
 */
static struct rbd_device *__rbd_get_dev(unsigned long id)
2294
struct list_head *tmp;
2295
struct rbd_device *rbd_dev;
2297
list_for_each(tmp, &rbd_dev_list) {
2298
rbd_dev = list_entry(tmp, struct rbd_device, node);
2299
if (rbd_dev->id == id)
2305
/*
 * Release callback installed on an rbd device by rbd_bus_add_dev(). Tears
 * down the watch (if any), drops the client, frees the disk, unregisters
 * the block major and drops a module reference.
 *
 * NOTE(review): any further cleanup (e.g. freeing rbd_dev itself) is not
 * visible in this excerpt -- confirm.
 */
static void rbd_dev_release(struct device *dev)
2307
struct rbd_device *rbd_dev =
2308
container_of(dev, struct rbd_device, dev);
2310
if (rbd_dev->watch_request)
2311
ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc,
2312
rbd_dev->watch_request);
2313
if (rbd_dev->watch_event)
2314
rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);
2316
/* the client is still dereferenced above, so it is dropped only here */
rbd_put_client(rbd_dev);
2318
/* clean up and free blkdev */
2319
rbd_free_disk(rbd_dev);
2320
unregister_blkdev(rbd_dev->major, rbd_dev->name);
2323
/* release module ref */
2324
module_put(THIS_MODULE);
2327
/*
 * Store handler for the bus "remove" attribute: parses a decimal device id
 * from buf, looks the device up under ctl_mutex, unlinks it from
 * rbd_dev_list, removes its snapshot devices and unregisters it.
 *
 * NOTE(review): the error returns (parse failure, id not found) are not
 * visible in this excerpt -- confirm.
 */
static ssize_t rbd_remove(struct bus_type *bus,
2331
struct rbd_device *rbd_dev = NULL;
2336
rc = strict_strtoul(buf, 10, &ul);
2340
/* convert to int; abort if we lost anything in the conversion */
2341
target_id = (int) ul;
2342
if (target_id != ul)
2345
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2347
rbd_dev = __rbd_get_dev(target_id);
2353
list_del_init(&rbd_dev->node);
2355
__rbd_remove_all_snaps(rbd_dev);
2356
rbd_bus_del_dev(rbd_dev);
2359
mutex_unlock(&ctl_mutex);
2363
/*
 * sysfs store handler for the "create_snap" attribute: copies the snapshot
 * name from buf, adds the snapshot to the header and refreshes the local
 * snapshot list under ctl_mutex, then notifies watchers of the header
 * object after dropping the mutex.
 *
 * NOTE(review): the allocation check, the freeing of name, the error labels
 * and the return values are not visible in this excerpt -- confirm.
 */
static ssize_t rbd_snap_add(struct device *dev,
2364
struct device_attribute *attr,
2368
struct rbd_device *rbd_dev = dev_to_rbd(dev);
2370
char *name = kmalloc(count + 1, GFP_KERNEL);
2374
/*
 * A size of count copies at most count - 1 bytes of buf, i.e. the final
 * input byte is dropped (presumably the trailing newline of the sysfs
 * write -- confirm).
 */
snprintf(name, count, "%s", buf);
2376
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2378
ret = rbd_header_add_snap(rbd_dev,
2383
ret = __rbd_update_snaps(rbd_dev);
2387
/* shouldn't hold ctl_mutex when notifying.. notify might
2388
trigger a watch callback that would need to get that mutex */
2389
mutex_unlock(&ctl_mutex);
2391
/* make a best effort, don't error if failed */
2392
rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
2399
/* NOTE(review): presumably the error-path unlock (label not visible) */
mutex_unlock(&ctl_mutex);
2404
/*
 * Bus-level control files: "add" and "remove", both declared with S_IWUSR
 * and a store handler only (write-only).
 * NOTE(review): the terminating entry is not visible in this excerpt.
 */
static struct bus_attribute rbd_bus_attrs[] = {
2405
__ATTR(add, S_IWUSR, NULL, rbd_add),
2406
__ATTR(remove, S_IWUSR, NULL, rbd_remove),
2411
* create control files in sysfs
2414
/*
 * Attach the add/remove attributes to the rbd bus type, register the bus,
 * then register the root device.
 *
 * NOTE(review): the error handling between the two registrations is not
 * visible in this excerpt -- confirm.
 */
static int rbd_sysfs_init(void)
2418
rbd_bus_type.bus_attrs = rbd_bus_attrs;
2420
ret = bus_register(&rbd_bus_type);
2424
ret = device_register(&rbd_root_dev);
2429
/* Undo rbd_sysfs_init() in reverse order: root device first, then the bus. */
static void rbd_sysfs_cleanup(void)
2431
device_unregister(&rbd_root_dev);
2432
bus_unregister(&rbd_bus_type);
2435
/*
 * Module entry point: sets up the sysfs control files, initializes
 * node_lock and logs a load message.
 *
 * NOTE(review): the check of rc and the return statements are not visible
 * in this excerpt -- confirm.
 */
int __init rbd_init(void)
2439
rc = rbd_sysfs_init();
2442
spin_lock_init(&node_lock);
2443
pr_info("loaded " DRV_NAME_LONG "\n");
2447
/* Module exit point: removes the sysfs control files. */
void __exit rbd_exit(void)
2449
rbd_sysfs_cleanup();
2452
/* Register the module entry/exit points and the module metadata. */
module_init(rbd_init);
2453
module_exit(rbd_exit);
2455
MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2456
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2457
MODULE_DESCRIPTION("rados block device");
2459
/* following authorship retained from original osdblk.c */
2460
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2462
MODULE_LICENSE("GPL");