~ubuntu-branches/ubuntu/precise/zeromq/precise

« back to all changes in this revision

Viewing changes to src/kqueue.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Martin Lucina
  • Date: 2011-05-13 12:43:09 UTC
  • mfrom: (7.2.1 sid)
  • Revision ID: james.westby@ubuntu.com-20110513124309-m3gdt964ga67rcwu
Tags: 2.1.7-1
* New upstream version. (closes: #619374)
* --with-system-pgm is now used instead of the embedded OpenPGM library. 
* Added Debian watch file.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/*
2
 
    Copyright (c) 2007-2010 iMatix Corporation
 
2
    Copyright (c) 2007-2011 iMatix Corporation
 
3
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
3
4
 
4
5
    This file is part of 0MQ.
5
6
 
6
7
    0MQ is free software; you can redistribute it and/or modify it under
7
 
    the terms of the Lesser GNU General Public License as published by
 
8
    the terms of the GNU Lesser General Public License as published by
8
9
    the Free Software Foundation; either version 3 of the License, or
9
10
    (at your option) any later version.
10
11
 
11
12
    0MQ is distributed in the hope that it will be useful,
12
13
    but WITHOUT ANY WARRANTY; without even the implied warranty of
13
14
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 
    Lesser GNU General Public License for more details.
 
15
    GNU Lesser General Public License for more details.
15
16
 
16
 
    You should have received a copy of the Lesser GNU General Public License
 
17
    You should have received a copy of the GNU Lesser General Public License
17
18
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
*/
19
20
 
54
55
zmq::kqueue_t::~kqueue_t ()
55
56
{
56
57
    worker.stop ();
57
 
 
58
 
    //  Make sure there are no fds registered on shutdown.
59
 
    zmq_assert (load.get () == 0);
60
 
 
61
58
    close (kqueue_fd);
62
59
}
63
60
 
74
71
{
75
72
    struct kevent ev;
76
73
 
77
 
    EV_SET (&ev, fd_, filter_, EV_DELETE, 0, 0, (kevent_udata_t)NULL);
 
74
    EV_SET (&ev, fd_, filter_, EV_DELETE, 0, 0, 0);
78
75
    int rc = kevent (kqueue_fd, &ev, 1, NULL, 0, NULL);
79
76
    errno_assert (rc != -1);
80
77
}
83
80
    i_poll_events *reactor_)
84
81
{
85
82
    poll_entry_t *pe = new (std::nothrow) poll_entry_t;
86
 
    zmq_assert (pe != NULL);
 
83
    alloc_assert (pe);
87
84
 
88
85
    pe->fd = fd_;
89
86
    pe->flag_pollin = 0;
90
87
    pe->flag_pollout = 0;
91
88
    pe->reactor = reactor_;
92
89
 
 
90
    adjust_load (1);
 
91
 
93
92
    return pe;
94
93
}
95
94
 
102
101
        kevent_delete (pe->fd, EVFILT_WRITE);
103
102
    pe->fd = retired_fd;
104
103
    retired.push_back (pe);
 
104
 
 
105
    adjust_load (-1);
105
106
}
106
107
 
107
108
void zmq::kqueue_t::set_pollin (handle_t handle_)
132
133
    kevent_delete (pe->fd, EVFILT_WRITE);
133
134
}
134
135
 
135
 
void zmq::kqueue_t::add_timer (i_poll_events *events_)
136
 
{
137
 
     timers.push_back (events_);
138
 
}
139
 
 
140
 
void zmq::kqueue_t::cancel_timer (i_poll_events *events_)
141
 
{
142
 
    timers_t::iterator it = std::find (timers.begin (), timers.end (), events_);
143
 
    if (it != timers.end ())
144
 
        timers.erase (it);
145
 
}
146
 
 
147
 
int zmq::kqueue_t::get_load ()
148
 
{
149
 
    return load.get ();
150
 
}
151
 
 
152
136
void zmq::kqueue_t::start ()
153
137
{
154
138
    worker.start (worker_routine, this);
163
147
{
164
148
    while (!stopping) {
165
149
 
 
150
        //  Execute any due timers.
 
151
        int timeout = (int) execute_timers ();
 
152
 
 
153
        //  Wait for events.
166
154
        struct kevent ev_buf [max_io_events];
167
 
 
168
 
        //  Compute time interval to wait.
169
 
        timespec timeout = {max_timer_period / 1000,
170
 
            (max_timer_period % 1000) * 1000000};
171
 
 
172
 
        //  Wait for events.
173
 
        int n = kevent (kqueue_fd, NULL, 0,
174
 
             &ev_buf [0], max_io_events, timers.empty () ? NULL : &timeout);
 
155
        timespec ts = {timeout / 1000, (timeout % 1000) * 1000000};
 
156
        int n = kevent (kqueue_fd, NULL, 0, &ev_buf [0], max_io_events,
 
157
            timeout ? &ts: NULL);
175
158
        if (n == -1 && errno == EINTR)
176
159
            continue;
177
160
        errno_assert (n != -1);
178
161
 
179
 
        //  Handle timer.
180
 
        if (!n) {
181
 
 
182
 
            //  Use local list of timers as timer handlers may fill new timers
183
 
            //  into the original array.
184
 
            timers_t t;
185
 
            std::swap (timers, t);
186
 
 
187
 
            //  Trigger all the timers.
188
 
            for (timers_t::iterator it = t.begin (); it != t.end (); it ++)
189
 
                (*it)->timer_event ();
190
 
 
191
 
            continue;
192
 
        }
193
 
 
194
162
        for (int i = 0; i < n; i ++) {
195
163
            poll_entry_t *pe = (poll_entry_t*) ev_buf [i].udata;
196
164
 
210
178
 
211
179
        //  Destroy retired event sources.
212
180
        for (retired_t::iterator it = retired.begin (); it != retired.end ();
213
 
              it ++)
 
181
              ++it)
214
182
            delete *it;
215
183
        retired.clear ();
216
184
    }