~ubuntu-branches/ubuntu/intrepid/miro/intrepid

« back to all changes in this revision

Viewing changes to portable/libtorrent/include/libtorrent/asio/detail/reactive_socket_service.hpp

  • Committer: Bazaar Package Importer
  • Author(s): Christopher James Halse Rogers
  • Date: 2008-02-09 13:37:10 UTC
  • mfrom: (1.1.2 upstream)
  • Revision ID: james.westby@ubuntu.com-20080209133710-9rs90q6gckvp1b6i
Tags: 1.1.2-0ubuntu1
New upstream release

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
//
 
2
// reactive_socket_service.hpp
 
3
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
4
//
 
5
// Copyright (c) 2003-2007 Christopher M. Kohlhoff (chris at kohlhoff dot com)
 
6
//
 
7
// Distributed under the Boost Software License, Version 1.0. (See accompanying
 
8
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
 
9
//
 
10
 
 
11
#ifndef ASIO_DETAIL_REACTIVE_SOCKET_SERVICE_HPP
 
12
#define ASIO_DETAIL_REACTIVE_SOCKET_SERVICE_HPP
 
13
 
 
14
#if defined(_MSC_VER) && (_MSC_VER >= 1200)
 
15
# pragma once
 
16
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
 
17
 
 
18
#include "asio/detail/push_options.hpp"
 
19
 
 
20
#include "asio/detail/push_options.hpp"
 
21
#include <boost/shared_ptr.hpp>
 
22
#include "asio/detail/pop_options.hpp"
 
23
 
 
24
#include "asio/buffer.hpp"
 
25
#include "asio/error.hpp"
 
26
#include "asio/io_service.hpp"
 
27
#include "asio/socket_base.hpp"
 
28
#include "asio/detail/bind_handler.hpp"
 
29
#include "asio/detail/noncopyable.hpp"
 
30
#include "asio/detail/service_base.hpp"
 
31
#include "asio/detail/socket_holder.hpp"
 
32
#include "asio/detail/socket_ops.hpp"
 
33
#include "asio/detail/socket_types.hpp"
 
