~ubuntu-branches/ubuntu/jaunty/asio/jaunty

« back to all changes in this revision

Viewing changes to include/asio/detail/win_iocp_io_service.hpp

  • Committer: Bazaar Package Importer
  • Author(s): Simon Richter
  • Date: 2007-09-07 11:10:41 UTC
  • Revision ID: james.westby@ubuntu.com-20070907111041-f0uwhs0llvzj9ah5
Tags: upstream-0.3.8~rc3
ImportĀ upstreamĀ versionĀ 0.3.8~rc3

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
//
 
2
// win_iocp_io_service.hpp
 
3
// ~~~~~~~~~~~~~~~~~~~~~~~
 
4
//
 
5
// Copyright (c) 2003-2007 Christopher M. Kohlhoff (chris at kohlhoff dot com)
 
6
//
 
7
// Distributed under the Boost Software License, Version 1.0. (See accompanying
 
8
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
 
9
//
 
10
 
 
11
#ifndef ASIO_DETAIL_WIN_IOCP_IO_SERVICE_HPP
 
12
#define ASIO_DETAIL_WIN_IOCP_IO_SERVICE_HPP
 
13
 
 
14
#if defined(_MSC_VER) && (_MSC_VER >= 1200)
 
15
# pragma once
 
16
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
 
17
 
 
18
#include "asio/detail/push_options.hpp"
 
19
 
 
20
#include "asio/detail/win_iocp_io_service_fwd.hpp"
 
21
 
 
22
#if defined(ASIO_HAS_IOCP)
 
23
 
 
24
#include "asio/detail/push_options.hpp"
 
25
#include <limits>
 
26
#include <boost/throw_exception.hpp>
 
27
#include "asio/detail/pop_options.hpp"
 
28
 
 
29
#include "asio/io_service.hpp"
 
30
#include "asio/system_error.hpp"
 
31
#include "asio/detail/call_stack.hpp"
 
32
#include "asio/detail/handler_alloc_helpers.hpp"
 
33
#include "asio/detail/handler_invoke_helpers.hpp"
 
34
#include "asio/detail/service_base.hpp"
 
35
#include "asio/detail/socket_types.hpp"
 
36
#include "asio/detail/win_iocp_operation.hpp"
 
