1
/* === This file is part of Tomahawk Player - <http://tomahawk-player.org> ===
3
* Copyright 2010-2011, Christian Muehlhaeuser <muesli@tomahawk-player.org>
4
* Copyright 2010-2011, Jeff Mitchell <jeff@tomahawk-player.org>
6
* Tomahawk is free software: you can redistribute it and/or modify
7
* it under the terms of the GNU General Public License as published by
8
* the Free Software Foundation, either version 3 of the License, or
9
* (at your option) any later version.
11
* Tomahawk is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
* GNU General Public License for more details.
16
* You should have received a copy of the GNU General Public License
17
* along with Tomahawk. If not, see <http://www.gnu.org/licenses/>.
21
Database syncing using the oplog table.
22
=======================================
23
Load the last GUID we applied for the peer and tell them what it is.
24
In return, they send us all new ops since that guid.
26
We then apply those new ops to our cache of their data
32
#include "DbSyncConnection.h"
34
#include "database/Database.h"
35
#include "database/DatabaseCommand.h"
36
#include "database/DatabaseCommand_CollectionStats.h"
37
#include "database/DatabaseCommand_LoadOps.h"
38
#include "RemoteCollection.h"
40
#include "SourceList.h"
41
#include "utils/Logger.h"
43
using namespace Tomahawk;
46
// Constructs a sync connection for the given source.
//
// The visible body logs the source id and current thread, hooks up the
// signal/slot wiring between this connection and the source, and configures
// how incoming and outgoing messages are processed.
// NOTE(review): the member-initializer list is not shown here; m_source is
// presumably initialized from src - confirm.
DBSyncConnection::DBSyncConnection( Servent* s, const source_ptr& src )
52
qDebug() << Q_FUNC_INFO << src->id() << thread();
54
// Report every state transition of this connection to the source.
connect( this, SIGNAL( stateChanged( DBSyncConnection::State, DBSyncConnection::State, QString ) ),
55
m_source.data(), SLOT( onStateChanged( DBSyncConnection::State, DBSyncConnection::State, QString ) ) );
56
// When the source signals commandsFinished(), run lastOpApplied().
connect( m_source.data(), SIGNAL( commandsFinished() ),
57
this, SLOT( lastOpApplied() ) );
59
// Incoming messages are processed with the PARSE_JSON and UNCOMPRESS_ALL modes.
this->setMsgProcessorModeIn( MsgProcessor::PARSE_JSON | MsgProcessor::UNCOMPRESS_ALL );
61
// msgs are stored compressed in the db, so not typically needed here, but doesn't hurt:
62
this->setMsgProcessorModeOut( MsgProcessor::COMPRESS_IF_LARGE );
66
// Destructor. The visible body only logs that the connection for this source
// (id and friendly name) is being destroyed.
DBSyncConnection::~DBSyncConnection()
68
tDebug() << "DTOR" << Q_FUNC_INFO << m_source->id() << m_source->friendlyName();
74
// Moves the connection to `newstate`, logs the transition and emits
// stateChanged( newstate, s, "" ).
//
// The SHUTDOWN check guards the transition.
// NOTE(review): the statement under the SHUTDOWN check, the declaration of
// `s`, and the assignment to m_state are not shown here. Going by the log
// text, `s` is the state being left - confirm.
DBSyncConnection::changeState( State newstate )
76
if ( m_state == SHUTDOWN )
81
qDebug() << "DBSYNC State changed from" << s << "to" << newstate << "- source:" << m_source->id();
82
// The third signal argument (a QString) is always empty here.
emit stateChanged( newstate, s, "" );
87
// Names this connection "DBSyncConnection/<peer address>", using the address
// of the socket's remote peer.
DBSyncConnection::setup()
89
setId( QString( "DBSyncConnection/%1" ).arg( socket()->peerAddress().toString() ) );
95
// Sends the peer a JSON message with method "trigger". handleMsg() reacts to
// such a message by logging that it will check for new data.
//
// sendMsg is invoked through the meta-object system with
// Qt::QueuedConnection rather than being called directly.
// NOTE(review): the condition that implements the "still setting up" guard
// described below is not shown here - confirm.
DBSyncConnection::trigger()
97
// if we're still setting up the connection, do nothing - we sync on first connect anyway:
101
QMetaObject::invokeMethod( this, "sendMsg", Qt::QueuedConnection,
102
Q_ARG( msg_ptr, Msg::factory( "{\"method\":\"trigger\"}", Msg::JSON ) ) );
107
// Starts a sync round with the peer.
//
// Logs and bails out when the connection is shut down or a sync is already
// running; otherwise switches to CHECKING and asks for ops newer than the
// last command GUID known for the source. If the source has no cached GUID,
// it is first loaded from the database via DatabaseCommand_CollectionStats.
// NOTE(review): the early-return statements and the if/else braces are not
// shown here; the control flow described is inferred from the log messages -
// confirm.
DBSyncConnection::check()
109
qDebug() << Q_FUNC_INFO << this << m_source->id();
111
if ( m_state == SHUTDOWN )
113
qDebug() << "Aborting sync due to shutdown.";
116
// Any state other than UNKNOWN or SYNCED is treated as a sync in progress.
if ( m_state != UNKNOWN && m_state != SYNCED )
118
qDebug() << "Syncing in progress already.";
123
changeState( CHECKING );
125
if ( m_source->lastCmdGuid().isEmpty() )
127
tDebug( LOGVERBOSE ) << "Fetching lastCmdGuid from database!";
128
// The command's done() signal is delivered to gotThem(), which continues
// the sync with the "lastop" value from the result.
DatabaseCommand_CollectionStats* cmd_them = new DatabaseCommand_CollectionStats( m_source );
129
connect( cmd_them, SIGNAL( done( QVariantMap ) ), SLOT( gotThem( QVariantMap ) ) );
130
// The raw pointer is wrapped in a QSharedPointer before being enqueued.
Database::instance()->enqueue( QSharedPointer<DatabaseCommand>(cmd_them) );
134
// The GUID is already cached on the source: request ops since that GUID.
// NOTE(review): presumably the else-branch of the isEmpty() check - confirm.
fetchOpsData( m_source->lastCmdGuid() );
139
/// Called once we've loaded our cached data about their collection.
/// Connected in check() to the done() signal of DatabaseCommand_CollectionStats.
/// Reads the "lastop" entry of the result map and requests all ops since it.
/// @param m result map of the stats command; only the "lastop" key is read here.
141
DBSyncConnection::gotThem( const QVariantMap& m )
143
fetchOpsData( m.value( "lastop" ).toString() );
148
// Asks the peer for all ops newer than `sinceguid`.
//
// Switches to FETCHING, logs the request and fills a message with
// method = "fetchops" and lastop = sinceguid.
// NOTE(review): the declaration of `msg` and the statement that sends it are
// not shown here - confirm.
// @param sinceguid GUID of the last op we already have for this source.
DBSyncConnection::fetchOpsData( const QString& sinceguid )
150
changeState( FETCHING );
152
tLog() << "Sending a FETCHOPS cmd since:" << sinceguid << "- source:" << m_source->id();
155
msg.insert( "method", "fetchops" );
156
msg.insert( "lastop", sinceguid );
162
// Handles one incoming message from the peer.
//
// Cases visible in this function, in order:
//   1. A non-JSON DBOP message with payload "ok": the peer has nothing new;
//      switch to SYNCED and refresh the collection stats.
//   2. A JSON DBOP message: build a DatabaseCommand from it and queue it on
//      the source; on the last message of a batch (no FRAGMENT flag), switch
//      to SAVING and execute the queued commands.
//   3. A JSON message with method "fetchops" or "trigger".
//   4. Anything else is logged as unhandled.
// NOTE(review): the return statements and braces separating these cases, the
// parse-failure condition, and the actions taken for "fetchops"/"trigger"
// are not shown here - confirm.
DBSyncConnection::handleMsg( msg_ptr msg )
164
// The incoming processor mode set in the constructor includes
// UNCOMPRESS_ALL, so a still-compressed message is unexpected here.
Q_ASSERT( !msg->is( Msg::COMPRESSED ) );
166
// The first message received while FETCHING moves the connection to PARSING.
if ( m_state == FETCHING )
167
changeState( PARSING );
169
// "everything is synced" indicated by non-json msg containing "ok":
170
if ( !msg->is( Msg::JSON ) &&
171
msg->is( Msg::DBOP ) &&
172
msg->payload() == "ok" )
174
changeState( SYNCED );
176
// calc the collection stats, to update the "X tracks" in the sidebar etc
177
// this is done automatically if you run a dbcmd to add files.
178
DatabaseCommand_CollectionStats* cmd = new DatabaseCommand_CollectionStats( m_source );
179
// The result is delivered to the source's setStats() slot via a queued connection.
connect( cmd, SIGNAL( done( const QVariantMap & ) ),
180
m_source.data(), SLOT( setStats( const QVariantMap& ) ), Qt::QueuedConnection );
181
Database::instance()->enqueue( QSharedPointer<DatabaseCommand>(cmd) );
185
// Everything handled below is asserted to be JSON.
Q_ASSERT( msg->is( Msg::JSON ) );
187
QVariantMap m = msg->json().toMap();
190
// NOTE(review): the condition guarding this log line is not shown here.
tLog() << "Failed to parse msg in dbsync" << m_source->id() << m_source->friendlyName();
196
if ( msg->is( Msg::DBOP ) )
198
// Build a command from the JSON map and hand it to the source's queue.
// NOTE(review): any check on the factory result is not shown here - confirm.
DatabaseCommand* cmd = DatabaseCommand::factory( m, m_source );
201
QSharedPointer<DatabaseCommand> cmdsp = QSharedPointer<DatabaseCommand>(cmd);
202
m_source->addCommand( cmdsp );
205
if ( !msg->is( Msg::FRAGMENT ) ) // last msg in this batch
207
changeState( SAVING ); // just DB work left to complete
208
m_source->executeCommands();
213
// The peer asks us for our ops since m["lastop"].
if ( m.value( "method" ).toString() == "fetchops" )
216
tDebug( LOGVERBOSE ) << "Fetching new dbops:" << m["lastop"].toString() << m_fetchCount;
222
// The peer tells us it has new data (see trigger()).
if ( m.value( "method" ).toString() == "trigger" )
224
tLog() << "Got trigger msg on dbsyncconnection, checking for new stuff.";
229
// Fallback for messages none of the cases above handled.
tLog() << Q_FUNC_INFO << "Unhandled msg:" << msg->payload();
235
// Slot connected in the constructor to the source's commandsFinished()
// signal. Marks the connection as SYNCED.
// NOTE(review): the statement that performs the re-check announced by the
// comment below is not shown here - confirm.
DBSyncConnection::lastOpApplied()
237
changeState( SYNCED );
238
// check again, until peer responds we have no new ops to process
243
/// request new copies of anything we've cached that is stale
/// NOTE(review): the line above does not match the visible code, which loads
/// ops of the *local* source since m_uscache["lastop"] so they can be sent to
/// the peer - confirm and reword.
245
DBSyncConnection::sendOps()
247
tLog() << "Will send peer" << m_source->id() << "all ops since" << m_uscache.value( "lastop" ).toString();
249
// The ops are loaded for our own (local) source, not for m_source.
source_ptr src = SourceList::instance()->getLocal();
251
DatabaseCommand_loadOps* cmd = new DatabaseCommand_loadOps( src, m_uscache.value( "lastop" ).toString() );
252
// The command's done() signal is delivered to sendOpsData().
connect( cmd, SIGNAL( done( QString, QString, QList< dbop_ptr > ) ),
253
SLOT( sendOpsData( QString, QString, QList< dbop_ptr > ) ) );
257
Database::instance()->enqueue( QSharedPointer<DatabaseCommand>( cmd ) );
262
// Sends the loaded ops to the peer. Connected in sendOps() to the done()
// signal of DatabaseCommand_loadOps.
//
// With no ops to send, a plain "ok" DBOP message is sent, which handleMsg()
// on the receiving side treats as "everything is synced". Otherwise each op
// is sent as its own JSON DBOP message, and every message except the last
// carries the FRAGMENT flag so the receiver can detect the end of the batch.
// NOTE(review): the statement under the m_lastSentOp check, the return after
// sending "ok", and the declaration of `i` are not shown here - confirm.
// @param sinceguid GUID the ops were loaded since (only logged here).
// @param lastguid  GUID compared with and then stored in m_lastSentOp.
// @param ops       ops to send, in order.
DBSyncConnection::sendOpsData( QString sinceguid, QString lastguid, QList< dbop_ptr > ops )
264
// Same last GUID as the previous send.
if ( m_lastSentOp == lastguid )
267
m_lastSentOp = lastguid;
268
if ( ops.length() == 0 )
270
tLog( LOGVERBOSE ) << "Sending ok" << m_source->id() << m_source->friendlyName();
271
sendMsg( Msg::factory( "ok", Msg::DBOP ) );
275
tLog( LOGVERBOSE ) << Q_FUNC_INFO << sinceguid << lastguid << "Num ops to send:" << ops.length();
278
for( i = 0; i < ops.length(); ++i )
280
quint8 flags = Msg::JSON | Msg::DBOP;
282
// Ops flagged as compressed get the COMPRESSED flag on their message.
if ( ops.at( i )->compressed )
283
flags |= Msg::COMPRESSED;
284
// Every message but the last is flagged as a fragment of the batch.
if ( i != ops.length() - 1 )
285
flags |= Msg::FRAGMENT;
287
sendMsg( Msg::factory( ops.at( i )->payload, flags ) );
293
DBSyncConnection::clone()