1
// cloner.cpp - copy a database (export/import basically)
4
* Copyright (C) 2008 10gen Inc.
6
* This program is free software: you can redistribute it and/or modify
7
* it under the terms of the GNU Affero General Public License, version 3,
8
* as published by the Free Software Foundation.
10
* This program is distributed in the hope that it will be useful,
11
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
* GNU Affero General Public License for more details.
15
* You should have received a copy of the GNU Affero General Public License
16
* along with this program. If not, see <http://www.gnu.org/licenses/>.
21
#include "../client/dbclient.h"
22
#include "../util/builder.h"
32
// Forward declaration only; the definition is not part of this file section.
void ensureHaveIdIndex(const char *ns);
34
// Forward declaration; called below on each freshly connected DBClientConnection
// before that connection is used for cloning.
bool replAuthenticate(DBClientConnection *);
36
// Cloner: non-copyable helper that copies data from a source server into this one.
// It declares a database-level entry point, go(), and a two-phase collection-level
// pair, startCloneCollection() / finishCloneCollection(); copy() and replayOpLog()
// are the helpers those entry points call.
// NOTE(review): this listing skips some original lines (access specifiers, comment
// terminators, the closing brace) -- confirm against the complete file.
class Cloner: boost::noncopyable {
37
// Client used for every query and command sent to the source of the data
// (see the conn->query / conn->runCommand calls in the member definitions).
auto_ptr< DBClientWithCommands > conn;
38
// Copies the documents of from_ns that match q into to_ns.
void copy(const char *from_ns, const char *to_ns, bool isindex, bool logForRepl,
39
bool masterSameProcess, bool slaveOk, Query q = Query());
40
// Applies operations read from cursor c, filtering inserts by query.
void replayOpLog( DBClientCursor *c, const BSONObj &query );
44
/* slaveOk - if true it is ok if the source of the data is !ismaster.
45
useReplAuth - use the credentials we normally use as a replication slave for the cloning
46
snapshot - use $snapshot mode for copying collections. note this should not be used when it isn't required, as it will be slower.
47
for example repairDatabase need not use it.
49
bool go(const char *masterHost, string& errmsg, const string& fromdb, bool logForRepl, bool slaveOk, bool useReplAuth, bool snapshot);
50
bool startCloneCollection( const char *fromhost, const char *ns, const BSONObj &query, string& errmsg, bool logForRepl, bool copyIndexes, int logSizeMb, long long &cursorId );
51
bool finishCloneCollection( const char *fromhost, const char *ns, const BSONObj &query, long long cursorId, string &errmsg );
54
// fixindex: walks the elements of index-spec object o and, for the "ns" element,
// appends a replacement whose database prefix (everything before the first '.')
// is the current client database name, cc().database()->name.
// Raises uassert 10024 when "ns" is not a String and uassert 10025 when its value
// contains no '.'.
// The object before and after the rewrite is written to out().
// NOTE(review): the builder/iterator declarations, the handling of non-"ns"
// elements, any guard around the out() lines and the return statement are not
// visible in this listing.
/* for index info object:
55
{ "name" : "name_1" , "ns" : "foo.index3" , "key" : { "name" : 1.0 } }
56
we need to fix up the value in the "ns" parameter so that the name prefix is correct on a
59
BSONObj fixindex(BSONObj o) {
62
while ( i.moreWithEOO() ) {
63
BSONElement e = i.next();
66
if ( string("ns") == e.fieldName() ) {
67
uassert( 10024 , "bad ns field for index during dbcopy", e.type() == String);
68
const char *p = strchr(e.valuestr(), '.');
69
uassert( 10025 , "bad ns field for index during dbcopy [2]", p);
70
string newname = cc().database()->name + p;
71
b.append("ns", newname);
79
out() << "before: " << o.toString() << endl;
81
out() << "after: " << res.toString() << endl;
88
// Cloner::copy: queries from_collection on the source (through conn) and inserts
// the returned documents into to_collection on this server.
//  - The source query always sets QueryOption_NoCursorTimeout and adds
//    QueryOption_SlaveOk when slaveOk is true.
//  - A document for which BSONObj::valid() is false is reported on out() with a
//    "skipping corrupt object" message that includes its first element when available.
//  - Some documents are parked in storedForLater and inserted in a second pass after
//    the main loop; the assert shows that path is only expected for system.indexes.
//  - Inserts go through theDataFileMgr.insert and are followed by logOp("i", ...);
//    presumably logOp is guarded by logForRepl -- the guard is not visible here.
//  - A UserException raised while inserting is logged as a warning.
// NOTE(review): the loop headers, the try keywords and the declarations of js, n and
// ss are missing from this listing.
/* copy the specified collection
89
isindex - if true, this is system.indexes collection, in which we do some transformation when copying.
91
void Cloner::copy(const char *from_collection, const char *to_collection, bool isindex, bool logForRepl, bool masterSameProcess, bool slaveOk, Query query) {
92
auto_ptr<DBClientCursor> c;
95
c = conn->query( from_collection, query, 0, 0, 0, QueryOption_NoCursorTimeout | ( slaveOk ? QueryOption_SlaveOk : 0 ) );
98
list<BSONObj> storedForLater;
102
time_t saveLast = time( 0 );
109
BSONObj tmp = c->next();
111
/* assure object is valid. note this will slow us down a little. */
112
if ( !tmp.valid() ) {
114
ss << "skipping corrupt object from " << from_collection;
115
BSONElement e = tmp.firstElement();
118
ss << " firstElement: " << e;
121
ss << " firstElement corrupt";
123
out() << ss.str() << endl;
131
// Deferring a document is only expected while copying system.indexes.
assert( strstr(from_collection, "system.indexes") );
133
storedForLater.push_back( js.getOwned() );
138
theDataFileMgr.insert(to_collection, js);
140
logOp("i", to_collection, js);
142
catch( UserException& e ) {
143
log() << "warning: exception cloning object in " << from_collection << ' ' << e.what() << " obj:" << js.toString() << '\n';
146
// Progress report: emitted only when more than 60 seconds have passed since the
// previous report (and additionally throttled by RARELY).
RARELY if ( time( 0 ) - saveLast > 60 ) {
147
log() << n << " objects cloned so far from collection " << from_collection << endl;
148
saveLast = time( 0 );
152
// Second pass: insert the documents that were parked in storedForLater.
if ( storedForLater.size() ){
153
for ( list<BSONObj>::iterator i = storedForLater.begin(); i!=storedForLater.end(); i++ ){
156
theDataFileMgr.insert(to_collection, js);
158
logOp("i", to_collection, js);
160
catch( UserException& e ) {
161
log() << "warning: exception cloning object in " << from_collection << ' ' << e.what() << " obj:" << js.toString() << '\n';
167
// Cloner::go: clones database fromdb from masterHost into the current client
// database (cc().database()->name).
//   masterHost  - source address; "localhost:<cmdLine.port>" and
//                 "127.0.0.1:<cmdLine.port>" are recognised as this same process.
//   errmsg      - receives a description when something fails.
//   logForRepl  - forwarded to userCreateNS() and copy().
//   slaveOk     - adds QueryOption_SlaveOk to the system.namespaces query and is
//                 forwarded to copy().
//   useReplAuth - must not be set together with logForRepl (massert 10289).
//   snapshot    - NOTE(review): its use is not visible in this listing.
// Visible steps: reject cloning a database onto itself in the same process and
// dbpath; read <fromdb>.system.namespaces and collect the collections to clone
// (entries containing '$', and ".system." entries for which legalClientSystemNS
// returns 0, are left out); create each collection locally and copy its documents;
// finally copy <fromdb>.system.indexes except entries named "_id_".
// NOTE(review): return statements and several braces/guards are missing here.
bool Cloner::go(const char *masterHost, string& errmsg, const string& fromdb, bool logForRepl, bool slaveOk, bool useReplAuth, bool snapshot) {
169
massert( 10289 , "useReplAuth is not written to replication log", !useReplAuth || !logForRepl );
171
string todb = cc().database()->name;
173
// Build both spellings of this process's own address to detect a same-process source.
a << "localhost:" << cmdLine.port;
174
b << "127.0.0.1:" << cmdLine.port;
175
bool masterSameProcess = ( a.str() == masterHost || b.str() == masterHost );
176
if ( masterSameProcess ) {
177
if ( fromdb == todb && cc().database()->path == dbpath ) {
178
// guard against an "infinite" loop
179
/* if you are replicating, the local.sources config may be wrong if you get this */
180
errmsg = "can't clone from self (localhost).";
184
/* todo: we can put these releases inside dbclient or a dbclient specialization.
185
or just wait until we get rid of global lock anyway.
187
string ns = fromdb + ".system.namespaces";
188
list<BSONObj> toClone;
192
auto_ptr<DBClientCursor> c;
194
if ( !masterSameProcess ) {
195
auto_ptr< DBClientConnection > c( new DBClientConnection() );
196
if ( !c->connect( masterHost, errmsg ) )
198
if( !replAuthenticate(c.get()) )
203
conn.reset( new DBDirectClient() );
205
c = conn->query( ns.c_str(), BSONObj(), 0, 0, 0, slaveOk ? QueryOption_SlaveOk : 0 );
208
if ( c.get() == 0 ) {
209
errmsg = "query failed " + ns;
214
BSONObj collection = c->next();
216
log(2) << "\t cloner got " << collection << endl;
218
BSONElement e = collection.findElement("name");
220
string s = "bad system.namespaces object " + collection.toString();
221
massert( 10290 , s.c_str(), false);
224
assert( e.type() == String );
225
const char *from_name = e.valuestr();
227
if( strstr(from_name, ".system.") ) {
228
/* system.users is cloned -- but nothing else from system. */
229
if( legalClientSystemNS( from_name , true ) == 0 ){
230
log(2) << "\t\t not cloning because system collection" << endl;
234
else if( strchr(from_name, '$') ) {
235
// don't clone index namespaces -- we take care of those separately below.
236
log(2) << "\t\t not cloning because has $ " << endl;
240
// getOwned(): keep a copy of the entry in the list (c->next() above produces a new object each time).
toClone.push_back( collection.getOwned() );
244
// Second pass: create and copy every collection gathered above.
for ( list<BSONObj>::iterator i=toClone.begin(); i != toClone.end(); i++ ){
248
BSONObj collection = *i;
249
log(2) << "  really will clone: " << collection << endl;
250
const char * from_name = collection["name"].valuestr();
251
BSONObj options = collection.getObjectField("options");
253
/* change name "<fromdb>.collection" -> <todb>.collection */
254
const char *p = strchr(from_name, '.');
256
string to_name = todb + p;
260
const char *toname = to_name.c_str();
261
userCreateNS(toname, options, err, logForRepl);
263
log(1) << "\t\t cloning " << from_name << " -> " << to_name << endl;
267
copy(from_name, to_name.c_str(), false, logForRepl, masterSameProcess, slaveOk, q);
270
// now build the indexes
271
string system_indexes_from = fromdb + ".system.indexes";
272
string system_indexes_to = todb + ".system.indexes";
273
/* [dm]: is the ID index sometimes not called "_id_"? There is other code in the system that looks for a "_id" prefix
274
rather than this exact value. we should standardize. OR, remove names - which is in the bugdb. Anyway, this
275
is dubious here at the moment.
277
copy(system_indexes_from.c_str(), system_indexes_to.c_str(), true, logForRepl, masterSameProcess, slaveOk, BSON( "name" << NE << "_id_" ) );
282
// Cloner::startCloneCollection: first phase of cloning collection ns, restricted to
// the documents matching query, from fromhost.
// Visible steps:
//   1. If documents matching query already exist locally (runCount > 0) a warning is
//      logged and they are deleted.
//   2. Connect to fromhost and authenticate with replAuthenticate().
//   3. Send the "logCollection" command with start:1 to the source; logSizeMb is
//      included unless it equals INT_MIN. A failure is reported through errmsg.
//   4. Create ns locally with the options stored in the source's system.namespaces,
//      copy the matching documents, then copy the index specs for ns other than "_id_".
//   5. Open a tailable cursor on local.temp.oplog.<ns>, replay its contents and
//      return the cursor id through cursorId.
// NOTE(review): return statements, the declarations of db/err/info and several
// guards are missing from this listing; presumably copyIndexes gates the index copy
// and the two masserts (10291 / 10292) sit on opposite branches -- confirm.
bool Cloner::startCloneCollection( const char *fromhost, const char *ns, const BSONObj &query, string &errmsg, bool logForRepl, bool copyIndexes, int logSizeMb, long long &cursorId ) {
284
nsToDatabase( ns, db );
286
NamespaceDetails *nsd = nsdetails( ns );
288
/** note: it's ok to clone into a collection, but only if the range you're copying
289
doesn't exist on this server */
291
if ( runCount( ns , BSON( "query" << query ) , err ) > 0 ){
292
log() << "WARNING: data already exists for: " << ns << " in range : " << query << " deleting..." << endl;
293
deleteObjects( ns , query , false , logForRepl , false );
299
auto_ptr< DBClientConnection > c( new DBClientConnection() );
300
if ( !c->connect( fromhost, errmsg ) )
302
if( !replAuthenticate(c.get()) )
306
// Start temporary op log
307
BSONObjBuilder cmdSpec;
308
cmdSpec << "logCollection" << ns << "start" << 1;
309
// INT_MIN is treated as "no log size supplied".
if ( logSizeMb != INT_MIN )
310
cmdSpec << "logSizeMb" << logSizeMb;
312
if ( !conn->runCommand( db, cmdSpec.done(), info ) ) {
313
errmsg = "logCollection failed: " + (string)info;
319
BSONObj spec = conn->findOne( string( db ) + ".system.namespaces", BSON( "name" << ns ) );
320
if ( !userCreateNS( ns, spec.getObjectField( "options" ), errmsg, true ) )
324
copy( ns, ns, false, logForRepl, false, false, query );
327
string indexNs = string( db ) + ".system.indexes";
328
copy( indexNs.c_str(), indexNs.c_str(), true, logForRepl, false, false, BSON( "ns" << ns << "name" << NE << "_id_" ) );
331
auto_ptr< DBClientCursor > c;
334
string logNS = "local.temp.oplog." + string( ns );
335
c = conn->query( logNS.c_str(), Query(), 0, 0, 0, QueryOption_CursorTailable );
338
replayOpLog( c.get(), query );
339
cursorId = c->getCursorId();
340
massert( 10291 , "Expected valid tailing cursor", cursorId != 0 );
342
massert( 10292 , "Did not expect valid cursor for empty query result", c->getCursorId() == 0 );
349
// Cloner::replayOpLog: applies logged operations to this server through
// ReplSource::applyOperation. An insert (the "op" field starts with 'i') is applied
// only when its "o" document matches query; every other kind of operation is
// applied unconditionally.
// NOTE(review): the loop that obtains op from cursor c is not visible in this listing.
void Cloner::replayOpLog( DBClientCursor *c, const BSONObj &query ) {
350
Matcher matcher( query );
359
// For sharding v1.0, we don't allow shard key updates -- so just
360
// filter each insert by value.
361
if ( op.getStringField( "op" )[ 0 ] != 'i' || matcher.matches( op.getObjectField( "o" ) ) )
362
ReplSource::applyOperation( op );
366
// Cloner::finishCloneCollection: second phase of a collection clone.
// Connects to fromhost and authenticates, reads local.temp.oplog.<ns> either by
// continuing cursorId with getMore() or with a fresh query, replays the operations
// through replayOpLog(), then sends "logCollection" with validateComplete:1 to the
// source. A failure of that command is reported through errmsg.
// NOTE(review): the condition choosing between getMore() and query(), the
// declarations of db/info and the return statements are missing from this listing.
bool Cloner::finishCloneCollection( const char *fromhost, const char *ns, const BSONObj &query, long long cursorId, string &errmsg ) {
368
nsToDatabase( ns, db );
370
auto_ptr< DBClientCursor > cur;
373
auto_ptr< DBClientConnection > c( new DBClientConnection() );
374
if ( !c->connect( fromhost, errmsg ) )
376
if( !replAuthenticate(c.get()) )
379
string logNS = "local.temp.oplog." + string( ns );
381
cur = conn->getMore( logNS.c_str(), cursorId );
383
cur = conn->query( logNS.c_str(), Query() );
385
replayOpLog( cur.get(), query );
389
if ( !conn->runCommand( db, BSON( "logCollection" << ns << "validateComplete" << 1 ), info ) ) {
390
errmsg = "logCollection failed: " + (string)info;
397
// cloneFrom: free function that forwards all of its arguments, in order, to go()
// on an object c and returns the result.
// NOTE(review): the declaration of c (presumably a local Cloner) is not visible here.
/* slaveOk - if true it is ok if the source of the data is !ismaster.
398
useReplAuth - use the credentials we normally use as a replication slave for the cloning
399
snapshot - use $snapshot mode for copying collections. note this should not be used when it isn't required, as it will be slower.
400
for example repairDatabase need not use it.
402
bool cloneFrom(const char *masterHost, string& errmsg, const string& fromdb, bool logForReplication,
403
bool slaveOk, bool useReplAuth, bool snapshot)
406
return c.go(masterHost, errmsg, fromdb, logForReplication, slaveOk, useReplAuth, snapshot);
410
mydb.$cmd.findOne( { clone: "fromhost" } );
412
// CmdClone: the "clone" database command.
// run() takes the source host from the "clone" field of the command object and
// calls cloneFrom() for the current client database with
// logForReplication = !fromRepl, slaveOk = false, useReplAuth = false, snapshot = true.
// NOTE(review): the body of slaveOk() and the closing of the class are not visible here.
class CmdClone : public Command {
414
virtual bool slaveOk() {
417
virtual void help( stringstream &help ) const {
418
help << "clone this database from an instance of the db on another host\n";
419
help << "example: { clone : \"host13\" }";
421
CmdClone() : Command("clone") { }
422
virtual bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
423
string from = cmdObj.getStringField("clone");
426
/* replication note: we must logOp() not the command, but the cloned data -- if the slave
427
were to clone it would get a different point-in-time and not match.
429
return cloneFrom(from.c_str(), errmsg, cc().database()->name,
430
/*logForReplication=*/!fromRepl, /*slaveok*/false, /*usereplauth*/false, /*snapshot*/true);
434
// CmdCloneCollection: the "cloneCollection" command, which performs both clone
// phases in one call.
// run() reads:
//   from            - source host; empty gives "missing from spec".
//   cloneCollection - namespace to clone; empty gives "missing cloneCollection spec".
//   query           - filter object; an empty one is special-cased (handling not visible here).
//   copyindexes     - honoured only when it is a boolean; otherwise defaults to true.
//   logSizeMb       - read with getIntField.
// It then switches the client context to the collection with setClient(), logs the
// request, and runs startCloneCollection() followed by finishCloneCollection().
// NOTE(review): return statements and the declarations of c and cursorId are
// missing from this listing.
class CmdCloneCollection : public Command {
436
virtual bool slaveOk() {
439
CmdCloneCollection() : Command("cloneCollection") { }
440
virtual void help( stringstream &help ) const {
441
help << " example: { cloneCollection: <collection ns>, from: <hostname>, query: <query> }";
443
virtual bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
444
string fromhost = cmdObj.getStringField("from");
445
if ( fromhost.empty() ) {
446
errmsg = "missing from spec";
449
string collection = cmdObj.getStringField("cloneCollection");
450
if ( collection.empty() ) {
451
errmsg = "missing cloneCollection spec";
454
BSONObj query = cmdObj.getObjectField("query");
455
if ( query.isEmpty() )
457
BSONElement copyIndexesSpec = cmdObj.getField("copyindexes");
458
bool copyIndexes = copyIndexesSpec.isBoolean() ? copyIndexesSpec.boolean() : true;
459
// Will not be used if doesn't exist.
460
int logSizeMb = cmdObj.getIntField( "logSizeMb" );
462
/* replication note: we must logOp() not the command, but the cloned data -- if the slave
463
were to clone it would get a different point-in-time and not match.
465
setClient( collection.c_str() );
467
log() << "cloneCollection.  db:" << ns << " collection:" << collection << " from: " << fromhost << " query: " << query << " logSizeMb: " << logSizeMb << ( copyIndexes ? "" : ", not copying indexes" ) << endl;
471
if ( !c.startCloneCollection( fromhost.c_str(), collection.c_str(), query, errmsg, !fromRepl, copyIndexes, logSizeMb, cursorId ) )
473
return c.finishCloneCollection( fromhost.c_str(), collection.c_str(), query, cursorId, errmsg);
475
} cmdclonecollection;
477
// CmdStartCloneCollection: the "startCloneCollection" command, i.e. only the first
// phase of a collection clone.
// run() validates "from" and "startCloneCollection", reads query, copyindexes
// (defaults to true unless a boolean is supplied) and logSizeMb, switches the client
// context with setClient(), and calls Cloner::startCloneCollection().
// It then builds a token object holding fromhost, collection, query and cursorId
// (cursorId is stored with appendDate) and adds it to the reply as "finishToken".
// NOTE(review): return statements, the declarations of c/cursorId/b and any guard
// on res before the token is built are missing from this listing.
class CmdStartCloneCollection : public Command {
479
virtual bool slaveOk() {
482
CmdStartCloneCollection() : Command("startCloneCollection") { }
483
virtual void help( stringstream &help ) const {
484
help << " example: { startCloneCollection: <collection ns>, from: <hostname>, query: <query> }";
485
help << ", returned object includes a finishToken field, the value of which may be passed to the finishCloneCollection command";
487
virtual bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
488
string fromhost = cmdObj.getStringField("from");
489
if ( fromhost.empty() ) {
490
errmsg = "missing from spec";
493
string collection = cmdObj.getStringField("startCloneCollection");
494
if ( collection.empty() ) {
495
errmsg = "missing startCloneCollection spec";
498
BSONObj query = cmdObj.getObjectField("query");
499
if ( query.isEmpty() )
501
BSONElement copyIndexesSpec = cmdObj.getField("copyindexes");
502
bool copyIndexes = copyIndexesSpec.isBoolean() ? copyIndexesSpec.boolean() : true;
503
// Will not be used if doesn't exist.
504
int logSizeMb = cmdObj.getIntField( "logSizeMb" );
506
/* replication note: we must logOp() not the command, but the cloned data -- if the slave
507
were to clone it would get a different point-in-time and not match.
509
setClient( collection.c_str() );
511
log() << "startCloneCollection.  db:" << ns << " collection:" << collection << " from: " << fromhost << " query: " << query << endl;
515
bool res = c.startCloneCollection( fromhost.c_str(), collection.c_str(), query, errmsg, !fromRepl, copyIndexes, logSizeMb, cursorId );
519
b << "fromhost" << fromhost;
520
b << "collection" << collection;
521
b << "query" << query;
522
b.appendDate( "cursorId", cursorId );
523
BSONObj token = b.done();
524
result << "finishToken" << token;
528
} cmdstartclonecollection;
530
// CmdFinishCloneCollection: the "finishCloneCollection" command, i.e. the second
// phase of a collection clone.
// run() expects the token object in the "finishCloneCollection" field and reads
// fromhost, collection, query and cursorId from it. Missing token, fromhost or
// collection produce a specific errmsg; an empty query is special-cased (handling
// not visible here). cursorId stays 0 unless the token field has BSON type Date,
// in which case its 64-bit value is used.
// It then switches the client context with setClient() and calls
// Cloner::finishCloneCollection().
// NOTE(review): return statements and the declaration of c are missing from this listing.
class CmdFinishCloneCollection : public Command {
532
virtual bool slaveOk() {
535
CmdFinishCloneCollection() : Command("finishCloneCollection") { }
536
virtual void help( stringstream &help ) const {
537
help << " example: { finishCloneCollection: <finishToken> }";
539
virtual bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
540
BSONObj fromToken = cmdObj.getObjectField("finishCloneCollection");
541
if ( fromToken.isEmpty() ) {
542
errmsg = "missing finishCloneCollection finishToken spec";
545
string fromhost = fromToken.getStringField( "fromhost" );
546
if ( fromhost.empty() ) {
547
errmsg = "missing fromhost spec";
550
string collection = fromToken.getStringField("collection");
551
if ( collection.empty() ) {
552
errmsg = "missing collection spec";
555
BSONObj query = fromToken.getObjectField("query");
556
if ( query.isEmpty() ) {
559
long long cursorId = 0;
560
BSONElement cursorIdToken = fromToken.getField( "cursorId" );
561
// The id is only accepted when the token field has type Date.
if ( cursorIdToken.type() == Date ) {
562
cursorId = cursorIdToken._numberLong();
565
setClient( collection.c_str() );
567
log() << "finishCloneCollection.  db:" << ns << " collection:" << collection << " from: " << fromhost << " query: " << query << endl;
570
return c.finishCloneCollection( fromhost.c_str(), collection.c_str(), query, cursorId, errmsg );
572
} cmdfinishclonecollection;
575
admindb.$cmd.findOne( { copydb: 1, fromhost: <hostname>, fromdb: <db>, todb: <db> } );
577
// CmdCopyDb: the "copydb" command; the class overrides adminOnly() and slaveOk()
// (their bodies are not visible in this listing).
// run() reads fromhost, fromdb and todb from the command object. When fromhost is
// empty a "localhost:<cmdLine.port>" string is built -- presumably as the default
// source; the assignment is not visible here. If any of the three values is still
// empty, errmsg is set to the usage text. Otherwise the client context is switched
// to todb and cloneFrom() is called with logForReplication = !fromRepl,
// slaveOk = false, replauth = false, snapshot = true.
// NOTE(review): the remainder of run() after the cloneFrom() call is missing.
class CmdCopyDb : public Command {
579
CmdCopyDb() : Command("copydb") { }
580
virtual bool adminOnly() {
583
virtual bool slaveOk() {
586
virtual void help( stringstream &help ) const {
587
help << "copy a database from antoher host to this host\n";
588
help << "usage: {copydb: 1, fromhost: <hostname>, fromdb: <db>, todb: <db>}";
590
virtual bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
591
string fromhost = cmdObj.getStringField("fromhost");
592
if ( fromhost.empty() ) {
595
ss << "localhost:" << cmdLine.port;
598
string fromdb = cmdObj.getStringField("fromdb");
599
string todb = cmdObj.getStringField("todb");
600
if ( fromhost.empty() || todb.empty() || fromdb.empty() ) {
601
errmsg = "parms missing - {copydb: 1, fromhost: <hostname>, fromdb: <db>, todb: <db>}";
604
setClient(todb.c_str());
605
bool res = cloneFrom(fromhost.c_str(), errmsg, fromdb, /*logForReplication=*/!fromRepl, /*slaveok*/false, /*replauth*/false, /*snapshot*/true);
611
// CmdRenameCollection: the "renameCollection" command,
// { renameCollection: <source ns>, to: <target ns> }.
// logTheOp() returns true, so the command itself is logged rather than its steps.
// Visible flow of run():
//   1. Both namespaces are required ("invalid command syntax" otherwise).
//   2. The source must exist (uassert 10026); its capped flag is recorded and the
//      lengths of all its extents are summed into size.
//   3. If the target already exists, the command object must carry a true
//      "dropTarget" (uassert 10027); the target is then dropped.
//   4. When source and target resolve to the same database name, renameNamespace()
//      is used.
//   5. Otherwise (presumably the else branch) the target is created -- with a
//      capped/size spec that is presumably added only for capped sources -- the
//      documents are copied through a DBDirectClient and theDataFileMgr.insert, each
//      index spec of the source is re-inserted into the target database's
//      system.indexes with its "ns" replaced by the target, and the source is dropped.
// NOTE(review): return statements, loop headers, several declarations (size, from,
// to, cl, spec, b) and guards are missing from this listing.
class CmdRenameCollection : public Command {
613
CmdRenameCollection() : Command( "renameCollection" ) {}
614
virtual bool adminOnly() {
617
virtual bool slaveOk() {
620
virtual bool logTheOp() {
621
return true; // can't log steps when doing fast rename within a db, so always log the op rather than individual steps comprising it.
623
virtual void help( stringstream &help ) const {
624
help << " example: { renameCollection: foo.a, to: bar.b }";
626
virtual bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
627
string source = cmdObj.getStringField( name.c_str() );
628
string target = cmdObj.getStringField( "to" );
629
if ( source.empty() || target.empty() ) {
630
errmsg = "invalid command syntax";
634
setClient( source.c_str() );
635
NamespaceDetails *nsd = nsdetails( source.c_str() );
636
uassert( 10026 , "source namespace does not exist", nsd );
637
bool capped = nsd->capped;
640
// Walk the extent chain of the source and accumulate the extent lengths.
for( DiskLoc i = nsd->firstExtent; !i.isNull(); i = i.ext()->xnext )
641
size += i.ext()->length;
643
setClient( target.c_str() );
645
if ( nsdetails( target.c_str() ) ){
646
uassert( 10027 , "target namespace exists", cmdObj["dropTarget"].trueValue() );
647
BSONObjBuilder bb( result.subobjStart( "dropTarget" ) );
648
dropCollection( target , errmsg , bb );
650
if ( errmsg.size() > 0 )
656
nsToDatabase( source.c_str(), from );
658
nsToDatabase( target.c_str(), to );
659
// Same database on both sides: rename in place.
if ( strcmp( from, to ) == 0 ) {
660
renameNamespace( source.c_str(), target.c_str() );
667
spec.appendBool( "capped", true );
668
spec.append( "size", double( size ) );
670
if ( !userCreateNS( target.c_str(), spec.done(), errmsg, false ) )
673
auto_ptr< DBClientCursor > c;
674
DBDirectClient bridge;
677
c = bridge.query( source, BSONObj() );
684
BSONObj o = c->next();
685
theDataFileMgr.insert( target.c_str(), o );
689
nsToDatabase( source.c_str(), cl );
690
string sourceIndexes = string( cl ) + ".system.indexes";
691
nsToDatabase( target.c_str(), cl );
692
string targetIndexes = string( cl ) + ".system.indexes";
694
c = bridge.query( sourceIndexes, QUERY( "ns" << source ) );
701
BSONObj o = c->next();
703
BSONObjIterator i( o );
704
while( i.moreWithEOO() ) {
705
BSONElement e = i.next();
708
// Point the copied index spec at the target namespace.
if ( strcmp( e.fieldName(), "ns" ) == 0 ) {
709
b.append( "ns", target );
714
BSONObj n = b.done();
715
theDataFileMgr.insert( targetIndexes.c_str(), n );
718
setClient( source.c_str() );
719
dropCollection( source, errmsg, result );
722
} cmdrenamecollection;