~ubuntu-branches/ubuntu/trusty/tomahawk/trusty-proposed

« back to all changes in this revision

Viewing changes to src/libtomahawk/network/Servent.cpp

  • Committer: Package Import Robot
  • Author(s): Harald Sitter
  • Date: 2013-03-07 21:50:13 UTC
  • Revision ID: package-import@ubuntu.com-20130307215013-6gdjkdds7i9uenvs
Tags: upstream-0.6.0+dfsg
ImportĀ upstreamĀ versionĀ 0.6.0+dfsg

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* === This file is part of Tomahawk Player - <http://tomahawk-player.org> ===
 
2
 *
 
3
 *   Copyright 2010-2011, Christian Muehlhaeuser <muesli@tomahawk-player.org>
 
4
 *   Copyright 2010-2012, Jeff Mitchell <jeff@tomahawk-player.org>
 
5
 *
 
6
 *   Tomahawk is free software: you can redistribute it and/or modify
 
7
 *   it under the terms of the GNU General Public License as published by
 
8
 *   the Free Software Foundation, either version 3 of the License, or
 
9
 *   (at your option) any later version.
 
10
 *
 
11
 *   Tomahawk is distributed in the hope that it will be useful,
 
12
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 
14
 *   GNU General Public License for more details.
 
15
 *
 
16
 *   You should have received a copy of the GNU General Public License
 
17
 *   along with Tomahawk. If not, see <http://www.gnu.org/licenses/>.
 
18
 */
 
19
 
 
20
#include "Servent.h"
 
21
 
 
22
#include <QtCore/QCoreApplication>
 
23
#include <QtCore/QMutexLocker>
 
24
#include <QtNetwork/QNetworkInterface>
 
25
#include <QtCore/QFile>
 
26
#include <QtCore/QThread>
 
27
#include <QtNetwork/QNetworkProxy>
 
28
#include <QtNetwork/QNetworkRequest>
 
29
#include <QtNetwork/QNetworkReply>
 
30
 
 
31
#include <boost/bind.hpp>
 
32
 
 
33
#include "Result.h"
 
34
#include "Source.h"
 
35
#include "BufferIoDevice.h"
 
36
#include "Connection.h"
 
37
#include "ControlConnection.h"
 
38
#include "database/Database.h"
 
39
#include "database/DatabaseImpl.h"
 
40
#include "StreamConnection.h"
 
41
#include "SourceList.h"
 
42
 
 
43
#include "PortFwdThread.h"
 
44
#include "TomahawkSettings.h"
 
45
#include "utils/TomahawkUtils.h"
 
46
#include "utils/Logger.h"
 
47
 
 
48
using namespace Tomahawk;
 
49
 
 
50
Servent* Servent::s_instance = 0;
 
51
 
 
52
 
 
53
Servent*
 
54
Servent::instance()
 
55
{
 
56
    return s_instance;
 
57
}
 
58
 
 
59
 
 
60
Servent::Servent( QObject* parent )
 
61
    : QTcpServer( parent )
 
62
    , m_port( 0 )
 
63
    , m_externalPort( 0 )
 
64
    , m_ready( false )
 
65
{
 
66
    s_instance = this;
 
67
 
 
68
    m_lanHack = qApp->arguments().contains( "--lanhack" );
 
69
    setProxy( QNetworkProxy::NoProxy );
 
70
 
 
71
    {
 
72
    boost::function<QSharedPointer<QIODevice>(result_ptr)> fac =
 
73
        boost::bind( &Servent::localFileIODeviceFactory, this, _1 );
 
74
    this->registerIODeviceFactory( "file", fac );
 
75
    }
 
76
 
 
77
    {
 
78
    boost::function<QSharedPointer<QIODevice>(result_ptr)> fac =
 
79
        boost::bind( &Servent::remoteIODeviceFactory, this, _1 );
 
80
    this->registerIODeviceFactory( "servent", fac );
 
81
    }
 
82
 
 
83
    {
 
84
    boost::function<QSharedPointer<QIODevice>(result_ptr)> fac =
 
85
        boost::bind( &Servent::httpIODeviceFactory, this, _1 );
 
86
    this->registerIODeviceFactory( "http", fac );
 
87
    }
 
88
}
 
89
 
 
90
 
 
91
Servent::~Servent()
 
92
{
 
93
    if ( m_portfwd )
 
94
    {
 
95
        m_portfwd.data()->quit();
 
96
        m_portfwd.data()->wait( 60000 );
 
97
        delete m_portfwd.data();
 
98
    }
 
99
}
 
100
 
 
101
 
 
102
bool
 
