1
/* === This file is part of Tomahawk Player - <http://tomahawk-player.org> ===
3
* Copyright 2010-2011, Christian Muehlhaeuser <muesli@tomahawk-player.org>
4
* Copyright 2010-2012, Jeff Mitchell <jeff@tomahawk-player.org>
6
* Tomahawk is free software: you can redistribute it and/or modify
7
* it under the terms of the GNU General Public License as published by
8
* the Free Software Foundation, either version 3 of the License, or
9
* (at your option) any later version.
11
* Tomahawk is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
* GNU General Public License for more details.
16
* You should have received a copy of the GNU General Public License
17
* along with Tomahawk. If not, see <http://www.gnu.org/licenses/>.
20
#include "ControlConnection.h"
22
#include "StreamConnection.h"
23
#include "database/Database.h"
24
#include "database/DatabaseCommand_CollectionStats.h"
25
#include "DbSyncConnection.h"
26
#include "SourceList.h"
27
#include "network/DbSyncConnection.h"
28
#include "network/Servent.h"
29
#include "sip/SipHandler.h"
30
#include "utils/Logger.h"
32
#define TCP_TIMEOUT 600
34
using namespace Tomahawk;
37
ControlConnection::ControlConnection( Servent* parent, const QHostAddress &ha )
38
: Connection( parent )
40
, m_registered( false )
43
qDebug() << "CTOR controlconnection";
44
setId("ControlConnection()");
46
// auto delete when connection closes:
47
connect( this, SIGNAL( finished() ), SLOT( deleteLater() ) );
49
this->setMsgProcessorModeIn( MsgProcessor::UNCOMPRESS_ALL | MsgProcessor::PARSE_JSON );
50
this->setMsgProcessorModeOut( MsgProcessor::COMPRESS_IF_LARGE );
56
ControlConnection::ControlConnection( Servent* parent, const QString &ha )
57
: Connection( parent )
59
, m_registered( false )
62
qDebug() << "CTOR controlconnection";
63
setId("ControlConnection()");
65
// auto delete when connection closes:
66
connect( this, SIGNAL( finished() ), SLOT( deleteLater() ) );
68
this->setMsgProcessorModeIn( MsgProcessor::UNCOMPRESS_ALL | MsgProcessor::PARSE_JSON );
69
this->setMsgProcessorModeOut( MsgProcessor::COMPRESS_IF_LARGE );
73
QHostAddress qha( ha );
75
m_peerIpAddress = qha;
78
QHostInfo qhi = QHostInfo::fromName( ha );
79
if ( !qhi.addresses().isEmpty() )
80
m_peerIpAddress = qhi.addresses().first();
86
ControlConnection::~ControlConnection()
88
qDebug() << "DTOR controlconnection";
90
if ( !m_source.isNull() )
91
m_source->setOffline();
94
m_servent->unregisterControlConnection( this );
96
m_dbsyncconn->deleteLater();
101
ControlConnection::source() const
108
ControlConnection::clone()
110
ControlConnection* clone = new ControlConnection( servent(), m_peerIpAddress.toString() );
111
clone->setOnceOnly( onceOnly() );
112
clone->setName( name() );
118
ControlConnection::setup()
120
qDebug() << Q_FUNC_INFO << id() << name();
122
if ( !m_source.isNull() )
124
qDebug() << "This source seems to be online already.";
129
QString friendlyName = name();
131
tDebug() << "Detected name:" << name() << friendlyName << m_sock->peerAddress();
133
// setup source and remote collection for this peer
134
m_source = SourceList::instance()->get( id(), friendlyName, true );
135
m_source->setControlConnection( this );
137
// delay setting up collection/etc until source is synced.
138
// we need it DB synced so it has an ID + exists in DB.
139
connect( m_source.data(), SIGNAL( syncedWithDatabase() ),
140
SLOT( registerSource() ), Qt::QueuedConnection );
142
m_source->setOnline();
144
m_pingtimer = new QTimer;
145
m_pingtimer->setInterval( 5000 );
146
connect( m_pingtimer, SIGNAL( timeout() ), SLOT( onPingTimer() ) );
147
m_pingtimer->start();
148
m_pingtimer_mark.start();
152
// source was synced to DB, set it up properly:
154
ControlConnection::registerSource()
156
qDebug() << Q_FUNC_INFO << m_source->id();
157
Source* source = (Source*) sender();
159
Q_ASSERT( source == m_source.data() );
161
#ifndef ENABLE_HEADLESS
162
// qDebug() << Q_FUNC_INFO << "Setting avatar ... " << name() << !SipHandler::instance()->avatar( name() ).isNull();
163
if ( !SipHandler::instance()->avatar( name() ).isNull() )
165
source->setAvatar( SipHandler::instance()->avatar( name() ) );
170
m_servent->registerControlConnection( this );
171
setupDbSyncConnection();
176
ControlConnection::setupDbSyncConnection( bool ondemand )
178
qDebug() << Q_FUNC_INFO << ondemand << m_source->id() << m_dbconnkey << m_dbsyncconn << m_registered;
180
if ( m_dbsyncconn || !m_registered )
183
Q_ASSERT( m_source->id() > 0 );
185
if ( !m_dbconnkey.isEmpty() )
187
qDebug() << "Connecting to DBSync offer from peer...";
188
m_dbsyncconn = new DBSyncConnection( m_servent, m_source );
190
m_servent->createParallelConnection( this, m_dbsyncconn, m_dbconnkey );
193
else if ( !outbound() || ondemand ) // only one end makes the offer
195
qDebug() << "Offering a DBSync key to peer...";
196
m_dbsyncconn = new DBSyncConnection( m_servent, m_source );
198
QString key = uuid();
199
m_servent->registerOffer( key, m_dbsyncconn );
201
m.insert( "method", "dbsync-offer" );
202
m.insert( "key", key );
208
connect( m_dbsyncconn, SIGNAL( finished() ),
209
m_dbsyncconn, SLOT( deleteLater() ) );
211
connect( m_dbsyncconn, SIGNAL( destroyed( QObject* ) ),
212
SLOT( dbSyncConnFinished( QObject* ) ), Qt::DirectConnection );
218
ControlConnection::dbSyncConnFinished( QObject* c )
220
qDebug() << Q_FUNC_INFO << "DBSync connection closed (for now)";
221
if ( (DBSyncConnection*)c == m_dbsyncconn )
223
//qDebug() << "Setting m_dbsyncconn to NULL";
227
qDebug() << "Old DbSyncConn destroyed?!";
232
ControlConnection::dbSyncConnection()
234
qDebug() << Q_FUNC_INFO << m_source->id();
237
setupDbSyncConnection( true );
238
// Q_ASSERT( m_dbsyncconn );
246
ControlConnection::handleMsg( msg_ptr msg )
248
if ( msg->is( Msg::PING ) )
250
// qDebug() << "Received Connection PING, nice." << m_pingtimer_mark.elapsed();
251
m_pingtimer_mark.restart();
255
// if small and not compresed, print it out for debug
256
if ( msg->length() < 1024 && !msg->is( Msg::COMPRESSED ) )
258
qDebug() << id() << "got msg:" << QString::fromAscii( msg->payload() );
261
// All control connection msgs are JSON
262
if ( !msg->is( Msg::JSON ) )
264
Q_ASSERT( msg->is( Msg::JSON ) );
269
QVariantMap m = msg->json().toMap();
272
if ( m.value( "conntype" ).toString() == "request-offer" )
274
QString theirkey = m["key"].toString();
275
QString ourkey = m["offer"].toString();
276
QString theirdbid = m["controlid"].toString();
277
servent()->reverseOfferRequest( this, theirdbid, ourkey, theirkey );
279
else if ( m.value( "method" ).toString() == "dbsync-offer" )
281
m_dbconnkey = m.value( "key" ).toString() ;
282
setupDbSyncConnection();
284
else if ( m.value( "method" ) == "protovercheckfail" )
286
qDebug() << "*** Remote peer protocol version mismatch, connection closed";
292
tDebug() << id() << "Unhandled msg:" << QString::fromAscii( msg->payload() );
298
tDebug() << id() << "Invalid msg:" << QString::fromAscii( msg->payload() );
304
ControlConnection::onPingTimer()
306
if ( m_pingtimer_mark.elapsed() >= TCP_TIMEOUT * 1000 )
308
qDebug() << "Timeout reached! Shutting down connection to" << m_source->friendlyName();
312
sendMsg( Msg::factory( QByteArray(), Msg::PING ) );