~ubuntu-branches/ubuntu/maverick/libtorrent-rasterbar/maverick

« back to all changes in this revision

Viewing changes to include/libtorrent/asio/detail/kqueue_reactor.hpp

  • Committer: Bazaar Package Importer
  • Author(s): Christophe Sauthier
  • Date: 2010-08-10 12:59:37 UTC
  • mfrom: (1.3.7 upstream)
  • Revision ID: james.westby@ubuntu.com-20100810125937-jbcmmf17y8yo9hgz
Tags: 0.15.0-0ubuntu1
* New upstream version.
* debian/patches/100_fix_html_docs.patch: refreshed.
* debian/control: bump up standards-version to 3.9.1 (no changes).

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
//
2
 
// kqueue_reactor.hpp
3
 
// ~~~~~~~~~~~~~~~~~~
4
 
//
5
 
// Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6
 
// Copyright (c) 2005 Stefan Arentz (stefan at soze dot com)
7
 
//
8
 
// Distributed under the Boost Software License, Version 1.0. (See accompanying
9
 
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
10
 
//
11
 
 
12
 
#ifndef ASIO_DETAIL_KQUEUE_REACTOR_HPP
13
 
#define ASIO_DETAIL_KQUEUE_REACTOR_HPP
14
 
 
15
 
#if defined(_MSC_VER) && (_MSC_VER >= 1200)
16
 
# pragma once
17
 
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
18
 
 
19
 
#include "asio/detail/push_options.hpp"
20
 
 
21
 
#include "asio/detail/kqueue_reactor_fwd.hpp"
22
 
 
23
 
#if defined(ASIO_HAS_KQUEUE)
24
 
 
25
 
#include "asio/detail/push_options.hpp"
26
 
#include <cstddef>
27
 
#include <vector>
28
 
#include <sys/types.h>
29
 
#include <sys/event.h>
30
 
#include <sys/time.h>
31
 
#include <boost/config.hpp>
32
 
#include <boost/date_time/posix_time/posix_time_types.hpp>
33
 
#include <boost/throw_exception.hpp>
34
 
#include "asio/detail/pop_options.hpp"
35
 
 
36
 
#include "asio/error.hpp"
37
 
#include "asio/io_service.hpp"
38
 
#include "asio/system_error.hpp"
39
 
#include "asio/detail/bind_handler.hpp"
40
 
#include "asio/detail/mutex.hpp"
41
 
#include "asio/detail/task_io_service.hpp"
42
 
#include "asio/detail/thread.hpp"
43
 
#include "asio/detail/reactor_op_queue.hpp"
44
 
#include "asio/detail/select_interrupter.hpp"
45
 
#include "asio/detail/service_base.hpp"
46
 
#include "asio/detail/signal_blocker.hpp"
47
 
#include "asio/detail/socket_types.hpp"
48
 
#include "asio/detail/timer_queue.hpp"
49
 
 
50
 
// Older versions of Mac OS X may not define EV_OOBAND.
51
 
#if !defined(EV_OOBAND)
52
 
# define EV_OOBAND EV_FLAG1
53
 
#endif // !defined(EV_OOBAND)
54
 
 
55
 
namespace asio {
56
 
namespace detail {
57
 
 
58
 
template <bool Own_Thread>
59
 
class kqueue_reactor
60
 
