~ubuntu-branches/ubuntu/lucid/mythtv/lucid

« back to all changes in this revision

Viewing changes to libs/libmythdb/mythsocketthread.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Mario Limonciello
  • Date: 2009-09-08 23:08:37 UTC
  • mfrom: (1.1.32 upstream)
  • Revision ID: james.westby@ubuntu.com-20090908230837-zrm2j6wutp76hwso
Tags: 0.22.0~trunk21742-0ubuntu1
* New upstream checkout (21742)
  - Fixes FTBFS on PPC. See changeset 21571 for more details.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// ANSI C
 
2
#include <cstdlib>
 
3
 
 
4
// C++
 
5
#include <algorithm> // for min/max
 
6
using namespace std;
 
7
 
 
8
// POSIX
 
9
#ifndef USING_MINGW
 
10
#include <sys/select.h> // for select
 
11
#endif
 
12
#include <sys/types.h>  // for fnctl
 
13
#include <unistd.h>     // for fnctl & other
 
14
#include <fcntl.h>      // for fnctl
 
15
#include <errno.h>      // for checking errno
 
16
 
 
17
// Microsoft
 
18
#ifdef USING_MINGW
 
19
#include <winsock2.h> // for select & fd_set
 
20
#endif
 
21
#ifndef O_NONBLOCK
 
22
#define O_NONBLOCK 0 /* not actually supported in MINGW */
 
23
#endif
 
24
 
 
25
// Qt
 
26
#include <QTime>
 
27
 
 
28
// MythTV
 
29
#include "mythsocketthread.h"
 
30
#include "mythsocket.h"
 
31
#include "mythverbose.h"
 
32
 
 
33
#define SLOC(a) QString("MythSocketThread(sock 0x%1:%2): ")\
 
34
    .arg((quint64)a, 0, 16).arg(a->socket())
 
35
 
 
36
static void setup_pipe(int mypipe[2], long flags[2]);
 
37
 
 
38
const uint MythSocketThread::kShortWait = 100;
 
39
 
 
40
MythSocketThread::MythSocketThread()
 
41
    : QThread(), m_readyread_run(false)
 
42
{
 
43
    for (int i = 0; i < 2; i++)
 
44
    {
 
45
        m_readyread_pipe[i] = -1;
 
46
        m_readyread_pipe_flags[i] = 0;
 
47
    }
 
48
}
 
49
 
 
50
void ShutdownRRT(void)
 
51
{
 
52
    MythSocket::s_readyread_thread->ShutdownReadyReadThread();
 
53
}
 
54
 
 
55
void MythSocketThread::ShutdownReadyReadThread(void)
 
56
{
 
57
    {
 
58
        QMutexLocker locker(&m_readyread_lock);
 
59
        m_readyread_run = false;
 
60
    }
 
61
 
 
62
    WakeReadyReadThread();
 
63
 
 
64
    wait(); // waits for thread to exit
 
65
 
 
66
    CloseReadyReadPipe();
 
67
}
 
68
 
 
69
void MythSocketThread::CloseReadyReadPipe(void) const
 
70
{
 
71
    for (uint i = 0; i < 2; i++)
 
72
    {
 
73
        if (m_readyread_pipe[i] >= 0)
 
74
        {
 
75
            ::close(m_readyread_pipe[i]);
 
76
            m_readyread_pipe[i] = -1;
 
77
            m_readyread_pipe_flags[i] = 0;
 
78
        }
 
79
    }
 
80
}
 
81
 
 
82
void MythSocketThread::StartReadyReadThread(void)
 
83
{
 
84
    QMutexLocker locker(&m_readyread_lock);
 
85
    if (!m_readyread_run)
 
86
    {
 
87
        atexit(ShutdownRRT);
 
88
        setup_pipe(m_readyread_pipe, m_readyread_pipe_flags);
 
89
        m_readyread_run = true;
 
90
        start();
 
91
        m_readyread_started_wait.wait(&m_readyread_lock);
 
92
    }
 
93
}
 
94
 
 
95
void MythSocketThread::AddToReadyRead(MythSocket *sock)
 
96
{
 
97
    if (sock->socket() == -1)
 
98
    {
 
99
        VERBOSE(VB_SOCKET, SLOC(sock) +
 
100
                "attempted to insert invalid socket to ReadyRead");
 
101
        return;
 
102
    }
 
103
    StartReadyReadThread();
 
104
 
 
105
    sock->UpRef();
 
106
 
 
107
    {
 
108
        QMutexLocker locker(&m_readyread_lock);
 
109
        m_readyread_addlist.push_back(sock);
 
110
    }
 
111
 
 
112
    WakeReadyReadThread();
 
113
}
 