37
 
 
38
namespace asio {
 
39
namespace detail {
 
40
 
 
41
class win_iocp_io_service
 
42
  : public asio::detail::service_base<win_iocp_io_service>
 
43
{
 
44
public:
 
45
  // Base class for all operations.
 
46
  typedef win_iocp_operation operation;
 
47
 
 
48
  // Constructor.
 
49
  win_iocp_io_service(asio::io_service& io_service)
 
50
    : asio::detail::service_base<win_iocp_io_service>(io_service),
 
51
      iocp_(),
 
52
      outstanding_work_(0),
 
53
      stopped_(0),
 
54
      shutdown_(0)
 
55
  {
 
56
  }
 
57
 
 
58
  void init(size_t concurrency_hint)
 
59
  {
 
60
    iocp_.handle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0,
 
61
        static_cast<DWORD>((std::min<size_t>)(concurrency_hint, DWORD(~0))));
 
62
    if (!iocp_.handle)
 
63
    {
 
64
      DWORD last_error = ::GetLastError();
 
65
      asio::system_error e(
 
66
          asio::error_code(last_error, asio::native_ecat),
 
67
          "iocp");
 
68
      boost::throw_exception(e);
 
69
    }
 
70
  }
 
71
 
 
72
  // Destroy all user-defined handler objects owned by the service.
 
73
  void shutdown_service()
 
74
  {
 
75
    ::InterlockedExchange(&shutdown_, 1);
 
76
 
 
77
    for (;;)
 
78
    {
 
79
      DWORD bytes_transferred = 0;
 
80
#if (WINVER < 0x0500)
 
81
      DWORD completion_key = 0;
 
82
#else
 
83
      DWORD_PTR completion_key = 0;
 
84
#endif
 
85
      LPOVERLAPPED overlapped = 0;
 
86
      ::SetLastError(0);
 
87
      BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle,
 
88
          &bytes_transferred, &completion_key, &overlapped, 0);
 
89
      DWORD last_error = ::GetLastError();
 
90
      if (!ok && overlapped == 0 && last_error == WAIT_TIMEOUT)
 
91
        break;
 
92
      if (overlapped)
 
93
        static_cast<operation*>(overlapped)->destroy();
 
94
    }
 
95
  }
 
96
 
 
97
  // Register a handle with the IO completion port.
 
98
  void register_handle(HANDLE handle)
 
99
  {
 
100
    ::CreateIoCompletionPort(handle, iocp_.handle, 0, 0);
 
101
  }
 
102
 
 
103
  // Run the event loop until stopped or no more work.
 
104
  size_t run(asio::error_code& ec)
 
105
  {
 
106
    if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
 
107
    {
 
108
      ec = asio::error_code();
 
109
      return 0;
 
110
    }
 
111
 
 
112
    call_stack<win_iocp_io_service>::context ctx(this);
 
113
 
 
114
    size_t n = 0;
 
115
    while (do_one(true, ec))
 
116
      if (n != (std::numeric_limits<size_t>::max)())
 
117
        ++n;
 
118
    return n;
 
119
  }
 
120
 
 
121
  // Run until stopped or one operation is performed.
 
122
  size_t run_one(asio::error_code& ec)
 
123
  {
 
124
    if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
 
125
    {
 
126
      ec = asio::error_code();
 
127
      return 0;
 
128
    }
 
129
 
 
130
    call_stack<win_iocp_io_service>::context ctx(this);
 
131
 
 
132
    return do_one(true, ec);
 
133
  }
 
134
 
 
135
  // Poll for operations without blocking.
 
136
  size_t poll(asio::error_code& ec)
 
137
  {
 
138
    if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
 
139
    {
 
140
      ec = asio::error_code();
 
141
      return 0;
 
142
    }
 
143
 
 
144
    call_stack<win_iocp_io_service>::context ctx(this);
 
145
 
 
146
    size_t n = 0;
 
147
    while (do_one(false, ec))
 
148
      if (n != (std::numeric_limits<size_t>::max)())
 
149
        ++n;
 
150
    return n;
 
151
  }
 
152
 
 
153
  // Poll for one operation without blocking.
 
154
  size_t poll_one(asio::error_code& ec)
 
155
  {
 
156
    if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
 
157
    {
 
158
      ec = asio::error_code();
 
159
      return 0;
 
160
    }
 
161
 
 
162
    call_stack<win_iocp_io_service>::context ctx(this);
 
163
 
 
164
    return do_one(false, ec);
 
165
  }
 
166
 
 
167
  // Stop the event processing loop.
 
168
  void stop()
 
169
  {
 
170
    if (::InterlockedExchange(&stopped_, 1) == 0)
 
171
    {
 
172
      if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
 
173
      {
 
174
        DWORD last_error = ::GetLastError();
 
175
        asio::system_error e(
 
176
            asio::error_code(last_error, asio::native_ecat),
 
177
            "pqcs");
 
178
        boost::throw_exception(e);
 
179
      }
 
180
    }
 
181
  }
 
182
 
 
183
  // Reset in preparation for a subsequent run invocation.
 
184
  void reset()
 
185
  {
 
186
    ::InterlockedExchange(&stopped_, 0);
 
187
  }
 
188
 
 
189
  // Notify that some work has started.
 
190
  void work_started()
 
191
  {
 
192
    ::InterlockedIncrement(&outstanding_work_);
 
193
  }
 
194
 
 
195
  // Notify that some work has finished.
 
196
  void work_finished()
 
197
  {
 
198
    if (::InterlockedDecrement(&outstanding_work_) == 0)
 
199
      stop();
 
200
  }
 
201
 
 
202
  // Request invocation of the given handler.
 
203
  template <typename Handler>
 
204
  void dispatch(Handler handler)
 
205
  {
 
206
    if (call_stack<win_iocp_io_service>::contains(this))
 
207
      asio_handler_invoke_helpers::invoke(handler, &handler);
 
208
    else
 
209
      post(handler);
 
210
  }
 
211
 
 
212
  // Request invocation of the given handler and return immediately.
 
213
  template <typename Handler>
 
214
  void post(Handler handler)
 
215
  {
 
216
    // If the service has been shut down we silently discard the handler.
 
217
    if (::InterlockedExchangeAdd(&shutdown_, 0) != 0)
 
218
      return;
 
219
 
 
220
    // Allocate and construct an operation to wrap the handler.
 
221
    typedef handler_operation<Handler> value_type;
 
222
    typedef handler_alloc_traits<Handler, value_type> alloc_traits;
 
223
    raw_handler_ptr<alloc_traits> raw_ptr(handler);
 
224
    handler_ptr<alloc_traits> ptr(raw_ptr, *this, handler);
 
225
 
 
226
    // Enqueue the operation on the I/O completion port.
 
227
    if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, ptr.get()))
 