  : public asio::detail::service_base<kqueue_reactor<Own_Thread> >
61
 
{
62
 
public:
63
 
  // Per-descriptor data.
64
 
  struct per_descriptor_data
65
 
  {
66
 
    bool allow_speculative_read;
67
 
    bool allow_speculative_write;
68
 
  };
69
 
 
70
 
  // Constructor.
71
 
  kqueue_reactor(asio::io_service& io_service)
72
 
    : asio::detail::service_base<
73
 
        kqueue_reactor<Own_Thread> >(io_service),
74
 
      mutex_(),
75
 
      kqueue_fd_(do_kqueue_create()),
76
 
      wait_in_progress_(false),
77
 
      interrupter_(),
78
 
      read_op_queue_(),
79
 
      write_op_queue_(),
80
 
      except_op_queue_(),
81
 
      pending_cancellations_(),
82
 
      stop_thread_(false),
83
 
      thread_(0),
84
 
      shutdown_(false),
85
 
      need_kqueue_wait_(true)
86
 
  {
87
 
    // Start the reactor's internal thread only if needed.
88
 
    if (Own_Thread)
89
 
    {
90
 
      asio::detail::signal_blocker sb;
91
 
      thread_ = new asio::detail::thread(
92
 
          bind_handler(&kqueue_reactor::call_run_thread, this));
93
 
    }
94
 
 
95
 
    // Add the interrupter's descriptor to the kqueue.
96
 
    struct kevent event;
97
 
    EV_SET(&event, interrupter_.read_descriptor(),
98
 
        EVFILT_READ, EV_ADD, 0, 0, 0);
99
 
    ::kevent(kqueue_fd_, &event, 1, 0, 0, 0);
100
 
  }
101
 
 
102
 
  // Destructor.
103
 
  ~kqueue_reactor()
104
 
  {
105
 
    shutdown_service();
106
 
    close(kqueue_fd_);
107
 
  }
108
 
 
109
 
  // Destroy all user-defined handler objects owned by the service.
110
 
  void shutdown_service()
111
 
  {
112
 
    asio::detail::mutex::scoped_lock lock(mutex_);
113
 
    shutdown_ = true;
114
 
    stop_thread_ = true;
115
 
    lock.unlock();
116
 
 
117
 
    if (thread_)
118
 
    {
119
 
      interrupter_.interrupt();
120
 
      thread_->join();
121
 
      delete thread_;
122
 
      thread_ = 0;
123
 
    }
124
 
 
125
 
    read_op_queue_.destroy_operations();
126
 
    write_op_queue_.destroy_operations();
127
 
    except_op_queue_.destroy_operations();
128
 
 
129
 
    for (std::size_t i = 0; i < timer_queues_.size(); ++i)
130
 
      timer_queues_[i]->destroy_timers();
131
 
    timer_queues_.clear();
132
 
  }
133
 
 
134
 
  // Register a socket with the reactor. Returns 0 on success, system error
135
 
  // code on failure.
136
 
  int register_descriptor(socket_type, per_descriptor_data& descriptor_data)
137
 
  {
138
 
    descriptor_data.allow_speculative_read = true;
139
 
    descriptor_data.allow_speculative_write = true;
140
 
 
141
 
    return 0;
142
 
  }
143
 
 
144
 
  // Start a new read operation. The handler object will be invoked when the
145
 
  // given descriptor is ready to be read, or an error has occurred.
146
 
  template <typename Handler>
147
 
  void start_read_op(socket_type descriptor,
148
 
      per_descriptor_data& descriptor_data, Handler handler,
149
 
      bool allow_speculative_read = true)
150
 
  {
151
 
    if (allow_speculative_read && descriptor_data.allow_speculative_read)
152
 
    {
153
 
      asio::error_code ec;
154
 
      std::size_t bytes_transferred = 0;
155
 
      if (handler.perform(ec, bytes_transferred))
156
 
      {
157
 
        handler.complete(ec, bytes_transferred);
158
 
        return;
159
 
      }
160
 
 
161
 
      // We only get one shot at a speculative read in this function.
162
 
      allow_speculative_read = false;
163
 
    }
164
 
 
165
 
    asio::detail::mutex::scoped_lock lock(mutex_);
166
 
 
167
 
    if (shutdown_)
168
 
      return;
169
 
 
170
 
    if (!allow_speculative_read)
171
 
      need_kqueue_wait_ = true;
172
 
    else if (!read_op_queue_.has_operation(descriptor))
173
 
    {
174
 
      // Speculative reads are ok as there are no queued read operations.
175
 
      descriptor_data.allow_speculative_read = true;
176
 
 
177
 
      asio::error_code ec;
178
 
      std::size_t bytes_transferred = 0;
179
 
      if (handler.perform(ec, bytes_transferred))
180
 
      {
181
 
        handler.complete(ec, bytes_transferred);
182
 
        return;
183
 
      }
184
 
    }
185
 
 
186
 
    // Speculative reads are not ok as there will be queued read operations.
187
 
    descriptor_data.allow_speculative_read = false;
188
 
 
189
 
    if (read_op_queue_.enqueue_operation(descriptor, handler))
190
 
    {
191
 
      struct kevent event;
192
 
      EV_SET(&event, descriptor, EVFILT_READ, EV_ADD, 0, 0, 0);
193
 
      if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1)
194
 
      {
195
 
        asio::error_code ec(errno,
196
 
            asio::error::get_system_category());
197
 
        read_op_queue_.perform_all_operations(descriptor, ec);
198
 
      }
199
 
    }
200
 
  }
201
 
 
202
 
