~ubuntu-branches/ubuntu/trusty/tomahawk/trusty-proposed

« back to all changes in this revision

Viewing changes to src/libtomahawk/network/DbSyncConnection.cpp

  • Committer: Package Import Robot
  • Author(s): Harald Sitter
  • Date: 2013-03-07 21:50:13 UTC
  • Revision ID: package-import@ubuntu.com-20130307215013-6gdjkdds7i9uenvs
Tags: upstream-0.6.0+dfsg
ImportĀ upstreamĀ versionĀ 0.6.0+dfsg

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* === This file is part of Tomahawk Player - <http://tomahawk-player.org> ===
 
2
 *
 
3
 *   Copyright 2010-2011, Christian Muehlhaeuser <muesli@tomahawk-player.org>
 
4
 *   Copyright 2010-2011, Jeff Mitchell <jeff@tomahawk-player.org>
 
5
 *
 
6
 *   Tomahawk is free software: you can redistribute it and/or modify
 
7
 *   it under the terms of the GNU General Public License as published by
 
8
 *   the Free Software Foundation, either version 3 of the License, or
 
9
 *   (at your option) any later version.
 
10
 *
 
11
 *   Tomahawk is distributed in the hope that it will be useful,
 
12
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 
14
 *   GNU General Public License for more details.
 
15
 *
 
16
 *   You should have received a copy of the GNU General Public License
 
17
 *   along with Tomahawk. If not, see <http://www.gnu.org/licenses/>.
 
18
 */
 
19
 
 
20
/*
 
21
    Database syncing using the oplog table.
 
22
    =======================================
 
23
    Load the last GUID we applied for the peer, tell them it.
 
24
    In return, they send us all new ops since that guid.
 
25
 
 
26
    We then apply those new ops to our cache of their data
 
27
 
 
28
    Synced.
 
29
 
 
30
*/
 
31
 
 
32
#include "DbSyncConnection.h"
 
33
 
 
34
#include "database/Database.h"
 
35
#include "database/DatabaseCommand.h"
 
36
#include "database/DatabaseCommand_CollectionStats.h"
 
37
#include "database/DatabaseCommand_LoadOps.h"
 
38
#include "RemoteCollection.h"
 
39
#include "Source.h"
 
40
#include "SourceList.h"
 
41
#include "utils/Logger.h"
 
42
 
 
43
using namespace Tomahawk;
 
44
 
 
45
 
 
46
DBSyncConnection::DBSyncConnection( Servent* s, const source_ptr& src )
 
47
    : Connection( s )
 
48
    , m_fetchCount( 0 )
 
49
    , m_source( src )
 
50
    , m_state( UNKNOWN )
 
51
{
 
52
    qDebug() << Q_FUNC_INFO << src->id() << thread();
 
53
 
 
54
    connect( this,            SIGNAL( stateChanged( DBSyncConnection::State, DBSyncConnection::State, QString ) ),
 
55
             m_source.data(),   SLOT( onStateChanged( DBSyncConnection::State, DBSyncConnection::State, QString ) ) );
 
56
    connect( m_source.data(), SIGNAL( commandsFinished() ),
 
57
             this,              SLOT( lastOpApplied() ) );
 
58
 
 
59
    this->setMsgProcessorModeIn( MsgProcessor::PARSE_JSON | MsgProcessor::UNCOMPRESS_ALL );
 
60
 
 
61
    // msgs are stored compressed in the db, so not typically needed here, but doesnt hurt:
 
62
    this->setMsgProcessorModeOut( MsgProcessor::COMPRESS_IF_LARGE );
 
63
}
 
64
 
 
65
 
 
66
DBSyncConnection::~DBSyncConnection()
 
67
{
 
68
    tDebug() << "DTOR" << Q_FUNC_INFO << m_source->id() << m_source->friendlyName();
 
69
    m_state = SHUTDOWN;
 
70
}
 
71
 
 
72
 
 
73
void
 
74
DBSyncConnection::changeState( State newstate )
 
75
{
 
76
    if ( m_state == SHUTDOWN )
 
77
        return;
 
78
 
 
79
    State s = m_state;
 
80
    m_state = newstate;
 
81
    qDebug() << "DBSYNC State changed from" << s << "to" << newstate << "- source:" << m_source->id();
 
82
    emit stateChanged( newstate, s, "" );
 
83
}
 
84
 
 
85
 
 
86
void
 
87
DBSyncConnection::setup()
 
