~ubuntu-branches/ubuntu/oneiric/libktorrent/oneiric

« back to all changes in this revision

Viewing changes to src/utp/utpserver.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Modestas Vainius
  • Date: 2011-05-26 00:33:38 UTC
  • mfrom: (5.1.7 experimental)
  • Revision ID: james.westby@ubuntu.com-20110526003338-2r4zwyzn8bovsxay
Tags: 1.1.1-2
Release to unstable.

Show diffs side-by-side

added added

removed removed

Lines of Context:
19
19
 ***************************************************************************/
20
20
 
21
21
#include "utpserver.h"
 
22
#include "utpserver_p.h"
22
23
#include <QEvent>
 
24
#include <QTimer>
 
25
#include <QHostAddress>
23
26
#include <QCoreApplication>
24
27
#include <stdlib.h>
25
28
#ifndef Q_CC_MSVC
40
43
#endif
41
44
 
42
45
 
43
 
 
44
 
 
45
 
 
46
46
using namespace bt;
47
47
 
48
48
namespace utp
49
49
{
50
 
        
 
50
 
51
51
        MainThreadCall::MainThreadCall(UTPServer* server) : server(server)
52
52
        {
53
53
        }
60
60
        {
61
61
                server->handlePendingConnections();
62
62
        }
63
 
 
64
 
        static int start_timer_event_type = 0;
65
 
        static int start_write_notifier_event_type = 0;
66
 
 
 
63
        
67
64
        ///////////////////////////////////////////////////////////
68
 
 
69
 
        UTPServer::UTPServer(QObject* parent) 
70
 
                : ServerInterface(parent),
71
 
                sock(0),
72
 
                running(false),
73
 
                utp_thread(0),
74
 
                mutex(QMutex::Recursive),
75
 
                create_sockets(true),
76
 
                tos(0),
77
 
                read_notifier(0),
78
 
                write_notifier(0),
79
 
                mtc(new MainThreadCall(this))
 
65
        
 
66
        UTPServer::Private::Private(UTPServer* p) : 
 
67
                        p(p),
 
68
                        running(false),
 
69
                        utp_thread(0),
 
70
                        mutex(QMutex::Recursive),
 
71
                        create_sockets(true),
 
72
                        tos(0),
 
73
                        mtc(new MainThreadCall(p))
80
74
        {
81
 
                qsrand(time(0));
82
 
                connect(this,SIGNAL(handlePendingConnectionsDelayed()),
 
75
                QObject::connect(p,SIGNAL(handlePendingConnectionsDelayed()),
83
76
                                mtc,SLOT(handlePendingConnections()),Qt::QueuedConnection);
84
 
                connect(this,SIGNAL(accepted(Connection*)),this,SLOT(onAccepted(Connection*)));
 
77
        
85
78
                poll_pipes.setAutoDelete(true);
86
 
                start_timer_event_type = QEvent::registerEventType();
87
 
                start_write_notifier_event_type = QEvent::registerEventType();
88
79
        }
89
 
 
90
 
        UTPServer::~UTPServer()
 
80
                
 
81
        UTPServer::Private::~Private()
91
82
        {
92
83
                if (running)
93
84
                        stop();
94
85
                
95
 
                qDeleteAll(pending);
96
86
                pending.clear();
97
 
                
98
87
                delete mtc;
99
88
        }
100
89
        
101
 
        // HACK: to avoid binary incompatibilities, 
102
 
        // relies on the fact that there will only be one  UTPServer instance
103
 
        static QMutex pending_mutex;
104
 
        static QMutex output_queue_mutex(QMutex::Recursive);
105
 
        
 
90
        void UTPServer::Private::stop()
 
91
        {
 
92
                running = false;
 
93
                if (utp_thread)
 
94
                {
 
95
                        utp_thread->exit();
 
96
                        utp_thread->wait();
 
97
                        delete utp_thread;
 
98
                        utp_thread = 0;
 
99
                }
 
100
                
 
101
                connections.clear();
 
102
                
 
103
                // Close the socket
 
104
                sockets.clear();
 
105
                Globals::instance().getPortList().removePort(port,net::UDP);
 
106
        }
 
107
        
 
108
        bool UTPServer::Private::bind(const net::Address& addr)
 
109
        {
 
110
                net::ServerSocket::Ptr sock(new net::ServerSocket(this));
 
111
                if (!sock->bind(addr))
 
112
                {
 
113
                        return false;
 
114
                }
 
115
                else
 
116
                {
 
117
                        Out(SYS_UTP|LOG_NOTICE) << "UTP: bound to " << addr.toString() << endl;
 
118
                        sock->setTOS(tos);
 
119
                        sock->setReadNotificationsEnabled(false);
 
120
                        sock->setWriteNotificationsEnabled(false);
 
121
                        sockets.append(sock);
 
122
                        return true;
 
123
                }
 
124
        }
 
125
        
 
126
        void UTPServer::Private::syn(const PacketParser & parser, const QByteArray& data, const net::Address & addr)
 
127
        {
 
128
                const Header* hdr = parser.header();
 
129
                quint16 recv_conn_id = hdr->connection_id + 1;
 
130
                if (connections.contains(recv_conn_id))
 
131
                {
 
132
                        // Send a reset packet if the ID is in use
 
133
                        Connection::Ptr conn(new Connection(recv_conn_id,Connection::INCOMING,addr,p));
 
134
                        conn->setWeakPointer(conn);
 
135
                        conn->sendReset();
 
136
                }
 
137
                else
 
138
                {
 
139
                        Connection::Ptr conn(new Connection(recv_conn_id,Connection::INCOMING,addr,p));
 
140
                        try
 
141
                        {
 
142
                                conn->setWeakPointer(conn);
 
143
                                conn->handlePacket(parser,data);
 
144
                                connections.insert(recv_conn_id,conn);
 
145
                                if (create_sockets)
 
146
                                {
 
147
                                        UTPSocket* utps = new UTPSocket(conn);
 
148
                                        mse::StreamSocket::Ptr ss(new mse::StreamSocket(utps));
 
149
                                        {
 
150
                                                QMutexLocker lock(&pending_mutex);
 
151
                                                pending.append(ss);
 
152
                                        }
 
153
                                        p->handlePendingConnectionsDelayed();
 
154
                                }
 
155
                                else
 
156
                                {
 
157
                                        last_accepted.append(conn);
 
158
                                        p->accepted();
 
159
                                }
 
160
                        }
 
161
                        catch (Connection::TransmissionError & err)
 
162
                        {
 
163
                                Out(SYS_UTP|LOG_NOTICE) << "UTP: " << err.location << endl;
 
164
                                connections.remove(recv_conn_id);
 
165
                        }
 
166
                }
 
167
        }
 
168
        
 
169
        void UTPServer::Private::reset(const utp::Header* hdr)
 
170
        {
 
171
                Connection::Ptr c = find(hdr->connection_id);
 
172
                if (c)
 
173
                {
 
174
                        c->reset();
 
175
                }
 
176
        }
 
177
        
 
178
        Connection::Ptr UTPServer::Private::find(quint16 conn_id)
 
179
        {
 
180
                ConnectionMapItr i = connections.find(conn_id);
 
181
                if (i != connections.end())
 
182
                        return i.value();
 
183
                else
 
184
                        return Connection::Ptr();
 
185
        }
 
186
        
 
187
        void UTPServer::Private::wakeUpPollPipes(utp::Connection::Ptr conn, bool readable, bool writeable)
 
188
        {
 
189
                QMutexLocker lock(&mutex);
 
190
                for (PollPipePairItr itr = poll_pipes.begin();itr != poll_pipes.end();itr++)
 
191
                {
 
192
                        PollPipePair* pp = itr->second;
 
193
                        if (readable && pp->read_pipe->polling(conn->receiveConnectionID()))
 
194
                                itr->second->read_pipe->wakeUp();
 
195
                        if (writeable && pp->write_pipe->polling(conn->receiveConnectionID()))
 
196
                                itr->second->write_pipe->wakeUp();
 
197
                }
 
198
        }
 
199
        
 
200
        
 
201
        void UTPServer::Private::dataReceived(const QByteArray& data, const net::Address& addr)
 
202
        {
 
203
                QMutexLocker lock(&mutex);
 
204
                //Out(SYS_UTP|LOG_NOTICE) << "UTP: received " << ba << " bytes packet from " << addr.toString() << endl;
 
205
                try
 
206
                {
 
207
                        if (data.size() >= (int)utp::Header::size()) // discard packets which are too small
 
208
                        {
 
209
                                p->handlePacket(data,addr);
 
210
                        }
 
211
                }
 
212
                catch (utp::Connection::TransmissionError & err)
 
213
                {
 
214
                        Out(SYS_UTP|LOG_NOTICE) << "UTP: " << err.location << endl;
 
215
                }
 
216
        }
 
217
        
 
218
        void UTPServer::Private::readyToWrite(net::ServerSocket* sock)
 
219
        {
 
220
                output_queue.send(sock);
 
221
        }
 
222
        
 
223
        ///////////////////////////////////////////////////////////
 
224
 
 
225
        UTPServer::UTPServer(QObject* parent) 
 
226
                : ServerInterface(parent),d(new Private(this))
 
227
                
 
228
        {
 
229
                qsrand(time(0));
 
230
        }
 
231
 
 
232
        UTPServer::~UTPServer()
 
233
        {
 
234
                delete d;
 
235
        }
 
236
 
106
237
        void UTPServer::handlePendingConnections()
107
238
        {
108
239
                // This should be called from the main thread
109
 
                QList<mse::StreamSocket*> p;
 
240
                QList<mse::StreamSocket::Ptr> p;
110
241
                {
111
 
                        QMutexLocker lock(&pending_mutex);
 
242
                        QMutexLocker lock(&d->pending_mutex);
112
243
                        // Copy the pending list and clear it before using its contents to avoid a deadlock
113
 
                        p = pending; 
114
 
                        pending.clear();
 
244
                        p = d->pending;
 
245
                        d->pending.clear();
115
246
                }
116
247
                
117
 
                foreach (mse::StreamSocket* s,p)
 
248
                foreach (mse::StreamSocket::Ptr s,p)
118
249
                {
119
250
                        newConnection(s);
120
251
                }
123
254
        
124
255
        bool UTPServer::changePort(bt::Uint16 p)
125
256
        {
126
 
                if (sock && port == p)
 
257
                if (d->sockets.count() > 0 && port == p)
127
258
                        return true;
128
259
                
 
260
                Globals::instance().getPortList().removePort(port,net::UDP);
 
261
                d->sockets.clear();
 
262
                
129
263
                QStringList possible = bindAddresses();
130
 
                foreach (const QString & ip,possible)
131
 
                {
132
 
                        net::Address addr(ip,p);
133
 
                        if (bind(addr))
134
 
                                return true;
135
 
                }
136
 
                
137
 
                return false;
138
 
        }
139
 
 
140
 
 
141
 
        bool UTPServer::bind(const net::Address& addr)
142
 
        {
143
 
                if (sock)
144
 
                {
145
 
                        Globals::instance().getPortList().removePort(port,net::UDP);
146
 
                        delete sock;
147
 
                }
148
 
                
149
 
                sock = new net::Socket(false,addr.ipVersion());
150
 
                sock->setBlocking(false);
151
 
                if (!sock->bind(addr,false))
152
 
                {
153
 
                        delete sock;
154
 
                        sock = 0;
 
264
                foreach (const QString & addr,possible)
 
265
                {
 
266
                        d->bind(net::Address(addr,p));
 
267
                }
 
268
                
 
269
                if (d->sockets.count() == 0)
 
270
                {
 
271
                        // Try any addresses if previous binds failed
 
272
                        d->bind(net::Address(QHostAddress(QHostAddress::AnyIPv6).toString(),p));
 
273
                        d->bind(net::Address(QHostAddress(QHostAddress::Any).toString(),p));
 
274
                }
 
275
                
 
276
                if (d->sockets.count())
 
277
                {
 
278
                        Globals::instance().getPortList().addNewPort(p,net::UDP,true);
 
279
                        return true;
 
280
                }
 
281
                else
155
282
                        return false;
156
 
                }
157
 
                else
158
 
                {
159
 
                        Out(SYS_UTP|LOG_NOTICE) << "UTP: bound to " << addr.toString() << endl;
160
 
                        sock->setTOS(tos);
161
 
                        Globals::instance().getPortList().addNewPort(addr.port(),net::UDP,true);
162
 
                        return true;
163
 
                }
164
283
        }
165
284
        
166
285
        void UTPServer::setTOS(Uint8 type_of_service)
167
286
        {
168
 
                tos = type_of_service;
169
 
                if (sock)
170
 
                        sock->setTOS(tos);
 
287
                d->tos = type_of_service;
 
288
                foreach (net::ServerSocket::Ptr sock,d->sockets)
 
289
                        sock->setTOS(d->tos);
171
290
        }
172
291
        
173
292
        void UTPServer::threadStarted()
174
293
        {
175
 
                if (!read_notifier)
176
 
                {
177
 
                        read_notifier = new QSocketNotifier(sock->fd(),QSocketNotifier::Read,this);
178
 
                        connect(read_notifier,SIGNAL(activated(int)),this,SLOT(readPacket(int)));
179
 
                }
180
 
                
181
 
                if (!write_notifier)
182
 
                {
183
 
                        write_notifier = new QSocketNotifier(sock->fd(),QSocketNotifier::Write,this);
184
 
                        connect(write_notifier,SIGNAL(activated(int)),this,SLOT(writePacket(int)));
185
 
                }
186
 
                
187
 
                write_notifier->setEnabled(false);
188
 
        }
189
 
        
190
 
        void UTPServer::readPacket(int)
191
 
        {
192
 
                QMutexLocker lock(&mutex);
193
 
                
194
 
                int ba = sock->bytesAvailable();
195
 
                QByteArray packet(ba,0);
196
 
                net::Address addr;
197
 
                if (sock->recvFrom((bt::Uint8*)packet.data(),ba,addr) > 0)
198
 
                {
199
 
                        //Out(SYS_UTP|LOG_NOTICE) << "UTP: received " << ba << " bytes packet from " << addr.toString() << endl;
200
 
                        try
201
 
                        {
202
 
                                if (ba >= utp::Header::size()) // discard packets which are too small
203
 
                                        handlePacket(packet,addr);
204
 
                        }
205
 
                        catch (utp::Connection::TransmissionError & err)
206
 
                        {
207
 
                                Out(SYS_UTP|LOG_NOTICE) << "UTP: " << err.location << endl;
208
 
                        }
209
 
                }
210
 
        }
211
 
        
212
 
        void UTPServer::writePacket(int)
213
 
        {
214
 
                QMutexLocker lock(&output_queue_mutex);
215
 
                
216
 
                try
217
 
                {
218
 
                        // Keep sending until the output queue is empty or the socket 
219
 
                        // can't handle the data anymore
220
 
                        while (!output_queue.empty())
221
 
                        {
222
 
                                OutputQueueEntry & packet = output_queue.front();
223
 
                                const QByteArray & data = packet.get<0>();
224
 
                                const net::Address & addr = packet.get<1>();
225
 
                                int ret = sock->sendTo((const bt::Uint8*)data.data(),data.size(),addr);
226
 
                                if (ret == net::SEND_WOULD_BLOCK)
227
 
                                        break;
228
 
                                
229
 
                                output_queue.pop_front();
230
 
                        }
231
 
                }
232
 
                catch (Connection::TransmissionError & err)
233
 
                {
234
 
                        Out(SYS_UTP|LOG_NOTICE) << "UTP: " << err.location << endl;
235
 
                }
236
 
                
237
 
                write_notifier->setEnabled(!output_queue.empty());
 
294
                foreach (net::ServerSocket::Ptr sock,d->sockets)
 
295
                {
 
296
                        sock->setReadNotificationsEnabled(true);
 
297
                }
238
298
        }
239
299
        
240
300
#if 0
286
346
                const Header* hdr = parser.header();
287
347
                //Dump(packet,addr);
288
348
                //DumpPacket(*hdr);
289
 
                
 
349
                Connection::Ptr c;
290
350
                switch (hdr->type)
291
351
                {
292
352
                        case ST_DATA:
294
354
                        case ST_STATE:
295
355
                                try
296
356
                                {
297
 
                                        Connection* c = find(hdr->connection_id);
298
 
                                        if (c)
299
 
                                                c->handlePacket(parser,packet);
 
357
                                        c = d->find(hdr->connection_id);
 
358
                                        if (c && c->handlePacket(parser,packet) == CS_CLOSED)
 
359
                                                d->connections.remove(c->receiveConnectionID());
300
360
                                }
301
361
                                catch (Connection::TransmissionError & err)
302
362
                                {
303
363
                                        Out(SYS_UTP|LOG_NOTICE) << "UTP: " << err.location << endl;
304
 
                                        // TODO: kill connection
 
364
                                        if (c)
 
365
                                                c->close();
305
366
                                }
306
367
                                break;
307
368
                        case ST_RESET:
308
 
                                reset(hdr);
 
369
                                d->reset(hdr);
309
370
                                break;
310
371
                        case ST_SYN:
311
 
                                syn(parser,packet,addr);
 
372
                                d->syn(parser,packet,addr);
312
373
                                break;
313
374
                }
314
375
        }
315
376
 
316
377
 
317
 
        bool UTPServer::sendTo(const QByteArray& data, const net::Address& addr,quint16 conn_id)
 
378
        bool UTPServer::sendTo(utp::Connection::Ptr conn, const QByteArray& data)
318
379
        {
319
 
                QMutexLocker lock(&output_queue_mutex);
320
 
                output_queue.append(OutputQueueEntry(data,addr,conn_id));
321
 
                if (output_queue.size() == 1)
 
380
                if (d->output_queue.add(data,conn) == 1)
322
381
                {
323
 
                        // Need to start write notifiers
324
 
                        if (QThread::currentThread() == utp_thread)
325
 
                                write_notifier->setEnabled(true);
 
382
                        // If there is only one packet queued, 
 
383
                        // We need to enable the write notifiers, use the event queue to do this
 
384
                        // if we are not in the utp thread
 
385
                        if (QThread::currentThread() != d->utp_thread)
 
386
                        {
 
387
                                QCoreApplication::postEvent(this, new QEvent(QEvent::User));
 
388
                        }
326
389
                        else
327
 
                                QCoreApplication::postEvent(this,new QEvent((QEvent::Type)start_write_notifier_event_type));
 
390
                        {
 
391
                                foreach (net::ServerSocket::Ptr sock, d->sockets)
 
392
                                        sock->setWriteNotificationsEnabled(true);
 
393
                        }
328
394
                }
329
 
                
330
395
                return true;
331
396
        }
332
 
 
333
 
        Connection* UTPServer::connectTo(const net::Address& addr)
334
 
        {
335
 
                if (!sock || addr.port() == 0)
336
 
                        return 0;
 
397
        
 
398
        void UTPServer::customEvent(QEvent* ev)
 
399
        {
 
400
                if (ev->type() == QEvent::User)
 
401
                {
 
402
                        foreach (net::ServerSocket::Ptr sock, d->sockets)
 
403
                                sock->setWriteNotificationsEnabled(true);
 
404
                }
 
405
        }
 
406
 
 
407
 
 
408
        Connection::WPtr UTPServer::connectTo(const net::Address& addr)
 
409
        {
 
410
                if (d->sockets.isEmpty() || addr.port() == 0)
 
411
                        return Connection::WPtr();
337
412
                
338
 
                QMutexLocker lock(&mutex);
 
413
                QMutexLocker lock(&d->mutex);
339
414
                quint16 recv_conn_id = qrand() % 32535;
340
 
                while (connections.contains(recv_conn_id))
 
415
                while (d->connections.contains(recv_conn_id))
341
416
                        recv_conn_id = qrand() % 32535;
342
417
                
343
 
                Connection* conn = new Connection(recv_conn_id,Connection::OUTGOING,addr,this);
344
 
                conn->moveToThread(utp_thread);
345
 
                connections.insert(recv_conn_id,conn);
 
418
                Connection::Ptr conn(new Connection(recv_conn_id,Connection::OUTGOING,addr,this));
 
419
                conn->setWeakPointer(conn);
 
420
                conn->moveToThread(d->utp_thread);
 
421
                d->connections.insert(recv_conn_id,conn);
346
422
                try
347
423
                {
348
424
                        conn->startConnecting();
350
426
                }
351
427
                catch (Connection::TransmissionError & err)
352
428
                {
353
 
                        connections.erase(recv_conn_id);
354
 
                        delete conn;
355
 
                        return 0;
356
 
                }
357
 
        }
358
 
 
359
 
        void UTPServer::syn(const PacketParser & parser, const QByteArray& data, const net::Address & addr)
360
 
        {
361
 
                const Header* hdr = parser.header();
362
 
                quint16 recv_conn_id = hdr->connection_id + 1;
363
 
                if (connections.find(recv_conn_id))
364
 
                {
365
 
                        // Send a reset packet if the ID is in use
366
 
                        Connection conn(recv_conn_id,Connection::INCOMING,addr,this);
367
 
                        conn.sendReset();
368
 
                }
369
 
                else
370
 
                {
371
 
                        Connection* conn = new Connection(recv_conn_id,Connection::INCOMING,addr,this);
372
 
                        try
373
 
                        {
374
 
                                conn->handlePacket(parser,data);
375
 
                                connections.insert(recv_conn_id,conn);
376
 
                                accepted(conn);
377
 
                        }
378
 
                        catch (Connection::TransmissionError & err)
379
 
                        {
380
 
                                Out(SYS_UTP|LOG_NOTICE) << "UTP: " << err.location << endl;
381
 
                                connections.erase(recv_conn_id);
382
 
                                delete conn;
383
 
                        }
384
 
                }
385
 
        }
386
 
 
387
 
        void UTPServer::reset(const utp::Header* hdr)
388
 
        {
389
 
                Connection* c = find(hdr->connection_id);
390
 
                if (c)
391
 
                {
392
 
                        c->reset();
393
 
                }
394
 
        }
395
 
 
396
 
        Connection* UTPServer::find(quint16 conn_id)
397
 
        {
398
 
                return connections.find(conn_id);
399
 
        }
400
 
 
401
 
        void UTPServer::clearDeadConnections()
402
 
        {
403
 
                QMutexLocker lock(&mutex);
404
 
                QList<Connection*>::iterator i = dead_connections.begin();
405
 
                while (i != dead_connections.end())
406
 
                {
407
 
                        Connection* conn = *i;
408
 
                        if (conn->connectionState() == CS_CLOSED)
409
 
                        {
410
 
                                connections.erase(conn->receiveConnectionID());
411
 
                                delete conn;
412
 
                                i = dead_connections.erase(i);
413
 
                        }
414
 
                        else
415
 
                                i++;
416
 
                }
417
 
        }
418
 
        
419
 
        void UTPServer::attach(UTPSocket* socket, Connection* conn)
420
 
        {
421
 
                QMutexLocker lock(&mutex);
422
 
                alive_connections.insert(conn,socket);
423
 
        }
424
 
 
425
 
        void UTPServer::detach(UTPSocket* socket, Connection* conn)
426
 
        {
427
 
                QMutexLocker lock(&mutex);
428
 
                UTPSocket* sock = alive_connections.find(conn);
429
 
                if (sock == socket)
430
 
                {
431
 
                        // given the fact that the socket is gone, we can close it
432
 
                        try
433
 
                        {
434
 
                                conn->close();
435
 
                        }
436
 
                        catch (Connection::TransmissionError)
437
 
                        {
438
 
                        }
439
 
                        alive_connections.erase(conn);
440
 
                        dead_connections.append(conn);
 
429
                        d->connections.remove(recv_conn_id);
 
430
                        return Connection::WPtr();
441
431
                }
442
432
        }
443
433
 
444
434
        void UTPServer::stop()
445
435
        {
446
 
                running = false;
447
 
                if (utp_thread)
448
 
                {
449
 
                        utp_thread->exit();
450
 
                        utp_thread->wait();
451
 
                        delete utp_thread;
452
 
                        utp_thread = 0;
453
 
                }
454
 
                
455
 
                timer.stop();
456
 
        
457
 
                // Cleanup all connections
458
 
                QList<UTPSocket*> sockets;
459
 
                bt::PtrMap<Connection*,UTPSocket>::iterator i = alive_connections.begin();
460
 
                while (i != alive_connections.end())
461
 
                {
462
 
                        sockets.append(i->second);
463
 
                        i++;
464
 
                }
465
 
                
466
 
                foreach (UTPSocket* s,sockets)
467
 
                        s->reset();
468
 
                
469
 
                alive_connections.clear();
470
 
                clearDeadConnections();
471
 
                
472
 
                connections.setAutoDelete(true);
473
 
                connections.clear();
474
 
                connections.setAutoDelete(false);
475
 
                
476
 
                // Close the socket
477
 
                if (sock)
478
 
                {
479
 
                        sock->close();
480
 
                        delete sock;
481
 
                        sock = 0;
482
 
                        Globals::instance().getPortList().removePort(port,net::UDP);
483
 
                }
 
436
                d->stop();
484
437
        }
485
438
        
486
 
        
487
439
        void UTPServer::start()
488
440
        {
489
 
                if (!utp_thread)
 
441
                if (!d->utp_thread)
490
442
                {
491
 
                        utp_thread = new UTPServerThread(this);
492
 
                        utp_thread->start();
 
443
                        d->utp_thread = new UTPServerThread(this);
 
444
                        foreach (net::ServerSocket::Ptr sock,d->sockets)
 
445
                                sock->moveToThread(d->utp_thread);
 
446
                        d->utp_thread->start();
493
447
                }
494
448
        }
495
449
        
496
 
        void UTPServer::timerEvent(QTimerEvent* event)
497
 
        {
498
 
                if (event->timerId() == timer.timerId())
499
 
                        wakeUpPollPipes();
500
 
                else
501
 
                        QObject::timerEvent(event);
502
 
        }
503
 
 
504
 
        void UTPServer::wakeUpPollPipes()
505
 
        {
506
 
                bool restart_timer = false;
507
 
                QMutexLocker lock(&mutex);
508
 
                for (PollPipePairItr p = poll_pipes.begin();p != poll_pipes.end();p++)
509
 
                {
510
 
                        PollPipePair* pp = p->second;
511
 
                        if (pp->read_pipe->polling())
512
 
                                p->second->testRead(connections.begin(),connections.end());
513
 
                        if (pp->write_pipe->polling())
514
 
                                p->second->testWrite(connections.begin(),connections.end());
515
 
                        
516
 
                        restart_timer = restart_timer || pp->read_pipe->polling() || pp->write_pipe->polling();
517
 
                }
518
 
                
519
 
                if (restart_timer)
520
 
                        timer.start(10,this);
521
 
        }
522
 
 
523
 
 
524
 
        void UTPServer::preparePolling(net::Poll* p, net::Poll::Mode mode,Connection* conn)
525
 
        {
526
 
                QMutexLocker lock(&mutex);
527
 
                PollPipePair* pair = poll_pipes.find(p);
 
450
        void UTPServer::preparePolling(net::Poll* p, net::Poll::Mode mode, utp::Connection::Ptr conn)
 
451
        {
 
452
                QMutexLocker lock(&d->mutex);
 
453
                PollPipePair* pair = d->poll_pipes.find(p);
528
454
                if (!pair)
529
455
                {
530
456
                        pair = new PollPipePair();
531
 
                        poll_pipes.insert(p,pair);
 
457
                        d->poll_pipes.insert(p,pair);
532
458
                }
533
459
                
534
460
                if (mode == net::Poll::INPUT)
543
469
                                pair->write_pipe->wakeUp();
544
470
                        pair->write_pipe->prepare(p,conn->receiveConnectionID(),pair->write_pipe);
545
471
                }
546
 
                
547
 
                // Use thread safe event mechanism to start timer
548
 
                if (!timer.isActive())
549
 
                        QCoreApplication::postEvent(this,new QEvent((QEvent::Type)start_timer_event_type));
550
 
        }
551
 
        
552
 
        void UTPServer::customEvent(QEvent* event)
553
 
        {
554
 
                if (event->type() == start_timer_event_type)
555
 
                {
556
 
                        timer.start(10,this);
557
 
                        event->accept();
558
 
                }
559
 
                else if (event->type() == start_write_notifier_event_type)
560
 
                {
561
 
                        write_notifier->setEnabled(true);
562
 
                        event->accept();
563
 
                }
564
 
        }
565
 
 
566
 
        
567
 
        void UTPServer::onAccepted(Connection* conn)
568
 
        {
569
 
                if (create_sockets)
570
 
                {
571
 
                        UTPSocket* utps = new UTPSocket(conn);
572
 
                        mse::StreamSocket* ss = new mse::StreamSocket(utps);
573
 
                        {
574
 
                                QMutexLocker lock(&pending_mutex);
575
 
                                pending.append(ss);
576
 
                        }
577
 
                        handlePendingConnectionsDelayed();
578
 
                }
579
 
        }
580
 
        
581
 
        UTPServer::PollPipePair::PollPipePair() 
 
472
        }
 
473
        
 
474
        void UTPServer::stateChanged(utp::Connection::Ptr conn, bool readable, bool writeable)
 
475
        {
 
476
                d->wakeUpPollPipes(conn,readable,writeable);
 
477
        }
 
478
        
 
479
        ///////////////////////////////////////////////////////
 
480
        
 
481
        PollPipePair::PollPipePair() 
582
482
                : read_pipe(new PollPipe(net::Poll::INPUT)),
583
483
                write_pipe(new PollPipe(net::Poll::OUTPUT))
584
484
        {
585
485
                
586
486
        }
587
487
                
588
 
        void UTPServer::PollPipePair::testRead(utp::UTPServer::ConItr b, utp::UTPServer::ConItr e)
 
488
        bool PollPipePair::testRead(utp::ConItr b, utp::ConItr e)
589
489
        {
590
 
                for (utp::UTPServer::ConItr i = b;i != e;i++)
 
490
                for (utp::ConItr i = b;i != e;i++)
591
491
                {
592
492
                        if (read_pipe->readyToWakeUp(i->second))
593
493
                        {
594
494
                                read_pipe->wakeUp();
595
 
                                break;
 
495
                                return true;
596
496
                        }
597
497
                }
 
498
                
 
499
                return false;
598
500
        }
599
501
 
600
 
        void UTPServer::PollPipePair::testWrite(utp::UTPServer::ConItr b, utp::UTPServer::ConItr e)
 
502
        bool PollPipePair::testWrite(utp::ConItr b, utp::ConItr e)
601
503
        {
602
 
                for (utp::UTPServer::ConItr i = b;i != e;i++)
 
504
                for (utp::ConItr i = b;i != e;i++)
603
505
                {
604
506
                        if (write_pipe->readyToWakeUp(i->second))
605
507
                        {
606
508
                                write_pipe->wakeUp();
607
 
                                break;
 
509
                                return true;
608
510
                        }
609
511
                }
610
 
        }
611
512
                
 
513
                return false;
 
514
        }
 
515
 
 
516
        void UTPServer::setCreateSockets(bool on)
 
517
        {
 
518
                d->create_sockets = on;
 
519
        }
 
520
 
 
521
        Connection::WPtr UTPServer::acceptedConnection()
 
522
        {
 
523
                if (d->last_accepted.isEmpty())
 
524
                        return Connection::WPtr();
 
525
                else
 
526
                        return d->last_accepted.takeFirst();
 
527
        }
 
528
        
 
529
        void UTPServer::closed(Connection::Ptr conn)
 
530
        {
 
531
                Q_UNUSED(conn);
 
532
                QTimer::singleShot(0,this,SLOT(cleanup()));
 
533
        }
 
534
        
612
535
        void UTPServer::cleanup()
613
536
        {
614
 
                clearDeadConnections();
615
 
        }
616
 
 
617
 
}
 