  // Start a new write operation. The handler object will be invoked when the
203
 
  // given descriptor is ready to be written, or an error has occurred.
204
 
  template <typename Handler>
205
 
  void start_write_op(socket_type descriptor,
206
 
      per_descriptor_data& descriptor_data, Handler handler,
207
 
      bool allow_speculative_write = true)
208
 
  {
209
 
    if (allow_speculative_write && descriptor_data.allow_speculative_write)
210
 
    {
211
 
      asio::error_code ec;
212
 
      std::size_t bytes_transferred = 0;
213
 
      if (handler.perform(ec, bytes_transferred))
214
 
      {
215
 
        handler.complete(ec, bytes_transferred);
216
 
        return;
217
 
      }
218
 
 
219
 
      // We only get one shot at a speculative write in this function.
220
 
      allow_speculative_write = false;
221
 
    }
222
 
 
223
 
    asio::detail::mutex::scoped_lock lock(mutex_);
224
 
 
225
 
    if (shutdown_)
226
 
      return;
227
 
 
228
 
    if (!allow_speculative_write)
229
 
      need_kqueue_wait_ = true;
230
 
    else if (!write_op_queue_.has_operation(descriptor))
231
 
    {
232
 
      // Speculative writes are ok as there are no queued write operations.
233
 
      descriptor_data.allow_speculative_write = true;
234
 
 
235
 
      asio::error_code ec;
236
 
      std::size_t bytes_transferred = 0;
237
 
      if (handler.perform(ec, bytes_transferred))
238
 
      {
239
 
        handler.complete(ec, bytes_transferred);
240
 
        return;
241
 
      }
242
 
    }
243
 
 
244
 
    // Speculative writes are not ok as there will be queued write operations.
245
 
    descriptor_data.allow_speculative_write = false;
246
 
 
247
 
    if (write_op_queue_.enqueue_operation(descriptor, handler))
248
 
    {
249
 
      struct kevent event;
250
 
      EV_SET(&event, descriptor, EVFILT_WRITE, EV_ADD, 0, 0, 0);
251
 
      if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1)
252
 
      {
253
 
        asio::error_code ec(errno,
254
 
            asio::error::get_system_category());
255
 
        write_op_queue_.perform_all_operations(descriptor, ec);
256
 
      }
257
 
    }
258
 
  }
259
 
 
260
 
  // Start a new exception operation. The handler object will be invoked when
261
 
  // the given descriptor has exception information, or an error has occurred.
262
 
  template <typename Handler>
263
 
  void start_except_op(socket_type descriptor,
264
 
      per_descriptor_data&, Handler handler)
265
 
  {
266
 
    asio::detail::mutex::scoped_lock lock(mutex_);
267
 
 
268
 
    if (shutdown_)
269
 
      return;
270
 
 
271
 
    if (except_op_queue_.enqueue_operation(descriptor, handler))
272
 
    {
273
 
      struct kevent event;
274
 
      if (read_op_queue_.has_operation(descriptor))
275
 
        EV_SET(&event, descriptor, EVFILT_READ, EV_ADD, 0, 0, 0);
276
 
      else
277
 
        EV_SET(&event, descriptor, EVFILT_READ, EV_ADD, EV_OOBAND, 0, 0);
278
 
      if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1)
279
 
      {
280
 
        asio::error_code ec(errno,
281
 
            asio::error::get_system_category());
282
 
        except_op_queue_.perform_all_operations(descriptor, ec);
283
 
      }
284
 
    }
285
 
  }
286
 
 
287
 
  // Start a new write operation. The handler object will be invoked when the
288
 
  // given descriptor is ready to be written, or an error has occurred.
289
 
  template <typename Handler>
290
 
  void start_connect_op(socket_type descriptor,
291
 
      per_descriptor_data& descriptor_data, Handler handler)