34
 
 
35
namespace asio {
 
36
namespace detail {
 
37
 
 
38
template <typename Protocol, typename Reactor>
 
39
class reactive_socket_service
 
40
  : public asio::detail::service_base<
 
41
      reactive_socket_service<Protocol, Reactor> >
 
42
{
 
43
public:
 
44
  // The protocol type.
 
45
  typedef Protocol protocol_type;
 
46
 
 
47
  // The endpoint type.
 
48
  typedef typename Protocol::endpoint endpoint_type;
 
49
 
 
50
  // The native type of a socket.
 
51
  typedef socket_type native_type;
 
52
 
 
53
  // The implementation type of the socket.
 
54
  class implementation_type
 
55
    : private asio::detail::noncopyable
 
56
  {
 
57
  public:
 
58
    // Default constructor.
 
59
    implementation_type()
 
60
      : socket_(invalid_socket),
 
61
        flags_(0),
 
62
        protocol_(endpoint_type().protocol())
 
63
    {
 
64
    }
 
65
 
 
66
  private:
 
67
    // Only this service will have access to the internal values.
 
68
    friend class reactive_socket_service<Protocol, Reactor>;
 
69
 
 
70
    // The native socket representation.
 
71
    socket_type socket_;
 
72
 
 
73
    enum
 
74
    {
 
75
      user_set_non_blocking = 1, // The user wants a non-blocking socket.
 
76
      internal_non_blocking = 2, // The socket has been set non-blocking.
 
77
      enable_connection_aborted = 4, // User wants connection_aborted errors.
 
78
      user_set_linger = 8 // The user set the linger option.
 
79
    };
 
80
 
 
81
    // Flags indicating the current state of the socket.
 
82
    unsigned char flags_;
 
83
 
 
84
    // The protocol associated with the socket.
 
85
    protocol_type protocol_;
 
86
  };
 
87
 
 
88
  // The maximum number of buffers to support in a single operation.
 
89
  enum { max_buffers = 64 < max_iov_len ? 64 : max_iov_len };
 
90
 
 
91
  // Constructor.
 
92
  reactive_socket_service(asio::io_service& io_service)
 
93
    : asio::detail::service_base<
 
94
        reactive_socket_service<Protocol, Reactor> >(io_service),
 
95
      reactor_(asio::use_service<Reactor>(io_service))
 
96
  {
 
97
  }
 
98
 
 
99
  // Destroy all user-defined handler objects owned by the service.
 
100
  void shutdown_service()
 
101
  {
 
102
  }
 
103
 
 
104
  // Construct a new socket implementation.
 
105
  void construct(implementation_type& impl)
 
106
  {
 
107
    impl.socket_ = invalid_socket;
 
108
    impl.flags_ = 0;
 
109
  }
 
110
 
 
111
  // Destroy a socket implementation.
 
112
  void destroy(implementation_type& impl)
 
113
  {
 
114
    if (impl.socket_ != invalid_socket)
 
115
    {
 
116
      reactor_.close_descriptor(impl.socket_);
 
117
 
 
118
      if (impl.flags_ & implementation_type::internal_non_blocking)
 
119
      {
 
120
        ioctl_arg_type non_blocking = 0;
 
121
        asio::error_code ignored_ec;
 
122
        socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ignored_ec);
 
123
        impl.flags_ &= ~implementation_type::internal_non_blocking;
 
124
      }
 
125
 
 
126
      if (impl.flags_ & implementation_type::user_set_linger)
 
127
      {
 
128
        ::linger opt;
 
129
        opt.l_onoff = 0;
 
130
        opt.l_linger = 0;
 
131
        asio::error_code ignored_ec;
 
132
        socket_ops::setsockopt(impl.socket_,
 
133
            SOL_SOCKET, SO_LINGER, &opt, sizeof(opt), ignored_ec);
 
134
      }
 
135
 
 
136
      asio::error_code ignored_ec;
 
137
      socket_ops::close(impl.socket_, ignored_ec);
 
138
 
 
139
      impl.socket_ = invalid_socket;
 
140
    }
 
141
  }
 
142
 
 
143
  // Open a new socket implementation.
 
144
  asio::error_code open(implementation_type& impl,
 
145
      const protocol_type& protocol, asio::error_code& ec)
 
146
  {
 
147
    if (is_open(impl))
 
148
    {
 
149
      ec = asio::error::already_open;
 
150
      return ec;
 
151
    }
 
152
 
 
153
    socket_holder sock(socket_ops::socket(protocol.family(),
 
154
          protocol.type(), protocol.protocol(), ec));
 
155
    if (sock.get() == invalid_socket)
 
156
      return ec;
 
157
 
 
158
    if (int err = reactor_.register_descriptor(sock.get()))
 
159
    {
 
160
      ec = asio::error_code(err, asio::error::system_category);
 
161
      return ec;
 
162
    }
 
163
 
 
164
    impl.socket_ = sock.release();
 
165
    impl.flags_ = 0;
 
166
    impl.protocol_ = protocol;
 
167
    ec = asio::error_code();
 
168
    return ec;
 
169
  }
 
170
 
 
171
  // Assign a native socket to a socket implementation.
 
172
  asio::error_code assign(implementation_type& impl,
 
173
      const protocol_type& protocol, const native_type& native_socket,
 
174
      asio::error_code& ec)
 
175
  {
 
176
    if (is_open(impl))
 
177
    {
 
178
      ec = asio::error::already_open;
 
179
      return ec;
 
180
    }
 
181
 
 
182
    if (int err = reactor_.register_descriptor(native_socket))
 
183
    {
 
184
      ec = asio::error_code(err, asio::error::system_category);
 
185
      return ec;
 
186
    }
 
187
 
 
188
    impl.socket_ = native_socket;
 
189
    impl.flags_ = 0;
 
190
    impl.protocol_ = protocol;
 
191
    ec = asio::error_code();
 
192
    return ec;
 
193
  }
 
194
 
 
195
  // Determine whether the socket is open.
 
196
  bool is_open(const implementation_type& impl) const
 
197
  {
 
198
    return impl.socket_ != invalid_socket;
 
199
  }
 
200
 
 
201
  // Destroy a socket implementation.
 
202
  asio::error_code close(implementation_type& impl,
 
203
      asio::error_code& ec)
 
204
  {
 
205
    if (is_open(impl))
 
206
    {
 
207
      reactor_.close_descriptor(impl.socket_);
 
208
 
 
209
      if (impl.flags_ & implementation_type::internal_non_blocking)
 
210
      {
 
211
        ioctl_arg_type non_blocking = 0;
 
212
        asio::error_code ignored_ec;
 
213
        socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ignored_ec);
 
214
        impl.flags_ &= ~implementation_type::internal_non_blocking;
 
215
      }
 
216
 
 
217
      if (socket_ops::close(impl.socket_, ec) == socket_error_retval)
 
218
        return ec;
 
219
 
 
220
      impl.socket_ = invalid_socket;
 
221
    }
 
222
 
 
223
    ec = asio::error_code();
 
224
    return ec;
 
225
  }
 
226
 
 
227
  // Get the native socket representation.
 
228
  native_type native(implementation_type& impl)
 
229
  {
 
230
    return impl.socket_;
 
231
  }
 
232
 
 
233
  // Cancel all operations associated with the socket.
 
234
  asio::error_code cancel(implementation_type& impl,
 
235
      asio::error_code& ec)
 
236
  {
 
237
    if (!is_open(impl))
 
238
    {
 
239
      ec = asio::error::bad_descriptor;
 
240
      return ec;
 
241
    }
 
242
 
 
243
    reactor_.cancel_ops(impl.socket_);
 
244
    ec = asio::error_code();
 
245
    return ec;
 
246
  }
 
247
 
 
248
  // Determine whether the socket is at the out-of-band data mark.
 
249
  bool at_mark(const implementation_type& impl,
 
250
      asio::error_code& ec) const
 
251
  {
 
252
    if (!is_open(impl))
 
253
    {
 
254
      ec = asio::error::bad_descriptor;
 
255
      return false;
 
256
    }
 
257
 
 
258
    asio::detail::ioctl_arg_type value = 0;
 
259
    socket_ops::ioctl(impl.socket_, SIOCATMARK, &value, ec);
 
260
#if defined(ENOTTY)
 
261
    if (ec.value() == ENOTTY)
 
262
      ec = asio::error::not_socket;
 
263
#endif // defined(ENOTTY)
 
264
    return ec ? false : value != 0;
 
265
  }
 
266
 
 
267
  // Determine the number of bytes available for reading.
 
268
  std::size_t available(const implementation_type& impl,
 
269
      asio::error_code& ec) const
 
270
  {
 
271
    if (!is_open(impl))
 
272
    {
 
273
      ec = asio::error::bad_descriptor;
 
274
      return 0;
 
275
    }
 
276
 
 
277
    asio::detail::ioctl_arg_type value = 0;
 
278
    socket_ops::ioctl(impl.socket_, FIONREAD, &value, ec);
 
279
#if defined(ENOTTY)
 
280
    if (ec.value() == ENOTTY)
 
281
      ec = asio::error::not_socket;
 
282
#endif // defined(ENOTTY)
 
283
    return ec ? static_cast<std::size_t>(0) : static_cast<std::size_t>(value);
 
284
  }
 
285
 
 
286
  // Bind the socket to the specified local endpoint.
 
287
  asio::error_code bind(implementation_type& impl,
 
288
      const endpoint_type& endpoint, asio::error_code& ec)
 
289
  {
 
290
    if (!is_open(impl))
 
291
    {
 
292
      ec = asio::error::bad_descriptor;
 
293
      return ec;
 
294
    }
 
295
 
 
296
    socket_ops::bind(impl.socket_, endpoint.data(), endpoint.size(), ec);
 
297
    return ec;
 
298
  }
 
299
 
 
300
  // Place the socket into the state where it will listen for new connections.
 
301
  asio::error_code listen(implementation_type& impl, int backlog,
 
302
      asio::error_code& ec)
 
303
  {
 
304
    if (!is_open(impl))
 
305
    {
 
306
      ec = asio::error::bad_descriptor;
 
307
      return ec;
 
308
    }
 
309
 
 
310
    socket_ops::listen(impl.socket_, backlog, ec);
 
311
    return ec;
 
312
  }
 
313
 
 
314
  // Set a socket option.
 
315
  template <typename Option>
 
316
  asio::error_code set_option(implementation_type& impl,
 
317
      const Option& option, asio::error_code& ec)
 
318
  {
 
319
    if (!is_open(impl))
 
320
    {
 
321
      ec = asio::error::bad_descriptor;
 
322
      return ec;
 
323
    }
 
324
 
 
325
    if (option.level(impl.protocol_) == custom_socket_option_level
 
326
        && option.name(impl.protocol_) == enable_connection_aborted_option)
 
327
    {
 
328
      if (option.size(impl.protocol_) != sizeof(int))
 
329
      {
 
330
        ec = asio::error::invalid_argument;
 
331
      }
 
332
      else
 
333
      {
 
334
        if (*reinterpret_cast<const int*>(option.data(impl.protocol_)))
 
335
          impl.flags_ |= implementation_type::enable_connection_aborted;
 
336
        else
 
337
          impl.flags_ &= ~implementation_type::enable_connection_aborted;
 
338
        ec = asio::error_code();
 
339
      }
 
340
      return ec;
 
341
    }
 
342
    else
 
343
    {
 
344
      if (option.level(impl.protocol_) == SOL_SOCKET
 
345
          && option.name(impl.protocol_) == SO_LINGER)
 
346
      {
 
347
        impl.flags_ |= implementation_type::user_set_linger;
 
348
      }
 
349
 
 
350
      socket_ops::setsockopt(impl.socket_,
 
351
          option.level(impl.protocol_), option.name(impl.protocol_),
 
352
          option.data(impl.protocol_), option.size(impl.protocol_), ec);
 
353
 
 
354
#if defined(__MACH__) && defined(__APPLE__) \
 
355
|| defined(__NetBSD__) || defined(__FreeBSD__) || defined(__OpenBSD__)
 
356
      // To implement portable behaviour for SO_REUSEADDR with UDP sockets we
 
357
      // need to also set SO_REUSEPORT on BSD-based platforms.
 
358
      if (!ec && impl.protocol_.type() == SOCK_DGRAM
 
359
          && option.level(impl.protocol_) == SOL_SOCKET
 
360
          && option.name(impl.protocol_) == SO_REUSEADDR)
 
361
      {
 
362
        asio::error_code ignored_ec;
 
363
        socket_ops::setsockopt(impl.socket_, SOL_SOCKET, SO_REUSEPORT,
 
364
            option.data(impl.protocol_), option.size(impl.protocol_),
 
365
            ignored_ec);
 
366
      }
 
367
#endif
 
368
 
 
369
      return ec;
 
370
    }
 
371
  }
 
372
 
 
373
  // Set a socket option.
 
374
  template <typename Option>
 
375
  asio::error_code get_option(const implementation_type& impl,
 
376
      Option& option, asio::error_code& ec) const
 
377
  {
 
378
    if (!is_open(impl))
 
379
    {
 
380
      ec = asio::error::bad_descriptor;
 
381
      return ec;
 
382
    }
 
383
 
 
384
    if (option.level(impl.protocol_) == custom_socket_option_level
 
385
        && option.name(impl.protocol_) == enable_connection_aborted_option)
 
386
    {
 
387
      if (option.size(impl.protocol_) != sizeof(int))
 
388
      {
 
389
        ec = asio::error::invalid_argument;
 
390
      }
 
391
      else
 
392
      {
 
393
        int* target = reinterpret_cast<int*>(option.data(impl.protocol_));
 
394
        if (impl.flags_ & implementation_type::enable_connection_aborted)
 
395
          *target = 1;
 
396
        else
 
397
          *target = 0;
 
398
        option.resize(impl.protocol_, sizeof(int));
 
399
        ec = asio::error_code();
 
400
      }
 
401
      return ec;
 
402
    }
 
403
    else
 
404
    {
 
405
      size_t size = option.size(impl.protocol_);
 
406
      socket_ops::getsockopt(impl.socket_,
 
407
          option.level(impl.protocol_), option.name(impl.protocol_),
 
408
          option.data(impl.protocol_), &size, ec);
 
409
      if (!ec)
 
410
        option.resize(impl.protocol_, size);
 
411
      return ec;
 
412
    }
 
413
  }
 
414
 
 
415
  // Perform an IO control command on the socket.
 
416
  template <typename IO_Control_Command>
 
417
  asio::error_code io_control(implementation_type& impl,
 
418
      IO_Control_Command& command, asio::error_code& ec)
 
419
  {
 
420
    if (!is_open(impl))
 
421
    {
 
422
      ec = asio::error::bad_descriptor;
 
423
      return ec;
 
424
    }
 
425
 
 
426
    if (command.name() == static_cast<int>(FIONBIO))
 
427
    {
 
428
      if (command.get())
 
429
        impl.flags_ |= implementation_type::user_set_non_blocking;
 
430
      else
 
431
        impl.flags_ &= ~implementation_type::user_set_non_blocking;
 
432
      ec = asio::error_code();
 
433
    }
 
434
    else
 
435
    {
 
436
      socket_ops::ioctl(impl.socket_, command.name(),
 
437
          static_cast<ioctl_arg_type*>(command.data()), ec);
 
438
    }
 
439
    return ec;
 
440
  }
 
441
 
 
442
  // Get the local endpoint.
 
443
  endpoint_type local_endpoint(const implementation_type& impl,
 
444
      asio::error_code& ec) const
 
445
  {
 
446
    if (!is_open(impl))
 
447
    {
 
448
      ec = asio::error::bad_descriptor;
 
449
      return endpoint_type();
 
450
    }
 
451
 
 
452
    endpoint_type endpoint;
 
453
    std::size_t addr_len = endpoint.capacity();
 
454
    if (socket_ops::getsockname(impl.socket_, endpoint.data(), &addr_len, ec))
 
455
      return endpoint_type();
 
456
    endpoint.resize(addr_len);
 
457
    return endpoint;
 
458
  }
 
459
 
 
460
  // Get the remote endpoint.
 
461
  endpoint_type remote_endpoint(const implementation_type& impl,
 
462
      asio::error_code& ec) const
 
463
  {
 
464
    if (!is_open(impl))
 
465
    {
 
466
      ec = asio::error::bad_descriptor;
 
467
      return endpoint_type();
 
468
    }
 
469
 
 
470
    endpoint_type endpoint;
 
471
    std::size_t addr_len = endpoint.capacity();
 
472
    if (socket_ops::getpeername(impl.socket_, endpoint.data(), &addr_len, ec))
 
473
      return endpoint_type();
 
474
    endpoint.resize(addr_len);
 
475
    return endpoint;
 
476
  }
 
477
 
 
478
  /// Disable sends or receives on the socket.
 
479
  asio::error_code shutdown(implementation_type& impl,
 
480
      socket_base::shutdown_type what, asio::error_code& ec)
 
481
  {
 
482
    if (!is_open(impl))
 
483
    {
 
484
      ec = asio::error::bad_descriptor;
 
485
      return ec;
 
486
    }
 
487
 
 
488
    socket_ops::shutdown(impl.socket_, what, ec);
 
489
    return ec;
 
490
  }
 
491
 
 
492
  // Send the given data to the peer.
 
493
  template <typename ConstBufferSequence>
 
494
  size_t send(implementation_type& impl, const ConstBufferSequence& buffers,
 
495
      socket_base::message_flags flags, asio::error_code& ec)
 
496
  {
 
497
    if (!is_open(impl))
 
498
    {
 
499
      ec = asio::error::bad_descriptor;
 
500
      return 0;
 
501
    }
 
502
 
 
503
    // Copy buffers into array.
 
504
    socket_ops::buf bufs[max_buffers];
 
505
    typename ConstBufferSequence::const_iterator iter = buffers.begin();
 
506
    typename ConstBufferSequence::const_iterator end = buffers.end();
 
507
    size_t i = 0;
 
508
    size_t total_buffer_size = 0;
 
509
    for (; iter != end && i < max_buffers; ++iter, ++i)
 
510
    {
 
511
      asio::const_buffer buffer(*iter);
 
512
      socket_ops::init_buf(bufs[i],
 
513
          asio::buffer_cast<const void*>(buffer),
 
514
          asio::buffer_size(buffer));
 
515
      total_buffer_size += asio::buffer_size(buffer);
 
516
    }
 
517
 
 
518
    // A request to receive 0 bytes on a stream socket is a no-op.
 
519
    if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0)
 
520
    {
 
521
      ec = asio::error_code();
 
522
      return 0;
 
523
    }
 
524
 
 
525
    // Make socket non-blocking if user wants non-blocking.
 
526
    if (impl.flags_ & implementation_type::user_set_non_blocking)
 
527
    {
 
528
      if (!(impl.flags_ & implementation_type::internal_non_blocking))
 
529
      {
 
530
        ioctl_arg_type non_blocking = 1;
 
531
        if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
 
532
          return 0;
 
533
        impl.flags_ |= implementation_type::internal_non_blocking;
 
534
      }
 
535
    }
 
536
 
 
537
    // Send the data.
 
538
    for (;;)
 
539
    {
 
540
      // Try to complete the operation without blocking.
 
541
      int bytes_sent = socket_ops::send(impl.socket_, bufs, i, flags, ec);
 
542
 
 
543
      // Check if operation succeeded.
 
544
      if (bytes_sent >= 0)
 
545
        return bytes_sent;
 
546
 
 
547
      // Operation failed.
 
548
      if ((impl.flags_ & implementation_type::user_set_non_blocking)
 
549
          || (ec != asio::error::would_block
 
550
            && ec != asio::error::try_again))
 
551
        return 0;
 
552
 
 
553
      // Wait for socket to become ready.
 
554
      if (socket_ops::poll_write(impl.socket_, ec) < 0)
 
555
        return 0;
 
556
    }
 
557
  }
 