103
Servent::startListening( QHostAddress ha, bool upnp, int port )
 
104
{
 
105
    m_port = port;
 
106
    int defPort = TomahawkSettings::instance()->defaultPort();
 
107
 
 
108
    // Listen on both the selected port and, if not the same, the default port -- the latter sometimes necessary for zeroconf
 
109
    // TODO: only listen on both when zeroconf sip is enabled
 
110
    // TODO: use a real zeroconf system instead of a simple UDP broadcast?
 
111
    if ( !listen( ha, m_port ) )
 
112
    {
 
113
        if ( m_port != defPort )
 
114
        {
 
115
            if ( !listen( ha, defPort ) )
 
116
            {
 
117
                tLog() << "Failed to listen on both port" << m_port << "and port" << defPort;
 
118
                tLog() << "Error string is:" << errorString();
 
119
                return false;
 
120
            }
 
121
            else
 
122
                m_port = defPort;
 
123
        }
 
124
    }
 
125
 
 
126
    TomahawkSettings::ExternalAddressMode mode = TomahawkSettings::instance()->externalAddressMode();
 
127
    
 
128
    tLog() << "Servent listening on port" << m_port << "- servent thread:" << thread()
 
129
           << "- address mode:" << (int)( mode );
 
130
 
 
131
    // --lanhack means to advertise your LAN IP as if it were externally visible
 
132
    switch ( mode )
 
133
    {
 
134
        case TomahawkSettings::Static:
 
135
            m_externalHostname = TomahawkSettings::instance()->externalHostname();
 
136
            m_externalPort = TomahawkSettings::instance()->externalPort();
 
137
            m_ready = true;
 
138
            emit ready();
 
139
            break;
 
140
 
 
141
        case TomahawkSettings::Lan:
 
142
            setInternalAddress();
 
143
            break;
 
144
 
 
145
        case TomahawkSettings::Upnp:
 
146
            if ( !upnp )
 
147
            {
 
148
                setInternalAddress();
 
149
                break;
 
150
            }
 
151
            // TODO check if we have a public/internet IP on this machine directly
 
152
            tLog() << "External address mode set to upnp...";
 
153
            m_portfwd = QWeakPointer< PortFwdThread >( new PortFwdThread( m_port ) );
 
154
            Q_ASSERT( m_portfwd );
 
155
            connect( m_portfwd.data(), SIGNAL( externalAddressDetected( QHostAddress, unsigned int ) ),
 
156
                                  SLOT( setExternalAddress( QHostAddress, unsigned int ) ) );
 
157
            m_portfwd.data()->start();
 
158
            break;
 
159
    }
 
160
 
 
161
    return true;
 
162
}
 
163
 
 
164
 
 
165
QString
 
166
Servent::createConnectionKey( const QString& name, const QString &nodeid, const QString &key, bool onceOnly )
 
167
{
 
168
    Q_ASSERT( this->thread() == QThread::currentThread() );
 
169
 
 
170
    QString _key = ( key.isEmpty() ? uuid() : key );
 
171
    ControlConnection* cc = new ControlConnection( this, name );
 
172
    cc->setName( name.isEmpty() ? QString( "KEY(%1)" ).arg( key ) : name );
 
173
    if ( !nodeid.isEmpty() )
 
174
        cc->setId( nodeid );
 
175
    cc->setOnceOnly( onceOnly );
 
176
 
 
177
    tDebug( LOGVERBOSE ) << "Creating connection key with name of" << cc->name() << "and id of" << cc->id() << "and key of" << _key << "; key is once only? :" << (onceOnly ? "true" : "false");
 
178
    registerOffer( _key, cc );
 
179
    return _key;
 
180
}
 
181
 
 
182
 
 
183
bool
 
184
Servent::isValidExternalIP( const QHostAddress& addr ) const
 
185
{
 
186
    QString ip = addr.toString();
 
187
    if ( !m_lanHack && ( ip.startsWith( "10." ) || ip.startsWith( "172.16." ) || ip.startsWith( "192.168." ) ) )
 
188
    {
 
189
        return false;
 
190
    }
 
191
 
 
192
    return !addr.isNull();
 
193
}
 
194
 
 
195
 
 
196
void
 
197
Servent::setInternalAddress()
 