88
{
 
89
    setId( QString( "DBSyncConnection/%1" ).arg( socket()->peerAddress().toString() ) );
 
90
    check();
 
91
}
 
92
 
 
93
 
 
94
void
 
95
DBSyncConnection::trigger()
 
96
{
 
97
    // if we're still setting up the connection, do nothing - we sync on first connect anyway:
 
98
    if ( !isRunning() )
 
99
        return;
 
100
 
 
101
    QMetaObject::invokeMethod( this, "sendMsg", Qt::QueuedConnection,
 
102
                               Q_ARG( msg_ptr, Msg::factory( "{\"method\":\"trigger\"}", Msg::JSON ) ) );
 
103
}
 
104
 
 
105
 
 
106
void
 
107
DBSyncConnection::check()
 
108
{
 
109
    qDebug() << Q_FUNC_INFO << this << m_source->id();
 
110
 
 
111
    if ( m_state == SHUTDOWN )
 
112
    {
 
113
        qDebug() << "Aborting sync due to shutdown.";
 
114
        return;
 
115
    }
 
116
    if ( m_state != UNKNOWN && m_state != SYNCED )
 
117
    {
 
118
        qDebug() << "Syncing in progress already.";
 
119
        return;
 
120
    }
 
121
 
 
122
    m_uscache.clear();
 
123
    changeState( CHECKING );
 
124
 
 
125
    if ( m_source->lastCmdGuid().isEmpty() )
 
126
    {
 
127
        tDebug( LOGVERBOSE ) << "Fetching lastCmdGuid from database!";
 
128
        DatabaseCommand_CollectionStats* cmd_them = new DatabaseCommand_CollectionStats( m_source );
 
129
        connect( cmd_them, SIGNAL( done( QVariantMap ) ), SLOT( gotThem( QVariantMap ) ) );
 
130
        Database::instance()->enqueue( QSharedPointer<DatabaseCommand>(cmd_them) );
 
131
    }
 
132
    else
 
133
    {
 
134
        fetchOpsData( m_source->lastCmdGuid() );
 
135
    }
 
136
}
 
137
 
 
138
 
 
139
/// Called once we've loaded our cached data about their collection
 
140
void
 
141
DBSyncConnection::gotThem( const QVariantMap& m )
 
142
{
 
143
    fetchOpsData( m.value( "lastop" ).toString() );
 
144
}
 
145
 
 
146
 
 
147
void
 
148
DBSyncConnection::fetchOpsData( const QString& sinceguid )
 
149
{
 
150
    changeState( FETCHING );
 
151
 
 
152
    tLog() << "Sending a FETCHOPS cmd since:" << sinceguid << "- source:" << m_source->id();
 
153
 
 
154
    QVariantMap msg;
 
155
    msg.insert( "method", "fetchops" );
 
156
    msg.insert( "lastop", sinceguid );
 
157
    sendMsg( msg );
 
158
}
 
159
 
 
160
 
 
161
void
 
162
DBSyncConnection::handleMsg( msg_ptr msg )
 