558
 
 
559
  template <typename ConstBufferSequence, typename Handler>
 
560
  class send_handler
 
561
  {
 
562
  public:
 
563
    send_handler(socket_type socket, asio::io_service& io_service,
 
564
        const ConstBufferSequence& buffers, socket_base::message_flags flags,
 
565
        Handler handler)
 
566
      : socket_(socket),
 
567
        io_service_(io_service),
 
568
        work_(io_service),
 
569
        buffers_(buffers),
 
570
        flags_(flags),
 
571
        handler_(handler)
 
572
    {
 
573
    }
 
574
 
 
575
    bool operator()(const asio::error_code& result)
 
576
    {
 
577
      // Check whether the operation was successful.
 
578
      if (result)
 
579
      {
 
580
        io_service_.post(bind_handler(handler_, result, 0));
 
581
        return true;
 
582
      }
 
583
 
 
584
      // Copy buffers into array.
 
585
      socket_ops::buf bufs[max_buffers];
 
586
      typename ConstBufferSequence::const_iterator iter = buffers_.begin();
 
587
      typename ConstBufferSequence::const_iterator end = buffers_.end();
 
588
      size_t i = 0;
 
589
      for (; iter != end && i < max_buffers; ++iter, ++i)
 
590
      {
 
591
        asio::const_buffer buffer(*iter);
 
592
        socket_ops::init_buf(bufs[i],
 
593
            asio::buffer_cast<const void*>(buffer),
 
594
            asio::buffer_size(buffer));
 
595
      }
 
596
 
 
597
      // Send the data.
 
598
      asio::error_code ec;
 
599
      int bytes = socket_ops::send(socket_, bufs, i, flags_, ec);
 
600
 
 
601
      // Check if we need to run the operation again.
 
602
      if (ec == asio::error::would_block
 
603
          || ec == asio::error::try_again)
 
604
        return false;
 
605
 
 
606
      io_service_.post(bind_handler(handler_, ec, bytes < 0 ? 0 : bytes));
 
607
      return true;
 
608
    }
 
609
 
 
610
  private:
 
611
    socket_type socket_;
 
612
    asio::io_service& io_service_;
 
613
    asio::io_service::work work_;
 
614
    ConstBufferSequence buffers_;
 
615
    socket_base::message_flags flags_;
 
616
    Handler handler_;
 
617
  };
 