228
    {
 
229
      DWORD last_error = ::GetLastError();
 
230
      asio::system_error e(
 
231
          asio::error_code(last_error, asio::native_ecat),
 
232
          "pqcs");
 
233
      boost::throw_exception(e);
 
234
    }
 
235
 
 
236
    // Operation has been successfully posted.
 
237
    ptr.release();
 
238
  }
 
239
 
 
240
  // Request invocation of the given OVERLAPPED-derived operation.
 
241
  void post_completion(win_iocp_operation* op, DWORD op_last_error,
 
242
      DWORD bytes_transferred)
 
243
  {
 
244
    // Enqueue the operation on the I/O completion port.
 
245
    if (!::PostQueuedCompletionStatus(iocp_.handle,
 
246
          bytes_transferred, op_last_error, op))
 
247
    {
 
248
      DWORD last_error = ::GetLastError();
 
249
      asio::system_error e(
 
250
          asio::error_code(last_error, asio::native_ecat),
 
251
          "pqcs");
 
252
      boost::throw_exception(e);
 
253
    }
 
254
  }
 
255
 
 
256
private:
 
257
  // Dequeues at most one operation from the I/O completion port, and then
 
258
  // executes it. Returns the number of operations that were dequeued (i.e.
 
259
  // either 0 or 1).
 
260
  size_t do_one(bool block, asio::error_code& ec)
 
261
  {
 
262
    for (;;)
 
263
    {
 
264
      // Get the next operation from the queue.
 
265
      DWORD bytes_transferred = 0;
 
266
#if (WINVER < 0x0500)
 
267
      DWORD completion_key = 0;
 
268
#else
 
269
      DWORD_PTR completion_key = 0;
 
270
#endif
 
271
      LPOVERLAPPED overlapped = 0;
 
272
      ::SetLastError(0);
 
273
      BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,
 
274
          &completion_key, &overlapped, block ? 1000 : 0);
 
275
      DWORD last_error = ::GetLastError();
 
276
 
 
277
      if (!ok && overlapped == 0)
 
278
      {
 
279
        if (block && last_error == WAIT_TIMEOUT)
 
280
          continue;
 
281
        ec = asio::error_code();
 
282
        return 0;
 
283
      }
 
284
 
 
285
      if (overlapped)
 
286
      {
 
287
        // We may have been passed a last_error value in the completion_key.
 
288
        if (last_error == 0)
 
289
        {
 
290
          last_error = completion_key;
 
291
        }
 
292
 
 
293
        // Ensure that the io_service does not exit due to running out of work
 
294
        // while we make the upcall.
 
295
        auto_work work(*this);
 
296
 
 
297
        // Dispatch the operation.
 
298
        operation* op = static_cast<operation*>(overlapped);
 
299
        op->do_completion(last_error, bytes_transferred);
 
300
 
 
301
        ec = asio::error_code();
 
302
        return 1;
 
303
      }
 
304
      else
 
305
      {
 
306
        // The stopped_ flag is always checked to ensure that any leftover
 
307
        // interrupts from a previous run invocation are ignored.
 
308
        if (::InterlockedExchangeAdd(&stopped_, 0) != 0)
 
309
        {
 
310
          // Wake up next thread that is blocked on GetQueuedCompletionStatus.
 
311
          if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
 
312
          {
 
313
            DWORD last_error = ::GetLastError();
 
314
            ec = asio::error_code(last_error,
 
315
                asio::native_ecat);
 
316
            return 0;
 
317
          }
 
318
 
 
319
          ec = asio::error_code();
 
320
          return 0;
 
321
        }
 
322
      }
 
323
    }
 