292
 
  {
293
 
    asio::detail::mutex::scoped_lock lock(mutex_);
294
 
 
295
 
    if (shutdown_)
296
 
      return;
297
 
 
298
 
    // Speculative writes are not ok as there will be queued write operations.
299
 
    descriptor_data.allow_speculative_write = false;
300
 
 
301
 
    if (write_op_queue_.enqueue_operation(descriptor, handler))
302
 
    {
303
 
      struct kevent event;
304
 
      EV_SET(&event, descriptor, EVFILT_WRITE, EV_ADD, 0, 0, 0);
305
 
      if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1)
306
 
      {
307
 
        asio::error_code ec(errno,
308
 
            asio::error::get_system_category());
309
 
        write_op_queue_.perform_all_operations(descriptor, ec);
310
 
      }
311
 
    }
312
 
  }
313
 
 
314
 
  // Cancel all operations associated with the given descriptor. The
315
 
  // handlers associated with the descriptor will be invoked with the
316
 
  // operation_aborted error.
317
 
  void cancel_ops(socket_type descriptor, per_descriptor_data&)
318
 
  {
319
 
    asio::detail::mutex::scoped_lock lock(mutex_);
320
 
    cancel_ops_unlocked(descriptor);
321
 
  }
322
 
 
323
 
  // Cancel any operations that are running against the descriptor and remove
324
 
  // its registration from the reactor.
325
 
  void close_descriptor(socket_type descriptor, per_descriptor_data&)