618
 
 
619
  // Start an asynchronous send. The data being sent must be valid for the
 
620
  // lifetime of the asynchronous operation.
 
621
  template <typename ConstBufferSequence, typename Handler>
 
622
  void async_send(implementation_type& impl, const ConstBufferSequence& buffers,
 
623
      socket_base::message_flags flags, Handler handler)
 
624
  {
 
625
    if (!is_open(impl))
 
626
    {
 
627
      this->io_service().post(bind_handler(handler,
 
628
            asio::error::bad_descriptor, 0));
 
629
    }
 
630
    else
 
631
    {
 
632
      if (impl.protocol_.type() == SOCK_STREAM)
 
633
      {
 
634
        // Determine total size of buffers.
 
635
        typename ConstBufferSequence::const_iterator iter = buffers.begin();
 
636
        typename ConstBufferSequence::const_iterator end = buffers.end();
 
637
        size_t i = 0;
 
638
        size_t total_buffer_size = 0;
 
639
        for (; iter != end && i < max_buffers; ++iter, ++i)
 
640
        {
 
641
          asio::const_buffer buffer(*iter);
 
642
          total_buffer_size += asio::buffer_size(buffer);
 
643
        }
 
644
 
 
645
        // A request to receive 0 bytes on a stream socket is a no-op.
 
646
        if (total_buffer_size == 0)
 
647
        {
 
648
          this->io_service().post(bind_handler(handler,
 
649
                asio::error_code(), 0));
 
650
          return;
 
651
        }
 
652
      }
 
653
 
 
654
      // Make socket non-blocking.
 
655
      if (!(impl.flags_ & implementation_type::internal_non_blocking))
 
656
      {
 
657
        ioctl_arg_type non_blocking = 1;
 
658
        asio::error_code ec;
 
659
        if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
 
660
        {
 
661
          this->io_service().post(bind_handler(handler, ec, 0));
 
662
          return;
 
663
        }
 
664
        impl.flags_ |= implementation_type::internal_non_blocking;
 
665
      }
 
666
 
 
667
      reactor_.start_write_op(impl.socket_,
 
668
          send_handler<ConstBufferSequence, Handler>(
 
669
            impl.socket_, this->io_service(), buffers, flags, handler));
 
670
    }
 
671
  }
 
672
 
 
673
  // Send a datagram to the specified endpoint. Returns the number of bytes
 
674
  // sent.
 
675
  template <typename ConstBufferSequence>
 
676
  size_t send_to(implementation_type& impl, const ConstBufferSequence& buffers,
 
677
      const endpoint_type& destination, socket_base::message_flags flags,
 
678
      asio::error_code& ec)
 
679
  {
 
680
    if (!is_open(impl))
 
681
    {
 
682
      ec = asio::error::bad_descriptor;
 
683
      return 0;
 
684
    }
 
685
 
 
686
    // Copy buffers into array.
 
687
    socket_ops::buf bufs[max_buffers];
 
688
    typename ConstBufferSequence::const_iterator iter = buffers.begin();
 
689
    typename ConstBufferSequence::const_iterator end = buffers.end();
 
690
    size_t i = 0;
 
691
    for (; iter != end && i < max_buffers; ++iter, ++i)
 
692
    {
 
693
      asio::const_buffer buffer(*iter);
 
694
      socket_ops::init_buf(bufs[i],
 
695
          asio::buffer_cast<const void*>(buffer),
 
696
          asio::buffer_size(buffer));
 
697
    }
 
698
 
 
699
    // Make socket non-blocking if user wants non-blocking.
 
700
    if (impl.flags_ & implementation_type::user_set_non_blocking)
 
701
    {
 
702
      if (!(impl.flags_ & implementation_type::internal_non_blocking))
 
703
      {
 
704
        ioctl_arg_type non_blocking = 1;
 
705
        if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
 
706
          return 0;
 
707
        impl.flags_ |= implementation_type::internal_non_blocking;
 
708
      }
 
709
    }
 
710
 
 
711
    // Send the data.
 
712
    for (;;)
 
713
    {
 
714
      // Try to complete the operation without blocking.
 
715
      int bytes_sent = socket_ops::sendto(impl.socket_, bufs, i, flags,
 
716
          destination.data(), destination.size(), ec);
 
717
 
 
718
      // Check if operation succeeded.
 
719
      if (bytes_sent >= 0)
 
720
        return bytes_sent;
 
721
 
 
722
      // Operation failed.
 
723
      if ((impl.flags_ & implementation_type::user_set_non_blocking)
 
724
          || (ec != asio::error::would_block
 
725
            && ec != asio::error::try_again))
 
726
        return 0;
 
727
 
 
728
      // Wait for socket to become ready.
 
729
      if (socket_ops::poll_write(impl.socket_, ec) < 0)
 
730
        return 0;
 
731
    }
 
732
  }
 
733
 
 
734
  template <typename ConstBufferSequence, typename Handler>
 
735
  class send_to_handler
 
736
  {
 
737
  public:
 
738
    send_to_handler(socket_type socket, asio::io_service& io_service,
 
739
        const ConstBufferSequence& buffers, const endpoint_type& endpoint,
 
740
        socket_base::message_flags flags, Handler handler)
 
741
      : socket_(socket),
 
742
        io_service_(io_service),
 
743
        work_(io_service),
 
744
        buffers_(buffers),
 
745
        destination_(endpoint),
 
746
        flags_(flags),
 
747
        handler_(handler)
 
748
    {
 
749
    }
 
750
 
 
751
    bool operator()(const asio::error_code& result)
 
752
    {
 
753
      // Check whether the operation was successful.
 
754
      if (result)
 
755
      {
 
756
        io_service_.post(bind_handler(handler_, result, 0));
 
757
        return true;
 
758
      }
 
759
 
 
760
      // Copy buffers into array.
 
761
      socket_ops::buf bufs[max_buffers];
 
762
      typename ConstBufferSequence::const_iterator iter = buffers_.begin();
 
763
      typename ConstBufferSequence::const_iterator end = buffers_.end();
 
764
      size_t i = 0;
 
765
      for (; iter != end && i < max_buffers; ++iter, ++i)
 
766
      {
 
767
        asio::const_buffer buffer(*iter);
 
768
        socket_ops::init_buf(bufs[i],
 
769
            asio::buffer_cast<const void*>(buffer),
 
770
            asio::buffer_size(buffer));
 
771
      }
 
772
 
 
773
      // Send the data.
 
774
      asio::error_code ec;
 
775
      int bytes = socket_ops::sendto(socket_, bufs, i, flags_,
 
776
          destination_.data(), destination_.size(), ec);
 
777
 
 
778
      // Check if we need to run the operation again.
 
779
      if (ec == asio::error::would_block
 
780
          || ec == asio::error::try_again)
 
781
        return false;
 
782
 
 
783
      io_service_.post(bind_handler(handler_, ec, bytes < 0 ? 0 : bytes));
 
784
      return true;
 
785
    }
 
786
 
 
787
  private:
 
788
    socket_type socket_;
 
789
    asio::io_service& io_service_;
 
790
    asio::io_service::work work_;
 
791
    ConstBufferSequence buffers_;
 
792
    endpoint_type destination_;
 
793
    socket_base::message_flags flags_;
 
794
    Handler handler_;
 
795
  };
 