198
{
 
199
    foreach ( QHostAddress ha, QNetworkInterface::allAddresses() )
 
200
    {
 
201
        if ( ha.toString() == "127.0.0.1" )
 
202
            continue;
 
203
        if ( ha.toString().contains( ":" ) )
 
204
            continue; //ipv6
 
205
 
 
206
        if ( m_lanHack && isValidExternalIP( ha ) )
 
207
        {
 
208
            tLog() << "LANHACK: set external address to lan address" << ha.toString();
 
209
            setExternalAddress( ha, m_port );
 
210
        }
 
211
        else
 
212
        {
 
213
            m_ready = true;
 
214
            emit ready();
 
215
        }
 
216
        break;
 
217
    }
 
218
}
 
219
 
 
220
 
 
221
void
 
222
Servent::setExternalAddress( QHostAddress ha, unsigned int port )
 
223
{
 
224
    if ( isValidExternalIP( ha ) )
 
225
    {
 
226
        m_externalAddress = ha;
 
227
        m_externalPort = port;
 
228
    }
 
229
 
 
230
    if ( m_externalPort == 0 || !isValidExternalIP( ha ) )
 
231
    {
 
232
        tLog() << "UPnP failed, LAN and outbound connections only!";
 
233
        setInternalAddress();
 
234
        return;
 
235
    }
 
236
 
 
237
    tLog() << "UPnP setup successful";
 
238
    m_ready = true;
 
239
    emit ready();
 
240
}
 
241
 
 
242
 
 
243
void
 
244
Servent::registerOffer( const QString& key, Connection* conn )
 
245
{
 
246
    m_offers[key] = QWeakPointer<Connection>(conn);
 
247
}
 
248
 
 
249
 
 
250
void
 
251
Servent::registerControlConnection( ControlConnection* conn )
 
252
{
 
253
    m_controlconnections.append( conn );
 
254
}
 
255
 
 
256
 
 
257
void
 
258
Servent::unregisterControlConnection( ControlConnection* conn )
 
259
{
 
260
    QList<ControlConnection*> n;
 
261
    foreach( ControlConnection* c, m_controlconnections )
 
262
        if( c!=conn )
 
263
            n.append( c );
 
264
 
 
265
    m_connectedNodes.removeAll( conn->id() );
 
266
    m_controlconnections = n;
 
267
}
 
268
 
 
269
 
 
270
ControlConnection*
 
271
Servent::lookupControlConnection( const QString& name )
 
272
{
 
273
    foreach( ControlConnection* c, m_controlconnections )
 
274
        if( c->name() == name )
 
275
            return c;
 
276
 
 
277
    return NULL;
 
278
}
 
279
 
 
280
 
 
281
void
 
282
Servent::incomingConnection( int sd )
 
283
{
 
284
    Q_ASSERT( this->thread() == QThread::currentThread() );
 
285
 
 
286
    QTcpSocketExtra* sock = new QTcpSocketExtra;
 
287
    tDebug( LOGVERBOSE ) << Q_FUNC_INFO << "Accepting connection, sock" << sock;
 
288
 
 
289
    sock->moveToThread( thread() );
 
290
    sock->_disowned = false;
 
291
    sock->_outbound = false;
 
292
    if( !sock->setSocketDescriptor( sd ) )
 
293
    {
 
294
        Q_ASSERT( false );
 
295
        return;
 
296
    }
 
297
 
 
298
    connect( sock, SIGNAL( readyRead() ), SLOT( readyRead() ) );
 
299
    connect( sock, SIGNAL( disconnected() ), sock, SLOT( deleteLater() ) );
 
300
}
 
301
 
 
302
 
 
303
void
 
304
Servent::readyRead()
 
