2
// reactive_socket_service.hpp
3
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~
5
// Copyright (c) 2003-2007 Christopher M. Kohlhoff (chris at kohlhoff dot com)
7
// Distributed under the Boost Software License, Version 1.0. (See accompanying
8
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
11
#ifndef ASIO_DETAIL_REACTIVE_SOCKET_SERVICE_HPP
12
#define ASIO_DETAIL_REACTIVE_SOCKET_SERVICE_HPP
14
#if defined(_MSC_VER) && (_MSC_VER >= 1200)
16
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
18
#include "asio/detail/push_options.hpp"
20
#include "asio/detail/push_options.hpp"
21
#include <boost/shared_ptr.hpp>
22
#include "asio/detail/pop_options.hpp"
24
#include "asio/buffer.hpp"
25
#include "asio/error.hpp"
26
#include "asio/io_service.hpp"
27
#include "asio/socket_base.hpp"
28
#include "asio/detail/bind_handler.hpp"
29
#include "asio/detail/noncopyable.hpp"
30
#include "asio/detail/service_base.hpp"
31
#include "asio/detail/socket_holder.hpp"
32
#include "asio/detail/socket_ops.hpp"
33
#include "asio/detail/socket_types.hpp"
38
template <typename Protocol, typename Reactor>
39
class reactive_socket_service
40
: public asio::detail::service_base<
41
reactive_socket_service<Protocol, Reactor> >
45
typedef Protocol protocol_type;
48
typedef typename Protocol::endpoint endpoint_type;
50
// The native type of a socket.
51
typedef socket_type native_type;
53
// The implementation type of the socket.
54
// Per-socket state managed by reactive_socket_service. Copying is disabled by
// privately inheriting from asio::detail::noncopyable.
// NOTE(review): several lines of this class (braces, access specifiers and
// some member declarations) are not present in this view of the file.
class implementation_type
55
: private asio::detail::noncopyable
58
// Default constructor. Starts with no socket (invalid_socket) and with the
// protocol of a default-constructed endpoint.
60
: socket_(invalid_socket),
62
protocol_(endpoint_type().protocol())
67
// Only this service will have access to the internal values.
68
friend class reactive_socket_service<Protocol, Reactor>;
70
// The native socket representation.
75
// State bits. Each enumerator is a distinct power of two; the service sets
// them with |= and clears them with &= ~ on the flags value.
user_set_non_blocking = 1, // The user wants a non-blocking socket.
76
internal_non_blocking = 2, // The socket has been set non-blocking.
77
enable_connection_aborted = 4, // User wants connection_aborted errors.
78
user_set_linger = 8 // The user set the linger option.
81
// Flags indicating the current state of the socket.
84
// The protocol associated with the socket.
85
protocol_type protocol_;
88
// The maximum number of buffers to support in a single operation.
89
enum { max_buffers = 64 < max_iov_len ? 64 : max_iov_len };
92
// Constructor. Forwards io_service to the service_base base class and looks
// up the Reactor service on the same io_service via asio::use_service.
reactive_socket_service(asio::io_service& io_service)
93
: asio::detail::service_base<
94
reactive_socket_service<Protocol, Reactor> >(io_service),
95
reactor_(asio::use_service<Reactor>(io_service))
99
// Destroy all user-defined handler objects owned by the service.
100
void shutdown_service()
104
// Construct a new socket implementation.
105
// Puts impl into the "no socket" state by storing invalid_socket.
// NOTE(review): any further initialisation is not visible in this view.
void construct(implementation_type& impl)
107
impl.socket_ = invalid_socket;
111
// Destroy a socket implementation.
112
// Releases the native socket held by impl, if there is one. Every step
// writes its error into a local ignored_ec, so failures are not reported.
void destroy(implementation_type& impl)
114
// Nothing to release unless a socket is held.
if (impl.socket_ != invalid_socket)
116
// Inform the reactor about the descriptor before the socket is closed.
reactor_.close_descriptor(impl.socket_);
118
// If the service itself put the socket into non-blocking mode, turn that
// off again (FIONBIO with a value of 0) and clear the flag.
if (impl.flags_ & implementation_type::internal_non_blocking)
120
ioctl_arg_type non_blocking = 0;
121
asio::error_code ignored_ec;
122
socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ignored_ec);
123
impl.flags_ &= ~implementation_type::internal_non_blocking;
126
// If the user set SO_LINGER, the option is written once more before closing.
// NOTE(review): the declaration and contents of `opt` are not visible in
// this view - confirm which linger values are applied here.
if (impl.flags_ & implementation_type::user_set_linger)
131
asio::error_code ignored_ec;
132
socket_ops::setsockopt(impl.socket_,
133
SOL_SOCKET, SO_LINGER, &opt, sizeof(opt), ignored_ec);
136
// Close the socket, ignoring any error, and forget it.
asio::error_code ignored_ec;
137
socket_ops::close(impl.socket_, ignored_ec);
139
impl.socket_ = invalid_socket;
143
// Open a new socket implementation.
144
// Creates a native socket for `protocol` and stores it in impl.
//   impl      implementation that receives the new socket and the protocol.
//   protocol  supplies the family, type and protocol for socket_ops::socket.
//   ec        receives the outcome; set to asio::error_code() on success.
asio::error_code open(implementation_type& impl,
145
const protocol_type& protocol, asio::error_code& ec)
149
// Error used when a socket is already held.
// NOTE(review): the guarding condition is not visible in this view.
ec = asio::error::already_open;
153
// The new socket is kept in a socket_holder until release() hands it to impl.
socket_holder sock(socket_ops::socket(protocol.family(),
154
protocol.type(), protocol.protocol(), ec));
155
// No socket was created.
if (sock.get() == invalid_socket)
158
// Register with the reactor; a non-zero result is an error number that is
// wrapped as an error_code in the system category.
if (int err = reactor_.register_descriptor(sock.get()))
160
ec = asio::error_code(err, asio::error::system_category);
164
// Success: impl takes over the socket and records the protocol.
impl.socket_ = sock.release();
166
impl.protocol_ = protocol;
167
ec = asio::error_code();
171
// Assign a native socket to a socket implementation.
172
// Stores an existing native socket in impl.
//   impl           implementation that receives the socket and the protocol.
//   protocol       protocol recorded for the socket.
//   native_socket  socket to store; it is registered with the reactor first.
//   ec             receives the outcome; set to asio::error_code() on success.
asio::error_code assign(implementation_type& impl,
173
const protocol_type& protocol, const native_type& native_socket,
174
asio::error_code& ec)
178
// Error used when a socket is already held.
// NOTE(review): the guarding condition is not visible in this view.
ec = asio::error::already_open;
182
// A non-zero result from the reactor is an error number that is wrapped as
// an error_code in the system category.
if (int err = reactor_.register_descriptor(native_socket))
184
ec = asio::error_code(err, asio::error::system_category);
188
// Success: record the socket and its protocol.
impl.socket_ = native_socket;
190
impl.protocol_ = protocol;
191
ec = asio::error_code();
195
// Determine whether the socket is open.
196
// Returns true when impl holds a socket, i.e. impl.socket_ differs from
// invalid_socket.
bool is_open(const implementation_type& impl) const
198
return impl.socket_ != invalid_socket;
201
// Close a socket implementation.
202
// Closes the socket held by impl and reports the result through ec.
//   impl  implementation whose socket is closed.
//   ec    receives the error from socket_ops::close, or asio::error_code().
asio::error_code close(implementation_type& impl,
203
asio::error_code& ec)
207
// Inform the reactor about the descriptor before the socket is closed.
reactor_.close_descriptor(impl.socket_);
209
// If the service itself put the socket into non-blocking mode, turn that
// off again (FIONBIO with a value of 0) and clear the flag; an error from
// this step is ignored.
if (impl.flags_ & implementation_type::internal_non_blocking)
211
ioctl_arg_type non_blocking = 0;
212
asio::error_code ignored_ec;
213
socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ignored_ec);
214
impl.flags_ &= ~implementation_type::internal_non_blocking;
217
// Unlike destroy(), the error from the close call is written to the
// caller's ec.
if (socket_ops::close(impl.socket_, ec) == socket_error_retval)
220
impl.socket_ = invalid_socket;
223
ec = asio::error_code();
227
// Get the native socket representation.
228
native_type native(implementation_type& impl)
233
// Cancel all operations associated with the socket.
234
// Asks the reactor to cancel the operations associated with impl's socket.
//   ec  set to asio::error_code() after reactor_.cancel_ops has been called.
asio::error_code cancel(implementation_type& impl,
235
asio::error_code& ec)
239
// Error used when there is no usable socket.
// NOTE(review): the guarding condition is not visible in this view.
ec = asio::error::bad_descriptor;
243
reactor_.cancel_ops(impl.socket_);
244
ec = asio::error_code();
248
// Determine whether the socket is at the out-of-band data mark.
249
// Queries SIOCATMARK on the socket.
// Returns false whenever ec holds an error, otherwise whether the value
// written by the ioctl is non-zero.
bool at_mark(const implementation_type& impl,
250
asio::error_code& ec) const
254
// Error used when there is no usable socket.
// NOTE(review): the guarding condition is not visible in this view.
ec = asio::error::bad_descriptor;
258
asio::detail::ioctl_arg_type value = 0;
259
socket_ops::ioctl(impl.socket_, SIOCATMARK, &value, ec);
261
// An ENOTTY result is reported to the caller as not_socket. The matching
// #if for the #endif below is not visible in this view.
if (ec.value() == ENOTTY)
262
ec = asio::error::not_socket;
263
#endif // defined(ENOTTY)
264
return ec ? false : value != 0;
267
// Determine the number of bytes available for reading.
268
// Queries FIONREAD on the socket.
// Returns 0 whenever ec holds an error, otherwise the value written by the
// ioctl converted to std::size_t.
std::size_t available(const implementation_type& impl,
269
asio::error_code& ec) const
273
// Error used when there is no usable socket.
// NOTE(review): the guarding condition is not visible in this view.
ec = asio::error::bad_descriptor;
277
asio::detail::ioctl_arg_type value = 0;
278
socket_ops::ioctl(impl.socket_, FIONREAD, &value, ec);
280
// An ENOTTY result is reported to the caller as not_socket. The matching
// #if for the #endif below is not visible in this view.
if (ec.value() == ENOTTY)
281
ec = asio::error::not_socket;
282
#endif // defined(ENOTTY)
283
return ec ? static_cast<std::size_t>(0) : static_cast<std::size_t>(value);
286
// Bind the socket to the specified local endpoint.
287
// Passes the endpoint's data() and size() to socket_ops::bind for impl's
// socket; the result of that call is written to ec.
asio::error_code bind(implementation_type& impl,
288
const endpoint_type& endpoint, asio::error_code& ec)
292
// Error used when there is no usable socket.
// NOTE(review): the guarding condition is not visible in this view.
ec = asio::error::bad_descriptor;
296
socket_ops::bind(impl.socket_, endpoint.data(), endpoint.size(), ec);
300
// Place the socket into the state where it will listen for new connections.
301
// Passes `backlog` to socket_ops::listen for impl's socket; the result of
// that call is written to ec.
asio::error_code listen(implementation_type& impl, int backlog,
302
asio::error_code& ec)
306
// Error used when there is no usable socket.
// NOTE(review): the guarding condition is not visible in this view.
ec = asio::error::bad_descriptor;
310
socket_ops::listen(impl.socket_, backlog, ec);
314
// Set a socket option.
315
// Applies `option` to the socket.
//   Option  must provide level(), name(), data() and size(), each taking the
//           protocol as argument.
//   ec      receives the outcome.
// One option is handled by the service itself (see below); the visible
// setsockopt call forwards an option's level, name, data and size.
template <typename Option>
316
asio::error_code set_option(implementation_type& impl,
317
const Option& option, asio::error_code& ec)
321
// Error used when there is no usable socket.
// NOTE(review): the guarding condition is not visible in this view.
ec = asio::error::bad_descriptor;
325
// The enable_connection_aborted option at custom_socket_option_level is kept
// as a bit in impl.flags_.
if (option.level(impl.protocol_) == custom_socket_option_level
326
&& option.name(impl.protocol_) == enable_connection_aborted_option)
328
// The option payload must be exactly one int.
if (option.size(impl.protocol_) != sizeof(int))
330
ec = asio::error::invalid_argument;
334
// A non-zero payload sets the flag, otherwise it is cleared.
if (*reinterpret_cast<const int*>(option.data(impl.protocol_)))
335
impl.flags_ |= implementation_type::enable_connection_aborted;
337
impl.flags_ &= ~implementation_type::enable_connection_aborted;
338
ec = asio::error_code();
344
// Remember that the user touched SO_LINGER; destroy() checks this flag.
if (option.level(impl.protocol_) == SOL_SOCKET
345
&& option.name(impl.protocol_) == SO_LINGER)
347
impl.flags_ |= implementation_type::user_set_linger;
350
socket_ops::setsockopt(impl.socket_,
351
option.level(impl.protocol_), option.name(impl.protocol_),
352
option.data(impl.protocol_), option.size(impl.protocol_), ec);
354
#if defined(__MACH__) && defined(__APPLE__) \
355
|| defined(__NetBSD__) || defined(__FreeBSD__) || defined(__OpenBSD__)
356
// To implement portable behaviour for SO_REUSEADDR with UDP sockets we
357
// need to also set SO_REUSEPORT on BSD-based platforms.
// This only happens when the call above succeeded, and its own error goes
// into a local ignored_ec.
358
if (!ec && impl.protocol_.type() == SOCK_DGRAM
359
&& option.level(impl.protocol_) == SOL_SOCKET
360
&& option.name(impl.protocol_) == SO_REUSEADDR)
362
asio::error_code ignored_ec;
363
socket_ops::setsockopt(impl.socket_, SOL_SOCKET, SO_REUSEPORT,
364
option.data(impl.protocol_), option.size(impl.protocol_),
373
// Get a socket option.
374
// Reads a socket option into `option`.
//   Option  must provide level(), name(), data(), size() and resize(), each
//           taking the protocol as first argument.
//   ec      receives the outcome.
// One option is answered by the service itself (see below); the visible
// getsockopt call reads an option by its level and name.
template <typename Option>
375
asio::error_code get_option(const implementation_type& impl,
376
Option& option, asio::error_code& ec) const
380
// Error used when there is no usable socket.
// NOTE(review): the guarding condition is not visible in this view.
ec = asio::error::bad_descriptor;
384
// The enable_connection_aborted option at custom_socket_option_level is
// answered from the bit kept in impl.flags_.
if (option.level(impl.protocol_) == custom_socket_option_level
385
&& option.name(impl.protocol_) == enable_connection_aborted_option)
387
// The option payload must be exactly one int.
if (option.size(impl.protocol_) != sizeof(int))
389
ec = asio::error::invalid_argument;
393
// NOTE(review): the statements that store a value through `target` are not
// visible in this view.
int* target = reinterpret_cast<int*>(option.data(impl.protocol_));
394
if (impl.flags_ & implementation_type::enable_connection_aborted)
398
option.resize(impl.protocol_, sizeof(int));
399
ec = asio::error_code();
405
// `size` starts as the option's capacity, is passed by address to getsockopt
// and is then used to resize the option.
size_t size = option.size(impl.protocol_);
406
socket_ops::getsockopt(impl.socket_,
407
option.level(impl.protocol_), option.name(impl.protocol_),
408
option.data(impl.protocol_), &size, ec);
410
option.resize(impl.protocol_, size);
415
// Perform an IO control command on the socket.
416
// Performs an IO control command on the socket.
//   IO_Control_Command  must provide name() and data().
//   ec                  receives the outcome.
template <typename IO_Control_Command>
417
asio::error_code io_control(implementation_type& impl,
418
IO_Control_Command& command, asio::error_code& ec)
422
// Error used when there is no usable socket.
// NOTE(review): the guarding condition is not visible in this view.
ec = asio::error::bad_descriptor;
426
// FIONBIO is recorded in impl.flags_ as user_set_non_blocking.
// NOTE(review): the condition that chooses between setting and clearing the
// flag is not visible in this view.
if (command.name() == static_cast<int>(FIONBIO))
429
impl.flags_ |= implementation_type::user_set_non_blocking;
431
impl.flags_ &= ~implementation_type::user_set_non_blocking;
432
ec = asio::error_code();
436
// Forward the command name and its data, cast to ioctl_arg_type*, to the
// underlying ioctl.
socket_ops::ioctl(impl.socket_, command.name(),
437
static_cast<ioctl_arg_type*>(command.data()), ec);
442
// Get the local endpoint.
443
// Obtains the socket's local endpoint via socket_ops::getsockname.
// A default-constructed endpoint is returned on the visible error paths.
endpoint_type local_endpoint(const implementation_type& impl,
444
asio::error_code& ec) const
448
// Error path for when there is no usable socket.
// NOTE(review): the guarding condition is not visible in this view.
ec = asio::error::bad_descriptor;
449
return endpoint_type();
452
// addr_len starts as the endpoint's capacity and is passed by address to
// getsockname; a non-zero result from that call is treated as failure.
endpoint_type endpoint;
453
std::size_t addr_len = endpoint.capacity();
454
if (socket_ops::getsockname(impl.socket_, endpoint.data(), &addr_len, ec))
455
return endpoint_type();
456
// Resize the endpoint to the length left in addr_len by the call.
endpoint.resize(addr_len);
460
// Get the remote endpoint.
461
// Obtains the socket's remote endpoint via socket_ops::getpeername.
// A default-constructed endpoint is returned on the visible error paths.
endpoint_type remote_endpoint(const implementation_type& impl,
462
asio::error_code& ec) const
466
// Error path for when there is no usable socket.
// NOTE(review): the guarding condition is not visible in this view.
ec = asio::error::bad_descriptor;
467
return endpoint_type();
470
// addr_len starts as the endpoint's capacity and is passed by address to
// getpeername; a non-zero result from that call is treated as failure.
endpoint_type endpoint;
471
std::size_t addr_len = endpoint.capacity();
472
if (socket_ops::getpeername(impl.socket_, endpoint.data(), &addr_len, ec))
473
return endpoint_type();
474
// Resize the endpoint to the length left in addr_len by the call.
endpoint.resize(addr_len);
478
/// Disable sends or receives on the socket.
479
// Passes `what` to socket_ops::shutdown for impl's socket; the result of
// that call is written to ec.
asio::error_code shutdown(implementation_type& impl,
480
socket_base::shutdown_type what, asio::error_code& ec)
484
// Error used when there is no usable socket.
// NOTE(review): the guarding condition is not visible in this view.
ec = asio::error::bad_descriptor;
488
socket_ops::shutdown(impl.socket_, what, ec);
492
// Send the given data to the peer.
493
// Synchronously sends the data in `buffers` on impl's socket.
//   buffers  sequence of const buffers; at most max_buffers of them are
//            gathered for the send call.
//   flags    passed through to socket_ops::send.
//   ec       receives the outcome.
// NOTE(review): the loop structure and the return statements are not visible
// in this view.
template <typename ConstBufferSequence>
494
size_t send(implementation_type& impl, const ConstBufferSequence& buffers,
495
socket_base::message_flags flags, asio::error_code& ec)
499
// Error used when there is no usable socket.
// NOTE(review): the guarding condition is not visible in this view.
ec = asio::error::bad_descriptor;
503
// Copy buffers into array.
504
socket_ops::buf bufs[max_buffers];
505
typename ConstBufferSequence::const_iterator iter = buffers.begin();
506
typename ConstBufferSequence::const_iterator end = buffers.end();
508
size_t total_buffer_size = 0;
509
// Stops at max_buffers entries even if the sequence is longer.
// NOTE(review): the declaration of the index `i` is not visible in this view.
for (; iter != end && i < max_buffers; ++iter, ++i)
511
asio::const_buffer buffer(*iter);
512
socket_ops::init_buf(bufs[i],
513
asio::buffer_cast<const void*>(buffer),
514
asio::buffer_size(buffer));
515
total_buffer_size += asio::buffer_size(buffer);
518
// A request to send 0 bytes on a stream socket is a no-op.
519
if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0)
521
ec = asio::error_code();
525
// Make socket non-blocking if user wants non-blocking.
526
if (impl.flags_ & implementation_type::user_set_non_blocking)
528
// Only issue the ioctl when the socket has not already been switched.
if (!(impl.flags_ & implementation_type::internal_non_blocking))
530
ioctl_arg_type non_blocking = 1;
531
if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
533
impl.flags_ |= implementation_type::internal_non_blocking;
540
// Try to complete the operation without blocking.
541
int bytes_sent = socket_ops::send(impl.socket_, bufs, i, flags, ec);
543
// Check if operation succeeded.
548
// True when the user asked for non-blocking behaviour, or when the error is
// neither would_block nor try_again.
if ((impl.flags_ & implementation_type::user_set_non_blocking)
549
|| (ec != asio::error::would_block
550
&& ec != asio::error::try_again))
553
// Wait for socket to become ready.
554
if (socket_ops::poll_write(impl.socket_, ec) < 0)
559
// Function object that async_send passes to reactor_.start_write_op.
// It keeps its own copy of the buffer sequence (buffers_ is held by value)
// and delivers the result by posting handler_ to io_service_.
// NOTE(review): the class head, some members and the return statements of
// operator() are not visible in this view.
template <typename ConstBufferSequence, typename Handler>
563
send_handler(socket_type socket, asio::io_service& io_service,
564
const ConstBufferSequence& buffers, socket_base::message_flags flags,
567
io_service_(io_service),
575
// Called with the result of waiting on the socket.
bool operator()(const asio::error_code& result)
577
// Check whether the operation was successful.
580
// Completion with the incoming result and a byte count of 0.
io_service_.post(bind_handler(handler_, result, 0));
584
// Copy buffers into array.
585
socket_ops::buf bufs[max_buffers];
586
typename ConstBufferSequence::const_iterator iter = buffers_.begin();
587
typename ConstBufferSequence::const_iterator end = buffers_.end();
589
// Stops at max_buffers entries even if the sequence is longer.
for (; iter != end && i < max_buffers; ++iter, ++i)
591
asio::const_buffer buffer(*iter);
592
socket_ops::init_buf(bufs[i],
593
asio::buffer_cast<const void*>(buffer),
594
asio::buffer_size(buffer));
599
int bytes = socket_ops::send(socket_, bufs, i, flags_, ec);
601
// Check if we need to run the operation again.
602
if (ec == asio::error::would_block
603
|| ec == asio::error::try_again)
606
// A negative result from the send call is reported to the handler as 0 bytes.
io_service_.post(bind_handler(handler_, ec, bytes < 0 ? 0 : bytes));
612
asio::io_service& io_service_;
613
asio::io_service::work work_;
614
ConstBufferSequence buffers_;
615
socket_base::message_flags flags_;
619
// Start an asynchronous send. The data being sent must be valid for the
620
// lifetime of the asynchronous operation.
621
// Starts an asynchronous send on impl's socket.
//   buffers  sequence of const buffers handed to the send_handler.
//   flags    passed through to the send_handler.
//   handler  invoked via io_service().post with an error code and a byte
//            count, either from one of the early paths below or by the
//            send_handler.
template <typename ConstBufferSequence, typename Handler>
622
void async_send(implementation_type& impl, const ConstBufferSequence& buffers,
623
socket_base::message_flags flags, Handler handler)
627
// Completion used when there is no usable socket.
// NOTE(review): the guarding condition is not visible in this view.
this->io_service().post(bind_handler(handler,
628
asio::error::bad_descriptor, 0));
632
// The zero-length shortcut below applies to stream sockets only.
if (impl.protocol_.type() == SOCK_STREAM)
634
// Determine total size of buffers.
635
typename ConstBufferSequence::const_iterator iter = buffers.begin();
636
typename ConstBufferSequence::const_iterator end = buffers.end();
638
size_t total_buffer_size = 0;
639
// Only the first max_buffers entries are counted.
for (; iter != end && i < max_buffers; ++iter, ++i)
641
asio::const_buffer buffer(*iter);
642
total_buffer_size += asio::buffer_size(buffer);
645
// A request to send 0 bytes on a stream socket is a no-op.
646
if (total_buffer_size == 0)
648
// Complete with no error and 0 bytes.
this->io_service().post(bind_handler(handler,
649
asio::error_code(), 0));
654
// Make socket non-blocking.
655
if (!(impl.flags_ & implementation_type::internal_non_blocking))
657
ioctl_arg_type non_blocking = 1;
659
// If the ioctl reports failure, the handler is posted with that error and
// 0 bytes.
if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
661
this->io_service().post(bind_handler(handler, ec, 0));
664
impl.flags_ |= implementation_type::internal_non_blocking;
667
// Hand the operation to the reactor as a write operation.
reactor_.start_write_op(impl.socket_,
668
send_handler<ConstBufferSequence, Handler>(
669
impl.socket_, this->io_service(), buffers, flags, handler));
673
// Send a datagram to the specified endpoint. Returns the number of bytes
// sent.
675
// Synchronously sends the data in `buffers` to `destination`.
//   buffers      sequence of const buffers; at most max_buffers of them are
//                gathered for the sendto call.
//   destination  its data() and size() are passed to socket_ops::sendto.
//   flags        passed through to socket_ops::sendto.
//   ec           receives the outcome.
// NOTE(review): the loop structure and the return statements are not visible
// in this view.
template <typename ConstBufferSequence>
676
size_t send_to(implementation_type& impl, const ConstBufferSequence& buffers,
677
const endpoint_type& destination, socket_base::message_flags flags,
678
asio::error_code& ec)
682
// Error used when there is no usable socket.
// NOTE(review): the guarding condition is not visible in this view.
ec = asio::error::bad_descriptor;
686
// Copy buffers into array.
687
socket_ops::buf bufs[max_buffers];
688
typename ConstBufferSequence::const_iterator iter = buffers.begin();
689
typename ConstBufferSequence::const_iterator end = buffers.end();
691
// Stops at max_buffers entries even if the sequence is longer.
for (; iter != end && i < max_buffers; ++iter, ++i)
693
asio::const_buffer buffer(*iter);
694
socket_ops::init_buf(bufs[i],
695
asio::buffer_cast<const void*>(buffer),
696
asio::buffer_size(buffer));
699
// Make socket non-blocking if user wants non-blocking.
700
if (impl.flags_ & implementation_type::user_set_non_blocking)
702
// Only issue the ioctl when the socket has not already been switched.
if (!(impl.flags_ & implementation_type::internal_non_blocking))
704
ioctl_arg_type non_blocking = 1;
705
if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
707
impl.flags_ |= implementation_type::internal_non_blocking;
714
// Try to complete the operation without blocking.
715
int bytes_sent = socket_ops::sendto(impl.socket_, bufs, i, flags,
716
destination.data(), destination.size(), ec);
718
// Check if operation succeeded.
723
// True when the user asked for non-blocking behaviour, or when the error is
// neither would_block nor try_again.
if ((impl.flags_ & implementation_type::user_set_non_blocking)
724
|| (ec != asio::error::would_block
725
&& ec != asio::error::try_again))
728
// Wait for socket to become ready.
729
if (socket_ops::poll_write(impl.socket_, ec) < 0)
734
// Function object that async_send_to passes to reactor_.start_write_op.
// It keeps its own copies of the buffer sequence and of the destination
// endpoint (both held by value) and delivers the result by posting handler_
// to io_service_.
// NOTE(review): some members and the return statements of operator() are
// not visible in this view.
template <typename ConstBufferSequence, typename Handler>
735
class send_to_handler
738
send_to_handler(socket_type socket, asio::io_service& io_service,
739
const ConstBufferSequence& buffers, const endpoint_type& endpoint,
740
socket_base::message_flags flags, Handler handler)
742
io_service_(io_service),
745
destination_(endpoint),
751
// Called with the result of waiting on the socket.
bool operator()(const asio::error_code& result)
753
// Check whether the operation was successful.
756
// Completion with the incoming result and a byte count of 0.
io_service_.post(bind_handler(handler_, result, 0));
760
// Copy buffers into array.
761
socket_ops::buf bufs[max_buffers];
762
typename ConstBufferSequence::const_iterator iter = buffers_.begin();
763
typename ConstBufferSequence::const_iterator end = buffers_.end();
765
// Stops at max_buffers entries even if the sequence is longer.
for (; iter != end && i < max_buffers; ++iter, ++i)
767
asio::const_buffer buffer(*iter);
768
socket_ops::init_buf(bufs[i],
769
asio::buffer_cast<const void*>(buffer),
770
asio::buffer_size(buffer));
775
int bytes = socket_ops::sendto(socket_, bufs, i, flags_,
776
destination_.data(), destination_.size(), ec);
778
// Check if we need to run the operation again.
779
if (ec == asio::error::would_block
780
|| ec == asio::error::try_again)
783
// A negative result from the sendto call is reported to the handler as
// 0 bytes.
io_service_.post(bind_handler(handler_, ec, bytes < 0 ? 0 : bytes));
789
asio::io_service& io_service_;
790
asio::io_service::work work_;
791
ConstBufferSequence buffers_;
792
endpoint_type destination_;
793
socket_base::message_flags flags_;
797
// Start an asynchronous send. The data being sent must be valid for the
798
// lifetime of the asynchronous operation.
799
// Starts an asynchronous send of `buffers` to `destination`.
// The handler is invoked via io_service().post with an error code and a byte
// count, either from one of the early paths below or by the send_to_handler.
// NOTE(review): the end of the parameter list is not visible in this view.
template <typename ConstBufferSequence, typename Handler>
800
void async_send_to(implementation_type& impl,
801
const ConstBufferSequence& buffers,
802
const endpoint_type& destination, socket_base::message_flags flags,
807
// Completion used when there is no usable socket.
// NOTE(review): the guarding condition is not visible in this view.
this->io_service().post(bind_handler(handler,
808
asio::error::bad_descriptor, 0));
812
// Make socket non-blocking.
813
if (!(impl.flags_ & implementation_type::internal_non_blocking))
815
ioctl_arg_type non_blocking = 1;
817
// If the ioctl reports failure, the handler is posted with that error and
// 0 bytes.
if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
819
this->io_service().post(bind_handler(handler, ec, 0));
822
impl.flags_ |= implementation_type::internal_non_blocking;
825
// Hand the operation to the reactor as a write operation.
reactor_.start_write_op(impl.socket_,
826
send_to_handler<ConstBufferSequence, Handler>(
827
impl.socket_, this->io_service(), buffers,
828
destination, flags, handler));
832
// Receive some data from the peer. Returns the number of bytes received.
833
// Synchronously receives data into `buffers` from impl's socket.
//   buffers  sequence of mutable buffers; at most max_buffers of them are
//            offered to the recv call.
//   flags    passed through to socket_ops::recv.
//   ec       receives the outcome; asio::error::eof when recv returned 0.
// NOTE(review): the loop structure and the return statements are not visible
// in this view.
template <typename MutableBufferSequence>
834
size_t receive(implementation_type& impl,
835
const MutableBufferSequence& buffers,
836
socket_base::message_flags flags, asio::error_code& ec)
840
// Error used when there is no usable socket.
// NOTE(review): the guarding condition is not visible in this view.
ec = asio::error::bad_descriptor;
844
// Copy buffers into array.
845
socket_ops::buf bufs[max_buffers];
846
typename MutableBufferSequence::const_iterator iter = buffers.begin();
847
typename MutableBufferSequence::const_iterator end = buffers.end();
849
size_t total_buffer_size = 0;
850
// Stops at max_buffers entries even if the sequence is longer.
for (; iter != end && i < max_buffers; ++iter, ++i)
852
asio::mutable_buffer buffer(*iter);
853
socket_ops::init_buf(bufs[i],
854
asio::buffer_cast<void*>(buffer),
855
asio::buffer_size(buffer));
856
total_buffer_size += asio::buffer_size(buffer);
859
// A request to receive 0 bytes on a stream socket is a no-op.
860
if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0)
862
ec = asio::error_code();
866
// Make socket non-blocking if user wants non-blocking.
867
if (impl.flags_ & implementation_type::user_set_non_blocking)
869
// Only issue the ioctl when the socket has not already been switched.
if (!(impl.flags_ & implementation_type::internal_non_blocking))
871
ioctl_arg_type non_blocking = 1;
872
if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
874
impl.flags_ |= implementation_type::internal_non_blocking;
878
// Receive some data.
881
// Try to complete the operation without blocking.
882
int bytes_recvd = socket_ops::recv(impl.socket_, bufs, i, flags, ec);
884
// Check if operation succeeded.
889
// A result of 0 bytes is reported as end of file.
if (bytes_recvd == 0)
891
ec = asio::error::eof;
896
// True when the user asked for non-blocking behaviour, or when the error is
// neither would_block nor try_again.
if ((impl.flags_ & implementation_type::user_set_non_blocking)
897
|| (ec != asio::error::would_block
898
&& ec != asio::error::try_again))
901
// Wait for socket to become ready.
902
if (socket_ops::poll_read(impl.socket_, ec) < 0)
907
// Function object that async_receive passes to reactor_.start_read_op or
// reactor_.start_except_op. It keeps its own copy of the buffer sequence
// (buffers_ is held by value) and delivers the result by posting handler_ to
// io_service_.
// NOTE(review): some members and the return statements of operator() are
// not visible in this view.
template <typename MutableBufferSequence, typename Handler>
908
class receive_handler
911
receive_handler(socket_type socket, asio::io_service& io_service,
912
const MutableBufferSequence& buffers, socket_base::message_flags flags,
915
io_service_(io_service),
923
// Called with the result of waiting on the socket.
bool operator()(const asio::error_code& result)
925
// Check whether the operation was successful.
928
// Completion with the incoming result and a byte count of 0.
io_service_.post(bind_handler(handler_, result, 0));
932
// Copy buffers into array.
933
socket_ops::buf bufs[max_buffers];
934
typename MutableBufferSequence::const_iterator iter = buffers_.begin();
935
typename MutableBufferSequence::const_iterator end = buffers_.end();
937
// Stops at max_buffers entries even if the sequence is longer.
for (; iter != end && i < max_buffers; ++iter, ++i)
939
asio::mutable_buffer buffer(*iter);
940
socket_ops::init_buf(bufs[i],
941
asio::buffer_cast<void*>(buffer),
942
asio::buffer_size(buffer));
945
// Receive some data.
947
int bytes = socket_ops::recv(socket_, bufs, i, flags_, ec);
949
// NOTE(review): the condition guarding this assignment is not visible in
// this view; the synchronous receive() makes the same assignment when the
// byte count is 0.
ec = asio::error::eof;
951
// Check if we need to run the operation again.
952
if (ec == asio::error::would_block
953
|| ec == asio::error::try_again)
956
// A negative result from the recv call is reported to the handler as 0 bytes.
io_service_.post(bind_handler(handler_, ec, bytes < 0 ? 0 : bytes));
962
asio::io_service& io_service_;
963
asio::io_service::work work_;
964
MutableBufferSequence buffers_;
965
socket_base::message_flags flags_;
969
// Start an asynchronous receive. The buffer for the data being received
970
// must be valid for the lifetime of the asynchronous operation.
971
// Starts an asynchronous receive on impl's socket.
//   buffers  sequence of mutable buffers handed to the receive_handler.
//   flags    passed through to the receive_handler; also selects which
//            reactor operation is started (see below).
//   handler  invoked via io_service().post with an error code and a byte
//            count, either from one of the early paths below or by the
//            receive_handler.
template <typename MutableBufferSequence, typename Handler>
972
void async_receive(implementation_type& impl,
973
const MutableBufferSequence& buffers,
974
socket_base::message_flags flags, Handler handler)
978
// Completion used when there is no usable socket.
// NOTE(review): the guarding condition is not visible in this view.
this->io_service().post(bind_handler(handler,
979
asio::error::bad_descriptor, 0));
983
// The zero-length shortcut below applies to stream sockets only.
if (impl.protocol_.type() == SOCK_STREAM)
985
// Determine total size of buffers.
986
typename MutableBufferSequence::const_iterator iter = buffers.begin();
987
typename MutableBufferSequence::const_iterator end = buffers.end();
989
size_t total_buffer_size = 0;
990
// Only the first max_buffers entries are counted.
for (; iter != end && i < max_buffers; ++iter, ++i)
992
asio::mutable_buffer buffer(*iter);
993
total_buffer_size += asio::buffer_size(buffer);
996
// A request to receive 0 bytes on a stream socket is a no-op.
997
if (total_buffer_size == 0)
999
// Complete with no error and 0 bytes.
this->io_service().post(bind_handler(handler,
1000
asio::error_code(), 0));
1005
// Make socket non-blocking.
1006
if (!(impl.flags_ & implementation_type::internal_non_blocking))
1008
ioctl_arg_type non_blocking = 1;
1009
asio::error_code ec;
1010
// If the ioctl reports failure, the handler is posted with that error and
// 0 bytes.
if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
1012
this->io_service().post(bind_handler(handler, ec, 0));
1015
impl.flags_ |= implementation_type::internal_non_blocking;
1018
// Out-of-band receives are started as an "except" operation on the reactor;
// the other visible call starts a read operation.
if (flags & socket_base::message_out_of_band)
1020
reactor_.start_except_op(impl.socket_,
1021
receive_handler<MutableBufferSequence, Handler>(
1022
impl.socket_, this->io_service(), buffers, flags, handler));
1026
reactor_.start_read_op(impl.socket_,
1027
receive_handler<MutableBufferSequence, Handler>(
1028
impl.socket_, this->io_service(), buffers, flags, handler));
1033
// Receive a datagram with the endpoint of the sender. Returns the number of
// bytes received.
1035
// Synchronously receives data into `buffers` and records the sender.
//   buffers          sequence of mutable buffers; at most max_buffers of
//                    them are offered to the recvfrom call.
//   sender_endpoint  its storage is filled by recvfrom and it is resized to
//                    the returned address length when bytes were received.
//   flags            passed through to socket_ops::recvfrom.
//   ec               receives the outcome; asio::error::eof when recvfrom
//                    returned 0.
// NOTE(review): the loop structure and the return statements are not visible
// in this view.
template <typename MutableBufferSequence>
1036
size_t receive_from(implementation_type& impl,
1037
const MutableBufferSequence& buffers,
1038
endpoint_type& sender_endpoint, socket_base::message_flags flags,
1039
asio::error_code& ec)
1043
// Error used when there is no usable socket.
// NOTE(review): the guarding condition is not visible in this view.
ec = asio::error::bad_descriptor;
1047
// Copy buffers into array.
1048
socket_ops::buf bufs[max_buffers];
1049
typename MutableBufferSequence::const_iterator iter = buffers.begin();
1050
typename MutableBufferSequence::const_iterator end = buffers.end();
1052
// Stops at max_buffers entries even if the sequence is longer.
for (; iter != end && i < max_buffers; ++iter, ++i)
1054
asio::mutable_buffer buffer(*iter);
1055
socket_ops::init_buf(bufs[i],
1056
asio::buffer_cast<void*>(buffer),
1057
asio::buffer_size(buffer));
1060
// Make socket non-blocking if user wants non-blocking.
1061
if (impl.flags_ & implementation_type::user_set_non_blocking)
1063
// Only issue the ioctl when the socket has not already been switched.
if (!(impl.flags_ & implementation_type::internal_non_blocking))
1065
ioctl_arg_type non_blocking = 1;
1066
if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
1068
impl.flags_ |= implementation_type::internal_non_blocking;
1072
// Receive some data.
1075
// Try to complete the operation without blocking.
1076
// addr_len starts as the endpoint's capacity and is passed by address.
std::size_t addr_len = sender_endpoint.capacity();
1077
int bytes_recvd = socket_ops::recvfrom(impl.socket_, bufs, i, flags,
1078
sender_endpoint.data(), &addr_len, ec);
1080
// Check if operation succeeded.
1081
if (bytes_recvd > 0)
1083
sender_endpoint.resize(addr_len);
1088
// A result of 0 bytes is reported as end of file.
if (bytes_recvd == 0)
1090
ec = asio::error::eof;
1094
// Operation failed.
1095
// True when the user asked for non-blocking behaviour, or when the error is
// neither would_block nor try_again.
if ((impl.flags_ & implementation_type::user_set_non_blocking)
1096
|| (ec != asio::error::would_block
1097
&& ec != asio::error::try_again))
1100
// Wait for socket to become ready.
1101
if (socket_ops::poll_read(impl.socket_, ec) < 0)
1106
// Function object that async_receive_from passes to reactor_.start_read_op.
// It keeps its own copy of the buffer sequence, but only a reference to the
// caller's sender endpoint (sender_endpoint_ is endpoint_type&), and
// delivers the result by posting handler_ to io_service_.
// NOTE(review): some members and the return statements of operator() are
// not visible in this view.
template <typename MutableBufferSequence, typename Handler>
1107
class receive_from_handler
1110
receive_from_handler(socket_type socket,
1111
asio::io_service& io_service,
1112
const MutableBufferSequence& buffers, endpoint_type& endpoint,
1113
socket_base::message_flags flags, Handler handler)
1115
io_service_(io_service),
1118
sender_endpoint_(endpoint),
1124
// Called with the result of waiting on the socket.
bool operator()(const asio::error_code& result)
1126
// Check whether the operation was successful.
1129
// Completion with the incoming result and a byte count of 0.
io_service_.post(bind_handler(handler_, result, 0));
1133
// Copy buffers into array.
1134
socket_ops::buf bufs[max_buffers];
1135
typename MutableBufferSequence::const_iterator iter = buffers_.begin();
1136
typename MutableBufferSequence::const_iterator end = buffers_.end();
1138
// Stops at max_buffers entries even if the sequence is longer.
for (; iter != end && i < max_buffers; ++iter, ++i)
1140
asio::mutable_buffer buffer(*iter);
1141
socket_ops::init_buf(bufs[i],
1142
asio::buffer_cast<void*>(buffer),
1143
asio::buffer_size(buffer));
1146
// Receive some data.
1147
// addr_len starts as the endpoint's capacity and is passed by address.
std::size_t addr_len = sender_endpoint_.capacity();
1148
asio::error_code ec;
1149
int bytes = socket_ops::recvfrom(socket_, bufs, i, flags_,
1150
sender_endpoint_.data(), &addr_len, ec);
1152
// NOTE(review): the condition guarding this assignment is not visible in
// this view; the synchronous receive_from() makes the same assignment when
// the byte count is 0.
ec = asio::error::eof;
1154
// Check if we need to run the operation again.
1155
if (ec == asio::error::would_block
1156
|| ec == asio::error::try_again)
1159
// Resize the caller's endpoint to the returned address length, then post
// the completion. A negative byte count is reported to the handler as 0.
sender_endpoint_.resize(addr_len);
1160
io_service_.post(bind_handler(handler_, ec, bytes < 0 ? 0 : bytes));
1165
socket_type socket_;
1166
asio::io_service& io_service_;
1167
asio::io_service::work work_;
1168
MutableBufferSequence buffers_;
1169
endpoint_type& sender_endpoint_;
1170
socket_base::message_flags flags_;
1174
// Start an asynchronous receive. The buffer for the data being received and
1175
// the sender_endpoint object must both be valid for the lifetime of the
1176
// asynchronous operation.
1177
// Starts an asynchronous receive that also records the sender's endpoint.
//   buffers          sequence of mutable buffers handed to the handler object.
//   sender_endpoint  passed by reference into receive_from_handler, which
//                    stores it as a reference.
//   flags            passed through to the handler object.
//   handler          invoked via io_service().post with an error code and a
//                    byte count, either from one of the early paths below or
//                    by the receive_from_handler.
template <typename MutableBufferSequence, typename Handler>
1178
void async_receive_from(implementation_type& impl,
1179
const MutableBufferSequence& buffers, endpoint_type& sender_endpoint,
1180
socket_base::message_flags flags, Handler handler)
1184
// Completion used when there is no usable socket.
// NOTE(review): the guarding condition is not visible in this view.
this->io_service().post(bind_handler(handler,
1185
asio::error::bad_descriptor, 0));
1189
// Make socket non-blocking.
1190
if (!(impl.flags_ & implementation_type::internal_non_blocking))
1192
ioctl_arg_type non_blocking = 1;
1193
asio::error_code ec;
1194
// If the ioctl reports failure, the handler is posted with that error and
// 0 bytes.
if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
1196
this->io_service().post(bind_handler(handler, ec, 0));
1199
impl.flags_ |= implementation_type::internal_non_blocking;
1202
// Hand the operation to the reactor as a read operation.
reactor_.start_read_op(impl.socket_,
1203
receive_from_handler<MutableBufferSequence, Handler>(
1204
impl.socket_, this->io_service(), buffers,
1205
sender_endpoint, flags, handler));
1209
// Accept a new connection.
1210
// Synchronously accepts a connection on impl's socket into `peer`.
//   Socket         must provide assign(protocol, native_socket, ec); see the
//                  call below.
//   peer           receives the accepted socket through peer.assign.
//   peer_endpoint  pointer to an endpoint that receives the peer's address;
//                  one of the two accept calls below passes no address at all.
//   ec             receives the outcome.
// NOTE(review): the loop structure, several conditions and the return
// statements are not visible in this view.
template <typename Socket>
1211
asio::error_code accept(implementation_type& impl,
1212
Socket& peer, endpoint_type* peer_endpoint, asio::error_code& ec)
1216
// Error used when there is no usable socket.
// NOTE(review): the guarding condition is not visible in this view.
ec = asio::error::bad_descriptor;
1220
// We cannot accept a socket that is already open.
1223
ec = asio::error::already_open;
1227
// Make socket non-blocking if user wants non-blocking.
1228
if (impl.flags_ & implementation_type::user_set_non_blocking)
1230
// Only issue the ioctl when the socket has not already been switched.
if (!(impl.flags_ & implementation_type::internal_non_blocking))
1232
ioctl_arg_type non_blocking = 1;
1233
if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
1235
impl.flags_ |= implementation_type::internal_non_blocking;
1242
// Try to complete the operation without blocking.
1243
// NOTE(review): this local has the same name as the `ec` parameter. If it
// sits in a nested scope it hides the parameter, so errors written to it
// would not reach the caller - confirm whether that is intended.
asio::error_code ec;
1244
socket_holder new_socket;
1245
std::size_t addr_len = 0;
1248
// Variant that also retrieves the peer address into *peer_endpoint.
addr_len = peer_endpoint->capacity();
1249
new_socket.reset(socket_ops::accept(impl.socket_,
1250
peer_endpoint->data(), &addr_len, ec));
1254
// Variant that does not retrieve the peer address.
new_socket.reset(socket_ops::accept(impl.socket_, 0, 0, ec));
1257
// Check if operation succeeded.
1258
// NOTE(review): success is tested with `>= 0` here, whereas open() compares
// against invalid_socket - confirm this is correct if socket_type can be an
// unsigned type.
if (new_socket.get() >= 0)
1261
peer_endpoint->resize(addr_len);
1262
// Give the accepted socket to the peer object; the holder then lets go of it.
peer.assign(impl.protocol_, new_socket.get(), ec);
1264
new_socket.release();
1268
// Operation failed.
1269
if (ec == asio::error::would_block
1270
|| ec == asio::error::try_again)
1272
if (impl.flags_ & implementation_type::user_set_non_blocking)
1274
// Fall through to retry operation.
1276
// connection_aborted is examined together with the
// enable_connection_aborted flag.
else if (ec == asio::error::connection_aborted)
1278
if (impl.flags_ & implementation_type::enable_connection_aborted)
1280
// Fall through to retry operation.
1283
// EPROTO is examined the same way as connection_aborted. The matching #if
// for the #endif below is not visible in this view.
else if (ec.value() == EPROTO)
1285
if (impl.flags_ & implementation_type::enable_connection_aborted)
1287
// Fall through to retry operation.
1289
#endif // defined(EPROTO)
1293
// Wait for socket to become ready.
1294
if (socket_ops::poll_read(impl.socket_, ec) < 0)
1299
// Function object that async_accept passes to reactor_.start_read_op.
// It stores the protocol by value and the peer endpoint as a raw pointer,
// and delivers the result by posting handler_ with an error code.
// NOTE(review): some members, several conditions and the return statements
// of operator() are not visible in this view.
template <typename Socket, typename Handler>
1300
class accept_handler
1303
accept_handler(socket_type socket, asio::io_service& io_service,
1304
Socket& peer, const protocol_type& protocol,
1305
endpoint_type* peer_endpoint, bool enable_connection_aborted,
1308
io_service_(io_service),
1311
protocol_(protocol),
1312
peer_endpoint_(peer_endpoint),
1313
enable_connection_aborted_(enable_connection_aborted),
1318
// Called with the result of waiting on the listening socket.
bool operator()(const asio::error_code& result)
1320
// Check whether the operation was successful.
1323
// Completion with the incoming result.
io_service_.post(bind_handler(handler_, result));
1327
// Accept the waiting connection.
1328
asio::error_code ec;
1329
socket_holder new_socket;
1330
std::size_t addr_len = 0;
1333
// Variant that also retrieves the peer address into *peer_endpoint_.
addr_len = peer_endpoint_->capacity();
1334
new_socket.reset(socket_ops::accept(socket_,
1335
peer_endpoint_->data(), &addr_len, ec));
1339
// Variant that does not retrieve the peer address.
new_socket.reset(socket_ops::accept(socket_, 0, 0, ec));
1342
// Check if we need to run the operation again.
1343
if (ec == asio::error::would_block
1344
|| ec == asio::error::try_again)
1346
// connection_aborted and EPROTO are tested together with the negated
// enable_connection_aborted_ setting.
if (ec == asio::error::connection_aborted
1347
&& !enable_connection_aborted_)
1350
if (ec.value() == EPROTO && !enable_connection_aborted_)
1352
#endif // defined(EPROTO)
1354
// Transfer ownership of the new socket to the peer object.
1358
peer_endpoint_->resize(addr_len);
1359
peer_.assign(protocol_, new_socket.get(), ec);
1361
new_socket.release();
1364
// Report the final error code to the user's handler.
io_service_.post(bind_handler(handler_, ec));
1369
socket_type socket_;
1370
asio::io_service& io_service_;
1371
asio::io_service::work work_;
1373
protocol_type protocol_;
1374
endpoint_type* peer_endpoint_;
1375
bool enable_connection_aborted_;
1379
// Start an asynchronous accept. The peer and peer_endpoint objects
1380
// must be valid until the accept's handler is invoked.
1381
//
// Parameters:
//   impl          - the acceptor's implementation (socket_, flags_ and
//                   protocol_ are read; flags_ may be updated).
//   peer          - socket object that will receive the accepted connection.
//   peer_endpoint - endpoint that the accept_handler fills in with the
//                   remote address.
//   handler       - completion handler, always invoked via io_service().post
//                   in the visible paths, never directly.
//
// NOTE(review): this listing appears to have lost lines (braces, the first
// condition, else branches). The comments describe the visible statements
// only.
template <typename Socket, typename Handler>
1382
void async_accept(implementation_type& impl, Socket& peer,
1383
endpoint_type* peer_endpoint, Handler handler)
1387
// Fail with bad_descriptor.
// NOTE(review): the guarding condition is not visible; presumably the
// acceptor socket is not open - TODO confirm.
this->io_service().post(bind_handler(handler,
1388
asio::error::bad_descriptor));
1390
// The peer must not already hold an open socket.
else if (peer.is_open())
1392
this->io_service().post(bind_handler(handler,
1393
asio::error::already_open));
1397
// Make socket non-blocking.
1398
// Only issue the ioctl if this service has not already done so.
if (!(impl.flags_ & implementation_type::internal_non_blocking))
1400
ioctl_arg_type non_blocking = 1;
1401
asio::error_code ec;
1402
// A non-zero ioctl result is treated as failure and reported to the handler.
if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
1404
this->io_service().post(bind_handler(handler, ec));
1407
// Remember that the socket is now non-blocking so the ioctl is not repeated.
impl.flags_ |= implementation_type::internal_non_blocking;
1410
// Register a read operation with the reactor; the accept_handler performs
// the accept and posts the user's handler.
reactor_.start_read_op(impl.socket_,
1411
accept_handler<Socket, Handler>(
1412
impl.socket_, this->io_service(),
1413
peer, impl.protocol_, peer_endpoint,
1414
(impl.flags_ & implementation_type::enable_connection_aborted) != 0,
1419
// Connect the socket to the specified endpoint.
//
// Parameters:
//   impl          - socket implementation; flags_ may be updated when the
//                   socket is switched back to blocking mode.
//   peer_endpoint - remote endpoint whose data()/size() are passed to
//                   socket_ops::connect.
//   ec            - receives the error result of the operation.
// Returns an asio::error_code (the return statements are not visible in this
// listing; presumably ec - TODO confirm).
//
// NOTE(review): this listing appears to have lost lines (braces, the first
// condition, return statements). The comments describe the visible
// statements only.
1420
asio::error_code connect(implementation_type& impl,
1421
const endpoint_type& peer_endpoint, asio::error_code& ec)
1425
// NOTE(review): the guarding condition is not visible; presumably the socket
// is not open - TODO confirm.
ec = asio::error::bad_descriptor;
1429
// The service previously put the socket into non-blocking mode.
if (impl.flags_ & implementation_type::internal_non_blocking)
1431
// Mark the socket as blocking while we perform the connect.
1432
ioctl_arg_type non_blocking = 0;
1433
// A non-zero ioctl result is treated as failure; ec holds the error.
if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
1435
// Record that the socket is no longer internally non-blocking.
impl.flags_ &= ~implementation_type::internal_non_blocking;
1438
// Perform the connect operation.
1439
socket_ops::connect(impl.socket_,
1440
peer_endpoint.data(), peer_endpoint.size(), ec);
1444
// Function object handed to the reactor by async_connect (through
// start_write_and_except_ops). When invoked it reads the outcome of the
// connect from the socket's SO_ERROR option and posts the user's handler to
// the io_service with the resulting error code.
//
// Template parameter:
//   Handler - completion handler type; it is invoked through
//             bind_handler(handler_, error_code).
//
// NOTE(review): this listing appears to have lost lines (braces, access
// specifiers, some conditions, return statements and member declarations).
// The comments describe the visible statements only.
template <typename Handler>
1445
class connect_handler
1448
// Constructor: captures the socket, a shared "completed" flag, the io_service
// used to post the handler, the reactor, and the user's handler.
connect_handler(socket_type socket, boost::shared_ptr<bool> completed,
1449
asio::io_service& io_service, Reactor& reactor, Handler handler)
1451
completed_(completed),
1452
io_service_(io_service),
1459
// Completes the connect.
//   result - error code passed in by the invoker (presumably the reactor -
//            TODO confirm).
// Returns bool; the return statements are not visible in this listing.
bool operator()(const asio::error_code& result)
1461
// Check whether a handler has already been called for the connection.
1462
// If it has, then we don't want to do anything in this handler.
1466
// NOTE(review): the test on completed_ is not visible here; presumably the
// shared flag is checked and set at this point - TODO confirm.
// Cancel the other reactor operation for the connection.
1468
reactor_.enqueue_cancel_ops_unlocked(socket_);
1470
// Check whether the operation was successful.
1473
// Hand the incoming error straight to the user's handler.
// NOTE(review): the guarding condition is not visible here.
io_service_.post(bind_handler(handler_, result));
1477
// Get the error code from the connect operation.
1478
int connect_error = 0;
1479
size_t connect_error_len = sizeof(connect_error);
1480
asio::error_code ec;
1481
// If the option cannot be read, report the getsockopt error itself.
if (socket_ops::getsockopt(socket_, SOL_SOCKET, SO_ERROR,
1482
&connect_error, &connect_error_len, ec) == socket_error_retval)
1484
io_service_.post(bind_handler(handler_, ec));
1488
// If connection failed then post the handler with the error code.
1491
// Wrap the SO_ERROR value as a system-category error code.
ec = asio::error_code(connect_error,
1492
asio::error::system_category);
1493
io_service_.post(bind_handler(handler_, ec));
1497
// Post the result of the successful connection operation.
1498
io_service_.post(bind_handler(handler_, ec));
1503
// The socket being connected.
socket_type socket_;
1504
// Flag shared between copies of this handler (see the check at the top of
// operator()).
boost::shared_ptr<bool> completed_;
1505
// The io_service used to post the completion handler.
asio::io_service& io_service_;
1506
// NOTE(review): presumably keeps the io_service running while this
// operation is outstanding - TODO confirm.
asio::io_service::work work_;
1511
// Start an asynchronous connect.
//
// Parameters:
//   impl          - socket implementation (socket_ and flags_ are read;
//                   flags_ may be updated).
//   peer_endpoint - remote endpoint whose data()/size() are passed to
//                   socket_ops::connect.
//   handler       - completion handler, always invoked via io_service().post
//                   in the visible paths, never directly.
//
// NOTE(review): this listing appears to have lost lines (braces, the first
// condition, else branches). The comments describe the visible statements
// only.
1512
template <typename Handler>
1513
void async_connect(implementation_type& impl,
1514
const endpoint_type& peer_endpoint, Handler handler)
1518
// Fail with bad_descriptor.
// NOTE(review): the guarding condition is not visible; presumably the socket
// is not open - TODO confirm.
this->io_service().post(bind_handler(handler,
1519
asio::error::bad_descriptor));
1523
// Make socket non-blocking.
1524
// Only issue the ioctl if this service has not already done so.
if (!(impl.flags_ & implementation_type::internal_non_blocking))
1526
ioctl_arg_type non_blocking = 1;
1527
asio::error_code ec;
1528
// A non-zero ioctl result is treated as failure and reported to the handler.
if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
1530
this->io_service().post(bind_handler(handler, ec));
1533
// Remember that the socket is now non-blocking so the ioctl is not repeated.
impl.flags_ |= implementation_type::internal_non_blocking;
1536
// Start the connect operation. The socket is already marked as non-blocking
1537
// so the connection will take place asynchronously.
1538
asio::error_code ec;
1539
if (socket_ops::connect(impl.socket_, peer_endpoint.data(),
1540
peer_endpoint.size(), ec) == 0)
1542
// The connect operation has finished successfully so we need to post the
1543
// handler immediately.
1544
this->io_service().post(bind_handler(handler,
1545
asio::error_code()));
1547
else if (ec == asio::error::in_progress
1548
|| ec == asio::error::would_block)
1550
// The connection is happening in the background, and we need to wait
1551
// until the socket becomes writeable.
1552
// Flag passed to the connect_handler; it starts out false.
boost::shared_ptr<bool> completed(new bool(false));
1553
// Register with the reactor; the connect_handler reads the connect result
// and posts the user's handler.
reactor_.start_write_and_except_ops(impl.socket_,
1554
connect_handler<Handler>(
1555
impl.socket_, completed, this->io_service(), reactor_, handler));
1559
// The connect operation has failed, so post the handler immediately.
1560
this->io_service().post(bind_handler(handler, ec));
1565
// The selector that performs event demultiplexing for the service.
1569
} // namespace detail
1572
#include "asio/detail/pop_options.hpp"
1574
#endif // ASIO_DETAIL_REACTIVE_SOCKET_SERVICE_HPP