796
 
 
797
  // Start an asynchronous send. The data being sent must be valid for the
 
798
  // lifetime of the asynchronous operation.
 
799
  template <typename ConstBufferSequence, typename Handler>
 
800
  void async_send_to(implementation_type& impl,
 
801
      const ConstBufferSequence& buffers,
 
802
      const endpoint_type& destination, socket_base::message_flags flags,
 
803
      Handler handler)
 
804
  {
 
805
    if (!is_open(impl))
 
806
    {
 
807
      this->io_service().post(bind_handler(handler,
 
808
            asio::error::bad_descriptor, 0));
 
809
    }
 
810
    else
 
811
    {
 
812
      // Make socket non-blocking.
 
813
      if (!(impl.flags_ & implementation_type::internal_non_blocking))
 
814
      {
 
815
        ioctl_arg_type non_blocking = 1;
 
816
        asio::error_code ec;
 
817
        if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
 
818
        {
 
819
          this->io_service().post(bind_handler(handler, ec, 0));
 
820
          return;
 
821
        }
 
822
        impl.flags_ |= implementation_type::internal_non_blocking;
 
823
      }
 
824
 
 
825
      reactor_.start_write_op(impl.socket_,
 
826
          send_to_handler<ConstBufferSequence, Handler>(
 
827
            impl.socket_, this->io_service(), buffers,
 
828
            destination, flags, handler));
 
829
    }
 
830
  }
 
831
 
 
832
  // Receive some data from the peer. Returns the number of bytes received.
 
833
  template <typename MutableBufferSequence>
 
834
  size_t receive(implementation_type& impl,
 
835
      const MutableBufferSequence& buffers,
 
836
      socket_base::message_flags flags, asio::error_code& ec)
 
837
  {
 
838
    if (!is_open(impl))
 
839
    {
 
840
      ec = asio::error::bad_descriptor;
 
841
      return 0;
 
842
    }
 
843
 
 
844
    // Copy buffers into array.
 
845
    socket_ops::buf bufs[max_buffers];
 
846
    typename MutableBufferSequence::const_iterator iter = buffers.begin();
 
847
    typename MutableBufferSequence::const_iterator end = buffers.end();
 
848
    size_t i = 0;
 
849
    size_t total_buffer_size = 0;
 
850
    for (; iter != end && i < max_buffers; ++iter, ++i)
 
851
    {
 
852
      asio::mutable_buffer buffer(*iter);
 
853
      socket_ops::init_buf(bufs[i],
 
854
          asio::buffer_cast<void*>(buffer),
 
855
          asio::buffer_size(buffer));
 
856
      total_buffer_size += asio::buffer_size(buffer);
 
857
    }
 
858
 
 
859
    // A request to receive 0 bytes on a stream socket is a no-op.
 
860
    if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0)
 
861
    {
 
862
      ec = asio::error_code();
 
863
      return 0;
 
864
    }
 
865
 
 
866
    // Make socket non-blocking if user wants non-blocking.
 
867
    if (impl.flags_ & implementation_type::user_set_non_blocking)
 
868
    {
 
869
      if (!(impl.flags_ & implementation_type::internal_non_blocking))
 
870
      {
 
871
        ioctl_arg_type non_blocking = 1;
 
872
        if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
 
873
          return 0;
 
874
        impl.flags_ |= implementation_type::internal_non_blocking;
 
875
      }
 
876
    }
 
877
 
 
878
    // Receive some data.
 
879
    for (;;)
 
880
    {
 
881
      // Try to complete the operation without blocking.
 
882
      int bytes_recvd = socket_ops::recv(impl.socket_, bufs, i, flags, ec);
 
883
 
 
884
      // Check if operation succeeded.
 
885
      if (bytes_recvd > 0)
 
886
        return bytes_recvd;
 
887
 
 
888
      // Check for EOF.
 
889
      if (bytes_recvd == 0)
 
890
      {
 
891
        ec = asio::error::eof;
 
892
        return 0;
 
893
      }
 
894
 
 
895
      // Operation failed.
 
896
      if ((impl.flags_ & implementation_type::user_set_non_blocking)
 
897
          || (ec != asio::error::would_block
 
898
            && ec != asio::error::try_again))
 
899
        return 0;
 
900
 
 
901
      // Wait for socket to become ready.
 
902
      if (socket_ops::poll_read(impl.socket_, ec) < 0)
 
903
        return 0;
 
904
    }
 
905
  }
 
906
 
 
907
  template <typename MutableBufferSequence, typename Handler>
 
908
  class receive_handler
 
909
  {
 
910
  public:
 
911
    receive_handler(socket_type socket, asio::io_service& io_service,
 
912
        const MutableBufferSequence& buffers, socket_base::message_flags flags,
 
913
        Handler handler)
 
914
      : socket_(socket),
 
915
        io_service_(io_service),
 
916
        work_(io_service),
 
917
        buffers_(buffers),
 
918
        flags_(flags),
 
919
        handler_(handler)
 
920
    {
 
921
    }
 
922
 
 
923
    bool operator()(const asio::error_code& result)
 
924
    {
 
925
      // Check whether the operation was successful.
 
926
      if (result)
 
927
      {
 
928
        io_service_.post(bind_handler(handler_, result, 0));
 
929
        return true;
 
930
      }
 
931
 
 
932
      // Copy buffers into array.
 
933
      socket_ops::buf bufs[max_buffers];
 
934
      typename MutableBufferSequence::const_iterator iter = buffers_.begin();
 
935
      typename MutableBufferSequence::const_iterator end = buffers_.end();
 
936
      size_t i = 0;
 
937
      for (; iter != end && i < max_buffers; ++iter, ++i)
 
938
      {
 
939
        asio::mutable_buffer buffer(*iter);
 
940
        socket_ops::init_buf(bufs[i],
 
941
            asio::buffer_cast<void*>(buffer),
 
942
            asio::buffer_size(buffer));
 
943
      }
 
944
 
 
945
      // Receive some data.
 
946
      asio::error_code ec;
 
947
      int bytes = socket_ops::recv(socket_, bufs, i, flags_, ec);
 
948
      if (bytes == 0)
 
949
        ec = asio::error::eof;
 
950
 
 
951
      // Check if we need to run the operation again.
 
952
      if (ec == asio::error::would_block
 
953
          || ec == asio::error::try_again)
 
954
        return false;
 
955
 
 
956
      io_service_.post(bind_handler(handler_, ec, bytes < 0 ? 0 : bytes));
 
957
      return true;
 
958
    }
 
959
 
 
960
  private:
 
961
    socket_type socket_;
 
962
    asio::io_service& io_service_;
 
963
    asio::io_service::work work_;
 
964
    MutableBufferSequence buffers_;
 
965
    socket_base::message_flags flags_;
 
966
    Handler handler_;
 
967
  };
 
968
 
 
969
  // Start an asynchronous receive. The buffer for the data being received
 
970
  // must be valid for the lifetime of the asynchronous operation.
 
971
  template <typename MutableBufferSequence, typename Handler>
 
972
  void async_receive(implementation_type& impl,
 
973
      const MutableBufferSequence& buffers,
 
974
      socket_base::message_flags flags, Handler handler)
 
