~landscape/zope3/newer-from-ztk

« back to all changes in this revision

Viewing changes to src/twisted/internet/kqreactor.py

  • Committer: Thomas Hervé
  • Date: 2009-07-08 13:52:04 UTC
  • Revision ID: thomas@canonical.com-20090708135204-df5eesrthifpylf8
Remove twisted copy

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
2
 
# See LICENSE for details.
3
 
 
4
 
 
5
 
"""A kqueue()/kevent() based implementation of the Twisted main loop.
6
 
 
7
 
To install the event loop (and you should do this before any connections,
8
 
listeners or connectors are added)::
9
 
 
10
 
    | from twisted.internet import kqreactor
11
 
    | kqreactor.install()
12
 
 
13
 
This reactor only works on FreeBSD and requires PyKQueue 1.3, which is
14
 
available at:  U{http://people.freebsd.org/~dwhite/PyKQueue/}
15
 
 
16
 
API Stability: stable
17
 
 
18
 
Maintainer: U{Itamar Shtull-Trauring<mailto:twisted@itamarst.org>}
19
 
 
20
 
 
21
 
 
22
 
You're going to need to patch PyKqueue::
23
 
 
24
 
    =====================================================
25
 
    --- PyKQueue-1.3/kqsyscallmodule.c  Sun Jan 28 21:59:50 2001
26
 
    +++ PyKQueue-1.3/kqsyscallmodule.c.new      Tue Jul 30 18:06:08 2002
27
 
    @@ -137,7 +137,7 @@
28
 
     }
29
 
     
30
 
     statichere PyTypeObject KQEvent_Type = {
31
 
    -  PyObject_HEAD_INIT(NULL)
32
 
    +  PyObject_HEAD_INIT(&PyType_Type)
33
 
       0,                             // ob_size
34
 
       "KQEvent",                     // tp_name
35
 
       sizeof(KQEventObject),         // tp_basicsize
36
 
    @@ -291,13 +291,14 @@
37
 
     
38
 
       /* Build timespec for timeout */
39
 
       totimespec.tv_sec = timeout / 1000;
40
 
    -  totimespec.tv_nsec = (timeout % 1000) * 100000;
41
 
    +  totimespec.tv_nsec = (timeout % 1000) * 1000000;
42
 
     
43
 
       // printf("timespec: sec=%d nsec=%d\\n", totimespec.tv_sec, totimespec.tv_nsec);
44
 
     
45
 
       /* Make the call */
46
 
    -
47
 
    +  Py_BEGIN_ALLOW_THREADS
48
 
       gotNumEvents = kevent (self->fd, changelist, haveNumEvents, triggered, wantNumEvents, &totimespec);
49
 
    +  Py_END_ALLOW_THREADS
50
 
     
51
 
       /* Don't need the input event list anymore, so get rid of it */
52
 
       free (changelist);
53
 
    @@ -361,7 +362,7 @@
54
 
     statichere PyTypeObject KQueue_Type = {
55
 
            /* The ob_type field must be initialized in the module init function
56
 
             * to be portable to Windows without using C++. */
57
 
    -   PyObject_HEAD_INIT(NULL)
58
 
    +   PyObject_HEAD_INIT(&PyType_Type)
59
 
            0,                  /*ob_size*/
60
 
            "KQueue",                   /*tp_name*/
61
 
            sizeof(KQueueObject),       /*tp_basicsize*/
62
 
 
63
 
"""
64
 
 
65
 
# System imports
66
 
import errno, sys
67
 
 
68
 
# PyKQueue imports
69
 
from kqsyscall import *
70
 
 
71
 
# Twisted imports
72
 
from twisted.python import log, failure
73
 
 
74
 
# Sibling imports
75
 
import main
76
 
import posixbase
77
 
 
78
 
# globals
79
 
reads = {}
80
 
writes = {}
81
 
selectables = {}
82
 
kq = kqueue()
83
 
 
84
 
 
85
 
class KQueueReactor(posixbase.PosixReactorBase):
86
 
    """A reactor that uses kqueue(2)/kevent(2)."""
87
 
 
88
 
    def _updateRegistration(self, *args):
89
 
        kq.kevent([kevent(*args)], 0, 0)
90
 
 
91
 
    def addReader(self, reader):
92
 
        """Add a FileDescriptor for notification of data available to read.
93
 
        """