114
 
 
115
void MythSocketThread::RemoveFromReadyRead(MythSocket *sock)
 
116
{
 
117
    {
 
118
        QMutexLocker locker(&m_readyread_lock);
 
119
        m_readyread_dellist.push_back(sock);
 
120
    }
 
121
    WakeReadyReadThread();
 
122
}
 
123
 
 
124
void MythSocketThread::WakeReadyReadThread(void) const
 
125
{
 
126
    if (!isRunning())
 
127
        return;
 
128
 
 
129
    QMutexLocker locker(&m_readyread_lock);
 
130
    m_readyread_wait.wakeAll();
 
131
 
 
132
    if (m_readyread_pipe[1] < 0)
 
133
        return;
 
134
 
 
135
    char buf[1] = { '0' };
 
136
    ssize_t wret = 0;
 
137
    while (wret <= 0)
 
138
    {
 
139
        wret = ::write(m_readyread_pipe[1], &buf, 1);
 
140
        if ((wret < 0) && (EAGAIN != errno) && (EINTR != errno))
 
141
        {
 
142
            VERBOSE(VB_IMPORTANT, "MythSocketThread, Error: "
 
143
                    "Failed to write to readyread pipe, closing pipe.");
 
144
 
 
145
            // Closing the pipe will cause the run loop's select to exit.
 
146
            // Then the next time through the loop we should fallback to
 
147
            // using the code for platforms that don't support pipes..
 
148
            CloseReadyReadPipe();
 
149
            break;
 
150
        }
 
151
    }
 
152
}
 
153
 
 
154
void MythSocketThread::ReadyToBeRead(MythSocket *sock)
 
155
{
 
156
    VERBOSE(VB_SOCKET, SLOC(sock) + "socket is readable");
 
157
    int bytesAvail = sock->bytesAvailable();
 
158
    
 
159
    if (bytesAvail == 0)
 
160
    {
 
161
        VERBOSE(VB_SOCKET, SLOC(sock) + "socket closed");
 
162
        sock->close();
 
163
 
 
164
        if (sock->m_cb)
 
165
        {
 
166
            VERBOSE(VB_SOCKET, SLOC(sock) + "calling m_cb->connectionClosed()");
 
167
            sock->m_cb->connectionClosed(sock);
 
168
        }
 
169
    }
 
170
    else if (sock->m_cb)
 
171
    {
 
172
        sock->m_notifyread = true;
 
173
        VERBOSE(VB_SOCKET, SLOC(sock) + "calling m_cb->readyRead()");
 
174
        sock->m_cb->readyRead(sock);
 
175
    }
 
176
}
 
177
 
 
178
void MythSocketThread::ProcessAddRemoveQueues(void)
 
179
{
 
180
    while (!m_readyread_dellist.empty())
 
181
    {
 
182
        MythSocket *sock = m_readyread_dellist.front();
 
183
        m_readyread_dellist.pop_front();
 
184
 
 
185
        if (m_readyread_list.removeAll(sock))
 
186
            m_readyread_downref_list.push_back(sock);
 
187
    }
 
188
 
 
189
    while (!m_readyread_addlist.empty())
 
190
    {
 
191
        MythSocket *sock = m_readyread_addlist.front();
 
192
        m_readyread_addlist.pop_front();
 
193
        m_readyread_list.push_back(sock);
 
194
    }
 
195
}
 
196
 
 
197
void MythSocketThread::run(void)
 