975
  {
 
976
    if (!is_open(impl))
 
977
    {
 
978
      this->io_service().post(bind_handler(handler,
 
979
            asio::error::bad_descriptor, 0));
 
980
    }
 
981
    else
 
982
    {
 
983
      if (impl.protocol_.type() == SOCK_STREAM)
 
984
      {
 
985
        // Determine total size of buffers.
 
986
        typename MutableBufferSequence::const_iterator iter = buffers.begin();
 
987
        typename MutableBufferSequence::const_iterator end = buffers.end();
 
988
        size_t i = 0;
 
989
        size_t total_buffer_size = 0;
 
990
        for (; iter != end && i < max_buffers; ++iter, ++i)
 
991
        {
 
992
          asio::mutable_buffer buffer(*iter);
 
993
          total_buffer_size += asio::buffer_size(buffer);
 
994
        }
 
995
 
 
996
        // A request to receive 0 bytes on a stream socket is a no-op.
 
997
        if (total_buffer_size == 0)
 
998
        {
 
999
          this->io_service().post(bind_handler(handler,
 
1000
                asio::error_code(), 0));
 
1001
          return;
 
1002
        }
 
1003
      }
 
1004
 
 
1005
      // Make socket non-blocking.
 
1006
      if (!(impl.flags_ & implementation_type::internal_non_blocking))
 
1007
      {
 
1008
        ioctl_arg_type non_blocking = 1;
 
1009
        asio::error_code ec;
 
1010
        if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
 
1011
        {
 
1012
          this->io_service().post(bind_handler(handler, ec, 0));
 
1013
          return;
 
1014
        }
 
1015
        impl.flags_ |= implementation_type::internal_non_blocking;
 
1016
      }
 
1017
 
 
1018
      if (flags & socket_base::message_out_of_band)
 
1019
      {
 
1020
        reactor_.start_except_op(impl.socket_,
 
1021
            receive_handler<MutableBufferSequence, Handler>(
 
1022
              impl.socket_, this->io_service(), buffers, flags, handler));
 
1023
      }
 
1024
      else
 
1025
      {
 
1026
        reactor_.start_read_op(impl.socket_,
 
1027
            receive_handler<MutableBufferSequence, Handler>(
 
1028
              impl.socket_, this->io_service(), buffers, flags, handler));
 
1029
      }
 
1030
    }
 
1031
  }
 
1032
 
 
1033
  // Receive a datagram with the endpoint of the sender. Returns the number of
 
1034
  // bytes received.
 
1035
  template <typename MutableBufferSequence>
 
1036
  size_t receive_from(implementation_type& impl,
 
1037
      const MutableBufferSequence& buffers,
 
1038
      endpoint_type& sender_endpoint, socket_base::message_flags flags,
 
1039
      asio::error_code& ec)
 
1040
  {
 
1041
    if (!is_open(impl))
 
1042
    {
 
1043
      ec = asio::error::bad_descriptor;
 
1044
      return 0;
 
1045
    }
 
1046
 
 
1047
    // Copy buffers into array.
 
1048
    socket_ops::buf bufs[max_buffers];
 
1049
    typename MutableBufferSequence::const_iterator iter = buffers.begin();
 
1050
    typename MutableBufferSequence::const_iterator end = buffers.end();
 
1051
    size_t i = 0;
 
1052
    for (; iter != end && i < max_buffers; ++iter, ++i)
 
1053
    {
 
1054
      asio::mutable_buffer buffer(*iter);
 
1055
      socket_ops::init_buf(bufs[i],
 
1056
          asio::buffer_cast<void*>(buffer),
 
1057
          asio::buffer_size(buffer));
 
1058
    }
 
1059
 
 
1060
    // Make socket non-blocking if user wants non-blocking.
 
1061
    if (impl.flags_ & implementation_type::user_set_non_blocking)
 
1062
    {
 
1063
      if (!(impl.flags_ & implementation_type::internal_non_blocking))
 
1064
      {
 
1065
        ioctl_arg_type non_blocking = 1;
 
1066
        if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
 
1067
          return 0;
 
1068
        impl.flags_ |= implementation_type::internal_non_blocking;
 
1069
      }
 
1070
    }
 
1071
 
 
1072
    // Receive some data.
 
1073
    for (;;)
 
1074
    {
 
1075
      // Try to complete the operation without blocking.
 
1076
      std::size_t addr_len = sender_endpoint.capacity();
 
1077
      int bytes_recvd = socket_ops::recvfrom(impl.socket_, bufs, i, flags,
 
1078
          sender_endpoint.data(), &addr_len, ec);
 
1079
 
 
1080
      // Check if operation succeeded.
 
1081
      if (bytes_recvd > 0)
 
1082
      {
 
1083
        sender_endpoint.resize(addr_len);
 
1084
        return bytes_recvd;
 
1085
      }
 
1086
 
 
1087
      // Check for EOF.
 
1088
      if (bytes_recvd == 0)
 
1089
      {
 
1090
        ec = asio::error::eof;
 
1091
        return 0;
 
1092
      }
 
1093
 
 
1094
      // Operation failed.
 
1095
      if ((impl.flags_ & implementation_type::user_set_non_blocking)
 
1096
          || (ec != asio::error::would_block
 
1097
            && ec != asio::error::try_again))
 
1098
        return 0;
 
1099
 
 
1100
      // Wait for socket to become ready.
 
1101
      if (socket_ops::poll_read(impl.socket_, ec) < 0)
 
1102
        return 0;
 
1103
    }
 
1104
  }
 
1105
 
 
1106
  template <typename MutableBufferSequence, typename Handler>
 
1107
  class receive_from_handler
 
1108
  {
 
1109
  public:
 
1110
    receive_from_handler(socket_type socket,
 
1111
        asio::io_service& io_service,
 
1112
        const MutableBufferSequence& buffers, endpoint_type& endpoint,
 
1113
        socket_base::message_flags flags, Handler handler)
 
1114
      : socket_(socket),
 
1115
        io_service_(io_service),
 
1116
        work_(io_service),
 
1117
        buffers_(buffers),
 
1118
        sender_endpoint_(endpoint),
 
1119
        flags_(flags),
 
1120
        handler_(handler)
 
1121
    {
 
1122
    }
 
1123
 
 
1124
    bool operator()(const asio::error_code& result)
 
1125
    {
 
1126
      // Check whether the operation was successful.
 
1127
      if (result)
 
1128
      {
 
1129
        io_service_.post(bind_handler(handler_, result, 0));
 
1130
        return true;
 
1131
      }
 
1132
 
 
1133
      // Copy buffers into array.
 
1134
      socket_ops::buf bufs[max_buffers];
 
1135
      typename MutableBufferSequence::const_iterator iter = buffers_.begin();
 
1136
      typename MutableBufferSequence::const_iterator end = buffers_.end();
 
1137
      size_t i = 0;
 
1138
      for (; iter != end && i < max_buffers; ++iter, ++i)
 
1139
      {
 
1140
        asio::mutable_buffer buffer(*iter);
 
1141
        socket_ops::init_buf(bufs[i],
 
1142
            asio::buffer_cast<void*>(buffer),
 
1143
            asio::buffer_size(buffer));
 
1144
      }
 
1145
 
 
1146
      // Receive some data.
 
1147
      std::size_t addr_len = sender_endpoint_.capacity();
 
1148
      asio::error_code ec;
 
1149
      int bytes = socket_ops::recvfrom(socket_, bufs, i, flags_,
 
1150
          sender_endpoint_.data(), &addr_len, ec);
 
1151
      if (bytes == 0)
 
1152
        ec = asio::error::eof;
 
1153
 
 
1154
      // Check if we need to run the operation again.
 
1155
      if (ec == asio::error::would_block
 
1156
          || ec == asio::error::try_again)
 
1157
        return false;
 
1158
 
 
1159
      sender_endpoint_.resize(addr_len);
 
1160
      io_service_.post(bind_handler(handler_, ec, bytes < 0 ? 0 : bytes));
 
1161
      return true;
 
1162
    }
 
1163
 
 
1164
  private:
 
1165
    socket_type socket_;
 
1166
    asio::io_service& io_service_;
 
1167
    asio::io_service::work work_;
 
1168
    MutableBufferSequence buffers_;
 
1169
    endpoint_type& sender_endpoint_;
 
1170
    socket_base::message_flags flags_;
 
1171
    Handler handler_;
 
1172
  };
 
1173
 
 
1174
  // Start an asynchronous receive. The buffer for the data being received and
 
1175
  // the sender_endpoint object must both be valid for the lifetime of the
 
1176
  // asynchronous operation.
 
1177
  template <typename MutableBufferSequence, typename Handler>
 
1178
  void async_receive_from(implementation_type& impl,
 
1179
      const MutableBufferSequence& buffers, endpoint_type& sender_endpoint,
 
1180
      socket_base::message_flags flags, Handler handler)
 
