1
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2
// vim: ts=8 sw=2 smarttab
4
* Ceph - scalable distributed file system
6
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8
* This is free software; you can redistribute it and/or
9
* modify it under the terms of the GNU Lesser General Public
10
* License version 2.1, as published by the Free Software
11
* Foundation. See file COPYING.
20
#include <sys/utsname.h>
23
#include <boost/scoped_ptr.hpp>
25
#ifdef HAVE_SYS_PARAM_H
26
#include <sys/param.h>
29
#ifdef HAVE_SYS_MOUNT_H
30
#include <sys/mount.h>
35
#include "include/types.h"
36
#include "include/compat.h"
41
#include "osdc/Objecter.h"
43
#include "common/ceph_argparse.h"
44
#include "common/version.h"
46
#include "os/ObjectStore.h"
48
#include "ReplicatedPG.h"
53
#include "msg/Messenger.h"
54
#include "msg/Message.h"
56
#include "mon/MonClient.h"
58
#include "messages/MLog.h"
60
#include "messages/MGenericMessage.h"
61
#include "messages/MPing.h"
62
#include "messages/MOSDPing.h"
63
#include "messages/MOSDFailure.h"
64
#include "messages/MOSDMarkMeDown.h"
65
#include "messages/MOSDOp.h"
66
#include "messages/MOSDOpReply.h"
67
#include "messages/MOSDSubOp.h"
68
#include "messages/MOSDSubOpReply.h"
69
#include "messages/MOSDBoot.h"
70
#include "messages/MOSDPGTemp.h"
72
#include "messages/MOSDMap.h"
73
#include "messages/MOSDPGNotify.h"
74
#include "messages/MOSDPGQuery.h"
75
#include "messages/MOSDPGLog.h"
76
#include "messages/MOSDPGRemove.h"
77
#include "messages/MOSDPGInfo.h"
78
#include "messages/MOSDPGCreate.h"
79
#include "messages/MOSDPGTrim.h"
80
#include "messages/MOSDPGScan.h"
81
#include "messages/MOSDPGBackfill.h"
82
#include "messages/MOSDPGMissing.h"
83
#include "messages/MBackfillReserve.h"
84
#include "messages/MRecoveryReserve.h"
85
#include "messages/MOSDECSubOpWrite.h"
86
#include "messages/MOSDECSubOpWriteReply.h"
87
#include "messages/MOSDECSubOpRead.h"
88
#include "messages/MOSDECSubOpReadReply.h"
90
#include "messages/MOSDAlive.h"
92
#include "messages/MOSDScrub.h"
93
#include "messages/MOSDRepScrub.h"
95
#include "messages/MMonCommand.h"
96
#include "messages/MCommand.h"
97
#include "messages/MCommandReply.h"
99
#include "messages/MPGStats.h"
100
#include "messages/MPGStatsAck.h"
102
#include "messages/MWatchNotify.h"
103
#include "messages/MOSDPGPush.h"
104
#include "messages/MOSDPGPushReply.h"
105
#include "messages/MOSDPGPull.h"
107
#include "common/perf_counters.h"
108
#include "common/Timer.h"
109
#include "common/LogClient.h"
110
#include "common/HeartbeatMap.h"
111
#include "common/admin_socket.h"
113
#include "global/signal_handler.h"
114
#include "global/pidfile.h"
116
#include "include/color.h"
117
#include "perfglue/cpu_profiler.h"
118
#include "perfglue/heap_profiler.h"
120
#include "osd/ClassHandler.h"
121
#include "osd/OpRequest.h"
123
#include "auth/AuthAuthorizeHandler.h"
125
#include "common/errno.h"
127
#include "objclass/objclass.h"
129
#include "common/cmdparse.h"
130
#include "include/str_list.h"
132
#include "include/assert.h"
133
#include "common/config.h"
135
#define dout_subsys ceph_subsys_osd
137
#define dout_prefix _prefix(_dout, whoami, get_osdmap())
139
static ostream& _prefix(std::ostream* _dout, int whoami, OSDMapRef osdmap) {
140
return *_dout << "osd." << whoami << " "
141
<< (osdmap ? osdmap->get_epoch():0)
145
//Initial features in new superblock.
146
//Features here are also automatically upgraded
147
CompatSet OSD::get_osd_initial_compat_set() {
148
CompatSet::FeatureSet ceph_osd_feature_compat;
149
CompatSet::FeatureSet ceph_osd_feature_ro_compat;
150
CompatSet::FeatureSet ceph_osd_feature_incompat;
151
ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_BASE);
152
ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_PGINFO);
153
ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_OLOC);
154
ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_LEC);
155
ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_CATEGORIES);
156
ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_HOBJECTPOOL);
157
ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_BIGINFO);
158
ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_LEVELDBINFO);
159
ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_LEVELDBLOG);
160
ceph_osd_feature_incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_SNAPMAPPER);
161
return CompatSet(ceph_osd_feature_compat, ceph_osd_feature_ro_compat,
162
ceph_osd_feature_incompat);
165
//Features are added here that this OSD supports.
166
CompatSet OSD::get_osd_compat_set() {
167
CompatSet compat = get_osd_initial_compat_set();
168
//Any features here can be set in code, but not in initial superblock
169
compat.incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_SHARDS);
173
OSDService::OSDService(OSD *osd) :
176
whoami(osd->whoami), store(osd->store), clog(osd->clog),
177
pg_recovery_stats(osd->pg_recovery_stats),
178
infos_oid(OSD::make_infos_oid()),
179
cluster_messenger(osd->cluster_messenger),
180
client_messenger(osd->client_messenger),
182
recoverystate_perf(osd->recoverystate_perf),
185
peering_wq(osd->peering_wq),
186
recovery_wq(osd->recovery_wq),
187
snap_trim_wq(osd->snap_trim_wq),
188
scrub_wq(osd->scrub_wq),
189
scrub_finalize_wq(osd->scrub_finalize_wq),
190
rep_scrub_wq(osd->rep_scrub_wq),
191
push_wq("push_wq", cct->_conf->osd_recovery_thread_timeout, &osd->recovery_tp),
192
gen_wq("gen_wq", cct->_conf->osd_recovery_thread_timeout, &osd->recovery_tp),
193
class_handler(osd->class_handler),
194
publish_lock("OSDService::publish_lock"),
195
pre_publish_lock("OSDService::pre_publish_lock"),
196
sched_scrub_lock("OSDService::sched_scrub_lock"), scrubs_pending(0),
198
agent_lock("OSD::agent_lock"),
199
agent_valid_iterator(false),
203
agent_stop_flag(false),
204
agent_timer_lock("OSD::agent_timer_lock"),
205
agent_timer(osd->client_messenger->cct, agent_timer_lock),
206
objecter_lock("OSD::objecter_lock"),
207
objecter_timer(osd->client_messenger->cct, objecter_lock),
208
objecter(new Objecter(osd->client_messenger->cct, osd->objecter_messenger, osd->monc, &objecter_osdmap,
209
objecter_lock, objecter_timer, 0, 0)),
210
objecter_finisher(osd->client_messenger->cct),
211
objecter_dispatcher(this),
212
watch_lock("OSD::watch_lock"),
213
watch_timer(osd->client_messenger->cct, watch_lock),
215
backfill_request_lock("OSD::backfill_request_lock"),
216
backfill_request_timer(cct, backfill_request_lock, false),
218
tid_lock("OSDService::tid_lock"),
219
reserver_finisher(cct),
220
local_reserver(&reserver_finisher, cct->_conf->osd_max_backfills),
221
remote_reserver(&reserver_finisher, cct->_conf->osd_max_backfills),
222
pg_temp_lock("OSDService::pg_temp_lock"),
223
map_cache_lock("OSDService::map_lock"),
224
map_cache(cct->_conf->osd_map_cache_size),
225
map_bl_cache(cct->_conf->osd_map_cache_size),
226
map_bl_inc_cache(cct->_conf->osd_map_cache_size),
227
in_progress_split_lock("OSDService::in_progress_split_lock"),
228
full_status_lock("OSDService::full_status_lock"),
232
is_stopping_lock("OSDService::is_stopping_lock"),
235
, pgid_lock("OSDService::pgid_lock")
239
OSDService::~OSDService()
244
void OSDService::_start_split(spg_t parent, const set<spg_t> &children)
246
for (set<spg_t>::const_iterator i = children.begin();
249
dout(10) << __func__ << ": Starting split on pg " << *i
250
<< ", parent=" << parent << dendl;
251
assert(!pending_splits.count(*i));
252
assert(!in_progress_splits.count(*i));
253
pending_splits.insert(make_pair(*i, parent));
255
assert(!rev_pending_splits[parent].count(*i));
256
rev_pending_splits[parent].insert(*i);
260
void OSDService::mark_split_in_progress(spg_t parent, const set<spg_t> &children)
262
Mutex::Locker l(in_progress_split_lock);
263
map<spg_t, set<spg_t> >::iterator piter = rev_pending_splits.find(parent);
264
assert(piter != rev_pending_splits.end());
265
for (set<spg_t>::const_iterator i = children.begin();
268
assert(piter->second.count(*i));
269
assert(pending_splits.count(*i));
270
assert(!in_progress_splits.count(*i));
271
assert(pending_splits[*i] == parent);
273
pending_splits.erase(*i);
274
piter->second.erase(*i);
275
in_progress_splits.insert(*i);
277
if (piter->second.empty())
278
rev_pending_splits.erase(piter);
281
void OSDService::cancel_pending_splits_for_parent(spg_t parent)
283
Mutex::Locker l(in_progress_split_lock);
284
return _cancel_pending_splits_for_parent(parent);
287
void OSDService::_cancel_pending_splits_for_parent(spg_t parent)
289
map<spg_t, set<spg_t> >::iterator piter = rev_pending_splits.find(parent);
290
if (piter == rev_pending_splits.end())
293
for (set<spg_t>::iterator i = piter->second.begin();
294
i != piter->second.end();
296
assert(pending_splits.count(*i));
297
assert(!in_progress_splits.count(*i));
298
pending_splits.erase(*i);
299
dout(10) << __func__ << ": Completing split on pg " << *i
300
<< " for parent: " << parent << dendl;
301
_cancel_pending_splits_for_parent(*i);
303
rev_pending_splits.erase(piter);
306
void OSDService::_maybe_split_pgid(OSDMapRef old_map,
310
assert(old_map->have_pg_pool(pgid.pool()));
311
if (pgid.ps() < static_cast<unsigned>(old_map->get_pg_num(pgid.pool()))) {
313
pgid.is_split(old_map->get_pg_num(pgid.pool()),
314
new_map->get_pg_num(pgid.pool()), &children);
315
_start_split(pgid, children);
317
assert(pgid.ps() < static_cast<unsigned>(new_map->get_pg_num(pgid.pool())));
321
void OSDService::init_splits_between(spg_t pgid,
325
// First, check whether we can avoid this potentially expensive check
326
if (tomap->have_pg_pool(pgid.pool()) &&
328
frommap->get_pg_num(pgid.pool()),
329
tomap->get_pg_num(pgid.pool()),
331
// Ok, a split happened, so we need to walk the osdmaps
332
set<spg_t> new_pgs; // pgs to scan on each map
333
new_pgs.insert(pgid);
334
OSDMapRef curmap(get_map(frommap->get_epoch()));
335
for (epoch_t e = frommap->get_epoch() + 1;
336
e <= tomap->get_epoch();
338
OSDMapRef nextmap(try_get_map(e));
341
set<spg_t> even_newer_pgs; // pgs added in this loop
342
for (set<spg_t>::iterator i = new_pgs.begin(); i != new_pgs.end(); ++i) {
343
set<spg_t> split_pgs;
344
if (i->is_split(curmap->get_pg_num(i->pool()),
345
nextmap->get_pg_num(i->pool()),
347
start_split(*i, split_pgs);
348
even_newer_pgs.insert(split_pgs.begin(), split_pgs.end());
351
new_pgs.insert(even_newer_pgs.begin(), even_newer_pgs.end());
354
assert(curmap == tomap); // we must have had both frommap and tomap
358
void OSDService::expand_pg_num(OSDMapRef old_map,
361
Mutex::Locker l(in_progress_split_lock);
362
for (set<spg_t>::iterator i = in_progress_splits.begin();
363
i != in_progress_splits.end();
365
if (!new_map->have_pg_pool(i->pool())) {
366
in_progress_splits.erase(i++);
368
_maybe_split_pgid(old_map, new_map, *i);
372
for (map<spg_t, spg_t>::iterator i = pending_splits.begin();
373
i != pending_splits.end();
375
if (!new_map->have_pg_pool(i->first.pool())) {
376
rev_pending_splits.erase(i->second);
377
pending_splits.erase(i++);
379
_maybe_split_pgid(old_map, new_map, i->first);
385
bool OSDService::splitting(spg_t pgid)
387
Mutex::Locker l(in_progress_split_lock);
388
return in_progress_splits.count(pgid) ||
389
pending_splits.count(pgid);
392
void OSDService::complete_split(const set<spg_t> &pgs)
394
Mutex::Locker l(in_progress_split_lock);
395
for (set<spg_t>::const_iterator i = pgs.begin();
398
dout(10) << __func__ << ": Completing split on pg " << *i << dendl;
399
assert(!pending_splits.count(*i));
400
assert(in_progress_splits.count(*i));
401
in_progress_splits.erase(*i);
405
void OSDService::need_heartbeat_peer_update()
407
osd->need_heartbeat_peer_update();
410
void OSDService::pg_stat_queue_enqueue(PG *pg)
412
osd->pg_stat_queue_enqueue(pg);
415
void OSDService::pg_stat_queue_dequeue(PG *pg)
417
osd->pg_stat_queue_dequeue(pg);
420
void OSDService::shutdown()
422
reserver_finisher.stop();
424
Mutex::Locker l(watch_lock);
425
watch_timer.shutdown();
429
Mutex::Locker l(objecter_lock);
430
objecter_timer.shutdown();
431
objecter->shutdown_locked();
433
objecter->shutdown_unlocked();
434
objecter_finisher.stop();
437
Mutex::Locker l(backfill_request_lock);
438
backfill_request_timer.shutdown();
441
Mutex::Locker l(agent_timer_lock);
442
agent_timer.shutdown();
444
osdmap = OSDMapRef();
445
next_osdmap = OSDMapRef();
448
void OSDService::init()
450
reserver_finisher.start();
452
objecter_finisher.start();
453
objecter->init_unlocked();
454
Mutex::Locker l(objecter_lock);
455
objecter_timer.init();
456
objecter->set_client_incarnation(0);
457
objecter->init_locked();
462
agent_thread.create();
465
void OSDService::activate_map()
467
// wake/unwake the tiering agent
470
!osdmap->test_flag(CEPH_OSDMAP_NOTIERAGENT) &&
476
class AgentTimeoutCB : public Context {
479
AgentTimeoutCB(PGRef _pg) : pg(_pg) {}
481
pg->agent_choose_mode_restart();
485
void OSDService::agent_entry()
487
dout(10) << __func__ << " start" << dendl;
490
while (!agent_stop_flag) {
491
if (agent_queue.empty()) {
492
dout(20) << __func__ << " empty queue" << dendl;
493
agent_cond.Wait(agent_lock);
496
uint64_t level = agent_queue.rbegin()->first;
497
set<PGRef>& top = agent_queue.rbegin()->second;
499
<< " tiers " << agent_queue.size()
500
<< ", top is " << level
501
<< " with pgs " << top.size()
502
<< ", ops " << agent_ops << "/"
503
<< g_conf->osd_agent_max_ops
504
<< (agent_active ? " active" : " NOT ACTIVE")
506
dout(20) << __func__ << " oids " << agent_oids << dendl;
507
if (agent_ops >= g_conf->osd_agent_max_ops || top.empty() ||
509
agent_cond.Wait(agent_lock);
513
if (!agent_valid_iterator || agent_queue_pos == top.end()) {
514
agent_queue_pos = top.begin();
515
agent_valid_iterator = true;
517
PGRef pg = *agent_queue_pos;
518
int max = g_conf->osd_agent_max_ops - agent_ops;
520
if (!pg->agent_work(max)) {
521
dout(10) << __func__ << " " << *pg
522
<< " no agent_work, delay for " << g_conf->osd_agent_delay_time
523
<< " seconds" << dendl;
525
osd->logger->inc(l_osd_tier_delay);
526
// Queue a timer to call agent_choose_mode for this pg in 5 seconds
527
agent_timer_lock.Lock();
528
Context *cb = new AgentTimeoutCB(pg);
529
agent_timer.add_event_after(g_conf->osd_agent_delay_time, cb);
530
agent_timer_lock.Unlock();
535
dout(10) << __func__ << " finish" << dendl;
538
void OSDService::agent_stop()
541
Mutex::Locker l(agent_lock);
543
// By this time all ops should be cancelled
544
assert(agent_ops == 0);
545
// By this time all PGs are shutdown and dequeued
546
if (!agent_queue.empty()) {
547
set<PGRef>& top = agent_queue.rbegin()->second;
548
derr << "agent queue not empty, for example " << (*top.begin())->info.pgid << dendl;
549
assert(0 == "agent queue not empty");
552
agent_stop_flag = true;
560
#define dout_prefix *_dout
562
int OSD::convert_collection(ObjectStore *store, coll_t cid)
564
coll_t tmp0("convertfs_temp");
565
coll_t tmp1("convertfs_temp1");
566
vector<ghobject_t> objects;
568
map<string, bufferptr> aset;
569
int r = store->collection_getattrs(cid, aset);
574
ObjectStore::Transaction t;
575
t.create_collection(tmp0);
576
for (map<string, bufferptr>::iterator i = aset.begin();
580
val.push_back(i->second);
581
t.collection_setattr(tmp0, i->first, val);
583
store->apply_transaction(t);
587
while (!next.is_max()) {
589
ghobject_t start = next;
590
r = store->collection_list_partial(cid, start,
596
ObjectStore::Transaction t;
597
for (vector<ghobject_t>::iterator i = objects.begin();
600
t.collection_add(tmp0, cid, *i);
602
store->apply_transaction(t);
606
ObjectStore::Transaction t;
607
t.collection_rename(cid, tmp1);
608
t.collection_rename(tmp0, cid);
609
store->apply_transaction(t);
612
recursive_remove_collection(store, tmp1);
613
store->sync_and_flush();
618
int OSD::do_convertfs(ObjectStore *store)
620
int r = store->mount();
625
r = store->version_stamp_is_valid(&version);
629
return store->umount();
631
derr << "ObjectStore is old at version " << version << ". Updating..." << dendl;
633
derr << "Removing tmp pgs" << dendl;
634
vector<coll_t> collections;
635
r = store->list_collections(collections);
638
for (vector<coll_t>::iterator i = collections.begin();
639
i != collections.end();
642
if (i->is_temp(pgid))
643
recursive_remove_collection(store, *i);
644
else if (i->to_str() == "convertfs_temp" ||
645
i->to_str() == "convertfs_temp1")
646
recursive_remove_collection(store, *i);
651
derr << "Getting collections" << dendl;
653
derr << collections.size() << " to process." << dendl;
655
r = store->list_collections(collections);
659
for (vector<coll_t>::iterator i = collections.begin();
660
i != collections.end();
662
derr << processed << "/" << collections.size() << " processed" << dendl;
663
uint32_t collection_version;
664
r = store->collection_version_current(*i, &collection_version);
668
derr << "Collection " << *i << " is up to date" << dendl;
670
derr << "Updating collection " << *i << " current version is "
671
<< collection_version << dendl;
672
r = convert_collection(store, *i);
675
derr << "collection " << *i << " updated" << dendl;
678
derr << "All collections up to date, updating version stamp..." << dendl;
679
r = store->update_version_stamp();
682
store->sync_and_flush();
684
derr << "Version stamp updated, done with upgrade!" << dendl;
685
return store->umount();
688
int OSD::mkfs(CephContext *cct, ObjectStore *store, const string &dev,
689
uuid_d fsid, int whoami)
694
// if we are fed a uuid for this osd, use it.
695
store->set_fsid(cct->_conf->osd_uuid);
699
derr << "OSD::mkfs: ObjectStore::mkfs failed with error " << ret << dendl;
703
ret = store->mount();
705
derr << "OSD::mkfs: couldn't mount ObjectStore: error " << ret << dendl;
710
if (cct->_conf->osd_age_time != 0) {
711
if (cct->_conf->osd_age_time >= 0) {
712
dout(0) << "aging..." << dendl;
713
Ager ager(cct, store);
714
ager.age(cct->_conf->osd_age_time,
716
cct->_conf->osd_age - .05,
718
cct->_conf->osd_age - .05);
724
ret = store->read(coll_t::META_COLL, OSD_SUPERBLOCK_POBJECT, 0, 0, sbbl);
726
dout(0) << " have superblock" << dendl;
727
if (whoami != sb.whoami) {
728
derr << "provided osd id " << whoami << " != superblock's " << sb.whoami << dendl;
732
if (fsid != sb.cluster_fsid) {
733
derr << "provided cluster fsid " << fsid << " != superblock's " << sb.cluster_fsid << dendl;
739
if (fsid.is_zero()) {
740
derr << "must specify cluster fsid" << dendl;
745
sb.cluster_fsid = fsid;
746
sb.osd_fsid = store->get_fsid();
748
sb.compat_features = get_osd_initial_compat_set();
751
if (cct->_conf->osd_auto_weight) {
753
bufferptr bp(1048576);
756
dout(0) << "testing disk bandwidth..." << dendl;
757
utime_t start = ceph_clock_now(cct);
758
object_t oid("disk_bw_test");
759
for (int i=0; i<1000; i++) {
760
ObjectStore::Transaction *t = new ObjectStore::Transaction;
761
t->write(coll_t::META_COLL, hobject_t(sobject_t(oid, 0)), i*bl.length(), bl.length(), bl);
762
store->queue_transaction_and_cleanup(NULL, t);
765
utime_t end = ceph_clock_now(cct);
767
dout(0) << "measured " << (1000.0 / (double)end) << " mb/sec" << dendl;
768
ObjectStore::Transaction tr;
769
tr.remove(coll_t::META_COLL, hobject_t(sobject_t(oid, 0)));
770
ret = store->apply_transaction(tr);
772
derr << "OSD::mkfs: error while benchmarking: apply_transaction returned "
778
sb.weight = (1000.0 / (double)end);
784
ObjectStore::Transaction t;
785
t.create_collection(coll_t::META_COLL);
786
t.write(coll_t::META_COLL, OSD_SUPERBLOCK_POBJECT, 0, bl.length(), bl);
787
ret = store->apply_transaction(t);
789
derr << "OSD::mkfs: error while writing OSD_SUPERBLOCK_POBJECT: "
790
<< "apply_transaction returned " << ret << dendl;
795
store->sync_and_flush();
797
ret = write_meta(store, sb.cluster_fsid, sb.osd_fsid, whoami);
799
derr << "OSD::mkfs: failed to write fsid file: error " << ret << dendl;
804
catch (const std::exception &se) {
805
derr << "OSD::mkfs: caught exception " << se.what() << dendl;
809
derr << "OSD::mkfs: caught unknown exception." << dendl;
820
int OSD::write_meta(ObjectStore *store, uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami)
825
snprintf(val, sizeof(val), "%s", CEPH_OSD_ONDISK_MAGIC);
826
r = store->write_meta("magic", val);
830
snprintf(val, sizeof(val), "%d", whoami);
831
r = store->write_meta("whoami", val);
835
cluster_fsid.print(val);
836
r = store->write_meta("ceph_fsid", val);
840
r = store->write_meta("ready", "ready");
847
int OSD::peek_meta(ObjectStore *store, std::string& magic,
848
uuid_d& cluster_fsid, uuid_d& osd_fsid, int& whoami)
852
int r = store->read_meta("magic", &val);
857
r = store->read_meta("whoami", &val);
860
whoami = atoi(val.c_str());
862
r = store->read_meta("ceph_fsid", &val);
865
r = cluster_fsid.parse(val.c_str());
869
r = store->read_meta("fsid", &val);
873
r = osd_fsid.parse(val.c_str());
883
#define dout_prefix _prefix(_dout, whoami, osdmap)
887
OSD::OSD(CephContext *cct_, ObjectStore *store_,
888
int id, Messenger *internal_messenger, Messenger *external_messenger,
889
Messenger *hb_clientm,
890
Messenger *hb_front_serverm,
891
Messenger *hb_back_serverm,
892
Messenger *osdc_messenger,
894
const std::string &dev, const std::string &jdev) :
896
osd_lock("OSD::osd_lock"),
897
tick_timer(cct, osd_lock),
898
authorize_handler_cluster_registry(new AuthAuthorizeHandlerRegistry(cct,
899
cct->_conf->auth_supported.length() ?
900
cct->_conf->auth_supported :
901
cct->_conf->auth_cluster_required)),
902
authorize_handler_service_registry(new AuthAuthorizeHandlerRegistry(cct,
903
cct->_conf->auth_supported.length() ?
904
cct->_conf->auth_supported :
905
cct->_conf->auth_service_required)),
906
cluster_messenger(internal_messenger),
907
client_messenger(external_messenger),
908
objecter_messenger(osdc_messenger),
911
recoverystate_perf(NULL),
913
clog(cct, client_messenger, &mc->monmap, LogClient::NO_FLAGS),
915
dev_path(dev), journal_path(jdev),
916
dispatch_running(false),
918
osd_compat(get_osd_compat_set()),
919
state(STATE_INITIALIZING), boot_epoch(0), up_epoch(0), bind_epoch(0),
920
op_tp(cct, "OSD::op_tp", cct->_conf->osd_op_threads, "osd_op_threads"),
921
recovery_tp(cct, "OSD::recovery_tp", cct->_conf->osd_recovery_threads, "osd_recovery_threads"),
922
disk_tp(cct, "OSD::disk_tp", cct->_conf->osd_disk_threads, "osd_disk_threads"),
923
command_tp(cct, "OSD::command_tp", 1),
924
paused_recovery(false),
925
heartbeat_lock("OSD::heartbeat_lock"),
926
heartbeat_stop(false), heartbeat_need_update(true), heartbeat_epoch(0),
927
hbclient_messenger(hb_clientm),
928
hb_front_server_messenger(hb_front_serverm),
929
hb_back_server_messenger(hb_back_serverm),
930
heartbeat_thread(this),
931
heartbeat_dispatcher(this),
932
stat_lock("OSD::stat_lock"),
933
finished_lock("OSD::finished_lock"),
934
op_tracker(cct, cct->_conf->osd_enable_op_tracker),
936
op_wq(this, cct->_conf->osd_op_thread_timeout, &op_tp),
937
peering_wq(this, cct->_conf->osd_op_thread_timeout, &op_tp),
938
map_lock("OSD::map_lock"),
939
peer_map_epoch_lock("OSD::peer_map_epoch_lock"),
940
debug_drop_pg_create_probability(cct->_conf->osd_debug_drop_pg_create_probability),
941
debug_drop_pg_create_duration(cct->_conf->osd_debug_drop_pg_create_duration),
942
debug_drop_pg_create_left(-1),
943
outstanding_pg_stats(false),
944
timeout_mon_on_pg_stats(true),
945
up_thru_wanted(0), up_thru_pending(0),
946
pg_stat_queue_lock("OSD::pg_stat_queue_lock"),
947
osd_stat_updated(false),
948
pg_stat_tid(0), pg_stat_tid_flushed(0),
949
command_wq(this, cct->_conf->osd_command_thread_timeout, &command_tp),
950
recovery_ops_active(0),
951
recovery_wq(this, cct->_conf->osd_recovery_thread_timeout, &recovery_tp),
952
replay_queue_lock("OSD::replay_queue_lock"),
953
snap_trim_wq(this, cct->_conf->osd_snap_trim_thread_timeout, &disk_tp),
954
scrub_wq(this, cct->_conf->osd_scrub_thread_timeout, &disk_tp),
955
scrub_finalize_wq(cct->_conf->osd_scrub_finalize_thread_timeout, &op_tp),
956
rep_scrub_wq(this, cct->_conf->osd_scrub_thread_timeout, &disk_tp),
957
remove_wq(store, cct->_conf->osd_remove_thread_timeout, &disk_tp),
961
monc->set_messenger(client_messenger);
962
op_tracker.set_complaint_and_threshold(cct->_conf->osd_op_complaint_time,
963
cct->_conf->osd_op_log_threshold);
964
op_tracker.set_history_size_and_duration(cct->_conf->osd_op_history_size,
965
cct->_conf->osd_op_history_duration);
970
delete authorize_handler_cluster_registry;
971
delete authorize_handler_service_registry;
972
delete class_handler;
973
cct->get_perfcounters_collection()->remove(recoverystate_perf);
974
cct->get_perfcounters_collection()->remove(logger);
975
delete recoverystate_perf;
980
void cls_initialize(ClassHandler *ch);
982
void OSD::handle_signal(int signum)
984
assert(signum == SIGINT || signum == SIGTERM);
985
derr << "*** Got signal " << sys_siglist[signum] << " ***" << dendl;
986
//suicide(128 + signum);
992
Mutex::Locker lock(osd_lock);
996
if (store->test_mount_in_use()) {
997
derr << "OSD::pre_init: object store '" << dev_path << "' is "
998
<< "currently in use. (Is ceph-osd already running?)" << dendl;
1002
cct->_conf->add_observer(this);
1008
class OSDSocketHook : public AdminSocketHook {
1011
OSDSocketHook(OSD *o) : osd(o) {}
1012
bool call(std::string command, cmdmap_t& cmdmap, std::string format,
1015
bool r = osd->asok_command(command, cmdmap, format, ss);
1021
bool OSD::asok_command(string command, cmdmap_t& cmdmap, string format,
1024
Formatter *f = new_formatter(format);
1026
f = new_formatter("json-pretty");
1027
if (command == "status") {
1028
f->open_object_section("status");
1029
f->dump_stream("cluster_fsid") << superblock.cluster_fsid;
1030
f->dump_stream("osd_fsid") << superblock.osd_fsid;
1031
f->dump_unsigned("whoami", superblock.whoami);
1032
f->dump_string("state", get_state_name(state));
1033
f->dump_unsigned("oldest_map", superblock.oldest_map);
1034
f->dump_unsigned("newest_map", superblock.newest_map);
1036
f->dump_unsigned("num_pgs", pg_map.size());
1039
} else if (command == "flush_journal") {
1040
store->sync_and_flush();
1041
} else if (command == "dump_ops_in_flight") {
1042
op_tracker.dump_ops_in_flight(f);
1043
} else if (command == "dump_historic_ops") {
1044
op_tracker.dump_historic_ops(f);
1045
} else if (command == "dump_op_pq_state") {
1046
f->open_object_section("pq");
1049
} else if (command == "dump_blacklist") {
1050
list<pair<entity_addr_t,utime_t> > bl;
1051
OSDMapRef curmap = service.get_osdmap();
1053
f->open_array_section("blacklist");
1054
curmap->get_blacklist(&bl);
1055
for (list<pair<entity_addr_t,utime_t> >::iterator it = bl.begin();
1056
it != bl.end(); ++it) {
1057
f->open_array_section("entry");
1058
f->open_object_section("entity_addr_t");
1060
f->close_section(); //entity_addr_t
1061
it->second.localtime(f->dump_stream("expire_time"));
1062
f->close_section(); //entry
1064
f->close_section(); //blacklist
1065
} else if (command == "dump_watchers") {
1066
list<obj_watch_item_t> watchers;
1069
for (ceph::unordered_map<spg_t,PG*>::iterator it = pg_map.begin();
1073
list<obj_watch_item_t> pg_watchers;
1074
PG *pg = it->second;
1076
pg->get_watchers(pg_watchers);
1078
watchers.splice(watchers.end(), pg_watchers);
1082
f->open_array_section("watchers");
1083
for (list<obj_watch_item_t>::iterator it = watchers.begin();
1084
it != watchers.end(); ++it) {
1086
f->open_array_section("watch");
1088
f->dump_string("namespace", it->obj.nspace);
1089
f->dump_string("object", it->obj.oid.name);
1091
f->open_object_section("entity_name");
1092
it->wi.name.dump(f);
1093
f->close_section(); //entity_name_t
1095
f->dump_int("cookie", it->wi.cookie);
1096
f->dump_int("timeout", it->wi.timeout_seconds);
1098
f->open_object_section("entity_addr_t");
1099
it->wi.addr.dump(f);
1100
f->close_section(); //entity_addr_t
1102
f->close_section(); //watch
1105
f->close_section(); //watches
1107
assert(0 == "broken asok registration");
1114
class TestOpsSocketHook : public AdminSocketHook {
1115
OSDService *service;
1118
TestOpsSocketHook(OSDService *s, ObjectStore *st) : service(s), store(st) {}
1119
bool call(std::string command, cmdmap_t& cmdmap, std::string format,
1122
test_ops(service, store, command, cmdmap, ss);
1126
void test_ops(OSDService *service, ObjectStore *store, std::string command,
1127
cmdmap_t& cmdmap, ostream &ss);
1133
CompatSet initial, diff;
1134
Mutex::Locker lock(osd_lock);
1139
service.backfill_request_timer.init();
1142
dout(2) << "mounting " << dev_path << " "
1143
<< (journal_path.empty() ? "(no journal)" : journal_path) << dendl;
1144
assert(store); // call pre_init() first!
1146
int r = store->mount();
1148
derr << "OSD:init: unable to mount object store" << dendl;
1152
dout(2) << "boot" << dendl;
1155
r = read_superblock();
1157
derr << "OSD::init() : unable to read osd superblock" << dendl;
1162
if (osd_compat.compare(superblock.compat_features) < 0) {
1163
derr << "The disk uses features unsupported by the executable." << dendl;
1164
derr << " ondisk features " << superblock.compat_features << dendl;
1165
derr << " daemon features " << osd_compat << dendl;
1167
if (osd_compat.writeable(superblock.compat_features)) {
1168
CompatSet diff = osd_compat.unsupported(superblock.compat_features);
1169
derr << "it is still writeable, though. Missing features: " << diff << dendl;
1174
CompatSet diff = osd_compat.unsupported(superblock.compat_features);
1175
derr << "Cannot write to disk! Missing features: " << diff << dendl;
1181
assert_warn(whoami == superblock.whoami);
1182
if (whoami != superblock.whoami) {
1183
derr << "OSD::init: superblock says osd"
1184
<< superblock.whoami << " but i am osd." << whoami << dendl;
1189
initial = get_osd_initial_compat_set();
1190
diff = superblock.compat_features.unsupported(initial);
1191
if (superblock.compat_features.merge(initial)) {
1192
// We need to persist the new compat_set before we
1194
dout(5) << "Upgrading superblock adding: " << diff << dendl;
1195
ObjectStore::Transaction t;
1196
write_superblock(t);
1197
r = store->apply_transaction(t);
1202
// make sure info object exists
1203
if (!store->exists(coll_t::META_COLL, service.infos_oid)) {
1204
dout(10) << "init creating/touching snapmapper object" << dendl;
1205
ObjectStore::Transaction t;
1206
t.touch(coll_t::META_COLL, service.infos_oid);
1207
r = store->apply_transaction(t);
1212
// make sure snap mapper object exists
1213
if (!store->exists(coll_t::META_COLL, OSD::make_snapmapper_oid())) {
1214
dout(10) << "init creating/touching infos object" << dendl;
1215
ObjectStore::Transaction t;
1216
t.touch(coll_t::META_COLL, OSD::make_snapmapper_oid());
1217
r = store->apply_transaction(t);
1222
class_handler = new ClassHandler(cct);
1223
cls_initialize(class_handler);
1225
if (cct->_conf->osd_open_classes_on_start) {
1226
int r = class_handler->open_all_classes();
1228
dout(1) << "warning: got an error loading one or more classes: " << cpp_strerror(r) << dendl;
1231
// load up "current" osdmap
1232
assert_warn(!osdmap);
1234
derr << "OSD::init: unable to read current osdmap" << dendl;
1238
osdmap = get_map(superblock.current_epoch);
1239
check_osdmap_features(store);
1241
create_recoverystate_perf();
1243
bind_epoch = osdmap->get_epoch();
1245
// load up pgs (as they previously existed)
1248
dout(2) << "superblock: i am osd." << superblock.whoami << dendl;
1253
client_messenger->add_dispatcher_head(this);
1254
cluster_messenger->add_dispatcher_head(this);
1256
hbclient_messenger->add_dispatcher_head(&heartbeat_dispatcher);
1257
hb_front_server_messenger->add_dispatcher_head(&heartbeat_dispatcher);
1258
hb_back_server_messenger->add_dispatcher_head(&heartbeat_dispatcher);
1260
objecter_messenger->add_dispatcher_head(&service.objecter_dispatcher);
1262
monc->set_want_keys(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD);
1267
// tell monc about log_client so it will know about mon session resets
1268
monc->set_log_client(&clog);
1271
recovery_tp.start();
1275
// start the heartbeat
1276
heartbeat_thread.create();
1279
tick_timer.add_event_after(cct->_conf->osd_heartbeat_interval, new C_Tick(this));
1282
service.publish_map(osdmap);
1283
service.publish_superblock(superblock);
1287
r = monc->authenticate();
1289
osd_lock.Lock(); // locker is going to unlock this on function exit
1295
while (monc->wait_auth_rotating(30.0) < 0) {
1296
derr << "unable to obtain rotating service keys; retrying" << dendl;
1303
dout(10) << "ensuring pgs have consumed prior maps" << dendl;
1307
dout(0) << "done with init, starting boot process" << dendl;
1308
state = STATE_BOOTING;
1321
void OSD::final_init()
1324
AdminSocket *admin_socket = cct->get_admin_socket();
1325
asok_hook = new OSDSocketHook(this);
1326
r = admin_socket->register_command("status", "status", asok_hook,
1327
"high-level status of OSD");
1329
r = admin_socket->register_command("flush_journal", "flush_journal",
1331
"flush the journal to permanent store");
1333
r = admin_socket->register_command("dump_ops_in_flight",
1334
"dump_ops_in_flight", asok_hook,
1335
"show the ops currently in flight");
1337
r = admin_socket->register_command("dump_historic_ops", "dump_historic_ops",
1339
"show slowest recent ops");
1341
r = admin_socket->register_command("dump_op_pq_state", "dump_op_pq_state",
1343
"dump op priority queue state");
1345
r = admin_socket->register_command("dump_blacklist", "dump_blacklist",
1347
"dump blacklisted clients and times");
1349
r = admin_socket->register_command("dump_watchers", "dump_watchers",
1351
"show clients which have active watches,"
1352
" and on which objects");
1355
test_ops_hook = new TestOpsSocketHook(&(this->service), this->store);
1356
// Note: pools are CephString instead of CephPoolname because
1357
// these commands traditionally support both pool names and numbers
1358
r = admin_socket->register_command(
1361
"name=pool,type=CephString " \
1362
"name=objname,type=CephObjectname " \
1363
"name=key,type=CephString "\
1364
"name=val,type=CephString",
1368
r = admin_socket->register_command(
1371
"name=pool,type=CephString " \
1372
"name=objname,type=CephObjectname " \
1373
"name=key,type=CephString",
1377
r = admin_socket->register_command(
1380
"name=pool,type=CephString " \
1381
"name=objname,type=CephObjectname " \
1382
"name=header,type=CephString",
1387
r = admin_socket->register_command(
1390
"name=pool,type=CephString " \
1391
"name=objname,type=CephObjectname",
1393
"output entire object map");
1396
r = admin_socket->register_command(
1399
"name=pool,type=CephString " \
1400
"name=objname,type=CephObjectname " \
1401
"name=len,type=CephInt",
1403
"truncate object to length");
1406
r = admin_socket->register_command(
1409
"name=pool,type=CephString " \
1410
"name=objname,type=CephObjectname",
1412
"inject data error into omap");
1415
r = admin_socket->register_command(
1418
"name=pool,type=CephString " \
1419
"name=objname,type=CephObjectname",
1421
"inject metadata error");
1425
void OSD::create_logger()
1427
dout(10) << "create_logger" << dendl;
1429
PerfCountersBuilder osd_plb(cct, "osd", l_osd_first, l_osd_last);
1431
osd_plb.add_u64(l_osd_opq, "opq"); // op queue length (waiting to be processed yet)
1432
osd_plb.add_u64(l_osd_op_wip, "op_wip"); // rep ops currently being processed (primary)
1434
osd_plb.add_u64_counter(l_osd_op, "op"); // client ops
1435
osd_plb.add_u64_counter(l_osd_op_inb, "op_in_bytes"); // client op in bytes (writes)
1436
osd_plb.add_u64_counter(l_osd_op_outb, "op_out_bytes"); // client op out bytes (reads)
1437
osd_plb.add_time_avg(l_osd_op_lat, "op_latency"); // client op latency
1438
osd_plb.add_time_avg(l_osd_op_process_lat, "op_process_latency"); // client op process latency
1440
osd_plb.add_u64_counter(l_osd_op_r, "op_r"); // client reads
1441
osd_plb.add_u64_counter(l_osd_op_r_outb, "op_r_out_bytes"); // client read out bytes
1442
osd_plb.add_time_avg(l_osd_op_r_lat, "op_r_latency"); // client read latency
1443
osd_plb.add_time_avg(l_osd_op_r_process_lat, "op_r_process_latency"); // client read process latency
1444
osd_plb.add_u64_counter(l_osd_op_w, "op_w"); // client writes
1445
osd_plb.add_u64_counter(l_osd_op_w_inb, "op_w_in_bytes"); // client write in bytes
1446
osd_plb.add_time_avg(l_osd_op_w_rlat, "op_w_rlat"); // client write readable/applied latency
1447
osd_plb.add_time_avg(l_osd_op_w_lat, "op_w_latency"); // client write latency
1448
osd_plb.add_time_avg(l_osd_op_w_process_lat, "op_w_process_latency"); // client write process latency
1449
osd_plb.add_u64_counter(l_osd_op_rw, "op_rw"); // client rmw
1450
osd_plb.add_u64_counter(l_osd_op_rw_inb, "op_rw_in_bytes"); // client rmw in bytes
1451
osd_plb.add_u64_counter(l_osd_op_rw_outb,"op_rw_out_bytes"); // client rmw out bytes
1452
osd_plb.add_time_avg(l_osd_op_rw_rlat,"op_rw_rlat"); // client rmw readable/applied latency
1453
osd_plb.add_time_avg(l_osd_op_rw_lat, "op_rw_latency"); // client rmw latency
1454
osd_plb.add_time_avg(l_osd_op_rw_process_lat, "op_rw_process_latency"); // client rmw process latency
1456
osd_plb.add_u64_counter(l_osd_sop, "subop"); // subops
1457
osd_plb.add_u64_counter(l_osd_sop_inb, "subop_in_bytes"); // subop in bytes
1458
osd_plb.add_time_avg(l_osd_sop_lat, "subop_latency"); // subop latency
1460
osd_plb.add_u64_counter(l_osd_sop_w, "subop_w"); // replicated (client) writes
1461
osd_plb.add_u64_counter(l_osd_sop_w_inb, "subop_w_in_bytes"); // replicated write in bytes
1462
osd_plb.add_time_avg(l_osd_sop_w_lat, "subop_w_latency"); // replicated write latency
1463
osd_plb.add_u64_counter(l_osd_sop_pull, "subop_pull"); // pull request
1464
osd_plb.add_time_avg(l_osd_sop_pull_lat, "subop_pull_latency");
1465
osd_plb.add_u64_counter(l_osd_sop_push, "subop_push"); // push (write)
1466
osd_plb.add_u64_counter(l_osd_sop_push_inb, "subop_push_in_bytes");
1467
osd_plb.add_time_avg(l_osd_sop_push_lat, "subop_push_latency");
1469
osd_plb.add_u64_counter(l_osd_pull, "pull"); // pull requests sent
1470
osd_plb.add_u64_counter(l_osd_push, "push"); // push messages
1471
osd_plb.add_u64_counter(l_osd_push_outb, "push_out_bytes"); // pushed bytes
1473
osd_plb.add_u64_counter(l_osd_push_in, "push_in"); // inbound push messages
1474
osd_plb.add_u64_counter(l_osd_push_inb, "push_in_bytes"); // inbound pushed bytes
1476
osd_plb.add_u64_counter(l_osd_rop, "recovery_ops"); // recovery ops (started)
1478
osd_plb.add_u64(l_osd_loadavg, "loadavg");
1479
osd_plb.add_u64(l_osd_buf, "buffer_bytes"); // total ceph::buffer bytes
1481
osd_plb.add_u64(l_osd_pg, "numpg"); // num pgs
1482
osd_plb.add_u64(l_osd_pg_primary, "numpg_primary"); // num primary pgs
1483
osd_plb.add_u64(l_osd_pg_replica, "numpg_replica"); // num replica pgs
1484
osd_plb.add_u64(l_osd_pg_stray, "numpg_stray"); // num stray pgs
1485
osd_plb.add_u64(l_osd_hb_to, "heartbeat_to_peers"); // heartbeat peers we send to
1486
osd_plb.add_u64(l_osd_hb_from, "heartbeat_from_peers"); // heartbeat peers we recv from
1487
osd_plb.add_u64_counter(l_osd_map, "map_messages"); // osdmap messages
1488
osd_plb.add_u64_counter(l_osd_mape, "map_message_epochs"); // osdmap epochs
1489
osd_plb.add_u64_counter(l_osd_mape_dup, "map_message_epoch_dups"); // dup osdmap epochs
1490
osd_plb.add_u64_counter(l_osd_waiting_for_map,
1491
"messages_delayed_for_map"); // dup osdmap epochs
1493
osd_plb.add_u64(l_osd_stat_bytes, "stat_bytes");
1494
osd_plb.add_u64(l_osd_stat_bytes_used, "stat_bytes_used");
1495
osd_plb.add_u64(l_osd_stat_bytes_avail, "stat_bytes_avail");
1497
osd_plb.add_u64_counter(l_osd_copyfrom, "copyfrom");
1499
osd_plb.add_u64_counter(l_osd_tier_promote, "tier_promote");
1500
osd_plb.add_u64_counter(l_osd_tier_flush, "tier_flush");
1501
osd_plb.add_u64_counter(l_osd_tier_flush_fail, "tier_flush_fail");
1502
osd_plb.add_u64_counter(l_osd_tier_try_flush, "tier_try_flush");
1503
osd_plb.add_u64_counter(l_osd_tier_try_flush_fail, "tier_try_flush_fail");
1504
osd_plb.add_u64_counter(l_osd_tier_evict, "tier_evict");
1505
osd_plb.add_u64_counter(l_osd_tier_whiteout, "tier_whiteout");
1506
osd_plb.add_u64_counter(l_osd_tier_dirty, "tier_dirty");
1507
osd_plb.add_u64_counter(l_osd_tier_clean, "tier_clean");
1508
osd_plb.add_u64_counter(l_osd_tier_delay, "tier_delay");
1510
osd_plb.add_u64_counter(l_osd_agent_wake, "agent_wake");
1511
osd_plb.add_u64_counter(l_osd_agent_skip, "agent_skip");
1512
osd_plb.add_u64_counter(l_osd_agent_flush, "agent_flush");
1513
osd_plb.add_u64_counter(l_osd_agent_evict, "agent_evict");
1515
logger = osd_plb.create_perf_counters();
1516
cct->get_perfcounters_collection()->add(logger);
1519
void OSD::create_recoverystate_perf()
1521
dout(10) << "create_recoverystate_perf" << dendl;
1523
PerfCountersBuilder rs_perf(cct, "recoverystate_perf", rs_first, rs_last);
1525
rs_perf.add_time_avg(rs_initial_latency, "initial_latency");
1526
rs_perf.add_time_avg(rs_started_latency, "started_latency");
1527
rs_perf.add_time_avg(rs_reset_latency, "reset_latency");
1528
rs_perf.add_time_avg(rs_start_latency, "start_latency");
1529
rs_perf.add_time_avg(rs_primary_latency, "primary_latency");
1530
rs_perf.add_time_avg(rs_peering_latency, "peering_latency");
1531
rs_perf.add_time_avg(rs_backfilling_latency, "backfilling_latency");
1532
rs_perf.add_time_avg(rs_waitremotebackfillreserved_latency, "waitremotebackfillreserved_latency");
1533
rs_perf.add_time_avg(rs_waitlocalbackfillreserved_latency, "waitlocalbackfillreserved_latency");
1534
rs_perf.add_time_avg(rs_notbackfilling_latency, "notbackfilling_latency");
1535
rs_perf.add_time_avg(rs_repnotrecovering_latency, "repnotrecovering_latency");
1536
rs_perf.add_time_avg(rs_repwaitrecoveryreserved_latency, "repwaitrecoveryreserved_latency");
1537
rs_perf.add_time_avg(rs_repwaitbackfillreserved_latency, "repwaitbackfillreserved_latency");
1538
rs_perf.add_time_avg(rs_RepRecovering_latency, "RepRecovering_latency");
1539
rs_perf.add_time_avg(rs_activating_latency, "activating_latency");
1540
rs_perf.add_time_avg(rs_waitlocalrecoveryreserved_latency, "waitlocalrecoveryreserved_latency");
1541
rs_perf.add_time_avg(rs_waitremoterecoveryreserved_latency, "waitremoterecoveryreserved_latency");
1542
rs_perf.add_time_avg(rs_recovering_latency, "recovering_latency");
1543
rs_perf.add_time_avg(rs_recovered_latency, "recovered_latency");
1544
rs_perf.add_time_avg(rs_clean_latency, "clean_latency");
1545
rs_perf.add_time_avg(rs_active_latency, "active_latency");
1546
rs_perf.add_time_avg(rs_replicaactive_latency, "replicaactive_latency");
1547
rs_perf.add_time_avg(rs_stray_latency, "stray_latency");
1548
rs_perf.add_time_avg(rs_getinfo_latency, "getinfo_latency");
1549
rs_perf.add_time_avg(rs_getlog_latency, "getlog_latency");
1550
rs_perf.add_time_avg(rs_waitactingchange_latency, "waitactingchange_latency");
1551
rs_perf.add_time_avg(rs_incomplete_latency, "incomplete_latency");
1552
rs_perf.add_time_avg(rs_getmissing_latency, "getmissing_latency");
1553
rs_perf.add_time_avg(rs_waitupthru_latency, "waitupthru_latency");
1555
recoverystate_perf = rs_perf.create_perf_counters();
1556
cct->get_perfcounters_collection()->add(recoverystate_perf);
1559
void OSD::suicide(int exitcode)
1561
if (cct->_conf->filestore_blackhole) {
1562
derr << " filestore_blackhole=true, doing abbreviated shutdown" << dendl;
1566
// turn off lockdep; the surviving threads tend to fight with exit() below
1569
derr << " pausing thread pools" << dendl;
1572
recovery_tp.pause();
1575
derr << " flushing io" << dendl;
1576
store->sync_and_flush();
1578
derr << " removing pid file" << dendl;
1581
derr << " exit" << dendl;
1587
if (!service.prepare_to_stop())
1588
return 0; // already shutting down
1590
if (is_stopping()) {
1594
derr << "shutdown" << dendl;
1596
heartbeat_lock.Lock();
1597
state = STATE_STOPPING;
1598
heartbeat_lock.Unlock();
1601
cct->_conf->set_val("debug_osd", "100");
1602
cct->_conf->set_val("debug_journal", "100");
1603
cct->_conf->set_val("debug_filestore", "100");
1604
cct->_conf->set_val("debug_ms", "100");
1605
cct->_conf->apply_changes(NULL);
1608
for (ceph::unordered_map<spg_t, PG*>::iterator p = pg_map.begin();
1611
dout(20) << " kicking pg " << p->first << dendl;
1613
p->second->on_shutdown();
1614
p->second->unlock();
1615
p->second->osr->flush();
1619
op_wq.drain(); // should already be empty except for lagard PGs
1621
Mutex::Locker l(finished_lock);
1622
finished.clear(); // zap waiters (bleh, this is messy)
1625
// unregister commands
1626
cct->get_admin_socket()->unregister_command("status");
1627
cct->get_admin_socket()->unregister_command("flush_journal");
1628
cct->get_admin_socket()->unregister_command("dump_ops_in_flight");
1629
cct->get_admin_socket()->unregister_command("dump_historic_ops");
1630
cct->get_admin_socket()->unregister_command("dump_op_pq_state");
1631
cct->get_admin_socket()->unregister_command("dump_blacklist");
1632
cct->get_admin_socket()->unregister_command("dump_watchers");
1636
cct->get_admin_socket()->unregister_command("setomapval");
1637
cct->get_admin_socket()->unregister_command("rmomapkey");
1638
cct->get_admin_socket()->unregister_command("setomapheader");
1639
cct->get_admin_socket()->unregister_command("getomap");
1640
cct->get_admin_socket()->unregister_command("truncobj");
1641
cct->get_admin_socket()->unregister_command("injectdataerr");
1642
cct->get_admin_socket()->unregister_command("injectmdataerr");
1643
delete test_ops_hook;
1644
test_ops_hook = NULL;
1648
heartbeat_lock.Lock();
1649
heartbeat_stop = true;
1650
heartbeat_cond.Signal();
1651
heartbeat_lock.Unlock();
1652
heartbeat_thread.join();
1654
recovery_tp.drain();
1656
dout(10) << "recovery tp stopped" << dendl;
1660
dout(10) << "op tp stopped" << dendl;
1664
dout(10) << "command tp stopped" << dendl;
1668
dout(10) << "disk tp paused (new)" << dendl;
1670
dout(10) << "stopping agent" << dendl;
1671
service.agent_stop();
1675
reset_heartbeat_peers();
1677
tick_timer.shutdown();
1679
// note unmount epoch
1680
dout(10) << "noting clean unmount in epoch " << osdmap->get_epoch() << dendl;
1681
superblock.mounted = boot_epoch;
1682
superblock.clean_thru = osdmap->get_epoch();
1683
ObjectStore::Transaction t;
1684
write_superblock(t);
1685
int r = store->apply_transaction(t);
1687
derr << "OSD::shutdown: error writing superblock: "
1688
<< cpp_strerror(r) << dendl;
1691
dout(10) << "syncing store" << dendl;
1697
dout(10) << "Store synced" << dendl;
1700
Mutex::Locker l(pg_stat_queue_lock);
1701
assert(pg_stat_queue.empty());
1706
#ifdef PG_DEBUG_REFS
1707
service.dump_live_pgids();
1709
for (ceph::unordered_map<spg_t, PG*>::iterator p = pg_map.begin();
1712
dout(20) << " kicking pg " << p->first << dendl;
1714
if (p->second->ref.read() != 1) {
1715
derr << "pgid " << p->first << " has ref count of "
1716
<< p->second->ref.read() << dendl;
1719
p->second->unlock();
1720
p->second->put("PGMap");
1723
#ifdef PG_DEBUG_REFS
1724
service.dump_live_pgids();
1726
cct->_conf->remove_observer(this);
1731
osdmap = OSDMapRef();
1733
op_tracker.on_shutdown();
1735
class_handler->shutdown();
1736
client_messenger->shutdown();
1737
cluster_messenger->shutdown();
1738
hbclient_messenger->shutdown();
1739
objecter_messenger->shutdown();
1740
hb_front_server_messenger->shutdown();
1741
hb_back_server_messenger->shutdown();
1746
// Encode the in-memory superblock and queue a write of it into the
// given transaction (caller applies the transaction).
void OSD::write_superblock(ObjectStore::Transaction& t)
{
  dout(10) << "write_superblock " << superblock << dendl;

  // hack: at minimum we support the baseline incompat feature set.
  // NOTE(review): the previous guard here,
  //   if (!superblock.compat_features.incompat.mask |
  //       CEPH_OSD_FEATURE_INCOMPAT_BASE.id)
  // was an operator-precedence bug ("!mask | id" is always nonzero),
  // so the insert happened unconditionally anyway.  FeatureSet::insert
  // is idempotent, so we simply insert unconditionally and drop the
  // broken condition; runtime behavior is unchanged.
  superblock.compat_features.incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_BASE);

  bufferlist bl;
  ::encode(superblock, bl);
  t.write(coll_t::META_COLL, OSD_SUPERBLOCK_POBJECT, 0, bl.length(), bl);
}
1760
int OSD::read_superblock()
1763
int r = store->read(coll_t::META_COLL, OSD_SUPERBLOCK_POBJECT, 0, 0, bl);
1767
bufferlist::iterator p = bl.begin();
1768
::decode(superblock, p);
1770
dout(10) << "read_superblock " << superblock << dendl;
1777
void OSD::recursive_remove_collection(ObjectStore *store, coll_t tmp)
1782
make_snapmapper_oid());
1785
tmp.is_pg_prefix(pg);
1787
ObjectStore::Transaction t;
1788
SnapMapper mapper(&driver, 0, 0, 0, pg.shard);
1790
vector<ghobject_t> objects;
1791
store->collection_list(tmp, objects);
1794
unsigned removed = 0;
1795
for (vector<ghobject_t>::iterator p = objects.begin();
1798
OSDriver::OSTransaction _t(driver.get_transaction(&t));
1799
int r = mapper.remove_oid(p->hobj, &_t);
1800
if (r != 0 && r != -ENOENT)
1802
t.collection_remove(tmp, *p);
1803
if (removed > 300) {
1804
int r = store->apply_transaction(t);
1806
t = ObjectStore::Transaction();
1810
t.remove_collection(tmp);
1811
int r = store->apply_transaction(t);
1813
store->sync_and_flush();
1817
// ======================================================
1820
PGPool OSD::_get_pool(int id, OSDMapRef createmap)
1822
if (!createmap->have_pg_pool(id)) {
1823
dout(5) << __func__ << ": the OSDmap does not contain a PG pool with id = "
1828
PGPool p = PGPool(id, createmap->get_pool_name(id),
1829
createmap->get_pg_pool(id)->auid);
1831
const pg_pool_t *pi = createmap->get_pg_pool(id);
1833
p.snapc = pi->get_snap_context();
1835
pi->build_removed_snaps(p.cached_removed_snaps);
1836
dout(10) << "_get_pool " << p.id << dendl;
1840
PG *OSD::_open_lock_pg(
1841
OSDMapRef createmap,
1842
spg_t pgid, bool no_lockdep_check, bool hold_map_lock)
1844
assert(osd_lock.is_locked());
1846
PG* pg = _make_pg(createmap, pgid);
1850
pg->lock(no_lockdep_check);
1851
pg->get("PGMap"); // because it's in pg_map
1856
OSDMapRef createmap,
1859
dout(10) << "_open_lock_pg " << pgid << dendl;
1860
PGPool pool = _get_pool(pgid.pool(), createmap);
1864
hobject_t logoid = make_pg_log_oid(pgid);
1865
hobject_t infooid = make_pg_biginfo_oid(pgid);
1866
if (createmap->get_pg_type(pgid.pgid) == pg_pool_t::TYPE_REPLICATED ||
1867
createmap->get_pg_type(pgid.pgid) == pg_pool_t::TYPE_ERASURE)
1868
pg = new ReplicatedPG(&service, createmap, pool, pgid, logoid, infooid);
1876
void OSD::add_newly_split_pg(PG *pg, PG::RecoveryCtx *rctx)
1878
epoch_t e(service.get_osdmap()->get_epoch());
1879
pg->get("PGMap"); // For pg_map
1880
pg_map[pg->info.pgid] = pg;
1881
dout(10) << "Adding newly split pg " << *pg << dendl;
1882
vector<int> up, acting;
1883
pg->get_osdmap()->pg_to_up_acting_osds(pg->info.pgid.pgid, up, acting);
1884
int role = OSDMap::calc_pg_role(service.whoami, acting);
1886
pg->reg_next_scrub();
1887
pg->handle_loaded(rctx);
1888
pg->write_if_dirty(*(rctx->transaction));
1889
pg->queue_null(e, e);
1890
map<spg_t, list<PG::CephPeeringEvtRef> >::iterator to_wake =
1891
peering_wait_for_split.find(pg->info.pgid);
1892
if (to_wake != peering_wait_for_split.end()) {
1893
for (list<PG::CephPeeringEvtRef>::iterator i =
1894
to_wake->second.begin();
1895
i != to_wake->second.end();
1897
pg->queue_peering_event(*i);
1899
peering_wait_for_split.erase(to_wake);
1901
wake_pg_waiters(pg->info.pgid);
1902
if (!service.get_osdmap()->have_pg_pool(pg->info.pgid.pool()))
1906
OSD::res_result OSD::_try_resurrect_pg(
1907
OSDMapRef curmap, spg_t pgid, spg_t *resurrected, PGRef *old_pg_state)
1909
assert(resurrected);
1910
assert(old_pg_state);
1911
// find nearest ancestor
1912
DeletingStateRef df;
1915
df = service.deleting_pgs.lookup(cur);
1920
cur = cur.get_parent();
1923
return RES_NONE; // good to go
1925
df->old_pg_state->lock();
1926
OSDMapRef create_map = df->old_pg_state->get_osdmap();
1927
df->old_pg_state->unlock();
1929
set<spg_t> children;
1931
if (df->try_stop_deletion()) {
1932
dout(10) << __func__ << ": halted deletion on pg " << pgid << dendl;
1934
*old_pg_state = df->old_pg_state;
1935
service.deleting_pgs.remove(pgid); // PG is no longer being removed!
1938
// raced, ensure we don't see DeletingStateRef when we try to
1940
service.deleting_pgs.remove(pgid);
1943
} else if (cur.is_split(create_map->get_pg_num(cur.pool()),
1944
curmap->get_pg_num(cur.pool()),
1946
children.count(pgid)) {
1947
if (df->try_stop_deletion()) {
1948
dout(10) << __func__ << ": halted deletion on ancestor pg " << pgid
1951
*old_pg_state = df->old_pg_state;
1952
service.deleting_pgs.remove(cur); // PG is no longer being removed!
1955
/* this is not a problem, failing to cancel proves that all objects
1956
* have been removed, so no hobject_t overlap is possible
1964
PG *OSD::_create_lock_pg(
1965
OSDMapRef createmap,
1971
vector<int>& up, int up_primary,
1972
vector<int>& acting, int acting_primary,
1973
pg_history_t history,
1974
pg_interval_map_t& pi,
1975
ObjectStore::Transaction& t)
1977
assert(osd_lock.is_locked());
1978
dout(20) << "_create_lock_pg pgid " << pgid << dendl;
1980
PG *pg = _open_lock_pg(createmap, pgid, true, hold_map_lock);
1982
service.init_splits_between(pgid, pg->get_osdmap(), service.get_osdmap());
1995
dout(7) << "_create_lock_pg " << *pg << dendl;
2000
// Return true if this OSD currently has the given PG instantiated in
// pg_map.  Must be called with osd_lock held.
bool OSD::_have_pg(spg_t pgid)
{
  assert(osd_lock.is_locked());
  return pg_map.find(pgid) != pg_map.end();
}
2006
PG *OSD::_lookup_lock_pg(spg_t pgid)
2008
assert(osd_lock.is_locked());
2009
if (!pg_map.count(pgid))
2011
PG *pg = pg_map[pgid];
2017
PG *OSD::_lookup_pg(spg_t pgid)
2019
assert(osd_lock.is_locked());
2020
if (!pg_map.count(pgid))
2022
PG *pg = pg_map[pgid];
2026
PG *OSD::_lookup_lock_pg_with_map_lock_held(spg_t pgid)
2028
assert(osd_lock.is_locked());
2029
assert(pg_map.count(pgid));
2030
PG *pg = pg_map[pgid];
2035
void OSD::load_pgs()
2037
assert(osd_lock.is_locked());
2038
dout(0) << "load_pgs" << dendl;
2039
assert(pg_map.empty());
2042
int r = store->list_collections(ls);
2044
derr << "failed to list pgs: " << cpp_strerror(-r) << dendl;
2047
set<spg_t> head_pgs;
2048
map<spg_t, interval_set<snapid_t> > pgs;
2049
for (vector<coll_t>::iterator it = ls.begin();
2056
if (it->is_temp(pgid) ||
2057
it->is_removal(&seq, &pgid)) {
2058
dout(10) << "load_pgs " << *it << " clearing temp" << dendl;
2059
recursive_remove_collection(store, *it);
2063
if (it->is_pg(pgid, snap)) {
2064
if (snap != CEPH_NOSNAP) {
2065
dout(10) << "load_pgs skipping snapped dir " << *it
2066
<< " (pg " << pgid << " snap " << snap << ")" << dendl;
2067
pgs[pgid].insert(snap);
2070
head_pgs.insert(pgid);
2075
dout(10) << "load_pgs ignoring unrecognized " << *it << dendl;
2078
bool has_upgraded = false;
2079
for (map<spg_t, interval_set<snapid_t> >::iterator i = pgs.begin();
2082
spg_t pgid(i->first);
2084
if (!head_pgs.count(pgid)) {
2085
dout(10) << __func__ << ": " << pgid << " has orphan snap collections " << i->second
2086
<< " with no head" << dendl;
2090
if (!osdmap->have_pg_pool(pgid.pool())) {
2091
dout(10) << __func__ << ": skipping PG " << pgid << " because we don't have pool "
2092
<< pgid.pool() << dendl;
2096
if (pgid.preferred() >= 0) {
2097
dout(10) << __func__ << ": skipping localized PG " << pgid << dendl;
2098
// FIXME: delete it too, eventually
2102
dout(10) << "pgid " << pgid << " coll " << coll_t(pgid) << dendl;
2104
epoch_t map_epoch = PG::peek_map_epoch(store, coll_t(pgid), service.infos_oid, &bl);
2106
PG *pg = _open_lock_pg(map_epoch == 0 ? osdmap : service.get_map(map_epoch), pgid);
2108
// read pg state, log
2109
pg->read_state(store, bl);
2111
if (pg->must_upgrade()) {
2112
if (!has_upgraded) {
2113
derr << "PGs are upgrading" << dendl;
2114
has_upgraded = true;
2116
dout(10) << "PG " << pg->info.pgid
2117
<< " must upgrade..." << dendl;
2118
pg->upgrade(store, i->second);
2119
} else if (!i->second.empty()) {
2120
// handle upgrade bug
2121
for (interval_set<snapid_t>::iterator j = i->second.begin();
2122
j != i->second.end();
2124
for (snapid_t k = j.get_start();
2125
k != j.get_start() + j.get_len();
2127
assert(store->collection_empty(coll_t(pgid, k)));
2128
ObjectStore::Transaction t;
2129
t.remove_collection(coll_t(pgid, k));
2130
store->apply_transaction(t);
2135
if (!pg->snap_collections.empty()) {
2136
pg->snap_collections.clear();
2137
pg->dirty_big_info = true;
2138
pg->dirty_info = true;
2139
ObjectStore::Transaction t;
2140
pg->write_if_dirty(t);
2141
store->apply_transaction(t);
2144
service.init_splits_between(pg->info.pgid, pg->get_osdmap(), osdmap);
2146
// generate state for PG's current mapping
2147
int primary, up_primary;
2148
vector<int> acting, up;
2149
pg->get_osdmap()->pg_to_up_acting_osds(
2150
pgid.pgid, &up, &up_primary, &acting, &primary);
2151
pg->init_primary_up_acting(
2156
int role = OSDMap::calc_pg_role(whoami, pg->acting);
2159
pg->reg_next_scrub();
2161
PG::RecoveryCtx rctx(0, 0, 0, 0, 0, 0);
2162
pg->handle_loaded(&rctx);
2164
dout(10) << "load_pgs loaded " << *pg << " " << pg->pg_log.get_log() << dendl;
2167
dout(0) << "load_pgs opened " << pg_map.size() << " pgs" << dendl;
2169
build_past_intervals_parallel();
2174
* build past_intervals efficiently on old, degraded, and buried
2175
* clusters. this is important for efficiently catching up osds that
2176
* are way behind on maps to the current cluster state.
2178
* this is a parallel version of PG::generate_past_intervals().
2179
* follow the same logic, but do all pgs at the same time so that we
2180
* can make a single pass across the osdmap history.
2184
vector<int> old_acting, old_up;
2185
epoch_t same_interval_since;
2190
void OSD::build_past_intervals_parallel()
2192
map<PG*,pistate> pis;
2194
// calculate untion of map range
2195
epoch_t end_epoch = superblock.oldest_map;
2196
epoch_t cur_epoch = superblock.newest_map;
2197
for (ceph::unordered_map<spg_t, PG*>::iterator i = pg_map.begin();
2203
if (!pg->_calc_past_interval_range(&start, &end))
2206
dout(10) << pg->info.pgid << " needs " << start << "-" << end << dendl;
2207
pistate& p = pis[pg];
2210
p.same_interval_since = 0;
2212
if (start < cur_epoch)
2214
if (end > end_epoch)
2218
dout(10) << __func__ << " nothing to build" << dendl;
2222
dout(1) << __func__ << " over " << cur_epoch << "-" << end_epoch << dendl;
2223
assert(cur_epoch <= end_epoch);
2225
OSDMapRef cur_map, last_map;
2226
for ( ; cur_epoch <= end_epoch; cur_epoch++) {
2227
dout(10) << __func__ << " epoch " << cur_epoch << dendl;
2229
cur_map = get_map(cur_epoch);
2231
for (map<PG*,pistate>::iterator i = pis.begin(); i != pis.end(); ++i) {
2233
pistate& p = i->second;
2235
if (cur_epoch < p.start || cur_epoch > p.end)
2238
vector<int> acting, up;
2241
cur_map->pg_to_up_acting_osds(
2242
pg->info.pgid.pgid, &up, &up_primary, &acting, &primary);
2244
if (p.same_interval_since == 0) {
2245
dout(10) << __func__ << " epoch " << cur_epoch << " pg " << pg->info.pgid
2246
<< " first map, acting " << acting
2247
<< " up " << up << ", same_interval_since = " << cur_epoch << dendl;
2248
p.same_interval_since = cur_epoch;
2250
p.old_acting = acting;
2251
p.primary = primary;
2252
p.up_primary = up_primary;
2257
std::stringstream debug;
2258
bool new_interval = pg_interval_t::check_new_interval(
2261
p.old_acting, acting,
2265
p.same_interval_since,
2266
pg->info.history.last_epoch_clean,
2268
pg->info.pgid.pool(),
2270
&pg->past_intervals,
2273
dout(10) << __func__ << " epoch " << cur_epoch << " pg " << pg->info.pgid
2274
<< " " << debug.str() << dendl;
2276
p.old_acting = acting;
2277
p.same_interval_since = cur_epoch;
2282
// write info only at the end. this is necessary because we check
2283
// whether the past_intervals go far enough back or forward in time,
2284
// but we don't check for holes. we could avoid it by discarding
2285
// the previous past_intervals and rebuilding from scratch, or we
2286
// can just do this and commit all our work at the end.
2287
ObjectStore::Transaction t;
2289
for (map<PG*,pistate>::iterator i = pis.begin(); i != pis.end(); ++i) {
2292
pg->dirty_big_info = true;
2293
pg->dirty_info = true;
2294
pg->write_if_dirty(t);
2297
// don't let the transaction get too big
2298
if (++num >= cct->_conf->osd_target_transaction_size) {
2299
store->apply_transaction(t);
2300
t = ObjectStore::Transaction();
2305
store->apply_transaction(t);
2309
* look up a pg. if we have it, great. if not, consider creating it IF the pg mapping
2310
* hasn't changed since the given epoch and we are the primary.
2312
void OSD::handle_pg_peering_evt(
2314
const pg_info_t& info,
2315
pg_interval_map_t& pi,
2319
PG::CephPeeringEvtRef evt)
2321
if (service.splitting(pgid)) {
2322
peering_wait_for_split[pgid].push_back(evt);
2326
if (!_have_pg(pgid)) {
2328
if (!osdmap->have_pg_pool(pgid.pool()))
2330
int up_primary, acting_primary;
2331
vector<int> up, acting;
2332
osdmap->pg_to_up_acting_osds(
2333
pgid.pgid, &up, &up_primary, &acting, &acting_primary);
2334
int role = osdmap->calc_pg_role(whoami, acting, acting.size());
2336
pg_history_t history = info.history;
2337
bool valid_history = project_pg_history(
2338
pgid, history, epoch, up, up_primary, acting, acting_primary);
2340
if (!valid_history || epoch < history.same_interval_since) {
2341
dout(10) << "get_or_create_pg " << pgid << " acting changed in "
2342
<< history.same_interval_since << " (msg from " << epoch << ")" << dendl;
2346
if (service.splitting(pgid)) {
2350
bool create = false;
2354
// is there a creation pending on this pg?
2355
if (creating_pgs.count(pgid)) {
2356
creating_pgs[pgid].prior.erase(from);
2357
if (!can_create_pg(pgid))
2359
history = creating_pgs[pgid].history;
2362
dout(10) << "get_or_create_pg " << pgid
2363
<< " DNE on source, but creation probe, ignoring" << dendl;
2367
creating_pgs.erase(pgid);
2369
assert(!info.dne()); // pg exists if we are hearing about it
2372
// do we need to resurrect a deleting pg?
2375
res_result result = _try_resurrect_pg(
2376
service.get_osdmap(),
2381
PG::RecoveryCtx rctx = create_context();
2384
// ok, create the pg locally using provided Info and History
2385
rctx.transaction->create_collection(coll_t(pgid));
2386
PG *pg = _create_lock_pg(
2388
pgid, create, false, result == RES_SELF,
2391
acting, acting_primary,
2394
pg->handle_create(&rctx);
2395
pg->write_if_dirty(*rctx.transaction);
2396
dispatch_context(rctx, pg, osdmap);
2398
dout(10) << *pg << " is new" << dendl;
2401
wake_pg_waiters(pg->info.pgid);
2403
pg->queue_peering_event(evt);
2408
old_pg_state->lock();
2409
PG *pg = _create_lock_pg(
2410
old_pg_state->get_osdmap(),
2417
old_pg_state->up_primary.osd,
2418
old_pg_state->acting,
2419
old_pg_state->primary.osd,
2420
old_pg_state->info.history,
2421
old_pg_state->past_intervals,
2423
old_pg_state->unlock();
2424
pg->handle_create(&rctx);
2425
pg->write_if_dirty(*rctx.transaction);
2426
dispatch_context(rctx, pg, osdmap);
2428
dout(10) << *pg << " is new (resurrected)" << dendl;
2431
wake_pg_waiters(pg->info.pgid);
2433
pg->queue_peering_event(evt);
2438
assert(old_pg_state);
2439
old_pg_state->lock();
2440
PG *parent = _create_lock_pg(
2441
old_pg_state->get_osdmap(),
2448
old_pg_state->up_primary.osd,
2449
old_pg_state->acting,
2450
old_pg_state->primary.osd,
2451
old_pg_state->info.history,
2452
old_pg_state->past_intervals,
2455
old_pg_state->unlock();
2456
parent->handle_create(&rctx);
2457
parent->write_if_dirty(*rctx.transaction);
2458
dispatch_context(rctx, parent, osdmap);
2460
dout(10) << *parent << " is new" << dendl;
2463
wake_pg_waiters(parent->info.pgid);
2465
assert(service.splitting(pgid));
2466
peering_wait_for_split[pgid].push_back(evt);
2468
//parent->queue_peering_event(evt);
2469
parent->queue_null(osdmap->get_epoch(), osdmap->get_epoch());
2475
// already had it. did the mapping change?
2476
PG *pg = _lookup_lock_pg(pgid);
2477
if (epoch < pg->info.history.same_interval_since) {
2478
dout(10) << *pg << " get_or_create_pg acting changed in "
2479
<< pg->info.history.same_interval_since
2480
<< " (msg from " << epoch << ")" << dendl;
2484
pg->queue_peering_event(evt);
2492
* calculate prior pg members during an epoch interval [start,end)
2493
* - from each epoch, include all osds up then AND now
2494
* - if no osds from then are up now, include them all, even tho they're not reachable now
2496
void OSD::calc_priors_during(
2497
spg_t pgid, epoch_t start, epoch_t end, set<pg_shard_t>& pset)
2499
dout(15) << "calc_priors_during " << pgid << " [" << start
2500
<< "," << end << ")" << dendl;
2502
for (epoch_t e = start; e < end; e++) {
2503
OSDMapRef oldmap = get_map(e);
2505
oldmap->pg_to_acting_osds(pgid.pgid, acting);
2506
dout(20) << " " << pgid << " in epoch " << e << " was " << acting << dendl;
2508
for (unsigned i=0; i<acting.size(); i++)
2509
if (osdmap->is_up(acting[i])) {
2510
if (acting[i] != whoami) {
2514
osdmap->pg_is_ec(pgid.pgid) ? i : ghobject_t::NO_SHARD));
2518
if (!up && !acting.empty()) {
2519
// sucky. add down osds, even tho we can't reach them right now.
2520
for (unsigned i=0; i<acting.size(); i++)
2521
if (acting[i] != whoami)
2525
osdmap->pg_is_ec(pgid.pgid) ? i : ghobject_t::NO_SHARD));
2528
dout(10) << "calc_priors_during " << pgid
2529
<< " [" << start << "," << end
2530
<< ") = " << pset << dendl;
2535
* Fill in the passed history so you know same_interval_since, same_up_since,
2536
* and same_primary_since.
2538
bool OSD::project_pg_history(spg_t pgid, pg_history_t& h, epoch_t from,
2539
const vector<int>& currentup,
2540
int currentupprimary,
2541
const vector<int>& currentacting,
2542
int currentactingprimary)
2544
dout(15) << "project_pg_history " << pgid
2545
<< " from " << from << " to " << osdmap->get_epoch()
2550
for (e = osdmap->get_epoch();
2553
// verify during intermediate epoch (e-1)
2554
OSDMapRef oldmap = service.try_get_map(e-1);
2556
dout(15) << __func__ << ": found map gap, returning false" << dendl;
2559
assert(oldmap->have_pg_pool(pgid.pool()));
2561
int upprimary, actingprimary;
2562
vector<int> up, acting;
2563
oldmap->pg_to_up_acting_osds(
2570
// acting set change?
2571
if ((actingprimary != currentactingprimary ||
2572
upprimary != currentupprimary ||
2573
acting != currentacting ||
2574
up != currentup) && e > h.same_interval_since) {
2575
dout(15) << "project_pg_history " << pgid << " acting|up changed in " << e
2576
<< " from " << acting << "/" << up
2577
<< " " << actingprimary << "/" << upprimary
2578
<< " -> " << currentacting << "/" << currentup
2579
<< " " << currentactingprimary << "/" << currentupprimary
2581
h.same_interval_since = e;
2584
if (pgid.is_split(oldmap->get_pg_num(pgid.pool()),
2585
osdmap->get_pg_num(pgid.pool()),
2587
h.same_interval_since = e;
2590
if ((up != currentup || upprimary != currentupprimary)
2591
&& e > h.same_up_since) {
2592
dout(15) << "project_pg_history " << pgid << " up changed in " << e
2593
<< " from " << up << " " << upprimary
2594
<< " -> " << currentup << " " << currentupprimary << dendl;
2595
h.same_up_since = e;
2599
if (OSDMap::primary_changed(
2602
currentactingprimary,
2604
e > h.same_primary_since) {
2605
dout(15) << "project_pg_history " << pgid << " primary changed in " << e << dendl;
2606
h.same_primary_since = e;
2609
if (h.same_interval_since >= e && h.same_up_since >= e && h.same_primary_since >= e)
2613
// base case: these floors should be the creation epoch if we didn't
2614
// find any changes.
2615
if (e == h.epoch_created) {
2616
if (!h.same_interval_since)
2617
h.same_interval_since = e;
2618
if (!h.same_up_since)
2619
h.same_up_since = e;
2620
if (!h.same_primary_since)
2621
h.same_primary_since = e;
2624
dout(15) << "project_pg_history end " << h << dendl;
2628
// -------------------------------------
2630
float OSDService::get_full_ratio()
2632
float full_ratio = cct->_conf->osd_failsafe_full_ratio;
2633
if (full_ratio > 1.0) full_ratio /= 100.0;
2637
float OSDService::get_nearfull_ratio()
2639
float nearfull_ratio = cct->_conf->osd_failsafe_nearfull_ratio;
2640
if (nearfull_ratio > 1.0) nearfull_ratio /= 100.0;
2641
return nearfull_ratio;
2644
void OSDService::check_nearfull_warning(const osd_stat_t &osd_stat)
2646
Mutex::Locker l(full_status_lock);
2647
enum s_names new_state;
2649
time_t now = ceph_clock_gettime(NULL);
2651
// We base ratio on kb_avail rather than kb_used because they can
2652
// differ significantly e.g. on btrfs volumes with a large number of
2653
// chunks reserved for metadata, and for our purposes (avoiding
2654
// completely filling the disk) it's far more important to know how
2655
// much space is available to use than how much we've already used.
2656
float ratio = ((float)(osd_stat.kb - osd_stat.kb_avail)) / ((float)osd_stat.kb);
2657
float nearfull_ratio = get_nearfull_ratio();
2658
float full_ratio = get_full_ratio();
2661
if (full_ratio > 0 && ratio > full_ratio) {
2663
} else if (nearfull_ratio > 0 && ratio > nearfull_ratio) {
2670
if (cur_state != new_state) {
2671
cur_state = new_state;
2672
} else if (now - last_msg < cct->_conf->osd_op_complaint_time) {
2676
if (cur_state == FULL)
2677
clog.error() << "OSD full dropping all updates " << (int)(ratio * 100) << "% full";
2679
clog.warn() << "OSD near full (" << (int)(ratio * 100) << "%)";
2682
bool OSDService::check_failsafe_full()
2684
Mutex::Locker l(full_status_lock);
2685
if (cur_state == FULL)
2690
bool OSDService::too_full_for_backfill(double *_ratio, double *_max_ratio)
2692
Mutex::Locker l(full_status_lock);
2694
max_ratio = cct->_conf->osd_backfill_full_ratio;
2696
*_ratio = cur_ratio;
2698
*_max_ratio = max_ratio;
2699
return cur_ratio >= max_ratio;
2703
void OSD::update_osd_stat()
2705
// fill in osd stats too
2706
struct statfs stbuf;
2707
store->statfs(&stbuf);
2709
uint64_t bytes = stbuf.f_blocks * stbuf.f_bsize;
2710
uint64_t used = (stbuf.f_blocks - stbuf.f_bfree) * stbuf.f_bsize;
2711
uint64_t avail = stbuf.f_bavail * stbuf.f_bsize;
2713
osd_stat.kb = bytes >> 10;
2714
osd_stat.kb_used = used >> 10;
2715
osd_stat.kb_avail = avail >> 10;
2717
logger->set(l_osd_stat_bytes, bytes);
2718
logger->set(l_osd_stat_bytes_used, used);
2719
logger->set(l_osd_stat_bytes_avail, avail);
2721
osd_stat.hb_in.clear();
2722
for (map<int,HeartbeatInfo>::iterator p = heartbeat_peers.begin(); p != heartbeat_peers.end(); ++p)
2723
osd_stat.hb_in.push_back(p->first);
2724
osd_stat.hb_out.clear();
2726
service.check_nearfull_warning(osd_stat);
2728
op_tracker.get_age_ms_histogram(&osd_stat.op_queue_age_hist);
2730
dout(20) << "update_osd_stat " << osd_stat << dendl;
2733
void OSD::_add_heartbeat_peer(int p)
2739
map<int,HeartbeatInfo>::iterator i = heartbeat_peers.find(p);
2740
if (i == heartbeat_peers.end()) {
2741
pair<ConnectionRef,ConnectionRef> cons = service.get_con_osd_hb(p, osdmap->get_epoch());
2744
hi = &heartbeat_peers[p];
2746
HeartbeatSession *s = new HeartbeatSession(p);
2747
hi->con_back = cons.first.get();
2748
hi->con_back->set_priv(s);
2750
hi->con_front = cons.second.get();
2751
hi->con_front->set_priv(s->get());
2752
dout(10) << "_add_heartbeat_peer: new peer osd." << p
2753
<< " " << hi->con_back->get_peer_addr()
2754
<< " " << hi->con_front->get_peer_addr()
2757
hi->con_front.reset(NULL);
2758
dout(10) << "_add_heartbeat_peer: new peer osd." << p
2759
<< " " << hi->con_back->get_peer_addr()
2765
hi->epoch = osdmap->get_epoch();
2768
void OSD::_remove_heartbeat_peer(int n)
2770
map<int,HeartbeatInfo>::iterator q = heartbeat_peers.find(n);
2771
assert(q != heartbeat_peers.end());
2772
dout(20) << " removing heartbeat peer osd." << n
2773
<< " " << q->second.con_back->get_peer_addr()
2774
<< " " << (q->second.con_front ? q->second.con_front->get_peer_addr() : entity_addr_t())
2776
hbclient_messenger->mark_down(q->second.con_back);
2777
if (q->second.con_front) {
2778
hbclient_messenger->mark_down(q->second.con_front);
2780
heartbeat_peers.erase(q);
2783
void OSD::need_heartbeat_peer_update()
2785
Mutex::Locker l(heartbeat_lock);
2788
dout(20) << "need_heartbeat_peer_update" << dendl;
2789
heartbeat_need_update = true;
2792
void OSD::maybe_update_heartbeat_peers()
2794
assert(osd_lock.is_locked());
2796
if (is_waiting_for_healthy()) {
2797
utime_t now = ceph_clock_now(cct);
2798
if (last_heartbeat_resample == utime_t()) {
2799
last_heartbeat_resample = now;
2800
heartbeat_need_update = true;
2801
} else if (!heartbeat_need_update) {
2802
utime_t dur = now - last_heartbeat_resample;
2803
if (dur > cct->_conf->osd_heartbeat_grace) {
2804
dout(10) << "maybe_update_heartbeat_peers forcing update after " << dur << " seconds" << dendl;
2805
heartbeat_need_update = true;
2806
last_heartbeat_resample = now;
2807
reset_heartbeat_peers(); // we want *new* peers!
2812
Mutex::Locker l(heartbeat_lock);
2813
if (!heartbeat_need_update)
2815
heartbeat_need_update = false;
2817
dout(10) << "maybe_update_heartbeat_peers updating" << dendl;
2819
heartbeat_epoch = osdmap->get_epoch();
2821
// build heartbeat from set
2823
for (ceph::unordered_map<spg_t, PG*>::iterator i = pg_map.begin();
2827
pg->heartbeat_peer_lock.Lock();
2828
dout(20) << i->first << " heartbeat_peers " << pg->heartbeat_peers << dendl;
2829
for (set<int>::iterator p = pg->heartbeat_peers.begin();
2830
p != pg->heartbeat_peers.end();
2832
if (osdmap->is_up(*p))
2833
_add_heartbeat_peer(*p);
2834
for (set<int>::iterator p = pg->probe_targets.begin();
2835
p != pg->probe_targets.end();
2837
if (osdmap->is_up(*p))
2838
_add_heartbeat_peer(*p);
2839
pg->heartbeat_peer_lock.Unlock();
2843
// include next and previous up osds to ensure we have a fully-connected set
2844
set<int> want, extras;
2845
int next = osdmap->get_next_up_osd_after(whoami);
2848
int prev = osdmap->get_previous_up_osd_before(whoami);
2852
for (set<int>::iterator p = want.begin(); p != want.end(); ++p) {
2853
dout(10) << " adding neighbor peer osd." << *p << dendl;
2855
_add_heartbeat_peer(*p);
2858
// remove down peers; enumerate extras
2859
map<int,HeartbeatInfo>::iterator p = heartbeat_peers.begin();
2860
while (p != heartbeat_peers.end()) {
2861
if (!osdmap->is_up(p->first)) {
2864
_remove_heartbeat_peer(o);
2867
if (p->second.epoch < osdmap->get_epoch()) {
2868
extras.insert(p->first);
2874
int start = osdmap->get_next_up_osd_after(whoami);
2875
for (int n = start; n >= 0; ) {
2876
if ((int)heartbeat_peers.size() >= cct->_conf->osd_heartbeat_min_peers)
2878
if (!extras.count(n) && !want.count(n) && n != whoami) {
2879
dout(10) << " adding random peer osd." << n << dendl;
2881
_add_heartbeat_peer(n);
2883
n = osdmap->get_next_up_osd_after(n);
2885
break; // came full circle; stop
2889
for (set<int>::iterator p = extras.begin();
2890
(int)heartbeat_peers.size() > cct->_conf->osd_heartbeat_min_peers && p != extras.end();
2894
_remove_heartbeat_peer(*p);
2897
dout(10) << "maybe_update_heartbeat_peers " << heartbeat_peers.size() << " peers, extras " << extras << dendl;
2900
void OSD::reset_heartbeat_peers()
2902
assert(osd_lock.is_locked());
2903
dout(10) << "reset_heartbeat_peers" << dendl;
2904
Mutex::Locker l(heartbeat_lock);
2905
while (!heartbeat_peers.empty()) {
2906
HeartbeatInfo& hi = heartbeat_peers.begin()->second;
2907
hbclient_messenger->mark_down(hi.con_back);
2909
hbclient_messenger->mark_down(hi.con_front);
2911
heartbeat_peers.erase(heartbeat_peers.begin());
2913
failure_queue.clear();
2916
void OSD::handle_osd_ping(MOSDPing *m)
2918
if (superblock.cluster_fsid != m->fsid) {
2919
dout(20) << "handle_osd_ping from " << m->get_source_inst()
2920
<< " bad fsid " << m->fsid << " != " << superblock.cluster_fsid << dendl;
2925
int from = m->get_source().num();
2927
heartbeat_lock.Lock();
2928
if (is_stopping()) {
2929
heartbeat_lock.Unlock();
2934
OSDMapRef curmap = service.get_osdmap();
2938
case MOSDPing::PING:
2940
if (cct->_conf->osd_debug_drop_ping_probability > 0) {
2941
if (debug_heartbeat_drops_remaining.count(from)) {
2942
if (debug_heartbeat_drops_remaining[from] == 0) {
2943
debug_heartbeat_drops_remaining.erase(from);
2945
debug_heartbeat_drops_remaining[from]--;
2946
dout(5) << "Dropping heartbeat from " << from
2947
<< ", " << debug_heartbeat_drops_remaining[from]
2948
<< " remaining to drop" << dendl;
2951
} else if (cct->_conf->osd_debug_drop_ping_probability >
2952
((((double)(rand()%100))/100.0))) {
2953
debug_heartbeat_drops_remaining[from] =
2954
cct->_conf->osd_debug_drop_ping_duration;
2955
dout(5) << "Dropping heartbeat from " << from
2956
<< ", " << debug_heartbeat_drops_remaining[from]
2957
<< " remaining to drop" << dendl;
2962
if (!cct->get_heartbeat_map()->is_healthy()) {
2963
dout(10) << "internal heartbeat not healthy, dropping ping request" << dendl;
2967
Message *r = new MOSDPing(monc->get_fsid(),
2968
curmap->get_epoch(),
2969
MOSDPing::PING_REPLY,
2971
m->get_connection()->get_messenger()->send_message(r, m->get_connection());
2973
if (curmap->is_up(from)) {
2974
note_peer_epoch(from, m->map_epoch);
2976
ConnectionRef con = service.get_con_osd_cluster(from, curmap->get_epoch());
2978
_share_map_outgoing(from, con.get());
2981
} else if (!curmap->exists(from) ||
2982
curmap->get_down_at(from) > m->map_epoch) {
2983
// tell them they have died
2984
Message *r = new MOSDPing(monc->get_fsid(),
2985
curmap->get_epoch(),
2988
m->get_connection()->get_messenger()->send_message(r, m->get_connection());
2993
case MOSDPing::PING_REPLY:
2995
map<int,HeartbeatInfo>::iterator i = heartbeat_peers.find(from);
2996
if (i != heartbeat_peers.end()) {
2997
if (m->get_connection() == i->second.con_back) {
2998
dout(25) << "handle_osd_ping got reply from osd." << from
2999
<< " first_rx " << i->second.first_tx
3000
<< " last_tx " << i->second.last_tx
3001
<< " last_rx_back " << i->second.last_rx_back << " -> " << m->stamp
3002
<< " last_rx_front " << i->second.last_rx_front
3004
i->second.last_rx_back = m->stamp;
3005
// if there is no front con, set both stamps.
3006
if (i->second.con_front == NULL)
3007
i->second.last_rx_front = m->stamp;
3008
} else if (m->get_connection() == i->second.con_front) {
3009
dout(25) << "handle_osd_ping got reply from osd." << from
3010
<< " first_rx " << i->second.first_tx
3011
<< " last_tx " << i->second.last_tx
3012
<< " last_rx_back " << i->second.last_rx_back
3013
<< " last_rx_front " << i->second.last_rx_front << " -> " << m->stamp
3015
i->second.last_rx_front = m->stamp;
3020
curmap->is_up(from)) {
3021
note_peer_epoch(from, m->map_epoch);
3023
ConnectionRef con = service.get_con_osd_cluster(from, curmap->get_epoch());
3025
_share_map_outgoing(from, con.get());
3030
utime_t cutoff = ceph_clock_now(cct);
3031
cutoff -= cct->_conf->osd_heartbeat_grace;
3032
if (i->second.is_healthy(cutoff)) {
3033
// Cancel false reports
3034
if (failure_queue.count(from)) {
3035
dout(10) << "handle_osd_ping canceling queued failure report for osd." << from<< dendl;
3036
failure_queue.erase(from);
3038
if (failure_pending.count(from)) {
3039
dout(10) << "handle_osd_ping canceling in-flight failure report for osd." << from<< dendl;
3040
send_still_alive(curmap->get_epoch(), failure_pending[from]);
3041
failure_pending.erase(from);
3047
case MOSDPing::YOU_DIED:
3048
dout(10) << "handle_osd_ping " << m->get_source_inst()
3049
<< " says i am down in " << m->map_epoch << dendl;
3050
osdmap_subscribe(curmap->get_epoch()+1, false);
3054
heartbeat_lock.Unlock();
3058
void OSD::heartbeat_entry()
3060
Mutex::Locker l(heartbeat_lock);
3063
while (!heartbeat_stop) {
3066
double wait = .5 + ((float)(rand() % 10)/10.0) * (float)cct->_conf->osd_heartbeat_interval;
3068
w.set_from_double(wait);
3069
dout(30) << "heartbeat_entry sleeping for " << wait << dendl;
3070
heartbeat_cond.WaitInterval(cct, heartbeat_lock, w);
3073
dout(30) << "heartbeat_entry woke up" << dendl;
3077
void OSD::heartbeat_check()
3079
assert(heartbeat_lock.is_locked());
3080
utime_t now = ceph_clock_now(cct);
3081
double age = hbclient_messenger->get_dispatch_queue_max_age(now);
3082
if (age > (cct->_conf->osd_heartbeat_grace / 2)) {
3083
derr << "skipping heartbeat_check, hbqueue max age: " << age << dendl;
3084
return; // hb dispatch is too backed up for our hb status to be meaningful
3087
// check for incoming heartbeats (move me elsewhere?)
3088
utime_t cutoff = now;
3089
cutoff -= cct->_conf->osd_heartbeat_grace;
3090
for (map<int,HeartbeatInfo>::iterator p = heartbeat_peers.begin();
3091
p != heartbeat_peers.end();
3093
dout(25) << "heartbeat_check osd." << p->first
3094
<< " first_tx " << p->second.first_tx
3095
<< " last_tx " << p->second.last_tx
3096
<< " last_rx_back " << p->second.last_rx_back
3097
<< " last_rx_front " << p->second.last_rx_front
3099
if (p->second.is_unhealthy(cutoff)) {
3100
if (p->second.last_rx_back == utime_t() ||
3101
p->second.last_rx_front == utime_t()) {
3102
derr << "heartbeat_check: no reply from osd." << p->first
3103
<< " ever on either front or back, first ping sent " << p->second.first_tx
3104
<< " (cutoff " << cutoff << ")" << dendl;
3106
failure_queue[p->first] = p->second.last_tx;
3108
derr << "heartbeat_check: no reply from osd." << p->first
3109
<< " since back " << p->second.last_rx_back
3110
<< " front " << p->second.last_rx_front
3111
<< " (cutoff " << cutoff << ")" << dendl;
3113
failure_queue[p->first] = MIN(p->second.last_rx_back, p->second.last_rx_front);
3119
void OSD::heartbeat()
3121
dout(30) << "heartbeat" << dendl;
3125
if (getloadavg(loadavgs, 1) == 1)
3126
logger->set(l_osd_loadavg, 100 * loadavgs[0]);
3128
dout(30) << "heartbeat checking stats" << dendl;
3132
Mutex::Locker lock(stat_lock);
3136
dout(5) << "heartbeat: " << osd_stat << dendl;
3138
utime_t now = ceph_clock_now(cct);
3141
for (map<int,HeartbeatInfo>::iterator i = heartbeat_peers.begin();
3142
i != heartbeat_peers.end();
3144
int peer = i->first;
3145
i->second.last_tx = now;
3146
if (i->second.first_tx == utime_t())
3147
i->second.first_tx = now;
3148
dout(30) << "heartbeat sending ping to osd." << peer << dendl;
3149
hbclient_messenger->send_message(new MOSDPing(monc->get_fsid(),
3150
service.get_osdmap()->get_epoch(),
3153
i->second.con_back);
3154
if (i->second.con_front)
3155
hbclient_messenger->send_message(new MOSDPing(monc->get_fsid(),
3156
service.get_osdmap()->get_epoch(),
3159
i->second.con_front);
3162
dout(30) << "heartbeat check" << dendl;
3165
logger->set(l_osd_hb_to, heartbeat_peers.size());
3166
logger->set(l_osd_hb_from, 0);
3168
// hmm.. am i all alone?
3169
dout(30) << "heartbeat lonely?" << dendl;
3170
if (heartbeat_peers.empty()) {
3171
if (now - last_mon_heartbeat > cct->_conf->osd_mon_heartbeat_interval && is_active()) {
3172
last_mon_heartbeat = now;
3173
dout(10) << "i have no heartbeat peers; checking mon for new map" << dendl;
3174
osdmap_subscribe(osdmap->get_epoch() + 1, true);
3178
dout(30) << "heartbeat done" << dendl;
3181
bool OSD::heartbeat_reset(Connection *con)
3183
HeartbeatSession *s = static_cast<HeartbeatSession*>(con->get_priv());
3185
heartbeat_lock.Lock();
3186
if (is_stopping()) {
3187
heartbeat_lock.Unlock();
3191
map<int,HeartbeatInfo>::iterator p = heartbeat_peers.find(s->peer);
3192
if (p != heartbeat_peers.end() &&
3193
(p->second.con_back == con ||
3194
p->second.con_front == con)) {
3195
dout(10) << "heartbeat_reset failed hb con " << con << " for osd." << p->second.peer
3196
<< ", reopening" << dendl;
3197
if (con != p->second.con_back) {
3198
hbclient_messenger->mark_down(p->second.con_back);
3200
p->second.con_back.reset(NULL);
3201
if (p->second.con_front && con != p->second.con_front) {
3202
hbclient_messenger->mark_down(p->second.con_front);
3204
p->second.con_front.reset(NULL);
3205
pair<ConnectionRef,ConnectionRef> newcon = service.get_con_osd_hb(p->second.peer, p->second.epoch);
3207
p->second.con_back = newcon.first.get();
3208
p->second.con_back->set_priv(s->get());
3209
if (newcon.second) {
3210
p->second.con_front = newcon.second.get();
3211
p->second.con_front->set_priv(s->get());
3214
dout(10) << "heartbeat_reset failed hb con " << con << " for osd." << p->second.peer
3215
<< ", raced with osdmap update, closing out peer" << dendl;
3216
heartbeat_peers.erase(p);
3219
dout(10) << "heartbeat_reset closing (old) failed hb con " << con << dendl;
3221
heartbeat_lock.Unlock();
3229
// =========================================
3233
assert(osd_lock.is_locked());
3234
dout(5) << "tick" << dendl;
3236
logger->set(l_osd_buf, buffer::get_total_alloc());
3238
if (is_active() || is_waiting_for_healthy()) {
3239
map_lock.get_read();
3241
maybe_update_heartbeat_peers();
3243
heartbeat_lock.Lock();
3245
heartbeat_lock.Unlock();
3248
utime_t now = ceph_clock_now(cct);
3249
if (outstanding_pg_stats && timeout_mon_on_pg_stats &&
3250
(now - cct->_conf->osd_mon_ack_timeout) > last_pg_stats_ack) {
3251
dout(1) << "mon hasn't acked PGStats in " << now - last_pg_stats_ack
3252
<< " seconds, reconnecting elsewhere" << dendl;
3253
monc->reopen_session(new C_MonStatsAckTimer(this));
3254
timeout_mon_on_pg_stats = false;
3255
last_pg_stats_ack = ceph_clock_now(cct); // reset clock
3256
last_pg_stats_sent = utime_t();
3258
if (now - last_pg_stats_sent > cct->_conf->osd_mon_report_interval_max) {
3259
osd_stat_updated = true;
3261
} else if (now - last_mon_report > cct->_conf->osd_mon_report_interval_min) {
3265
map_lock.put_read();
3268
if (is_waiting_for_healthy()) {
3269
if (_is_healthy()) {
3270
dout(1) << "healthy again, booting" << dendl;
3271
state = STATE_BOOTING;
3277
// periodically kick recovery work queue
3280
if (!scrub_random_backoff()) {
3284
check_replay_queue();
3287
// only do waiters if dispatch() isn't currently running. (if it is,
3288
// it'll do the waiters, and doing them here may screw up ordering
3289
// of op_queue vs handle_osd_map.)
3290
if (!dispatch_running) {
3291
dispatch_running = true;
3293
dispatch_running = false;
3294
dispatch_cond.Signal();
3297
check_ops_in_flight();
3299
tick_timer.add_event_after(1.0, new C_Tick(this));
3302
void OSD::check_ops_in_flight()
3304
vector<string> warnings;
3305
if (op_tracker.check_ops_in_flight(warnings)) {
3306
for (vector<string>::iterator i = warnings.begin();
3307
i != warnings.end();
3316
// setomapval <pool-id> [namespace/]<obj-name> <key> <val>
3317
// rmomapkey <pool-id> [namespace/]<obj-name> <key>
3318
// setomapheader <pool-id> [namespace/]<obj-name> <header>
3319
// getomap <pool> [namespace/]<obj-name>
3320
// truncobj <pool-id> [namespace/]<obj-name> <newlen>
3321
// injectmdataerr [namespace/]<obj-name>
3322
// injectdataerr [namespace/]<obj-name>
3323
void TestOpsSocketHook::test_ops(OSDService *service, ObjectStore *store,
3324
std::string command, cmdmap_t& cmdmap, ostream &ss)
3327
//Support changing the omap on a single osd by using the Admin Socket to
3328
//directly request the osd make a change.
3329
if (command == "setomapval" || command == "rmomapkey" ||
3330
command == "setomapheader" || command == "getomap" ||
3331
command == "truncobj" || command == "injectmdataerr" ||
3332
command == "injectdataerr"
3336
OSDMapRef curmap = service->get_osdmap();
3341
cmd_getval(service->cct, cmdmap, "pool", poolstr);
3342
pool = curmap->const_lookup_pg_pool_name(poolstr.c_str());
3343
//If we can't find it by name then maybe id specified
3344
if (pool < 0 && isdigit(poolstr[0]))
3345
pool = atoll(poolstr.c_str());
3347
ss << "Invalid pool" << poolstr;
3351
string objname, nspace;
3352
cmd_getval(service->cct, cmdmap, "objname", objname);
3353
std::size_t found = objname.find_first_of('/');
3354
if (found != string::npos) {
3355
nspace = objname.substr(0, found);
3356
objname = objname.substr(found+1);
3358
object_locator_t oloc(pool, nspace);
3359
r = curmap->object_locator_to_pg(object_t(objname), oloc, rawpg);
3362
ss << "Invalid namespace/objname";
3365
if (curmap->pg_is_ec(rawpg)) {
3366
ss << "Must not call on ec pool";
3369
spg_t pgid = spg_t(curmap->raw_pg_to_pg(rawpg), ghobject_t::no_shard());
3371
hobject_t obj(object_t(objname), string(""), CEPH_NOSNAP, rawpg.ps(), pool, nspace);
3372
ObjectStore::Transaction t;
3374
if (command == "setomapval") {
3375
map<string, bufferlist> newattrs;
3378
cmd_getval(service->cct, cmdmap, "key", key);
3379
cmd_getval(service->cct, cmdmap, "val", valstr);
3382
newattrs[key] = val;
3383
t.omap_setkeys(coll_t(pgid), obj, newattrs);
3384
r = store->apply_transaction(t);
3386
ss << "error=" << r;
3389
} else if (command == "rmomapkey") {
3392
cmd_getval(service->cct, cmdmap, "key", key);
3395
t.omap_rmkeys(coll_t(pgid), obj, keys);
3396
r = store->apply_transaction(t);
3398
ss << "error=" << r;
3401
} else if (command == "setomapheader") {
3402
bufferlist newheader;
3405
cmd_getval(service->cct, cmdmap, "header", headerstr);
3406
newheader.append(headerstr);
3407
t.omap_setheader(coll_t(pgid), obj, newheader);
3408
r = store->apply_transaction(t);
3410
ss << "error=" << r;
3413
} else if (command == "getomap") {
3414
//Debug: Output entire omap
3416
map<string, bufferlist> keyvals;
3417
r = store->omap_get(coll_t(pgid), obj, &hdrbl, &keyvals);
3419
ss << "header=" << string(hdrbl.c_str(), hdrbl.length());
3420
for (map<string, bufferlist>::iterator it = keyvals.begin();
3421
it != keyvals.end(); ++it)
3422
ss << " key=" << (*it).first << " val="
3423
<< string((*it).second.c_str(), (*it).second.length());
3425
ss << "error=" << r;
3427
} else if (command == "truncobj") {
3429
cmd_getval(service->cct, cmdmap, "len", trunclen);
3430
t.truncate(coll_t(pgid), obj, trunclen);
3431
r = store->apply_transaction(t);
3433
ss << "error=" << r;
3436
} else if (command == "injectdataerr") {
3437
store->inject_data_error(obj);
3439
} else if (command == "injectmdataerr") {
3440
store->inject_mdata_error(obj);
3445
ss << "Internal error - command=" << command;
3449
// =========================================
3452
ObjectStore *store, SnapMapper *mapper,
3454
ObjectStore::Sequencer *osr,
3455
coll_t coll, DeletingStateRef dstate,
3456
ThreadPool::TPHandle &handle)
3458
vector<ghobject_t> olist;
3460
ObjectStore::Transaction *t = new ObjectStore::Transaction;
3462
while (!next.is_max()) {
3463
handle.reset_tp_timeout();
3464
store->collection_list_partial(
3467
store->get_ideal_list_min(),
3468
store->get_ideal_list_max(),
3472
for (vector<ghobject_t>::iterator i = olist.begin();
3475
OSDriver::OSTransaction _t(osdriver->get_transaction(t));
3476
int r = mapper->remove_oid(i->hobj, &_t);
3477
if (r != 0 && r != -ENOENT) {
3480
t->remove(coll, *i);
3481
if (num >= cct->_conf->osd_target_transaction_size) {
3483
store->queue_transaction(osr, t, &waiter);
3484
bool cont = dstate->pause_clearing();
3485
handle.suspend_tp_timeout();
3487
handle.reset_tp_timeout();
3489
cont = dstate->resume_clearing();
3493
t = new ObjectStore::Transaction;
3501
store->queue_transaction(osr, t, &waiter);
3502
bool cont = dstate->pause_clearing();
3503
handle.suspend_tp_timeout();
3505
handle.reset_tp_timeout();
3507
cont = dstate->resume_clearing();
3512
void OSD::RemoveWQ::_process(
3513
pair<PGRef, DeletingStateRef> item,
3514
ThreadPool::TPHandle &handle)
3516
PGRef pg(item.first);
3517
SnapMapper &mapper = pg->snap_mapper;
3518
OSDriver &driver = pg->osdriver;
3519
coll_t coll = coll_t(pg->info.pgid);
3522
if (!item.second->start_clearing())
3525
list<coll_t> colls_to_remove;
3526
pg->get_colls(&colls_to_remove);
3527
for (list<coll_t>::iterator i = colls_to_remove.begin();
3528
i != colls_to_remove.end();
3530
bool cont = remove_dir(
3531
pg->cct, store, &mapper, &driver, pg->osr.get(), *i, item.second,
3537
if (!item.second->start_deleting())
3540
ObjectStore::Transaction *t = new ObjectStore::Transaction;
3541
PGLog::clear_info_log(
3543
OSD::make_infos_oid(),
3547
for (list<coll_t>::iterator i = colls_to_remove.begin();
3548
i != colls_to_remove.end();
3550
t->remove_collection(*i);
3553
// We need the sequencer to stick around until the op is complete
3554
store->queue_transaction(
3559
0, // onreadable sync
3560
new ObjectStore::C_DeleteTransactionHolder<PGRef>(
3561
t, pg), // oncomplete
3564
item.second->finish_deleting();
3566
// =========================================
3568
void OSD::do_mon_report()
3570
dout(7) << "do_mon_report" << dendl;
3572
utime_t now(ceph_clock_now(cct));
3573
last_mon_report = now;
3575
// do any pending reports
3577
service.send_pg_temp();
3582
void OSD::ms_handle_connect(Connection *con)
3584
if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
3585
Mutex::Locker l(osd_lock);
3588
dout(10) << "ms_handle_connect on mon" << dendl;
3593
service.send_pg_temp();
3595
send_pg_stats(ceph_clock_now(cct));
3597
monc->sub_want("osd_pg_creates", 0, CEPH_SUBSCRIBE_ONETIME);
3603
bool OSD::ms_handle_reset(Connection *con)
3605
OSD::Session *session = (OSD::Session *)con->get_priv();
3606
dout(1) << "ms_handle_reset con " << con << " session " << session << dendl;
3609
session->wstate.reset();
3610
session->con.reset(NULL); // break con <-> session ref cycle
3615
struct C_OSD_GetVersion : public Context {
3617
uint64_t oldest, newest;
3618
C_OSD_GetVersion(OSD *o) : osd(o), oldest(0), newest(0) {}
3619
void finish(int r) {
3621
osd->_maybe_boot(oldest, newest);
3625
void OSD::start_boot()
3627
dout(10) << "start_boot - have maps " << superblock.oldest_map
3628
<< ".." << superblock.newest_map << dendl;
3629
C_OSD_GetVersion *c = new C_OSD_GetVersion(this);
3630
monc->get_version("osdmap", &c->newest, &c->oldest, c);
3633
void OSD::_maybe_boot(epoch_t oldest, epoch_t newest)
3635
Mutex::Locker l(osd_lock);
3638
dout(10) << "_maybe_boot mon has osdmaps " << oldest << ".." << newest << dendl;
3640
if (is_initializing()) {
3641
dout(10) << "still initializing" << dendl;
3645
// if our map within recent history, try to add ourselves to the osdmap.
3646
if (osdmap->test_flag(CEPH_OSDMAP_NOUP)) {
3647
dout(5) << "osdmap NOUP flag is set, waiting for it to clear" << dendl;
3648
} else if (is_waiting_for_healthy() || !_is_healthy()) {
3649
// if we are not healthy, do not mark ourselves up (yet)
3650
dout(1) << "not healthy; waiting to boot" << dendl;
3651
if (!is_waiting_for_healthy())
3652
start_waiting_for_healthy();
3653
// send pings sooner rather than later
3655
} else if (osdmap->get_epoch() >= oldest - 1 &&
3656
osdmap->get_epoch() + cct->_conf->osd_map_message_max > newest) {
3661
// get all the latest maps
3662
if (osdmap->get_epoch() > oldest)
3663
osdmap_subscribe(osdmap->get_epoch() + 1, true);
3665
osdmap_subscribe(oldest - 1, true);
3668
void OSD::start_waiting_for_healthy()
3670
dout(1) << "start_waiting_for_healthy" << dendl;
3671
state = STATE_WAITING_FOR_HEALTHY;
3672
last_heartbeat_resample = utime_t();
3675
bool OSD::_is_healthy()
3677
if (!cct->get_heartbeat_map()->is_healthy()) {
3678
dout(1) << "is_healthy false -- internal heartbeat failed" << dendl;
3682
if (is_waiting_for_healthy()) {
3683
Mutex::Locker l(heartbeat_lock);
3684
utime_t cutoff = ceph_clock_now(cct);
3685
cutoff -= cct->_conf->osd_heartbeat_grace;
3686
int num = 0, up = 0;
3687
for (map<int,HeartbeatInfo>::iterator p = heartbeat_peers.begin();
3688
p != heartbeat_peers.end();
3690
if (p->second.is_healthy(cutoff))
3694
if ((float)up < (float)num * cct->_conf->osd_heartbeat_min_healthy_ratio) {
3695
dout(1) << "is_healthy false -- only " << up << "/" << num << " up peers (less than 1/3)" << dendl;
3703
void OSD::_send_boot()
3705
dout(10) << "_send_boot" << dendl;
3706
entity_addr_t cluster_addr = cluster_messenger->get_myaddr();
3707
if (cluster_addr.is_blank_ip()) {
3708
int port = cluster_addr.get_port();
3709
cluster_addr = client_messenger->get_myaddr();
3710
cluster_addr.set_port(port);
3711
cluster_messenger->set_addr_unknowns(cluster_addr);
3712
dout(10) << " assuming cluster_addr ip matches client_addr" << dendl;
3714
entity_addr_t hb_back_addr = hb_back_server_messenger->get_myaddr();
3715
if (hb_back_addr.is_blank_ip()) {
3716
int port = hb_back_addr.get_port();
3717
hb_back_addr = cluster_addr;
3718
hb_back_addr.set_port(port);
3719
hb_back_server_messenger->set_addr_unknowns(hb_back_addr);
3720
dout(10) << " assuming hb_back_addr ip matches cluster_addr" << dendl;
3722
entity_addr_t hb_front_addr = hb_front_server_messenger->get_myaddr();
3723
if (hb_front_addr.is_blank_ip()) {
3724
int port = hb_front_addr.get_port();
3725
hb_front_addr = client_messenger->get_myaddr();
3726
hb_front_addr.set_port(port);
3727
hb_front_server_messenger->set_addr_unknowns(hb_front_addr);
3728
dout(10) << " assuming hb_front_addr ip matches client_addr" << dendl;
3731
MOSDBoot *mboot = new MOSDBoot(superblock, boot_epoch, hb_back_addr, hb_front_addr, cluster_addr);
3732
dout(10) << " client_addr " << client_messenger->get_myaddr()
3733
<< ", cluster_addr " << cluster_addr
3734
<< ", hb_back_addr " << hb_back_addr
3735
<< ", hb_front_addr " << hb_front_addr
3737
_collect_metadata(&mboot->metadata);
3738
monc->send_mon_message(mboot);
3741
void OSD::_collect_metadata(map<string,string> *pm)
3743
(*pm)["ceph_version"] = pretty_version_to_str();
3746
(*pm)["osd_data"] = dev_path;
3747
(*pm)["osd_journal"] = journal_path;
3748
(*pm)["front_addr"] = stringify(client_messenger->get_myaddr());
3749
(*pm)["back_addr"] = stringify(cluster_messenger->get_myaddr());
3750
(*pm)["hb_front_addr"] = stringify(hb_front_server_messenger->get_myaddr());
3751
(*pm)["hb_back_addr"] = stringify(hb_back_server_messenger->get_myaddr());
3757
(*pm)["os"] = u.sysname;
3758
(*pm)["kernel_version"] = u.release;
3759
(*pm)["kernel_description"] = u.version;
3760
(*pm)["hostname"] = u.nodename;
3761
(*pm)["arch"] = u.machine;
3765
FILE *f = fopen("/proc/meminfo", "r");
3769
char *line = fgets(buf, sizeof(buf), f);
3774
int r = sscanf(line, "%s %lld", key, &value);
3776
if (strcmp(key, "MemTotal:") == 0)
3777
(*pm)["mem_total_kb"] = stringify(value);
3778
else if (strcmp(key, "SwapTotal:") == 0)
3779
(*pm)["mem_swap_kb"] = stringify(value);
3786
f = fopen("/proc/cpuinfo", "r");
3790
char *line = fgets(buf, sizeof(buf), f);
3793
if (strncmp(line, "model name", 10) == 0) {
3794
char *c = strchr(buf, ':');
3810
f = fopen("/etc/lsb-release", "r");
3814
char *line = fgets(buf, sizeof(buf), f);
3817
char *eq = strchr(buf, '=');
3824
while (*eq && (eq[strlen(eq)-1] == '\n' ||
3825
eq[strlen(eq)-1] == '\"'))
3826
eq[strlen(eq)-1] = '\0';
3827
if (strcmp(buf, "DISTRIB_ID") == 0)
3828
(*pm)["distro"] = eq;
3829
else if (strcmp(buf, "DISTRIB_RELEASE") == 0)
3830
(*pm)["distro_version"] = eq;
3831
else if (strcmp(buf, "DISTRIB_CODENAME") == 0)
3832
(*pm)["distro_codename"] = eq;
3833
else if (strcmp(buf, "DISTRIB_DESCRIPTION") == 0)
3834
(*pm)["distro_description"] = eq;
3839
dout(10) << __func__ << " " << *pm << dendl;
3842
// Record that we want the monitor to bump our up_thru to at least
// `want`, and expedite sending the request if it raises the target.
void OSD::queue_want_up_thru(epoch_t want)
{
  map_lock.get_read();
  epoch_t cur = osdmap->get_up_thru(whoami);
  if (want > up_thru_wanted) {
    dout(10) << "queue_want_up_thru now " << want << " (was " << up_thru_wanted << ")"
             << ", currently " << cur
             << dendl;
    up_thru_wanted = want;

    // expedite, a bit.  WARNING this will somewhat delay other mon queries.
    last_mon_report = ceph_clock_now(cct);
    send_alive();
  } else {
    dout(10) << "queue_want_up_thru want " << want << " <= queued " << up_thru_wanted
             << ", currently " << cur
             << dendl;
  }
  map_lock.put_read();
}
3863
void OSD::send_alive()
3865
if (!osdmap->exists(whoami))
3867
epoch_t up_thru = osdmap->get_up_thru(whoami);
3868
dout(10) << "send_alive up_thru currently " << up_thru << " want " << up_thru_wanted << dendl;
3869
if (up_thru_wanted > up_thru) {
3870
up_thru_pending = up_thru_wanted;
3871
dout(10) << "send_alive want " << up_thru_wanted << dendl;
3872
monc->send_mon_message(new MOSDAlive(osdmap->get_epoch(), up_thru_wanted));
3876
void OSDService::send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch)
3878
Mutex::Locker l(pre_publish_lock);
3880
// service map is always newer/newest
3881
assert(from_epoch <= next_osdmap->get_epoch());
3883
if (next_osdmap->is_down(peer) ||
3884
next_osdmap->get_info(peer).up_from > from_epoch) {
3888
const entity_inst_t& peer_inst = next_osdmap->get_cluster_inst(peer);
3889
Connection *peer_con = osd->cluster_messenger->get_connection(peer_inst).get();
3890
osd->_share_map_outgoing(peer, peer_con, next_osdmap);
3891
osd->cluster_messenger->send_message(m, peer_inst);
3894
// Get a cluster-messenger connection to a peer OSD, or NULL if the
// peer is down or restarted since `from_epoch`.
ConnectionRef OSDService::get_con_osd_cluster(int peer, epoch_t from_epoch)
{
  Mutex::Locker l(pre_publish_lock);

  // service map is always newer/newest
  assert(from_epoch <= next_osdmap->get_epoch());

  if (next_osdmap->is_down(peer) ||
      next_osdmap->get_info(peer).up_from > from_epoch) {
    return NULL;
  }
  return osd->cluster_messenger->get_connection(next_osdmap->get_cluster_inst(peer));
}
3908
// Get the (back, front) heartbeat connections to a peer OSD.  Both
// refs are null if the peer is down or restarted since `from_epoch`;
// the front ref is null if the peer advertises no front address.
pair<ConnectionRef,ConnectionRef> OSDService::get_con_osd_hb(int peer, epoch_t from_epoch)
{
  Mutex::Locker l(pre_publish_lock);

  // service map is always newer/newest
  assert(from_epoch <= next_osdmap->get_epoch());

  pair<ConnectionRef,ConnectionRef> ret;
  if (next_osdmap->is_down(peer) ||
      next_osdmap->get_info(peer).up_from > from_epoch) {
    return ret;
  }
  ret.first = osd->hbclient_messenger->get_connection(next_osdmap->get_hb_back_inst(peer));
  if (next_osdmap->get_hb_front_addr(peer) != entity_addr_t())
    ret.second = osd->hbclient_messenger->get_connection(next_osdmap->get_hb_front_inst(peer));
  return ret;
}
3926
// Queue a desired pg_temp mapping; it is sent later by send_pg_temp().
void OSDService::queue_want_pg_temp(pg_t pgid, vector<int>& want)
{
  Mutex::Locker l(pg_temp_lock);
  pg_temp_wanted[pgid] = want;
}
3932
void OSDService::send_pg_temp()
3934
Mutex::Locker l(pg_temp_lock);
3935
if (pg_temp_wanted.empty())
3937
dout(10) << "send_pg_temp " << pg_temp_wanted << dendl;
3938
MOSDPGTemp *m = new MOSDPGTemp(osdmap->get_epoch());
3939
m->pg_temp = pg_temp_wanted;
3940
monc->send_mon_message(m);
3943
void OSD::send_failures()
3945
assert(osd_lock.is_locked());
3946
bool locked = false;
3947
if (!failure_queue.empty()) {
3948
heartbeat_lock.Lock();
3951
utime_t now = ceph_clock_now(cct);
3952
while (!failure_queue.empty()) {
3953
int osd = failure_queue.begin()->first;
3954
int failed_for = (int)(double)(now - failure_queue.begin()->second);
3955
entity_inst_t i = osdmap->get_inst(osd);
3956
monc->send_mon_message(new MOSDFailure(monc->get_fsid(), i, failed_for, osdmap->get_epoch()));
3957
failure_pending[osd] = i;
3958
failure_queue.erase(osd);
3960
if (locked) heartbeat_lock.Unlock();
3963
// Retract a previously sent failure report for instance `i` by sending
// an MOSDFailure with is_failed = false.
void OSD::send_still_alive(epoch_t epoch, const entity_inst_t &i)
{
  MOSDFailure *m = new MOSDFailure(monc->get_fsid(), i, 0, epoch);
  m->is_failed = false;
  monc->send_mon_message(m);
}
3970
void OSD::send_pg_stats(const utime_t &now)
3972
assert(osd_lock.is_locked());
3974
dout(20) << "send_pg_stats" << dendl;
3977
osd_stat_t cur_stat = osd_stat;
3980
osd_stat.fs_perf_stat = store->get_cur_stats();
3982
pg_stat_queue_lock.Lock();
3984
if (osd_stat_updated || !pg_stat_queue.empty()) {
3985
last_pg_stats_sent = now;
3986
osd_stat_updated = false;
3988
dout(10) << "send_pg_stats - " << pg_stat_queue.size() << " pgs updated" << dendl;
3990
utime_t had_for(now);
3991
had_for -= had_map_since;
3993
MPGStats *m = new MPGStats(monc->get_fsid(), osdmap->get_epoch(), had_for);
3994
m->set_tid(++pg_stat_tid);
3995
m->osd_stat = cur_stat;
3997
xlist<PG*>::iterator p = pg_stat_queue.begin();
4001
if (!pg->is_primary()) { // we hold map_lock; role is stable.
4002
pg->stat_queue_item.remove_myself();
4003
pg->put("pg_stat_queue");
4006
pg->pg_stats_publish_lock.Lock();
4007
if (pg->pg_stats_publish_valid) {
4008
m->pg_stat[pg->info.pgid.pgid] = pg->pg_stats_publish;
4009
dout(25) << " sending " << pg->info.pgid << " " << pg->pg_stats_publish.reported_epoch << ":"
4010
<< pg->pg_stats_publish.reported_seq << dendl;
4012
dout(25) << " NOT sending " << pg->info.pgid << " " << pg->pg_stats_publish.reported_epoch << ":"
4013
<< pg->pg_stats_publish.reported_seq << ", not valid" << dendl;
4015
pg->pg_stats_publish_lock.Unlock();
4018
if (!outstanding_pg_stats) {
4019
outstanding_pg_stats = true;
4020
last_pg_stats_ack = ceph_clock_now(cct);
4022
monc->send_mon_message(m);
4025
pg_stat_queue_lock.Unlock();
4028
// Process the monitor's ack of a previous MPGStats: wake any
// flush_pg_stats() waiters, and dequeue each PG whose acked
// (seq, epoch) matches what we last published for it.
// NOTE(review): reconstructed from fragmentary source — confirm
// against upstream before relying on exact statement order.
void OSD::handle_pg_stats_ack(MPGStatsAck *ack)
{
  dout(10) << "handle_pg_stats_ack " << dendl;

  if (!require_mon_peer(ack)) {
    ack->put();
    return;
  }

  last_pg_stats_ack = ceph_clock_now(cct);

  pg_stat_queue_lock.Lock();

  if (ack->get_tid() > pg_stat_tid_flushed) {
    pg_stat_tid_flushed = ack->get_tid();
    pg_stat_queue_cond.Signal();
  }

  xlist<PG*>::iterator p = pg_stat_queue.begin();
  while (!p.end()) {
    PG *pg = *p;
    PGRef _pg(pg);   // keep pg alive while we drop its queue ref below
    ++p;

    if (ack->pg_stat.count(pg->info.pgid.pgid)) {
      pair<version_t,epoch_t> acked = ack->pg_stat[pg->info.pgid.pgid];
      pg->pg_stats_publish_lock.Lock();
      if (acked.first == pg->pg_stats_publish.reported_seq &&
          acked.second == pg->pg_stats_publish.reported_epoch) {
        dout(25) << " ack on " << pg->info.pgid << " " << pg->pg_stats_publish.reported_epoch
                 << ":" << pg->pg_stats_publish.reported_seq << dendl;
        pg->stat_queue_item.remove_myself();
        pg->put("pg_stat_queue");
      } else {
        dout(25) << " still pending " << pg->info.pgid << " " << pg->pg_stats_publish.reported_epoch
                 << ":" << pg->pg_stats_publish.reported_seq << " > acked " << acked << dendl;
      }
      pg->pg_stats_publish_lock.Unlock();
    } else {
      dout(30) << " still pending " << pg->info.pgid << " " << pg->pg_stats_publish.reported_epoch
               << ":" << pg->pg_stats_publish.reported_seq << dendl;
    }
  }

  if (!pg_stat_queue.size()) {
    outstanding_pg_stats = false;
  }

  pg_stat_queue_lock.Unlock();

  ack->put();
}
4081
void OSD::flush_pg_stats()
4083
dout(10) << "flush_pg_stats" << dendl;
4084
utime_t now = ceph_clock_now(cct);
4089
pg_stat_queue_lock.Lock();
4090
uint64_t tid = pg_stat_tid;
4091
dout(10) << "flush_pg_stats waiting for stats tid " << tid << " to flush" << dendl;
4092
while (tid > pg_stat_tid_flushed)
4093
pg_stat_queue_cond.Wait(pg_stat_queue_lock);
4094
dout(10) << "flush_pg_stats finished waiting for stats tid " << tid << " to flush" << dendl;
4095
pg_stat_queue_lock.Unlock();
4101
// Handle a command forwarded by the monitor: queue it for the command
// work queue.  Non-monitor senders are rejected.
void OSD::handle_command(MMonCommand *m)
{
  if (!require_mon_peer(m))
    return;

  Command *c = new Command(m->cmd, m->get_tid(), m->get_data(), NULL);
  command_wq.queue(c);
  m->put();
}
4111
// Handle a 'tell' command arriving directly from a client connection.
// Requires an authenticated session with allow-all caps (monitors use
// MMonCommand instead); otherwise reply -EPERM.
void OSD::handle_command(MCommand *m)
{
  ConnectionRef con = m->get_connection();
  Session *session = static_cast<Session *>(con->get_priv());
  if (!session) {
    client_messenger->send_message(new MCommandReply(m, -EPERM), con);
    m->put();
    return;
  }

  OSDCap& caps = session->caps;
  session->put();

  if (!caps.allow_all() || m->get_source().is_mon()) {
    client_messenger->send_message(new MCommandReply(m, -EPERM), con);
    m->put();
    return;
  }

  Command *c = new Command(m->cmd, m->get_tid(), m->get_data(), con.get());
  command_wq.queue(c);

  m->put();
}
4141
string availability;
4142
} osd_commands[] = {
4144
#define COMMAND(parsesig, helptext, module, perm, availability) \
4145
{parsesig, helptext, module, perm, availability},
4147
// yes, these are really pg commands, but there's a limit to how
4148
// much work it's worth. The OSD returns all of them. Make this
4149
// form (pg <pgid> <cmd>) valid only for the cli.
4150
// Rest uses "tell <pgid> <cmd>"
4153
"name=pgid,type=CephPgid " \
4154
"name=cmd,type=CephChoices,strings=query", \
4155
"show details of a specific pg", "osd", "r", "cli")
4157
"name=pgid,type=CephPgid " \
4158
"name=cmd,type=CephChoices,strings=mark_unfound_lost " \
4159
"name=mulcmd,type=CephChoices,strings=revert|delete", \
4160
"mark all unfound objects in this pg as lost, either removing or reverting to a prior version if one is available",
4163
"name=pgid,type=CephPgid " \
4164
"name=cmd,type=CephChoices,strings=list_missing " \
4165
"name=offset,type=CephString,req=false",
4166
"list missing objects on this pg, perhaps starting at an offset given in JSON",
4169
// new form: tell <pgid> <cmd> for both cli and rest
4172
"show details of a specific pg", "osd", "r", "cli,rest")
4173
COMMAND("mark_unfound_lost " \
4174
"name=mulcmd,type=CephChoices,strings=revert|delete", \
4175
"mark all unfound objects in this pg as lost, either removing or reverting to a prior version if one is available",
4176
"osd", "rw", "cli,rest")
4177
COMMAND("list_missing " \
4178
"name=offset,type=CephString,req=false",
4179
"list missing objects on this pg, perhaps starting at an offset given in JSON",
4180
"osd", "r", "cli,rest")
4182
// tell <osd.n> commands. Validation of osd.n must be special-cased in client
4183
COMMAND("version", "report version of OSD", "osd", "r", "cli,rest")
4184
COMMAND("injectargs " \
4185
"name=injected_args,type=CephString,n=N",
4186
"inject configuration arguments into running OSD",
4187
"osd", "rw", "cli,rest")
4189
"name=count,type=CephInt,req=false " \
4190
"name=size,type=CephInt,req=false ", \
4191
"OSD benchmark: write <count> <size>-byte objects, " \
4192
"(default 1G size 4MB). Results in log.",
4193
"osd", "rw", "cli,rest")
4194
COMMAND("flush_pg_stats", "flush pg stats", "osd", "rw", "cli,rest")
4196
"name=heapcmd,type=CephChoices,strings=dump|start_profiler|stop_profiler|release|stats", \
4197
"show heap usage info (available only if compiled with tcmalloc)", \
4198
"osd", "rw", "cli,rest")
4199
COMMAND("debug_dump_missing " \
4200
"name=filename,type=CephFilepath",
4201
"dump missing objects to a named file", "osd", "r", "cli,rest")
4202
COMMAND("debug kick_recovery_wq " \
4203
"name=delay,type=CephInt,range=0",
4204
"set osd_recovery_delay_start to <val>", "osd", "rw", "cli,rest")
4205
COMMAND("cpu_profiler " \
4206
"name=arg,type=CephChoices,strings=status|flush",
4207
"run cpu profiling on daemon", "osd", "rw", "cli,rest")
4208
COMMAND("dump_pg_recovery_stats", "dump pg recovery statistics",
4209
"osd", "r", "cli,rest")
4210
COMMAND("reset_pg_recovery_stats", "reset pg recovery statistics",
4211
"osd", "rw", "cli,rest")
4214
void OSD::do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, bufferlist& data)
4217
stringstream ss, ds;
4221
dout(20) << "do_command tid " << tid << " " << cmd << dendl;
4223
map<string, cmd_vartype> cmdmap;
4227
boost::scoped_ptr<Formatter> f;
4230
ss << "no command given";
4234
if (!cmdmap_from_json(cmd, &cmdmap, ss)) {
4239
cmd_getval(cct, cmdmap, "prefix", prefix);
4241
if (prefix == "get_command_descriptions") {
4243
JSONFormatter *f = new JSONFormatter();
4244
f->open_object_section("command_descriptions");
4245
for (OSDCommand *cp = osd_commands;
4246
cp < &osd_commands[ARRAY_SIZE(osd_commands)]; cp++) {
4248
ostringstream secname;
4249
secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
4250
dump_cmddesc_to_json(f, secname.str(), cp->cmdstring, cp->helpstring,
4251
cp->module, cp->perm, cp->availability);
4254
f->close_section(); // command_descriptions
4261
cmd_getval(cct, cmdmap, "format", format);
4262
f.reset(new_formatter(format));
4264
if (prefix == "version") {
4266
f->open_object_section("version");
4267
f->dump_string("version", pretty_version_to_str());
4271
ds << pretty_version_to_str();
4275
else if (prefix == "injectargs") {
4276
vector<string> argsvec;
4277
cmd_getval(cct, cmdmap, "injected_args", argsvec);
4279
if (argsvec.empty()) {
4281
ss << "ignoring empty injectargs";
4284
string args = argsvec.front();
4285
for (vector<string>::iterator a = ++argsvec.begin(); a != argsvec.end(); ++a)
4288
cct->_conf->injectargs(args, &ss);
4292
// either 'pg <pgid> <command>' or
4293
// 'tell <pgid>' (which comes in without any of that prefix)?
4295
else if (prefix == "pg" ||
4296
(cmd_getval(cct, cmdmap, "pgid", pgidstr) &&
4297
(prefix == "query" ||
4298
prefix == "mark_unfound_lost" ||
4299
prefix == "list_missing")
4303
if (!cmd_getval(cct, cmdmap, "pgid", pgidstr)) {
4304
ss << "no pgid specified";
4306
} else if (!pgid.parse(pgidstr.c_str())) {
4307
ss << "couldn't parse pgid '" << pgidstr << "'";
4311
if (osdmap->get_primary_shard(pgid, &pcand) &&
4313
PG *pg = _lookup_lock_pg(pcand);
4315
if (pg->is_primary()) {
4316
// simulate pg <pgid> cmd= for pg->do-command
4318
cmd_putval(cct, cmdmap, "cmd", prefix);
4319
r = pg->do_command(cmdmap, ss, data, odata);
4321
ss << "not primary for pgid " << pgid;
4323
// send them the latest diff to ensure they realize the mapping
4325
send_incremental_map(osdmap->get_epoch() - 1, con);
4327
// do not reply; they will get newer maps and realize they
4334
ss << "i don't have pgid " << pgid;
4340
else if (prefix == "bench") {
4343
// default count 1G, size 4MB
4344
cmd_getval(cct, cmdmap, "count", count, (int64_t)1 << 30);
4345
cmd_getval(cct, cmdmap, "size", bsize, (int64_t)4 << 20);
4347
uint32_t duration = g_conf->osd_bench_duration;
4349
if (bsize > (int64_t) g_conf->osd_bench_max_block_size) {
4350
// let us limit the block size because the next checks rely on it
4351
// having a sane value. If we allow any block size to be set things
4352
// can still go sideways.
4353
ss << "block 'size' values are capped at "
4354
<< prettybyte_t(g_conf->osd_bench_max_block_size) << ". If you wish to use"
4355
<< " a higher value, please adjust 'osd_bench_max_block_size'";
4358
} else if (bsize < (int64_t) (1 << 20)) {
4359
// entering the realm of small block sizes.
4360
// limit the count to a sane value, assuming a configurable amount of
4361
// IOPS and duration, so that the OSD doesn't get hung up on this,
4362
// preventing timeouts from going off
4364
bsize * duration * g_conf->osd_bench_small_size_max_iops;
4365
if (count > max_count) {
4366
ss << "'count' values greater than " << max_count
4367
<< " for a block size of " << prettybyte_t(bsize) << ", assuming "
4368
<< g_conf->osd_bench_small_size_max_iops << " IOPS,"
4369
<< " for " << duration << " seconds,"
4370
<< " can cause ill effects on osd. "
4371
<< " Please adjust 'osd_bench_small_size_max_iops' with a higher"
4372
<< " value if you wish to use a higher 'count'.";
4377
// 1MB block sizes are big enough so that we get more stuff done.
4378
// However, to avoid the osd from getting hung on this and having
4379
// timers being triggered, we are going to limit the count assuming
4380
// a configurable throughput and duration.
4381
int64_t total_throughput =
4382
g_conf->osd_bench_large_size_max_throughput * duration;
4383
int64_t max_count = (int64_t) (total_throughput / bsize);
4384
if (count > max_count) {
4385
ss << "'count' values greater than " << max_count
4386
<< " for a block size of " << prettybyte_t(bsize) << ", assuming "
4387
<< prettybyte_t(g_conf->osd_bench_large_size_max_throughput) << "/s,"
4388
<< " for " << duration << " seconds,"
4389
<< " can cause ill effects on osd. "
4390
<< " Please adjust 'osd_bench_large_size_max_throughput'"
4391
<< " with a higher value if you wish to use a higher 'count'.";
4397
dout(1) << " bench count " << count
4398
<< " bsize " << prettybyte_t(bsize) << dendl;
4401
bufferptr bp(bsize);
4405
ObjectStore::Transaction *cleanupt = new ObjectStore::Transaction;
4407
store->sync_and_flush();
4408
utime_t start = ceph_clock_now(cct);
4409
for (int64_t pos = 0; pos < count; pos += bsize) {
4411
snprintf(nm, sizeof(nm), "disk_bw_test_%lld", (long long)pos);
4413
hobject_t soid(sobject_t(oid, 0));
4414
ObjectStore::Transaction *t = new ObjectStore::Transaction;
4415
t->write(coll_t::META_COLL, soid, 0, bsize, bl);
4416
store->queue_transaction_and_cleanup(NULL, t);
4417
cleanupt->remove(coll_t::META_COLL, soid);
4419
store->sync_and_flush();
4420
utime_t end = ceph_clock_now(cct);
4423
store->queue_transaction_and_cleanup(NULL, cleanupt);
4425
uint64_t rate = (double)count / (end - start);
4427
f->open_object_section("osd_bench_results");
4428
f->dump_int("bytes_written", count);
4429
f->dump_int("blocksize", bsize);
4430
f->dump_float("bytes_per_sec", rate);
4434
ss << "bench: wrote " << prettybyte_t(count)
4435
<< " in blocks of " << prettybyte_t(bsize) << " in "
4436
<< (end-start) << " sec at " << prettybyte_t(rate) << "/sec";
4440
else if (prefix == "flush_pg_stats") {
4444
else if (prefix == "heap") {
4445
if (!ceph_using_tcmalloc()) {
4447
ss << "could not issue heap profiler command -- not using tcmalloc!";
4450
cmd_getval(cct, cmdmap, "heapcmd", heapcmd);
4451
// XXX 1-element vector, change at callee or make vector here?
4452
vector<string> heapcmd_vec;
4453
get_str_vec(heapcmd, heapcmd_vec);
4454
ceph_heap_profiler_handle_command(heapcmd_vec, ds);
4458
else if (prefix == "debug dump_missing") {
4460
cmd_getval(cct, cmdmap, "filename", file_name);
4461
std::ofstream fout(file_name.c_str());
4462
if (!fout.is_open()) {
4463
ss << "failed to open file '" << file_name << "'";
4468
std::set <spg_t> keys;
4469
for (ceph::unordered_map<spg_t, PG*>::const_iterator pg_map_e = pg_map.begin();
4470
pg_map_e != pg_map.end(); ++pg_map_e) {
4471
keys.insert(pg_map_e->first);
4474
fout << "*** osd " << whoami << ": dump_missing ***" << std::endl;
4475
for (std::set <spg_t>::iterator p = keys.begin();
4476
p != keys.end(); ++p) {
4477
ceph::unordered_map<spg_t, PG*>::iterator q = pg_map.find(*p);
4478
assert(q != pg_map.end());
4482
fout << *pg << std::endl;
4483
std::map<hobject_t, pg_missing_t::item>::const_iterator mend =
4484
pg->pg_log.get_missing().missing.end();
4485
std::map<hobject_t, pg_missing_t::item>::const_iterator mi =
4486
pg->pg_log.get_missing().missing.begin();
4487
for (; mi != mend; ++mi) {
4488
fout << mi->first << " -> " << mi->second << std::endl;
4489
if (!pg->missing_loc.needs_recovery(mi->first))
4491
if (pg->missing_loc.is_unfound(mi->first))
4492
fout << " unfound ";
4493
const set<pg_shard_t> &mls(pg->missing_loc.get_locations(mi->first));
4496
fout << "missing_loc: " << mls << std::endl;
4504
else if (prefix == "debug kick_recovery_wq") {
4506
cmd_getval(cct, cmdmap, "delay", delay);
4509
r = cct->_conf->set_val("osd_recovery_delay_start", oss.str().c_str());
4511
ss << "kick_recovery_wq: error setting "
4512
<< "osd_recovery_delay_start to '" << delay << "': error "
4516
cct->_conf->apply_changes(NULL);
4517
ss << "kicking recovery queue. set osd_recovery_delay_start "
4518
<< "to " << cct->_conf->osd_recovery_delay_start;
4519
defer_recovery_until = ceph_clock_now(cct);
4520
defer_recovery_until += cct->_conf->osd_recovery_delay_start;
4524
else if (prefix == "cpu_profiler") {
4526
cmd_getval(cct, cmdmap, "arg", arg);
4527
vector<string> argvec;
4528
get_str_vec(arg, argvec);
4529
cpu_profiler_handle_command(argvec, ds);
4532
else if (prefix == "dump_pg_recovery_stats") {
4535
pg_recovery_stats.dump_formatted(f.get());
4538
pg_recovery_stats.dump(s);
4539
ds << "dump pg recovery stats: " << s.str();
4543
else if (prefix == "reset_pg_recovery_stats") {
4544
ss << "reset pg recovery stats";
4545
pg_recovery_stats.reset();
4549
ss << "unrecognized command! " << cmd;
4556
dout(0) << "do_command r=" << r << " " << rs << dendl;
4557
clog.info() << rs << "\n";
4559
MCommandReply *reply = new MCommandReply(r, rs);
4560
reply->set_tid(tid);
4561
reply->set_data(odata);
4562
client_messenger->send_message(reply, con);
4569
// --------------------------------------
4572
// Return the newest map epoch we believe peer OSD `peer` has, or 0 if
// we have no record for it.
epoch_t OSD::get_peer_epoch(int peer)
{
  Mutex::Locker l(peer_map_epoch_lock);
  map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
  if (p == peer_map_epoch.end())
    return 0;
  return p->second;
}
4581
// Record that peer OSD `peer` has at least epoch `e`; never lowers an
// existing record.  Returns the (possibly newer) epoch now on record.
epoch_t OSD::note_peer_epoch(int peer, epoch_t e)
{
  Mutex::Locker l(peer_map_epoch_lock);
  map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
  if (p != peer_map_epoch.end()) {
    if (p->second < e) {
      dout(10) << "note_peer_epoch osd." << peer << " has " << e << dendl;
      p->second = e;
    } else {
      dout(30) << "note_peer_epoch osd." << peer << " has " << p->second << " >= " << e << dendl;
    }
    return p->second;
  } else {
    dout(10) << "note_peer_epoch osd." << peer << " now has " << e << dendl;
    peer_map_epoch[peer] = e;
    return e;
  }
}
4600
void OSD::forget_peer_epoch(int peer, epoch_t as_of)
4602
Mutex::Locker l(peer_map_epoch_lock);
4603
map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
4604
if (p != peer_map_epoch.end()) {
4605
if (p->second <= as_of) {
4606
dout(10) << "forget_peer_epoch osd." << peer << " as_of " << as_of
4607
<< " had " << p->second << dendl;
4608
peer_map_epoch.erase(p);
4610
dout(10) << "forget_peer_epoch osd." << peer << " as_of " << as_of
4611
<< " has " << p->second << " - not forgetting" << dendl;
4617
// On an incoming message, decide whether the sender (client or peer
// OSD) has an older map than ours and, if so, send it an incremental
// update.  Returns true if a map was shared.  Consumes the session ref
// if one was passed.
// NOTE(review): reconstructed from fragmentary source — confirm
// against upstream.
bool OSD::_share_map_incoming(entity_name_t name, Connection *con, epoch_t epoch, Session* session)
{
  bool shared = false;
  dout(20) << "_share_map_incoming " << name << " " << con->get_peer_addr() << " " << epoch << dendl;
  //assert(osd_lock.is_locked());

  assert(is_active());

  // does client have old map?
  if (name.is_client()) {
    bool sendmap = epoch < osdmap->get_epoch();
    if (sendmap && session) {
      if (session->last_sent_epoch < osdmap->get_epoch()) {
        session->last_sent_epoch = osdmap->get_epoch();
      } else {
        sendmap = false; //we don't need to send it out again
        dout(15) << name << " already sent incremental to update from epoch "<< epoch << dendl;
      }
    }
    if (sendmap) {
      dout(10) << name << " has old map " << epoch << " < " << osdmap->get_epoch() << dendl;
      send_incremental_map(epoch, con);
      shared = true;
    }
  }

  // does peer have old map?
  if (con->get_messenger() == cluster_messenger &&
      osdmap->is_up(name.num()) &&
      (osdmap->get_cluster_addr(name.num()) == con->get_peer_addr() ||
       osdmap->get_hb_back_addr(name.num()) == con->get_peer_addr())) {
    // remember what epoch the peer told us it has
    epoch_t has = note_peer_epoch(name.num(), epoch);

    // share?
    if (has < osdmap->get_epoch()) {
      dout(10) << name << " " << con->get_peer_addr() << " has old map " << epoch << " < " << osdmap->get_epoch() << dendl;
      note_peer_epoch(name.num(), osdmap->get_epoch());
      send_incremental_map(epoch, con);
      shared = true;
    }
  }

  if (session)
    session->put();
  return shared;
}
4666
void OSD::_share_map_outgoing(int peer, Connection *con, OSDMapRef map)
4669
map = service.get_osdmap();
4672
epoch_t pe = get_peer_epoch(peer);
4674
if (pe < map->get_epoch()) {
4675
send_incremental_map(pe, con);
4676
note_peer_epoch(peer, map->get_epoch());
4678
dout(20) << "_share_map_outgoing " << con << " already has epoch " << pe << dendl;
4680
dout(20) << "_share_map_outgoing " << con << " don't know epoch, doing nothing" << dendl;
4681
// no idea about peer's epoch.
4682
// ??? send recent ???
4688
// Dispatcher for the heartbeat messengers.  Handles pings directly;
// OSD maps arriving on the heartbeat channel are bounced to the
// cluster messenger's loopback connection; everything else is dropped.
bool OSD::heartbeat_dispatch(Message *m)
{
  dout(30) << "heartbeat_dispatch " << m << dendl;
  switch (m->get_type()) {

  case CEPH_MSG_PING:
    dout(10) << "ping from " << m->get_source_inst() << dendl;
    m->put();
    break;

  case MSG_OSD_PING:
    handle_osd_ping(static_cast<MOSDPing*>(m));
    break;

  case CEPH_MSG_OSD_MAP:
    {
      // redirect to the main dispatch path via our own loopback con
      ConnectionRef self = cluster_messenger->get_loopback_connection();
      cluster_messenger->send_message(m, self);
    }
    break;

  default:
    dout(0) << "dropping unexpected message " << *m << " from " << m->get_source_inst() << dendl;
    m->put();
  }

  return true;
}
4717
// Forward a message to the internal Objecter under objecter_lock.
bool OSDService::ObjecterDispatcher::ms_dispatch(Message *m)
{
  Mutex::Locker l(osd->objecter_lock);
  osd->objecter->dispatch(m);
  return true;
}
4724
// Notify the internal Objecter of a connection reset.
bool OSDService::ObjecterDispatcher::ms_handle_reset(Connection *con)
{
  Mutex::Locker l(osd->objecter_lock);
  osd->objecter->ms_handle_reset(con);
  return true;
}
4731
// Notify the internal Objecter of a new connection.
void OSDService::ObjecterDispatcher::ms_handle_connect(Connection *con)
{
  Mutex::Locker l(osd->objecter_lock);
  return osd->objecter->ms_handle_connect(con);
}
4737
bool OSDService::ObjecterDispatcher::ms_get_authorizer(int dest_type,
4738
AuthAuthorizer **authorizer,
4741
if (dest_type == CEPH_ENTITY_TYPE_MON)
4743
*authorizer = osd->monc->auth->build_authorizer(dest_type);
4744
return *authorizer != NULL;
4748
// Main messenger dispatch entry.  MARK_ME_DOWN acks bypass the lock;
// everything else is serialized under osd_lock with a single dispatch
// thread at a time (dispatch_running / dispatch_cond).
// NOTE(review): reconstructed from fragmentary source — confirm
// against upstream.
bool OSD::ms_dispatch(Message *m)
{
  if (m->get_type() == MSG_OSD_MARK_ME_DOWN) {
    service.got_stop_ack();
    m->put();
    return true;
  }

  // lock!

  osd_lock.Lock();
  if (is_stopping()) {
    osd_lock.Unlock();
    m->put();
    return true;
  }

  while (dispatch_running) {
    dout(10) << "ms_dispatch waiting for other dispatch thread to complete" << dendl;
    dispatch_cond.Wait(osd_lock);
  }
  dispatch_running = true;

  do_waiters();
  _dispatch(m);
  do_waiters();

  dispatch_running = false;
  dispatch_cond.Signal();

  osd_lock.Unlock();

  return true;
}
4783
bool OSD::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new)
4785
dout(10) << "OSD::ms_get_authorizer type=" << ceph_entity_type_name(dest_type) << dendl;
4787
if (dest_type == CEPH_ENTITY_TYPE_MON)
4791
/* the MonClient checks keys every tick(), so we should just wait for that cycle
4793
if (monc->wait_auth_rotating(10) < 0)
4797
*authorizer = monc->auth->build_authorizer(dest_type);
4798
return *authorizer != NULL;
4802
bool OSD::ms_verify_authorizer(Connection *con, int peer_type,
4803
int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
4804
bool& isvalid, CryptoKey& session_key)
4806
AuthAuthorizeHandler *authorize_handler = 0;
4807
switch (peer_type) {
4808
case CEPH_ENTITY_TYPE_MDS:
4810
* note: mds is technically a client from our perspective, but
4811
* this makes the 'cluster' consistent w/ monitor's usage.
4813
case CEPH_ENTITY_TYPE_OSD:
4814
authorize_handler = authorize_handler_cluster_registry->get_handler(protocol);
4817
authorize_handler = authorize_handler_service_registry->get_handler(protocol);
4819
if (!authorize_handler) {
4820
dout(0) << "No AuthAuthorizeHandler found for protocol " << protocol << dendl;
4825
AuthCapsInfo caps_info;
4828
uint64_t auid = CEPH_AUTH_UID_DEFAULT;
4830
isvalid = authorize_handler->verify_authorizer(cct, monc->rotating_secrets,
4831
authorizer_data, authorizer_reply, name, global_id, caps_info, session_key, &auid);
4834
Session *s = static_cast<Session *>(con->get_priv());
4837
con->set_priv(s->get());
4839
dout(10) << " new session " << s << " con=" << s->con << " addr=" << s->con->get_peer_addr() << dendl;
4842
s->entity_name = name;
4843
if (caps_info.allow_all)
4844
s->caps.set_allow_all();
4847
if (caps_info.caps.length() > 0) {
4848
bufferlist::iterator p = caps_info.caps.begin();
4853
catch (buffer::error& e) {
4855
bool success = s->caps.parse(str);
4857
dout(10) << " session " << s << " " << s->entity_name << " has caps " << s->caps << " '" << str << "'" << dendl;
4859
dout(10) << " session " << s << " " << s->entity_name << " failed to parse caps '" << str << "'" << dendl;
4868
void OSD::do_waiters()
4870
assert(osd_lock.is_locked());
4872
dout(10) << "do_waiters -- start" << dendl;
4873
finished_lock.Lock();
4874
while (!finished.empty()) {
4875
OpRequestRef next = finished.front();
4876
finished.pop_front();
4877
finished_lock.Unlock();
4879
finished_lock.Lock();
4881
finished_lock.Unlock();
4882
dout(10) << "do_waiters -- finish" << dendl;
4885
// Route a tracked op to its type-specific handler.
// NOTE(review): reconstructed from fragmentary source — several case
// labels and break statements were dropped by extraction; confirm
// against upstream.
void OSD::dispatch_op(OpRequestRef op)
{
  switch (op->get_req()->get_type()) {

  case MSG_OSD_PG_CREATE:
    handle_pg_create(op);
    break;

  case MSG_OSD_PG_NOTIFY:
    handle_pg_notify(op);
    break;
  case MSG_OSD_PG_QUERY:
    handle_pg_query(op);
    break;
  case MSG_OSD_PG_LOG:
    handle_pg_log(op);
    break;
  case MSG_OSD_PG_REMOVE:
    handle_pg_remove(op);
    break;
  case MSG_OSD_PG_INFO:
    handle_pg_info(op);
    break;
  case MSG_OSD_PG_TRIM:
    handle_pg_trim(op);
    break;
  case MSG_OSD_PG_MISSING:
    assert(0 ==
           "received MOSDPGMissing; this message is supposed to be unused!?!");
    break;
  case MSG_OSD_PG_SCAN:
    handle_pg_scan(op);
    break;
  case MSG_OSD_PG_BACKFILL:
    handle_pg_backfill(op);
    break;

  case MSG_OSD_BACKFILL_RESERVE:
    handle_pg_backfill_reserve(op);
    break;
  case MSG_OSD_RECOVERY_RESERVE:
    handle_pg_recovery_reserve(op);
    break;

    // client ops
  case CEPH_MSG_OSD_OP:
    handle_op(op);
    break;

    // for replication etc.
  case MSG_OSD_SUBOP:
    handle_replica_op<MOSDSubOp, MSG_OSD_SUBOP>(op);
    break;
  case MSG_OSD_SUBOPREPLY:
    handle_replica_op<MOSDSubOpReply, MSG_OSD_SUBOPREPLY>(op);
    break;
  case MSG_OSD_PG_PUSH:
    handle_replica_op<MOSDPGPush, MSG_OSD_PG_PUSH>(op);
    break;
  case MSG_OSD_PG_PULL:
    handle_replica_op<MOSDPGPull, MSG_OSD_PG_PULL>(op);
    break;
  case MSG_OSD_PG_PUSH_REPLY:
    handle_replica_op<MOSDPGPushReply, MSG_OSD_PG_PUSH_REPLY>(op);
    break;
  case MSG_OSD_EC_WRITE:
    handle_replica_op<MOSDECSubOpWrite, MSG_OSD_EC_WRITE>(op);
    break;
  case MSG_OSD_EC_WRITE_REPLY:
    handle_replica_op<MOSDECSubOpWriteReply, MSG_OSD_EC_WRITE_REPLY>(op);
    break;
  case MSG_OSD_EC_READ:
    handle_replica_op<MOSDECSubOpRead, MSG_OSD_EC_READ>(op);
    break;
  case MSG_OSD_EC_READ_REPLY:
    handle_replica_op<MOSDECSubOpReadReply, MSG_OSD_EC_READ_REPLY>(op);
    break;
  }
}
4965
// Dispatch a raw message under osd_lock.  Messages that do not need an
// OSDMap are handled directly; everything else is wrapped in an
// OpRequest and either queued (no map yet) or passed to dispatch_op.
// NOTE(review): reconstructed from fragmentary source — confirm
// against upstream (shutdown privilege check and default branch).
void OSD::_dispatch(Message *m)
{
  assert(osd_lock.is_locked());
  dout(20) << "_dispatch " << m << " " << *m << dendl;
  Session *session = NULL;

  logger->set(l_osd_buf, buffer::get_total_alloc());

  switch (m->get_type()) {

    // -- don't need lock --
  case CEPH_MSG_PING:
    dout(10) << "ping from " << m->get_source() << dendl;
    m->put();
    break;

    // -- don't need OSDMap --

    // map and replication
  case CEPH_MSG_OSD_MAP:
    handle_osd_map(static_cast<MOSDMap*>(m));
    break;

    // osd
  case CEPH_MSG_SHUTDOWN:
    session = static_cast<Session *>(m->get_connection()->get_priv());
    if (!session ||
        session->entity_name.is_mon() ||
        session->entity_name.is_osd())
      shutdown();
    else dout(0) << "shutdown message from connection with insufficient privs!"
                 << m->get_connection() << dendl;
    m->put();
    if (session)
      session->put();
    break;

  case MSG_PGSTATSACK:
    handle_pg_stats_ack(static_cast<MPGStatsAck*>(m));
    break;

  case MSG_MON_COMMAND:
    handle_command(static_cast<MMonCommand*>(m));
    break;
  case MSG_COMMAND:
    handle_command(static_cast<MCommand*>(m));
    break;

  case MSG_OSD_SCRUB:
    handle_scrub(static_cast<MOSDScrub*>(m));
    break;

  case MSG_OSD_REP_SCRUB:
    handle_rep_scrub(static_cast<MOSDRepScrub*>(m));
    break;

    // -- need OSDMap --

  default:
    {
      OpRequestRef op = op_tracker.create_request<OpRequest>(m);
      op->mark_event("waiting_for_osdmap");
      // no map?  starting up?
      if (!osdmap) {
        dout(7) << "no OSDMap, not booted" << dendl;
        waiting_for_osdmap.push_back(op);
        break;
      }

      // need OSDMap
      dispatch_op(op);
    }
  }

  logger->set(l_osd_buf, buffer::get_total_alloc());

}
5043
// Queue a replica-scrub request on the rep_scrub work queue; the work
// queue takes ownership of the message.
void OSD::handle_rep_scrub(MOSDRepScrub *m)
{
  dout(10) << "queueing MOSDRepScrub " << *m << dendl;
  rep_scrub_wq.queue(m);
}
// Handle a mon-initiated scrub request: mark either every primary PG
// (scrub_pgs empty) or only the listed PGs as must_scrub, re-registering
// their next-scrub times.  Only monitors may send this.
void OSD::handle_scrub(MOSDScrub *m)
{
  dout(10) << "handle_scrub " << *m << dendl;
  if (!require_mon_peer(m))
    return;
  if (m->fsid != monc->get_fsid()) {
    dout(0) << "handle_scrub fsid " << m->fsid << " != " << monc->get_fsid() << dendl;
    m->put();
    return;
  }

  if (m->scrub_pgs.empty()) {
    // no explicit list: flag every PG for which we are primary
    for (ceph::unordered_map<spg_t, PG*>::iterator p = pg_map.begin();
	 p != pg_map.end();
	 ++p) {
      PG *pg = p->second;
      pg->lock();
      if (pg->is_primary()) {
	pg->unreg_next_scrub();
	pg->scrubber.must_scrub = true;
	pg->scrubber.must_deep_scrub = m->deep || m->repair;
	pg->scrubber.must_repair = m->repair;
	pg->reg_next_scrub();
	dout(10) << "marking " << *pg << " for scrub" << dendl;
      }
      pg->unlock();
    }
  } else {
    // explicit pg list: resolve each raw pg_t to our primary shard
    for (vector<pg_t>::iterator p = m->scrub_pgs.begin();
	 p != m->scrub_pgs.end();
	 ++p) {
      spg_t pcand;
      if (osdmap->get_primary_shard(*p, &pcand) &&
	  pg_map.count(pcand)) {
	PG *pg = pg_map[pcand];
	pg->lock();
	if (pg->is_primary()) {
	  pg->unreg_next_scrub();
	  pg->scrubber.must_scrub = true;
	  pg->scrubber.must_deep_scrub = m->deep || m->repair;
	  pg->scrubber.must_repair = m->repair;
	  pg->reg_next_scrub();
	  dout(10) << "marking " << *pg << " for scrub" << dendl;
	}
	pg->unlock();
      }
    }
  }

  m->put();
}
bool OSD::scrub_random_backoff()
5103
bool coin_flip = (rand() % 3) == whoami % 3;
5105
dout(20) << "scrub_random_backoff lost coin flip, randomly backing off" << dendl;
5111
bool OSD::scrub_should_schedule()
5114
if (getloadavg(loadavgs, 1) != 1) {
5115
dout(10) << "scrub_should_schedule couldn't read loadavgs\n" << dendl;
5119
if (loadavgs[0] >= cct->_conf->osd_scrub_load_threshold) {
5120
dout(20) << "scrub_should_schedule loadavg " << loadavgs[0]
5121
<< " >= max " << cct->_conf->osd_scrub_load_threshold
5122
<< " = no, load too high" << dendl;
5126
dout(20) << "scrub_should_schedule loadavg " << loadavgs[0]
5127
<< " < max " << cct->_conf->osd_scrub_load_threshold
5128
<< " = yes" << dendl;
5129
return loadavgs[0] < cct->_conf->osd_scrub_load_threshold;
5132
void OSD::sched_scrub()
5134
assert(osd_lock.is_locked());
5136
bool load_is_low = scrub_should_schedule();
5138
dout(20) << "sched_scrub load_is_low=" << (int)load_is_low << dendl;
5140
utime_t now = ceph_clock_now(cct);
5142
//dout(20) << " " << last_scrub_pg << dendl;
5144
pair<utime_t, spg_t> pos;
5145
if (service.first_scrub_stamp(&pos)) {
5147
utime_t t = pos.first;
5148
spg_t pgid = pos.second;
5149
dout(30) << "sched_scrub examine " << pgid << " at " << t << dendl;
5151
utime_t diff = now - t;
5152
if ((double)diff < cct->_conf->osd_scrub_min_interval) {
5153
dout(10) << "sched_scrub " << pgid << " at " << t
5154
<< ": " << (double)diff << " < min (" << cct->_conf->osd_scrub_min_interval << " seconds)" << dendl;
5157
if ((double)diff < cct->_conf->osd_scrub_max_interval && !load_is_low) {
5158
// save ourselves some effort
5159
dout(10) << "sched_scrub " << pgid << " high load at " << t
5160
<< ": " << (double)diff << " < max (" << cct->_conf->osd_scrub_max_interval << " seconds)" << dendl;
5164
PG *pg = _lookup_lock_pg(pgid);
5166
if (pg->get_pgbackend()->scrub_supported() && pg->is_active() &&
5168
(double)diff >= cct->_conf->osd_scrub_max_interval ||
5169
pg->scrubber.must_scrub)) {
5170
dout(10) << "sched_scrub scrubbing " << pgid << " at " << t
5171
<< (pg->scrubber.must_scrub ? ", explicitly requested" :
5172
( (double)diff >= cct->_conf->osd_scrub_max_interval ? ", diff >= max" : ""))
5174
if (pg->sched_scrub()) {
5181
} while (service.next_scrub_stamp(pos, &pos));
5183
dout(20) << "sched_scrub done" << dendl;
5186
bool OSDService::inc_scrubs_pending()
5188
bool result = false;
5190
sched_scrub_lock.Lock();
5191
if (scrubs_pending + scrubs_active < cct->_conf->osd_max_scrubs) {
5192
dout(20) << "inc_scrubs_pending " << scrubs_pending << " -> " << (scrubs_pending+1)
5193
<< " (max " << cct->_conf->osd_max_scrubs << ", active " << scrubs_active << ")" << dendl;
5197
dout(20) << "inc_scrubs_pending " << scrubs_pending << " + " << scrubs_active << " active >= max " << cct->_conf->osd_max_scrubs << dendl;
5199
sched_scrub_lock.Unlock();
5204
void OSDService::dec_scrubs_pending()
5206
sched_scrub_lock.Lock();
5207
dout(20) << "dec_scrubs_pending " << scrubs_pending << " -> " << (scrubs_pending-1)
5208
<< " (max " << cct->_conf->osd_max_scrubs << ", active " << scrubs_active << ")" << dendl;
5210
assert(scrubs_pending >= 0);
5211
sched_scrub_lock.Unlock();
5214
void OSDService::inc_scrubs_active(bool reserved)
5216
sched_scrub_lock.Lock();
5220
dout(20) << "inc_scrubs_active " << (scrubs_active-1) << " -> " << scrubs_active
5221
<< " (max " << cct->_conf->osd_max_scrubs
5222
<< ", pending " << (scrubs_pending+1) << " -> " << scrubs_pending << ")" << dendl;
5223
assert(scrubs_pending >= 0);
5225
dout(20) << "inc_scrubs_active " << (scrubs_active-1) << " -> " << scrubs_active
5226
<< " (max " << cct->_conf->osd_max_scrubs
5227
<< ", pending " << scrubs_pending << ")" << dendl;
5229
sched_scrub_lock.Unlock();
5232
void OSDService::dec_scrubs_active()
5234
sched_scrub_lock.Lock();
5235
dout(20) << "dec_scrubs_active " << scrubs_active << " -> " << (scrubs_active-1)
5236
<< " (max " << cct->_conf->osd_max_scrubs << ", pending " << scrubs_pending << ")" << dendl;
5238
sched_scrub_lock.Unlock();
5241
bool OSDService::prepare_to_stop()
5243
Mutex::Locker l(is_stopping_lock);
5244
if (state != NOT_STOPPING)
5247
OSDMapRef osdmap = get_osdmap();
5248
if (osdmap && osdmap->is_up(whoami)) {
5249
dout(0) << __func__ << " telling mon we are shutting down" << dendl;
5250
state = PREPARING_TO_STOP;
5251
monc->send_mon_message(new MOSDMarkMeDown(monc->get_fsid(),
5252
osdmap->get_inst(whoami),
5253
osdmap->get_epoch(),
5256
utime_t now = ceph_clock_now(cct);
5258
timeout.set_from_double(now + cct->_conf->osd_mon_shutdown_timeout);
5259
while ((ceph_clock_now(cct) < timeout) &&
5260
(state != STOPPING)) {
5261
is_stopping_cond.WaitUntil(is_stopping_lock, timeout);
5264
dout(0) << __func__ << " starting shutdown" << dendl;
5269
void OSDService::got_stop_ack()
5271
Mutex::Locker l(is_stopping_lock);
5272
dout(0) << __func__ << " starting shutdown" << dendl;
5274
is_stopping_cond.Signal();
5278
// =====================================================
5281
// Park an op until a newer OSDMap arrives; subscribe for the next epoch
// only on the first waiter (the subscription covers the rest).
void OSD::wait_for_new_map(OpRequestRef op)
{
  // only ask the mon once per batch of waiters
  if (waiting_for_osdmap.empty()) {
    osdmap_subscribe(osdmap->get_epoch() + 1, true);
  }

  logger->inc(l_osd_waiting_for_map);
  waiting_for_osdmap.push_back(op);
  op->mark_delayed("wait for new map");
}
/** update_map
 * assimilate new OSDMap(s).  scan pgs, etc.
 */
void OSD::note_down_osd(int peer)
5300
assert(osd_lock.is_locked());
5301
cluster_messenger->mark_down(osdmap->get_cluster_addr(peer));
5303
heartbeat_lock.Lock();
5304
failure_queue.erase(peer);
5305
failure_pending.erase(peer);
5306
map<int,HeartbeatInfo>::iterator p = heartbeat_peers.find(peer);
5307
if (p != heartbeat_peers.end()) {
5308
hbclient_messenger->mark_down(p->second.con_back);
5309
if (p->second.con_front) {
5310
hbclient_messenger->mark_down(p->second.con_front);
5312
heartbeat_peers.erase(p);
5314
heartbeat_lock.Unlock();
5317
void OSD::note_up_osd(int peer)
5319
forget_peer_epoch(peer, osdmap->get_epoch() - 1);
5322
struct C_OnMapApply : public Context {
5323
OSDService *service;
5324
boost::scoped_ptr<ObjectStore::Transaction> t;
5325
list<OSDMapRef> pinned_maps;
5327
C_OnMapApply(OSDService *service,
5328
ObjectStore::Transaction *t,
5329
const list<OSDMapRef> &pinned_maps,
5331
: service(service), t(t), pinned_maps(pinned_maps), e(e) {}
5332
void finish(int r) {
5333
service->clear_map_bl_cache_pins(e);
5337
// Ask the monitor for osdmaps starting at `epoch` (no-op if we already
// have it); force_request renews the subscription even if the wanted
// epoch did not move forward.
void OSD::osdmap_subscribe(version_t epoch, bool force_request)
{
  OSDMapRef osdmap = service.get_osdmap();
  if (osdmap->get_epoch() >= epoch)
    return;

  if (monc->sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) ||
      force_request) {
    monc->renew_subs();
  }
}
// Ingest a batch of osdmaps from the mon (or a peer): validate the sender,
// persist full/incremental maps, trim old ones, advance our in-memory map
// epoch by epoch, react to being marked down / having wrong addrs, commit
// the superblock, and finally kick peering/boot as appropriate.
// NOTE(review): reconstructed from a garbled extraction; missing statements
// were restored to match the upstream firefly-era implementation — verify
// against the project history before merging.
void OSD::handle_osd_map(MOSDMap *m)
{
  assert(osd_lock.is_locked());
  list<OSDMapRef> pinned_maps;
  if (m->fsid != monc->get_fsid()) {
    dout(0) << "handle_osd_map fsid " << m->fsid << " != " << monc->get_fsid() << dendl;
    m->put();
    return;
  }
  if (is_initializing()) {
    dout(0) << "ignoring osdmap until we have initialized" << dendl;
    m->put();
    return;
  }

  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
  if (session && !(session->entity_name.is_mon() || session->entity_name.is_osd())) {
    // not enough privileges to feed us maps!
    session->put();
    m->put();
    return;
  }
  if (session)
    session->put();

  // share with the objecter
  {
    Mutex::Locker l(service.objecter_lock);
    m->get();
    service.objecter->handle_osd_map(m);
  }

  epoch_t first = m->get_first();
  epoch_t last = m->get_last();
  dout(3) << "handle_osd_map epochs [" << first << "," << last << "], i have "
	  << osdmap->get_epoch()
	  << ", src has [" << m->oldest_map << "," << m->newest_map << "]"
	  << dendl;

  logger->inc(l_osd_map);
  logger->inc(l_osd_mape, last - first + 1);
  if (first <= osdmap->get_epoch())
    logger->inc(l_osd_mape_dup, osdmap->get_epoch() - first + 1);

  // make sure there is something new, here, before we bother flushing the queues and such
  if (last <= osdmap->get_epoch()) {
    dout(10) << " no new maps here, dropping" << dendl;
    m->put();
    return;
  }

  // even if this map isn't from a mon, we may have satisfied our subscription
  monc->sub_got("osdmap", last);

  // missing some epochs?  re-request from the start if possible.
  bool skip_maps = false;
  if (first > osdmap->get_epoch() + 1) {
    dout(10) << "handle_osd_map message skips epochs " << osdmap->get_epoch() + 1
	     << ".." << (first-1) << dendl;
    if (m->oldest_map <= osdmap->get_epoch() + 1) {
      osdmap_subscribe(osdmap->get_epoch()+1, true);
      m->put();
      return;
    }
    // always try to get the full range of maps--as many as we can.  this
    //  1- is good to have
    //  2- is at present the only way to ensure that we get a *full* map as
    //     the first map!
    if (m->oldest_map < first) {
      osdmap_subscribe(m->oldest_map - 1, true);
      m->put();
      return;
    }
    skip_maps = true;
  }

  ObjectStore::Transaction *_t = new ObjectStore::Transaction;
  ObjectStore::Transaction &t = *_t;

  // store new maps: queue for disk and put in the osdmap cache
  epoch_t last_marked_full = 0;
  epoch_t start = MAX(osdmap->get_epoch() + 1, first);
  for (epoch_t e = start; e <= last; e++) {
    map<epoch_t,bufferlist>::iterator p;
    p = m->maps.find(e);
    if (p != m->maps.end()) {
      dout(10) << "handle_osd_map  got full map for epoch " << e << dendl;
      OSDMap *o = new OSDMap;
      bufferlist& bl = p->second;

      o->decode(bl);
      if (o->test_flag(CEPH_OSDMAP_FULL))
	last_marked_full = e;
      pinned_maps.push_back(add_map(o));

      hobject_t fulloid = get_osdmap_pobject_name(e);
      t.write(coll_t::META_COLL, fulloid, 0, bl.length(), bl);
      pin_map_bl(e, bl);
      continue;
    }

    p = m->incremental_maps.find(e);
    if (p != m->incremental_maps.end()) {
      dout(10) << "handle_osd_map  got inc map for epoch " << e << dendl;
      bufferlist& bl = p->second;
      hobject_t oid = get_inc_osdmap_pobject_name(e);
      t.write(coll_t::META_COLL, oid, 0, bl.length(), bl);
      pin_map_inc_bl(e, bl);

      OSDMap *o = new OSDMap;
      if (e > 1) {
	bufferlist obl;
	OSDMapRef prev = get_map(e - 1);
	prev->encode(obl);
	o->decode(obl);
      }

      OSDMap::Incremental inc;
      bufferlist::iterator p = bl.begin();
      inc.decode(p);
      if (o->apply_incremental(inc) < 0) {
	derr << "ERROR: bad fsid?  i have " << osdmap->get_fsid() << " and inc has " << inc.fsid << dendl;
	assert(0 == "bad fsid");
      }

      if (o->test_flag(CEPH_OSDMAP_FULL))
	last_marked_full = e;
      pinned_maps.push_back(add_map(o));

      // re-encode and persist the resulting full map too
      bufferlist fbl;
      o->encode(fbl);

      hobject_t fulloid = get_osdmap_pobject_name(e);
      t.write(coll_t::META_COLL, fulloid, 0, fbl.length(), fbl);
      pin_map_bl(e, fbl);
      continue;
    }

    assert(0 == "MOSDMap lied about what maps it had?");
  }

  // trim old maps, but keep pace with incoming maps and anything the
  // map cache still references
  if (superblock.oldest_map) {
    int num = 0;
    epoch_t min(
      MIN(m->oldest_map,
	  service.map_cache.cached_key_lower_bound()));
    for (epoch_t e = superblock.oldest_map; e < min; ++e) {
      dout(20) << " removing old osdmap epoch " << e << dendl;
      t.remove(coll_t::META_COLL, get_osdmap_pobject_name(e));
      t.remove(coll_t::META_COLL, get_inc_osdmap_pobject_name(e));
      superblock.oldest_map = e+1;
      num++;
      if (num >= cct->_conf->osd_target_transaction_size &&
	  (uint64_t)num > (last - first))  // make sure we at least keep pace with incoming maps
	break;
    }
  }

  if (!superblock.oldest_map || skip_maps)
    superblock.oldest_map = first;
  superblock.newest_map = last;

  if (last_marked_full > superblock.last_map_marked_full)
    superblock.last_map_marked_full = last_marked_full;

  map_lock.get_write();

  C_Contexts *fin = new C_Contexts(cct);

  // advance through the new maps
  for (epoch_t cur = start; cur <= superblock.newest_map; cur++) {
    dout(10) << " advance to epoch " << cur << " (<= newest " << superblock.newest_map << ")" << dendl;

    OSDMapRef newmap = get_map(cur);
    assert(newmap);  // we just cached it above!

    // start blacklisting messages sent to peers that go down.
    service.pre_publish_map(newmap);

    // kill connections to newly down osds
    set<int> old;
    osdmap->get_all_osds(old);
    for (set<int>::iterator p = old.begin(); p != old.end(); ++p) {
      if (*p != whoami &&
	  osdmap->have_inst(*p) &&                        // in old map
	  (!newmap->exists(*p) || !newmap->is_up(*p))) {  // but not the new one
	note_down_osd(*p);
      }
    }

    osdmap = newmap;

    superblock.current_epoch = cur;
    advance_map(t, fin);
    had_map_since = ceph_clock_now(cct);
  }

  if (osdmap->is_up(whoami) &&
      osdmap->get_addr(whoami) == client_messenger->get_myaddr() &&
      bind_epoch < osdmap->get_up_from(whoami)) {

    if (is_booting()) {
      dout(1) << "state: booting -> active" << dendl;
      state = STATE_ACTIVE;

      // set incarnation so that osd_reqid_t's we generate for our
      // objecter requests are unique across restarts.
      service.objecter->set_client_incarnation(osdmap->get_epoch());
    }
  }

  bool do_shutdown = false;
  bool do_restart = false;
  if (osdmap->get_epoch() > 0 &&
      state == STATE_ACTIVE) {
    if (!osdmap->exists(whoami)) {
      dout(0) << "map says i do not exist.  shutting down." << dendl;
      do_shutdown = true;   // don't call shutdown() while we have everything paused
    } else if (!osdmap->is_up(whoami) ||
	       !osdmap->get_addr(whoami).probably_equals(client_messenger->get_myaddr()) ||
	       !osdmap->get_cluster_addr(whoami).probably_equals(cluster_messenger->get_myaddr()) ||
	       !osdmap->get_hb_back_addr(whoami).probably_equals(hb_back_server_messenger->get_myaddr()) ||
	       (osdmap->get_hb_front_addr(whoami) != entity_addr_t() &&
		!osdmap->get_hb_front_addr(whoami).probably_equals(hb_front_server_messenger->get_myaddr()))) {
      if (!osdmap->is_up(whoami)) {
	if (service.is_preparing_to_stop()) {
	  service.got_stop_ack();
	} else {
	  clog.warn() << "map e" << osdmap->get_epoch()
		      << " wrongly marked me down";
	}
      }
      else if (!osdmap->get_addr(whoami).probably_equals(client_messenger->get_myaddr()))
	clog.error() << "map e" << osdmap->get_epoch()
		     << " had wrong client addr (" << osdmap->get_addr(whoami)
		     << " != my " << client_messenger->get_myaddr() << ")";
      else if (!osdmap->get_cluster_addr(whoami).probably_equals(cluster_messenger->get_myaddr()))
	clog.error() << "map e" << osdmap->get_epoch()
		     << " had wrong cluster addr (" << osdmap->get_cluster_addr(whoami)
		     << " != my " << cluster_messenger->get_myaddr() << ")";
      else if (!osdmap->get_hb_back_addr(whoami).probably_equals(hb_back_server_messenger->get_myaddr()))
	clog.error() << "map e" << osdmap->get_epoch()
		     << " had wrong hb back addr (" << osdmap->get_hb_back_addr(whoami)
		     << " != my " << hb_back_server_messenger->get_myaddr() << ")";
      else if (osdmap->get_hb_front_addr(whoami) != entity_addr_t() &&
	       !osdmap->get_hb_front_addr(whoami).probably_equals(hb_front_server_messenger->get_myaddr()))
	clog.error() << "map e" << osdmap->get_epoch()
		     << " had wrong hb front addr (" << osdmap->get_hb_front_addr(whoami)
		     << " != my " << hb_front_server_messenger->get_myaddr() << ")";

      if (!service.is_stopping()) {
	// rebind on fresh ports and retry boot
	up_epoch = 0;
	do_restart = true;
	bind_epoch = osdmap->get_epoch();

	start_waiting_for_healthy();

	set<int> avoid_ports;
	avoid_ports.insert(cluster_messenger->get_myaddr().get_port());
	avoid_ports.insert(hb_back_server_messenger->get_myaddr().get_port());
	avoid_ports.insert(hb_front_server_messenger->get_myaddr().get_port());

	int r = cluster_messenger->rebind(avoid_ports);
	if (r != 0)
	  do_shutdown = true;  // FIXME: do_restart?

	r = hb_back_server_messenger->rebind(avoid_ports);
	if (r != 0)
	  do_shutdown = true;  // FIXME: do_restart?

	r = hb_front_server_messenger->rebind(avoid_ports);
	if (r != 0)
	  do_shutdown = true;  // FIXME: do_restart?

	hbclient_messenger->mark_down_all();

	reset_heartbeat_peers();
      }
    }
  }

  // note in the superblock that we were clean thru the prior epoch
  if (boot_epoch && boot_epoch >= superblock.mounted) {
    superblock.mounted = boot_epoch;
    superblock.clean_thru = osdmap->get_epoch();
  }

  // superblock and commit
  write_superblock(t);
  store->queue_transaction(
    0,
    _t,
    new C_OnMapApply(&service, _t, pinned_maps, osdmap->get_epoch()),
    0, fin);
  service.publish_superblock(superblock);

  map_lock.put_write();

  check_osdmap_features(store);

  // yay!
  consume_map();

  if (is_active() || is_waiting_for_healthy())
    maybe_update_heartbeat_peers();

  if (!is_active()) {
    dout(10) << " not yet active; waiting for peering wq to drain" << dendl;
    peering_wq.drain();
  } else {
    activate_map();
  }

  if (m->newest_map && m->newest_map > last) {
    dout(10) << " msg say newest map is " << m->newest_map << ", requesting more" << dendl;
    osdmap_subscribe(osdmap->get_epoch()+1, true);
  }
  else if (is_booting()) {
    start_boot();  // retry
  }
  else if (do_restart)
    start_boot();

  if (do_shutdown)
    shutdown();

  m->put();
}
// Adjust messenger required-feature bits to match what the current crush
// map demands, and enable the on-disk sharded-objects compat feature when
// erasure coding appears in the map.
void OSD::check_osdmap_features(ObjectStore *fs)
{
  // adjust required feature bits?

  // we have to be a bit careful here, because we are accessing the
  // Policy structures without taking any lock.  in particular, only
  // modify integer values that can safely be read by a racing CPU.
  // since we are only accessing existing Policy structures a their
  // current memory location, and setting or clearing bits in integer
  // fields, and we are the only writer, this is not a problem.

  uint64_t mask;
  uint64_t features = osdmap->get_features(&mask);

  {
    Messenger::Policy p = client_messenger->get_default_policy();
    if ((p.features_required & mask) != features) {
      dout(0) << "crush map has features " << features
	      << ", adjusting msgr requires for clients" << dendl;
      p.features_required = (p.features_required & ~mask) | features;
      client_messenger->set_default_policy(p);
    }
  }
  {
    Messenger::Policy p = cluster_messenger->get_policy(entity_name_t::TYPE_OSD);
    if ((p.features_required & mask) != features) {
      dout(0) << "crush map has features " << features
	      << ", adjusting msgr requires for osds" << dendl;
      p.features_required = (p.features_required & ~mask) | features;
      cluster_messenger->set_policy(entity_name_t::TYPE_OSD, p);
    }
  }

  if ((features & CEPH_FEATURE_OSD_ERASURE_CODES) &&
      !fs->get_allow_sharded_objects()) {
    dout(0) << __func__ << " enabling on-disk ERASURE CODES compat feature" << dendl;
    superblock.compat_features.incompat.insert(CEPH_OSD_FEATURE_INCOMPAT_SHARDS);
    ObjectStore::Transaction *t = new ObjectStore::Transaction;
    write_superblock(*t);
    int err = store->queue_transaction_and_cleanup(NULL, t);
    assert(err == 0);
    fs->set_allow_sharded_objects();
  }
}
void OSD::advance_pg(
5725
epoch_t osd_epoch, PG *pg,
5726
ThreadPool::TPHandle &handle,
5727
PG::RecoveryCtx *rctx,
5728
set<boost::intrusive_ptr<PG> > *new_pgs)
5730
assert(pg->is_locked());
5731
epoch_t next_epoch = pg->get_osdmap()->get_epoch() + 1;
5732
OSDMapRef lastmap = pg->get_osdmap();
5734
if (lastmap->get_epoch() == osd_epoch)
5736
assert(lastmap->get_epoch() < osd_epoch);
5739
next_epoch <= osd_epoch;
5741
OSDMapRef nextmap = service.try_get_map(next_epoch);
5745
vector<int> newup, newacting;
5746
int up_primary, acting_primary;
5747
nextmap->pg_to_up_acting_osds(
5749
&newup, &up_primary,
5750
&newacting, &acting_primary);
5751
pg->handle_advance_map(
5752
nextmap, lastmap, newup, up_primary,
5753
newacting, acting_primary, rctx);
5756
set<spg_t> children;
5757
spg_t parent(pg->info.pgid);
5758
if (parent.is_split(
5759
lastmap->get_pg_num(pg->pool.id),
5760
nextmap->get_pg_num(pg->pool.id),
5762
service.mark_split_in_progress(pg->info.pgid, children);
5764
pg, children, new_pgs, lastmap, nextmap,
5769
handle.reset_tp_timeout();
5771
pg->handle_activate_map(rctx);
5775
/**
 * scan placement groups, initiate any replication
 * activities.
 */
// React to a newly published osdmap epoch: record up/boot epochs, prune
// in-flight pg creations we are no longer primary for, and discard queued
// ops for pgs we no longer participate in.  Called with osd_lock held.
void OSD::advance_map(ObjectStore::Transaction& t, C_Contexts *tfin)
{
  assert(osd_lock.is_locked());

  dout(7) << "advance_map epoch " << osdmap->get_epoch()
	  << "  " << pg_map.size() << " pgs"
	  << dendl;

  if (!up_epoch &&
      osdmap->is_up(whoami) &&
      osdmap->get_inst(whoami) == client_messenger->get_myinst()) {
    up_epoch = osdmap->get_epoch();
    dout(10) << "up_epoch is " << up_epoch << dendl;
    if (!boot_epoch) {
      boot_epoch = osdmap->get_epoch();
      dout(10) << "boot_epoch is " << boot_epoch << dendl;
    }
  }

  // scan pg creations
  ceph::unordered_map<spg_t, create_pg_info>::iterator n = creating_pgs.begin();
  while (n != creating_pgs.end()) {
    ceph::unordered_map<spg_t, create_pg_info>::iterator p = n++;
    spg_t pgid = p->first;

    // am i still primary?
    vector<int> acting;
    int primary;
    osdmap->pg_to_acting_osds(pgid.pgid, &acting, &primary);
    if (primary != whoami) {
      dout(10) << " no longer primary for " << pgid << ", stopping creation" << dendl;
      creating_pgs.erase(p);
    } else {
      /*
       * adding new ppl to our pg has no effect, since we're still primary,
       * and obviously haven't given the new nodes any data.
       */
      p->second.acting.swap(acting);  // keep the latest
    }
  }

  // scan pgs with waiters
  map<spg_t, list<OpRequestRef> >::iterator p = waiting_for_pg.begin();
  while (p != waiting_for_pg.end()) {
    spg_t pgid = p->first;

    // am i still primary?
    vector<int> acting;
    int nrep = osdmap->pg_to_acting_osds(pgid.pgid, acting);
    int role = osdmap->calc_pg_role(whoami, acting, nrep);
    if (role >= 0) {
      ++p;  // still me
    } else {
      dout(10) << " discarding waiting ops for " << pgid << dendl;
      while (!p->second.empty()) {
	p->second.pop_front();
      }
      waiting_for_pg.erase(p++);
    }
  }
}
void OSD::consume_map()
5841
assert(osd_lock.is_locked());
5842
dout(7) << "consume_map version " << osdmap->get_epoch() << dendl;
5844
int num_pg_primary = 0, num_pg_replica = 0, num_pg_stray = 0;
5845
list<PGRef> to_remove;
5848
for (ceph::unordered_map<spg_t,PG*>::iterator it = pg_map.begin();
5851
PG *pg = it->second;
5853
if (pg->is_primary())
5855
else if (pg->is_replica())
5860
if (!osdmap->have_pg_pool(pg->info.pgid.pool())) {
5862
to_remove.push_back(PGRef(pg));
5864
service.init_splits_between(it->first, service.get_osdmap(), osdmap);
5870
for (list<PGRef>::iterator i = to_remove.begin();
5871
i != to_remove.end();
5872
to_remove.erase(i++)) {
5879
service.expand_pg_num(service.get_osdmap(), osdmap);
5881
service.pre_publish_map(osdmap);
5882
service.publish_map(osdmap);
5885
for (ceph::unordered_map<spg_t,PG*>::iterator it = pg_map.begin();
5888
PG *pg = it->second;
5890
pg->queue_null(osdmap->get_epoch(), osdmap->get_epoch());
5894
logger->set(l_osd_pg, pg_map.size());
5895
logger->set(l_osd_pg_primary, num_pg_primary);
5896
logger->set(l_osd_pg_replica, num_pg_replica);
5897
logger->set(l_osd_pg_stray, num_pg_stray);
5900
void OSD::activate_map()
5902
assert(osd_lock.is_locked());
5904
dout(7) << "activate_map version " << osdmap->get_epoch() << dendl;
5906
wake_all_pg_waiters(); // the pg mapping may have shifted
5908
if (osdmap->test_flag(CEPH_OSDMAP_FULL)) {
5909
dout(10) << " osdmap flagged full, doing onetime osdmap subscribe" << dendl;
5910
osdmap_subscribe(osdmap->get_epoch() + 1, true);
5914
if (osdmap->test_flag(CEPH_OSDMAP_NORECOVER)) {
5915
if (!paused_recovery) {
5916
dout(1) << "pausing recovery (NORECOVER flag set)" << dendl;
5917
paused_recovery = true;
5918
recovery_tp.pause_new();
5921
if (paused_recovery) {
5922
dout(1) << "resuming recovery (NORECOVER flag cleared)" << dendl;
5923
paused_recovery = false;
5924
recovery_tp.unpause();
5928
service.activate_map();
5931
take_waiters(waiting_for_osdmap);
5935
// Build an MOSDMap covering (since, to]: incrementals where available,
// falling back to a full map (and stopping there); missing maps on disk
// are a fatal invariant violation.
MOSDMap *OSD::build_incremental_map_msg(epoch_t since, epoch_t to)
{
  MOSDMap *m = new MOSDMap(monc->get_fsid());
  m->oldest_map = superblock.oldest_map;
  m->newest_map = superblock.newest_map;

  for (epoch_t e = to; e > since; e--) {
    bufferlist bl;
    if (e > m->oldest_map && get_inc_map_bl(e, bl)) {
      m->incremental_maps[e].claim(bl);
    } else if (get_map_bl(e, bl)) {
      m->maps[e].claim(bl);
      break;
    } else {
      derr << "since " << since << " to " << to
	   << " oldest " << m->oldest_map << " newest " << m->newest_map
	   << dendl;
      assert(0 == "missing an osdmap on disk");  // we should have all maps.
    }
  }
  return m;
}
// Send a map message over the right messenger: cluster messenger for OSD
// peers, client messenger otherwise.
void OSD::send_map(MOSDMap *m, Connection *con)
{
  Messenger *msgr = client_messenger;
  if (entity_name_t::TYPE_OSD == con->get_peer_type())
    msgr = cluster_messenger;
  msgr->send_message(m, con);
}
// Bring a peer from epoch `since` up to our current epoch: a single full
// map if `since` predates what we have on disk, otherwise batches of
// incrementals capped by osd_map_share_max_epochs / osd_map_message_max.
void OSD::send_incremental_map(epoch_t since, Connection *con)
{
  epoch_t to = osdmap->get_epoch();
  dout(10) << "send_incremental_map " << since << " -> " << to
	   << " to " << con << " " << con->get_peer_addr() << dendl;

  if (since < superblock.oldest_map) {
    // just send latest full map
    MOSDMap *m = new MOSDMap(monc->get_fsid());
    m->oldest_map = superblock.oldest_map;
    m->newest_map = superblock.newest_map;
    get_map_bl(to, m->maps[to]);
    send_map(m, con);
    return;
  }

  if (to > since && (int64_t)(to - since) > cct->_conf->osd_map_share_max_epochs) {
    dout(10) << " " << (to - since) << " > max " << cct->_conf->osd_map_share_max_epochs
	     << ", only sending most recent" << dendl;
    since = to - cct->_conf->osd_map_share_max_epochs;
  }

  while (since < to) {
    if (to - since > (epoch_t)cct->_conf->osd_map_message_max)
      to = since + cct->_conf->osd_map_message_max;
    MOSDMap *m = build_incremental_map_msg(since, to);
    send_map(m, con);
    since = to;
    to = osdmap->get_epoch();
  }
}
// Fetch the encoded full map for epoch e: cache first, then the object
// store (caching on a hit).  Caller holds map_cache_lock (leading '_').
bool OSDService::_get_map_bl(epoch_t e, bufferlist& bl)
{
  bool found = map_bl_cache.lookup(e, &bl);
  if (found)
    return true;
  found = store->read(
    coll_t::META_COLL, OSD::get_osdmap_pobject_name(e), 0, 0, bl) >= 0;
  if (found)
    _add_map_bl(e, bl);
  return found;
}
// Fetch the encoded incremental map for epoch e: cache first, then the
// object store (caching on a hit).  Takes map_cache_lock itself.
bool OSDService::get_inc_map_bl(epoch_t e, bufferlist& bl)
{
  Mutex::Locker l(map_cache_lock);
  bool found = map_bl_inc_cache.lookup(e, &bl);
  if (found)
    return true;
  found = store->read(
    coll_t::META_COLL, OSD::get_inc_osdmap_pobject_name(e), 0, 0, bl) >= 0;
  if (found)
    _add_map_inc_bl(e, bl);
  return found;
}
// Cache the encoded full map for epoch e (map_cache_lock held by caller).
void OSDService::_add_map_bl(epoch_t e, bufferlist& bl)
{
  dout(10) << "add_map_bl " << e << " " << bl.length() << " bytes" << dendl;
  map_bl_cache.add(e, bl);
}
// Cache the encoded incremental map for epoch e (map_cache_lock held by caller).
void OSDService::_add_map_inc_bl(epoch_t e, bufferlist& bl)
{
  dout(10) << "add_map_inc_bl " << e << " " << bl.length() << " bytes" << dendl;
  map_bl_inc_cache.add(e, bl);
}
// Pin an incremental map buffer in the cache until the storing
// transaction commits (released via clear_map_bl_cache_pins).
void OSDService::pin_map_inc_bl(epoch_t e, bufferlist &bl)
{
  Mutex::Locker l(map_cache_lock);
  map_bl_inc_cache.pin(e, bl);
}
// Pin a full map buffer in the cache until the storing transaction
// commits (released via clear_map_bl_cache_pins).
void OSDService::pin_map_bl(epoch_t e, bufferlist &bl)
{
  Mutex::Locker l(map_cache_lock);
  map_bl_cache.pin(e, bl);
}
// Release all pinned map buffers up to epoch e, once the transaction that
// persisted them has committed (called from C_OnMapApply).
void OSDService::clear_map_bl_cache_pins(epoch_t e)
{
  Mutex::Locker l(map_cache_lock);
  map_bl_inc_cache.clear_pinned(e);
  map_bl_cache.clear_pinned(e);
}
// Insert a decoded OSDMap into the shared map cache, optionally deduping
// its storage against a nearby cached epoch.  Caller holds map_cache_lock.
OSDMapRef OSDService::_add_map(OSDMap *o)
{
  epoch_t e = o->get_epoch();

  if (cct->_conf->osd_map_dedup) {
    // Dedup against an existing map at a nearby epoch
    OSDMapRef for_dedup = map_cache.lower_bound(e);
    if (for_dedup) {
      OSDMap::dedup(for_dedup.get(), o);
    }
  }
  OSDMapRef l = map_cache.add(e, o);
  return l;
}
// Return the OSDMap for the given epoch: from cache if present, otherwise
// decoded from disk and cached.  Returns a null ref if the map is not on
// disk; epoch 0 yields a fresh, empty initial map.
OSDMapRef OSDService::try_get_map(epoch_t epoch)
{
  Mutex::Locker l(map_cache_lock);
  OSDMapRef retval = map_cache.lookup(epoch);
  if (retval) {
    dout(30) << "get_map " << epoch << " -cached" << dendl;
    return retval;
  }

  OSDMap *map = new OSDMap;
  if (epoch > 0) {
    dout(20) << "get_map " << epoch << " - loading and decoding " << map << dendl;
    bufferlist bl;
    if (!_get_map_bl(epoch, bl)) {
      delete map;
      return OSDMapRef();
    }
    map->decode(bl);
  } else {
    dout(20) << "get_map " << epoch << " - return initial " << map << dendl;
  }
  return _add_map(map);
}
// Verify the message came from a monitor; on failure, log, consume the
// message ref, and return false (callers must not touch m afterwards).
bool OSD::require_mon_peer(Message *m)
{
  if (!m->get_connection()->peer_is_mon()) {
    dout(0) << "require_mon_peer received from non-mon " << m->get_connection()->get_peer_addr()
	    << " " << *m << dendl;
    m->put();
    return false;
  }
  return true;
}
// Verify the op came from another OSD; log and return false otherwise
// (the OpRequest ref handles message cleanup).
bool OSD::require_osd_peer(OpRequestRef op)
{
  if (!op->get_req()->get_connection()->peer_is_osd()) {
    dout(0) << "require_osd_peer received from non-osd " << op->get_req()->get_connection()->get_peer_addr()
	    << " " << *op->get_req() << dendl;
    return false;
  }
  return true;
}
/*
 * require that we have same (or newer) map, and that
 * the source is the pg primary.
 */
// Gatekeeper for epoch-stamped ops: wait if the sender's epoch is newer
// than ours, drop pre-up-epoch ops, sever connections from OSDs that the
// current map says are gone, and drop everything while we are not active.
// Returns true iff the op should proceed.
bool OSD::require_same_or_newer_map(OpRequestRef op, epoch_t epoch)
{
  Message *m = op->get_req();
  dout(15) << "require_same_or_newer_map " << epoch << " (i am " << osdmap->get_epoch() << ") " << m << dendl;

  assert(osd_lock.is_locked());

  // do they have a newer map?
  if (epoch > osdmap->get_epoch()) {
    dout(7) << "waiting for newer map epoch " << epoch << " > my " << osdmap->get_epoch() << " with " << m << dendl;
    wait_for_new_map(op);
    return false;
  }

  if (epoch < up_epoch) {
    dout(7) << "from pre-up epoch " << epoch << " < " << up_epoch << dendl;
    return false;
  }

  // ok, our map is same or newer.. do they still exist?
  if (m->get_connection()->get_messenger() == cluster_messenger) {
    int from = m->get_source().num();
    if (!osdmap->have_inst(from) ||
	osdmap->get_cluster_addr(from) != m->get_source_inst().addr) {
      dout(5) << "from dead osd." << from << ", marking down, "
	      << " msg was " << m->get_source_inst().addr
	      << " expected " << (osdmap->have_inst(from) ? osdmap->get_cluster_addr(from) : entity_addr_t())
	      << dendl;
      ConnectionRef con = m->get_connection();
      con->set_priv(NULL);   // break ref <-> session cycle, if any
      cluster_messenger->mark_down(con.get());
      return false;
    }
  }

  // ok, we have at least as new a map as they do.  are we (re)booting?
  if (!is_active()) {
    dout(7) << "still in boot state, dropping message " << *m << dendl;
    return false;
  }

  return true;
}
// ----------------------------------------
6169
// A pending pg creation may proceed only once all of its prior-set OSDs
// have been heard from.
bool OSD::can_create_pg(spg_t pgid)
{
  assert(creating_pgs.count(pgid));

  // priors empty?
  if (!creating_pgs[pgid].prior.empty()) {
    dout(10) << "can_create_pg " << pgid
	     << " - waiting for priors " << creating_pgs[pgid].prior << dendl;
    return false;
  }

  dout(10) << "can_create_pg " << pgid << " - can create now" << dendl;
  return true;
}
void OSD::split_pgs(
6186
const set<spg_t> &childpgids, set<boost::intrusive_ptr<PG> > *out_pgs,
6189
PG::RecoveryCtx *rctx)
6191
unsigned pg_num = nextmap->get_pg_num(
6193
parent->update_snap_mapper_bits(
6194
parent->info.pgid.get_split_bits(pg_num)
6197
vector<object_stat_sum_t> updated_stats(childpgids.size() + 1);
6198
parent->info.stats.stats.sum.split(updated_stats);
6200
vector<object_stat_sum_t>::iterator stat_iter = updated_stats.begin();
6201
for (set<spg_t>::const_iterator i = childpgids.begin();
6202
i != childpgids.end();
6204
assert(stat_iter != updated_stats.end());
6205
dout(10) << "Splitting " << *parent << " into " << *i << dendl;
6206
assert(service.splitting(*i));
6207
PG* child = _make_pg(nextmap, *i);
6209
out_pgs->insert(child);
6211
unsigned split_bits = i->get_split_bits(pg_num);
6212
dout(10) << "pg_num is " << pg_num << dendl;
6213
dout(10) << "m_seed " << i->ps() << dendl;
6214
dout(10) << "split_bits is " << split_bits << dendl;
6216
parent->split_colls(
6225
child->info.stats.stats.sum = *stat_iter;
6227
child->write_if_dirty(*(rctx->transaction));
6230
assert(stat_iter != updated_stats.end());
6231
parent->info.stats.stats.sum = *stat_iter;
6232
parent->write_if_dirty(*(rctx->transaction));
6238
void OSD::handle_pg_create(OpRequestRef op)
6240
MOSDPGCreate *m = (MOSDPGCreate*)op->get_req();
6241
assert(m->get_header().type == MSG_OSD_PG_CREATE);
6243
dout(10) << "handle_pg_create " << *m << dendl;
6245
// drop the next N pg_creates in a row?
6246
if (debug_drop_pg_create_left < 0 &&
6247
cct->_conf->osd_debug_drop_pg_create_probability >
6248
((((double)(rand()%100))/100.0))) {
6249
debug_drop_pg_create_left = debug_drop_pg_create_duration;
6251
if (debug_drop_pg_create_left >= 0) {
6252
--debug_drop_pg_create_left;
6253
if (debug_drop_pg_create_left >= 0) {
6254
dout(0) << "DEBUG dropping/ignoring pg_create, will drop the next "
6255
<< debug_drop_pg_create_left << " too" << dendl;
6260
/* we have to hack around require_mon_peer's interface limits, so
6261
* grab an extra reference before going in. If the peer isn't
6262
* a Monitor, the reference is put for us (and then cleared
6263
* up automatically by our OpTracker infrastructure). Otherwise,
6264
* we put the extra ref ourself.
6266
if (!require_mon_peer(op->get_req()->get())) {
6269
op->get_req()->put();
6271
if (!require_same_or_newer_map(op, m->epoch)) return;
6275
int num_created = 0;
6277
for (map<pg_t,pg_create_t>::iterator p = m->mkpg.begin();
6280
epoch_t created = p->second.created;
6281
pg_t parent = p->second.parent;
6282
if (p->second.split_bits) // Skip split pgs
6286
if (on.preferred() >= 0) {
6287
dout(20) << "ignoring localized pg " << on << dendl;
6291
if (!osdmap->have_pg_pool(on.pool())) {
6292
dout(20) << "ignoring pg on deleted pool " << on << dendl;
6296
dout(20) << "mkpg " << on << " e" << created << dendl;
6298
// is it still ours?
6299
vector<int> up, acting;
6300
int up_primary = -1;
6301
int acting_primary = -1;
6302
osdmap->pg_to_up_acting_osds(on, &up, &up_primary, &acting, &acting_primary);
6303
int role = osdmap->calc_pg_role(whoami, acting, acting.size());
6305
if (up_primary != whoami) {
6306
dout(10) << "mkpg " << on << " not primary (role="
6307
<< role << "), skipping" << dendl;
6311
dout(10) << "mkpg " << on << " up " << up
6312
<< " != acting " << acting << ", ignoring" << dendl;
6313
// we'll get a query soon anyway, since we know the pg
6314
// must exist. we can ignore this.
6319
bool mapped = osdmap->get_primary_shard(on, &pgid);
6322
// does it already exist?
6323
if (_have_pg(pgid)) {
6324
dout(10) << "mkpg " << pgid << " already exists, skipping" << dendl;
6329
pg_history_t history;
6330
history.epoch_created = created;
6331
history.last_epoch_clean = created;
6332
// Newly created PGs don't need to scrub immediately, so mark them
6333
// as scrubbed at creation time.
6334
utime_t now = ceph_clock_now(NULL);
6335
history.last_scrub_stamp = now;
6336
history.last_deep_scrub_stamp = now;
6337
bool valid_history = project_pg_history(
6338
pgid, history, created, up, up_primary, acting, acting_primary);
6339
/* the pg creation message must have come from a mon and therefore
6340
* cannot be on the other side of a map gap
6342
assert(valid_history);
6345
creating_pgs[pgid].history = history;
6346
creating_pgs[pgid].parent = parent;
6347
creating_pgs[pgid].acting.swap(acting);
6348
calc_priors_during(pgid, created, history.same_interval_since,
6349
creating_pgs[pgid].prior);
6351
PG::RecoveryCtx rctx = create_context();
6353
set<pg_shard_t>& pset = creating_pgs[pgid].prior;
6354
dout(10) << "mkpg " << pgid << " e" << created
6356
<< " : querying priors " << pset << dendl;
6357
for (set<pg_shard_t>::iterator p = pset.begin(); p != pset.end(); ++p)
6358
if (osdmap->is_up(p->osd))
6359
(*rctx.query_map)[p->osd][spg_t(pgid.pgid, p->shard)] =
6362
p->shard, pgid.shard,
6364
osdmap->get_epoch());
6367
if (can_create_pg(pgid)) {
6368
pg_interval_map_t pi;
6369
rctx.transaction->create_collection(coll_t(pgid));
6370
pg = _create_lock_pg(
6371
osdmap, pgid, true, false, false,
6372
0, creating_pgs[pgid].acting, whoami,
6373
creating_pgs[pgid].acting, whoami,
6376
pg->info.last_epoch_started = pg->info.history.last_epoch_started;
6377
creating_pgs.erase(pgid);
6378
wake_pg_waiters(pg->info.pgid);
6379
pg->handle_create(&rctx);
6380
pg->write_if_dirty(*rctx.transaction);
6381
pg->publish_stats_to_osd();
6385
dispatch_context(rctx, pg, osdmap);
6388
maybe_update_heartbeat_peers();
6392
// ----------------------------------------
6393
// peering and recovery
6395
PG::RecoveryCtx OSD::create_context()
6397
ObjectStore::Transaction *t = new ObjectStore::Transaction;
6398
C_Contexts *on_applied = new C_Contexts(cct);
6399
C_Contexts *on_safe = new C_Contexts(cct);
6400
map<int, map<spg_t,pg_query_t> > *query_map =
6401
new map<int, map<spg_t, pg_query_t> >;
6402
map<int,vector<pair<pg_notify_t, pg_interval_map_t> > > *notify_list =
6403
new map<int, vector<pair<pg_notify_t, pg_interval_map_t> > >;
6404
map<int,vector<pair<pg_notify_t, pg_interval_map_t> > > *info_map =
6405
new map<int,vector<pair<pg_notify_t, pg_interval_map_t> > >;
6406
PG::RecoveryCtx rctx(query_map, info_map, notify_list,
6407
on_applied, on_safe, t);
6411
void OSD::dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg,
6412
ThreadPool::TPHandle *handle)
6414
if (!ctx.transaction->empty()) {
6415
ctx.on_applied->add(new ObjectStore::C_DeleteTransaction(ctx.transaction));
6416
int tr = store->queue_transaction(
6418
ctx.transaction, ctx.on_applied, ctx.on_safe, NULL,
6419
TrackedOpRef(), handle);
6421
ctx.transaction = new ObjectStore::Transaction;
6422
ctx.on_applied = new C_Contexts(cct);
6423
ctx.on_safe = new C_Contexts(cct);
6427
bool OSD::compat_must_dispatch_immediately(PG *pg)
6429
assert(pg->is_locked());
6430
set<pg_shard_t> tmpacting;
6431
if (!pg->actingbackfill.empty()) {
6432
tmpacting = pg->actingbackfill;
6434
for (unsigned i = 0; i < pg->acting.size(); ++i) {
6438
pg->pool.info.ec_pool() ? i : ghobject_t::NO_SHARD));
6442
for (set<pg_shard_t>::iterator i = tmpacting.begin();
6443
i != tmpacting.end();
6445
if (i->osd == whoami)
6447
ConnectionRef conn =
6448
service.get_con_osd_cluster(i->osd, pg->get_osdmap()->get_epoch());
6449
if (conn && !conn->has_feature(CEPH_FEATURE_INDEP_PG_MAP)) {
6456
void OSD::dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap,
6457
ThreadPool::TPHandle *handle)
6459
if (service.get_osdmap()->is_up(whoami) &&
6461
do_notifies(*ctx.notify_list, curmap);
6462
do_queries(*ctx.query_map, curmap);
6463
do_infos(*ctx.info_map, curmap);
6465
delete ctx.notify_list;
6466
delete ctx.query_map;
6467
delete ctx.info_map;
6468
if ((ctx.on_applied->empty() &&
6469
ctx.on_safe->empty() &&
6470
ctx.transaction->empty()) || !pg) {
6471
delete ctx.transaction;
6472
delete ctx.on_applied;
6475
ctx.on_applied->add(new ObjectStore::C_DeleteTransaction(ctx.transaction));
6476
int tr = store->queue_transaction(
6478
ctx.transaction, ctx.on_applied, ctx.on_safe, NULL, TrackedOpRef(),
6485
* Send an MOSDPGNotify to a primary, with a list of PGs that I have
6486
* content for, and they are primary for.
6489
void OSD::do_notifies(
6490
map<int,vector<pair<pg_notify_t,pg_interval_map_t> > >& notify_list,
6494
vector<pair<pg_notify_t,pg_interval_map_t> > >::iterator it =
6495
notify_list.begin();
6496
it != notify_list.end();
6498
if (!curmap->is_up(it->first))
6500
ConnectionRef con = service.get_con_osd_cluster(
6501
it->first, curmap->get_epoch());
6504
_share_map_outgoing(it->first, con.get(), curmap);
6505
if (con->has_feature(CEPH_FEATURE_INDEP_PG_MAP)) {
6506
dout(7) << "do_notify osd " << it->first
6507
<< " on " << it->second.size() << " PGs" << dendl;
6508
MOSDPGNotify *m = new MOSDPGNotify(curmap->get_epoch(),
6510
cluster_messenger->send_message(m, con.get());
6512
dout(7) << "do_notify osd " << it->first
6513
<< " sending separate messages" << dendl;
6514
for (vector<pair<pg_notify_t, pg_interval_map_t> >::iterator i =
6516
i != it->second.end();
6518
vector<pair<pg_notify_t, pg_interval_map_t> > list(1);
6520
MOSDPGNotify *m = new MOSDPGNotify(i->first.epoch_sent,
6522
cluster_messenger->send_message(m, con.get());
6530
* send out pending queries for info | summaries
6532
void OSD::do_queries(map<int, map<spg_t,pg_query_t> >& query_map,
6535
for (map<int, map<spg_t,pg_query_t> >::iterator pit = query_map.begin();
6536
pit != query_map.end();
6538
if (!curmap->is_up(pit->first))
6540
int who = pit->first;
6541
ConnectionRef con = service.get_con_osd_cluster(who, curmap->get_epoch());
6544
_share_map_outgoing(who, con.get(), curmap);
6545
if (con->has_feature(CEPH_FEATURE_INDEP_PG_MAP)) {
6546
dout(7) << "do_queries querying osd." << who
6547
<< " on " << pit->second.size() << " PGs" << dendl;
6548
MOSDPGQuery *m = new MOSDPGQuery(curmap->get_epoch(), pit->second);
6549
cluster_messenger->send_message(m, con.get());
6551
dout(7) << "do_queries querying osd." << who
6552
<< " sending saperate messages "
6553
<< " on " << pit->second.size() << " PGs" << dendl;
6554
for (map<spg_t, pg_query_t>::iterator i = pit->second.begin();
6555
i != pit->second.end();
6557
map<spg_t, pg_query_t> to_send;
6559
MOSDPGQuery *m = new MOSDPGQuery(i->second.epoch_sent, to_send);
6560
cluster_messenger->send_message(m, con.get());
6567
void OSD::do_infos(map<int,
6568
vector<pair<pg_notify_t, pg_interval_map_t> > >& info_map,
6572
vector<pair<pg_notify_t, pg_interval_map_t> > >::iterator p =
6574
p != info_map.end();
6576
if (!curmap->is_up(p->first))
6578
for (vector<pair<pg_notify_t,pg_interval_map_t> >::iterator i = p->second.begin();
6579
i != p->second.end();
6581
dout(20) << "Sending info " << i->first.info
6582
<< " to shard " << p->first << dendl;
6584
ConnectionRef con = service.get_con_osd_cluster(
6585
p->first, curmap->get_epoch());
6588
_share_map_outgoing(p->first, con.get(), curmap);
6589
if (con->has_feature(CEPH_FEATURE_INDEP_PG_MAP)) {
6590
MOSDPGInfo *m = new MOSDPGInfo(curmap->get_epoch());
6591
m->pg_list = p->second;
6592
cluster_messenger->send_message(m, con.get());
6594
for (vector<pair<pg_notify_t, pg_interval_map_t> >::iterator i =
6596
i != p->second.end();
6598
vector<pair<pg_notify_t, pg_interval_map_t> > to_send(1);
6600
MOSDPGInfo *m = new MOSDPGInfo(i->first.epoch_sent);
6601
m->pg_list = to_send;
6602
cluster_messenger->send_message(m, con.get());
6611
* from non-primary to primary
6612
* includes pg_info_t.
6613
* NOTE: called with opqueue active.
6615
void OSD::handle_pg_notify(OpRequestRef op)
6617
MOSDPGNotify *m = (MOSDPGNotify*)op->get_req();
6618
assert(m->get_header().type == MSG_OSD_PG_NOTIFY);
6620
dout(7) << "handle_pg_notify from " << m->get_source() << dendl;
6621
int from = m->get_source().num();
6623
if (!require_osd_peer(op))
6626
if (!require_same_or_newer_map(op, m->get_epoch())) return;
6630
for (vector<pair<pg_notify_t, pg_interval_map_t> >::iterator it = m->get_pg_list().begin();
6631
it != m->get_pg_list().end();
6634
if (it->first.info.pgid.preferred() >= 0) {
6635
dout(20) << "ignoring localized pg " << it->first.info.pgid << dendl;
6639
handle_pg_peering_evt(
6640
spg_t(it->first.info.pgid.pgid, it->first.to),
6641
it->first.info, it->second,
6642
it->first.query_epoch, pg_shard_t(from, it->first.from), true,
6643
PG::CephPeeringEvtRef(
6644
new PG::CephPeeringEvt(
6645
it->first.epoch_sent, it->first.query_epoch,
6646
PG::MNotifyRec(pg_shard_t(from, it->first.from), it->first)))
6651
void OSD::handle_pg_log(OpRequestRef op)
6653
MOSDPGLog *m = (MOSDPGLog*) op->get_req();
6654
assert(m->get_header().type == MSG_OSD_PG_LOG);
6655
dout(7) << "handle_pg_log " << *m << " from " << m->get_source() << dendl;
6657
if (!require_osd_peer(op))
6660
int from = m->get_source().num();
6661
if (!require_same_or_newer_map(op, m->get_epoch())) return;
6663
if (m->info.pgid.preferred() >= 0) {
6664
dout(10) << "ignoring localized pg " << m->info.pgid << dendl;
6669
handle_pg_peering_evt(
6670
spg_t(m->info.pgid.pgid, m->to),
6671
m->info, m->past_intervals, m->get_epoch(),
6672
pg_shard_t(from, m->from), false,
6673
PG::CephPeeringEvtRef(
6674
new PG::CephPeeringEvt(
6675
m->get_epoch(), m->get_query_epoch(),
6676
PG::MLogRec(pg_shard_t(from, m->from), m)))
6680
void OSD::handle_pg_info(OpRequestRef op)
6682
MOSDPGInfo *m = static_cast<MOSDPGInfo *>(op->get_req());
6683
assert(m->get_header().type == MSG_OSD_PG_INFO);
6684
dout(7) << "handle_pg_info " << *m << " from " << m->get_source() << dendl;
6686
if (!require_osd_peer(op))
6689
int from = m->get_source().num();
6690
if (!require_same_or_newer_map(op, m->get_epoch())) return;
6694
for (vector<pair<pg_notify_t,pg_interval_map_t> >::iterator p = m->pg_list.begin();
6695
p != m->pg_list.end();
6697
if (p->first.info.pgid.preferred() >= 0) {
6698
dout(10) << "ignoring localized pg " << p->first.info.pgid << dendl;
6702
handle_pg_peering_evt(
6703
spg_t(p->first.info.pgid.pgid, p->first.to),
6704
p->first.info, p->second, p->first.epoch_sent,
6705
pg_shard_t(from, p->first.from), false,
6706
PG::CephPeeringEvtRef(
6707
new PG::CephPeeringEvt(
6708
p->first.epoch_sent, p->first.query_epoch,
6711
from, p->first.from), p->first.info, p->first.epoch_sent)))
6716
void OSD::handle_pg_trim(OpRequestRef op)
6718
MOSDPGTrim *m = (MOSDPGTrim *)op->get_req();
6719
assert(m->get_header().type == MSG_OSD_PG_TRIM);
6721
dout(7) << "handle_pg_trim " << *m << " from " << m->get_source() << dendl;
6723
if (!require_osd_peer(op))
6726
int from = m->get_source().num();
6727
if (!require_same_or_newer_map(op, m->epoch)) return;
6729
if (m->pgid.preferred() >= 0) {
6730
dout(10) << "ignoring localized pg " << m->pgid << dendl;
6736
if (!_have_pg(m->pgid)) {
6737
dout(10) << " don't have pg " << m->pgid << dendl;
6739
PG *pg = _lookup_lock_pg(m->pgid);
6740
if (m->epoch < pg->info.history.same_interval_since) {
6741
dout(10) << *pg << " got old trim to " << m->trim_to << ", ignoring" << dendl;
6747
if (pg->is_primary()) {
6748
// peer is informing us of their last_complete_ondisk
6749
dout(10) << *pg << " replica osd." << from << " lcod " << m->trim_to << dendl;
6750
pg->peer_last_complete_ondisk[pg_shard_t(from, m->pgid.shard)] =
6752
if (pg->calc_min_last_complete_ondisk()) {
6753
dout(10) << *pg << " min lcod now " << pg->min_last_complete_ondisk << dendl;
6757
// primary is instructing us to trim
6758
ObjectStore::Transaction *t = new ObjectStore::Transaction;
6759
PG::PGLogEntryHandler handler;
6760
pg->pg_log.trim(&handler, m->trim_to, pg->info);
6761
handler.apply(pg, t);
6762
pg->dirty_info = true;
6763
pg->write_if_dirty(*t);
6764
int tr = store->queue_transaction(
6766
new ObjectStore::C_DeleteTransaction(t));
6773
void OSD::handle_pg_scan(OpRequestRef op)
6775
MOSDPGScan *m = static_cast<MOSDPGScan*>(op->get_req());
6776
assert(m->get_header().type == MSG_OSD_PG_SCAN);
6777
dout(10) << "handle_pg_scan " << *m << " from " << m->get_source() << dendl;
6779
if (!require_osd_peer(op))
6781
if (!require_same_or_newer_map(op, m->query_epoch))
6784
if (m->pgid.preferred() >= 0) {
6785
dout(10) << "ignoring localized pg " << m->pgid << dendl;
6791
if (!_have_pg(m->pgid)) {
6795
pg = _lookup_pg(m->pgid);
6801
void OSD::handle_pg_backfill(OpRequestRef op)
6803
MOSDPGBackfill *m = static_cast<MOSDPGBackfill*>(op->get_req());
6804
assert(m->get_header().type == MSG_OSD_PG_BACKFILL);
6805
dout(10) << "handle_pg_backfill " << *m << " from " << m->get_source() << dendl;
6807
if (!require_osd_peer(op))
6809
if (!require_same_or_newer_map(op, m->query_epoch))
6812
if (m->pgid.preferred() >= 0) {
6813
dout(10) << "ignoring localized pg " << m->pgid << dendl;
6819
if (!_have_pg(m->pgid)) {
6823
pg = _lookup_pg(m->pgid);
6829
void OSD::handle_pg_backfill_reserve(OpRequestRef op)
6831
MBackfillReserve *m = static_cast<MBackfillReserve*>(op->get_req());
6832
assert(m->get_header().type == MSG_OSD_BACKFILL_RESERVE);
6834
if (!require_osd_peer(op))
6836
if (!require_same_or_newer_map(op, m->query_epoch))
6839
PG::CephPeeringEvtRef evt;
6840
if (m->type == MBackfillReserve::REQUEST) {
6841
evt = PG::CephPeeringEvtRef(
6842
new PG::CephPeeringEvt(
6845
PG::RequestBackfillPrio(m->priority)));
6846
} else if (m->type == MBackfillReserve::GRANT) {
6847
evt = PG::CephPeeringEvtRef(
6848
new PG::CephPeeringEvt(
6851
PG::RemoteBackfillReserved()));
6852
} else if (m->type == MBackfillReserve::REJECT) {
6853
evt = PG::CephPeeringEvtRef(
6854
new PG::CephPeeringEvt(
6857
PG::RemoteReservationRejected()));
6862
if (service.splitting(m->pgid)) {
6863
peering_wait_for_split[m->pgid].push_back(evt);
6868
if (!_have_pg(m->pgid))
6871
pg = _lookup_lock_pg(m->pgid);
6874
pg->queue_peering_event(evt);
6878
void OSD::handle_pg_recovery_reserve(OpRequestRef op)
6880
MRecoveryReserve *m = static_cast<MRecoveryReserve*>(op->get_req());
6881
assert(m->get_header().type == MSG_OSD_RECOVERY_RESERVE);
6883
if (!require_osd_peer(op))
6885
if (!require_same_or_newer_map(op, m->query_epoch))
6888
PG::CephPeeringEvtRef evt;
6889
if (m->type == MRecoveryReserve::REQUEST) {
6890
evt = PG::CephPeeringEvtRef(
6891
new PG::CephPeeringEvt(
6894
PG::RequestRecovery()));
6895
} else if (m->type == MRecoveryReserve::GRANT) {
6896
evt = PG::CephPeeringEvtRef(
6897
new PG::CephPeeringEvt(
6900
PG::RemoteRecoveryReserved()));
6901
} else if (m->type == MRecoveryReserve::RELEASE) {
6902
evt = PG::CephPeeringEvtRef(
6903
new PG::CephPeeringEvt(
6906
PG::RecoveryDone()));
6911
if (service.splitting(m->pgid)) {
6912
peering_wait_for_split[m->pgid].push_back(evt);
6917
if (!_have_pg(m->pgid))
6920
pg = _lookup_lock_pg(m->pgid);
6923
pg->queue_peering_event(evt);
6929
* from primary to replica | stray
6930
* NOTE: called with opqueue active.
6932
void OSD::handle_pg_query(OpRequestRef op)
6934
assert(osd_lock.is_locked());
6936
MOSDPGQuery *m = (MOSDPGQuery*)op->get_req();
6937
assert(m->get_header().type == MSG_OSD_PG_QUERY);
6939
if (!require_osd_peer(op))
6942
dout(7) << "handle_pg_query from " << m->get_source() << " epoch " << m->get_epoch() << dendl;
6943
int from = m->get_source().num();
6945
if (!require_same_or_newer_map(op, m->get_epoch())) return;
6949
map< int, vector<pair<pg_notify_t, pg_interval_map_t> > > notify_list;
6951
for (map<spg_t,pg_query_t>::iterator it = m->pg_list.begin();
6952
it != m->pg_list.end();
6954
spg_t pgid = it->first;
6956
if (pgid.preferred() >= 0) {
6957
dout(10) << "ignoring localized pg " << pgid << dendl;
6961
if (service.splitting(pgid)) {
6962
peering_wait_for_split[pgid].push_back(
6963
PG::CephPeeringEvtRef(
6964
new PG::CephPeeringEvt(
6965
it->second.epoch_sent, it->second.epoch_sent,
6966
PG::MQuery(pg_shard_t(from, it->second.from),
6967
it->second, it->second.epoch_sent))));
6971
if (pg_map.count(pgid)) {
6973
pg = _lookup_lock_pg(pgid);
6975
it->second.epoch_sent, it->second.epoch_sent,
6976
pg_shard_t(from, it->second.from), it->second);
6981
if (!osdmap->have_pg_pool(pgid.pool()))
6984
// get active crush mapping
6985
int up_primary, acting_primary;
6986
vector<int> up, acting;
6987
osdmap->pg_to_up_acting_osds(
6988
pgid.pgid, &up, &up_primary, &acting, &acting_primary);
6991
pg_history_t history = it->second.history;
6992
bool valid_history = project_pg_history(
6993
pgid, history, it->second.epoch_sent,
6994
up, up_primary, acting, acting_primary);
6996
if (!valid_history ||
6997
it->second.epoch_sent < history.same_interval_since) {
6998
dout(10) << " pg " << pgid << " dne, and pg has changed in "
6999
<< history.same_interval_since
7000
<< " (msg from " << it->second.epoch_sent << ")" << dendl;
7004
dout(10) << " pg " << pgid << " dne" << dendl;
7005
pg_info_t empty(spg_t(pgid.pgid, it->second.to));
7006
/* This is racy, but that should be ok: if we complete the deletion
7007
* before the pg is recreated, we'll just start it off backfilling
7008
* instead of just empty */
7009
if (service.deleting_pgs.lookup(pgid))
7010
empty.last_backfill = hobject_t();
7011
if (it->second.type == pg_query_t::LOG ||
7012
it->second.type == pg_query_t::FULLLOG) {
7013
ConnectionRef con = service.get_con_osd_cluster(from, osdmap->get_epoch());
7015
MOSDPGLog *mlog = new MOSDPGLog(
7016
it->second.from, it->second.to,
7017
osdmap->get_epoch(), empty,
7018
it->second.epoch_sent);
7019
_share_map_outgoing(from, con.get(), osdmap);
7020
cluster_messenger->send_message(mlog, con.get());
7023
notify_list[from].push_back(
7026
it->second.from, it->second.to,
7027
it->second.epoch_sent,
7028
osdmap->get_epoch(),
7030
pg_interval_map_t()));
7033
do_notifies(notify_list, osdmap);
7037
void OSD::handle_pg_remove(OpRequestRef op)
7039
MOSDPGRemove *m = (MOSDPGRemove *)op->get_req();
7040
assert(m->get_header().type == MSG_OSD_PG_REMOVE);
7041
assert(osd_lock.is_locked());
7043
if (!require_osd_peer(op))
7046
dout(7) << "handle_pg_remove from " << m->get_source() << " on "
7047
<< m->pg_list.size() << " pgs" << dendl;
7049
if (!require_same_or_newer_map(op, m->get_epoch())) return;
7053
for (vector<spg_t>::iterator it = m->pg_list.begin();
7054
it != m->pg_list.end();
7057
if (pgid.preferred() >= 0) {
7058
dout(10) << "ignoring localized pg " << pgid << dendl;
7062
if (pg_map.count(pgid) == 0) {
7063
dout(10) << " don't have pg " << pgid << dendl;
7066
dout(5) << "queue_pg_for_deletion: " << pgid << dendl;
7067
PG *pg = _lookup_lock_pg(pgid);
7068
pg_history_t history = pg->info.history;
7069
int up_primary, acting_primary;
7070
vector<int> up, acting;
7071
osdmap->pg_to_up_acting_osds(
7072
pgid.pgid, &up, &up_primary, &acting, &acting_primary);
7073
bool valid_history = project_pg_history(
7074
pg->info.pgid, history, pg->get_osdmap()->get_epoch(),
7075
up, up_primary, acting, acting_primary);
7076
if (valid_history &&
7077
history.same_interval_since <= m->get_epoch()) {
7078
assert(pg->get_primary().osd == m->get_source().num());
7083
dout(10) << *pg << " ignoring remove request, pg changed in epoch "
7084
<< history.same_interval_since
7085
<< " > " << m->get_epoch() << dendl;
7091
void OSD::_remove_pg(PG *pg)
7093
ObjectStore::Transaction *rmt = new ObjectStore::Transaction;
7095
// on_removal, which calls remove_watchers_and_notifies, and the erasure from
7096
// the pg_map must be done together without unlocking the pg lock,
7097
// to avoid racing with watcher cleanup in ms_handle_reset
7098
// and handle_notify_timeout
7099
pg->on_removal(rmt);
7101
service.cancel_pending_splits_for_parent(pg->info.pgid);
7103
store->queue_transaction(
7105
new ObjectStore::C_DeleteTransactionHolder<
7106
SequencerRef>(rmt, pg->osr),
7107
new ContainerContext<
7108
SequencerRef>(pg->osr));
7110
DeletingStateRef deleting = service.deleting_pgs.lookup_or_create(
7116
remove_wq.queue(make_pair(PGRef(pg), deleting));
7119
pg_map.erase(pg->info.pgid);
7120
pg->put("PGMap"); // since we've taken it out of map
7124
// =========================================================
7128
* caller holds osd_lock
7130
void OSD::check_replay_queue()
7132
assert(osd_lock.is_locked());
7134
utime_t now = ceph_clock_now(cct);
7135
list< pair<spg_t,utime_t> > pgids;
7136
replay_queue_lock.Lock();
7137
while (!replay_queue.empty() &&
7138
replay_queue.front().second <= now) {
7139
pgids.push_back(replay_queue.front());
7140
replay_queue.pop_front();
7142
replay_queue_lock.Unlock();
7144
for (list< pair<spg_t,utime_t> >::iterator p = pgids.begin(); p != pgids.end(); ++p) {
7145
spg_t pgid = p->first;
7146
if (pg_map.count(pgid)) {
7147
PG *pg = _lookup_lock_pg_with_map_lock_held(pgid);
7148
dout(10) << "check_replay_queue " << *pg << dendl;
7149
if (pg->is_active() &&
7152
pg->replay_until == p->second) {
7153
pg->replay_queued_ops();
7157
dout(10) << "check_replay_queue pgid " << pgid << " (not found)" << dendl;
7161
// wake up _all_ pg waiters; raw pg -> actual pg mapping may have shifted
7162
wake_all_pg_waiters();
7166
bool OSDService::queue_for_recovery(PG *pg)
7168
bool b = recovery_wq.queue(pg);
7170
dout(10) << "queue_for_recovery queued " << *pg << dendl;
7172
dout(10) << "queue_for_recovery already queued " << *pg << dendl;
7176
bool OSD::_recover_now()
7178
if (recovery_ops_active >= cct->_conf->osd_recovery_max_active) {
7179
dout(15) << "_recover_now active " << recovery_ops_active
7180
<< " >= max " << cct->_conf->osd_recovery_max_active << dendl;
7183
if (ceph_clock_now(cct) < defer_recovery_until) {
7184
dout(15) << "_recover_now defer until " << defer_recovery_until << dendl;
7191
void OSD::do_recovery(PG *pg, ThreadPool::TPHandle &handle)
7193
// see how many we should try to start. note that this is a bit racy.
7195
int max = MIN(cct->_conf->osd_recovery_max_active - recovery_ops_active,
7196
cct->_conf->osd_recovery_max_single_start);
7198
dout(10) << "do_recovery can start " << max << " (" << recovery_ops_active << "/" << cct->_conf->osd_recovery_max_active
7199
<< " rops)" << dendl;
7200
recovery_ops_active += max; // take them now, return them if we don't use them.
7202
dout(10) << "do_recovery can start 0 (" << recovery_ops_active << "/" << cct->_conf->osd_recovery_max_active
7203
<< " rops)" << dendl;
7205
recovery_wq.unlock();
7208
dout(10) << "do_recovery raced and failed to start anything; requeuing " << *pg << dendl;
7209
recovery_wq.queue(pg);
7212
pg->lock_suspend_timeout(handle);
7213
if (pg->deleting || !(pg->is_active() && pg->is_primary())) {
7218
dout(10) << "do_recovery starting " << max << " " << *pg << dendl;
7219
#ifdef DEBUG_RECOVERY_OIDS
7220
dout(20) << " active was " << recovery_oids[pg->info.pgid] << dendl;
7223
PG::RecoveryCtx rctx = create_context();
7226
bool more = pg->start_recovery_ops(max, &rctx, handle, &started);
7227
dout(10) << "do_recovery started " << started << "/" << max << " on " << *pg << dendl;
7230
* if we couldn't start any recovery ops and things are still
7231
* unfound, see if we can discover more missing object locations.
7232
* It may be that our initial locations were bad and we errored
7233
* out while trying to pull.
7235
if (!more && pg->have_unfound()) {
7236
pg->discover_all_missing(*rctx.query_map);
7237
if (rctx.query_map->empty()) {
7238
dout(10) << "do_recovery no luck, giving up on this pg for now" << dendl;
7240
recovery_wq._dequeue(pg);
7241
recovery_wq.unlock();
7245
pg->write_if_dirty(*rctx.transaction);
7246
OSDMapRef curmap = pg->get_osdmap();
7248
dispatch_context(rctx, pg, curmap);
7254
assert(recovery_ops_active >= max);
7255
recovery_ops_active -= max;
7257
recovery_wq._wake();
7258
recovery_wq.unlock();
7261
void OSD::start_recovery_op(PG *pg, const hobject_t& soid)
7264
dout(10) << "start_recovery_op " << *pg << " " << soid
7265
<< " (" << recovery_ops_active << "/" << cct->_conf->osd_recovery_max_active << " rops)"
7267
assert(recovery_ops_active >= 0);
7268
recovery_ops_active++;
7270
#ifdef DEBUG_RECOVERY_OIDS
7271
dout(20) << " active was " << recovery_oids[pg->info.pgid] << dendl;
7272
assert(recovery_oids[pg->info.pgid].count(soid) == 0);
7273
recovery_oids[pg->info.pgid].insert(soid);
7276
recovery_wq.unlock();
7279
void OSD::finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue)
7282
dout(10) << "finish_recovery_op " << *pg << " " << soid
7283
<< " dequeue=" << dequeue
7284
<< " (" << recovery_ops_active << "/" << cct->_conf->osd_recovery_max_active << " rops)"
7288
recovery_ops_active--;
7289
assert(recovery_ops_active >= 0);
7291
#ifdef DEBUG_RECOVERY_OIDS
7292
dout(20) << " active oids was " << recovery_oids[pg->info.pgid] << dendl;
7293
assert(recovery_oids[pg->info.pgid].count(soid));
7294
recovery_oids[pg->info.pgid].erase(soid);
7298
recovery_wq._dequeue(pg);
7300
recovery_wq._queue_front(pg);
7303
recovery_wq._wake();
7304
recovery_wq.unlock();
7307
// =========================================================
7310
void OSDService::reply_op_error(OpRequestRef op, int err)
7312
reply_op_error(op, err, eversion_t(), 0);
7315
void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v,
7318
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
7319
assert(m->get_header().type == CEPH_MSG_OSD_OP);
7321
flags = m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK);
7323
MOSDOpReply *reply = new MOSDOpReply(m, err, osdmap->get_epoch(), flags,
7325
reply->set_reply_versions(v, uv);
7326
m->get_connection()->get_messenger()->send_message(reply, m->get_connection());
7329
void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op)
7331
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
7332
assert(m->get_header().type == CEPH_MSG_OSD_OP);
7334
assert(m->get_map_epoch() >= pg->info.history.same_primary_since);
7336
if (pg->is_ec_pg()) {
7338
* OSD recomputes op target based on current OSDMap. With an EC pg, we
7339
* can get this result:
7340
* 1) client at map 512 sends an op to osd 3, pg_t 3.9 based on mapping
7341
* [CRUSH_ITEM_NONE, 2, 3]/3
7342
* 2) OSD 3 at map 513 remaps op to osd 3, spg_t 3.9s0 based on mapping
7344
* 3) PG 3.9s0 dequeues the op at epoch 512 and notices that it isn't primary
7346
* 4) client resends and this time PG 3.9s0 having caught up to 513 gets
7349
* We can't compute the op target based on the sending map epoch due to
7350
* splitting. The simplest thing is to detect such cases here and drop
7351
* them without an error (the client will resend anyway).
7353
OSDMapRef opmap = try_get_map(m->get_map_epoch());
7355
dout(7) << __func__ << ": " << *pg << " no longer have map for "
7356
<< m->get_map_epoch() << ", dropping" << dendl;
7359
pg_t _pgid = m->get_pg();
7361
if ((m->get_flags() & CEPH_OSD_FLAG_PGOP) == 0)
7362
_pgid = opmap->raw_pg_to_pg(_pgid);
7363
if (opmap->get_primary_shard(_pgid, &pgid) &&
7364
pgid.shard != pg->info.pgid.shard) {
7365
dout(7) << __func__ << ": " << *pg << " primary changed since "
7366
<< m->get_map_epoch() << ", dropping" << dendl;
7371
dout(7) << *pg << " misdirected op in " << m->get_map_epoch() << dendl;
7372
clog.warn() << m->get_source_inst() << " misdirected " << m->get_reqid()
7373
<< " pg " << m->get_pg()
7374
<< " to osd." << whoami
7375
<< " not " << pg->acting
7376
<< " in e" << m->get_map_epoch() << "/" << osdmap->get_epoch() << "\n";
7377
reply_op_error(op, -ENXIO);
7380
void OSD::handle_op(OpRequestRef op)
7382
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
7383
assert(m->get_header().type == CEPH_MSG_OSD_OP);
7384
if (op_is_discardable(m)) {
7385
dout(10) << " discardable " << *m << dendl;
7389
// we don't need encoded payload anymore
7392
// require same or newer map
7393
if (!require_same_or_newer_map(op, m->get_map_epoch()))
7396
// object name too long?
7397
if (m->get_oid().name.size() > MAX_CEPH_OBJECT_NAME_LEN) {
7398
dout(4) << "handle_op '" << m->get_oid().name << "' is longer than "
7399
<< MAX_CEPH_OBJECT_NAME_LEN << " bytes!" << dendl;
7400
service.reply_op_error(op, -ENAMETOOLONG);
7405
if (osdmap->is_blacklisted(m->get_source_addr())) {
7406
dout(4) << "handle_op " << m->get_source_addr() << " is blacklisted" << dendl;
7407
service.reply_op_error(op, -EBLACKLISTED);
7410
// share our map with sender, if they're old
7411
_share_map_incoming(m->get_source(), m->get_connection().get(), m->get_map_epoch(),
7412
static_cast<Session *>(m->get_connection()->get_priv()));
7414
if (op->rmw_flags == 0) {
7415
int r = init_op_flags(op);
7417
service.reply_op_error(op, r);
7422
if (cct->_conf->osd_debug_drop_op_probability > 0 &&
7423
!m->get_source().is_mds()) {
7424
if ((double)rand() / (double)RAND_MAX < cct->_conf->osd_debug_drop_op_probability) {
7425
dout(0) << "handle_op DEBUG artificially dropping op " << *m << dendl;
7430
if (op->may_write()) {
7432
if ((service.check_failsafe_full() ||
7433
osdmap->test_flag(CEPH_OSDMAP_FULL) ||
7434
m->get_map_epoch() < superblock.last_map_marked_full) &&
7435
!m->get_source().is_mds()) { // FIXME: we'll exclude mds writes for now.
7436
// Drop the request, since the client will retry when the full
7442
if (m->get_snapid() != CEPH_NOSNAP) {
7443
service.reply_op_error(op, -EINVAL);
7448
if (cct->_conf->osd_max_write_size &&
7449
m->get_data_len() > cct->_conf->osd_max_write_size << 20) {
7450
// journal can't hold commit!
7451
derr << "handle_op msg data len " << m->get_data_len()
7452
<< " > osd_max_write_size " << (cct->_conf->osd_max_write_size << 20)
7453
<< " on " << *m << dendl;
7454
service.reply_op_error(op, -OSD_WRITETOOBIG);
7459
pg_t _pgid = m->get_pg();
7460
int64_t pool = _pgid.pool();
7461
if ((m->get_flags() & CEPH_OSD_FLAG_PGOP) == 0 &&
7462
osdmap->have_pg_pool(pool))
7463
_pgid = osdmap->raw_pg_to_pg(_pgid);
7466
if (!osdmap->get_primary_shard(_pgid, &pgid)) {
7467
// missing pool or acting set empty -- drop
7471
// get and lock *pg.
7472
PG *pg = _have_pg(pgid) ? _lookup_pg(pgid) : NULL;
7474
dout(7) << "hit non-existent pg " << pgid << dendl;
7476
if (osdmap->get_pg_acting_role(pgid.pgid, whoami) >= 0) {
7477
dout(7) << "we are valid target for op, waiting" << dendl;
7478
waiting_for_pg[pgid].push_back(op);
7479
op->mark_delayed("waiting for pg to exist locally");
7483
// okay, we aren't valid now; check send epoch
7484
if (m->get_map_epoch() < superblock.oldest_map) {
7485
dout(7) << "don't have sender's osdmap; assuming it was valid and that client will resend" << dendl;
7488
OSDMapRef send_map = get_map(m->get_map_epoch());
7490
if (send_map->get_pg_acting_role(pgid.pgid, whoami) >= 0) {
7491
dout(7) << "dropping request; client will resend when they get new map" << dendl;
7492
} else if (!send_map->have_pg_pool(pgid.pool())) {
7493
dout(7) << "dropping request; pool did not exist" << dendl;
7494
clog.warn() << m->get_source_inst() << " invalid " << m->get_reqid()
7495
<< " pg " << m->get_pg()
7496
<< " to osd." << whoami
7497
<< " in e" << osdmap->get_epoch()
7498
<< ", client e" << m->get_map_epoch()
7499
<< " when pool " << m->get_pg().pool() << " did not exist"
7502
dout(7) << "we are invalid target" << dendl;
7503
clog.warn() << m->get_source_inst() << " misdirected " << m->get_reqid()
7504
<< " pg " << m->get_pg()
7505
<< " to osd." << whoami
7506
<< " in e" << osdmap->get_epoch()
7507
<< ", client e" << m->get_map_epoch()
7509
<< " features " << m->get_connection()->get_features()
7511
service.reply_op_error(op, -ENXIO);
7519
template<typename T, int MSGTYPE>
7520
void OSD::handle_replica_op(OpRequestRef op)
7522
T *m = static_cast<T *>(op->get_req());
7523
assert(m->get_header().type == MSGTYPE);
7525
dout(10) << __func__ << " " << *m << " epoch " << m->map_epoch << dendl;
7526
if (m->map_epoch < up_epoch) {
7527
dout(3) << "replica op from before up" << dendl;
7531
if (!require_osd_peer(op))
7534
// must be a rep op.
7535
assert(m->get_source().is_osd());
7537
// require same or newer map
7538
if (!require_same_or_newer_map(op, m->map_epoch))
7541
// share our map with sender, if they're old
7542
_share_map_incoming(m->get_source(), m->get_connection().get(), m->map_epoch,
7543
static_cast<Session*>(m->get_connection()->get_priv()));
7545
// make sure we have the pg
7546
const spg_t pgid = m->pgid;
7547
if (service.splitting(pgid)) {
7548
waiting_for_pg[pgid].push_back(op);
7552
PG *pg = _have_pg(pgid) ? _lookup_pg(pgid) : NULL;
7559
bool OSD::op_is_discardable(MOSDOp *op)
7561
// drop client request if they are not connected and can't get the
7562
// reply anyway. unless this is a replayed op, in which case we
7563
// want to do what we can to apply it.
7564
if (!op->get_connection()->is_connected() &&
7565
op->get_version().version == 0) {
7572
* enqueue called with osd_lock held
7574
void OSD::enqueue_op(PG *pg, OpRequestRef op)
7576
utime_t latency = ceph_clock_now(cct) - op->get_req()->get_recv_stamp();
7577
dout(15) << "enqueue_op " << op << " prio " << op->get_req()->get_priority()
7578
<< " cost " << op->get_req()->get_cost()
7579
<< " latency " << latency
7580
<< " " << *(op->get_req()) << dendl;
7584
void OSD::OpWQ::_enqueue(pair<PGRef, OpRequestRef> item)
7586
unsigned priority = item.second->get_req()->get_priority();
7587
unsigned cost = item.second->get_req()->get_cost();
7588
if (priority >= CEPH_MSG_PRIO_LOW)
7589
pqueue.enqueue_strict(
7590
item.second->get_req()->get_source_inst(),
7593
pqueue.enqueue(item.second->get_req()->get_source_inst(),
7594
priority, cost, item);
7595
osd->logger->set(l_osd_opq, pqueue.length());
7598
void OSD::OpWQ::_enqueue_front(pair<PGRef, OpRequestRef> item)
7600
Mutex::Locker l(qlock);
7601
if (pg_for_processing.count(&*(item.first))) {
7602
pg_for_processing[&*(item.first)].push_front(item.second);
7603
item.second = pg_for_processing[&*(item.first)].back();
7604
pg_for_processing[&*(item.first)].pop_back();
7606
unsigned priority = item.second->get_req()->get_priority();
7607
unsigned cost = item.second->get_req()->get_cost();
7608
if (priority >= CEPH_MSG_PRIO_LOW)
7609
pqueue.enqueue_strict_front(
7610
item.second->get_req()->get_source_inst(),
7613
pqueue.enqueue_front(item.second->get_req()->get_source_inst(),
7614
priority, cost, item);
7615
osd->logger->set(l_osd_opq, pqueue.length());
7618
PGRef OSD::OpWQ::_dequeue()
7620
assert(!pqueue.empty());
7623
Mutex::Locker l(qlock);
7624
pair<PGRef, OpRequestRef> ret = pqueue.dequeue();
7626
pg_for_processing[&*pg].push_back(ret.second);
7628
osd->logger->set(l_osd_opq, pqueue.length());
7632
void OSD::OpWQ::_process(PGRef pg, ThreadPool::TPHandle &handle)
7634
pg->lock_suspend_timeout(handle);
7637
Mutex::Locker l(qlock);
7638
if (!pg_for_processing.count(&*pg)) {
7642
assert(pg_for_processing[&*pg].size());
7643
op = pg_for_processing[&*pg].front();
7644
pg_for_processing[&*pg].pop_front();
7645
if (!(pg_for_processing[&*pg].size()))
7646
pg_for_processing.erase(&*pg);
7649
lgeneric_subdout(osd->cct, osd, 30) << "dequeue status: ";
7650
Formatter *f = new_formatter("json");
7651
f->open_object_section("q");
7658
osd->dequeue_op(pg, op, handle);
7663
void OSDService::dequeue_pg(PG *pg, list<OpRequestRef> *dequeued)
7665
osd->op_wq.dequeue(pg, dequeued);
7669
* NOTE: dequeue called in worker thread, with pg lock
7671
void OSD::dequeue_op(
7672
PGRef pg, OpRequestRef op,
7673
ThreadPool::TPHandle &handle)
7675
utime_t now = ceph_clock_now(cct);
7676
op->set_dequeued_time(now);
7677
utime_t latency = now - op->get_req()->get_recv_stamp();
7678
dout(10) << "dequeue_op " << op << " prio " << op->get_req()->get_priority()
7679
<< " cost " << op->get_req()->get_cost()
7680
<< " latency " << latency
7681
<< " " << *(op->get_req())
7682
<< " pg " << *pg << dendl;
7686
op->mark_reached_pg();
7688
pg->do_request(op, handle);
7691
dout(10) << "dequeue_op " << op << " finish" << dendl;
7695
void OSDService::queue_for_peering(PG *pg)
7697
peering_wq.queue(pg);
7700
struct C_CompleteSplits : public Context {
7702
set<boost::intrusive_ptr<PG> > pgs;
7703
C_CompleteSplits(OSD *osd, const set<boost::intrusive_ptr<PG> > &in)
7704
: osd(osd), pgs(in) {}
7705
void finish(int r) {
7706
Mutex::Locker l(osd->osd_lock);
7707
if (osd->is_stopping())
7709
PG::RecoveryCtx rctx = osd->create_context();
7710
set<spg_t> to_complete;
7711
for (set<boost::intrusive_ptr<PG> >::iterator i = pgs.begin();
7715
osd->add_newly_split_pg(&**i, &rctx);
7716
osd->dispatch_context_transaction(rctx, &**i);
7717
if (!((*i)->deleting))
7718
to_complete.insert((*i)->info.pgid);
7721
osd->service.complete_split(to_complete);
7722
osd->dispatch_context(rctx, 0, osd->service.get_osdmap());
7726
void OSD::process_peering_events(
7727
const list<PG*> &pgs,
7728
ThreadPool::TPHandle &handle
7731
bool need_up_thru = false;
7732
epoch_t same_interval_since = 0;
7733
OSDMapRef curmap = service.get_osdmap();
7734
PG::RecoveryCtx rctx = create_context();
7735
for (list<PG*>::const_iterator i = pgs.begin();
7738
set<boost::intrusive_ptr<PG> > split_pgs;
7740
pg->lock_suspend_timeout(handle);
7741
curmap = service.get_osdmap();
7746
advance_pg(curmap->get_epoch(), pg, handle, &rctx, &split_pgs);
7747
if (!pg->peering_queue.empty()) {
7748
PG::CephPeeringEvtRef evt = pg->peering_queue.front();
7749
pg->peering_queue.pop_front();
7750
pg->handle_peering_event(evt, &rctx);
7752
need_up_thru = pg->need_up_thru || need_up_thru;
7753
same_interval_since = MAX(pg->info.history.same_interval_since,
7754
same_interval_since);
7755
pg->write_if_dirty(*rctx.transaction);
7756
if (!split_pgs.empty()) {
7757
rctx.on_applied->add(new C_CompleteSplits(this, split_pgs));
7760
if (compat_must_dispatch_immediately(pg)) {
7761
dispatch_context(rctx, pg, curmap, &handle);
7762
rctx = create_context();
7764
dispatch_context_transaction(rctx, pg, &handle);
7767
handle.reset_tp_timeout();
7770
queue_want_up_thru(same_interval_since);
7771
dispatch_context(rctx, 0, curmap, &handle);
7773
service.send_pg_temp();
7776
// --------------------------------
7778
const char** OSD::get_tracked_conf_keys() const
7780
static const char* KEYS[] = {
7781
"osd_max_backfills",
7782
"osd_op_complaint_time", "osd_op_log_threshold",
7783
"osd_op_history_size", "osd_op_history_duration",
7789
void OSD::handle_conf_change(const struct md_config_t *conf,
7790
const std::set <std::string> &changed)
7792
if (changed.count("osd_max_backfills")) {
7793
service.local_reserver.set_max(cct->_conf->osd_max_backfills);
7794
service.remote_reserver.set_max(cct->_conf->osd_max_backfills);
7796
if (changed.count("osd_op_complaint_time") ||
7797
changed.count("osd_op_log_threshold")) {
7798
op_tracker.set_complaint_and_threshold(cct->_conf->osd_op_complaint_time,
7799
cct->_conf->osd_op_log_threshold);
7801
if (changed.count("osd_op_history_size") ||
7802
changed.count("osd_op_history_duration")) {
7803
op_tracker.set_history_size_and_duration(cct->_conf->osd_op_history_size,
7804
cct->_conf->osd_op_history_duration);
7808
// --------------------------------
7810
int OSD::init_op_flags(OpRequestRef op)
7812
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
7813
vector<OSDOp>::iterator iter;
7815
// client flags have no bearing on whether an op is a read, write, etc.
7818
// set bits based on op codes, called methods.
7819
for (iter = m->ops.begin(); iter != m->ops.end(); ++iter) {
7820
if (ceph_osd_op_mode_modify(iter->op.op))
7822
if (ceph_osd_op_mode_read(iter->op.op))
7825
// set READ flag if there are src_oids
7826
if (iter->soid.oid.name.length())
7829
// set PGOP flag if there are PG ops
7830
if (ceph_osd_op_type_pg(iter->op.op))
7833
if (ceph_osd_op_mode_cache(iter->op.op))
7836
switch (iter->op.op) {
7837
case CEPH_OSD_OP_CALL:
7839
bufferlist::iterator bp = iter->indata.begin();
7840
int is_write, is_read;
7841
string cname, mname;
7842
bp.copy(iter->op.cls.class_len, cname);
7843
bp.copy(iter->op.cls.method_len, mname);
7845
ClassHandler::ClassData *cls;
7846
int r = class_handler->open_class(cname, &cls);
7848
derr << "class " << cname << " open got " << cpp_strerror(r) << dendl;
7855
int flags = cls->get_method_flags(mname.c_str());
7857
if (flags == -ENOENT)
7863
is_read = flags & CLS_METHOD_RD;
7864
is_write = flags & CLS_METHOD_WR;
7866
dout(10) << "class " << cname << " method " << mname
7867
<< " flags=" << (is_read ? "r" : "") << (is_write ? "w" : "") << dendl;
7869
op->set_class_read();
7871
op->set_class_write();
7879
if (op->rmw_flags == 0)
7885
bool OSD::RecoveryWQ::_enqueue(PG *pg) {
7886
if (!pg->recovery_item.is_on_list()) {
7887
pg->get("RecoveryWQ");
7888
osd->recovery_queue.push_back(&pg->recovery_item);
7890
if (osd->cct->_conf->osd_recovery_delay_start > 0) {
7891
osd->defer_recovery_until = ceph_clock_now(osd->cct);
7892
osd->defer_recovery_until += osd->cct->_conf->osd_recovery_delay_start;
7899
void OSD::PeeringWQ::_dequeue(list<PG*> *out) {
7901
for (list<PG*>::iterator i = peering_queue.begin();
7902
i != peering_queue.end() &&
7903
out->size() < osd->cct->_conf->osd_peering_wq_batch_size;
7905
if (in_use.count(*i)) {
7910
peering_queue.erase(i++);
7913
in_use.insert(got.begin(), got.end());