1
/* === This file is part of Tomahawk Player - <http://tomahawk-player.org> ===
3
* Copyright 2010-2011, Christian Muehlhaeuser <muesli@tomahawk-player.org>
4
* Copyright 2010-2011, Jeff Mitchell <jeff@tomahawk-player.org>
6
* Tomahawk is free software: you can redistribute it and/or modify
7
* it under the terms of the GNU General Public License as published by
8
* the Free Software Foundation, either version 3 of the License, or
9
* (at your option) any later version.
11
* Tomahawk is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
* GNU General Public License for more details.
16
* You should have received a copy of the GNU General Public License
17
* along with Tomahawk. If not, see <http://www.gnu.org/licenses/>.
20
#include "DatabaseWorker.h"
28
#include "DatabaseImpl.h"
29
#include "DatabaseCommandLoggable.h"
30
#include "TomahawkSqlQuery.h"
31
#include "utils/Logger.h"
34
//#define DEBUG_TIMING TRUE
38
// Constructs the thread object that will host a DatabaseWorker.
//
// db:      database handle for the worker.
// mutates: stored in m_mutates and later forwarded to the DatabaseWorker
//          constructor by run().
//
// NOTE(review): only the m_mutates initializer is visible in this excerpt;
// presumably `db` is stored in m_db (run() reads m_db) - confirm.
DatabaseWorkerThread::DatabaseWorkerThread( Database* db, bool mutates )
41
, m_mutates( mutates )
47
// Thread body: creates the DatabaseWorker from m_db / m_mutates, exposes it
// through the m_worker weak pointer, and deletes it again before returning.
//
// NOTE(review): the lines between the "starting" and "finishing" log
// statements are only partly visible; presumably the thread's event loop runs
// in between - confirm.
DatabaseWorkerThread::run()
49
tDebug() << Q_FUNC_INFO << "DatabaseWorkerThread starting...";
50
// The worker is heap-allocated here and only tracked by a QWeakPointer,
// which does not own it.
m_worker = QWeakPointer< DatabaseWorker >( new DatabaseWorker( m_db, m_mutates ) );
52
tDebug() << Q_FUNC_INFO << "DatabaseWorkerThread finishing...";
54
// Manual cleanup of the worker allocated above, since nothing else owns it
// in the visible code.
delete m_worker.data();
58
// Destructor.
// NOTE(review): the body is not visible in this excerpt. The worker object
// itself is deleted at the end of run(), not here, as far as the visible
// code shows.
DatabaseWorkerThread::~DatabaseWorkerThread()
63
// Const accessor returning a QWeakPointer to the DatabaseWorker.
// NOTE(review): the body is not visible in this excerpt; presumably it returns
// m_worker, which run() assigns and later deletes - callers should check the
// pointer before use. Confirm.
QWeakPointer< DatabaseWorker >
64
DatabaseWorkerThread::worker() const
70
// Constructs a worker that executes DatabaseCommands.
//
// db:      database handle.
// mutates: whether this worker handles mutating commands.
//
// NOTE(review): the member initializers are not visible in this excerpt, so
// how `db` and `mutates` are stored cannot be confirmed from here.
DatabaseWorker::DatabaseWorker( Database* db, bool mutates )
76
// Logs the connection name of the global Database instance's implementation
// and the thread this worker object belongs to.
tDebug() << Q_FUNC_INFO << "New db connection with name:" << Database::instance()->impl()->database().connectionName() << "on thread" << this->thread();
80
// Destructor: logs the outstanding-command counter and then every command
// still sitting in m_commands. In the visible code the remaining commands are
// only reported, not executed.
DatabaseWorker::~DatabaseWorker()
82
tDebug() << Q_FUNC_INFO << m_outstanding;
86
// Report each command that was queued but never taken by doWork().
foreach ( const QSharedPointer<DatabaseCommand>& cmd, m_commands )
88
tDebug() << "Outstanding db command to finish:" << cmd->guid() << cmd->commandname();
95
// Queues a batch of commands for execution.
//
// cmds: commands to add; m_outstanding grows by cmds.count().
//
// NOTE(review): the statement appending `cmds` to m_commands is not visible
// in this excerpt - confirm it happens while m_mut is held.
DatabaseWorker::enqueue( const QList< QSharedPointer<DatabaseCommand> >& cmds )
97
// Guards m_outstanding for the rest of this function.
QMutexLocker lock( &m_mut );
98
m_outstanding += cmds.count();
101
// Equality means nothing was outstanding before this call, so no doWork()
// run is pending and one has to be scheduled. The zero-timeout single shot
// invokes the doWork() slot via the event loop instead of calling it inline.
// NOTE(review): with an empty `cmds` and nothing outstanding this condition
// is also true (0 == 0), and doWork() calls m_commands.takeFirst() - verify
// that an empty list can never reach this point.
if ( m_outstanding == cmds.count() )
102
QTimer::singleShot( 0, this, SLOT( doWork() ) );
107
// Queues a single command for execution.
//
// cmd: the command to add.
//
// NOTE(review): the lines that increment m_outstanding and append `cmd` to
// m_commands are not visible in this excerpt; the check below presumably runs
// after the increment - confirm.
DatabaseWorker::enqueue( const QSharedPointer<DatabaseCommand>& cmd )
109
// Guards the shared counter for the rest of this function.
QMutexLocker lock( &m_mut );
113
// Exactly one outstanding command means this is the only one, so a doWork()
// run is scheduled through the event loop (zero-timeout single shot).
if ( m_outstanding == 1 )
114
QTimer::singleShot( 0, this, SLOT( doWork() ) );
119
// Slot that takes queued commands from m_commands and executes them.
//
// Visible flow: take the first command under m_mut, open a transaction if it
// mutates, run it, optionally record it for the oplog and update the source's
// `lastop`, optionally continue with the next groupable command, commit, then
// iterate over cmdGroup. Errors are signalled with `throw "<text>"` and
// handled by rolling back when the command mutates. Finally m_outstanding is
// reduced by `completed` and another doWork() is scheduled if work remains.
//
// NOTE(review): loop/try/brace lines and several statements are missing from
// this excerpt (e.g. where cmdGroup is filled, where `completed` and
// `finished` change); comments below describe only what is visible.
DatabaseWorker::doWork()
122
Run the dbcmd. Only inside a transaction if the cmd does mutates.
124
If the cmd is modifying local content (ie source->isLocal()) then
125
log to the database oplog for replication to peers.
134
QList< QSharedPointer<DatabaseCommand> > cmdGroup;
135
QSharedPointer<DatabaseCommand> cmd;
137
// m_commands is shared with enqueue(), so the head is removed under m_mut.
// NOTE(review): no emptiness check is visible before takeFirst() - confirm
// the queue cannot be empty when this slot fires.
QMutexLocker lock( &m_mut );
138
cmd = m_commands.takeFirst();
141
DatabaseImpl* impl = Database::instance()->impl();
142
// Only mutating commands are wrapped in a transaction.
if ( cmd->doesMutates() )
144
// NOTE(review): how `transok` is used afterwards is not visible here.
bool transok = impl->database().transaction();
149
// Amount later subtracted from m_outstanding at the end of this function.
unsigned int completed = 0;
152
bool finished = false;
157
cmd->_exec( impl ); // runs actual SQL stuff
159
if ( cmd->loggable() )
161
// We only save our own ops to the oplog, since incoming ops from peers
162
// are applied immediately.
164
// Crazy idea: if peers had keypairs and could sign ops/msgs, in theory it
165
// would be safe to sync ops for friend A from friend B's cache, if he saved them,
166
// which would mean you could get updates even if a peer was offline.
167
if ( cmd->source()->isLocal() && !cmd->localOnly() )
170
// NOTE(review): unchecked C-style downcast; it relies on loggable() being
// true only for DatabaseCommandLoggable instances. The use of `command`
// (presumably a logOp() call) is not visible in this excerpt - confirm.
DatabaseCommandLoggable* command = (DatabaseCommandLoggable*)cmd.data();
175
// Make a note of the last guid we applied for this source
176
// so we can always request just the newer ops in future.
178
if ( !cmd->singletonCmd() )
180
TomahawkSqlQuery query = impl->newquery();
181
query.prepare( "UPDATE source SET lastop = ? WHERE id = ?" );
182
query.addBindValue( cmd->guid() );
183
query.addBindValue( cmd->source()->id() );
187
// NOTE(review): the condition guarding this throw is not visible;
// presumably it fires when executing the UPDATE fails - confirm.
throw "Failed to set lastop";
194
// Grouping: if this command is groupable and the next queued one is too,
// take the next one as `cmd`.
// NOTE(review): m_commands.isEmpty() appears to be read before m_mut is
// re-acquired on the next statement - confirm this unlocked read is safe.
if ( cmd->groupable() && !m_commands.isEmpty() )
196
QMutexLocker lock( &m_mut );
197
if ( m_commands.first()->groupable() )
199
cmd = m_commands.takeFirst();
210
// Commit the transaction opened above; a failed commit is reported by
// throwing a string literal, which the const char* handler below catches.
if ( cmd->doesMutates() )
212
qDebug() << "Committing" << cmd->commandname() << cmd->guid();
213
if ( !impl->newquery().commitTransaction() )
215
tDebug() << "FAILED TO COMMIT TRANSACTION*";
216
throw "commit failed";
221
// NOTE(review): `timer` is declared outside the visible lines; presumably
// this timing code is compiled only when DEBUG_TIMING is defined - confirm.
uint duration = timer.elapsed();
222
tDebug() << "DBCmd Duration:" << duration << "ms, now running postcommit for" << cmd->commandname();
225
// NOTE(review): loop body not visible; the surrounding log messages suggest
// it runs each grouped command's post-commit step - confirm.
foreach ( QSharedPointer<DatabaseCommand> c, cmdGroup )
229
tDebug() << "Post commit finished in" << timer.elapsed() - duration << "ms for" << cmd->commandname();
233
// Handles the string-literal exceptions thrown above: logs the command name
// and the database's last error texts, then rolls back if the command mutates.
catch( const char * msg )
236
<< "*ERROR* processing databasecommand:"
237
<< cmd->commandname()
239
<< impl->database().lastError().databaseText()
240
<< impl->database().lastError().driverText()
243
if ( cmd->doesMutates() )
244
impl->database().rollback();
250
// NOTE(review): presumably inside a catch-all handler (its header line is
// not visible) - same rollback policy as above.
qDebug() << "Uncaught exception processing dbcmd";
251
if ( cmd->doesMutates() )
252
impl->database().rollback();
258
// NOTE(review): loop body not visible in this excerpt.
foreach ( QSharedPointer<DatabaseCommand> c, cmdGroup )
261
// Update the shared counter under m_mut and keep draining the queue by
// scheduling another doWork() through the event loop while work remains.
QMutexLocker lock( &m_mut );
262
m_outstanding -= completed;
263
if ( m_outstanding > 0 )
264
QTimer::singleShot( 0, this, SLOT( doWork() ) );
268
// Writes one loggable command into the `oplog` table.
//
// command: the command to record; its source id, guid, command name,
//          singleton flag and serialized properties are stored.
//
// Throws the string literal "Failed to save to oplog" when the INSERT fails.
//
// this should take a const command, need to check/make json stuff mutable for some objs tho maybe.
270
DatabaseWorker::logOp( DatabaseCommandLoggable* command )
272
TomahawkSqlQuery oplogquery = Database::instance()->impl()->newquery();
273
qDebug() << "INSERTING INTO OPTLOG:" << command->source()->id() << command->guid() << command->commandname();
274
oplogquery.prepare( "INSERT INTO oplog(source, guid, command, singleton, compressed, json) "
275
"VALUES(?, ?, ?, ?, ?, ?)" );
277
// Serialize the command: QObject -> QVariantMap -> bytes via m_serializer.
QVariantMap variant = QJson::QObjectHelper::qobject2qvariant( command );
278
QByteArray ba = m_serializer.serialize( variant );
280
// qDebug() << "OP JSON:" << ba.isNull() << ba << "from:" << variant; // debug
282
// Payloads of 512 bytes or more are compressed with qCompress at level 9.
// NOTE(review): the line setting `compressed` to true is not visible in this
// excerpt; presumably it sits inside this branch - confirm.
bool compressed = false;
283
if( ba.length() >= 512 )
285
// We need to compress this in this thread, since inserting into the log
286
// has to happen as part of the same transaction as the dbcmd.
287
// (we are in a worker thread for RW dbcmds anyway, so it's ok)
288
//qDebug() << "Compressing DB OP JSON, uncompressed size:" << ba.length();
289
ba = qCompress( ba, 9 );
291
//qDebug() << "Compressed DB OP JSON size:" << ba.length();
294
// For singleton commands, earlier oplog rows with the same source and command
// name are deleted before the new row is inserted.
if ( command->singletonCmd() )
296
tDebug() << "Singleton command, deleting previous oplog commands";
298
TomahawkSqlQuery oplogdelquery = Database::instance()->impl()->newquery();
299
// The source filter is spliced into the SQL text: "IS NULL" for the local
// source, "= <id>" otherwise. The command name is bound as a parameter.
oplogdelquery.prepare( QString( "DELETE FROM oplog WHERE source %1 AND singleton = 'true' AND command = ?" )
300
.arg( command->source()->isLocal() ? "IS NULL" : QString( "= %1" ).arg( command->source()->id() ) ) );
302
oplogdelquery.bindValue( 0, command->commandname() );
303
// NOTE(review): the result of exec() is not checked here, unlike the
// INSERT below.
oplogdelquery.exec();
306
tDebug() << "Saving to oplog:" << command->commandname()
307
<< "bytes:" << ba.length()
308
<< "guid:" << command->guid();
310
// A local source is bound as a null integer QVariant (i.e. SQL NULL),
// matching the "IS NULL" filter used in the singleton delete above.
oplogquery.bindValue( 0, command->source()->isLocal() ?
311
QVariant(QVariant::Int) : command->source()->id() );
312
oplogquery.bindValue( 1, command->guid() );
313
oplogquery.bindValue( 2, command->commandname() );
314
oplogquery.bindValue( 3, command->singletonCmd() );
315
oplogquery.bindValue( 4, compressed );
316
oplogquery.bindValue( 5, ba );
317
// A failed insert is logged and reported by throwing a string literal.
if( !oplogquery.exec() )
319
tLog() << "Error saving to oplog";
320
throw "Failed to save to oplog";