1181
  {
 
1182
    if (!is_open(impl))
 
1183
    {
 
1184
      this->io_service().post(bind_handler(handler,
 
1185
            asio::error::bad_descriptor, 0));
 
1186
    }
 
1187
    else
 
1188
    {
 
1189
      // Make socket non-blocking.
 
1190
      if (!(impl.flags_ & implementation_type::internal_non_blocking))
 
1191
      {
 
1192
        ioctl_arg_type non_blocking = 1;
 
1193
        asio::error_code ec;
 
1194
        if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
 
1195
        {
 
1196
          this->io_service().post(bind_handler(handler, ec, 0));
 
1197
          return;
 
1198
        }
 
1199
        impl.flags_ |= implementation_type::internal_non_blocking;
 
1200
      }
 
1201
 
 
1202
      reactor_.start_read_op(impl.socket_,
 
1203
          receive_from_handler<MutableBufferSequence, Handler>(
 
1204
            impl.socket_, this->io_service(), buffers,
 
1205
            sender_endpoint, flags, handler));
 
1206
    }
 
1207
  }
 
1208
 
 
1209
  // Accept a new connection.
 
1210
  template <typename Socket>
 
1211
  asio::error_code accept(implementation_type& impl,
 
1212
      Socket& peer, endpoint_type* peer_endpoint, asio::error_code& ec)
 
1213
  {
 
1214
    if (!is_open(impl))
 
1215
    {
 
1216
      ec = asio::error::bad_descriptor;
 
1217
      return ec;
 
1218
    }
 
1219
 
 
1220
    // We cannot accept a socket that is already open.
 
1221
    if (peer.is_open())
 
1222
    {
 
1223
      ec = asio::error::already_open;
 
1224
      return ec;
 
1225
    }
 
1226
 
 
1227
    // Make socket non-blocking if user wants non-blocking.
 
1228
    if (impl.flags_ & implementation_type::user_set_non_blocking)
 
1229
    {
 
1230
      if (!(impl.flags_ & implementation_type::internal_non_blocking))
 
1231
      {
 
1232
        ioctl_arg_type non_blocking = 1;
 
1233
        if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
 
1234
          return ec;
 
1235
        impl.flags_ |= implementation_type::internal_non_blocking;
 
1236
      }
 
1237
    }
 
1238
 
 
1239
    // Accept a socket.
 
1240
    for (;;)
 
1241
    {
 
1242
      // Try to complete the operation without blocking.
 
1243
      asio::error_code ec;
 
1244
      socket_holder new_socket;
 
1245
      std::size_t addr_len = 0;
 
1246
      if (peer_endpoint)
 
1247
      {
 
1248
        addr_len = peer_endpoint->capacity();
 
1249
        new_socket.reset(socket_ops::accept(impl.socket_,
 
1250
              peer_endpoint->data(), &addr_len, ec));
 
1251
      }
 
1252
      else
 
1253
      {
 
1254
        new_socket.reset(socket_ops::accept(impl.socket_, 0, 0, ec));
 
1255
      }
 
1256
 
 
1257
      // Check if operation succeeded.
 
1258
      if (new_socket.get() >= 0)
 
1259
      {
 
1260
        if (peer_endpoint)
 
1261
          peer_endpoint->resize(addr_len);
 
1262
        peer.assign(impl.protocol_, new_socket.get(), ec);
 
1263
        if (!ec)
 
1264
          new_socket.release();
 
1265
        return ec;
 
1266
      }
 
1267
 
 
1268
      // Operation failed.
 
1269
      if (ec == asio::error::would_block
 
1270
          || ec == asio::error::try_again)
 
1271
      {
 
1272
        if (impl.flags_ & implementation_type::user_set_non_blocking)
 
1273
          return ec;
 
1274
        // Fall through to retry operation.
 
1275
      }
 
1276
      else if (ec == asio::error::connection_aborted)
 
1277
      {
 
1278
        if (impl.flags_ & implementation_type::enable_connection_aborted)
 
1279
          return ec;
 
1280
        // Fall through to retry operation.
 
1281
      }
 
1282
#if defined(EPROTO)
 
1283
      else if (ec.value() == EPROTO)
 
1284
      {
 
1285
        if (impl.flags_ & implementation_type::enable_connection_aborted)
 
1286
          return ec;
 
1287
        // Fall through to retry operation.
 
1288
      }
 
1289
#endif // defined(EPROTO)
 
1290
      else
 
1291
        return ec;
 
1292
 
 
1293
      // Wait for socket to become ready.
 
1294
      if (socket_ops::poll_read(impl.socket_, ec) < 0)
 
1295
        return ec;
 
1296
    }
 
1297
  }
 
1298
 
 
1299
  template <typename Socket, typename Handler>
 
1300
  class accept_handler
 
1301
  {
 
1302
  public:
 
1303
    accept_handler(socket_type socket, asio::io_service& io_service,
 
1304
        Socket& peer, const protocol_type& protocol,
 
1305
        endpoint_type* peer_endpoint, bool enable_connection_aborted,
 
1306
        Handler handler)
 
1307
      : socket_(socket),
 
1308
        io_service_(io_service),
 
1309
        work_(io_service),
 
1310
        peer_(peer),
 
1311
        protocol_(protocol),
 
1312
        peer_endpoint_(peer_endpoint),
 
1313
        enable_connection_aborted_(enable_connection_aborted),
 
1314
        handler_(handler)
 
1315
    {
 
1316
    }
 
1317
 
 
1318
    bool operator()(const asio::error_code& result)
 
1319
    {
 
1320
      // Check whether the operation was successful.
 
1321
      if (result)
 
1322
      {
 
1323
        io_service_.post(bind_handler(handler_, result));
 
1324
        return true;
 
1325
      }
 
1326
 
 
1327
      // Accept the waiting connection.
 
1328
      asio::error_code ec;
 
1329
      socket_holder new_socket;
 
1330
      std::size_t addr_len = 0;
 
1331
      if (peer_endpoint_)
 
1332
      {
 
1333
        addr_len = peer_endpoint_->capacity();
 
1334
        new_socket.reset(socket_ops::accept(socket_,
 
1335
              peer_endpoint_->data(), &addr_len, ec));
 
1336
      }
 
1337
      else
 
1338
      {
 
1339
        new_socket.reset(socket_ops::accept(socket_, 0, 0, ec));
 
1340
      }
 
1341
 
 
1342
      // Check if we need to run the operation again.
 
1343
      if (ec == asio::error::would_block
 
1344
          || ec == asio::error::try_again)
 
1345
        return false;
 
1346
      if (ec == asio::error::connection_aborted
 
1347
          && !enable_connection_aborted_)
 
1348
        return false;
 
1349
#if defined(EPROTO)
 
1350
      if (ec.value() == EPROTO && !enable_connection_aborted_)
 
1351
        return false;
 
1352
#endif // defined(EPROTO)
 
1353
 
 
1354
      // Transfer ownership of the new socket to the peer object.
 
1355
      if (!ec)
 
1356
      {
 
1357
        if (peer_endpoint_)
 
1358
          peer_endpoint_->resize(addr_len);
 
1359
        peer_.assign(protocol_, new_socket.get(), ec);
 
1360
        if (!ec)
 
1361
          new_socket.release();
 
1362
      }
 
1363
 
 
1364
      io_service_.post(bind_handler(handler_, ec));
 
1365
      return true;
 
1366
    }
 
1367
 
 
1368
  private:
 
1369
    socket_type socket_;
 
1370
    asio::io_service& io_service_;
 
1371
    asio::io_service::work work_;
 
1372
    Socket& peer_;
 
1373
    protocol_type protocol_;
 
1374
    endpoint_type* peer_endpoint_;
 
1375
    bool enable_connection_aborted_;
 
1376
    Handler handler_;
 
1377
  };
 
1378
 
 
1379
  // Start an asynchronous accept. The peer and peer_endpoint objects
 