305
{
 
306
    Q_ASSERT( this->thread() == QThread::currentThread() );
 
307
    QWeakPointer< QTcpSocketExtra > sock = (QTcpSocketExtra*)sender();
 
308
 
 
309
    if( sock.isNull() || sock.data()->_disowned )
 
310
    {
 
311
        return;
 
312
    }
 
313
 
 
314
    if( sock.data()->_msg.isNull() )
 
315
    {
 
316
        char msgheader[ Msg::headerSize() ];
 
317
        if( sock.data()->bytesAvailable() < Msg::headerSize() )
 
318
            return;
 
319
 
 
320
        sock.data()->read( (char*) &msgheader, Msg::headerSize() );
 
321
        sock.data()->_msg = Msg::begin( (char*) &msgheader );
 
322
    }
 
323
 
 
324
    if( sock.data()->bytesAvailable() < sock.data()->_msg->length() )
 
325
        return;
 
326
 
 
327
    QByteArray ba = sock.data()->read( sock.data()->_msg->length() );
 
328
    sock.data()->_msg->fill( ba );
 
329
    Q_ASSERT( sock.data()->_msg->is( Msg::JSON ) );
 
330
 
 
331
    ControlConnection* cc = 0;
 
332
    bool ok;
 
333
    QString key, conntype, nodeid, controlid;
 
334
    QVariantMap m = parser.parse( sock.data()->_msg->payload(), &ok ).toMap();
 
335
    if( !ok )
 
336
    {
 
337
        tDebug() << "Invalid JSON on new connection, aborting";
 
338
        goto closeconnection;
 
339
    }
 
340
 
 
341
    conntype  = m.value( "conntype" ).toString();
 
342
    key       = m.value( "key" ).toString();
 
343
    nodeid    = m.value( "nodeid" ).toString();
 
344
    controlid = m.value( "controlid" ).toString();
 
345
 
 
346
    tDebug( LOGVERBOSE ) << "Incoming connection details:" << m;
 
347
 
 
348
    if( !nodeid.isEmpty() ) // only control connections send nodeid
 
349
    {
 
350
        bool dupe = false;
 
351
        if ( m_connectedNodes.contains( nodeid ) )
 
352
            dupe = true;
 
353
 
 
354
        foreach( ControlConnection* con, m_controlconnections )
 
355
        {
 
356
            tLog( LOGVERBOSE ) << "known connection:" << con->id() << con->source()->friendlyName();
 
357
            if( con->id() == nodeid )
 
358
            {
 
359
                dupe = true;
 
360
                break;
 
361
            }
 
362
        }
 
363
 
 
364
        if ( dupe )
 
365
        {
 
366
            tLog() << "Duplicate control connection detected, dropping:" << nodeid << conntype;
 
367
            goto closeconnection;
 
368
        }
 
369
    }
 
370
 
 
371
    foreach( ControlConnection* con, m_controlconnections )
 
372
    {
 
373
        if ( con->id() == controlid )
 
374
        {
 
375
            cc = con;
 
376
            break;
 
377
        }
 
378
    }
 
379
 
 
380
    // they connected to us and want something we are offering
 
381
    if ( conntype == "accept-offer" || conntype == "push-offer" )
 
382
    {
 
383
        sock.data()->_msg.clear();
 
384
        tDebug( LOGVERBOSE ) << Q_FUNC_INFO << key << nodeid << "socket peer address = " << sock.data()->peerAddress() << "socket peer name = " << sock.data()->peerName();
 
385
        Connection* conn = claimOffer( cc, nodeid, key, sock.data()->peerAddress() );
 
386
        if ( !conn )
 
387
        {
 
388
            tLog() << "claimOffer FAILED, key:" << key << nodeid;
 
389
            goto closeconnection;
 
390
        }
 
391
        if ( sock.isNull() )
 
392
        {
 
393
            tLog() << "Socket has become null, possibly took too long to make an ACL decision, key:" << key << nodeid;
 
394
            return;
 
395
        }
 
396
        else if ( !sock.data()->isValid() )
 
397
        {
 
398
            tLog() << "Socket has become invalid, possibly took too long to make an ACL decision, key:" << key << nodeid;
 
399
            goto closeconnection;
 
400
        }
 
401
        tDebug( LOGVERBOSE ) << "claimOffer OK:" << key << nodeid;        
 
402
        
 
403
        m_connectedNodes << nodeid;
 
404
        if( !nodeid.isEmpty() )
 
405
            conn->setId( nodeid );
 
406
 
 
407
        handoverSocket( conn, sock.data() );
 
408
        return;
 
409
    }
 
410
    else
 
411
    {
 
412
        tLog() << "Invalid or unhandled conntype";
 
413
    }
 
414
 
 
415
    // fallthru to cleanup:
 
416
closeconnection:
 
417
    tLog() << "Closing incoming connection, something was wrong.";
 
418
    sock.data()->_msg.clear();
 
419
    sock.data()->disconnectFromHost();
 
420
}
 
421
 
 
422
 
 
423
// creates a new tcp connection to peer from conn, handled by given connector
 
424
// new_conn is responsible for sending the first msg, if needed
 
425
void
 
426
Servent::createParallelConnection( Connection* orig_conn, Connection* new_conn, const QString& key )
 
