61
61
server->handlePendingConnections();
64
static int start_timer_event_type = 0;
65
static int start_write_notifier_event_type = 0;
67
64
///////////////////////////////////////////////////////////
69
UTPServer::UTPServer(QObject* parent)
70
: ServerInterface(parent),
74
mutex(QMutex::Recursive),
79
mtc(new MainThreadCall(this))
66
UTPServer::Private::Private(UTPServer* p) :
70
mutex(QMutex::Recursive),
73
mtc(new MainThreadCall(p))
82
connect(this,SIGNAL(handlePendingConnectionsDelayed()),
75
QObject::connect(p,SIGNAL(handlePendingConnectionsDelayed()),
83
76
mtc,SLOT(handlePendingConnections()),Qt::QueuedConnection);
84
connect(this,SIGNAL(accepted(Connection*)),this,SLOT(onAccepted(Connection*)));
85
78
poll_pipes.setAutoDelete(true);
86
start_timer_event_type = QEvent::registerEventType();
87
start_write_notifier_event_type = QEvent::registerEventType();
90
UTPServer::~UTPServer()
81
UTPServer::Private::~Private()
101
// HACK: to avoid binary incompatibilities,
102
// relies on the fact that there will only be one UTPServer instance
103
static QMutex pending_mutex;
104
static QMutex output_queue_mutex(QMutex::Recursive);
90
void UTPServer::Private::stop()
105
Globals::instance().getPortList().removePort(port,net::UDP);
108
bool UTPServer::Private::bind(const net::Address& addr)
110
net::ServerSocket::Ptr sock(new net::ServerSocket(this));
111
if (!sock->bind(addr))
117
Out(SYS_UTP|LOG_NOTICE) << "UTP: bound to " << addr.toString() << endl;
119
sock->setReadNotificationsEnabled(false);
120
sock->setWriteNotificationsEnabled(false);
121
sockets.append(sock);
126
void UTPServer::Private::syn(const PacketParser & parser, const QByteArray& data, const net::Address & addr)
128
const Header* hdr = parser.header();
129
quint16 recv_conn_id = hdr->connection_id + 1;
130
if (connections.contains(recv_conn_id))
132
// Send a reset packet if the ID is in use
133
Connection::Ptr conn(new Connection(recv_conn_id,Connection::INCOMING,addr,p));
134
conn->setWeakPointer(conn);
139
Connection::Ptr conn(new Connection(recv_conn_id,Connection::INCOMING,addr,p));
142
conn->setWeakPointer(conn);
143
conn->handlePacket(parser,data);
144
connections.insert(recv_conn_id,conn);
147
UTPSocket* utps = new UTPSocket(conn);
148
mse::StreamSocket::Ptr ss(new mse::StreamSocket(utps));
150
QMutexLocker lock(&pending_mutex);
153
p->handlePendingConnectionsDelayed();
157
last_accepted.append(conn);
161
catch (Connection::TransmissionError & err)
163
Out(SYS_UTP|LOG_NOTICE) << "UTP: " << err.location << endl;
164
connections.remove(recv_conn_id);
169
void UTPServer::Private::reset(const utp::Header* hdr)
171
Connection::Ptr c = find(hdr->connection_id);
178
Connection::Ptr UTPServer::Private::find(quint16 conn_id)
180
ConnectionMapItr i = connections.find(conn_id);
181
if (i != connections.end())
184
return Connection::Ptr();
187
void UTPServer::Private::wakeUpPollPipes(utp::Connection::Ptr conn, bool readable, bool writeable)
189
QMutexLocker lock(&mutex);
190
for (PollPipePairItr itr = poll_pipes.begin();itr != poll_pipes.end();itr++)
192
PollPipePair* pp = itr->second;
193
if (readable && pp->read_pipe->polling(conn->receiveConnectionID()))
194
itr->second->read_pipe->wakeUp();
195
if (writeable && pp->write_pipe->polling(conn->receiveConnectionID()))
196
itr->second->write_pipe->wakeUp();
201
void UTPServer::Private::dataReceived(const QByteArray& data, const net::Address& addr)
203
QMutexLocker lock(&mutex);
204
//Out(SYS_UTP|LOG_NOTICE) << "UTP: received " << ba << " bytes packet from " << addr.toString() << endl;
207
if (data.size() >= (int)utp::Header::size()) // discard packets which are to small
209
p->handlePacket(data,addr);
212
catch (utp::Connection::TransmissionError & err)
214
Out(SYS_UTP|LOG_NOTICE) << "UTP: " << err.location << endl;
218
void UTPServer::Private::readyToWrite(net::ServerSocket* sock)
220
output_queue.send(sock);
223
///////////////////////////////////////////////////////////
225
UTPServer::UTPServer(QObject* parent)
226
: ServerInterface(parent),d(new Private(this))
232
UTPServer::~UTPServer()
106
237
void UTPServer::handlePendingConnections()
108
239
// This should be called from the main thread
109
QList<mse::StreamSocket*> p;
240
QList<mse::StreamSocket::Ptr> p;
111
QMutexLocker lock(&pending_mutex);
242
QMutexLocker lock(&d->pending_mutex);
112
243
// Copy the pending list and clear it before using it's contents to avoid a deadlock
117
foreach (mse::StreamSocket* s,p)
248
foreach (mse::StreamSocket::Ptr s,p)
119
250
newConnection(s);
124
255
bool UTPServer::changePort(bt::Uint16 p)
126
if (sock && port == p)
257
if (d->sockets.count() > 0 && port == p)
260
Globals::instance().getPortList().removePort(port,net::UDP);
129
263
QStringList possible = bindAddresses();
130
foreach (const QString & ip,possible)
132
net::Address addr(ip,p);
141
bool UTPServer::bind(const net::Address& addr)
145
Globals::instance().getPortList().removePort(port,net::UDP);
149
sock = new net::Socket(false,addr.ipVersion());
150
sock->setBlocking(false);
151
if (!sock->bind(addr,false))
264
foreach (const QString & addr,possible)
266
d->bind(net::Address(addr,p));
269
if (d->sockets.count() == 0)
271
// Try any addresses if previous binds failed
272
d->bind(net::Address(QHostAddress(QHostAddress::AnyIPv6).toString(),p));
273
d->bind(net::Address(QHostAddress(QHostAddress::Any).toString(),p));
276
if (d->sockets.count())
278
Globals::instance().getPortList().addNewPort(p,net::UDP,true);
159
Out(SYS_UTP|LOG_NOTICE) << "UTP: bound to " << addr.toString() << endl;
161
Globals::instance().getPortList().addNewPort(addr.port(),net::UDP,true);
166
285
void UTPServer::setTOS(Uint8 type_of_service)
168
tos = type_of_service;
287
d->tos = type_of_service;
288
foreach (net::ServerSocket::Ptr sock,d->sockets)
289
sock->setTOS(d->tos);
173
292
void UTPServer::threadStarted()
177
read_notifier = new QSocketNotifier(sock->fd(),QSocketNotifier::Read,this);
178
connect(read_notifier,SIGNAL(activated(int)),this,SLOT(readPacket(int)));
183
write_notifier = new QSocketNotifier(sock->fd(),QSocketNotifier::Write,this);
184
connect(write_notifier,SIGNAL(activated(int)),this,SLOT(writePacket(int)));
187
write_notifier->setEnabled(false);
190
void UTPServer::readPacket(int)
192
QMutexLocker lock(&mutex);
194
int ba = sock->bytesAvailable();
195
QByteArray packet(ba,0);
197
if (sock->recvFrom((bt::Uint8*)packet.data(),ba,addr) > 0)
199
//Out(SYS_UTP|LOG_NOTICE) << "UTP: received " << ba << " bytes packet from " << addr.toString() << endl;
202
if (ba >= utp::Header::size()) // discard packets which are to small
203
handlePacket(packet,addr);
205
catch (utp::Connection::TransmissionError & err)
207
Out(SYS_UTP|LOG_NOTICE) << "UTP: " << err.location << endl;
212
void UTPServer::writePacket(int)
214
QMutexLocker lock(&output_queue_mutex);
218
// Keep sending until the output queue is empty or the socket
219
// can't handle the data anymore
220
while (!output_queue.empty())
222
OutputQueueEntry & packet = output_queue.front();
223
const QByteArray & data = packet.get<0>();
224
const net::Address & addr = packet.get<1>();
225
int ret = sock->sendTo((const bt::Uint8*)data.data(),data.size(),addr);
226
if (ret == net::SEND_WOULD_BLOCK)
229
output_queue.pop_front();
232
catch (Connection::TransmissionError & err)
234
Out(SYS_UTP|LOG_NOTICE) << "UTP: " << err.location << endl;
237
write_notifier->setEnabled(!output_queue.empty());
294
foreach (net::ServerSocket::Ptr sock,d->sockets)
296
sock->setReadNotificationsEnabled(true);
297
Connection* c = find(hdr->connection_id);
299
c->handlePacket(parser,packet);
357
c = d->find(hdr->connection_id);
358
if (c && c->handlePacket(parser,packet) == CS_CLOSED)
359
d->connections.remove(c->receiveConnectionID());
301
361
catch (Connection::TransmissionError & err)
303
363
Out(SYS_UTP|LOG_NOTICE) << "UTP: " << err.location << endl;
304
// TODO: kill connection
311
syn(parser,packet,addr);
372
d->syn(parser,packet,addr);
317
bool UTPServer::sendTo(const QByteArray& data, const net::Address& addr,quint16 conn_id)
378
bool UTPServer::sendTo(utp::Connection::Ptr conn, const QByteArray& data)
319
QMutexLocker lock(&output_queue_mutex);
320
output_queue.append(OutputQueueEntry(data,addr,conn_id));
321
if (output_queue.size() == 1)
380
if (d->output_queue.add(data,conn) == 1)
323
// Need to start write notifiers
324
if (QThread::currentThread() == utp_thread)
325
write_notifier->setEnabled(true);
382
// If there is only one packet queued,
383
// We need to enable the write notifiers, use the event queue to do this
384
// if we are not in the utp thread
385
if (QThread::currentThread() != d->utp_thread)
387
QCoreApplication::postEvent(this, new QEvent(QEvent::User));
327
QCoreApplication::postEvent(this,new QEvent((QEvent::Type)start_write_notifier_event_type));
391
foreach (net::ServerSocket::Ptr sock, d->sockets)
392
sock->setWriteNotificationsEnabled(true);
333
Connection* UTPServer::connectTo(const net::Address& addr)
335
if (!sock || addr.port() == 0)
398
void UTPServer::customEvent(QEvent* ev)
400
if (ev->type() == QEvent::User)
402
foreach (net::ServerSocket::Ptr sock, d->sockets)
403
sock->setWriteNotificationsEnabled(true);
408
Connection::WPtr UTPServer::connectTo(const net::Address& addr)
410
if (d->sockets.isEmpty() || addr.port() == 0)
411
return Connection::WPtr();
338
QMutexLocker lock(&mutex);
413
QMutexLocker lock(&d->mutex);
339
414
quint16 recv_conn_id = qrand() % 32535;
340
while (connections.contains(recv_conn_id))
415
while (d->connections.contains(recv_conn_id))
341
416
recv_conn_id = qrand() % 32535;
343
Connection* conn = new Connection(recv_conn_id,Connection::OUTGOING,addr,this);
344
conn->moveToThread(utp_thread);
345
connections.insert(recv_conn_id,conn);
418
Connection::Ptr conn(new Connection(recv_conn_id,Connection::OUTGOING,addr,this));
419
conn->setWeakPointer(conn);
420
conn->moveToThread(d->utp_thread);
421
d->connections.insert(recv_conn_id,conn);
348
424
conn->startConnecting();
351
427
catch (Connection::TransmissionError & err)
353
connections.erase(recv_conn_id);
359
void UTPServer::syn(const PacketParser & parser, const QByteArray& data, const net::Address & addr)
361
const Header* hdr = parser.header();
362
quint16 recv_conn_id = hdr->connection_id + 1;
363
if (connections.find(recv_conn_id))
365
// Send a reset packet if the ID is in use
366
Connection conn(recv_conn_id,Connection::INCOMING,addr,this);
371
Connection* conn = new Connection(recv_conn_id,Connection::INCOMING,addr,this);
374
conn->handlePacket(parser,data);
375
connections.insert(recv_conn_id,conn);
378
catch (Connection::TransmissionError & err)
380
Out(SYS_UTP|LOG_NOTICE) << "UTP: " << err.location << endl;
381
connections.erase(recv_conn_id);
387
void UTPServer::reset(const utp::Header* hdr)
389
Connection* c = find(hdr->connection_id);
396
Connection* UTPServer::find(quint16 conn_id)
398
return connections.find(conn_id);
401
void UTPServer::clearDeadConnections()
403
QMutexLocker lock(&mutex);
404
QList<Connection*>::iterator i = dead_connections.begin();
405
while (i != dead_connections.end())
407
Connection* conn = *i;
408
if (conn->connectionState() == CS_CLOSED)
410
connections.erase(conn->receiveConnectionID());
412
i = dead_connections.erase(i);
419
void UTPServer::attach(UTPSocket* socket, Connection* conn)
421
QMutexLocker lock(&mutex);
422
alive_connections.insert(conn,socket);
425
void UTPServer::detach(UTPSocket* socket, Connection* conn)
427
QMutexLocker lock(&mutex);
428
UTPSocket* sock = alive_connections.find(conn);
431
// given the fact that the socket is gone, we can close it
436
catch (Connection::TransmissionError)
439
alive_connections.erase(conn);
440
dead_connections.append(conn);
429
d->connections.remove(recv_conn_id);
430
return Connection::WPtr();
444
434
void UTPServer::stop()
457
// Cleanup all connections
458
QList<UTPSocket*> sockets;
459
bt::PtrMap<Connection*,UTPSocket>::iterator i = alive_connections.begin();
460
while (i != alive_connections.end())
462
sockets.append(i->second);
466
foreach (UTPSocket* s,sockets)
469
alive_connections.clear();
470
clearDeadConnections();
472
connections.setAutoDelete(true);
474
connections.setAutoDelete(false);
482
Globals::instance().getPortList().removePort(port,net::UDP);
487
439
void UTPServer::start()
491
utp_thread = new UTPServerThread(this);
443
d->utp_thread = new UTPServerThread(this);
444
foreach (net::ServerSocket::Ptr sock,d->sockets)
445
sock->moveToThread(d->utp_thread);
446
d->utp_thread->start();
496
void UTPServer::timerEvent(QTimerEvent* event)
498
if (event->timerId() == timer.timerId())
501
QObject::timerEvent(event);
504
void UTPServer::wakeUpPollPipes()
506
bool restart_timer = false;
507
QMutexLocker lock(&mutex);
508
for (PollPipePairItr p = poll_pipes.begin();p != poll_pipes.end();p++)
510
PollPipePair* pp = p->second;
511
if (pp->read_pipe->polling())
512
p->second->testRead(connections.begin(),connections.end());
513
if (pp->write_pipe->polling())
514
p->second->testWrite(connections.begin(),connections.end());
516
restart_timer = restart_timer || pp->read_pipe->polling() || pp->write_pipe->polling();
520
timer.start(10,this);
524
void UTPServer::preparePolling(net::Poll* p, net::Poll::Mode mode,Connection* conn)
526
QMutexLocker lock(&mutex);
527
PollPipePair* pair = poll_pipes.find(p);
450
void UTPServer::preparePolling(net::Poll* p, net::Poll::Mode mode, utp::Connection::Ptr conn)
452
QMutexLocker lock(&d->mutex);
453
PollPipePair* pair = d->poll_pipes.find(p);
530
456
pair = new PollPipePair();
531
poll_pipes.insert(p,pair);
457
d->poll_pipes.insert(p,pair);
534
460
if (mode == net::Poll::INPUT)
543
469
pair->write_pipe->wakeUp();
544
470
pair->write_pipe->prepare(p,conn->receiveConnectionID(),pair->write_pipe);
547
// Use thread safe event mechanism to start timer
548
if (!timer.isActive())
549
QCoreApplication::postEvent(this,new QEvent((QEvent::Type)start_timer_event_type));
552
void UTPServer::customEvent(QEvent* event)
554
if (event->type() == start_timer_event_type)
556
timer.start(10,this);
559
else if (event->type() == start_write_notifier_event_type)
561
write_notifier->setEnabled(true);
567
void UTPServer::onAccepted(Connection* conn)
571
UTPSocket* utps = new UTPSocket(conn);
572
mse::StreamSocket* ss = new mse::StreamSocket(utps);
574
QMutexLocker lock(&pending_mutex);
577
handlePendingConnectionsDelayed();
581
UTPServer::PollPipePair::PollPipePair()
474
void UTPServer::stateChanged(utp::Connection::Ptr conn, bool readable, bool writeable)
476
d->wakeUpPollPipes(conn,readable,writeable);
479
///////////////////////////////////////////////////////
481
PollPipePair::PollPipePair()
582
482
: read_pipe(new PollPipe(net::Poll::INPUT)),
583
483
write_pipe(new PollPipe(net::Poll::OUTPUT))
588
void UTPServer::PollPipePair::testRead(utp::UTPServer::ConItr b, utp::UTPServer::ConItr e)
488
bool PollPipePair::testRead(utp::ConItr b, utp::ConItr e)
590
for (utp::UTPServer::ConItr i = b;i != e;i++)
490
for (utp::ConItr i = b;i != e;i++)
592
492
if (read_pipe->readyToWakeUp(i->second))
594
494
read_pipe->wakeUp();
600
void UTPServer::PollPipePair::testWrite(utp::UTPServer::ConItr b, utp::UTPServer::ConItr e)
502
bool PollPipePair::testWrite(utp::ConItr b, utp::ConItr e)
602
for (utp::UTPServer::ConItr i = b;i != e;i++)
504
for (utp::ConItr i = b;i != e;i++)
604
506
if (write_pipe->readyToWakeUp(i->second))
606
508
write_pipe->wakeUp();
516
void UTPServer::setCreateSockets(bool on)
518
d->create_sockets = on;
521
Connection::WPtr UTPServer::acceptedConnection()
523
if (d->last_accepted.isEmpty())
524
return Connection::WPtr();
526
return d->last_accepted.takeFirst();
529
void UTPServer::closed(Connection::Ptr conn)
532
QTimer::singleShot(0,this,SLOT(cleanup()));
612
535
void UTPServer::cleanup()
614
clearDeadConnections();
537
QMutexLocker lock(&d->mutex);
538
QMap<quint16,Connection::Ptr>::iterator i = d->connections.begin();
539
while (i != d->connections.end())
541
if (i.value()->connectionState() == CS_CLOSED)
542
i = d->connections.erase(i);
548
int UTPServer::scheduleTimer(Connection::Ptr conn, Uint32 timeout)
550
int timer_id = startTimer(timeout);
551
d->active_timers.insert(timer_id, Connection::WPtr(conn));
555
void UTPServer::cancelTimer(int timer_id)
557
QMap<int,Connection::WPtr>::iterator i = d->active_timers.find(timer_id);
558
if (i != d->active_timers.end())
561
d->active_timers.erase(i);
566
void UTPServer::timerEvent(QTimerEvent* ev)
568
int timer_id = ev->timerId();
571
QMap<int,Connection::WPtr>::iterator i = d->active_timers.find(timer_id);
572
if (i != d->active_timers.end())
574
Connection::Ptr conn = i.value().toStrongRef();
575
d->active_timers.erase(i);
577
conn->handleTimeout();
b'\\ No newline at end of file'