1380
  // must be valid until the accept's handler is invoked.
 
1381
  template <typename Socket, typename Handler>
 
1382
  void async_accept(implementation_type& impl, Socket& peer,
 
1383
      endpoint_type* peer_endpoint, Handler handler)
 
1384
  {
 
1385
    if (!is_open(impl))
 
1386
    {
 
1387
      this->io_service().post(bind_handler(handler,
 
1388
            asio::error::bad_descriptor));
 
1389
    }
 
1390
    else if (peer.is_open())
 
1391
    {
 
1392
      this->io_service().post(bind_handler(handler,
 
1393
            asio::error::already_open));
 
1394
    }
 
1395
    else
 
1396
    {
 
1397
      // Make socket non-blocking.
 
1398
      if (!(impl.flags_ & implementation_type::internal_non_blocking))
 
1399
      {
 
1400
        ioctl_arg_type non_blocking = 1;
 
1401
        asio::error_code ec;
 
1402
        if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
 
1403
        {
 
1404
          this->io_service().post(bind_handler(handler, ec));
 
1405
          return;
 
1406
        }
 
1407
        impl.flags_ |= implementation_type::internal_non_blocking;
 
1408
      }
 
1409
 
 
1410
      reactor_.start_read_op(impl.socket_,
 
1411
          accept_handler<Socket, Handler>(
 
1412
            impl.socket_, this->io_service(),
 
1413
            peer, impl.protocol_, peer_endpoint,
 
1414
            (impl.flags_ & implementation_type::enable_connection_aborted) != 0,
 
1415
            handler));
 
1416
    }
 
1417
  }
 
1418
 
 
1419
  // Connect the socket to the specified endpoint.
 
1420
  asio::error_code connect(implementation_type& impl,
 
1421
      const endpoint_type& peer_endpoint, asio::error_code& ec)
 
1422
  {
 
1423
    if (!is_open(impl))
 
1424
    {
 
1425
      ec = asio::error::bad_descriptor;
 
1426
      return ec;
 
1427
    }
 
1428
 
 
1429
    if (impl.flags_ & implementation_type::internal_non_blocking)
 
1430
    {
 
1431
      // Mark the socket as blocking while we perform the connect.
 
1432
      ioctl_arg_type non_blocking = 0;
 
1433
      if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
 
1434
        return ec;
 
1435
      impl.flags_ &= ~implementation_type::internal_non_blocking;
 
1436
    }
 
1437
 
 
1438
    // Perform the connect operation.
 
1439
    socket_ops::connect(impl.socket_,
 
1440
        peer_endpoint.data(), peer_endpoint.size(), ec);
 
1441
    return ec;
 
1442
  }
 
1443
 
 
1444
  template <typename Handler>
 
1445
  class connect_handler
 
1446
  {
 
1447
  public:
 
1448
    connect_handler(socket_type socket, boost::shared_ptr<bool> completed,
 
1449
        asio::io_service& io_service, Reactor& reactor, Handler handler)
 
1450
      : socket_(socket),
 
1451
        completed_(completed),
 
1452
        io_service_(io_service),
 
1453
        work_(io_service),
 
1454
        reactor_(reactor),
 
1455
        handler_(handler)
 
1456
    {
 
1457
    }
 
1458
 
 
1459
    bool operator()(const asio::error_code& result)
 
1460
    {
 
1461
      // Check whether a handler has already been called for the connection.
 
1462
      // If it has, then we don't want to do anything in this handler.
 
1463
      if (*completed_)
 
1464
        return true;
 
1465
 
 
1466
      // Cancel the other reactor operation for the connection.
 
1467
      *completed_ = true;
 
1468
      reactor_.enqueue_cancel_ops_unlocked(socket_);
 
1469
 
 
1470
      // Check whether the operation was successful.
 
1471
      if (result)
 
1472
      {
 
1473
        io_service_.post(bind_handler(handler_, result));
 
1474
        return true;
 
1475
      }
 
1476
 
 
1477
      // Get the error code from the connect operation.
 
1478
      int connect_error = 0;
 
1479
      size_t connect_error_len = sizeof(connect_error);
 
1480
      asio::error_code ec;
 
1481
      if (socket_ops::getsockopt(socket_, SOL_SOCKET, SO_ERROR,
 
1482
            &connect_error, &connect_error_len, ec) == socket_error_retval)
 
1483
      {
 
1484
        io_service_.post(bind_handler(handler_, ec));
 
1485
        return true;
 
1486
      }
 
1487
 
 
1488
      // If connection failed then post the handler with the error code.
 
1489
      if (connect_error)
 
1490
      {
 
1491
        ec = asio::error_code(connect_error,
 
1492
            asio::error::system_category);
 
1493
        io_service_.post(bind_handler(handler_, ec));
 
1494
        return true;
 
1495
      }
 
1496
 
 
1497
      // Post the result of the successful connection operation.
 
1498
      io_service_.post(bind_handler(handler_, ec));
 
1499
      return true;
 
1500
    }
 
1501
 
 
1502
  private:
 
1503
    socket_type socket_;
 
1504
    boost::shared_ptr<bool> completed_;
 
1505
    asio::io_service& io_service_;
 
1506
    asio::io_service::work work_;
 
1507
    Reactor& reactor_;
 
1508
    Handler handler_;
 
1509
  };
 
1510
 
 
1511
  // Start an asynchronous connect.
 
1512
  template <typename Handler>
 
1513
  void async_connect(implementation_type& impl,
 
1514
      const endpoint_type& peer_endpoint, Handler handler)
 
1515
  {
 
1516
    if (!is_open(impl))
 
1517
    {
 
1518
      this->io_service().post(bind_handler(handler,
 
1519
            asio::error::bad_descriptor));
 
1520
      return;
 
1521
    }
 
1522
 
 
1523
    // Make socket non-blocking.
 
1524
    if (!(impl.flags_ & implementation_type::internal_non_blocking))
 
1525
    {
 
1526
      ioctl_arg_type non_blocking = 1;
 
1527
      asio::error_code ec;
 
1528
      if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
 
1529
      {
 
1530
        this->io_service().post(bind_handler(handler, ec));
 
1531
        return;
 
1532
      }
 
1533
      impl.flags_ |= implementation_type::internal_non_blocking;
 
1534
    }
 
1535
 
 
1536
    // Start the connect operation. The socket is already marked as non-blocking
 
1537
    // so the connection will take place asynchronously.
 
1538
    asio::error_code ec;
 
1539
    if (socket_ops::connect(impl.socket_, peer_endpoint.data(),
 
1540
          peer_endpoint.size(), ec) == 0)
 
1541
    {
 
1542
      // The connect operation has finished successfully so we need to post the
 
1543
      // handler immediately.
 
1544
      this->io_service().post(bind_handler(handler,
 
1545
            asio::error_code()));
 
1546
    }
 
1547
    else if (ec == asio::error::in_progress
 
1548
        || ec == asio::error::would_block)
 
1549
    {
 
1550
      // The connection is happening in the background, and we need to wait
 
1551
      // until the socket becomes writeable.
 
1552
      boost::shared_ptr<bool> completed(new bool(false));
 
1553
      reactor_.start_write_and_except_ops(impl.socket_,
 
1554
          connect_handler<Handler>(
 
1555
            impl.socket_, completed, this->io_service(), reactor_, handler));
 
1556
    }
 
1557
    else
 
1558
    {
 
1559
      // The connect operation has failed, so post the handler immediately.
 
1560
      this->io_service().post(bind_handler(handler, ec));
 
1561
    }
 
1562
  }
 
1563
 
 
1564
private:
 
1565
  // The selector that performs event demultiplexing for the service.
 
1566
  Reactor& reactor_;
 
1567
};
 
1568
 
 
1569
} // namespace detail
 
1570
} // namespace asio
 
1571
 
 
1572
#include "asio/detail/pop_options.hpp"
 
1573
 
 
1574
#endif // ASIO_DETAIL_REACTIVE_SOCKET_SERVICE_HPP