198
{
 
199
    VERBOSE(VB_SOCKET, "MythSocketThread: readyread thread start");
 
200
 
 
201
    QMutexLocker locker(&m_readyread_lock);
 
202
    m_readyread_started_wait.wakeAll();
 
203
    while (m_readyread_run)
 
204
    {
 
205
        VERBOSE(VB_SOCKET|VB_EXTRA, "ProcessAddRemoveQueues");
 
206
 
 
207
        ProcessAddRemoveQueues();
 
208
 
 
209
        VERBOSE(VB_SOCKET|VB_EXTRA, "Construct FD_SET");
 
210
 
 
211
        // construct FD_SET for all connected and unlocked sockets...
 
212
        int maxfd = -1;
 
213
        fd_set rfds;
 
214
        FD_ZERO(&rfds);
 
215
 
 
216
        QList<MythSocket*>::const_iterator it = m_readyread_list.begin();
 
217
        for (; it != m_readyread_list.end(); ++it)
 
218
        {
 
219
            if (!(*it)->TryLock(false))
 
220
                continue;
 
221
 
 
222
            if ((*it)->state() == MythSocket::Connected &&
 
223
                !(*it)->m_notifyread)
 
224
            {
 
225
                FD_SET((*it)->socket(), &rfds);
 
226
                maxfd = std::max((*it)->socket(), maxfd);
 
227
            }
 
228
            (*it)->Unlock(false);
 
229
        }
 
230
 
 
231
        // There are no unlocked sockets, wait for event before we continue..
 
232
        if (maxfd < 0)
 
233
        {
 
234
            VERBOSE(VB_SOCKET|VB_EXTRA, "Empty FD_SET, sleeping");
 
235
            if (m_readyread_wait.wait(&m_readyread_lock))
 
236
                VERBOSE(VB_SOCKET|VB_EXTRA, "Empty FD_SET, woken up");
 
237
            else
 
238
                VERBOSE(VB_SOCKET|VB_EXTRA, "Empty FD_SET, timed out");
 
239
            continue;
 
240
        }
 
241
 
 
242
        int rval = 0;
 
243
 
 
244
        if (m_readyread_pipe[0] >= 0)
 
245
        {
 
246
            // Clear out any pending pipe reads, we have already taken care of
 
247
            // this event above under the m_readyread_lock.
 
248
            char dummy[128];
 
249
            if (m_readyread_pipe_flags[0] & O_NONBLOCK)
 
250
            {
 
251
                rval = ::read(m_readyread_pipe[0], dummy, 128);
 
252
                FD_SET(m_readyread_pipe[0], &rfds);
 
253
                maxfd = std::max(m_readyread_pipe[0], maxfd);
 
254
            }
 
255
 
 
256
            // also exit select on exceptions on same descriptors
 
257
            fd_set efds;
 
258
            memcpy(&efds, &rfds, sizeof(fd_set));
 
259
 
 
260
            // The select waits forever for data, so if we need to process
 
261
            // anything else we need to write to m_readyread_pipe[1]..
 
262
            // We unlock the ready read lock, because we don't need it
 
263
            // and this will allow WakeReadyReadThread() to run..
 
264
            m_readyread_lock.unlock();
 
265
            VERBOSE(VB_SOCKET|VB_EXTRA, "Waiting on select..");
 
266
            rval = select(maxfd + 1, &rfds, NULL, &efds, NULL);
 
267
            VERBOSE(VB_SOCKET|VB_EXTRA, "Got data on select");
 
268
            m_readyread_lock.lock();
 
269
 
 
270
            if (rval > 0 && FD_ISSET(m_readyread_pipe[0], &rfds))
 
271
            {
 
272
                int ret = ::read(m_readyread_pipe[0], dummy, 128);
 
273
                if (ret < 0)
 
274
                {
 
275
                    VERBOSE(VB_SOCKET|VB_EXTRA,
 
276
                            "Strange.. failed to read event pipe");
 
277
                }
 
278
            }
 
279
        }
 
280
        else
 
281
        {
 
282
            VERBOSE(VB_SOCKET|VB_EXTRA, "Waiting on select.. (no pipe)");
 
283
 
 
284
            // also exit select on exceptions on same descriptors
 
285
            fd_set efds;
 
286
            memcpy(&efds, &rfds, sizeof(fd_set));
 
287
 
 
288
            // Unfortunately, select on a pipe is not supported on all
 
289
            // platforms. So we fallback to a loop that instead times out
 
290
            // of select and checks for wakeAll event.
 
291
            while (!rval)
 
292
            {
 
293
                struct timeval timeout;
 
294
                timeout.tv_sec = 0;
 
295
                timeout.tv_usec = kShortWait * 1000;
 
296
                rval = select(maxfd + 1, &rfds, NULL, &efds, &timeout);
 
297
                if (!rval)
 
298
                    m_readyread_wait.wait(&m_readyread_lock, kShortWait);
 
299
            }
 
300
 
 
301
            VERBOSE(VB_SOCKET|VB_EXTRA, "Got data on select (no pipe)");
 
302
        }
 
303
 
 
304
        if (rval <= 0)
 
305
        {
 
306
            if (rval == 0)
 
307
            {
 
308
                // Note: This should never occur when using pipes. When there
 
309
                // is no error there should be data in at least one fd..
 
310
                VERBOSE(VB_SOCKET|VB_EXTRA, "MythSocketThread: select timeout");
 
311
            }
 
312
            else
 
313
                VERBOSE(VB_SOCKET,
 
314
                        "MythSocketThread: select returned error" + ENO);
 
315
 
 
316
            m_readyread_wait.wait(&m_readyread_lock, kShortWait);
 
317
            continue;
 
318
        }
 
319
        
 
320
        // ReadyToBeRead allows calls back into the socket so we need
 
321
        // to release the lock for a little while.
 
322
        // since only this loop updates m_readyread_list this is safe.
 
323
        m_readyread_lock.unlock();
 
324
 
 
325
        // Actually read some data! This is a form of co-operative
 
326
        // multitasking so the ready read handlers should be quick..
 
327
 
 
328
        uint downref_tm = 0;
 
329
        if (!m_readyread_downref_list.empty())
 
330
        {
 
331
            VERBOSE(VB_SOCKET|VB_EXTRA, "Deleting stale sockets");
 
332
 
 
333
            QTime tm = QTime::currentTime();
 
334
            for (it = m_readyread_downref_list.begin();
 
335
                 it != m_readyread_downref_list.end(); ++it)
 
336
            {
 
337
                (*it)->DownRef();
 
338
            }
 
339
            m_readyread_downref_list.clear();
 
340
            downref_tm = tm.elapsed();
 
341
        }
 
342
 
 
343
        VERBOSE(VB_SOCKET|VB_EXTRA, "Processing ready reads");
 
344
 
 
345
        QMap<uint,uint> timers;
 
346
        QTime tm = QTime::currentTime();
 
347
        it = m_readyread_list.begin();
 
348
 
 
349
        for (; it != m_readyread_list.end() && m_readyread_run; ++it)
 
350
        {
 
351
            if (!(*it)->TryLock(false))
 
352
                continue;
 
353
            
 
354
            int socket = (*it)->socket();
 
355
 
 
356
            if (socket >= 0 &&
 
357
                (*it)->state() == MythSocket::Connected &&
 
358
                FD_ISSET(socket, &rfds))
 
359
            {
 
360
                QTime rrtm = QTime::currentTime();
 
361
                ReadyToBeRead(*it);
 
362
                timers[socket] = rrtm.elapsed();
 
363
            }
 
364
            (*it)->Unlock(false);
 
365
        }
 
366
 
 
367
        if ((print_verbose_messages & (VB_SOCKET|VB_EXTRA)) ==
 
368
            (VB_SOCKET|VB_EXTRA))
 
369
        {
 
370
            QString rep = QString("Total read time: %1ms, on sockets")
 
371
                .arg(tm.elapsed());
 
372
            QMap<uint,uint>::const_iterator it = timers.begin();
 
373
            for (; it != timers.end(); ++it)
 
374
                rep += QString(" {%1,%2ms}").arg(it.key()).arg(*it);
 
375
            if (downref_tm)
 
376
                rep += QString(" {downref, %1ms}").arg(downref_tm);
 
377
 
 
378
            VERBOSE(VB_SOCKET|VB_EXTRA, QString("MythSocketThread: ") + rep);
 
379
        }
 
380
 
 
381
        m_readyread_lock.lock();
 
382
        VERBOSE(VB_SOCKET|VB_EXTRA, "Reacquired ready read lock");
 
383
    }
 
384
 
 
385
    VERBOSE(VB_SOCKET, "MythSocketThread: readyread thread exit");
 
386
}
 