537
                QMutexLocker lock(&d->mutex);
 
538
                QMap<quint16,Connection::Ptr>::iterator i = d->connections.begin(); 
 
539
                while (i != d->connections.end())
 
540
                {
 
541
                        if (i.value()->connectionState() == CS_CLOSED)
 
542
                                i = d->connections.erase(i);
 
543
                        else
 
544
                                i++;
 
545
                }
 
546
        }
 
547
        
 
548
        int UTPServer::scheduleTimer(Connection::Ptr conn, Uint32 timeout)
 
549
        {
 
550
                int timer_id = startTimer(timeout);
 
551
                d->active_timers.insert(timer_id, Connection::WPtr(conn));
 
552
                return timer_id;
 
553
        }
 
554
        
 
555
        void UTPServer::cancelTimer(int timer_id)
 
556
        {
 
557
                QMap<int,Connection::WPtr>::iterator i = d->active_timers.find(timer_id);
 
558
                if (i != d->active_timers.end())
 
559
                {
 
560
                        killTimer(timer_id);
 
561
                        d->active_timers.erase(i);
 
562
                }
 
563
        }
 
564
 
 
565
 
 
566
        void UTPServer::timerEvent(QTimerEvent* ev)
 
567
        {
 
568
                int timer_id = ev->timerId();
 
569
                killTimer(timer_id);
 
570
                
 
571
                QMap<int,Connection::WPtr>::iterator i = d->active_timers.find(timer_id);
 
572
                if (i != d->active_timers.end())
 
573
                {
 
574
                        Connection::Ptr conn = i.value().toStrongRef();
 
575
                        d->active_timers.erase(i);
 
576
                        if (conn)
 
577
                                conn->handleTimeout();
 
578
                }
 
579
                
 
580
                ev->accept();
 
581
        }
 
582
 
 
583
}
 
 
b'\\ No newline at end of file'