324
  }
 
325
 
 
326
  struct auto_work
 
327
  {
 
328
    auto_work(win_iocp_io_service& io_service)
 
329
      : io_service_(io_service)
 
330
    {
 
331
      io_service_.work_started();
 
332
    }
 
333
 
 
334
    ~auto_work()
 
335
    {
 
336
      io_service_.work_finished();
 
337
    }
 
338
 
 
339
  private:
 
340
    win_iocp_io_service& io_service_;
 
341
  };
 
342
 
 
343
  template <typename Handler>
 
344
  struct handler_operation
 
345
    : public operation
 
346
  {
 
347
    handler_operation(win_iocp_io_service& io_service,
 
348
        Handler handler)
 
349
      : operation(&handler_operation<Handler>::do_completion_impl,
 
350
          &handler_operation<Handler>::destroy_impl),
 
351
        io_service_(io_service),
 
352
        handler_(handler)
 
353
    {
 
354
      io_service_.work_started();
 
355
    }
 
356
 
 
357
    ~handler_operation()
 
358
    {
 
359
      io_service_.work_finished();
 
360
    }
 
361
 
 
362
  private:
 
363
    // Prevent copying and assignment.
 
364
    handler_operation(const handler_operation&);
 
365
    void operator=(const handler_operation&);
 
366
    
 
367
    static void do_completion_impl(operation* op, DWORD, size_t)
 
368
    {
 
369
      // Take ownership of the operation object.
 
370
      typedef handler_operation<Handler> op_type;
 
371
      op_type* handler_op(static_cast<op_type*>(op));
 
372
      typedef handler_alloc_traits<Handler, op_type> alloc_traits;
 
373
      handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
 
374
 
 
375
      // Make a copy of the handler so that the memory can be deallocated before
 
376
      // the upcall is made.
 
377
      Handler handler(handler_op->handler_);
 
378
 
 
379
      // Free the memory associated with the handler.
 
380
      ptr.reset();
 
381
 
 
382
      // Make the upcall.
 
383
      asio_handler_invoke_helpers::invoke(handler, &handler);
 
384
    }
 
385
 
 
386
    static void destroy_impl(operation* op)
 
387
    {
 
388
      // Take ownership of the operation object.
 
389
      typedef handler_operation<Handler> op_type;
 
390
      op_type* handler_op(static_cast<op_type*>(op));
 
391
      typedef handler_alloc_traits<Handler, op_type> alloc_traits;
 
392
      handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
 
393
    }
 
394
 
 
395
    win_iocp_io_service& io_service_;
 
396
    Handler handler_;
 
397
  };
 
398
 
 
399
  // The IO completion port used for queueing operations.
 
400
  struct iocp_holder
 
401
  {
 
402
    HANDLE handle;
 
403
    iocp_holder() : handle(0) {}
 
404
    ~iocp_holder() { if (handle) ::CloseHandle(handle); }
 
405
  } iocp_;
 
406
 
 
407
  // The count of unfinished work.
 
408
  long outstanding_work_;
 
409
 
 
410
  // Flag to indicate whether the event loop has been stopped.
 
411
  long stopped_;
 
412
 
 
413
  // Flag to indicate whether the service has been shut down.
 
414
  long shutdown_;
 
415
};
 
416
 
 
417
} // namespace detail
 
418
} // namespace asio
 
419
 
 
420
#endif // defined(ASIO_HAS_IOCP)
 
421
 
 
422
#include "asio/detail/pop_options.hpp"
 
423
 
 
424
#endif // ASIO_DETAIL_WIN_IOCP_IO_SERVICE_HPP