163
{
 
164
    Q_ASSERT( !msg->is( Msg::COMPRESSED ) );
 
165
 
 
166
    if ( m_state == FETCHING )
 
167
        changeState( PARSING );
 
168
 
 
169
    // "everything is synced" indicated by non-json msg containing "ok":
 
170
    if ( !msg->is( Msg::JSON ) &&
 
171
         msg->is( Msg::DBOP ) &&
 
172
         msg->payload() == "ok" )
 
173
    {
 
174
        changeState( SYNCED );
 
175
 
 
176
        // calc the collection stats, to updates the "X tracks" in the sidebar etc
 
177
        // this is done automatically if you run a dbcmd to add files.
 
178
        DatabaseCommand_CollectionStats* cmd = new DatabaseCommand_CollectionStats( m_source );
 
179
        connect( cmd,           SIGNAL( done( const QVariantMap & ) ),
 
180
                 m_source.data(), SLOT( setStats( const QVariantMap& ) ), Qt::QueuedConnection );
 
181
        Database::instance()->enqueue( QSharedPointer<DatabaseCommand>(cmd) );
 
182
        return;
 
183
    }
 
184
 
 
185
    Q_ASSERT( msg->is( Msg::JSON ) );
 
186
 
 
187
    QVariantMap m = msg->json().toMap();
 
188
    if ( m.empty() )
 
189
    {
 
190
        tLog() << "Failed to parse msg in dbsync" << m_source->id() << m_source->friendlyName();
 
191
        Q_ASSERT( false );
 
192
        return;
 
193
    }
 
194
 
 
195
    // a db sync op msg
 
196
    if ( msg->is( Msg::DBOP ) )
 
197
    {
 
198
        DatabaseCommand* cmd = DatabaseCommand::factory( m, m_source );
 
199
        if ( cmd )
 
200
        {
 
201
            QSharedPointer<DatabaseCommand> cmdsp = QSharedPointer<DatabaseCommand>(cmd);
 
202
            m_source->addCommand( cmdsp );
 
203
        }
 
204
 
 
205
        if ( !msg->is( Msg::FRAGMENT ) ) // last msg in this batch
 
206
        {
 
207
            changeState( SAVING ); // just DB work left to complete
 
208
            m_source->executeCommands();
 
209
        }
 
210
        return;
 
211
    }
 
212
 
 
213
    if ( m.value( "method" ).toString() == "fetchops" )
 
214
    {
 
215
        ++m_fetchCount;
 
216
        tDebug( LOGVERBOSE ) << "Fetching new dbops:" << m["lastop"].toString() << m_fetchCount;
 
217
        m_uscache = m;
 
218
        sendOps();
 
219
        return;
 
220
    }
 
221
 
 
222
    if ( m.value( "method" ).toString() == "trigger" )
 
223
    {
 
224
        tLog() << "Got trigger msg on dbsyncconnection, checking for new stuff.";
 
225
        check();
 
226
        return;
 
227
    }
 
228
 
 
229
    tLog() << Q_FUNC_INFO << "Unhandled msg:" << msg->payload();
 
230
    Q_ASSERT( false );
 
231
}
 
232
 
 
233
 
 
234
void
 
235
DBSyncConnection::lastOpApplied()
 
236
{
 
237
    changeState( SYNCED );
 
238
    // check again, until peer responds we have no new ops to process
 
239
    check();
 
240
}
 
241
 
 
242
 
 
243
/// request new copies of anything we've cached that is stale
 
244
void
 
245
DBSyncConnection::sendOps()
 
246
{
 
247
    tLog() << "Will send peer" << m_source->id() << "all ops since" << m_uscache.value( "lastop" ).toString();
 
248
 
 
249
    source_ptr src = SourceList::instance()->getLocal();
 
250
 
 
251
    DatabaseCommand_loadOps* cmd = new DatabaseCommand_loadOps( src, m_uscache.value( "lastop" ).toString() );
 
252
    connect( cmd, SIGNAL( done( QString, QString, QList< dbop_ptr > ) ),
 
253
                    SLOT( sendOpsData( QString, QString, QList< dbop_ptr > ) ) );
 
254
 
 
255
    m_uscache.clear();
 
256
 
 
257
    Database::instance()->enqueue( QSharedPointer<DatabaseCommand>( cmd ) );
 
258
}
 
259
 
 
260
 
 
261
void
 
262
DBSyncConnection::sendOpsData( QString sinceguid, QString lastguid, QList< dbop_ptr > ops )
 
263
{
 
264
    if ( m_lastSentOp == lastguid )
 
265
        ops.clear();
 
266
 
 
267
    m_lastSentOp = lastguid;
 
268
    if ( ops.length() == 0 )
 
269
    {
 
270
        tLog( LOGVERBOSE ) << "Sending ok" << m_source->id() << m_source->friendlyName();
 
271
        sendMsg( Msg::factory( "ok", Msg::DBOP ) );
 
272
        return;
 
273
    }
 
274
 
 
275
    tLog( LOGVERBOSE ) << Q_FUNC_INFO << sinceguid << lastguid << "Num ops to send:" << ops.length();
 
276
 
 
277
    int i;
 
278
    for( i = 0; i < ops.length(); ++i )
 
279
    {
 
280
        quint8 flags = Msg::JSON | Msg::DBOP;
 
281
 
 
282
        if ( ops.at( i )->compressed )
 
283
            flags |= Msg::COMPRESSED;
 
284
        if ( i != ops.length() - 1 )
 
285
            flags |= Msg::FRAGMENT;
 
286
 
 
287
        sendMsg( Msg::factory( ops.at( i )->payload, flags ) );
 
288
    }
 
289
}
 
290
 
 
291
 
 
292
Connection*
 
293
DBSyncConnection::clone()
 
294
{
 
295
    Q_ASSERT( false );
 
296
    return 0;
 
297
}