427
{
 
428
    tDebug( LOGVERBOSE ) << Q_FUNC_INFO << ", key:" << key << thread() << orig_conn;
 
429
    // if we can connect to them directly:
 
430
    if( orig_conn && orig_conn->outbound() )
 
431
    {
 
432
        connectToPeer( orig_conn->socket()->peerAddress().toString(),
 
433
                       orig_conn->peerPort(),
 
434
                       key,
 
435
                       new_conn );
 
436
    }
 
437
    else // ask them to connect to us:
 
438
    {
 
439
        QString tmpkey = uuid();
 
440
        tLog() << "Asking them to connect to us using" << tmpkey ;
 
441
        registerOffer( tmpkey, new_conn );
 
442
 
 
443
        QVariantMap m;
 
444
        m.insert( "conntype", "request-offer" );
 
445
        m.insert( "key", tmpkey );
 
446
        m.insert( "offer", key );
 
447
        m.insert( "port", externalPort() );
 
448
        m.insert( "controlid", Database::instance()->impl()->dbid() );
 
449
 
 
450
        QJson::Serializer ser;
 
451
        orig_conn->sendMsg( Msg::factory( ser.serialize(m), Msg::JSON ) );
 
452
    }
 
453
}
 
454
 
 
455
 
 
456
void
 
457
Servent::socketConnected()
 
458
{
 
459
    QTcpSocketExtra* sock = (QTcpSocketExtra*)sender();
 
460
 
 
461
    tDebug( LOGVERBOSE ) << Q_FUNC_INFO << thread() << "socket: " << sock << ", hostaddr: " << sock->peerAddress() << ", hostname: " << sock->peerName();
 
462
 
 
463
    if ( sock->_conn.isNull() )
 
464
    {
 
465
        sock->close();
 
466
        sock->deleteLater();
 
467
        tDebug( LOGVERBOSE ) << Q_FUNC_INFO << "Socket's connection was null, could have timed out or been given an invalid address";
 
468
        return;
 
469
    }
 
470
    
 
471
    Connection* conn = sock->_conn.data();
 
472
    handoverSocket( conn, sock );
 
473
}
 
474
 
 
475
 
 
476
// transfers ownership of socket to the connection and inits the connection
 
477
void Servent::handoverSocket( Connection* conn, QTcpSocketExtra* sock )
 
478
{
 
479
    Q_ASSERT( conn );
 
480
    Q_ASSERT( sock );
 
481
    Q_ASSERT( conn->socket().isNull() );
 
482
    Q_ASSERT( sock->isValid() );
 
483
 
 
484
    disconnect( sock, SIGNAL( readyRead() ),    this, SLOT( readyRead() ) );
 
485
    disconnect( sock, SIGNAL( disconnected() ), sock, SLOT( deleteLater() ) );
 
486
    disconnect( sock, SIGNAL( error( QAbstractSocket::SocketError ) ),
 
487
                this, SLOT( socketError( QAbstractSocket::SocketError ) ) );
 
488
 
 
489
    sock->_disowned = true;
 
490
    conn->setOutbound( sock->_outbound );
 
491
    conn->setPeerPort( sock->peerPort() );
 
492
 
 
493
    conn->start( sock );
 
494
}
 
495
 
 
496
 
 
497
void
 
498
Servent::socketError( QAbstractSocket::SocketError e )
 
499
{
 
500
    QTcpSocketExtra* sock = (QTcpSocketExtra*)sender();
 
501
    if ( !sock )
 
502
    {
 
503
        tLog() << "SocketError, sock is null";
 
504
        return;
 
505
    }
 
506
 
 
507
    if ( !sock->_conn.isNull() )
 
508
    {
 
509
        Connection* conn = sock->_conn.data();
 
510
        tLog() << "Servent::SocketError:" << e << conn->id() << conn->name();
 
511
 
 
512
        if ( !sock->_disowned )
 
513
        {
 
514
            // connection will delete if we already transferred ownership, otherwise:
 
515
            sock->deleteLater();
 
516
        }
 
517
 
 
518
        conn->markAsFailed(); // will emit failed, then finished
 
519
    }
 
520
    else
 
521
    {
 
522
        tLog() << "SocketError, connection is null";
 
523
        sock->deleteLater();
 
524
    }
 
525
}
 
526
 
 
527
 
 
528
void
 
529
Servent::connectToPeer( const QString& ha, int port, const QString &key, const QString& name, const QString& id )
 
530
{
 
531
    Q_ASSERT( this->thread() == QThread::currentThread() );
 
532
 
 
533
    ControlConnection* conn = new ControlConnection( this, ha );
 
534
    QVariantMap m;
 
535
    m["conntype"]  = "accept-offer";
 
536
    m["key"]       = key;
 
537
    m["port"]      = externalPort();
 
538
    m["nodeid"]    = Database::instance()->impl()->dbid();
 
539
 
 
540
    conn->setFirstMessage( m );
 
541
    if( name.length() )
 
542
        conn->setName( name );
 
543
    if( id.length() )
 
544
        conn->setId( id );
 
545
 
 
546
    conn->setProperty( "nodeid", id );
 
547
 
 
548
    connectToPeer( ha, port, key, conn );
 
549
}
 
