15
15
* You should have received a copy of the GNU Affero General Public License
16
16
* along with this program. If not, see <http://www.gnu.org/licenses/>.
18
* As a special exception, the copyright holders give permission to link the
19
* code of portions of this program with the OpenSSL library under certain
20
* conditions as described in each individual source file and distribute
21
* linked combinations including the program with the OpenSSL library. You
22
* must comply with the GNU Affero General Public License in all respects
23
* for all of the code used other than as permitted herein. If you modify
24
* file(s) with this exception, you may extend this exception to your
25
* version of the file(s), but you are not obligated to do so. If you do not
26
* wish to do so, delete this exception statement from your version. If you
27
* delete this exception statement from all source files in the program,
28
* then also delete it in the license file.
31
#include "mongo/pch.h"
21
33
#include "pcrecpp.h"
23
35
#include "mongo/client/connpool.h"
24
36
#include "mongo/client/dbclientcursor.h"
25
#include "mongo/client/model.h"
26
#include "mongo/db/cmdline.h"
37
#include "mongo/db/lasterror.h"
27
38
#include "mongo/db/pdfile.h"
39
#include "mongo/db/write_concern.h"
28
40
#include "mongo/s/chunk.h"
29
41
#include "mongo/s/chunk_version.h"
42
#include "mongo/s/cluster_write.h"
30
43
#include "mongo/s/config.h"
31
44
#include "mongo/s/grid.h"
32
45
#include "mongo/s/server.h"
95
void DBConfig::CollectionInfo::save( const string& ns , DBClientBase* conn ) {
108
void DBConfig::CollectionInfo::save( const string& ns ) {
96
109
BSONObj key = BSON( "_id" << ns );
98
111
BSONObjBuilder val;
99
112
val.append(CollectionType::ns(), ns);
100
val.appendDate(CollectionType::DEPRECATED_lastmod(), time(0));
113
val.appendDate(CollectionType::DEPRECATED_lastmod(), jsTime());
101
114
val.appendBool(CollectionType::dropped(), _dropped);
116
// This also appends the lastmodEpoch.
103
117
_cm->getInfo( val );
105
conn->update(CollectionType::ConfigNS, key, val.obj(), true);
106
string err = conn->getLastError();
107
uassert( 13473 , (string)"failed to save collection (" + ns + "): " + err , err.size() == 0 );
120
// lastmodEpoch is a required field so we also need to do it here.
121
val.append(CollectionType::DEPRECATED_lastmodEpoch(), ChunkVersion::DROPPED().epoch());
124
Status result = clusterUpdate(CollectionType::ConfigNS,
129
WriteConcernOptions::AllConfigs,
132
if ( !result.isOK() ) {
133
uasserted( 13473, str::stream() << "failed to save collection (" << ns
134
<< "): " << result.reason() );
174
202
log() << "enable sharding on: " << ns << " with shard key: " << fieldsAndOrder << endl;
204
// Record start in changelog
205
BSONObjBuilder collectionDetail;
206
collectionDetail.append("shardKey", fieldsAndOrder.key());
207
collectionDetail.append("collection", ns);
208
collectionDetail.append("primary", getPrimary().toString());
210
if (initShards == NULL)
214
for (unsigned i = 0; i < initShards->size(); i++) {
215
b.append((*initShards)[i].getName());
219
collectionDetail.append("initShards", a);
220
collectionDetail.append("numChunks", (int)(initPoints->size() + 1));
221
configServer.logChange("shardCollection.start", ns, collectionDetail.obj());
176
223
ChunkManager* cm = new ChunkManager( ns, fieldsAndOrder, unique );
177
224
cm->createFirstChunks( configServer.getPrimary().getConnString(),
178
225
getPrimary(), initPoints, initShards );
326
382
if ( oldVersion.isSet() && ! forceReload ) {
327
scoped_ptr<ScopedDbConnection> conn( ScopedDbConnection::getInternalScopedDbConnection(
328
configServer.modelServer(), 30.0 ) );
329
newest = conn->get()->findOne(ChunkType::ConfigNS,
330
Query(BSON(ChunkType::ns(ns))).sort(ChunkType::DEPRECATED_lastmod(), -1));
383
ScopedDbConnection conn(configServer.modelServer(), 30.0);
384
newest = conn->findOne(ChunkType::ConfigNS,
385
Query(BSON(ChunkType::ns(ns))).sort(
386
ChunkType::DEPRECATED_lastmod(), -1));
333
389
if ( ! newest.isEmpty() ) {
334
390
ChunkVersion v = ChunkVersion::fromBSON(newest, ChunkType::DEPRECATED_lastmod());
587
646
grid.removeDB( _name );
589
scoped_ptr<ScopedDbConnection> conn( ScopedDbConnection::getInternalScopedDbConnection(
590
configServer.modelServer(), 30.0 ) );
591
conn->get()->remove( DatabaseType::ConfigNS , BSON( DatabaseType::name( _name ) ) );
592
errmsg = conn->get()->getLastError();
593
if ( ! errmsg.empty() ) {
594
log() << "could not drop '" << _name << "': " << errmsg << endl;
647
Status result = clusterDelete( DatabaseType::ConfigNS,
648
BSON( DatabaseType::name( _name )),
650
WriteConcernOptions::AllConfigs,
653
if ( !result.isOK() ) {
654
errmsg = result.reason();
655
log() << "could not drop '" << _name << "': " << errmsg << endl;
602
659
if ( ! configServer.allUp( errmsg ) ) {
622
scoped_ptr<ScopedDbConnection> conn(
623
ScopedDbConnection::getScopedDbConnection( _primary.getConnString(), 30.0 ) );
679
ScopedDbConnection conn(_primary.getConnString(), 30.0);
625
if ( ! conn->get()->dropDatabase( _name , &res ) ) {
681
if ( ! conn->dropDatabase( _name , &res ) ) {
626
682
errmsg = res.toString();
633
689
for ( set<Shard>::iterator i=allServers.begin(); i!=allServers.end(); i++ ) {
634
scoped_ptr<ScopedDbConnection> conn(
635
ScopedDbConnection::getScopedDbConnection( i->getConnString(), 30.0 ) );
690
ScopedDbConnection conn(i->getConnString(), 30.0);
637
if ( ! conn->get()->dropDatabase( _name , &res ) ) {
692
if ( ! conn->dropDatabase( _name , &res ) ) {
638
693
errmsg = res.toString();
644
699
LOG(1) << "\t dropped primary db for: " << _name << endl;
839
932
if ( res[i].isEmpty() )
842
string c1 = base.getFieldDotted( "collections.chunks" );
843
string c2 = res[i].getFieldDotted( "collections.chunks" );
845
string d1 = base.getFieldDotted( "collections.databases" );
846
string d2 = res[i].getFieldDotted( "collections.databases" );
848
if ( c1 == c2 && d1 == d2 )
935
string chunksHash1 = base.getFieldDotted( "collections.chunks" );
936
string chunksHash2 = res[i].getFieldDotted( "collections.chunks" );
938
string databaseHash1 = base.getFieldDotted( "collections.databases" );
939
string databaseHash2 = res[i].getFieldDotted( "collections.databases" );
941
string collectionsHash1 = base.getFieldDotted( "collections.collections" );
942
string collectionsHash2 = res[i].getFieldDotted( "collections.collections" );
944
string shardHash1 = base.getFieldDotted( "collections.shards" );
945
string shardHash2 = res[i].getFieldDotted( "collections.shards" );
947
string versionHash1 = base.getFieldDotted( "collections.version" );
948
string versionHash2 = res[i].getFieldDotted( "collections.version" );
950
if ( chunksHash1 == chunksHash2 &&
951
databaseHash1 == databaseHash2 &&
952
collectionsHash1 == collectionsHash2 &&
953
shardHash1 == shardHash2 &&
954
versionHash1 == versionHash2 ) {
852
959
ss << "config servers " << _config[firstGood] << " and " << _config[i] << " differ";
853
LOG( LL_WARNING ) << ss.str() << endl;
960
warning() << ss.str() << endl;
854
961
if ( tries <= 1 ) {
855
ss << "\n" << c1 << "\t" << c2 << "\n" << d1 << "\t" << d2;
962
ss << ": " << base["collections"].Obj() << " vs " << res[i]["collections"].Obj();
856
963
errmsg = ss.str();
964
1065
log() << "warning: unknown setting [" << name << "]" << endl;
968
if ( ! got.count( "chunksize" ) ) {
969
conn->get()->insert(SettingsType::ConfigNS,
970
BSON(SettingsType::key("chunksize") <<
971
SettingsType::chunksize(Chunk::MaxChunkSize /
976
conn->get()->ensureIndex(ChunkType::ConfigNS,
977
BSON(ChunkType::ns() << 1 << ChunkType::min() << 1 ), true);
979
conn->get()->ensureIndex(ChunkType::ConfigNS,
980
BSON(ChunkType::ns() << 1 <<
981
ChunkType::shard() << 1 <<
982
ChunkType::min() << 1 ), true);
984
conn->get()->ensureIndex(ChunkType::ConfigNS,
985
BSON(ChunkType::ns() << 1 <<
986
ChunkType::DEPRECATED_lastmod() << 1 ), true );
988
conn->get()->ensureIndex(ShardType::ConfigNS, BSON(ShardType::host() << 1), true);
990
conn->get()->ensureIndex(LocksType::ConfigNS,
991
BSON( LocksType::lockID() << 1 ), true);
993
conn->get()->ensureIndex(LockpingsType::ConfigNS,
994
BSON( LockpingsType::ping() << 1 ), false);
996
conn->get()->ensureIndex(LocksType::ConfigNS,
997
BSON( LocksType::state() << 1 <<
998
LocksType::process() << 1 ), false);
1002
1069
catch ( DBException& e ) {
1003
warning() << "couldn't load settings or create indexes on config db: " << e.what() << endl;
1070
warning() << "couldn't load settings on config db: "
1071
<< e.what() << endl;
1074
if ( ! got.count( "chunksize" ) ) {
1075
const int chunkSize = Chunk::MaxChunkSize / (1024 * 1024);
1076
Status result = clusterInsert( SettingsType::ConfigNS,
1077
BSON( SettingsType::key("chunksize") <<
1078
SettingsType::chunksize(chunkSize)),
1079
WriteConcernOptions::AllConfigs,
1081
if ( !result.isOK() ) {
1082
warning() << "couldn't set chunkSize on config db: " << result.reason() << endl;
1087
Status result = clusterCreateIndex( ChunkType::ConfigNS,
1088
BSON( ChunkType::ns() << 1 << ChunkType::min() << 1 ),
1090
WriteConcernOptions::AllConfigs,
1093
if ( !result.isOK() ) {
1094
warning() << "couldn't create ns_1_min_1 index on config db: "
1095
<< result.reason() << endl;
1098
result = clusterCreateIndex( ChunkType::ConfigNS,
1099
BSON( ChunkType::ns() << 1 <<
1100
ChunkType::shard() << 1 <<
1101
ChunkType::min() << 1 ),
1103
WriteConcernOptions::AllConfigs,
1106
if ( !result.isOK() ) {
1107
warning() << "couldn't create ns_1_shard_1_min_1 index on config db: "
1108
<< result.reason() << endl;
1111
result = clusterCreateIndex( ChunkType::ConfigNS,
1112
BSON( ChunkType::ns() << 1 <<
1113
ChunkType::DEPRECATED_lastmod() << 1 ),
1115
WriteConcernOptions::AllConfigs,
1118
if ( !result.isOK() ) {
1119
warning() << "couldn't create ns_1_lastmod_1 index on config db: "
1120
<< result.reason() << endl;
1123
result = clusterCreateIndex( ShardType::ConfigNS,
1124
BSON( ShardType::host() << 1 ),
1126
WriteConcernOptions::AllConfigs,
1129
if ( !result.isOK() ) {
1130
warning() << "couldn't create host_1 index on config db: "
1131
<< result.reason() << endl;
1134
result = clusterCreateIndex( LocksType::ConfigNS,
1135
BSON( LocksType::lockID() << 1 ),
1137
WriteConcernOptions::AllConfigs,
1140
if ( !result.isOK() ) {
1141
warning() << "couldn't create lock id index on config db: "
1142
<< result.reason() << endl;
1145
result = clusterCreateIndex( LocksType::ConfigNS,
1146
BSON( LocksType::state() << 1 <<
1147
LocksType::process() << 1 ),
1149
WriteConcernOptions::AllConfigs,
1152
if ( !result.isOK() ) {
1153
warning() << "couldn't create state and process id index on config db: "
1154
<< result.reason() << endl;
1157
result = clusterCreateIndex( LockpingsType::ConfigNS,
1158
BSON( LockpingsType::ping() << 1 ),
1160
WriteConcernOptions::AllConfigs,
1163
if ( !result.isOK() ) {
1164
warning() << "couldn't create lockping ping time index on config db: "
1165
<< result.reason() << endl;
1074
void ConfigServer::replicaSetChange( const ReplicaSetMonitor * monitor ) {
1241
void ConfigServer::replicaSetChange(const string& setName, const string& newConnectionString) {
1242
// This is run in it's own thread. Exceptions escaping would result in a call to terminate.
1243
Client::initThread("replSetChange");
1076
Shard s = Shard::lookupRSName(monitor->getName());
1245
Shard s = Shard::lookupRSName(setName);
1077
1246
if (s == Shard::EMPTY) {
1078
LOG(1) << "replicaSetChange: shard not found for set: " << monitor->getServerAddress() << endl;
1247
LOG(1) << "shard not found for set: " << newConnectionString;
1081
scoped_ptr<ScopedDbConnection> conn( ScopedDbConnection::getInternalScopedDbConnection(
1082
configServer.getConnectionString().toString(), 30.0 ) );
1083
conn->get()->update(ShardType::ConfigNS,
1084
BSON(ShardType::name(s.getName())),
1086
BSON(ShardType::host(monitor->getServerAddress()))));
1089
catch (DBException& e) {
1090
error() << "RSChangeWatcher: could not update config db for set: " << monitor->getName()
1091
<< " to: " << monitor->getServerAddress() << causedBy(e) << endl;
1251
Status result = clusterUpdate(ShardType::ConfigNS,
1252
BSON(ShardType::name(s.getName())),
1253
BSON("$set" << BSON(ShardType::host(newConnectionString))),
1256
WriteConcernOptions::AllConfigs,
1259
if ( !result.isOK() ) {
1260
error() << "RSChangeWatcher: could not update config db for set: "
1262
<< " to: " << newConnectionString
1263
<< ": " << result.reason() << endl;
1266
catch (const std::exception& e) {
1267
log() << "caught exception while updating config servers: " << e.what();
1270
log() << "caught unknown exception while updating config servers";