5
#include <algorithm> // for min/max
10
#include <sys/select.h> // for select
12
#include <sys/types.h> // for fnctl
13
#include <unistd.h> // for fnctl & other
14
#include <fcntl.h> // for fnctl
15
#include <errno.h> // for checking errno
19
#include <winsock2.h> // for select & fd_set
22
#define O_NONBLOCK 0 /* not actually supported in MINGW */
29
#include "mythsocketthread.h"
30
#include "mythsocket.h"
31
#include "mythverbose.h"
33
#define SLOC(a) QString("MythSocketThread(sock 0x%1:%2): ")\
34
.arg((quint64)a, 0, 16).arg(a->socket())
36
static void setup_pipe(int mypipe[2], long flags[2]);
38
const uint MythSocketThread::kShortWait = 100;
40
MythSocketThread::MythSocketThread()
41
: QThread(), m_readyread_run(false)
43
for (int i = 0; i < 2; i++)
45
m_readyread_pipe[i] = -1;
46
m_readyread_pipe_flags[i] = 0;
50
void ShutdownRRT(void)
52
MythSocket::s_readyread_thread->ShutdownReadyReadThread();
55
void MythSocketThread::ShutdownReadyReadThread(void)
58
QMutexLocker locker(&m_readyread_lock);
59
m_readyread_run = false;
62
WakeReadyReadThread();
64
wait(); // waits for thread to exit
69
void MythSocketThread::CloseReadyReadPipe(void) const
71
for (uint i = 0; i < 2; i++)
73
if (m_readyread_pipe[i] >= 0)
75
::close(m_readyread_pipe[i]);
76
m_readyread_pipe[i] = -1;
77
m_readyread_pipe_flags[i] = 0;
82
void MythSocketThread::StartReadyReadThread(void)
84
QMutexLocker locker(&m_readyread_lock);
88
setup_pipe(m_readyread_pipe, m_readyread_pipe_flags);
89
m_readyread_run = true;
91
m_readyread_started_wait.wait(&m_readyread_lock);
95
void MythSocketThread::AddToReadyRead(MythSocket *sock)
97
if (sock->socket() == -1)
99
VERBOSE(VB_SOCKET, SLOC(sock) +
100
"attempted to insert invalid socket to ReadyRead");
103
StartReadyReadThread();
108
QMutexLocker locker(&m_readyread_lock);
109
m_readyread_addlist.push_back(sock);
112
WakeReadyReadThread();
115
void MythSocketThread::RemoveFromReadyRead(MythSocket *sock)
118
QMutexLocker locker(&m_readyread_lock);
119
m_readyread_dellist.push_back(sock);
121
WakeReadyReadThread();
124
void MythSocketThread::WakeReadyReadThread(void) const
129
QMutexLocker locker(&m_readyread_lock);
130
m_readyread_wait.wakeAll();
132
if (m_readyread_pipe[1] < 0)
135
char buf[1] = { '0' };
139
wret = ::write(m_readyread_pipe[1], &buf, 1);
140
if ((wret < 0) && (EAGAIN != errno) && (EINTR != errno))
142
VERBOSE(VB_IMPORTANT, "MythSocketThread, Error: "
143
"Failed to write to readyread pipe, closing pipe.");
145
// Closing the pipe will cause the run loop's select to exit.
146
// Then the next time through the loop we should fallback to
147
// using the code for platforms that don't support pipes..
148
CloseReadyReadPipe();
154
void MythSocketThread::ReadyToBeRead(MythSocket *sock)
156
VERBOSE(VB_SOCKET, SLOC(sock) + "socket is readable");
157
int bytesAvail = sock->bytesAvailable();
161
VERBOSE(VB_SOCKET, SLOC(sock) + "socket closed");
166
VERBOSE(VB_SOCKET, SLOC(sock) + "calling m_cb->connectionClosed()");
167
sock->m_cb->connectionClosed(sock);
172
sock->m_notifyread = true;
173
VERBOSE(VB_SOCKET, SLOC(sock) + "calling m_cb->readyRead()");
174
sock->m_cb->readyRead(sock);
178
void MythSocketThread::ProcessAddRemoveQueues(void)
180
while (!m_readyread_dellist.empty())
182
MythSocket *sock = m_readyread_dellist.front();
183
m_readyread_dellist.pop_front();
185
if (m_readyread_list.removeAll(sock))
186
m_readyread_downref_list.push_back(sock);
189
while (!m_readyread_addlist.empty())
191
MythSocket *sock = m_readyread_addlist.front();
192
m_readyread_addlist.pop_front();
193
m_readyread_list.push_back(sock);
197
void MythSocketThread::run(void)
199
VERBOSE(VB_SOCKET, "MythSocketThread: readyread thread start");
201
QMutexLocker locker(&m_readyread_lock);
202
m_readyread_started_wait.wakeAll();
203
while (m_readyread_run)
205
VERBOSE(VB_SOCKET|VB_EXTRA, "ProcessAddRemoveQueues");
207
ProcessAddRemoveQueues();
209
VERBOSE(VB_SOCKET|VB_EXTRA, "Construct FD_SET");
211
// construct FD_SET for all connected and unlocked sockets...
216
QList<MythSocket*>::const_iterator it = m_readyread_list.begin();
217
for (; it != m_readyread_list.end(); ++it)
219
if (!(*it)->TryLock(false))
222
if ((*it)->state() == MythSocket::Connected &&
223
!(*it)->m_notifyread)
225
FD_SET((*it)->socket(), &rfds);
226
maxfd = std::max((*it)->socket(), maxfd);
228
(*it)->Unlock(false);
231
// There are no unlocked sockets, wait for event before we continue..
234
VERBOSE(VB_SOCKET|VB_EXTRA, "Empty FD_SET, sleeping");
235
if (m_readyread_wait.wait(&m_readyread_lock))
236
VERBOSE(VB_SOCKET|VB_EXTRA, "Empty FD_SET, woken up");
238
VERBOSE(VB_SOCKET|VB_EXTRA, "Empty FD_SET, timed out");
244
if (m_readyread_pipe[0] >= 0)
246
// Clear out any pending pipe reads, we have already taken care of
247
// this event above under the m_readyread_lock.
249
if (m_readyread_pipe_flags[0] & O_NONBLOCK)
251
rval = ::read(m_readyread_pipe[0], dummy, 128);
252
FD_SET(m_readyread_pipe[0], &rfds);
253
maxfd = std::max(m_readyread_pipe[0], maxfd);
256
// also exit select on exceptions on same descriptors
258
memcpy(&efds, &rfds, sizeof(fd_set));
260
// The select waits forever for data, so if we need to process
261
// anything else we need to write to m_readyread_pipe[1]..
262
// We unlock the ready read lock, because we don't need it
263
// and this will allow WakeReadyReadThread() to run..
264
m_readyread_lock.unlock();
265
VERBOSE(VB_SOCKET|VB_EXTRA, "Waiting on select..");
266
rval = select(maxfd + 1, &rfds, NULL, &efds, NULL);
267
VERBOSE(VB_SOCKET|VB_EXTRA, "Got data on select");
268
m_readyread_lock.lock();
270
if (rval > 0 && FD_ISSET(m_readyread_pipe[0], &rfds))
272
int ret = ::read(m_readyread_pipe[0], dummy, 128);
275
VERBOSE(VB_SOCKET|VB_EXTRA,
276
"Strange.. failed to read event pipe");
282
VERBOSE(VB_SOCKET|VB_EXTRA, "Waiting on select.. (no pipe)");
284
// also exit select on exceptions on same descriptors
286
memcpy(&efds, &rfds, sizeof(fd_set));
288
// Unfortunately, select on a pipe is not supported on all
289
// platforms. So we fallback to a loop that instead times out
290
// of select and checks for wakeAll event.
293
struct timeval timeout;
295
timeout.tv_usec = kShortWait * 1000;
296
rval = select(maxfd + 1, &rfds, NULL, &efds, &timeout);
298
m_readyread_wait.wait(&m_readyread_lock, kShortWait);
301
VERBOSE(VB_SOCKET|VB_EXTRA, "Got data on select (no pipe)");
308
// Note: This should never occur when using pipes. When there
309
// is no error there should be data in at least one fd..
310
VERBOSE(VB_SOCKET|VB_EXTRA, "MythSocketThread: select timeout");
314
"MythSocketThread: select returned error" + ENO);
316
m_readyread_wait.wait(&m_readyread_lock, kShortWait);
320
// ReadyToBeRead allows calls back into the socket so we need
321
// to release the lock for a little while.
322
// since only this loop updates m_readyread_list this is safe.
323
m_readyread_lock.unlock();
325
// Actually read some data! This is a form of co-operative
326
// multitasking so the ready read handlers should be quick..
329
if (!m_readyread_downref_list.empty())
331
VERBOSE(VB_SOCKET|VB_EXTRA, "Deleting stale sockets");
333
QTime tm = QTime::currentTime();
334
for (it = m_readyread_downref_list.begin();
335
it != m_readyread_downref_list.end(); ++it)
339
m_readyread_downref_list.clear();
340
downref_tm = tm.elapsed();
343
VERBOSE(VB_SOCKET|VB_EXTRA, "Processing ready reads");
345
QMap<uint,uint> timers;
346
QTime tm = QTime::currentTime();
347
it = m_readyread_list.begin();
349
for (; it != m_readyread_list.end() && m_readyread_run; ++it)
351
if (!(*it)->TryLock(false))
354
int socket = (*it)->socket();
357
(*it)->state() == MythSocket::Connected &&
358
FD_ISSET(socket, &rfds))
360
QTime rrtm = QTime::currentTime();
362
timers[socket] = rrtm.elapsed();
364
(*it)->Unlock(false);
367
if ((print_verbose_messages & (VB_SOCKET|VB_EXTRA)) ==
368
(VB_SOCKET|VB_EXTRA))
370
QString rep = QString("Total read time: %1ms, on sockets")
372
QMap<uint,uint>::const_iterator it = timers.begin();
373
for (; it != timers.end(); ++it)
374
rep += QString(" {%1,%2ms}").arg(it.key()).arg(*it);
376
rep += QString(" {downref, %1ms}").arg(downref_tm);
378
VERBOSE(VB_SOCKET|VB_EXTRA, QString("MythSocketThread: ") + rep);
381
m_readyread_lock.lock();
382
VERBOSE(VB_SOCKET|VB_EXTRA, "Reacquired ready read lock");
385
VERBOSE(VB_SOCKET, "MythSocketThread: readyread thread exit");
389
static void setup_pipe(int[2], long[2]) {}
391
static void setup_pipe(int mypipe[2], long myflags[2])
393
int pipe_ret = pipe(mypipe);
396
VERBOSE(VB_IMPORTANT, "Failed to open readyread pipes" + ENO);
397
mypipe[0] = mypipe[1] = -1;
402
long flags = fcntl(mypipe[0], F_GETFL);
405
int ret = fcntl(mypipe[0], F_SETFL, flags|O_NONBLOCK);
407
VERBOSE(VB_IMPORTANT, QString("Set pipe flags error")+ENO);
411
VERBOSE(VB_IMPORTANT, QString("Get pipe flags error") + ENO);
414
for (uint i = 0; i < 2; i++)
417
flags = fcntl(mypipe[i], F_GETFL);