550
 
 
551
 
 
552
void
 
553
Servent::connectToPeer( const QString& ha, int port, const QString &key, Connection* conn )
 
554
{
 
555
    tDebug( LOGVERBOSE ) << "Servent::connectToPeer:" << ha << ":" << port
 
556
                         << thread() << QThread::currentThread();
 
557
 
 
558
    Q_ASSERT( port > 0 );
 
559
    Q_ASSERT( conn );
 
560
 
 
561
    if ( ( ha == m_externalAddress.toString() || ha == m_externalHostname ) &&
 
562
         ( port == m_externalPort ) )
 
563
    {
 
564
        tDebug() << "ERROR: Tomahawk won't try to connect to" << ha << ":" << port << ": identified as ourselves.";
 
565
        return;
 
566
    }
 
567
 
 
568
    if( key.length() && conn->firstMessage().isNull() )
 
569
    {
 
570
        QVariantMap m;
 
571
        m["conntype"]  = "accept-offer";
 
572
        m["key"]       = key;
 
573
        m["port"]      = externalPort();
 
574
        m["controlid"] = Database::instance()->impl()->dbid();
 
575
        conn->setFirstMessage( m );
 
576
    }
 
577
 
 
578
    QTcpSocketExtra* sock = new QTcpSocketExtra();
 
579
    sock->_disowned = false;
 
580
    sock->_conn = conn;
 
581
    sock->_outbound = true;
 
582
 
 
583
    connect( sock, SIGNAL( connected() ), SLOT( socketConnected() ) );
 
584
    connect( sock, SIGNAL( error( QAbstractSocket::SocketError ) ),
 
585
                     SLOT( socketError( QAbstractSocket::SocketError ) ) );
 
586
 
 
587
    if ( !conn->peerIpAddress().isNull() )
 
588
        sock->connectToHost( conn->peerIpAddress(), port, QTcpSocket::ReadWrite );
 
589
    else
 
590
        sock->connectToHost( ha, port, QTcpSocket::ReadWrite );
 
591
    sock->moveToThread( thread() );
 
592
}
 
593
 
 
594
 
 
595
void
 
596
Servent::reverseOfferRequest( ControlConnection* orig_conn, const QString& theirdbid, const QString& key, const QString& theirkey )
 
597
{
 
598
    Q_ASSERT( this->thread() == QThread::currentThread() );
 
599
 
 
600
    tDebug( LOGVERBOSE ) << "Servent::reverseOfferRequest received for" << key;
 
601
    Connection* new_conn = claimOffer( orig_conn, theirdbid, key );
 
602
    if ( !new_conn )
 
603
    {
 
604
        tDebug() << "claimOffer failed, killing requesting connection out of spite";
 
605
        orig_conn->shutdown();
 
606
        return;
 
607
    }
 
608
 
 
609
    QVariantMap m;
 
610
    m["conntype"]  = "push-offer";
 
611
    m["key"]       = theirkey;
 
612
    m["port"]      = externalPort();
 
613
    m["controlid"] = Database::instance()->impl()->dbid();
 
614
    new_conn->setFirstMessage( m );
 
615
    createParallelConnection( orig_conn, new_conn, QString() );
 
616
}
 
617
 
 
618
 
 
619
// return the appropriate connection for a given offer key, or NULL if invalid
 
620
Connection*
 
621
Servent::claimOffer( ControlConnection* cc, const QString &nodeid, const QString &key, const QHostAddress peer )
 