326
 
  {
327
 
    asio::detail::mutex::scoped_lock lock(mutex_);
328
 
 
329
 
    // Remove the descriptor from kqueue.
330
 
    struct kevent event[2];
331
 
    EV_SET(&event[0], descriptor, EVFILT_READ, EV_DELETE, 0, 0, 0);
332
 
    EV_SET(&event[1], descriptor, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
333
 
    ::kevent(kqueue_fd_, event, 2, 0, 0, 0);
334
 
    
335
 
    // Cancel any outstanding operations associated with the descriptor.
336
 
    cancel_ops_unlocked(descriptor);
337
 
  }
338
 
 
339
 
  // Add a new timer queue to the reactor.
340
 
  template <typename Time_Traits>
341
 
  void add_timer_queue(timer_queue<Time_Traits>& timer_queue)
342
 
  {
343
 
    asio::detail::mutex::scoped_lock lock(mutex_);
344
 
    timer_queues_.push_back(&timer_queue);
345
 
  }
346
 
 
347
 
  // Remove a timer queue from the reactor.
348
 
  template <typename Time_Traits>
349
 
  void remove_timer_queue(timer_queue<Time_Traits>& timer_queue)
350
 
  {
351
 
    asio::detail::mutex::scoped_lock lock(mutex_);
352
 
    for (std::size_t i = 0; i < timer_queues_.size(); ++i)
353
 
    {
354
 
      if (timer_queues_[i] == &timer_queue)
355
 
      {
356
 
        timer_queues_.erase(timer_queues_.begin() + i);
357
 
        return;
358
 
      }
359
 
    }
360
 
  }
361
 
 
362
 
  // Schedule a timer in the given timer queue to expire at the specified
363
 
  // absolute time. The handler object will be invoked when the timer expires.
364
 
  template <typename Time_Traits, typename Handler>
365
 
  void schedule_timer(timer_queue<Time_Traits>& timer_queue,
366
 
      const typename Time_Traits::time_type& time, Handler handler, void* token)
367
 
  {
368
 
    asio::detail::mutex::scoped_lock lock(mutex_);
369
 
    if (!shutdown_)
370
 
      if (timer_queue.enqueue_timer(time, handler, token))
371
 
        interrupter_.interrupt();
372
 
  }
373
 
 
374
 
  // Cancel the timer associated with the given token. Returns the number of
375
 
  // handlers that have been posted or dispatched.
376
 
  template <typename Time_Traits>
377
 
  std::size_t cancel_timer(timer_queue<Time_Traits>& timer_queue, void* token)
378
 
  {
379
 
    asio::detail::mutex::scoped_lock lock(mutex_);
380
 
    std::size_t n = timer_queue.cancel_timer(token);
381
 
    if (n > 0)
382
 
      interrupter_.interrupt();
383
 
    return n;
384
 
  }
385
 
 
386
 
private:
387
 
  friend class task_io_service<kqueue_reactor<Own_Thread> >;
388
 
 
389
 
  // Run the kqueue loop.
390
 
  void run(bool block)
391
 
  {
392
 
    asio::detail::mutex::scoped_lock lock(mutex_);
393
 
 
394
 
    // Dispatch any operation cancellations that were made while the select
395
 
    // loop was not running.
396
 
    read_op_queue_.perform_cancellations();
397
 
    write_op_queue_.perform_cancellations();
398
 
    except_op_queue_.perform_cancellations();
399
 
    for (std::size_t i = 0; i < timer_queues_.size(); ++i)
400
 
      timer_queues_[i]->dispatch_cancellations();
401
 
 
402
 
    // Check if the thread is supposed to stop.
403
 
    if (stop_thread_)
404
 
    {
405
 
      complete_operations_and_timers(lock);
406
 
      return;
407
 
    }
408
 
 
409
 
    // We can return immediately if there's no work to do and the reactor is
410
 
    // not supposed to block.
411
 
    if (!block && read_op_queue_.empty() && write_op_queue_.empty()
412
 
        && except_op_queue_.empty() && all_timer_queues_are_empty())
413
 
    {
414
 
      complete_operations_and_timers(lock);
415
 
      return;
416
 
    }
417
 
 
418
 
    // Determine how long to block while waiting for events.
419
 
    timespec timeout_buf = { 0, 0 };
420
 
    timespec* timeout = block ? get_timeout(timeout_buf) : &timeout_buf;
421
 
 
422
 
    wait_in_progress_ = true;
423
 
    lock.unlock();
424
 
 
425
 
    // Block on the kqueue descriptor.
426
 
    struct kevent events[128];
427
 
    int num_events = (block || need_kqueue_wait_)
428
 
      ? kevent(kqueue_fd_, 0, 0, events, 128, timeout)
429
 
      : 0;
430
 
 
431
 
    lock.lock();
432
 
    wait_in_progress_ = false;
433
 
 
434
 
    // Block signals while performing operations.
435
 
    asio::detail::signal_blocker sb;
436
 
 
437
 
    // Dispatch the waiting events.
438
 
    for (int i = 0; i < num_events; ++i)
439
 
    {
440
 
      int descriptor = events[i].ident;
441
 
      if (descriptor == interrupter_.read_descriptor())
442
 
      {
443
 
        interrupter_.reset();
444
 
      }
445
 
      else if (events[i].filter == EVFILT_READ)
446
 
      {
447
 
        // Dispatch operations associated with the descriptor.
448
 
        bool more_reads = false;
449
 
        bool more_except = false;
450
 
        if (events[i].flags & EV_ERROR)
451
 
        {
452
 
          asio::error_code error(
453
 
              events[i].data, asio::error::get_system_category());
454
 
          except_op_queue_.perform_all_operations(descriptor, error);
455
 
          read_op_queue_.perform_all_operations(descriptor, error);
456
 
        }
457
 
        else if (events[i].flags & EV_OOBAND)
458
 
        {
459
 
          asio::error_code error;
460
 
          more_except = except_op_queue_.perform_operation(descriptor, error);
461
 
          if (events[i].data > 0)
462
 
            more_reads = read_op_queue_.perform_operation(descriptor, error);
463
 
          else
464
 
            more_reads = read_op_queue_.has_operation(descriptor);
465
 
        }
466
 
        else
467
 
        {
468
 
          asio::error_code error;
469
 
          more_reads = read_op_queue_.perform_operation(descriptor, error);
470
 
          more_except = except_op_queue_.has_operation(descriptor);
471
 
        }
472
 
 
473
 
        // Update the descriptor in the kqueue.
474
 
        struct kevent event;
475
 
        if (more_reads)
476
 
          EV_SET(&event, descriptor, EVFILT_READ, EV_ADD, 0, 0, 0);
477
 
        else if (more_except)
478
 
          EV_SET(&event, descriptor, EVFILT_READ, EV_ADD, EV_OOBAND, 0, 0);
479
 
        else
480
 
          EV_SET(&event, descriptor, EVFILT_READ, EV_DELETE, 0, 0, 0);
481
 
        if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1)
482
 
        {
483
 
          asio::error_code error(errno,
484
 
              asio::error::get_system_category());
485
 
          except_op_queue_.perform_all_operations(descriptor, error);
486
 
          read_op_queue_.perform_all_operations(descriptor, error);
487
 
        }
488
 
      }
489
 
      else if (events[i].filter == EVFILT_WRITE)
490
 
      {
491
 
        // Dispatch operations associated with the descriptor.
492
 
        bool more_writes = false;
493
 
        if (events[i].flags & EV_ERROR)
494
 
        {
495
 
          asio::error_code error(
496
 
              events[i].data, asio::error::get_system_category());
497
 
          write_op_queue_.perform_all_operations(descriptor, error);
498
 
        }
499
 
        else
500
 
        {
501
 
          asio::error_code error;
502
 
          more_writes = write_op_queue_.perform_operation(descriptor, error);
503
 
        }
504
 
 
505
 
        // Update the descriptor in the kqueue.
506
 
        struct kevent event;
507
 
        if (more_writes)
508
 
          EV_SET(&event, descriptor, EVFILT_WRITE, EV_ADD, 0, 0, 0);
509
 
        else
510
 
          EV_SET(&event, descriptor, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
511
 
        if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1)
512
 
        {
513
 
          asio::error_code error(errno,
514
 
              asio::error::get_system_category());
515
 
          write_op_queue_.perform_all_operations(descriptor, error);
516
 
        }
517
 
      }
518
 
    }
519
 
 
520
 
    read_op_queue_.perform_cancellations();
521
 
    write_op_queue_.perform_cancellations();
522
 
    except_op_queue_.perform_cancellations();
523
 
    for (std::size_t i = 0; i < timer_queues_.size(); ++i)
524
 
    {
525
 
      timer_queues_[i]->dispatch_timers();
526
 
      timer_queues_[i]->dispatch_cancellations();
527
 
    }
528
 
 
529
 
    // Issue any pending cancellations.
530
 
    for (std::size_t i = 0; i < pending_cancellations_.size(); ++i)
531
 
      cancel_ops_unlocked(pending_cancellations_[i]);
532
 
    pending_cancellations_.clear();
533
 
 
534
 
    // Determine whether kqueue needs to be called next time the reactor is run.
535
 
    need_kqueue_wait_ = !read_op_queue_.empty()
536
 
      || !write_op_queue_.empty() || !except_op_queue_.empty();
537
 
 
538
 
    complete_operations_and_timers(lock);
539
 
  }
