~ubuntu-branches/ubuntu/lucid/mythtv/lucid

« back to all changes in this revision

Viewing changes to libs/libmythdb/mythsocket.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Mario Limonciello
  • Date: 2009-09-08 23:08:37 UTC
  • mfrom: (1.1.32 upstream)
  • Revision ID: james.westby@ubuntu.com-20090908230837-zrm2j6wutp76hwso
Tags: 0.22.0~trunk21742-0ubuntu1
* New upstream checkout (21742)
  - Fixes FTBFS on PPC. See changeset 21571 for more details.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
// ANSI C
2
2
#include <cstdlib>
3
3
#include <cassert>
 
4
#include <cerrno>
4
5
 
5
6
// POSIX
 
7
#ifndef USING_MINGW
 
8
#include <sys/select.h> // for select
 
9
#endif
6
10
#include <unistd.h>
7
 
#include <errno.h>
 
11
 
 
12
// Microsoft
 
13
#ifdef USING_MINGW
 
14
#include <winsock2.h> // for select & fd_set
 
15
#endif
8
16
 
9
17
// Qt
10
18
#include <QApplication>
11
 
#include <QString>
12
19
#include <QByteArray>
13
 
#include <QMutex>
14
20
#include <QHostInfo>
 
21
#include <QMap>
15
22
 
16
23
// MythTV
 
24
#include "mythsocketthread.h"
 
25
#include "mythsocket.h"
17
26
#include "mythtimer.h"
18
27
#include "mythsocket.h"
19
28
#include "mythverbose.h"
20
 
#include "compat.h"
21
 
 
22
 
#ifdef USING_MINGW
23
 
#include <winsock2.h>
24
 
#else
25
 
#include <sys/select.h>
26
 
#endif
27
29
 
28
30
#define SLOC(a) QString("MythSocket(%1:%2): ")\
29
31
                    .arg((quint64)a, 0, 16).arg(a->socket())
33
35
const uint MythSocket::kShortTimeout = 7000;
34
36
const uint MythSocket::kLongTimeout  = 30000;
35
37
 
36
 
MythSocketThread MythSocket::m_readyread_thread;
37
 
 
38
 
MythSocketThread::MythSocketThread()
39
 
    : QThread(), m_readyread_run(false)
40
 
{
41
 
#if !defined(USING_MINGW)
42
 
    memset(m_readyread_pipe, 0, sizeof(m_readyread_pipe));
43
 
#endif
44
 
}
 
38
MythSocketThread *MythSocket::s_readyread_thread = new MythSocketThread();
45
39
 
46
40
MythSocket::MythSocket(int socket, MythSocketCBs *cb)
47
41
    : MSocketDevice(MSocketDevice::Stream),            m_cb(cb),
48
42
      m_state(Idle),         m_addr(),                 m_port(0),
49
 
      m_ref_count(0),        m_notifyread(0)
 
43
      m_ref_count(0),        m_notifyread(false)
50
44
{
51
45
    VERBOSE(VB_SOCKET, LOC + "new socket");
52
46
    if (socket > -1)
60
54
    }
61
55
 
62
56
    if (m_cb)
63
 
        m_readyread_thread.AddToReadyRead(this);
 
57
        s_readyread_thread->AddToReadyRead(this);
64
58
}
65
59
 
66
60
MythSocket::~MythSocket()
80
74
    m_cb = cb;
81
75
 
82
76
    if (m_cb)
83
 
        m_readyread_thread.AddToReadyRead(this);
 
77
        s_readyread_thread->AddToReadyRead(this);
84
78
    else
85
 
        m_readyread_thread.RemoveFromReadyRead(this);
 
79
        s_readyread_thread->RemoveFromReadyRead(this);
86
80
}
87
81
 
88
82
void MythSocket::UpRef(void)
89
83
{
90
 
    m_ref_lock.lock();
 
84
    QMutexLocker locker(&m_ref_lock);
91
85
    m_ref_count++;
92
 
    m_ref_lock.unlock();
93
86
    VERBOSE(VB_SOCKET, LOC + QString("UpRef: %1").arg(m_ref_count));
94
87
}
95
88
 
104
97
    if (m_cb && ref == 0)
105
98
    {
106
99
        m_cb = NULL;
107
 
        m_readyread_thread.RemoveFromReadyRead(this);
 
100
        s_readyread_thread->RemoveFromReadyRead(this);
108
101
        // thread will downref & delete obj
109
102
        return true;
110
103
    }