94
 
        fd = reader.fileno()
95
 
        if not reads.has_key(fd):
96
 
            selectables[fd] = reader
97
 
            reads[fd] = 1
98
 
            self._updateRegistration(fd, EVFILT_READ, EV_ADD)
99
 
 
100
 
    def addWriter(self, writer, writes=writes, selectables=selectables):
101
 
        """Add a FileDescriptor for notification of data available to write.
102
 
        """
103
 
        fd = writer.fileno()
104
 
        if not writes.has_key(fd):
105
 
            selectables[fd] = writer
106
 
            writes[fd] = 1
107
 
            self._updateRegistration(fd, EVFILT_WRITE, EV_ADD)
108
 
 
109
 
    def removeReader(self, reader):
110
 
        """Remove a Selectable for notification of data available to read.
111
 
        """
112
 
        fd = reader.fileno()
113
 
        if reads.has_key(fd):
114
 
            del reads[fd]
115
 
            if not writes.has_key(fd): del selectables[fd]
116
 
            self._updateRegistration(fd, EVFILT_READ, EV_DELETE)
117
 
 
118
 
    def removeWriter(self, writer, writes=writes):
119
 
        """Remove a Selectable for notification of data available to write.
120
 
        """
121
 
        fd = writer.fileno()
122
 
        if writes.has_key(fd):
123
 
            del writes[fd]
124
 
            if not reads.has_key(fd): del selectables[fd]
125
 
            self._updateRegistration(fd, EVFILT_WRITE, EV_DELETE)
126
 
 
127
 
    def removeAll(self):
128
 
        """Remove all selectables, and return a list of them."""
129
 
        if self.waker is not None:
130
 
            self.removeReader(self.waker)
131
 
        result = selectables.values()
132
 
        for fd in reads.keys():
133
 
            self._updateRegistration(fd, EVFILT_READ, EV_DELETE)
134
 
        for fd in writes.keys():
135
 
            self._updateRegistration(fd, EVFILT_WRITE, EV_DELETE)
136
 
        reads.clear()
137
 
        writes.clear()
138
 
        selectables.clear()
139
 
        if self.waker is not None:
140
 
            self.addReader(self.waker)
141
 
        return result
142
 
 
143
 
    def doKEvent(self, timeout,
144
 
                 reads=reads,
145
 
                 writes=writes,
146
 
                 selectables=selectables,
147
 
                 kq=kq,
148
 
                 log=log,
149
 
                 OSError=OSError,
150
 
                 EVFILT_READ=EVFILT_READ,
151
 
                 EVFILT_WRITE=EVFILT_WRITE):
152
 
        """Poll the kqueue for new events."""
153
 
        if timeout is None:
154
 
            timeout = 1000
155
 
        else:
156
 
            timeout = int(timeout * 1000) # convert seconds to milliseconds
157
 
 
158
 
        try:
159
 
            l = kq.kevent([], len(selectables), timeout)
160
 
        except OSError, e:
161
 
            if e[0] == errno.EINTR:
162
 
                return
163
 
            else:
164
 
                raise
165
 
        _drdw = self._doWriteOrRead
166
 
        for event in l:
167
 
            why = None
168
 
            fd, filter = event.ident, event.filter
169
 
            selectable = selectables[fd]
170
 
            log.callWithLogger(selectable, _drdw, selectable, fd, filter)
171
 
 
172
 
    def _doWriteOrRead(self, selectable, fd, filter):
173
 
        try:
174
 
            if filter == EVFILT_READ:
175
 
                why = selectable.doRead()
176
 
            if filter == EVFILT_WRITE:
177
 
                why = selectable.doWrite()
178
 
            if not selectable.fileno() == fd:
179
 
                why = main.CONNECTION_LOST
180
 
        except:
181
 
            why = sys.exc_info()[1]
182
 
            log.deferr()
183
 
 
184
 
        if why:
185
 
            self.removeReader(selectable)
186
 
            self.removeWriter(selectable)
187
 
            selectable.connectionLost(failure.Failure(why))
188
 
 
189
 
    doIteration = doKEvent
190
 
 
191
 
 
192
 
def install():
193
 
    k = KQueueReactor()
194
 
    main.installReactor(k)
195
 
 
196
 
 
197
 
__all__ = ["KQueueReactor", "install"]