540
 
 
541
 
  // Run the select loop in the thread.
542
 
  void run_thread()
543
 
  {
544
 
    asio::detail::mutex::scoped_lock lock(mutex_);
545
 
    while (!stop_thread_)
546
 
    {
547
 
      lock.unlock();
548
 
      run(true);
549
 
      lock.lock();
550
 
    }
551
 
  }
552
 
 
553
 
  // Entry point for the select loop thread.
554
 
  static void call_run_thread(kqueue_reactor* reactor)
555
 
  {
556
 
    reactor->run_thread();
557
 
  }
558
 
 
559
 
  // Interrupt the select loop.
560
 
  void interrupt()
561
 
  {
562
 
    interrupter_.interrupt();
563
 
  }
564
 
 
565
 
  // Create the kqueue file descriptor. Throws an exception if the descriptor
566
 
  // cannot be created.
567
 
  static int do_kqueue_create()
568
 
  {
569
 
    int fd = kqueue();
570
 
    if (fd == -1)
571
 
    {
572
 
      boost::throw_exception(
573
 
          asio::system_error(
574
 
            asio::error_code(errno,
575
 
              asio::error::get_system_category()),
576
 
            "kqueue"));
577
 
    }
578
 
    return fd;
579
 
  }
580
 
 
581
 
  // Check if all timer queues are empty.
582
 
  bool all_timer_queues_are_empty() const
583
 
  {
584
 
    for (std::size_t i = 0; i < timer_queues_.size(); ++i)
585
 
      if (!timer_queues_[i]->empty())
586
 
        return false;
587
 
    return true;
588
 
  }
589
 
 
590
 
  // Get the timeout value for the kevent call.
591
 
  timespec* get_timeout(timespec& ts)