204
197
{
205
198
    setState(Idle);
206
199
    MSocketDevice::close();
 
200
    if (m_cb)
 
201
    {
 
202
        VERBOSE(VB_SOCKET, LOC + "calling m_cb->connectionClosed()");
 
203
        m_cb->connectionClosed(this);
 
204
    }
207
205
}
208
206
 
209
207
qint64 MythSocket::readBlock(char *data, quint64 len)
222
220
 
223
221
    qint64 rval = MSocketDevice::readBlock(data, len);
224
222
    if (rval == 0)
225
 
    {
226
223
        close();
227
 
        if (m_cb)
228
 
        {
229
 
            VERBOSE(VB_SOCKET, LOC + "calling m_cb->connectionClosed()");
230
 
            m_cb->connectionClosed(this);
231
 
        }
232
 
    }
 
224
 
233
225
    return rval;
234
226
}
235
227
 
255
247
    if (!isValid() || error() != MSocketDevice::NoError)
256
248
    {
257
249
        close();
258
 
        if (m_cb)
259
 
        {
260
 
            VERBOSE(VB_SOCKET, LOC + "calling m_cb->connectionClosed()");
261
 
            m_cb->connectionClosed(this);
262
 
        }
263
250
        return -1;
264
251
    }
265
252
    return rval;
313
300
        VERBOSE(VB_NETWORK, LOC + msg);
314
301
    }
315
302
 
 
303
    MythTimer timer; timer.start();
316
304
    unsigned int errorcount = 0;
317
305
    while (size > 0)
318
306
    {
319
307
        if (state() != Connected)
320
308
        {
321
309
            VERBOSE(VB_IMPORTANT, LOC +
322
 
                    "writeStringList: Error, socket went unconnected.");
 
310
                    "writeStringList: Error, socket went unconnected." +
 
311
                    QString("\n\t\t\tWe wrote %1 of %2 bytes with %3 errors")
 
312
                    .arg(written).arg(written+size).arg(errorcount));
323
313
            return false;
324
314
        }
325
315
 
339
329
        else if (temp <= 0)
340
330
        {
341
331
            errorcount++;
342
 
            if (errorcount > 5000)
 
332
            if (timer.elapsed() > 1000)
343
333
            {
344
 
                VERBOSE(VB_GENERAL, LOC +
345
 
                        "writeStringList: No data written on writeBlock");
 
334
                VERBOSE(VB_GENERAL, LOC + "writeStringList: Error, " +
 
335
                        QString("No data written on writeBlock (%1 errors)")
 
336
                        .arg(errorcount));
346
337
                return false;
347
338
            }
348
339
            usleep(1000);
349
340
        }
350
341
    }
 
342
 
 
343
    flush();
 
344
 
351
345
    return true;
352
346
}
353
347
 
486
480
            VERBOSE(VB_IMPORTANT, LOC + "readStringList: " +
487
481
                    QString("Error, timed out after %1 ms.").arg(timeoutMS));
488
482
            close();
489
 
            if (m_cb)
490
 
            {
491
 
                VERBOSE(VB_SOCKET, LOC + "calling m_cb->connectionClosed()");
492
 
                m_cb->connectionClosed(this);
493
 
            }
494
483
            return false;
495
484
        }
496
485
 
633
622
    list = str.split("[]:[]");
634
623
 
635
624
    m_notifyread = false;
636
 
    m_readyread_thread.WakeReadyReadThread();
 
625
    s_readyread_thread->WakeReadyReadThread();
637
626
    return true;
638
627
}
639
628
 
640
 
void MythSocket::Lock(void)
 
629
void MythSocket::Lock(void) const
641
630
{
642
631
    m_lock.lock();
643
 
    m_readyread_thread.WakeReadyReadThread();
644
 
}
645
 
 
646
 
void MythSocket::Unlock(void)
 
632
    s_readyread_thread->WakeReadyReadThread();
 
633
}
 
634
 
 
635
bool MythSocket::TryLock(bool wake_readyread) const
 
636
{
 
637
    if (m_lock.tryLock())
 
638
    {
 
639
        if (wake_readyread)
 
640
            s_readyread_thread->WakeReadyReadThread();
 
641
        return true;
 
642
    }
 
643
    return false;
 
644
}
 
