2
// reactive_socket_service.hpp
3
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~
5
// Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com)
7
// Distributed under the Boost Software License, Version 1.0. (See accompanying
8
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
11
#ifndef ASIO_DETAIL_REACTIVE_SOCKET_SERVICE_HPP
12
#define ASIO_DETAIL_REACTIVE_SOCKET_SERVICE_HPP
14
#if defined(_MSC_VER) && (_MSC_VER >= 1200)
16
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
18
#include "asio/detail/push_options.hpp"
20
#include "asio/detail/push_options.hpp"
21
#include <boost/shared_ptr.hpp>
22
#include "asio/detail/pop_options.hpp"
24
#include "asio/buffer.hpp"
25
#include "asio/error.hpp"
26
#include "asio/io_service.hpp"
27
#include "asio/socket_base.hpp"
28
#include "asio/detail/bind_handler.hpp"
29
#include "asio/detail/handler_base_from_member.hpp"
30
#include "asio/detail/noncopyable.hpp"
31
#include "asio/detail/service_base.hpp"
32
#include "asio/detail/socket_holder.hpp"
33
#include "asio/detail/socket_ops.hpp"
34
#include "asio/detail/socket_types.hpp"
39
template <typename Protocol, typename Reactor>
40
class reactive_socket_service
41
: public asio::detail::service_base<
42
reactive_socket_service<Protocol, Reactor> >
46
typedef Protocol protocol_type;
49
typedef typename Protocol::endpoint endpoint_type;
51
// The native type of a socket.
52
typedef socket_type native_type;
54
// The implementation type of the socket.
55
class implementation_type
56
: private asio::detail::noncopyable
59
// Default constructor.
61
: socket_(invalid_socket),
63
protocol_(endpoint_type().protocol())
68
// Only this service will have access to the internal values.
69
friend class reactive_socket_service<Protocol, Reactor>;
71
// The native socket representation.
76
user_set_non_blocking = 1, // The user wants a non-blocking socket.
77
internal_non_blocking = 2, // The socket has been set non-blocking.
78
enable_connection_aborted = 4, // User wants connection_aborted errors.
79
user_set_linger = 8 // The user set the linger option.
82
// Flags indicating the current state of the socket.
85
// The protocol associated with the socket.
86
protocol_type protocol_;
88
// Per-descriptor data used by the reactor.
89
typename Reactor::per_descriptor_data reactor_data_;
92
// The maximum number of buffers to support in a single operation.
93
enum { max_buffers = 64 < max_iov_len ? 64 : max_iov_len };
96
reactive_socket_service(asio::io_service& io_service)
97
: asio::detail::service_base<
98
reactive_socket_service<Protocol, Reactor> >(io_service),
99
reactor_(asio::use_service<Reactor>(io_service))
103
// Destroy all user-defined handler objects owned by the service.
104
void shutdown_service()
108
// Construct a new socket implementation.
109
void construct(implementation_type& impl)
111
impl.socket_ = invalid_socket;
115
// Destroy a socket implementation.
116
void destroy(implementation_type& impl)
118
if (impl.socket_ != invalid_socket)
120
reactor_.close_descriptor(impl.socket_, impl.reactor_data_);
122
if (impl.flags_ & implementation_type::internal_non_blocking)
124
ioctl_arg_type non_blocking = 0;
125
asio::error_code ignored_ec;
126
socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ignored_ec);
127
impl.flags_ &= ~implementation_type::internal_non_blocking;
130
if (impl.flags_ & implementation_type::user_set_linger)
135
asio::error_code ignored_ec;
136
socket_ops::setsockopt(impl.socket_,
137
SOL_SOCKET, SO_LINGER, &opt, sizeof(opt), ignored_ec);
140
asio::error_code ignored_ec;
141
socket_ops::close(impl.socket_, ignored_ec);
143
impl.socket_ = invalid_socket;
147
// Open a new socket implementation.
148
asio::error_code open(implementation_type& impl,
149
const protocol_type& protocol, asio::error_code& ec)
153
ec = asio::error::already_open;
157
socket_holder sock(socket_ops::socket(protocol.family(),
158
protocol.type(), protocol.protocol(), ec));
159
if (sock.get() == invalid_socket)
162
if (int err = reactor_.register_descriptor(sock.get(), impl.reactor_data_))
164
ec = asio::error_code(err,
165
asio::error::get_system_category());
169
impl.socket_ = sock.release();
171
impl.protocol_ = protocol;
172
ec = asio::error_code();
176
// Assign a native socket to a socket implementation.
177
asio::error_code assign(implementation_type& impl,
178
const protocol_type& protocol, const native_type& native_socket,
179
asio::error_code& ec)
183
ec = asio::error::already_open;
187
if (int err = reactor_.register_descriptor(
188
native_socket, impl.reactor_data_))
190
ec = asio::error_code(err,
191
asio::error::get_system_category());
195
impl.socket_ = native_socket;
197
impl.protocol_ = protocol;
198
ec = asio::error_code();
202
// Determine whether the socket is open.
203
bool is_open(const implementation_type& impl) const
205
return impl.socket_ != invalid_socket;
208
// Destroy a socket implementation.
209
asio::error_code close(implementation_type& impl,
210
asio::error_code& ec)
214
reactor_.close_descriptor(impl.socket_, impl.reactor_data_);
216
if (impl.flags_ & implementation_type::internal_non_blocking)
218
ioctl_arg_type non_blocking = 0;
219
asio::error_code ignored_ec;
220
socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ignored_ec);
221
impl.flags_ &= ~implementation_type::internal_non_blocking;
224
if (socket_ops::close(impl.socket_, ec) == socket_error_retval)
227
impl.socket_ = invalid_socket;
230
ec = asio::error_code();
234
// Get the native socket representation.
235
native_type native(implementation_type& impl)
240
// Cancel all operations associated with the socket.
241
asio::error_code cancel(implementation_type& impl,
242
asio::error_code& ec)
246
ec = asio::error::bad_descriptor;
250
reactor_.cancel_ops(impl.socket_, impl.reactor_data_);
251
ec = asio::error_code();
255
// Determine whether the socket is at the out-of-band data mark.
256
bool at_mark(const implementation_type& impl,
257
asio::error_code& ec) const
261
ec = asio::error::bad_descriptor;
265
asio::detail::ioctl_arg_type value = 0;
266
socket_ops::ioctl(impl.socket_, SIOCATMARK, &value, ec);
268
if (ec.value() == ENOTTY)
269
ec = asio::error::not_socket;
270
#endif // defined(ENOTTY)
271
return ec ? false : value != 0;
274
// Determine the number of bytes available for reading.
275
std::size_t available(const implementation_type& impl,
276
asio::error_code& ec) const
280
ec = asio::error::bad_descriptor;
284
asio::detail::ioctl_arg_type value = 0;
285
socket_ops::ioctl(impl.socket_, FIONREAD, &value, ec);
287
if (ec.value() == ENOTTY)
288
ec = asio::error::not_socket;
289
#endif // defined(ENOTTY)
290
return ec ? static_cast<std::size_t>(0) : static_cast<std::size_t>(value);
293
// Bind the socket to the specified local endpoint.
294
asio::error_code bind(implementation_type& impl,
295
const endpoint_type& endpoint, asio::error_code& ec)
299
ec = asio::error::bad_descriptor;
303
socket_ops::bind(impl.socket_, endpoint.data(), endpoint.size(), ec);
307
// Place the socket into the state where it will listen for new connections.
308
asio::error_code listen(implementation_type& impl, int backlog,
309
asio::error_code& ec)
313
ec = asio::error::bad_descriptor;
317
socket_ops::listen(impl.socket_, backlog, ec);
321
// Set a socket option.
322
template <typename Option>
323
asio::error_code set_option(implementation_type& impl,
324
const Option& option, asio::error_code& ec)
328
ec = asio::error::bad_descriptor;
332
if (option.level(impl.protocol_) == custom_socket_option_level
333
&& option.name(impl.protocol_) == enable_connection_aborted_option)
335
if (option.size(impl.protocol_) != sizeof(int))
337
ec = asio::error::invalid_argument;
341
if (*reinterpret_cast<const int*>(option.data(impl.protocol_)))
342
impl.flags_ |= implementation_type::enable_connection_aborted;
344
impl.flags_ &= ~implementation_type::enable_connection_aborted;
345
ec = asio::error_code();
351
if (option.level(impl.protocol_) == SOL_SOCKET
352
&& option.name(impl.protocol_) == SO_LINGER)
354
impl.flags_ |= implementation_type::user_set_linger;
357
socket_ops::setsockopt(impl.socket_,
358
option.level(impl.protocol_), option.name(impl.protocol_),
359
option.data(impl.protocol_), option.size(impl.protocol_), ec);
361
#if defined(__MACH__) && defined(__APPLE__) \
362
|| defined(__NetBSD__) || defined(__FreeBSD__) || defined(__OpenBSD__)
363
// To implement portable behaviour for SO_REUSEADDR with UDP sockets we
364
// need to also set SO_REUSEPORT on BSD-based platforms.
365
if (!ec && impl.protocol_.type() == SOCK_DGRAM
366
&& option.level(impl.protocol_) == SOL_SOCKET
367
&& option.name(impl.protocol_) == SO_REUSEADDR)
369
asio::error_code ignored_ec;
370
socket_ops::setsockopt(impl.socket_, SOL_SOCKET, SO_REUSEPORT,
371
option.data(impl.protocol_), option.size(impl.protocol_),
380
// Set a socket option.
381
template <typename Option>
382
asio::error_code get_option(const implementation_type& impl,
383
Option& option, asio::error_code& ec) const
387
ec = asio::error::bad_descriptor;
391
if (option.level(impl.protocol_) == custom_socket_option_level
392
&& option.name(impl.protocol_) == enable_connection_aborted_option)
394
if (option.size(impl.protocol_) != sizeof(int))
396
ec = asio::error::invalid_argument;
400
int* target = reinterpret_cast<int*>(option.data(impl.protocol_));
401
if (impl.flags_ & implementation_type::enable_connection_aborted)
405
option.resize(impl.protocol_, sizeof(int));
406
ec = asio::error_code();
412
size_t size = option.size(impl.protocol_);
413
socket_ops::getsockopt(impl.socket_,
414
option.level(impl.protocol_), option.name(impl.protocol_),
415
option.data(impl.protocol_), &size, ec);
417
option.resize(impl.protocol_, size);
422
// Perform an IO control command on the socket.
423
template <typename IO_Control_Command>
424
asio::error_code io_control(implementation_type& impl,
425
IO_Control_Command& command, asio::error_code& ec)
429
ec = asio::error::bad_descriptor;
433
if (command.name() == static_cast<int>(FIONBIO))
436
impl.flags_ |= implementation_type::user_set_non_blocking;
438
impl.flags_ &= ~implementation_type::user_set_non_blocking;
439
ec = asio::error_code();
443
socket_ops::ioctl(impl.socket_, command.name(),
444
static_cast<ioctl_arg_type*>(command.data()), ec);
449
// Get the local endpoint.
450
endpoint_type local_endpoint(const implementation_type& impl,
451
asio::error_code& ec) const
455
ec = asio::error::bad_descriptor;
456
return endpoint_type();
459
endpoint_type endpoint;
460
std::size_t addr_len = endpoint.capacity();
461
if (socket_ops::getsockname(impl.socket_, endpoint.data(), &addr_len, ec))
462
return endpoint_type();
463
endpoint.resize(addr_len);
467
// Get the remote endpoint.
468
endpoint_type remote_endpoint(const implementation_type& impl,
469
asio::error_code& ec) const
473
ec = asio::error::bad_descriptor;
474
return endpoint_type();
477
endpoint_type endpoint;
478
std::size_t addr_len = endpoint.capacity();
479
if (socket_ops::getpeername(impl.socket_, endpoint.data(), &addr_len, ec))
480
return endpoint_type();
481
endpoint.resize(addr_len);
485
/// Disable sends or receives on the socket.
486
asio::error_code shutdown(implementation_type& impl,
487
socket_base::shutdown_type what, asio::error_code& ec)
491
ec = asio::error::bad_descriptor;
495
socket_ops::shutdown(impl.socket_, what, ec);
499
// Send the given data to the peer.
500
template <typename ConstBufferSequence>
501
size_t send(implementation_type& impl, const ConstBufferSequence& buffers,
502
socket_base::message_flags flags, asio::error_code& ec)
506
ec = asio::error::bad_descriptor;
510
// Copy buffers into array.
511
socket_ops::buf bufs[max_buffers];
512
typename ConstBufferSequence::const_iterator iter = buffers.begin();
513
typename ConstBufferSequence::const_iterator end = buffers.end();
515
size_t total_buffer_size = 0;
516
for (; iter != end && i < max_buffers; ++iter, ++i)
518
asio::const_buffer buffer(*iter);
519
socket_ops::init_buf(bufs[i],
520
asio::buffer_cast<const void*>(buffer),
521
asio::buffer_size(buffer));
522
total_buffer_size += asio::buffer_size(buffer);
525
// A request to receive 0 bytes on a stream socket is a no-op.
526
if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0)
528
ec = asio::error_code();
532
// Make socket non-blocking if user wants non-blocking.
533
if (impl.flags_ & implementation_type::user_set_non_blocking)
535
if (!(impl.flags_ & implementation_type::internal_non_blocking))
537
ioctl_arg_type non_blocking = 1;
538
if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
540
impl.flags_ |= implementation_type::internal_non_blocking;
547
// Try to complete the operation without blocking.
548
int bytes_sent = socket_ops::send(impl.socket_, bufs, i, flags, ec);
550
// Check if operation succeeded.
555
if ((impl.flags_ & implementation_type::user_set_non_blocking)
556
|| (ec != asio::error::would_block
557
&& ec != asio::error::try_again))
560
// Wait for socket to become ready.
561
if (socket_ops::poll_write(impl.socket_, ec) < 0)
566
// Wait until data can be sent without blocking.
567
size_t send(implementation_type& impl, const null_buffers&,
568
socket_base::message_flags, asio::error_code& ec)
572
ec = asio::error::bad_descriptor;
576
// Wait for socket to become ready.
577
socket_ops::poll_write(impl.socket_, ec);
582
template <typename ConstBufferSequence, typename Handler>
583
class send_operation :
584
public handler_base_from_member<Handler>
587
send_operation(socket_type socket, asio::io_service& io_service,
588
const ConstBufferSequence& buffers, socket_base::message_flags flags,
590
: handler_base_from_member<Handler>(handler),
592
io_service_(io_service),
599
bool perform(asio::error_code& ec,
600
std::size_t& bytes_transferred)
602
// Check whether the operation was successful.
605
bytes_transferred = 0;
609
// Copy buffers into array.
610
socket_ops::buf bufs[max_buffers];
611
typename ConstBufferSequence::const_iterator iter = buffers_.begin();
612
typename ConstBufferSequence::const_iterator end = buffers_.end();
614
for (; iter != end && i < max_buffers; ++iter, ++i)
616
asio::const_buffer buffer(*iter);
617
socket_ops::init_buf(bufs[i],
618
asio::buffer_cast<const void*>(buffer),
619
asio::buffer_size(buffer));
623
int bytes = socket_ops::send(socket_, bufs, i, flags_, ec);
625
// Check if we need to run the operation again.
626
if (ec == asio::error::would_block
627
|| ec == asio::error::try_again)
630
bytes_transferred = (bytes < 0 ? 0 : bytes);
634
void complete(const asio::error_code& ec,
635
std::size_t bytes_transferred)
637
io_service_.post(bind_handler(this->handler_, ec, bytes_transferred));
642
asio::io_service& io_service_;
643
asio::io_service::work work_;
644
ConstBufferSequence buffers_;
645
socket_base::message_flags flags_;
648
// Start an asynchronous send. The data being sent must be valid for the
649
// lifetime of the asynchronous operation.
650
template <typename ConstBufferSequence, typename Handler>
651
void async_send(implementation_type& impl, const ConstBufferSequence& buffers,
652
socket_base::message_flags flags, Handler handler)
656
this->get_io_service().post(bind_handler(handler,
657
asio::error::bad_descriptor, 0));
661
if (impl.protocol_.type() == SOCK_STREAM)
663
// Determine total size of buffers.
664
typename ConstBufferSequence::const_iterator iter = buffers.begin();
665
typename ConstBufferSequence::const_iterator end = buffers.end();
667
size_t total_buffer_size = 0;
668
for (; iter != end && i < max_buffers; ++iter, ++i)
670
asio::const_buffer buffer(*iter);
671
total_buffer_size += asio::buffer_size(buffer);
674
// A request to receive 0 bytes on a stream socket is a no-op.
675
if (total_buffer_size == 0)
677
this->get_io_service().post(bind_handler(handler,
678
asio::error_code(), 0));
683
// Make socket non-blocking.
684
if (!(impl.flags_ & implementation_type::internal_non_blocking))
686
ioctl_arg_type non_blocking = 1;
688
if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
690
this->get_io_service().post(bind_handler(handler, ec, 0));
693
impl.flags_ |= implementation_type::internal_non_blocking;
696
reactor_.start_write_op(impl.socket_, impl.reactor_data_,
697
send_operation<ConstBufferSequence, Handler>(
698
impl.socket_, this->get_io_service(), buffers, flags, handler));
702
template <typename Handler>
703
class null_buffers_operation :
704
public handler_base_from_member<Handler>
707
null_buffers_operation(asio::io_service& io_service, Handler handler)
708
: handler_base_from_member<Handler>(handler),
713
bool perform(asio::error_code&,
714
std::size_t& bytes_transferred)
716
bytes_transferred = 0;
720
void complete(const asio::error_code& ec,
721
std::size_t bytes_transferred)
723
work_.get_io_service().post(bind_handler(
724
this->handler_, ec, bytes_transferred));
728
asio::io_service::work work_;
731
// Start an asynchronous wait until data can be sent without blocking.
732
template <typename Handler>
733
void async_send(implementation_type& impl, const null_buffers&,
734
socket_base::message_flags, Handler handler)
738
this->get_io_service().post(bind_handler(handler,
739
asio::error::bad_descriptor, 0));
743
reactor_.start_write_op(impl.socket_, impl.reactor_data_,
744
null_buffers_operation<Handler>(this->get_io_service(), handler),
749
// Send a datagram to the specified endpoint. Returns the number of bytes
751
template <typename ConstBufferSequence>
752
size_t send_to(implementation_type& impl, const ConstBufferSequence& buffers,
753
const endpoint_type& destination, socket_base::message_flags flags,
754
asio::error_code& ec)
758
ec = asio::error::bad_descriptor;
762
// Copy buffers into array.
763
socket_ops::buf bufs[max_buffers];
764
typename ConstBufferSequence::const_iterator iter = buffers.begin();
765
typename ConstBufferSequence::const_iterator end = buffers.end();
767
for (; iter != end && i < max_buffers; ++iter, ++i)
769
asio::const_buffer buffer(*iter);
770
socket_ops::init_buf(bufs[i],
771
asio::buffer_cast<const void*>(buffer),
772
asio::buffer_size(buffer));
775
// Make socket non-blocking if user wants non-blocking.
776
if (impl.flags_ & implementation_type::user_set_non_blocking)
778
if (!(impl.flags_ & implementation_type::internal_non_blocking))
780
ioctl_arg_type non_blocking = 1;
781
if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
783
impl.flags_ |= implementation_type::internal_non_blocking;
790
// Try to complete the operation without blocking.
791
int bytes_sent = socket_ops::sendto(impl.socket_, bufs, i, flags,
792
destination.data(), destination.size(), ec);
794
// Check if operation succeeded.
799
if ((impl.flags_ & implementation_type::user_set_non_blocking)
800
|| (ec != asio::error::would_block
801
&& ec != asio::error::try_again))
804
// Wait for socket to become ready.
805
if (socket_ops::poll_write(impl.socket_, ec) < 0)
810
// Wait until data can be sent without blocking.
811
size_t send_to(implementation_type& impl, const null_buffers&,
812
socket_base::message_flags, const endpoint_type&,
813
asio::error_code& ec)
817
ec = asio::error::bad_descriptor;
821
// Wait for socket to become ready.
822
socket_ops::poll_write(impl.socket_, ec);
827
template <typename ConstBufferSequence, typename Handler>
828
class send_to_operation :
829
public handler_base_from_member<Handler>
832
send_to_operation(socket_type socket, asio::io_service& io_service,
833
const ConstBufferSequence& buffers, const endpoint_type& endpoint,
834
socket_base::message_flags flags, Handler handler)
835
: handler_base_from_member<Handler>(handler),
837
io_service_(io_service),
840
destination_(endpoint),
845
bool perform(asio::error_code& ec,
846
std::size_t& bytes_transferred)
848
// Check whether the operation was successful.
851
bytes_transferred = 0;
855
// Copy buffers into array.
856
socket_ops::buf bufs[max_buffers];
857
typename ConstBufferSequence::const_iterator iter = buffers_.begin();
858
typename ConstBufferSequence::const_iterator end = buffers_.end();
860
for (; iter != end && i < max_buffers; ++iter, ++i)
862
asio::const_buffer buffer(*iter);
863
socket_ops::init_buf(bufs[i],
864
asio::buffer_cast<const void*>(buffer),
865
asio::buffer_size(buffer));
869
int bytes = socket_ops::sendto(socket_, bufs, i, flags_,
870
destination_.data(), destination_.size(), ec);
872
// Check if we need to run the operation again.
873
if (ec == asio::error::would_block
874
|| ec == asio::error::try_again)
877
bytes_transferred = (bytes < 0 ? 0 : bytes);
881
void complete(const asio::error_code& ec,
882
std::size_t bytes_transferred)
884
io_service_.post(bind_handler(this->handler_, ec, bytes_transferred));
889
asio::io_service& io_service_;
890
asio::io_service::work work_;
891
ConstBufferSequence buffers_;
892
endpoint_type destination_;
893
socket_base::message_flags flags_;
896
// Start an asynchronous send. The data being sent must be valid for the
897
// lifetime of the asynchronous operation.
898
template <typename ConstBufferSequence, typename Handler>
899
void async_send_to(implementation_type& impl,
900
const ConstBufferSequence& buffers,
901
const endpoint_type& destination, socket_base::message_flags flags,
906
this->get_io_service().post(bind_handler(handler,
907
asio::error::bad_descriptor, 0));
911
// Make socket non-blocking.
912
if (!(impl.flags_ & implementation_type::internal_non_blocking))
914
ioctl_arg_type non_blocking = 1;
916
if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
918
this->get_io_service().post(bind_handler(handler, ec, 0));
921
impl.flags_ |= implementation_type::internal_non_blocking;
924
reactor_.start_write_op(impl.socket_, impl.reactor_data_,
925
send_to_operation<ConstBufferSequence, Handler>(
926
impl.socket_, this->get_io_service(), buffers,
927
destination, flags, handler));
931
// Start an asynchronous wait until data can be sent without blocking.
932
template <typename Handler>
933
void async_send_to(implementation_type& impl, const null_buffers&,
934
socket_base::message_flags, const endpoint_type&, Handler handler)
938
this->get_io_service().post(bind_handler(handler,
939
asio::error::bad_descriptor, 0));
943
reactor_.start_write_op(impl.socket_, impl.reactor_data_,
944
null_buffers_operation<Handler>(this->get_io_service(), handler),
949
// Receive some data from the peer. Returns the number of bytes received.
950
template <typename MutableBufferSequence>
951
size_t receive(implementation_type& impl,
952
const MutableBufferSequence& buffers,
953
socket_base::message_flags flags, asio::error_code& ec)
957
ec = asio::error::bad_descriptor;
961
// Copy buffers into array.
962
socket_ops::buf bufs[max_buffers];
963
typename MutableBufferSequence::const_iterator iter = buffers.begin();
964
typename MutableBufferSequence::const_iterator end = buffers.end();
966
size_t total_buffer_size = 0;
967
for (; iter != end && i < max_buffers; ++iter, ++i)
969
asio::mutable_buffer buffer(*iter);
970
socket_ops::init_buf(bufs[i],
971
asio::buffer_cast<void*>(buffer),
972
asio::buffer_size(buffer));
973
total_buffer_size += asio::buffer_size(buffer);
976
// A request to receive 0 bytes on a stream socket is a no-op.
977
if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0)
979
ec = asio::error_code();
983
// Make socket non-blocking if user wants non-blocking.
984
if (impl.flags_ & implementation_type::user_set_non_blocking)
986
if (!(impl.flags_ & implementation_type::internal_non_blocking))
988
ioctl_arg_type non_blocking = 1;
989
if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
991
impl.flags_ |= implementation_type::internal_non_blocking;
995
// Receive some data.
998
// Try to complete the operation without blocking.
999
int bytes_recvd = socket_ops::recv(impl.socket_, bufs, i, flags, ec);
1001
// Check if operation succeeded.
1002
if (bytes_recvd > 0)
1006
if (bytes_recvd == 0 && impl.protocol_.type() == SOCK_STREAM)
1008
ec = asio::error::eof;
1012
// Operation failed.
1013
if ((impl.flags_ & implementation_type::user_set_non_blocking)
1014
|| (ec != asio::error::would_block
1015
&& ec != asio::error::try_again))
1018
// Wait for socket to become ready.
1019
if (socket_ops::poll_read(impl.socket_, ec) < 0)
1024
// Wait until data can be received without blocking.
1025
size_t receive(implementation_type& impl, const null_buffers&,
1026
socket_base::message_flags, asio::error_code& ec)
1030
ec = asio::error::bad_descriptor;
1034
// Wait for socket to become ready.
1035
socket_ops::poll_read(impl.socket_, ec);
1040
template <typename MutableBufferSequence, typename Handler>
1041
class receive_operation :
1042
public handler_base_from_member<Handler>
1045
receive_operation(socket_type socket, int protocol_type,
1046
asio::io_service& io_service,
1047
const MutableBufferSequence& buffers,
1048
socket_base::message_flags flags, Handler handler)
1049
: handler_base_from_member<Handler>(handler),
1051
protocol_type_(protocol_type),
1052
io_service_(io_service),
1059
bool perform(asio::error_code& ec,
1060
std::size_t& bytes_transferred)
1062
// Check whether the operation was successful.
1065
bytes_transferred = 0;
1069
// Copy buffers into array.
1070
socket_ops::buf bufs[max_buffers];
1071
typename MutableBufferSequence::const_iterator iter = buffers_.begin();
1072
typename MutableBufferSequence::const_iterator end = buffers_.end();
1074
for (; iter != end && i < max_buffers; ++iter, ++i)
1076
asio::mutable_buffer buffer(*iter);
1077
socket_ops::init_buf(bufs[i],
1078
asio::buffer_cast<void*>(buffer),
1079
asio::buffer_size(buffer));
1082
// Receive some data.
1083
int bytes = socket_ops::recv(socket_, bufs, i, flags_, ec);
1084
if (bytes == 0 && protocol_type_ == SOCK_STREAM)
1085
ec = asio::error::eof;
1087
// Check if we need to run the operation again.
1088
if (ec == asio::error::would_block
1089
|| ec == asio::error::try_again)
1092
bytes_transferred = (bytes < 0 ? 0 : bytes);
1096
void complete(const asio::error_code& ec,
1097
std::size_t bytes_transferred)
1099
io_service_.post(bind_handler(this->handler_, ec, bytes_transferred));
1103
socket_type socket_;
1105
asio::io_service& io_service_;
1106
asio::io_service::work work_;
1107
MutableBufferSequence buffers_;
1108
socket_base::message_flags flags_;
1111
// Start an asynchronous receive. The buffer for the data being received
1112
// must be valid for the lifetime of the asynchronous operation.
1113
template <typename MutableBufferSequence, typename Handler>
1114
void async_receive(implementation_type& impl,
1115
const MutableBufferSequence& buffers,
1116
socket_base::message_flags flags, Handler handler)
1120
this->get_io_service().post(bind_handler(handler,
1121
asio::error::bad_descriptor, 0));
1125
if (impl.protocol_.type() == SOCK_STREAM)
1127
// Determine total size of buffers.
1128
typename MutableBufferSequence::const_iterator iter = buffers.begin();
1129
typename MutableBufferSequence::const_iterator end = buffers.end();
1131
size_t total_buffer_size = 0;
1132
for (; iter != end && i < max_buffers; ++iter, ++i)
1134
asio::mutable_buffer buffer(*iter);
1135
total_buffer_size += asio::buffer_size(buffer);
1138
// A request to receive 0 bytes on a stream socket is a no-op.
1139
if (total_buffer_size == 0)
1141
this->get_io_service().post(bind_handler(handler,
1142
asio::error_code(), 0));
1147
// Make socket non-blocking.
1148
if (!(impl.flags_ & implementation_type::internal_non_blocking))
1150
ioctl_arg_type non_blocking = 1;
1151
asio::error_code ec;
1152
if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
1154
this->get_io_service().post(bind_handler(handler, ec, 0));
1157
impl.flags_ |= implementation_type::internal_non_blocking;
1160
if (flags & socket_base::message_out_of_band)
1162
reactor_.start_except_op(impl.socket_, impl.reactor_data_,
1163
receive_operation<MutableBufferSequence, Handler>(
1164
impl.socket_, impl.protocol_.type(),
1165
this->get_io_service(), buffers, flags, handler));
1169
reactor_.start_read_op(impl.socket_, impl.reactor_data_,
1170
receive_operation<MutableBufferSequence, Handler>(
1171
impl.socket_, impl.protocol_.type(),
1172
this->get_io_service(), buffers, flags, handler));
1177
// Wait until data can be received without blocking.
1178
template <typename Handler>
1179
void async_receive(implementation_type& impl, const null_buffers&,
1180
socket_base::message_flags flags, Handler handler)
1184
this->get_io_service().post(bind_handler(handler,
1185
asio::error::bad_descriptor, 0));
1187
else if (flags & socket_base::message_out_of_band)
1189
reactor_.start_except_op(impl.socket_, impl.reactor_data_,
1190
null_buffers_operation<Handler>(this->get_io_service(), handler));
1194
reactor_.start_read_op(impl.socket_, impl.reactor_data_,
1195
null_buffers_operation<Handler>(this->get_io_service(), handler),
1200
// Receive a datagram with the endpoint of the sender. Returns the number of
1202
template <typename MutableBufferSequence>
1203
size_t receive_from(implementation_type& impl,
1204
const MutableBufferSequence& buffers,
1205
endpoint_type& sender_endpoint, socket_base::message_flags flags,
1206
asio::error_code& ec)
1210
ec = asio::error::bad_descriptor;
1214
// Copy buffers into array.
1215
socket_ops::buf bufs[max_buffers];
1216
typename MutableBufferSequence::const_iterator iter = buffers.begin();
1217
typename MutableBufferSequence::const_iterator end = buffers.end();
1219
for (; iter != end && i < max_buffers; ++iter, ++i)
1221
asio::mutable_buffer buffer(*iter);
1222
socket_ops::init_buf(bufs[i],
1223
asio::buffer_cast<void*>(buffer),
1224
asio::buffer_size(buffer));
1227
// Make socket non-blocking if user wants non-blocking.
1228
if (impl.flags_ & implementation_type::user_set_non_blocking)
1230
if (!(impl.flags_ & implementation_type::internal_non_blocking))
1232
ioctl_arg_type non_blocking = 1;
1233
if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
1235
impl.flags_ |= implementation_type::internal_non_blocking;
1239
// Receive some data.
1242
// Try to complete the operation without blocking.
1243
std::size_t addr_len = sender_endpoint.capacity();
1244
int bytes_recvd = socket_ops::recvfrom(impl.socket_, bufs, i, flags,
1245
sender_endpoint.data(), &addr_len, ec);
1247
// Check if operation succeeded.
1248
if (bytes_recvd > 0)
1250
sender_endpoint.resize(addr_len);
1255
if (bytes_recvd == 0 && impl.protocol_.type() == SOCK_STREAM)
1257
ec = asio::error::eof;
1261
// Operation failed.
1262
if ((impl.flags_ & implementation_type::user_set_non_blocking)
1263
|| (ec != asio::error::would_block
1264
&& ec != asio::error::try_again))
1267
// Wait for socket to become ready.
1268
if (socket_ops::poll_read(impl.socket_, ec) < 0)
1273
// Wait until data can be received without blocking.
1274
size_t receive_from(implementation_type& impl, const null_buffers&,
1275
endpoint_type& sender_endpoint, socket_base::message_flags,
1276
asio::error_code& ec)
1280
ec = asio::error::bad_descriptor;
1284
// Wait for socket to become ready.
1285
socket_ops::poll_read(impl.socket_, ec);
1287
// Reset endpoint since it can be given no sensible value at this time.
1288
sender_endpoint = endpoint_type();
1293
template <typename MutableBufferSequence, typename Handler>
1294
class receive_from_operation :
1295
public handler_base_from_member<Handler>
1298
receive_from_operation(socket_type socket, int protocol_type,
1299
asio::io_service& io_service,
1300
const MutableBufferSequence& buffers, endpoint_type& endpoint,
1301
socket_base::message_flags flags, Handler handler)
1302
: handler_base_from_member<Handler>(handler),
1304
protocol_type_(protocol_type),
1305
io_service_(io_service),
1308
sender_endpoint_(endpoint),
1313
bool perform(asio::error_code& ec,
1314
std::size_t& bytes_transferred)
1316
// Check whether the operation was successful.
1319
bytes_transferred = 0;
1323
// Copy buffers into array.
1324
socket_ops::buf bufs[max_buffers];
1325
typename MutableBufferSequence::const_iterator iter = buffers_.begin();
1326
typename MutableBufferSequence::const_iterator end = buffers_.end();
1328
for (; iter != end && i < max_buffers; ++iter, ++i)
1330
asio::mutable_buffer buffer(*iter);
1331
socket_ops::init_buf(bufs[i],
1332
asio::buffer_cast<void*>(buffer),
1333
asio::buffer_size(buffer));
1336
// Receive some data.
1337
std::size_t addr_len = sender_endpoint_.capacity();
1338
int bytes = socket_ops::recvfrom(socket_, bufs, i, flags_,
1339
sender_endpoint_.data(), &addr_len, ec);
1340
if (bytes == 0 && protocol_type_ == SOCK_STREAM)
1341
ec = asio::error::eof;
1343
// Check if we need to run the operation again.
1344
if (ec == asio::error::would_block
1345
|| ec == asio::error::try_again)
1348
sender_endpoint_.resize(addr_len);
1349
bytes_transferred = (bytes < 0 ? 0 : bytes);
1353
void complete(const asio::error_code& ec,
1354
std::size_t bytes_transferred)
1356
io_service_.post(bind_handler(this->handler_, ec, bytes_transferred));
1360
socket_type socket_;
1362
asio::io_service& io_service_;
1363
asio::io_service::work work_;
1364
MutableBufferSequence buffers_;
1365
endpoint_type& sender_endpoint_;
1366
socket_base::message_flags flags_;
1369
// Start an asynchronous receive. The buffer for the data being received and
1370
// the sender_endpoint object must both be valid for the lifetime of the
1371
// asynchronous operation.
1372
template <typename MutableBufferSequence, typename Handler>
1373
void async_receive_from(implementation_type& impl,
1374
const MutableBufferSequence& buffers, endpoint_type& sender_endpoint,
1375
socket_base::message_flags flags, Handler handler)
1379
this->get_io_service().post(bind_handler(handler,
1380
asio::error::bad_descriptor, 0));
1384
// Make socket non-blocking.
1385
if (!(impl.flags_ & implementation_type::internal_non_blocking))
1387
ioctl_arg_type non_blocking = 1;
1388
asio::error_code ec;
1389
if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
1391
this->get_io_service().post(bind_handler(handler, ec, 0));
1394
impl.flags_ |= implementation_type::internal_non_blocking;
1397
reactor_.start_read_op(impl.socket_, impl.reactor_data_,
1398
receive_from_operation<MutableBufferSequence, Handler>(
1399
impl.socket_, impl.protocol_.type(), this->get_io_service(),
1400
buffers, sender_endpoint, flags, handler));
1404
// Wait until data can be received without blocking.
1405
template <typename Handler>
1406
void async_receive_from(implementation_type& impl,
1407
const null_buffers&, endpoint_type& sender_endpoint,
1408
socket_base::message_flags flags, Handler handler)
1412
this->get_io_service().post(bind_handler(handler,
1413
asio::error::bad_descriptor, 0));
1417
// Reset endpoint since it can be given no sensible value at this time.
1418
sender_endpoint = endpoint_type();
1420
if (flags & socket_base::message_out_of_band)
1422
reactor_.start_except_op(impl.socket_, impl.reactor_data_,
1423
null_buffers_operation<Handler>(this->get_io_service(), handler));
1427
reactor_.start_read_op(impl.socket_, impl.reactor_data_,
1428
null_buffers_operation<Handler>(this->get_io_service(), handler),
1434
// Accept a new connection.
1435
template <typename Socket>
1436
asio::error_code accept(implementation_type& impl,
1437
Socket& peer, endpoint_type* peer_endpoint, asio::error_code& ec)
1441
ec = asio::error::bad_descriptor;
1445
// We cannot accept a socket that is already open.
1448
ec = asio::error::already_open;
1452
// Make socket non-blocking if user wants non-blocking.
1453
if (impl.flags_ & implementation_type::user_set_non_blocking)
1455
if (!(impl.flags_ & implementation_type::internal_non_blocking))
1457
ioctl_arg_type non_blocking = 1;
1458
if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
1460
impl.flags_ |= implementation_type::internal_non_blocking;
1467
// Try to complete the operation without blocking.
1468
asio::error_code ec;
1469
socket_holder new_socket;
1470
std::size_t addr_len = 0;
1473
addr_len = peer_endpoint->capacity();
1474
new_socket.reset(socket_ops::accept(impl.socket_,
1475
peer_endpoint->data(), &addr_len, ec));
1479
new_socket.reset(socket_ops::accept(impl.socket_, 0, 0, ec));
1482
// Check if operation succeeded.
1483
if (new_socket.get() >= 0)
1486
peer_endpoint->resize(addr_len);
1487
peer.assign(impl.protocol_, new_socket.get(), ec);
1489
new_socket.release();
1493
// Operation failed.
1494
if (ec == asio::error::would_block
1495
|| ec == asio::error::try_again)
1497
if (impl.flags_ & implementation_type::user_set_non_blocking)
1499
// Fall through to retry operation.
1501
else if (ec == asio::error::connection_aborted)
1503
if (impl.flags_ & implementation_type::enable_connection_aborted)
1505
// Fall through to retry operation.
1508
else if (ec.value() == EPROTO)
1510
if (impl.flags_ & implementation_type::enable_connection_aborted)
1512
// Fall through to retry operation.
1514
#endif // defined(EPROTO)
1518
// Wait for socket to become ready.
1519
if (socket_ops::poll_read(impl.socket_, ec) < 0)
1524
template <typename Socket, typename Handler>
1525
class accept_operation :
1526
public handler_base_from_member<Handler>
1529
accept_operation(socket_type socket, asio::io_service& io_service,
1530
Socket& peer, const protocol_type& protocol,
1531
endpoint_type* peer_endpoint, bool enable_connection_aborted,
1533
: handler_base_from_member<Handler>(handler),
1535
io_service_(io_service),
1538
protocol_(protocol),
1539
peer_endpoint_(peer_endpoint),
1540
enable_connection_aborted_(enable_connection_aborted)
1544
bool perform(asio::error_code& ec, std::size_t&)
1546
// Check whether the operation was successful.
1550
// Accept the waiting connection.
1551
socket_holder new_socket;
1552
std::size_t addr_len = 0;
1555
addr_len = peer_endpoint_->capacity();
1556
new_socket.reset(socket_ops::accept(socket_,
1557
peer_endpoint_->data(), &addr_len, ec));
1561
new_socket.reset(socket_ops::accept(socket_, 0, 0, ec));
1564
// Check if we need to run the operation again.
1565
if (ec == asio::error::would_block
1566
|| ec == asio::error::try_again)
1568
if (ec == asio::error::connection_aborted
1569
&& !enable_connection_aborted_)
1572
if (ec.value() == EPROTO && !enable_connection_aborted_)
1574
#endif // defined(EPROTO)
1576
// Transfer ownership of the new socket to the peer object.
1580
peer_endpoint_->resize(addr_len);
1581
peer_.assign(protocol_, new_socket.get(), ec);
1583
new_socket.release();
1589
void complete(const asio::error_code& ec, std::size_t)
1591
io_service_.post(bind_handler(this->handler_, ec));
1595
socket_type socket_;
1596
asio::io_service& io_service_;
1597
asio::io_service::work work_;
1599
protocol_type protocol_;
1600
endpoint_type* peer_endpoint_;
1601
bool enable_connection_aborted_;
1604
// Start an asynchronous accept. The peer and peer_endpoint objects
1605
// must be valid until the accept's handler is invoked.
1606
template <typename Socket, typename Handler>
1607
void async_accept(implementation_type& impl, Socket& peer,
1608
endpoint_type* peer_endpoint, Handler handler)
1612
this->get_io_service().post(bind_handler(handler,
1613
asio::error::bad_descriptor));
1615
else if (peer.is_open())
1617
this->get_io_service().post(bind_handler(handler,
1618
asio::error::already_open));
1622
// Make socket non-blocking.
1623
if (!(impl.flags_ & implementation_type::internal_non_blocking))
1625
ioctl_arg_type non_blocking = 1;
1626
asio::error_code ec;
1627
if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
1629
this->get_io_service().post(bind_handler(handler, ec));
1632
impl.flags_ |= implementation_type::internal_non_blocking;
1635
reactor_.start_read_op(impl.socket_, impl.reactor_data_,
1636
accept_operation<Socket, Handler>(
1637
impl.socket_, this->get_io_service(),
1638
peer, impl.protocol_, peer_endpoint,
1639
(impl.flags_ & implementation_type::enable_connection_aborted) != 0,
1644
// Connect the socket to the specified endpoint.
1645
asio::error_code connect(implementation_type& impl,
1646
const endpoint_type& peer_endpoint, asio::error_code& ec)
1650
ec = asio::error::bad_descriptor;
1654
if (impl.flags_ & implementation_type::internal_non_blocking)
1656
// Mark the socket as blocking while we perform the connect.
1657
ioctl_arg_type non_blocking = 0;
1658
if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
1660
impl.flags_ &= ~implementation_type::internal_non_blocking;
1663
// Perform the connect operation.
1664
socket_ops::connect(impl.socket_,
1665
peer_endpoint.data(), peer_endpoint.size(), ec);
1669
template <typename Handler>
1670
class connect_operation :
1671
public handler_base_from_member<Handler>
1674
connect_operation(socket_type socket,
1675
asio::io_service& io_service, Handler handler)
1676
: handler_base_from_member<Handler>(handler),
1678
io_service_(io_service),
1683
bool perform(asio::error_code& ec, std::size_t&)
1685
// Check whether the operation was successful.
1689
// Get the error code from the connect operation.
1690
int connect_error = 0;
1691
size_t connect_error_len = sizeof(connect_error);
1692
if (socket_ops::getsockopt(socket_, SOL_SOCKET, SO_ERROR,
1693
&connect_error, &connect_error_len, ec) == socket_error_retval)
1696
// The connection failed so the handler will be posted with an error code.
1699
ec = asio::error_code(connect_error,
1700
asio::error::get_system_category());
1707
void complete(const asio::error_code& ec, std::size_t)
1709
io_service_.post(bind_handler(this->handler_, ec));
1713
socket_type socket_;
1714
asio::io_service& io_service_;
1715
asio::io_service::work work_;
1718
// Start an asynchronous connect.
1719
template <typename Handler>
1720
void async_connect(implementation_type& impl,
1721
const endpoint_type& peer_endpoint, Handler handler)
1725
this->get_io_service().post(bind_handler(handler,
1726
asio::error::bad_descriptor));
1730
// Make socket non-blocking.
1731
if (!(impl.flags_ & implementation_type::internal_non_blocking))
1733
ioctl_arg_type non_blocking = 1;
1734
asio::error_code ec;
1735
if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
1737
this->get_io_service().post(bind_handler(handler, ec));
1740
impl.flags_ |= implementation_type::internal_non_blocking;
1743
// Start the connect operation. The socket is already marked as non-blocking
1744
// so the connection will take place asynchronously.
1745
asio::error_code ec;
1746
if (socket_ops::connect(impl.socket_, peer_endpoint.data(),
1747
peer_endpoint.size(), ec) == 0)
1749
// The connect operation has finished successfully so we need to post the
1750
// handler immediately.
1751
this->get_io_service().post(bind_handler(handler,
1752
asio::error_code()));
1754
else if (ec == asio::error::in_progress
1755
|| ec == asio::error::would_block)
1757
// The connection is happening in the background, and we need to wait
1758
// until the socket becomes writeable.
1759
reactor_.start_connect_op(impl.socket_, impl.reactor_data_,
1760
connect_operation<Handler>(impl.socket_,
1761
this->get_io_service(), handler));
1765
// The connect operation has failed, so post the handler immediately.
1766
this->get_io_service().post(bind_handler(handler, ec));
1771
// The selector that performs event demultiplexing for the service.
1775
} // namespace detail
1778
#include "asio/detail/pop_options.hpp"
1780
#endif // ASIO_DETAIL_REACTIVE_SOCKET_SERVICE_HPP