592
 
  {
593
 
    if (all_timer_queues_are_empty())
594
 
      return 0;
595
 
 
596
 
    // By default we will wait no longer than 5 minutes. This will ensure that
597
 
    // any changes to the system clock are detected after no longer than this.
598
 
    boost::posix_time::time_duration minimum_wait_duration
599
 
      = boost::posix_time::minutes(5);
600
 
 
601
 
    for (std::size_t i = 0; i < timer_queues_.size(); ++i)
602
 
    {
603
 
      boost::posix_time::time_duration wait_duration
604
 
        = timer_queues_[i]->wait_duration();
605
 
      if (wait_duration < minimum_wait_duration)
606
 
        minimum_wait_duration = wait_duration;
607
 
    }
608
 
 
609
 
    if (minimum_wait_duration > boost::posix_time::time_duration())
610
 
    {
611
 
      ts.tv_sec = minimum_wait_duration.total_seconds();
612
 
      ts.tv_nsec = minimum_wait_duration.total_nanoseconds() % 1000000000;
613
 
    }
614
 
    else
615
 
    {
616
 
      ts.tv_sec = 0;
617
 
      ts.tv_nsec = 0;
618
 
    }
619
 
 
620
 
    return &ts;
621
 
  }
622
 
 
623
 
  // Cancel all operations associated with the given descriptor. The do_cancel
624
 
  // function of the handler objects will be invoked. This function does not
625
 
  // acquire the kqueue_reactor's mutex.
626
 
  void cancel_ops_unlocked(socket_type descriptor)
627
 
  {
628
 
    bool interrupt = read_op_queue_.cancel_operations(descriptor);
629
 
    interrupt = write_op_queue_.cancel_operations(descriptor) || interrupt;
630
 
    interrupt = except_op_queue_.cancel_operations(descriptor) || interrupt;
631
 
    if (interrupt)
632
 
      interrupter_.interrupt();
633
 
  }
634
 
 
635
 
  // Clean up operations and timers. We must not hold the lock since the
636
 
  // destructors may make calls back into this reactor. We make a copy of the
637
 
  // vector of timer queues since the original may be modified while the lock
638
 
  // is not held.
639
 
  void complete_operations_and_timers(
640
 
      asio::detail::mutex::scoped_lock& lock)
641
 
  {
642
 
    timer_queues_for_cleanup_ = timer_queues_;
643
 
    lock.unlock();
644
 
    read_op_queue_.complete_operations();
645
 
    write_op_queue_.complete_operations();
646
 
    except_op_queue_.complete_operations();
647
 
    for (std::size_t i = 0; i < timer_queues_for_cleanup_.size(); ++i)
648
 
      timer_queues_for_cleanup_[i]->complete_timers();
649
 
  }
650
 
 
651
 
  // Mutex to protect access to internal data.
652
 
  asio::detail::mutex mutex_;
653
 
 
654
 
  // The kqueue file descriptor.
655
 
  int kqueue_fd_;
656
 
 
657
 
  // Whether the kqueue wait call is currently in progress
658
 
  bool wait_in_progress_;
659
 
 
660
 
  // The interrupter is used to break a blocking kevent call.
661
 
  select_interrupter interrupter_;
662
 
 
663
 
  // The queue of read operations.
664
 
  reactor_op_queue<socket_type> read_op_queue_;
665
 
 
666
 
  // The queue of write operations.
667
 
  reactor_op_queue<socket_type> write_op_queue_;
668
 
 
669
 
  // The queue of except operations.
670
 
  reactor_op_queue<socket_type> except_op_queue_;
671
 
 
672
 
  // The timer queues.
673
 
  std::vector<timer_queue_base*> timer_queues_;
674
 
 
675
 
  // A copy of the timer queues, used when cleaning up timers. The copy is
676
 
  // stored as a class data member to avoid unnecessary memory allocation.
677
 
  std::vector<timer_queue_base*> timer_queues_for_cleanup_;
678
 
 
679
 
  // The descriptors that are pending cancellation.
680
 
  std::vector<socket_type> pending_cancellations_;
681
 
 
682
 
  // Does the reactor loop thread need to stop.
683
 
  bool stop_thread_;
684
 
 
685
 
  // The thread that is running the reactor loop.
686
 
  asio::detail::thread* thread_;
687
 
 
688
 
  // Whether the service has been shut down.
689
 
  bool shutdown_;
690
 
 
691
 
  // Whether we need to call kqueue the next time the reactor is run.
692
 
  bool need_kqueue_wait_;
693
 
};
694
 
 
695
 
} // namespace detail
696
 
} // namespace asio
697
 
 
698
 
#endif // defined(ASIO_HAS_KQUEUE)
699
 
 
700
 
#include "asio/detail/pop_options.hpp"
701
 
 
702
 
#endif // ASIO_DETAIL_KQUEUE_REACTOR_HPP