645
 
 
646
void MythSocket::Unlock(bool wake_readyread) const
647
647
{
648
648
    m_lock.unlock();
649
 
    m_readyread_thread.WakeReadyReadThread();
 
649
    if (wake_readyread)
 
650
        s_readyread_thread->WakeReadyReadThread();
650
651
}
651
652
 
652
653
/**
707
708
        {
708
709
            VERBOSE(VB_SOCKET, LOC + "calling m_cb->connected()");
709
710
            m_cb->connected(this);
710
 
            m_readyread_thread.WakeReadyReadThread();
 
711
            s_readyread_thread->WakeReadyReadThread();
711
712
        }
712
713
    }
713
714
    else
717
718
 
718
719
    return true;
719
720
}
720
 
 
721
 
void ShutdownRRT(void)
722
 
{
723
 
    MythSocket::m_readyread_thread.ShutdownReadyReadThread();
724
 
}
725
 
 
726
 
void MythSocketThread::ShutdownReadyReadThread(void)
727
 
{
728
 
    m_readyread_run = false;
729
 
    WakeReadyReadThread();
730
 
    wait();
731
 
 
732
 
#ifdef USING_MINGW
733
 
    if (readyreadevent) {
734
 
        ::CloseHandle(readyreadevent);
735
 
        readyreadevent = NULL;
736
 
    }
737
 
#else
738
 
    ::close(m_readyread_pipe[0]);
739
 
    ::close(m_readyread_pipe[1]);
740
 
#endif
741
 
}
742
 
 
743
 
void MythSocketThread::StartReadyReadThread(void)
744
 
{
745
 
    if (m_readyread_run == false)
746
 
    {
747
 
        QMutexLocker locker(&m_readyread_lock);
748
 
        if (m_readyread_run == false)
749
 
        {
750
 
#ifdef USING_MINGW
751
 
            readyreadevent = ::CreateEvent(NULL, false, false, NULL);
752
 
            assert(readyreadevent);
753
 
#else
754
 
            int ret = pipe(m_readyread_pipe);
755
 
            (void) ret;
756
 
            assert(ret >= 0);
757
 
#endif
758
 
 
759
 
            m_readyread_run = true;
760
 
            start();
761
 
 
762
 
            atexit(ShutdownRRT);
763
 
        }
764
 
    }
765
 
}
766
 
 
767
 
void MythSocketThread::AddToReadyRead(MythSocket *sock)
768
 
{
769
 
    if (sock->socket() == -1)
770
 
    {
771
 
        VERBOSE(VB_SOCKET, SLOC(sock) + "attempted to insert invalid socket to ReadyRead");
772
 
        return;
773
 
    }
774
 
    StartReadyReadThread();
775
 
 
776
 
    sock->UpRef();
777
 
    m_readyread_lock.lock();
778
 
    m_readyread_addlist.append(sock);
779
 
    m_readyread_lock.unlock();
780
 
 
781
 
    WakeReadyReadThread();
782
 
}
783
 
 
784
 
void MythSocketThread::RemoveFromReadyRead(MythSocket *sock)
785
 
{
786
 
    m_readyread_lock.lock();
787
 
    m_readyread_dellist.append(sock);
788
 
    m_readyread_lock.unlock();
789
 
 
790
 
    WakeReadyReadThread();
791
 
}
792
 
 
793
 
void MythSocketThread::WakeReadyReadThread(void)
794
 
{
795
 
    if (!isRunning())
796
 
        return;
797
 
 
798
 
#ifdef USING_MINGW
799
 
    if (readyreadevent) ::SetEvent(readyreadevent);
800
 
#else
801
 
    if (m_readyread_pipe[1] >= 0)
802
 
    {
803
 
        char buf[1] = { '0' };
804
 
        ssize_t wret = 0;
805
 
        while (wret <= 0)
806
 
        {
807
 
            wret = ::write(m_readyread_pipe[1], &buf, 1);
808
 
            if ((wret < 0) && (EAGAIN != errno) && (EINTR != errno))
809
 
            {
810
 
                VERBOSE(VB_IMPORTANT, "MythSocketThread, Error: "
811
 
                        "Irrecoverable WakeReadyReadThread event");
812
 
                break;
813
 
            }
814
 
        }
815
 
        // dtk note: add this aft dbg for correctness..
816
 
        //::flush(m_readyread_pipe[1]);
817
 
    }
818
 
#endif
819
 
}
820
 
 
821
 
void MythSocketThread::iffound(MythSocket *sock)
822
 
{
823
 
    VERBOSE(VB_SOCKET, SLOC(sock) + "socket is readable");
824
 
    if (sock->bytesAvailable() == 0)
825
 
    {
826
 
        VERBOSE(VB_SOCKET, SLOC(sock) + "socket closed");
827
 
        sock->close();
828
 
 
829
 
        if (sock->m_cb)
830
 
        {
831
 
            VERBOSE(VB_SOCKET, SLOC(sock) + "calling m_cb->connectionClosed()");
832
 
            sock->m_cb->connectionClosed(sock);
833
 
        }
834
 
    }
835
 
    else if (sock->m_cb)
836
 
    {
837
 
        sock->m_notifyread = true;
838
 
        VERBOSE(VB_SOCKET, SLOC(sock) + "calling m_cb->readyRead()");
839
 
        sock->m_cb->readyRead(sock);
840
 
    }
841
 
}
842
 
 
843
 
bool MythSocketThread::isLocked(QMutex &mutex)
844
 
{
845
 
    bool isLocked = true;
846
 
    if (mutex.tryLock())
847
 
    {
848
 
        mutex.unlock();
849
 
        isLocked = false;
850
 
    }
851
 
    return isLocked;
852
 
}
853
 
 
854
 
void MythSocketThread::run(void)
855
 
{
856
 
    VERBOSE(VB_SOCKET, "MythSocketThread: readyread thread start");
857
 
 
858
 
    fd_set rfds;
859
 
    MythSocket *sock;
860
 
    int maxfd;
861
 
    bool found;
862
 
 
863
 
    while (m_readyread_run)
864
 
    {
865
 
        m_readyread_lock.lock();
866
 
        while (m_readyread_dellist.size() > 0)
867
 
        {
868
 
            sock = m_readyread_dellist.takeFirst();
869
 
            bool del = m_readyread_list.removeAll(sock);
870
 
 
871
 
            if (del)
872
 
            {
873
 
                m_readyread_lock.unlock();
874
 
                sock->DownRef();
875
 
                m_readyread_lock.lock();
876
 
            }
877
 
        }
878
 
 
879
 
        while (m_readyread_addlist.count() > 0)
880
 
        {
881
 
            sock = m_readyread_addlist.takeFirst();
882
 
            //sock->UpRef();  Did upref in AddToReadyRead()
883
 
            m_readyread_list.append(sock);
884
 
        }
885
 
        m_readyread_lock.unlock();
886
 
 
887
 
#ifdef USING_MINGW
888
 
 
889
 
        int n = m_readyread_list.count() + 1;
890
 
        HANDLE *hEvents = new HANDLE[n];
891
 
        memset(hEvents, 0, sizeof(HANDLE) * n);
892
 
        unsigned *idx = new unsigned[n];
893
 
        n = 0;
894
 
 
895
 
        for (unsigned i = 0; i < (uint) m_readyread_list.count(); i++)
896
 
        {
897
 
            sock = m_readyread_list.at(i);
898
 
            if (sock->state() == MythSocket::Connected
899
 
                && !sock->m_notifyread && !isLocked(sock->m_lock))
900
 
            {
901
 
                HANDLE hEvent = ::CreateEvent(NULL, false, false, NULL);
902
 
                if (!hEvent)
903
 
                {
904
 
                    VERBOSE(VB_IMPORTANT, "CreateEvent failed");
905
 
                }
906
 
                else
907
 
                {
908
 
                    if (SOCKET_ERROR != ::WSAEventSelect(
909
 
                            sock->socket(), hEvent,
910
 
                            FD_READ | FD_CLOSE))
911
 
                    {
912
 
                        hEvents[n] = hEvent;
913
 
                        idx[n++] = i;
914
 
                    }
915
 
                    else
916
 
                    {
917
 
                        VERBOSE(VB_IMPORTANT, QString(
918
 
                                    "CreateEvent, "
919
 
                                    "WSAEventSelect(%1, %2) failed")
920
 
                                .arg(sock->socket()));
921
 
                        ::CloseHandle(hEvent);
922
 
                    }
923
 
                }
924
 
            }
925
 
        }
926
 
 
927
 
        hEvents[n++] = readyreadevent;
928
 
        int rval = ::WaitForMultipleObjects(n, hEvents, false, INFINITE);
929
 
 
930
 
        for (int i = 0; i < (n - 1); i++)
931
 
            ::CloseHandle(hEvents[i]);
932
 
 
933
 
        delete[] hEvents;
934
 
 
935
 
        if (rval == WAIT_FAILED)
936
 
        {
937
 
            VERBOSE(VB_IMPORTANT, "WaitForMultipleObjects returned error");
938
 
            delete[] idx;
939
 
        }
940
 
        else if (rval >= WAIT_OBJECT_0 && rval < (WAIT_OBJECT_0 + n))
941
 
        {
942
 
            rval -= WAIT_OBJECT_0;
943
 
 
944
 
            if (rval < (n - 1))
945
 
            {
946
 
                rval = idx[rval];
947
 
                sock = m_readyread_list.at(rval);
948
 
                found = (sock->state() == MythSocket::Connected)
949
 
                            && !isLocked(sock->m_lock);
950
 
            }
951
 
            else
952
 
            {
953
 
                found = false;
954
 
            }
955
 
 
956
 
            delete[] idx;
957
 
 
958
 
            if (found)
959
 
                iffound(sock);
960
 
 
961
 
            ::ResetEvent(readyreadevent);
962
 
        }
963
 
        else if (rval >= WAIT_ABANDONED_0 && rval < (WAIT_ABANDONED_0 + n))
964
 
        {
965
 
            VERBOSE(VB_SOCKET, "abandoned");
966
 
        }
967
 
        else
968
 
        {
969
 
            VERBOSE(VB_SOCKET|VB_EXTRA, "select timeout");
970
 
        }
971
 
 
972
 
#else /* if !USING_MINGW */
973
 
 
974
 
        // add check for bad fd?