622
{
 
623
    bool noauth = qApp->arguments().contains( "--noauth" );
 
624
 
 
625
    // magic key for stream connections:
 
626
    if( key.startsWith( "FILE_REQUEST_KEY:" ) )
 
627
    {
 
628
        // check if the source IP matches an existing, authenticated connection
 
629
        if ( !noauth && peer != QHostAddress::Any && !isIPWhitelisted( peer ) )
 
630
        {
 
631
            bool authed = false;
 
632
            foreach( ControlConnection* cc, m_controlconnections )
 
633
            {
 
634
                if( cc->socket()->peerAddress() == peer )
 
635
                {
 
636
                    authed = true;
 
637
                    break;
 
638
                }
 
639
            }
 
640
            if( !authed )
 
641
            {
 
642
                tLog() << "File transfer request rejected, invalid source IP";
 
643
                return NULL;
 
644
            }
 
645
        }
 
646
 
 
647
        QString fid = key.right( key.length() - 17 );
 
648
        StreamConnection* sc = new StreamConnection( this, cc, fid );
 
649
        return sc;
 
650
    }
 
651
 
 
652
    if( key == "whitelist" ) // LAN IP address, check source IP
 
653
    {
 
654
        if( isIPWhitelisted( peer ) )
 
655
        {
 
656
            tDebug() << "Connection is from whitelisted IP range (LAN)";
 
657
            Connection* conn = new ControlConnection( this, peer.toString() );
 
658
            conn->setName( peer.toString() );
 
659
            return conn;
 
660
        }
 
661
        else
 
662
        {
 
663
            tDebug() << "Connection claimed to be whitelisted, but wasn't.";
 
664
            return NULL;
 
665
        }
 
666
    }
 
667
 
 
668
    if( m_offers.contains( key ) )
 
669
    {
 
670
        QWeakPointer<Connection> conn = m_offers.value( key );
 
671
        if( conn.isNull() )
 
672
        {
 
673
            // This can happen if it's a streamconnection, but the audioengine has
 
674
            // already closed the iodevice, causing the connection to be deleted before
 
675
            // the peer connects and provides the first byte
 
676
            tLog() << Q_FUNC_INFO << "invalid/expired offer:" << key;
 
677
            return NULL;
 
678
        }
 
679
 
 
680
        tDebug( LOGVERBOSE ) << Q_FUNC_INFO << "nodeid is: " << nodeid;
 
681
        if( !nodeid.isEmpty() )
 
682
        {
 
683
            // Used by the connection for the ACL check
 
684
            // If there isn't a nodeid it's not the first connection and will already have been stopped
 
685
            conn.data()->setProperty( "nodeid", nodeid );
 
686
        }
 
687
 
 
688
        if( conn.data()->onceOnly() )
 
689
        {
 
690
            m_offers.remove( key );
 
691
            return conn.data();
 
692
        }
 
693
        else
 
694
        {
 
695
            return conn.data()->clone();
 
696
        }
 
697
    }
 
698
    else if ( noauth )
 
699
    {
 
700
        Connection* conn;
 
701
        conn = new ControlConnection( this, peer );
 
702
        conn->setName( key );
 
703
        return conn;
 
704
    }
 
705
    else
 
706
    {
 
707
        tLog() << "Invalid offer key:" << key;
 
708
        return NULL;
 
709
    }
 
710
}
 
711
 
 
712
 
 
713
QSharedPointer<QIODevice>
 
714
Servent::remoteIODeviceFactory( const result_ptr& result )
 
715
{
 
716
    QSharedPointer<QIODevice> sp;
 
717
 
 
718
    QStringList parts = result->url().mid( QString( "servent://" ).length() ).split( "\t" );
 
719
    const QString sourceName = parts.at( 0 );
 
720
    const QString fileId = parts.at( 1 );
 
721
    source_ptr s = SourceList::instance()->get( sourceName );
 
722
    if ( s.isNull() || !s->controlConnection() )
 
723
        return sp;
 
724
 
 
725
    ControlConnection* cc = s->controlConnection();
 
726
    StreamConnection* sc = new StreamConnection( this, cc, fileId, result );
 
727
    createParallelConnection( cc, sc, QString( "FILE_REQUEST_KEY:%1" ).arg( fileId ) );
 
728
    return sc->iodevice();
 
729
}
 
730
 
 
731
 
 
732
void
 
733
Servent::registerStreamConnection( StreamConnection* sc )
 
734
{
 
735
    Q_ASSERT( !m_scsessions.contains( sc ) );
 
736
    tDebug( LOGVERBOSE ) << "Registering Stream" << m_scsessions.length() + 1;
 
737
 
 
738
    QMutexLocker lock( &m_ftsession_mut );
 
739
    m_scsessions.append( sc );
 
740
 
 
741
    printCurrentTransfers();
 
742
    emit streamStarted( sc );
 
743
}
 
744
 
 
745
 
 
746
void
 
747
Servent::onStreamFinished( StreamConnection* sc )
 
748
{
 
749
    Q_ASSERT( sc );
 
750
    tDebug( LOGVERBOSE ) << "Stream Finished, unregistering" << sc->id();
 
751
 
 
752
    QMutexLocker lock( &m_ftsession_mut );
 
753
    m_scsessions.removeAll( sc );
 
754
 
 
755
    printCurrentTransfers();
 
756
    emit streamFinished( sc );
 
757
}
 
758
 
 
759
 
 
760
// used for debug output:
 
761
void
 
762
Servent::printCurrentTransfers()
 
763
{
 
764
    int k = 1;
 
765
//    qDebug() << "~~~ Active file transfer connections:" << m_scsessions.length();
 
766
    foreach( StreamConnection* i, m_scsessions )
 
767
    {
 
768
        qDebug() << k << ") " << i->id();
 
769
    }
 
770
    qDebug() << endl;
 
771
}
 