387
 
 
388
#ifdef USING_MINGW
 
389
static void setup_pipe(int[2], long[2]) {}
 
390
#else
 
391
static void setup_pipe(int mypipe[2], long myflags[2])
 
392
{
 
393
    int pipe_ret = pipe(mypipe);
 
394
    if (pipe_ret < 0)
 
395
    {
 
396
        VERBOSE(VB_IMPORTANT, "Failed to open readyread pipes" + ENO);
 
397
        mypipe[0] = mypipe[1] = -1;
 
398
    }
 
399
    else
 
400
    {
 
401
        errno = 0;
 
402
        long flags = fcntl(mypipe[0], F_GETFL);
 
403
        if (0 == errno)
 
404
        {
 
405
            int ret = fcntl(mypipe[0], F_SETFL, flags|O_NONBLOCK);
 
406
            if (ret < 0)
 
407
                VERBOSE(VB_IMPORTANT, QString("Set pipe flags error")+ENO);
 
408
        }
 
409
        else
 
410
        {
 
411
            VERBOSE(VB_IMPORTANT, QString("Get pipe flags error") + ENO);
 
412
        }
 
413
 
 
414
        for (uint i = 0; i < 2; i++)
 
415
        {
 
416
            errno = 0;
 
417
            flags = fcntl(mypipe[i], F_GETFL);
 
418
            if (0 == errno)
 
419
                myflags[i] = flags;
 
420
        }
 
421
    }
 
422
}
 
423
#endif