975
 
        FD_ZERO(&rfds);
976
 
        maxfd = m_readyread_pipe[0];
977
 
 
978
 
        FD_SET(m_readyread_pipe[0], &rfds);
979
 
 
980
 
        QList<MythSocket*>::const_iterator it = m_readyread_list.begin();
981
 
        while (it != m_readyread_list.end())
982
 
        {
983
 
            sock = *it;
984
 
            if (sock->state() == MythSocket::Connected &&
985
 
                sock->m_notifyread == false &&
986
 
                !isLocked(sock->m_lock))
987
 
            {
988
 
                FD_SET(sock->socket(), &rfds);
989
 
                maxfd = std::max(sock->socket(), maxfd);
990
 
            }
991
 
            ++it;
992
 
        }
993
 
 
994
 
        int rval = select(maxfd + 1, &rfds, NULL, NULL, NULL);
995
 
        if (rval == -1)
996
 
        {
997
 
            VERBOSE(VB_SOCKET, "MythSocketThread: select returned error");
998
 
        }
999
 
        else if (rval)
1000
 
        {
1001
 
            found = false;
1002
 
            QList<MythSocket*>::const_iterator it = m_readyread_list.begin();
1003
 
            while (it != m_readyread_list.end())
1004
 
            {
1005
 
                sock = *it;
1006
 
                if (sock->state() == MythSocket::Connected &&
1007
 
                    FD_ISSET(sock->socket(), &rfds) &&
1008
 
                    !isLocked(sock->m_lock))
1009
 
                {
1010
 
                    found = true;
1011
 
                    break;
1012
 
                }
1013
 
                ++it;
1014
 
            }
1015
 
 
1016
 
            if (found)
1017
 
                iffound(sock);
1018
 
 
1019
 
            if (FD_ISSET(m_readyread_pipe[0], &rfds))
1020
 
            {
1021
 
                char buf[128];
1022
 
                ssize_t rr;
1023
 
 
1024
 
                do rr = ::read(m_readyread_pipe[0], buf, 128);
1025
 
                while ((rr < 0) && (EINTR == errno));
1026
 
            }
1027
 
        }
1028
 
        else
1029
 
        {
1030
 
            VERBOSE(VB_SOCKET|VB_EXTRA, "MythSocketThread: select timeout");
1031
 
        }
1032
 
 
1033
 
#endif /* !USING_MINGW */
1034
 
 
1035
 
    }
1036
 
 
1037
 
    VERBOSE(VB_SOCKET, "MythSocketThread: readyread thread exit");
1038
 
}
1039