772
 
 
773
 
 
774
bool
 
775
Servent::isIPWhitelisted( QHostAddress ip )
 
776
{
 
777
    tDebug( LOGVERBOSE ) << Q_FUNC_INFO << "Performing checks against ip" << ip.toString();
 
778
    typedef QPair< QHostAddress, int > range;
 
779
    QList< range > subnetEntries;
 
780
    
 
781
    QList< QNetworkInterface > networkInterfaces = QNetworkInterface::allInterfaces();
 
782
    foreach( QNetworkInterface interface, networkInterfaces )
 
783
    {
 
784
        tDebug( LOGVERBOSE ) << Q_FUNC_INFO << "Checking interface" << interface.humanReadableName();
 
785
        QList< QNetworkAddressEntry > addressEntries = interface.addressEntries();
 
786
        foreach( QNetworkAddressEntry addressEntry, addressEntries )
 
787
        {
 
788
            tDebug( LOGVERBOSE ) << Q_FUNC_INFO << "Checking address entry with ip" << addressEntry.ip().toString() << "and prefix length" << addressEntry.prefixLength();
 
789
            if ( ip.isInSubnet( addressEntry.ip(), addressEntry.prefixLength() ) )
 
790
            {
 
791
                tDebug( LOGVERBOSE ) << Q_FUNC_INFO << "success";
 
792
                return true;
 
793
            }
 
794
        }
 
795
    }
 
796
    tDebug( LOGVERBOSE ) << Q_FUNC_INFO << "failure";
 
797
    return false;
 
798
}
 
799
 
 
800
 
 
801
bool
 
802
Servent::connectedToSession( const QString& session )
 
803
{
 
804
    foreach( ControlConnection* cc, m_controlconnections )
 
805
    {
 
806
        if( cc->id() == session )
 
807
            return true;
 
808
    }
 
809
 
 
810
    return false;
 
811
}
 
812
 
 
813
 
 
814
void
 
815
Servent::triggerDBSync()
 
816
{
 
817
    // tell peers we have new stuff they should sync
 
818
    QList<source_ptr> sources = SourceList::instance()->sources();
 
819
    foreach( const source_ptr& src, sources )
 
820
    {
 
821
        // skip local source
 
822
        if ( src.isNull() || src->isLocal() )
 
823
            continue;
 
824
 
 
825
        if ( src->controlConnection() && src->controlConnection()->dbSyncConnection() ) // source online?
 
826
            src->controlConnection()->dbSyncConnection()->trigger();
 
827
    }
 
828
}
 
829
 
 
830
 
 
831
void
 
832
Servent::registerIODeviceFactory( const QString &proto, boost::function<QSharedPointer<QIODevice>(Tomahawk::result_ptr)> fac )
 
833
{
 
834
    m_iofactories.insert( proto, fac );
 
835
}
 
836
 
 
837
 
 
838
QSharedPointer<QIODevice>
 
839
Servent::getIODeviceForUrl( const Tomahawk::result_ptr& result )
 
840
{
 
841
    QSharedPointer<QIODevice> sp;
 
842
 
 
843
    QRegExp rx( "^([a-zA-Z0-9]+)://(.+)$" );
 
844
    if ( rx.indexIn( result->url() ) == -1 )
 
845
        return sp;
 
846
 
 
847
    const QString proto = rx.cap( 1 );
 
848
    if ( !m_iofactories.contains( proto ) )
 
849
        return sp;
 
850
 
 
851
    return m_iofactories.value( proto )( result );
 
852
}
 
853
 
 
854
 
 
855
QSharedPointer<QIODevice>
 
856
Servent::localFileIODeviceFactory( const Tomahawk::result_ptr& result )
 
857
{
 
858
    // ignore "file://" at front of url
 
859
    QFile* io = new QFile( result->url().mid( QString( "file://" ).length() ) );
 
860
    if ( io )
 
861
        io->open( QIODevice::ReadOnly );
 
862
 
 
863
    return QSharedPointer<QIODevice>( io );
 
864
}
 
865
 
 
866
 
 
867
QSharedPointer<QIODevice>
 
868
Servent::httpIODeviceFactory( const Tomahawk::result_ptr& result )
 
869
{
 
870
    QNetworkRequest req( result->url() );
 
871
    QNetworkReply* reply = TomahawkUtils::nam()->get( req );
 
872
    return QSharedPointer<QIODevice>( reply, &QObject::deleteLater );
 
873
}