1
/* === This file is part of Tomahawk Player - <http://tomahawk-player.org> ===
3
* Copyright 2010-2011, Christian Muehlhaeuser <muesli@tomahawk-player.org>
4
* Copyright 2010-2012, Jeff Mitchell <jeff@tomahawk-player.org>
6
* Tomahawk is free software: you can redistribute it and/or modify
7
* it under the terms of the GNU General Public License as published by
8
* the Free Software Foundation, either version 3 of the License, or
9
* (at your option) any later version.
11
* Tomahawk is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
* GNU General Public License for more details.
16
* You should have received a copy of the GNU General Public License
17
* along with Tomahawk. If not, see <http://www.gnu.org/licenses/>.
22
#include <QtCore/QCoreApplication>
23
#include <QtCore/QMutexLocker>
24
#include <QtNetwork/QNetworkInterface>
25
#include <QtCore/QFile>
26
#include <QtCore/QThread>
27
#include <QtNetwork/QNetworkProxy>
28
#include <QtNetwork/QNetworkRequest>
29
#include <QtNetwork/QNetworkReply>
31
#include <boost/bind.hpp>
35
#include "BufferIoDevice.h"
36
#include "Connection.h"
37
#include "ControlConnection.h"
38
#include "database/Database.h"
39
#include "database/DatabaseImpl.h"
40
#include "StreamConnection.h"
41
#include "SourceList.h"
43
#include "PortFwdThread.h"
44
#include "TomahawkSettings.h"
45
#include "utils/TomahawkUtils.h"
46
#include "utils/Logger.h"
48
using namespace Tomahawk;
50
// Static instance pointer, defined here and initialised to null.
// NOTE(review): the code that assigns it is not visible in this view.
Servent* Servent::s_instance = 0;
60
// Constructor fragment: initialises the QTcpServer base with the given parent, reads the
// --lanhack command-line flag, sets the server proxy to QNetworkProxy::NoProxy and registers
// IO device factories for the "file", "servent" and "http" protocols.
// NOTE(review): braces and several lines (further initialisers, scopes) are missing from this view.
Servent::Servent( QObject* parent )
61
: QTcpServer( parent )
68
// true when the application was started with --lanhack
m_lanHack = qApp->arguments().contains( "--lanhack" );
69
setProxy( QNetworkProxy::NoProxy );
72
// Each factory below is a boost::function bound to a member function of this object.
// The three declarations reuse the name `fac`, so they presumably live in separate
// brace scopes that are not shown here.
boost::function<QSharedPointer<QIODevice>(result_ptr)> fac =
73
boost::bind( &Servent::localFileIODeviceFactory, this, _1 );
74
this->registerIODeviceFactory( "file", fac );
78
boost::function<QSharedPointer<QIODevice>(result_ptr)> fac =
79
boost::bind( &Servent::remoteIODeviceFactory, this, _1 );
80
this->registerIODeviceFactory( "servent", fac );
84
boost::function<QSharedPointer<QIODevice>(result_ptr)> fac =
85
boost::bind( &Servent::httpIODeviceFactory, this, _1 );
86
this->registerIODeviceFactory( "http", fac );
95
// Teardown fragment (enclosing signature not visible -- presumably the destructor; confirm).
// Asks the port-forwarding thread to quit, waits on it with a timeout argument of 60000,
// then deletes it.
// NOTE(review): any null guard on m_portfwd would be on lines missing from this view.
m_portfwd.data()->quit();
96
m_portfwd.data()->wait( 60000 );
97
delete m_portfwd.data();
103
// Starts listening on address `ha` and then configures how this node advertises itself,
// depending on TomahawkSettings::externalAddressMode() (Static, Lan or Upnp).
// NOTE(review): return type, braces, the switch header, break/return statements and the
// use of the `upnp` and `port` parameters are on lines missing from this view.
Servent::startListening( QHostAddress ha, bool upnp, int port )
106
int defPort = TomahawkSettings::instance()->defaultPort();
108
// Listen on both the selected port and, if not the same, the default port -- the latter sometimes necessary for zeroconf
109
// TODO: only listen on both when zeroconf sip is enabled
110
// TODO: use a real zeroconf system instead of a simple UDP broadcast?
111
// First attempt uses m_port; the nested checks retry with defPort when it differs.
if ( !listen( ha, m_port ) )
113
if ( m_port != defPort )
115
if ( !listen( ha, defPort ) )
117
tLog() << "Failed to listen on both port" << m_port << "and port" << defPort;
118
tLog() << "Error string is:" << errorString();
126
TomahawkSettings::ExternalAddressMode mode = TomahawkSettings::instance()->externalAddressMode();
128
tLog() << "Servent listening on port" << m_port << "- servent thread:" << thread()
129
<< "- address mode:" << (int)( mode );
131
// --lanhack means to advertise your LAN IP as if it were externally visible
134
// Static: take the externally visible hostname and port straight from the settings.
case TomahawkSettings::Static:
135
m_externalHostname = TomahawkSettings::instance()->externalHostname();
136
m_externalPort = TomahawkSettings::instance()->externalPort();
141
// Lan: only the internal address is set up.
case TomahawkSettings::Lan:
142
setInternalAddress();
145
// Upnp: set the internal address, then start a PortFwdThread for m_port; its
// externalAddressDetected signal is routed to setExternalAddress().
case TomahawkSettings::Upnp:
148
setInternalAddress();
151
// TODO check if we have a public/internet IP on this machine directly
152
tLog() << "External address mode set to upnp...";
153
m_portfwd = QWeakPointer< PortFwdThread >( new PortFwdThread( m_port ) );
154
Q_ASSERT( m_portfwd );
155
connect( m_portfwd.data(), SIGNAL( externalAddressDetected( QHostAddress, unsigned int ) ),
156
SLOT( setExternalAddress( QHostAddress, unsigned int ) ) );
157
m_portfwd.data()->start();
166
// Creates a ControlConnection offer and registers it under a key.
//   name     - connection name; when empty a "KEY(...)" placeholder name is used
//   nodeid   - tested for emptiness below; the guarded statement is missing from this view
//   key      - offer key; when empty a fresh uuid() is used instead
//   onceOnly - forwarded to ControlConnection::setOnceOnly()
// Must be called on the Servent's own thread (asserted).
// NOTE(review): return type and return statement are not visible here.
Servent::createConnectionKey( const QString& name, const QString &nodeid, const QString &key, bool onceOnly )
168
Q_ASSERT( this->thread() == QThread::currentThread() );
170
QString _key = ( key.isEmpty() ? uuid() : key );
171
ControlConnection* cc = new ControlConnection( this, name );
172
// NOTE(review): the placeholder name is built from `key`, not `_key`, so it reads
// "KEY()" when the key was generated above -- confirm whether `_key` was intended.
cc->setName( name.isEmpty() ? QString( "KEY(%1)" ).arg( key ) : name );
173
if ( !nodeid.isEmpty() )
175
cc->setOnceOnly( onceOnly );
177
tDebug( LOGVERBOSE ) << "Creating connection key with name of" << cc->name() << "and id of" << cc->id() << "and key of" << _key << "; key is once only? :" << (onceOnly ? "true" : "false");
178
registerOffer( _key, cc );
184
// Decides whether `addr` may be advertised as an external address.
// Unless --lanhack is active, addresses whose textual form starts with "10.", "172.16."
// or "192.168." are handled by the branch below (its body is missing from this view,
// presumably a rejection); otherwise the result is simply "address is not null".
// NOTE(review): the prefix test only matches 172.16.x.x, not the rest of the
// 172.16.0.0/12 private block (172.17 - 172.31) -- confirm whether that is intended.
Servent::isValidExternalIP( const QHostAddress& addr ) const
186
QString ip = addr.toString();
187
if ( !m_lanHack && ( ip.startsWith( "10." ) || ip.startsWith( "172.16." ) || ip.startsWith( "192.168." ) ) )
192
return !addr.isNull();
197
// Walks every address reported by QNetworkInterface::allAddresses().
// The visible tests single out 127.0.0.1 and any address containing ':' (the statements
// they guard are missing from this view). With --lanhack active, an address accepted by
// isValidExternalIP() is published through setExternalAddress() together with m_port.
// NOTE(review): the rest of the loop body and the function tail are not visible.
Servent::setInternalAddress()
199
foreach ( QHostAddress ha, QNetworkInterface::allAddresses() )
201
if ( ha.toString() == "127.0.0.1" )
203
if ( ha.toString().contains( ":" ) )
206
if ( m_lanHack && isValidExternalIP( ha ) )
208
tLog() << "LANHACK: set external address to lan address" << ha.toString();
209
setExternalAddress( ha, m_port );
222
// Slot connected to PortFwdThread::externalAddressDetected (see startListening).
// Stores `ha` and `port` as the external address/port when `ha` passes isValidExternalIP().
// If the external port is 0 or `ha` is not valid, logs a UPnP failure and calls
// setInternalAddress(); the success log line presumably belongs to the else branch,
// which is on lines missing from this view.
Servent::setExternalAddress( QHostAddress ha, unsigned int port )
224
if ( isValidExternalIP( ha ) )
226
m_externalAddress = ha;
227
m_externalPort = port;
230
if ( m_externalPort == 0 || !isValidExternalIP( ha ) )
232
tLog() << "UPnP failed, LAN and outbound connections only!";
233
setInternalAddress();
237
tLog() << "UPnP setup successful";
244
// Records `conn` as the offer for `key`, stored as a QWeakPointer.
// An existing entry for the same key is overwritten by operator[].
Servent::registerOffer( const QString& key, Connection* conn )
246
m_offers[key] = QWeakPointer<Connection>(conn);
251
// Appends `conn` to the list of known control connections.
Servent::registerControlConnection( ControlConnection* conn )
253
m_controlconnections.append( conn );
258
// Rebuilds the control-connection list into `n` while iterating the current list
// (the loop body, presumably a filter that skips `conn`, is missing from this view),
// removes conn's id from m_connectedNodes and replaces the member list with `n`.
Servent::unregisterControlConnection( ControlConnection* conn )
260
QList<ControlConnection*> n;
261
foreach( ControlConnection* c, m_controlconnections )
265
m_connectedNodes.removeAll( conn->id() );
266
m_controlconnections = n;
271
// Searches the known control connections for one whose name() equals `name`.
// NOTE(review): return type and the return statements are on lines missing from this view.
Servent::lookupControlConnection( const QString& name )
273
foreach( ControlConnection* c, m_controlconnections )
274
if( c->name() == name )
282
// Handler for a newly accepted socket descriptor `sd`.
// Creates a QTcpSocketExtra, marks it as inbound and still owned by the Servent, adopts the
// descriptor, and wires readyRead to this object's readyRead() slot. While the Servent
// owns the socket it deletes itself on disconnect.
// Must run on the Servent's own thread (asserted).
// NOTE(review): the failure handling after setSocketDescriptor() is not visible here.
Servent::incomingConnection( int sd )
284
Q_ASSERT( this->thread() == QThread::currentThread() );
286
QTcpSocketExtra* sock = new QTcpSocketExtra;
287
tDebug( LOGVERBOSE ) << Q_FUNC_INFO << "Accepting connection, sock" << sock;
289
sock->moveToThread( thread() );
290
sock->_disowned = false;
291
sock->_outbound = false;
292
if( !sock->setSocketDescriptor( sd ) )
298
connect( sock, SIGNAL( readyRead() ), SLOT( readyRead() ) );
299
connect( sock, SIGNAL( disconnected() ), sock, SLOT( deleteLater() ) );
306
// Slot fragment (signature not visible; the connect() calls in this file name the slot
// readyRead() -- confirm). Reads the first message from a socket still owned by the
// Servent, parses its JSON payload and either hands the socket over to the Connection
// returned by claimOffer() or closes it.
// NOTE(review): braces, early returns, the declarations of `parser` and `ok`, and the
// `closeconnection` label are on lines missing from this view.
Q_ASSERT( this->thread() == QThread::currentThread() );
307
// sender() is C-style cast to QTcpSocketExtra and held weakly for the rest of the slot.
QWeakPointer< QTcpSocketExtra > sock = (QTcpSocketExtra*)sender();
309
if( sock.isNull() || sock.data()->_disowned )
314
// No message in progress yet: read the header (Msg::headerSize() bytes) once that many
// bytes are available, then start a Msg from it.
if( sock.data()->_msg.isNull() )
316
char msgheader[ Msg::headerSize() ];
317
if( sock.data()->bytesAvailable() < Msg::headerSize() )
320
sock.data()->read( (char*) &msgheader, Msg::headerSize() );
321
sock.data()->_msg = Msg::begin( (char*) &msgheader );
324
// Payload: compare the available byte count with the announced length, then read and fill.
if( sock.data()->bytesAvailable() < sock.data()->_msg->length() )
327
QByteArray ba = sock.data()->read( sock.data()->_msg->length() );
328
sock.data()->_msg->fill( ba );
329
Q_ASSERT( sock.data()->_msg->is( Msg::JSON ) );
331
ControlConnection* cc = 0;
333
QString key, conntype, nodeid, controlid;
334
QVariantMap m = parser.parse( sock.data()->_msg->payload(), &ok ).toMap();
337
tDebug() << "Invalid JSON on new connection, aborting";
338
goto closeconnection;
341
// Fields read from the handshake map.
conntype = m.value( "conntype" ).toString();
342
key = m.value( "key" ).toString();
343
nodeid = m.value( "nodeid" ).toString();
344
controlid = m.value( "controlid" ).toString();
346
tDebug( LOGVERBOSE ) << "Incoming connection details:" << m;
348
if( !nodeid.isEmpty() ) // only control connections send nodeid
351
// Duplicate detection: nodeid is compared with m_connectedNodes and the ids of the
// existing control connections.
if ( m_connectedNodes.contains( nodeid ) )
354
foreach( ControlConnection* con, m_controlconnections )
356
tLog( LOGVERBOSE ) << "known connection:" << con->id() << con->source()->friendlyName();
357
if( con->id() == nodeid )
366
tLog() << "Duplicate control connection detected, dropping:" << nodeid << conntype;
367
goto closeconnection;
371
// Search for the control connection whose id equals controlid (the statement that uses
// the match, presumably assigning `cc`, is missing from this view).
foreach( ControlConnection* con, m_controlconnections )
373
if ( con->id() == controlid )
380
// they connected to us and want something we are offering
381
if ( conntype == "accept-offer" || conntype == "push-offer" )
383
sock.data()->_msg.clear();
384
tDebug( LOGVERBOSE ) << Q_FUNC_INFO << key << nodeid << "socket peer address = " << sock.data()->peerAddress() << "socket peer name = " << sock.data()->peerName();
385
Connection* conn = claimOffer( cc, nodeid, key, sock.data()->peerAddress() );
388
tLog() << "claimOffer FAILED, key:" << key << nodeid;
389
goto closeconnection;
393
// The socket is re-checked after claimOffer(); per the log text it may have gone away
// while an ACL decision was pending.
tLog() << "Socket has become null, possibly took too long to make an ACL decision, key:" << key << nodeid;
396
else if ( !sock.data()->isValid() )
398
tLog() << "Socket has become invalid, possibly took too long to make an ACL decision, key:" << key << nodeid;
399
goto closeconnection;
401
tDebug( LOGVERBOSE ) << "claimOffer OK:" << key << nodeid;
403
// NOTE(review): as shown, nodeid is appended to m_connectedNodes before the isEmpty()
// test below, so an empty id would be recorded too -- confirm against the full source.
m_connectedNodes << nodeid;
404
if( !nodeid.isEmpty() )
405
conn->setId( nodeid );
407
handoverSocket( conn, sock.data() );
412
tLog() << "Invalid or unhandled conntype";
415
// fallthru to cleanup:
417
tLog() << "Closing incoming connection, something was wrong.";
418
sock.data()->_msg.clear();
419
sock.data()->disconnectFromHost();
423
// creates a new tcp connection to peer from conn, handled by given connector
424
// new_conn is responsible for sending the first msg, if needed
426
// Opens an additional connection to the peer behind `orig_conn`, to be handled by `new_conn`.
// If orig_conn is outbound, connects directly to its socket's peer address and peerPort().
// Otherwise registers new_conn under a fresh uuid and sends a "request-offer" JSON message
// over orig_conn asking the peer to connect back.
// NOTE(review): the remaining connectToPeer() arguments and the declaration of `m` are on
// lines missing from this view. In the visible lines the else branch dereferences
// orig_conn without a null check.
Servent::createParallelConnection( Connection* orig_conn, Connection* new_conn, const QString& key )
428
tDebug( LOGVERBOSE ) << Q_FUNC_INFO << ", key:" << key << thread() << orig_conn;
429
// if we can connect to them directly:
430
if( orig_conn && orig_conn->outbound() )
432
connectToPeer( orig_conn->socket()->peerAddress().toString(),
433
orig_conn->peerPort(),
437
else // ask them to connect to us:
439
QString tmpkey = uuid();
440
tLog() << "Asking them to connect to us using" << tmpkey ;
441
registerOffer( tmpkey, new_conn );
444
// Request payload: "key" is the temporary key registered above, "offer" is the caller's key.
m.insert( "conntype", "request-offer" );
445
m.insert( "key", tmpkey );
446
m.insert( "offer", key );
447
m.insert( "port", externalPort() );
448
m.insert( "controlid", Database::instance()->impl()->dbid() );
450
QJson::Serializer ser;
451
orig_conn->sendMsg( Msg::factory( ser.serialize(m), Msg::JSON ) );
457
// Slot for QTcpSocketExtra::connected (wired up in connectToPeer).
// Casts sender() to the socket, logs the peer, and hands the socket over to its attached
// Connection. A null sock->_conn is logged first; the handling of that case is on lines
// missing from this view.
Servent::socketConnected()
459
QTcpSocketExtra* sock = (QTcpSocketExtra*)sender();
461
tDebug( LOGVERBOSE ) << Q_FUNC_INFO << thread() << "socket: " << sock << ", hostaddr: " << sock->peerAddress() << ", hostname: " << sock->peerName();
463
if ( sock->_conn.isNull() )
467
tDebug( LOGVERBOSE ) << Q_FUNC_INFO << "Socket's connection was null, could have timed out or been given an invalid address";
471
Connection* conn = sock->_conn.data();
472
handoverSocket( conn, sock );
476
// transfers ownership of socket to the connection and inits the connection
477
// Hands `sock` over to `conn`.
// Preconditions (asserted): conn has no socket yet and sock is valid.
// Disconnects the Servent-level handlers (readyRead, delete-on-disconnect, socketError),
// marks the socket as disowned, and copies its outbound flag and peer port onto conn.
// NOTE(review): the lines that attach the socket to conn and start the connection are
// missing from this view.
void Servent::handoverSocket( Connection* conn, QTcpSocketExtra* sock )
481
Q_ASSERT( conn->socket().isNull() );
482
Q_ASSERT( sock->isValid() );
484
disconnect( sock, SIGNAL( readyRead() ), this, SLOT( readyRead() ) );
485
disconnect( sock, SIGNAL( disconnected() ), sock, SLOT( deleteLater() ) );
486
disconnect( sock, SIGNAL( error( QAbstractSocket::SocketError ) ),
487
this, SLOT( socketError( QAbstractSocket::SocketError ) ) );
489
sock->_disowned = true;
490
conn->setOutbound( sock->_outbound );
491
conn->setPeerPort( sock->peerPort() );
498
// Slot for QTcpSocketExtra::error (wired up in connectToPeer).
// Logs the error together with the attached connection's id and name. For a socket the
// Servent still owns (_disowned false) the connection is marked as failed; null socket and
// null connection cases are only logged in the visible lines.
// NOTE(review): the null test on `sock`, braces and any socket cleanup are on lines
// missing from this view.
Servent::socketError( QAbstractSocket::SocketError e )
500
QTcpSocketExtra* sock = (QTcpSocketExtra*)sender();
503
tLog() << "SocketError, sock is null";
507
if ( !sock->_conn.isNull() )
509
Connection* conn = sock->_conn.data();
510
tLog() << "Servent::SocketError:" << e << conn->id() << conn->name();
512
if ( !sock->_disowned )
514
// connection will delete if we already transferred ownership, otherwise:
518
conn->markAsFailed(); // will emit failed, then finished
522
tLog() << "SocketError, connection is null";
529
// Convenience overload: creates a ControlConnection for host `ha`, gives it an
// "accept-offer" first message carrying our external port and database id (as "nodeid"),
// sets its name and a "nodeid" property from `id`, then delegates to the
// Connection-based connectToPeer() overload.
// Must be called on the Servent's own thread (asserted).
// NOTE(review): the declaration of `m`, any other map entries and the conditions around
// setName()/setProperty() are on lines missing from this view.
Servent::connectToPeer( const QString& ha, int port, const QString &key, const QString& name, const QString& id )
531
Q_ASSERT( this->thread() == QThread::currentThread() );
533
ControlConnection* conn = new ControlConnection( this, ha );
535
m["conntype"] = "accept-offer";
537
m["port"] = externalPort();
538
m["nodeid"] = Database::instance()->impl()->dbid();
540
conn->setFirstMessage( m );
542
conn->setName( name );
546
conn->setProperty( "nodeid", id );
548
connectToPeer( ha, port, key, conn );
553
// Opens an outbound TCP connection to ha:port on behalf of `conn`.
//   - port must be > 0 (asserted).
//   - A target matching our own external address or hostname together with our external
//     port is detected and logged as "ourselves".
//   - If `key` is non-empty and conn has no first message yet, an "accept-offer" message
//     carrying our external port and database id (as "controlid") is installed.
//   - A new QTcpSocketExtra is marked outbound and Servent-owned; its connected() and
//     error() signals are routed to socketConnected() and socketError().
//   - conn->peerIpAddress() is used as the target when it is non-null; the second
//     connectToHost() call presumably sits in an else branch that is not shown.
// NOTE(review): the declaration of `m`, the early return after the self-connect log and
// the line attaching conn to the socket are missing from this view. As shown,
// moveToThread() is called after connectToHost().
Servent::connectToPeer( const QString& ha, int port, const QString &key, Connection* conn )
555
tDebug( LOGVERBOSE ) << "Servent::connectToPeer:" << ha << ":" << port
556
<< thread() << QThread::currentThread();
558
Q_ASSERT( port > 0 );
561
if ( ( ha == m_externalAddress.toString() || ha == m_externalHostname ) &&
562
( port == m_externalPort ) )
564
tDebug() << "ERROR: Tomahawk won't try to connect to" << ha << ":" << port << ": identified as ourselves.";
568
if( key.length() && conn->firstMessage().isNull() )
571
m["conntype"] = "accept-offer";
573
m["port"] = externalPort();
574
m["controlid"] = Database::instance()->impl()->dbid();
575
conn->setFirstMessage( m );
578
QTcpSocketExtra* sock = new QTcpSocketExtra();
579
sock->_disowned = false;
581
sock->_outbound = true;
583
connect( sock, SIGNAL( connected() ), SLOT( socketConnected() ) );
584
connect( sock, SIGNAL( error( QAbstractSocket::SocketError ) ),
585
SLOT( socketError( QAbstractSocket::SocketError ) ) );
587
if ( !conn->peerIpAddress().isNull() )
588
sock->connectToHost( conn->peerIpAddress(), port, QTcpSocket::ReadWrite );
590
sock->connectToHost( ha, port, QTcpSocket::ReadWrite );
591
sock->moveToThread( thread() );
596
// Handles a peer's request that we connect to it for offer `key`.
// Claims the offer via claimOffer(); the failure path logs and shuts down orig_conn.
// Otherwise the claimed connection gets a "push-offer" first message (with our external
// port and database id as "controlid") and a parallel connection is created for it.
// Must be called on the Servent's own thread (asserted).
// NOTE(review): the null check on new_conn, the declaration of `m` and any use of
// `theirkey` are on lines missing from this view.
Servent::reverseOfferRequest( ControlConnection* orig_conn, const QString& theirdbid, const QString& key, const QString& theirkey )
598
Q_ASSERT( this->thread() == QThread::currentThread() );
600
tDebug( LOGVERBOSE ) << "Servent::reverseOfferRequest received for" << key;
601
Connection* new_conn = claimOffer( orig_conn, theirdbid, key );
604
tDebug() << "claimOffer failed, killing requesting connection out of spite";
605
orig_conn->shutdown();
610
m["conntype"] = "push-offer";
612
m["port"] = externalPort();
613
m["controlid"] = Database::instance()->impl()->dbid();
614
new_conn->setFirstMessage( m );
615
createParallelConnection( orig_conn, new_conn, QString() );
619
// return the appropriate connection for a given offer key, or NULL if invalid
621
// Resolves an offer key to the Connection that should serve it. Visible cases:
//   1. Keys starting with "FILE_REQUEST_KEY:" create a StreamConnection for the file id
//      that follows the prefix. Unless --noauth is given, a peer that is neither
//      QHostAddress::Any nor whitelisted is matched against the peer addresses of the
//      existing control connections.
//   2. The literal key "whitelist" creates a ControlConnection named after the peer
//      address when isIPWhitelisted( peer ) holds.
//   3. Keys present in m_offers: the registered connection is cloned; once-only offers
//      are removed from the map, and a non-empty nodeid is stored as the "nodeid" property.
//   4. A further branch creates a ControlConnection named after the key; its condition
//      (presumably related to `noauth`) is not visible.
//   Anything else is logged as an invalid offer key.
// NOTE(review): return type, braces and most return statements are on lines missing from
// this view.
Servent::claimOffer( ControlConnection* cc, const QString &nodeid, const QString &key, const QHostAddress peer )
623
bool noauth = qApp->arguments().contains( "--noauth" );
625
// magic key for stream connections:
626
if( key.startsWith( "FILE_REQUEST_KEY:" ) )
628
// check if the source IP matches an existing, authenticated connection
629
if ( !noauth && peer != QHostAddress::Any && !isIPWhitelisted( peer ) )
632
// NOTE(review): this loop variable shadows the `cc` parameter inside the loop.
foreach( ControlConnection* cc, m_controlconnections )
634
if( cc->socket()->peerAddress() == peer )
642
tLog() << "File transfer request rejected, invalid source IP";
647
// 17 is the length of the "FILE_REQUEST_KEY:" prefix tested above.
QString fid = key.right( key.length() - 17 );
648
StreamConnection* sc = new StreamConnection( this, cc, fid );
652
if( key == "whitelist" ) // LAN IP address, check source IP
654
if( isIPWhitelisted( peer ) )
656
tDebug() << "Connection is from whitelisted IP range (LAN)";
657
Connection* conn = new ControlConnection( this, peer.toString() );
658
conn->setName( peer.toString() );
663
tDebug() << "Connection claimed to be whitelisted, but wasn't.";
668
if( m_offers.contains( key ) )
670
QWeakPointer<Connection> conn = m_offers.value( key );
673
// This can happen if it's a streamconnection, but the audioengine has
674
// already closed the iodevice, causing the connection to be deleted before
675
// the peer connects and provides the first byte
676
tLog() << Q_FUNC_INFO << "invalid/expired offer:" << key;
680
tDebug( LOGVERBOSE ) << Q_FUNC_INFO << "nodeid is: " << nodeid;
681
if( !nodeid.isEmpty() )
683
// Used by the connection for the ACL check
684
// If there isn't a nodeid it's not the first connection and will already have been stopped
685
conn.data()->setProperty( "nodeid", nodeid );
688
if( conn.data()->onceOnly() )
690
m_offers.remove( key );
695
return conn.data()->clone();
701
conn = new ControlConnection( this, peer );
702
conn->setName( key );
707
tLog() << "Invalid offer key:" << key;
713
// IO device factory registered for the "servent" protocol (see the constructor).
// The result URL is expected to look like "servent://<sourceName>\t<fileId>". The source
// is looked up in SourceList, a StreamConnection is created on its control connection and
// a parallel connection is requested with key "FILE_REQUEST_KEY:<fileId>".
// Returns the StreamConnection's iodevice(). The null-source / no-control-connection case
// presumably returns the empty `sp`; that return is not visible here.
// NOTE(review): parts.at( 1 ) has no visible size check, so a URL without a tab separator
// would index out of range -- confirm against the full source.
QSharedPointer<QIODevice>
714
Servent::remoteIODeviceFactory( const result_ptr& result )
716
QSharedPointer<QIODevice> sp;
718
QStringList parts = result->url().mid( QString( "servent://" ).length() ).split( "\t" );
719
const QString sourceName = parts.at( 0 );
720
const QString fileId = parts.at( 1 );
721
source_ptr s = SourceList::instance()->get( sourceName );
722
if ( s.isNull() || !s->controlConnection() )
725
ControlConnection* cc = s->controlConnection();
726
StreamConnection* sc = new StreamConnection( this, cc, fileId, result );
727
createParallelConnection( cc, sc, QString( "FILE_REQUEST_KEY:%1" ).arg( fileId ) );
728
return sc->iodevice();
733
// Adds `sc` to the list of active stream sessions under m_ftsession_mut, prints the
// current transfers and emits streamStarted( sc ).
// NOTE(review): as shown, the Q_ASSERT and the debug line read m_scsessions before the
// mutex is taken -- confirm whether that is intentional.
Servent::registerStreamConnection( StreamConnection* sc )
735
Q_ASSERT( !m_scsessions.contains( sc ) );
736
tDebug( LOGVERBOSE ) << "Registering Stream" << m_scsessions.length() + 1;
738
QMutexLocker lock( &m_ftsession_mut );
739
m_scsessions.append( sc );
741
printCurrentTransfers();
742
emit streamStarted( sc );
747
// Removes `sc` from the list of active stream sessions under m_ftsession_mut, prints the
// remaining transfers and emits streamFinished( sc ).
Servent::onStreamFinished( StreamConnection* sc )
750
tDebug( LOGVERBOSE ) << "Stream Finished, unregistering" << sc->id();
752
QMutexLocker lock( &m_ftsession_mut );
753
m_scsessions.removeAll( sc );
755
printCurrentTransfers();
756
emit streamFinished( sc );
760
// used for debug output:
762
// Debug helper: writes one qDebug() line per active stream session, formatted as
// "<k> ) <id>".
// NOTE(review): the declaration and update of the counter `k` are on lines missing from
// this view.
Servent::printCurrentTransfers()
765
// qDebug() << "~~~ Active file transfer connections:" << m_scsessions.length();
766
foreach( StreamConnection* i, m_scsessions )
768
qDebug() << k << ") " << i->id();
775
// Checks whether `ip` lies in the subnet of any address entry of any local network
// interface, using QHostAddress::isInSubnet with the entry's ip and prefix length.
// The "success" and "failure" log lines mark the two outcomes; the return statements
// themselves are on lines missing from this view.
// NOTE(review): the `range` typedef and `subnetEntries` list are not referenced in the
// visible lines.
Servent::isIPWhitelisted( QHostAddress ip )
777
tDebug( LOGVERBOSE ) << Q_FUNC_INFO << "Performing checks against ip" << ip.toString();
778
typedef QPair< QHostAddress, int > range;
779
QList< range > subnetEntries;
781
QList< QNetworkInterface > networkInterfaces = QNetworkInterface::allInterfaces();
782
foreach( QNetworkInterface interface, networkInterfaces )
784
tDebug( LOGVERBOSE ) << Q_FUNC_INFO << "Checking interface" << interface.humanReadableName();
785
QList< QNetworkAddressEntry > addressEntries = interface.addressEntries();
786
foreach( QNetworkAddressEntry addressEntry, addressEntries )
788
tDebug( LOGVERBOSE ) << Q_FUNC_INFO << "Checking address entry with ip" << addressEntry.ip().toString() << "and prefix length" << addressEntry.prefixLength();
789
if ( ip.isInSubnet( addressEntry.ip(), addressEntry.prefixLength() ) )
791
tDebug( LOGVERBOSE ) << Q_FUNC_INFO << "success";
796
tDebug( LOGVERBOSE ) << Q_FUNC_INFO << "failure";
802
// Scans the known control connections for one whose id() equals `session`.
// NOTE(review): return type and return statements are on lines missing from this view.
Servent::connectedToSession( const QString& session )
804
foreach( ControlConnection* cc, m_controlconnections )
806
if( cc->id() == session )
815
// Triggers the DB sync connection of every source that has both a control connection and
// a dbSyncConnection.
// Null and local sources are tested first; the statement that test guards (presumably a
// skip) is on a line missing from this view.
Servent::triggerDBSync()
817
// tell peers we have new stuff they should sync
818
QList<source_ptr> sources = SourceList::instance()->sources();
819
foreach( const source_ptr& src, sources )
822
if ( src.isNull() || src->isLocal() )
825
if ( src->controlConnection() && src->controlConnection()->dbSyncConnection() ) // source online?
826
src->controlConnection()->dbSyncConnection()->trigger();
832
// Registers `fac` as the IO device factory for protocol `proto` (e.g. "file", "http").
// getIODeviceForUrl() looks factories up by the protocol part of a result's URL.
Servent::registerIODeviceFactory( const QString &proto, boost::function<QSharedPointer<QIODevice>(Tomahawk::result_ptr)> fac )
834
m_iofactories.insert( proto, fac );
838
// Returns an IO device for the result's URL by dispatching on its protocol.
// The URL must match "<alphanumeric protocol>://<rest>"; the captured protocol selects a
// factory from m_iofactories, which is invoked with `result`.
// The no-match and unknown-protocol cases presumably return the empty `sp`; those returns
// are on lines missing from this view.
QSharedPointer<QIODevice>
839
Servent::getIODeviceForUrl( const Tomahawk::result_ptr& result )
841
QSharedPointer<QIODevice> sp;
843
QRegExp rx( "^([a-zA-Z0-9]+)://(.+)$" );
844
if ( rx.indexIn( result->url() ) == -1 )
847
const QString proto = rx.cap( 1 );
848
if ( !m_iofactories.contains( proto ) )
851
return m_iofactories.value( proto )( result );
855
// IO device factory registered for the "file" protocol (see the constructor).
// Drops the first strlen("file://") characters of the URL, opens the remaining path
// read-only as a QFile and returns it wrapped in a QSharedPointer.
// NOTE(review): the result of open() is not checked in the visible lines, so a file that
// failed to open is still returned as a (closed) device.
QSharedPointer<QIODevice>
856
Servent::localFileIODeviceFactory( const Tomahawk::result_ptr& result )
858
// ignore "file://" at front of url
859
QFile* io = new QFile( result->url().mid( QString( "file://" ).length() ) );
861
io->open( QIODevice::ReadOnly );
863
return QSharedPointer<QIODevice>( io );
867
// IO device factory registered for the "http" protocol (see the constructor).
// Issues a GET for the result's URL through TomahawkUtils::nam() and returns the
// QNetworkReply as the device. The shared pointer uses QObject::deleteLater as its
// deleter instead of a plain delete.
QSharedPointer<QIODevice>
868
Servent::httpIODeviceFactory( const Tomahawk::result_ptr& result )
870
QNetworkRequest req( result->url() );
871
QNetworkReply* reply = TomahawkUtils::nam()->get( req );
872
return QSharedPointer<QIODevice>( reply, &QObject::deleteLater );