31
30
return str::stream() << min << " -->> " << max << " on " << tag;
35
33
DistributionStatus::DistributionStatus( const ShardInfoMap& shardInfo,
36
34
const ShardToChunksMap& shardToChunksMap )
37
35
: _shardInfo( shardInfo ), _shardChunks( shardToChunksMap ) {
39
37
for ( ShardInfoMap::const_iterator i = _shardInfo.begin(); i != _shardInfo.end(); ++i ) {
40
38
_shards.insert( i->first );
45
42
const ShardInfo& DistributionStatus::shardInfo( const string& shard ) const {
46
43
ShardInfoMap::const_iterator i = _shardInfo.find( shard );
47
44
verify( i != _shardInfo.end() );
78
75
string DistributionStatus::getBestReceieverShard( const string& tag ) const {
80
77
unsigned minChunks = numeric_limits<unsigned>::max();
82
79
for ( ShardInfoMap::const_iterator i = _shardInfo.begin(); i != _shardInfo.end(); ++i ) {
84
80
if ( i->second.isSizeMaxed() || i->second.isDraining() || i->second.hasOpsQueued() ) {
85
81
LOG(1) << i->first << " is unavailable" << endl;
89
85
if ( ! i->second.hasTag( tag ) ) {
90
86
LOG(1) << i->first << " doesn't have right tag" << endl;
109
105
unsigned maxChunks = 0;
111
107
for ( ShardInfoMap::const_iterator i = _shardInfo.begin(); i != _shardInfo.end(); ++i ) {
113
109
if ( i->second.hasOpsQueued() ) {
114
110
// we can't move stuff off anyway
118
114
unsigned myChunks = numberOfChunksInShardWithTag( i->first, tag );
119
115
if ( myChunks <= maxChunks )
122
118
worst = i->first;
123
119
maxChunks = myChunks;
130
const vector<BSONObj>& DistributionStatus::getChunks( const string& shard ) const {
125
const vector<BSONObj>& DistributionStatus::getChunks( const string& shard ) const {
131
126
ShardToChunksMap::const_iterator i = _shardChunks.find(shard);
132
127
verify( i != _shardChunks.end() );
133
128
return i->second;
172
166
if ( _tagRanges.size() == 0 )
175
BSONObj min = chunk["min"].Obj();
169
BSONObj min = chunk[ChunkType::min()].Obj();
177
171
map<BSONObj,TagRange>::const_iterator i = _tagRanges.upper_bound( min );
178
172
if ( i == _tagRanges.end() )
181
175
const TagRange& range = i->second;
182
176
if ( min < range.min )
185
179
return range.tag;
196
190
for ( unsigned x = 0; x < v.size(); x++ )
197
191
log() << " " << v[x] << endl;
200
194
if ( _tagRanges.size() > 0 ) {
201
195
log() << " tag ranges" << endl;
203
for ( map<BSONObj,TagRange>::const_iterator i = _tagRanges.begin();
204
i != _tagRanges.end();
197
for ( map<BSONObj,TagRange>::const_iterator i = _tagRanges.begin();
198
i != _tagRanges.end();
206
200
log() << i->second.toString() << endl;
204
bool BalancerPolicy::_isJumbo( const BSONObj& chunk ) {
205
if ( chunk[ChunkType::jumbo()].trueValue() ) {
206
LOG(1) << "chunk: " << chunk << "is marked as jumbo" << endl;
210
211
MigrateInfo* BalancerPolicy::balance( const string& ns,
211
const DistributionStatus& distribution,
212
const DistributionStatus& distribution,
212
213
int balancedLastTime ) {
225
226
for ( set<string>::const_iterator z = shards.begin(); z != shards.end(); ++z ) {
226
227
string shard = *z;
227
228
const ShardInfo& info = distribution.shardInfo( shard );
229
230
if ( ! info.isDraining() )
232
233
if ( distribution.numberOfChunksInShard( shard ) == 0 )
235
236
// now we know we need to move chunks off this shard
236
237
// we will if we are allowed
238
239
if ( info.hasOpsQueued() ) {
239
240
warning() << "want to shed load from " << shard << " but can't because it has ops queued" << endl;
243
244
const vector<BSONObj>& chunks = distribution.getChunks( shard );
245
unsigned numJumboChunks = 0;
245
247
// since we have to move all chunks, let's just do it in order
246
248
for ( unsigned i=0; i<chunks.size(); i++ ) {
247
249
BSONObj chunkToMove = chunks[i];
250
if ( _isJumbo( chunkToMove ) ) {
248
255
string tag = distribution.getTagForChunk( chunkToMove );
249
256
string to = distribution.getBestReceieverShard( tag );
251
258
if ( to.size() == 0 ) {
252
259
warning() << "want to move chunk: " << chunkToMove << "(" << tag << ") "
253
260
<< "from " << shard << " but can't find anywhere to put it" << endl;
257
264
log() << "going to move " << chunkToMove << " from " << shard << "(" << tag << ")" << " to " << to << endl;
259
266
return new MigrateInfo( ns, to, shard, chunkToMove.getOwned() );
262
warning() << "can't find any chunk to move from: " << shard << " but we want to" << endl;
269
warning() << "can't find any chunk to move from: " << shard
270
<< " but we want to. "
271
<< " numJumboChunks: " << numJumboChunks
267
276
// 2) tag violations
268
277
if ( distribution.tags().size() > 0 ) {
269
278
const set<string>& shards = distribution.shards();
271
280
for ( set<string>::const_iterator i = shards.begin(); i != shards.end(); ++i ) {
272
281
string shard = *i;
273
282
const ShardInfo& info = distribution.shardInfo( shard );
275
284
const vector<BSONObj>& chunks = distribution.getChunks( shard );
276
285
for ( unsigned j = 0; j < chunks.size(); j++ ) {
277
286
string tag = distribution.getTagForChunk( chunks[j] );
279
288
if ( info.hasTag( tag ) )
282
291
// uh oh, this chunk is in the wrong place
283
log() << "chunk " << chunks[j] << " is not on a shard with the right tag: " << tag << endl;
292
log() << "chunk " << chunks[j]
293
<< " is not on a shard with the right tag: "
296
if ( _isJumbo( chunks[j] ) ) {
297
warning() << "chunk " << chunks[j] << " is jumbo, so cannot be moved" << endl;
285
301
string to = distribution.getBestReceieverShard( tag );
286
302
if ( to.size() == 0 ) {
287
303
log() << "no where to put it :(" << endl;
330
346
log() << "no available shards to take chunks for tag [" << tag << "]" << endl;
335
350
unsigned min = distribution.numberOfChunksInShardWithTag( to, tag );
337
352
const int imbalance = max - min;
339
354
LOG(1) << "collection : " << ns << endl;
340
355
LOG(1) << "donor : " << from << " chunks on " << max << endl;
341
356
LOG(1) << "receiver : " << to << " chunks on " << min << endl;
347
362
const vector<BSONObj>& chunks = distribution.getChunks( from );
363
unsigned numJumboChunks = 0;
348
364
for ( unsigned j = 0; j < chunks.size(); j++ ) {
349
365
if ( distribution.getTagForChunk( chunks[j] ) != tag )
351
log() << " ns: " << ns << " going to move " << chunks[j]
368
if ( _isJumbo( chunks[j] ) ) {
373
log() << " ns: " << ns << " going to move " << chunks[j]
352
374
<< " from: " << from << " to: " << to << " tag [" << tag << "]"
354
376
return new MigrateInfo( ns, to, from, chunks[j] );
379
if ( numJumboChunks ) {
380
error() << "shard: " << from << "ns: " << ns
381
<< "has too many chunks, but they are all jumbo "
382
<< " numJumboChunks: " << numJumboChunks
356
387
verify( false ); // should be impossible
359
// Everything is balanced here!
390
// Everything is balanced here!
364
ShardInfo::ShardInfo( long long maxSize, long long currSize,
365
bool draining, bool opsQueued,
366
const set<string>& tags )
367
: _maxSize( maxSize ),
395
ShardInfo::ShardInfo( long long maxSize, long long currSize,
396
bool draining, bool opsQueued,
397
const set<string>& tags,
398
const string& mongoVersion )
399
: _maxSize( maxSize ),
368
400
_currSize( currSize ),
369
401
_draining( draining ),
370
402
_hasOpsQueued( opsQueued ),
404
_mongoVersion( mongoVersion ) {
374
407
ShardInfo::ShardInfo()
377
410
_draining( false ),
378
411
_hasOpsQueued( false ) {
386
419
bool ShardInfo::isSizeMaxed() const {
387
420
if ( _maxSize == 0 || _currSize == 0 )
390
423
return _currSize >= _maxSize;
393
bool ShardInfo::hasTag( const string& tag ) const {
426
bool ShardInfo::hasTag( const string& tag ) const {
394
427
if ( tag.size() == 0 )
396
return _tags.count( tag ) > 0;
429
return _tags.count( tag ) > 0;
399
432
string ShardInfo::toString() const {