2
// win_iocp_socket_service.hpp
3
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~
5
// Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com)
7
// Distributed under the Boost Software License, Version 1.0. (See accompanying
8
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
11
#ifndef ASIO_DETAIL_WIN_IOCP_SOCKET_SERVICE_HPP
12
#define ASIO_DETAIL_WIN_IOCP_SOCKET_SERVICE_HPP
14
#if defined(_MSC_VER) && (_MSC_VER >= 1200)
16
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
18
#include "asio/detail/push_options.hpp"
20
#include "asio/detail/win_iocp_io_service_fwd.hpp"
22
#if defined(ASIO_HAS_IOCP)
24
#include "asio/detail/push_options.hpp"
26
#include <boost/shared_ptr.hpp>
27
#include <boost/type_traits/is_same.hpp>
28
#include <boost/weak_ptr.hpp>
29
#include "asio/detail/pop_options.hpp"
31
#include "asio/buffer.hpp"
32
#include "asio/error.hpp"
33
#include "asio/io_service.hpp"
34
#include "asio/socket_base.hpp"
35
#include "asio/detail/bind_handler.hpp"
36
#include "asio/detail/handler_alloc_helpers.hpp"
37
#include "asio/detail/handler_invoke_helpers.hpp"
38
#include "asio/detail/mutex.hpp"
39
#include "asio/detail/select_reactor.hpp"
40
#include "asio/detail/socket_holder.hpp"
41
#include "asio/detail/socket_ops.hpp"
42
#include "asio/detail/socket_types.hpp"
43
#include "asio/detail/win_iocp_io_service.hpp"
48
template <typename Protocol>
49
class win_iocp_socket_service
50
: public asio::detail::service_base<win_iocp_socket_service<Protocol> >
54
typedef Protocol protocol_type;
57
typedef typename Protocol::endpoint endpoint_type;
59
// Base class for all operations.
60
typedef win_iocp_io_service::operation operation;
62
struct noop_deleter { void operator()(void*) {} };
63
typedef boost::shared_ptr<void> shared_cancel_token_type;
64
typedef boost::weak_ptr<void> weak_cancel_token_type;
66
// The native type of a socket.
70
native_type(socket_type s)
72
have_remote_endpoint_(false)
76
native_type(socket_type s, const endpoint_type& ep)
78
have_remote_endpoint_(true),
83
void operator=(socket_type s)
86
have_remote_endpoint_ = false;
87
remote_endpoint_ = endpoint_type();
90
operator socket_type() const
95
HANDLE as_handle() const
97
return reinterpret_cast<HANDLE>(socket_);
100
bool have_remote_endpoint() const
102
return have_remote_endpoint_;
105
endpoint_type remote_endpoint() const
107
return remote_endpoint_;
112
bool have_remote_endpoint_;
113
endpoint_type remote_endpoint_;
116
// The type of the reactor used for connect operations.
117
typedef detail::select_reactor<true> reactor_type;
119
// The implementation type of the socket.
120
class implementation_type
123
// Default constructor.
124
implementation_type()
125
: socket_(invalid_socket),
128
protocol_(endpoint_type().protocol()),
135
// Only this service will have access to the internal values.
136
friend class win_iocp_socket_service;
138
// The native socket representation.
143
enable_connection_aborted = 1, // User wants connection_aborted errors.
144
close_might_block = 2, // User set linger option for blocking close.
145
user_set_non_blocking = 4 // The user wants a non-blocking socket.
148
// Flags indicating the current state of the socket.
149
unsigned char flags_;
151
// We use a shared pointer as a cancellation token here to work around the
152
// broken Windows support for cancellation. MSDN says that when you call
153
// closesocket any outstanding WSARecv or WSASend operations will complete
154
// with the error ERROR_OPERATION_ABORTED. In practice they complete with
155
// ERROR_NETNAME_DELETED, which means you can't tell the difference between
156
// a local cancellation and the socket being hard-closed by the peer.
157
shared_cancel_token_type cancel_token_;
159
// The protocol associated with the socket.
160
protocol_type protocol_;
162
// Per-descriptor data used by the reactor.
163
reactor_type::per_descriptor_data reactor_data_;
165
#if defined(ASIO_ENABLE_CANCELIO)
166
// The ID of the thread from which it is safe to cancel asynchronous
167
// operations. 0 means no asynchronous operations have been started yet.
168
// ~0 means asynchronous operations have been started from more than one
169
// thread, and cancellation is not supported for the socket.
170
DWORD safe_cancellation_thread_id_;
171
#endif // defined(ASIO_ENABLE_CANCELIO)
173
// Pointers to adjacent socket implementations in linked list.
174
implementation_type* next_;
175
implementation_type* prev_;
178
// The maximum number of buffers to support in a single operation.
179
enum { max_buffers = 64 < max_iov_len ? 64 : max_iov_len };
182
win_iocp_socket_service(asio::io_service& io_service)
183
: asio::detail::service_base<
184
win_iocp_socket_service<Protocol> >(io_service),
185
iocp_service_(asio::use_service<win_iocp_io_service>(io_service)),
192
// Destroy all user-defined handler objects owned by the service.
193
void shutdown_service()
195
// Close all implementations, causing all operations to complete.
196
asio::detail::mutex::scoped_lock lock(mutex_);
197
implementation_type* impl = impl_list_;
200
asio::error_code ignored_ec;
201
close_for_destruction(*impl);
206
// Construct a new socket implementation.
207
void construct(implementation_type& impl)
209
impl.socket_ = invalid_socket;
211
impl.cancel_token_.reset();
212
#if defined(ASIO_ENABLE_CANCELIO)
213
impl.safe_cancellation_thread_id_ = 0;
214
#endif // defined(ASIO_ENABLE_CANCELIO)
216
// Insert implementation into linked list of all implementations.
217
asio::detail::mutex::scoped_lock lock(mutex_);
218
impl.next_ = impl_list_;
221
impl_list_->prev_ = &impl;
225
// Destroy a socket implementation.
226
void destroy(implementation_type& impl)
228
close_for_destruction(impl);
230
// Remove implementation from linked list of all implementations.
231
asio::detail::mutex::scoped_lock lock(mutex_);
232
if (impl_list_ == &impl)
233
impl_list_ = impl.next_;
235
impl.prev_->next_ = impl.next_;
237
impl.next_->prev_= impl.prev_;
242
// Open a new socket implementation.
243
asio::error_code open(implementation_type& impl,
244
const protocol_type& protocol, asio::error_code& ec)
248
ec = asio::error::already_open;
252
socket_holder sock(socket_ops::socket(protocol.family(), protocol.type(),
253
protocol.protocol(), ec));
254
if (sock.get() == invalid_socket)
257
HANDLE sock_as_handle = reinterpret_cast<HANDLE>(sock.get());
258
if (iocp_service_.register_handle(sock_as_handle, ec))
261
impl.socket_ = sock.release();
263
impl.cancel_token_.reset(static_cast<void*>(0), noop_deleter());
264
impl.protocol_ = protocol;
265
ec = asio::error_code();
269
// Assign a native socket to a socket implementation.
270
asio::error_code assign(implementation_type& impl,
271
const protocol_type& protocol, const native_type& native_socket,
272
asio::error_code& ec)
276
ec = asio::error::already_open;
280
if (iocp_service_.register_handle(native_socket.as_handle(), ec))
283
impl.socket_ = native_socket;
285
impl.cancel_token_.reset(static_cast<void*>(0), noop_deleter());
286
impl.protocol_ = protocol;
287
ec = asio::error_code();
291
// Determine whether the socket is open.
292
bool is_open(const implementation_type& impl) const
294
return impl.socket_ != invalid_socket;
297
// Destroy a socket implementation.
298
asio::error_code close(implementation_type& impl,
299
asio::error_code& ec)
303
// Check if the reactor was created, in which case we need to close the
304
// socket on the reactor as well to cancel any operations that might be
306
reactor_type* reactor = static_cast<reactor_type*>(
307
interlocked_compare_exchange_pointer(
308
reinterpret_cast<void**>(&reactor_), 0, 0));
310
reactor->close_descriptor(impl.socket_, impl.reactor_data_);
312
if (socket_ops::close(impl.socket_, ec) == socket_error_retval)
315
impl.socket_ = invalid_socket;
317
impl.cancel_token_.reset();
318
#if defined(ASIO_ENABLE_CANCELIO)
319
impl.safe_cancellation_thread_id_ = 0;
320
#endif // defined(ASIO_ENABLE_CANCELIO)
323
ec = asio::error_code();
327
// Get the native socket representation.
328
native_type native(implementation_type& impl)
333
// Cancel all operations associated with the socket.
334
asio::error_code cancel(implementation_type& impl,
335
asio::error_code& ec)
339
ec = asio::error::bad_descriptor;
342
else if (FARPROC cancel_io_ex_ptr = ::GetProcAddress(
343
::GetModuleHandleA("KERNEL32"), "CancelIoEx"))
345
// The version of Windows supports cancellation from any thread.
346
typedef BOOL (WINAPI* cancel_io_ex_t)(HANDLE, LPOVERLAPPED);
347
cancel_io_ex_t cancel_io_ex = (cancel_io_ex_t)cancel_io_ex_ptr;
348
socket_type sock = impl.socket_;
349
HANDLE sock_as_handle = reinterpret_cast<HANDLE>(sock);
350
if (!cancel_io_ex(sock_as_handle, 0))
352
DWORD last_error = ::GetLastError();
353
if (last_error == ERROR_NOT_FOUND)
355
// ERROR_NOT_FOUND means that there were no operations to be
356
// cancelled. We swallow this error to match the behaviour on other
358
ec = asio::error_code();
362
ec = asio::error_code(last_error,
363
asio::error::get_system_category());
368
ec = asio::error_code();
371
#if defined(ASIO_ENABLE_CANCELIO)
372
else if (impl.safe_cancellation_thread_id_ == 0)
374
// No operations have been started, so there's nothing to cancel.
375
ec = asio::error_code();
377
else if (impl.safe_cancellation_thread_id_ == ::GetCurrentThreadId())
379
// Asynchronous operations have been started from the current thread only,
380
// so it is safe to try to cancel them using CancelIo.
381
socket_type sock = impl.socket_;
382
HANDLE sock_as_handle = reinterpret_cast<HANDLE>(sock);
383
if (!::CancelIo(sock_as_handle))
385
DWORD last_error = ::GetLastError();
386
ec = asio::error_code(last_error,
387
asio::error::get_system_category());
391
ec = asio::error_code();
396
// Asynchronous operations have been started from more than one thread,
397
// so cancellation is not safe.
398
ec = asio::error::operation_not_supported;
400
#else // defined(ASIO_ENABLE_CANCELIO)
403
// Cancellation is not supported as CancelIo may not be used.
404
ec = asio::error::operation_not_supported;
406
#endif // defined(ASIO_ENABLE_CANCELIO)
411
// Determine whether the socket is at the out-of-band data mark.
412
bool at_mark(const implementation_type& impl,
413
asio::error_code& ec) const
417
ec = asio::error::bad_descriptor;
421
asio::detail::ioctl_arg_type value = 0;
422
socket_ops::ioctl(impl.socket_, SIOCATMARK, &value, ec);
423
return ec ? false : value != 0;
426
// Determine the number of bytes available for reading.
427
std::size_t available(const implementation_type& impl,
428
asio::error_code& ec) const
432
ec = asio::error::bad_descriptor;
436
asio::detail::ioctl_arg_type value = 0;
437
socket_ops::ioctl(impl.socket_, FIONREAD, &value, ec);
438
return ec ? static_cast<std::size_t>(0) : static_cast<std::size_t>(value);
441
// Bind the socket to the specified local endpoint.
442
asio::error_code bind(implementation_type& impl,
443
const endpoint_type& endpoint, asio::error_code& ec)
447
ec = asio::error::bad_descriptor;
451
socket_ops::bind(impl.socket_, endpoint.data(), endpoint.size(), ec);
455
// Place the socket into the state where it will listen for new connections.
456
asio::error_code listen(implementation_type& impl, int backlog,
457
asio::error_code& ec)
461
ec = asio::error::bad_descriptor;
465
socket_ops::listen(impl.socket_, backlog, ec);
469
// Set a socket option.
470
template <typename Option>
471
asio::error_code set_option(implementation_type& impl,
472
const Option& option, asio::error_code& ec)
476
ec = asio::error::bad_descriptor;
480
if (option.level(impl.protocol_) == custom_socket_option_level
481
&& option.name(impl.protocol_) == enable_connection_aborted_option)
483
if (option.size(impl.protocol_) != sizeof(int))
485
ec = asio::error::invalid_argument;
489
if (*reinterpret_cast<const int*>(option.data(impl.protocol_)))
490
impl.flags_ |= implementation_type::enable_connection_aborted;
492
impl.flags_ &= ~implementation_type::enable_connection_aborted;
493
ec = asio::error_code();
499
if (option.level(impl.protocol_) == SOL_SOCKET
500
&& option.name(impl.protocol_) == SO_LINGER)
502
const ::linger* linger_option =
503
reinterpret_cast<const ::linger*>(option.data(impl.protocol_));
504
if (linger_option->l_onoff != 0 && linger_option->l_linger != 0)
505
impl.flags_ |= implementation_type::close_might_block;
507
impl.flags_ &= ~implementation_type::close_might_block;
510
socket_ops::setsockopt(impl.socket_,
511
option.level(impl.protocol_), option.name(impl.protocol_),
512
option.data(impl.protocol_), option.size(impl.protocol_), ec);
517
// Set a socket option.
518
template <typename Option>
519
asio::error_code get_option(const implementation_type& impl,
520
Option& option, asio::error_code& ec) const
524
ec = asio::error::bad_descriptor;
528
if (option.level(impl.protocol_) == custom_socket_option_level
529
&& option.name(impl.protocol_) == enable_connection_aborted_option)
531
if (option.size(impl.protocol_) != sizeof(int))
533
ec = asio::error::invalid_argument;
537
int* target = reinterpret_cast<int*>(option.data(impl.protocol_));
538
if (impl.flags_ & implementation_type::enable_connection_aborted)
542
option.resize(impl.protocol_, sizeof(int));
543
ec = asio::error_code();
549
size_t size = option.size(impl.protocol_);
550
socket_ops::getsockopt(impl.socket_,
551
option.level(impl.protocol_), option.name(impl.protocol_),
552
option.data(impl.protocol_), &size, ec);
554
option.resize(impl.protocol_, size);
559
// Perform an IO control command on the socket.
560
template <typename IO_Control_Command>
561
asio::error_code io_control(implementation_type& impl,
562
IO_Control_Command& command, asio::error_code& ec)
566
ec = asio::error::bad_descriptor;
570
socket_ops::ioctl(impl.socket_, command.name(),
571
static_cast<ioctl_arg_type*>(command.data()), ec);
573
if (!ec && command.name() == static_cast<int>(FIONBIO))
576
impl.flags_ |= implementation_type::user_set_non_blocking;
578
impl.flags_ &= ~implementation_type::user_set_non_blocking;
584
// Get the local endpoint.
585
endpoint_type local_endpoint(const implementation_type& impl,
586
asio::error_code& ec) const
590
ec = asio::error::bad_descriptor;
591
return endpoint_type();
594
endpoint_type endpoint;
595
std::size_t addr_len = endpoint.capacity();
596
if (socket_ops::getsockname(impl.socket_, endpoint.data(), &addr_len, ec))
597
return endpoint_type();
598
endpoint.resize(addr_len);
602
// Get the remote endpoint.
603
endpoint_type remote_endpoint(const implementation_type& impl,
604
asio::error_code& ec) const
608
ec = asio::error::bad_descriptor;
609
return endpoint_type();
612
if (impl.socket_.have_remote_endpoint())
614
// Check if socket is still connected.
615
DWORD connect_time = 0;
616
size_t connect_time_len = sizeof(connect_time);
617
if (socket_ops::getsockopt(impl.socket_, SOL_SOCKET, SO_CONNECT_TIME,
618
&connect_time, &connect_time_len, ec) == socket_error_retval)
620
return endpoint_type();
622
if (connect_time == 0xFFFFFFFF)
624
ec = asio::error::not_connected;
625
return endpoint_type();
628
ec = asio::error_code();
629
return impl.socket_.remote_endpoint();
633
endpoint_type endpoint;
634
std::size_t addr_len = endpoint.capacity();
635
if (socket_ops::getpeername(impl.socket_, endpoint.data(), &addr_len, ec))
636
return endpoint_type();
637
endpoint.resize(addr_len);
642
/// Disable sends or receives on the socket.
643
asio::error_code shutdown(implementation_type& impl,
644
socket_base::shutdown_type what, asio::error_code& ec)
648
ec = asio::error::bad_descriptor;
652
socket_ops::shutdown(impl.socket_, what, ec);
656
// Send the given data to the peer. Returns the number of bytes sent.
657
template <typename ConstBufferSequence>
658
size_t send(implementation_type& impl, const ConstBufferSequence& buffers,
659
socket_base::message_flags flags, asio::error_code& ec)
663
ec = asio::error::bad_descriptor;
667
// Copy buffers into WSABUF array.
668
::WSABUF bufs[max_buffers];
669
typename ConstBufferSequence::const_iterator iter = buffers.begin();
670
typename ConstBufferSequence::const_iterator end = buffers.end();
672
size_t total_buffer_size = 0;
673
for (; iter != end && i < max_buffers; ++iter, ++i)
675
asio::const_buffer buffer(*iter);
676
bufs[i].len = static_cast<u_long>(asio::buffer_size(buffer));
677
bufs[i].buf = const_cast<char*>(
678
asio::buffer_cast<const char*>(buffer));
679
total_buffer_size += asio::buffer_size(buffer);
682
// A request to receive 0 bytes on a stream socket is a no-op.
683
if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0)
685
ec = asio::error_code();
690
DWORD bytes_transferred = 0;
691
int result = ::WSASend(impl.socket_, bufs,
692
i, &bytes_transferred, flags, 0, 0);
695
DWORD last_error = ::WSAGetLastError();
696
if (last_error == ERROR_NETNAME_DELETED)
697
last_error = WSAECONNRESET;
698
else if (last_error == ERROR_PORT_UNREACHABLE)
699
last_error = WSAECONNREFUSED;
700
ec = asio::error_code(last_error,
701
asio::error::get_system_category());
705
ec = asio::error_code();
706
return bytes_transferred;
709
// Wait until data can be sent without blocking.
710
size_t send(implementation_type& impl, const null_buffers&,
711
socket_base::message_flags, asio::error_code& ec)
715
ec = asio::error::bad_descriptor;
719
// Wait for socket to become ready.
720
socket_ops::poll_write(impl.socket_, ec);
725
template <typename ConstBufferSequence, typename Handler>
730
send_operation(win_iocp_io_service& io_service,
731
weak_cancel_token_type cancel_token,
732
const ConstBufferSequence& buffers, Handler handler)
733
: operation(io_service,
734
&send_operation<ConstBufferSequence, Handler>::do_completion_impl,
735
&send_operation<ConstBufferSequence, Handler>::destroy_impl),
736
work_(io_service.get_io_service()),
737
cancel_token_(cancel_token),
744
static void do_completion_impl(operation* op,
745
DWORD last_error, size_t bytes_transferred)
747
// Take ownership of the operation object.
748
typedef send_operation<ConstBufferSequence, Handler> op_type;
749
op_type* handler_op(static_cast<op_type*>(op));
750
typedef handler_alloc_traits<Handler, op_type> alloc_traits;
751
handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
753
#if defined(ASIO_ENABLE_BUFFER_DEBUGGING)
754
// Check whether buffers are still valid.
755
typename ConstBufferSequence::const_iterator iter
756
= handler_op->buffers_.begin();
757
typename ConstBufferSequence::const_iterator end
758
= handler_op->buffers_.end();
761
asio::const_buffer buffer(*iter);
762
asio::buffer_cast<const char*>(buffer);
765
#endif // defined(ASIO_ENABLE_BUFFER_DEBUGGING)
767
// Map non-portable errors to their portable counterparts.
768
asio::error_code ec(last_error,
769
asio::error::get_system_category());
770
if (ec.value() == ERROR_NETNAME_DELETED)
772
if (handler_op->cancel_token_.expired())
773
ec = asio::error::operation_aborted;
775
ec = asio::error::connection_reset;
777
else if (ec.value() == ERROR_PORT_UNREACHABLE)
779
ec = asio::error::connection_refused;
782
// Make a copy of the handler so that the memory can be deallocated before
783
// the upcall is made.
784
Handler handler(handler_op->handler_);
786
// Free the memory associated with the handler.
790
asio_handler_invoke_helpers::invoke(
791
detail::bind_handler(handler, ec, bytes_transferred), &handler);
794
static void destroy_impl(operation* op)
796
// Take ownership of the operation object.
797
typedef send_operation<ConstBufferSequence, Handler> op_type;
798
op_type* handler_op(static_cast<op_type*>(op));
799
typedef handler_alloc_traits<Handler, op_type> alloc_traits;
800
handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
802
// A sub-object of the handler may be the true owner of the memory
803
// associated with the handler. Consequently, a local copy of the handler
804
// is required to ensure that any owning sub-object remains valid until
805
// after we have deallocated the memory here.
806
Handler handler(handler_op->handler_);
809
// Free the memory associated with the handler.
813
asio::io_service::work work_;
814
weak_cancel_token_type cancel_token_;
815
ConstBufferSequence buffers_;
819
// Start an asynchronous send. The data being sent must be valid for the
820
// lifetime of the asynchronous operation.
821
template <typename ConstBufferSequence, typename Handler>
822
void async_send(implementation_type& impl, const ConstBufferSequence& buffers,
823
socket_base::message_flags flags, Handler handler)
827
this->get_io_service().post(bind_handler(handler,
828
asio::error::bad_descriptor, 0));
832
#if defined(ASIO_ENABLE_CANCELIO)
833
// Update the ID of the thread from which cancellation is safe.
834
if (impl.safe_cancellation_thread_id_ == 0)
835
impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId();
836
else if (impl.safe_cancellation_thread_id_ != ::GetCurrentThreadId())
837
impl.safe_cancellation_thread_id_ = ~DWORD(0);
838
#endif // defined(ASIO_ENABLE_CANCELIO)
840
// Allocate and construct an operation to wrap the handler.
841
typedef send_operation<ConstBufferSequence, Handler> value_type;
842
typedef handler_alloc_traits<Handler, value_type> alloc_traits;
843
raw_handler_ptr<alloc_traits> raw_ptr(handler);
844
handler_ptr<alloc_traits> ptr(raw_ptr, iocp_service_,
845
impl.cancel_token_, buffers, handler);
847
// Copy buffers into WSABUF array.
848
::WSABUF bufs[max_buffers];
849
typename ConstBufferSequence::const_iterator iter = buffers.begin();
850
typename ConstBufferSequence::const_iterator end = buffers.end();
852
size_t total_buffer_size = 0;
853
for (; iter != end && i < max_buffers; ++iter, ++i)
855
asio::const_buffer buffer(*iter);
856
bufs[i].len = static_cast<u_long>(asio::buffer_size(buffer));
857
bufs[i].buf = const_cast<char*>(
858
asio::buffer_cast<const char*>(buffer));
859
total_buffer_size += asio::buffer_size(buffer);
862
// A request to receive 0 bytes on a stream socket is a no-op.
863
if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0)
865
asio::io_service::work work(this->get_io_service());
867
asio::error_code error;
868
iocp_service_.post(bind_handler(handler, error, 0));
873
DWORD bytes_transferred = 0;
874
int result = ::WSASend(impl.socket_, bufs, i,
875
&bytes_transferred, flags, ptr.get(), 0);
876
DWORD last_error = ::WSAGetLastError();
878
// Check if the operation completed immediately.
879
if (result != 0 && last_error != WSA_IO_PENDING)
881
asio::io_service::work work(this->get_io_service());
883
asio::error_code ec(last_error,
884
asio::error::get_system_category());
885
iocp_service_.post(bind_handler(handler, ec, bytes_transferred));
893
template <typename Handler>
894
class null_buffers_operation
897
null_buffers_operation(asio::io_service& io_service, Handler handler)
903
bool perform(asio::error_code&,
904
std::size_t& bytes_transferred)
906
bytes_transferred = 0;
910
void complete(const asio::error_code& ec,
911
std::size_t bytes_transferred)
913
work_.get_io_service().post(bind_handler(
914
handler_, ec, bytes_transferred));
918
asio::io_service::work work_;
922
// Start an asynchronous wait until data can be sent without blocking.
923
template <typename Handler>
924
void async_send(implementation_type& impl, const null_buffers&,
925
socket_base::message_flags, Handler handler)
929
this->get_io_service().post(bind_handler(handler,
930
asio::error::bad_descriptor, 0));
934
// Check if the reactor was already obtained from the io_service.
935
reactor_type* reactor = static_cast<reactor_type*>(
936
interlocked_compare_exchange_pointer(
937
reinterpret_cast<void**>(&reactor_), 0, 0));
940
reactor = &(asio::use_service<reactor_type>(
941
this->get_io_service()));
942
interlocked_exchange_pointer(
943
reinterpret_cast<void**>(&reactor_), reactor);
946
reactor->start_write_op(impl.socket_, impl.reactor_data_,
947
null_buffers_operation<Handler>(this->get_io_service(), handler),
952
// Send a datagram to the specified endpoint. Returns the number of bytes
954
template <typename ConstBufferSequence>
955
size_t send_to(implementation_type& impl, const ConstBufferSequence& buffers,
956
const endpoint_type& destination, socket_base::message_flags flags,
957
asio::error_code& ec)
961
ec = asio::error::bad_descriptor;
965
// Copy buffers into WSABUF array.
966
::WSABUF bufs[max_buffers];
967
typename ConstBufferSequence::const_iterator iter = buffers.begin();
968
typename ConstBufferSequence::const_iterator end = buffers.end();
970
for (; iter != end && i < max_buffers; ++iter, ++i)
972
asio::const_buffer buffer(*iter);
973
bufs[i].len = static_cast<u_long>(asio::buffer_size(buffer));
974
bufs[i].buf = const_cast<char*>(
975
asio::buffer_cast<const char*>(buffer));
979
DWORD bytes_transferred = 0;
980
int result = ::WSASendTo(impl.socket_, bufs, i, &bytes_transferred,
981
flags, destination.data(), static_cast<int>(destination.size()), 0, 0);
984
DWORD last_error = ::WSAGetLastError();
985
if (last_error == ERROR_PORT_UNREACHABLE)
986
last_error = WSAECONNREFUSED;
987
ec = asio::error_code(last_error,
988
asio::error::get_system_category());
992
ec = asio::error_code();
993
return bytes_transferred;
996
// Wait until data can be sent without blocking.
997
size_t send_to(implementation_type& impl, const null_buffers&,
998
socket_base::message_flags, const endpoint_type&,
999
asio::error_code& ec)
1003
ec = asio::error::bad_descriptor;
1007
// Wait for socket to become ready.
1008
socket_ops::poll_write(impl.socket_, ec);
1013
template <typename ConstBufferSequence, typename Handler>
1014
class send_to_operation
1018
send_to_operation(win_iocp_io_service& io_service,
1019
const ConstBufferSequence& buffers, Handler handler)
1020
: operation(io_service,
1021
&send_to_operation<ConstBufferSequence, Handler>::do_completion_impl,
1022
&send_to_operation<ConstBufferSequence, Handler>::destroy_impl),
1023
work_(io_service.get_io_service()),
1030
static void do_completion_impl(operation* op,
1031
DWORD last_error, size_t bytes_transferred)
1033
// Take ownership of the operation object.
1034
typedef send_to_operation<ConstBufferSequence, Handler> op_type;
1035
op_type* handler_op(static_cast<op_type*>(op));
1036
typedef handler_alloc_traits<Handler, op_type> alloc_traits;
1037
handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
1039
#if defined(ASIO_ENABLE_BUFFER_DEBUGGING)
1040
// Check whether buffers are still valid.
1041
typename ConstBufferSequence::const_iterator iter
1042
= handler_op->buffers_.begin();
1043
typename ConstBufferSequence::const_iterator end
1044
= handler_op->buffers_.end();
1047
asio::const_buffer buffer(*iter);
1048
asio::buffer_cast<const char*>(buffer);
1051
#endif // defined(ASIO_ENABLE_BUFFER_DEBUGGING)
1053
// Map non-portable errors to their portable counterparts.
1054
asio::error_code ec(last_error,
1055
asio::error::get_system_category());
1056
if (ec.value() == ERROR_PORT_UNREACHABLE)
1058
ec = asio::error::connection_refused;
1061
// Make a copy of the handler so that the memory can be deallocated before
1062
// the upcall is made.
1063
Handler handler(handler_op->handler_);
1065
// Free the memory associated with the handler.
1068
// Call the handler.
1069
asio_handler_invoke_helpers::invoke(
1070
detail::bind_handler(handler, ec, bytes_transferred), &handler);
1073
static void destroy_impl(operation* op)
1075
// Take ownership of the operation object.
1076
typedef send_to_operation<ConstBufferSequence, Handler> op_type;
1077
op_type* handler_op(static_cast<op_type*>(op));
1078
typedef handler_alloc_traits<Handler, op_type> alloc_traits;
1079
handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
1081
// A sub-object of the handler may be the true owner of the memory
1082
// associated with the handler. Consequently, a local copy of the handler
1083
// is required to ensure that any owning sub-object remains valid until
1084
// after we have deallocated the memory here.
1085
Handler handler(handler_op->handler_);
1088
// Free the memory associated with the handler.
1092
asio::io_service::work work_;
1093
ConstBufferSequence buffers_;
1097
// Start an asynchronous send. The data being sent must be valid for the
1098
// lifetime of the asynchronous operation.
1099
template <typename ConstBufferSequence, typename Handler>
1100
void async_send_to(implementation_type& impl,
1101
const ConstBufferSequence& buffers, const endpoint_type& destination,
1102
socket_base::message_flags flags, Handler handler)
1106
this->get_io_service().post(bind_handler(handler,
1107
asio::error::bad_descriptor, 0));
1111
#if defined(ASIO_ENABLE_CANCELIO)
1112
// Update the ID of the thread from which cancellation is safe.
1113
if (impl.safe_cancellation_thread_id_ == 0)
1114
impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId();
1115
else if (impl.safe_cancellation_thread_id_ != ::GetCurrentThreadId())
1116
impl.safe_cancellation_thread_id_ = ~DWORD(0);
1117
#endif // defined(ASIO_ENABLE_CANCELIO)
1119
// Allocate and construct an operation to wrap the handler.
1120
typedef send_to_operation<ConstBufferSequence, Handler> value_type;
1121
typedef handler_alloc_traits<Handler, value_type> alloc_traits;
1122
raw_handler_ptr<alloc_traits> raw_ptr(handler);
1123
handler_ptr<alloc_traits> ptr(raw_ptr, iocp_service_, buffers, handler);
1125
// Copy buffers into WSABUF array.
1126
::WSABUF bufs[max_buffers];
1127
typename ConstBufferSequence::const_iterator iter = buffers.begin();
1128
typename ConstBufferSequence::const_iterator end = buffers.end();
1130
for (; iter != end && i < max_buffers; ++iter, ++i)
1132
asio::const_buffer buffer(*iter);
1133
bufs[i].len = static_cast<u_long>(asio::buffer_size(buffer));
1134
bufs[i].buf = const_cast<char*>(
1135
asio::buffer_cast<const char*>(buffer));
1139
DWORD bytes_transferred = 0;
1140
int result = ::WSASendTo(impl.socket_, bufs, i, &bytes_transferred, flags,
1141
destination.data(), static_cast<int>(destination.size()), ptr.get(), 0);
1142
DWORD last_error = ::WSAGetLastError();
1144
// Check if the operation completed immediately.
1145
if (result != 0 && last_error != WSA_IO_PENDING)
1147
asio::io_service::work work(this->get_io_service());
1149
asio::error_code ec(last_error,
1150
asio::error::get_system_category());
1151
iocp_service_.post(bind_handler(handler, ec, bytes_transferred));
1159
// Start an asynchronous wait until data can be sent without blocking.
1160
template <typename Handler>
1161
void async_send_to(implementation_type& impl, const null_buffers&,
1162
socket_base::message_flags, const endpoint_type&, Handler handler)
1166
this->get_io_service().post(bind_handler(handler,
1167
asio::error::bad_descriptor, 0));
1171
// Check if the reactor was already obtained from the io_service.
1172
reactor_type* reactor = static_cast<reactor_type*>(
1173
interlocked_compare_exchange_pointer(
1174
reinterpret_cast<void**>(&reactor_), 0, 0));
1177
reactor = &(asio::use_service<reactor_type>(
1178
this->get_io_service()));
1179
interlocked_exchange_pointer(
1180
reinterpret_cast<void**>(&reactor_), reactor);
1183
reactor->start_write_op(impl.socket_, impl.reactor_data_,
1184
null_buffers_operation<Handler>(this->get_io_service(), handler),
1189
// Receive some data from the peer. Returns the number of bytes received.
1190
template <typename MutableBufferSequence>
1191
size_t receive(implementation_type& impl,
1192
const MutableBufferSequence& buffers,
1193
socket_base::message_flags flags, asio::error_code& ec)
1197
ec = asio::error::bad_descriptor;
1201
// Copy buffers into WSABUF array.
1202
::WSABUF bufs[max_buffers];
1203
typename MutableBufferSequence::const_iterator iter = buffers.begin();
1204
typename MutableBufferSequence::const_iterator end = buffers.end();
1206
size_t total_buffer_size = 0;
1207
for (; iter != end && i < max_buffers; ++iter, ++i)
1209
asio::mutable_buffer buffer(*iter);
1210
bufs[i].len = static_cast<u_long>(asio::buffer_size(buffer));
1211
bufs[i].buf = asio::buffer_cast<char*>(buffer);
1212
total_buffer_size += asio::buffer_size(buffer);
1215
// A request to receive 0 bytes on a stream socket is a no-op.
1216
if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0)
1218
ec = asio::error_code();
1222
// Receive some data.
1223
DWORD bytes_transferred = 0;
1224
DWORD recv_flags = flags;
1225
int result = ::WSARecv(impl.socket_, bufs, i,
1226
&bytes_transferred, &recv_flags, 0, 0);
1229
DWORD last_error = ::WSAGetLastError();
1230
if (last_error == ERROR_NETNAME_DELETED)
1231
last_error = WSAECONNRESET;
1232
else if (last_error == ERROR_PORT_UNREACHABLE)
1233
last_error = WSAECONNREFUSED;
1234
ec = asio::error_code(last_error,
1235
asio::error::get_system_category());
1238
if (bytes_transferred == 0 && impl.protocol_.type() == SOCK_STREAM)
1240
ec = asio::error::eof;
1244
ec = asio::error_code();
1245
return bytes_transferred;
1248
// Wait until data can be received without blocking.
1249
size_t receive(implementation_type& impl, const null_buffers&,
1250
socket_base::message_flags, asio::error_code& ec)
1254
ec = asio::error::bad_descriptor;
1258
// Wait for socket to become ready.
1259
socket_ops::poll_read(impl.socket_, ec);
1264
template <typename MutableBufferSequence, typename Handler>
1265
class receive_operation
1269
receive_operation(int protocol_type, win_iocp_io_service& io_service,
1270
weak_cancel_token_type cancel_token,
1271
const MutableBufferSequence& buffers, Handler handler)
1272
: operation(io_service,
1274
MutableBufferSequence, Handler>::do_completion_impl,
1276
MutableBufferSequence, Handler>::destroy_impl),
1277
protocol_type_(protocol_type),
1278
work_(io_service.get_io_service()),
1279
cancel_token_(cancel_token),
1286
static void do_completion_impl(operation* op,
1287
DWORD last_error, size_t bytes_transferred)
1289
// Take ownership of the operation object.
1290
typedef receive_operation<MutableBufferSequence, Handler> op_type;
1291
op_type* handler_op(static_cast<op_type*>(op));
1292
typedef handler_alloc_traits<Handler, op_type> alloc_traits;
1293
handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
1295
#if defined(ASIO_ENABLE_BUFFER_DEBUGGING)
1296
// Check whether buffers are still valid.
1297
typename MutableBufferSequence::const_iterator iter
1298
= handler_op->buffers_.begin();
1299
typename MutableBufferSequence::const_iterator end
1300
= handler_op->buffers_.end();
1303
asio::mutable_buffer buffer(*iter);
1304
asio::buffer_cast<char*>(buffer);
1307
#endif // defined(ASIO_ENABLE_BUFFER_DEBUGGING)
1309
// Map non-portable errors to their portable counterparts.
1310
asio::error_code ec(last_error,
1311
asio::error::get_system_category());
1312
if (ec.value() == ERROR_NETNAME_DELETED)
1314
if (handler_op->cancel_token_.expired())
1315
ec = asio::error::operation_aborted;
1317
ec = asio::error::connection_reset;
1319
else if (ec.value() == ERROR_PORT_UNREACHABLE)
1321
ec = asio::error::connection_refused;
1324
// Check for connection closed.
1325
else if (!ec && bytes_transferred == 0
1326
&& handler_op->protocol_type_ == SOCK_STREAM
1327
&& !boost::is_same<MutableBufferSequence, null_buffers>::value)
1329
ec = asio::error::eof;
1332
// Make a copy of the handler so that the memory can be deallocated before
1333
// the upcall is made.
1334
Handler handler(handler_op->handler_);
1336
// Free the memory associated with the handler.
1339
// Call the handler.
1340
asio_handler_invoke_helpers::invoke(
1341
detail::bind_handler(handler, ec, bytes_transferred), &handler);
1344
static void destroy_impl(operation* op)
1346
// Take ownership of the operation object.
1347
typedef receive_operation<MutableBufferSequence, Handler> op_type;
1348
op_type* handler_op(static_cast<op_type*>(op));
1349
typedef handler_alloc_traits<Handler, op_type> alloc_traits;
1350
handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
1352
// A sub-object of the handler may be the true owner of the memory
1353
// associated with the handler. Consequently, a local copy of the handler
1354
// is required to ensure that any owning sub-object remains valid until
1355
// after we have deallocated the memory here.
1356
Handler handler(handler_op->handler_);
1359
// Free the memory associated with the handler.
1364
asio::io_service::work work_;
1365
weak_cancel_token_type cancel_token_;
1366
MutableBufferSequence buffers_;
1370
// Start an asynchronous receive. The buffer for the data being received
1371
// must be valid for the lifetime of the asynchronous operation.
1372
template <typename MutableBufferSequence, typename Handler>
1373
void async_receive(implementation_type& impl,
1374
const MutableBufferSequence& buffers,
1375
socket_base::message_flags flags, Handler handler)
1379
this->get_io_service().post(bind_handler(handler,
1380
asio::error::bad_descriptor, 0));
1384
#if defined(ASIO_ENABLE_CANCELIO)
1385
// Update the ID of the thread from which cancellation is safe.
1386
if (impl.safe_cancellation_thread_id_ == 0)
1387
impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId();
1388
else if (impl.safe_cancellation_thread_id_ != ::GetCurrentThreadId())
1389
impl.safe_cancellation_thread_id_ = ~DWORD(0);
1390
#endif // defined(ASIO_ENABLE_CANCELIO)
1392
// Allocate and construct an operation to wrap the handler.
1393
typedef receive_operation<MutableBufferSequence, Handler> value_type;
1394
typedef handler_alloc_traits<Handler, value_type> alloc_traits;
1395
raw_handler_ptr<alloc_traits> raw_ptr(handler);
1396
int protocol_type = impl.protocol_.type();
1397
handler_ptr<alloc_traits> ptr(raw_ptr, protocol_type,
1398
iocp_service_, impl.cancel_token_, buffers, handler);
1400
// Copy buffers into WSABUF array.
1401
::WSABUF bufs[max_buffers];
1402
typename MutableBufferSequence::const_iterator iter = buffers.begin();
1403
typename MutableBufferSequence::const_iterator end = buffers.end();
1405
size_t total_buffer_size = 0;
1406
for (; iter != end && i < max_buffers; ++iter, ++i)
1408
asio::mutable_buffer buffer(*iter);
1409
bufs[i].len = static_cast<u_long>(asio::buffer_size(buffer));
1410
bufs[i].buf = asio::buffer_cast<char*>(buffer);
1411
total_buffer_size += asio::buffer_size(buffer);
1414
// A request to receive 0 bytes on a stream socket is a no-op.
1415
if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0)
1417
asio::io_service::work work(this->get_io_service());
1419
asio::error_code error;
1420
iocp_service_.post(bind_handler(handler, error, 0));
1424
// Receive some data.
1425
DWORD bytes_transferred = 0;
1426
DWORD recv_flags = flags;
1427
int result = ::WSARecv(impl.socket_, bufs, i,
1428
&bytes_transferred, &recv_flags, ptr.get(), 0);
1429
DWORD last_error = ::WSAGetLastError();
1430
if (result != 0 && last_error != WSA_IO_PENDING)
1432
asio::io_service::work work(this->get_io_service());
1434
asio::error_code ec(last_error,
1435
asio::error::get_system_category());
1436
iocp_service_.post(bind_handler(handler, ec, bytes_transferred));
1444
// Wait until data can be received without blocking.
1445
template <typename Handler>
1446
void async_receive(implementation_type& impl, const null_buffers& buffers,
1447
socket_base::message_flags flags, Handler handler)
1451
this->get_io_service().post(bind_handler(handler,
1452
asio::error::bad_descriptor, 0));
1454
else if (impl.protocol_.type() == SOCK_STREAM)
1456
// For stream sockets on Windows, we may issue a 0-byte overlapped
1457
// WSARecv to wait until there is data available on the socket.
1459
#if defined(ASIO_ENABLE_CANCELIO)
1460
// Update the ID of the thread from which cancellation is safe.
1461
if (impl.safe_cancellation_thread_id_ == 0)
1462
impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId();
1463
else if (impl.safe_cancellation_thread_id_ != ::GetCurrentThreadId())
1464
impl.safe_cancellation_thread_id_ = ~DWORD(0);
1465
#endif // defined(ASIO_ENABLE_CANCELIO)
1467
// Allocate and construct an operation to wrap the handler.
1468
typedef receive_operation<null_buffers, Handler> value_type;
1469
typedef handler_alloc_traits<Handler, value_type> alloc_traits;
1470
raw_handler_ptr<alloc_traits> raw_ptr(handler);
1471
int protocol_type = impl.protocol_.type();
1472
handler_ptr<alloc_traits> ptr(raw_ptr, protocol_type,
1473
iocp_service_, impl.cancel_token_, buffers, handler);
1475
// Issue a receive operation with an empty buffer.
1476
::WSABUF buf = { 0, 0 };
1477
DWORD bytes_transferred = 0;
1478
DWORD recv_flags = flags;
1479
int result = ::WSARecv(impl.socket_, &buf, 1,
1480
&bytes_transferred, &recv_flags, ptr.get(), 0);
1481
DWORD last_error = ::WSAGetLastError();
1482
if (result != 0 && last_error != WSA_IO_PENDING)
1484
asio::io_service::work work(this->get_io_service());
1486
asio::error_code ec(last_error,
1487
asio::error::get_system_category());
1488
iocp_service_.post(bind_handler(handler, ec, bytes_transferred));
1497
// Check if the reactor was already obtained from the io_service.
1498
reactor_type* reactor = static_cast<reactor_type*>(
1499
interlocked_compare_exchange_pointer(
1500
reinterpret_cast<void**>(&reactor_), 0, 0));
1503
reactor = &(asio::use_service<reactor_type>(
1504
this->get_io_service()));
1505
interlocked_exchange_pointer(
1506
reinterpret_cast<void**>(&reactor_), reactor);
1509
if (flags & socket_base::message_out_of_band)
1511
reactor->start_except_op(impl.socket_, impl.reactor_data_,
1512
null_buffers_operation<Handler>(this->get_io_service(), handler));
1516
reactor->start_read_op(impl.socket_, impl.reactor_data_,
1517
null_buffers_operation<Handler>(this->get_io_service(), handler),
1523
// Receive a datagram with the endpoint of the sender. Returns the number of
1525
template <typename MutableBufferSequence>
1526
size_t receive_from(implementation_type& impl,
1527
const MutableBufferSequence& buffers,
1528
endpoint_type& sender_endpoint, socket_base::message_flags flags,
1529
asio::error_code& ec)
1533
ec = asio::error::bad_descriptor;
1537
// Copy buffers into WSABUF array.
1538
::WSABUF bufs[max_buffers];
1539
typename MutableBufferSequence::const_iterator iter = buffers.begin();
1540
typename MutableBufferSequence::const_iterator end = buffers.end();
1542
for (; iter != end && i < max_buffers; ++iter, ++i)
1544
asio::mutable_buffer buffer(*iter);
1545
bufs[i].len = static_cast<u_long>(asio::buffer_size(buffer));
1546
bufs[i].buf = asio::buffer_cast<char*>(buffer);
1549
// Receive some data.
1550
DWORD bytes_transferred = 0;
1551
DWORD recv_flags = flags;
1552
int endpoint_size = static_cast<int>(sender_endpoint.capacity());
1553
int result = ::WSARecvFrom(impl.socket_, bufs, i, &bytes_transferred,
1554
&recv_flags, sender_endpoint.data(), &endpoint_size, 0, 0);
1557
DWORD last_error = ::WSAGetLastError();
1558
if (last_error == ERROR_PORT_UNREACHABLE)
1559
last_error = WSAECONNREFUSED;
1560
ec = asio::error_code(last_error,
1561
asio::error::get_system_category());
1564
if (bytes_transferred == 0 && impl.protocol_.type() == SOCK_STREAM)
1566
ec = asio::error::eof;
1570
sender_endpoint.resize(static_cast<std::size_t>(endpoint_size));
1572
ec = asio::error_code();
1573
return bytes_transferred;
1576
// Wait until data can be received without blocking.
1577
size_t receive_from(implementation_type& impl,
1578
const null_buffers&, endpoint_type& sender_endpoint,
1579
socket_base::message_flags, asio::error_code& ec)
1583
ec = asio::error::bad_descriptor;
1587
// Wait for socket to become ready.
1588
socket_ops::poll_read(impl.socket_, ec);
1590
// Reset endpoint since it can be given no sensible value at this time.
1591
sender_endpoint = endpoint_type();
1596
template <typename MutableBufferSequence, typename Handler>
1597
class receive_from_operation
1601
receive_from_operation(int protocol_type, win_iocp_io_service& io_service,
1602
endpoint_type& endpoint, const MutableBufferSequence& buffers,
1604
: operation(io_service,
1605
&receive_from_operation<
1606
MutableBufferSequence, Handler>::do_completion_impl,
1607
&receive_from_operation<
1608
MutableBufferSequence, Handler>::destroy_impl),
1609
protocol_type_(protocol_type),
1610
endpoint_(endpoint),
1611
endpoint_size_(static_cast<int>(endpoint.capacity())),
1612
work_(io_service.get_io_service()),
1618
int& endpoint_size()
1620
return endpoint_size_;
1624
static void do_completion_impl(operation* op,
1625
DWORD last_error, size_t bytes_transferred)
1627
// Take ownership of the operation object.
1628
typedef receive_from_operation<MutableBufferSequence, Handler> op_type;
1629
op_type* handler_op(static_cast<op_type*>(op));
1630
typedef handler_alloc_traits<Handler, op_type> alloc_traits;
1631
handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
1633
#if defined(ASIO_ENABLE_BUFFER_DEBUGGING)
1634
// Check whether buffers are still valid.
1635
typename MutableBufferSequence::const_iterator iter
1636
= handler_op->buffers_.begin();
1637
typename MutableBufferSequence::const_iterator end
1638
= handler_op->buffers_.end();
1641
asio::mutable_buffer buffer(*iter);
1642
asio::buffer_cast<char*>(buffer);
1645
#endif // defined(ASIO_ENABLE_BUFFER_DEBUGGING)
1647
// Map non-portable errors to their portable counterparts.
1648
asio::error_code ec(last_error,
1649
asio::error::get_system_category());
1650
if (ec.value() == ERROR_PORT_UNREACHABLE)
1652
ec = asio::error::connection_refused;
1655
// Check for connection closed.
1656
if (!ec && bytes_transferred == 0
1657
&& handler_op->protocol_type_ == SOCK_STREAM)
1659
ec = asio::error::eof;
1662
// Record the size of the endpoint returned by the operation.
1663
handler_op->endpoint_.resize(handler_op->endpoint_size_);
1665
// Make a copy of the handler so that the memory can be deallocated before
1666
// the upcall is made.
1667
Handler handler(handler_op->handler_);
1669
// Free the memory associated with the handler.
1672
// Call the handler.
1673
asio_handler_invoke_helpers::invoke(
1674
detail::bind_handler(handler, ec, bytes_transferred), &handler);
1677
static void destroy_impl(operation* op)
1679
// Take ownership of the operation object.
1680
typedef receive_from_operation<MutableBufferSequence, Handler> op_type;
1681
op_type* handler_op(static_cast<op_type*>(op));
1682
typedef handler_alloc_traits<Handler, op_type> alloc_traits;
1683
handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
1685
// A sub-object of the handler may be the true owner of the memory
1686
// associated with the handler. Consequently, a local copy of the handler
1687
// is required to ensure that any owning sub-object remains valid until
1688
// after we have deallocated the memory here.
1689
Handler handler(handler_op->handler_);
1692
// Free the memory associated with the handler.
1697
endpoint_type& endpoint_;
1699
asio::io_service::work work_;
1700
MutableBufferSequence buffers_;
1704
// Start an asynchronous receive. The buffer for the data being received and
1705
// the sender_endpoint object must both be valid for the lifetime of the
1706
// asynchronous operation.
1707
template <typename MutableBufferSequence, typename Handler>
1708
void async_receive_from(implementation_type& impl,
1709
const MutableBufferSequence& buffers, endpoint_type& sender_endp,
1710
socket_base::message_flags flags, Handler handler)
1714
this->get_io_service().post(bind_handler(handler,
1715
asio::error::bad_descriptor, 0));
1719
#if defined(ASIO_ENABLE_CANCELIO)
1720
// Update the ID of the thread from which cancellation is safe.
1721
if (impl.safe_cancellation_thread_id_ == 0)
1722
impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId();
1723
else if (impl.safe_cancellation_thread_id_ != ::GetCurrentThreadId())
1724
impl.safe_cancellation_thread_id_ = ~DWORD(0);
1725
#endif // defined(ASIO_ENABLE_CANCELIO)
1727
// Allocate and construct an operation to wrap the handler.
1728
typedef receive_from_operation<MutableBufferSequence, Handler> value_type;
1729
typedef handler_alloc_traits<Handler, value_type> alloc_traits;
1730
raw_handler_ptr<alloc_traits> raw_ptr(handler);
1731
int protocol_type = impl.protocol_.type();
1732
handler_ptr<alloc_traits> ptr(raw_ptr, protocol_type,
1733
iocp_service_, sender_endp, buffers, handler);
1735
// Copy buffers into WSABUF array.
1736
::WSABUF bufs[max_buffers];
1737
typename MutableBufferSequence::const_iterator iter = buffers.begin();
1738
typename MutableBufferSequence::const_iterator end = buffers.end();
1740
for (; iter != end && i < max_buffers; ++iter, ++i)
1742
asio::mutable_buffer buffer(*iter);
1743
bufs[i].len = static_cast<u_long>(asio::buffer_size(buffer));
1744
bufs[i].buf = asio::buffer_cast<char*>(buffer);
1747
// Receive some data.
1748
DWORD bytes_transferred = 0;
1749
DWORD recv_flags = flags;
1750
int result = ::WSARecvFrom(impl.socket_, bufs, i, &bytes_transferred,
1751
&recv_flags, sender_endp.data(), &ptr.get()->endpoint_size(),
1753
DWORD last_error = ::WSAGetLastError();
1754
if (result != 0 && last_error != WSA_IO_PENDING)
1756
asio::io_service::work work(this->get_io_service());
1758
asio::error_code ec(last_error,
1759
asio::error::get_system_category());
1760
iocp_service_.post(bind_handler(handler, ec, bytes_transferred));
1768
// Wait until data can be received without blocking.
1769
template <typename Handler>
1770
void async_receive_from(implementation_type& impl,
1771
const null_buffers&, endpoint_type& sender_endpoint,
1772
socket_base::message_flags flags, Handler handler)
1776
this->get_io_service().post(bind_handler(handler,
1777
asio::error::bad_descriptor, 0));
1781
// Check if the reactor was already obtained from the io_service.
1782
reactor_type* reactor = static_cast<reactor_type*>(
1783
interlocked_compare_exchange_pointer(
1784
reinterpret_cast<void**>(&reactor_), 0, 0));
1787
reactor = &(asio::use_service<reactor_type>(
1788
this->get_io_service()));
1789
interlocked_exchange_pointer(
1790
reinterpret_cast<void**>(&reactor_), reactor);
1793
// Reset endpoint since it can be given no sensible value at this time.
1794
sender_endpoint = endpoint_type();
1796
if (flags & socket_base::message_out_of_band)
1798
reactor->start_except_op(impl.socket_, impl.reactor_data_,
1799
null_buffers_operation<Handler>(this->get_io_service(), handler));
1803
reactor->start_read_op(impl.socket_, impl.reactor_data_,
1804
null_buffers_operation<Handler>(this->get_io_service(), handler),
1810
// Accept a new connection.
1811
template <typename Socket>
1812
asio::error_code accept(implementation_type& impl, Socket& peer,
1813
endpoint_type* peer_endpoint, asio::error_code& ec)
1817
ec = asio::error::bad_descriptor;
1821
// We cannot accept a socket that is already open.
1824
ec = asio::error::already_open;
1830
socket_holder new_socket;
1831
std::size_t addr_len = 0;
1834
addr_len = peer_endpoint->capacity();
1835
new_socket.reset(socket_ops::accept(impl.socket_,
1836
peer_endpoint->data(), &addr_len, ec));
1840
new_socket.reset(socket_ops::accept(impl.socket_, 0, 0, ec));
1845
if (ec == asio::error::connection_aborted
1846
&& !(impl.flags_ & implementation_type::enable_connection_aborted))
1848
// Retry accept operation.
1858
peer_endpoint->resize(addr_len);
1860
peer.assign(impl.protocol_, new_socket.get(), ec);
1862
new_socket.release();
1867
template <typename Socket, typename Handler>
1868
class accept_operation
1872
accept_operation(win_iocp_io_service& io_service,
1873
socket_type socket, socket_type new_socket, Socket& peer,
1874
const protocol_type& protocol, endpoint_type* peer_endpoint,
1875
bool enable_connection_aborted, Handler handler)
1876
: operation(io_service,
1877
&accept_operation<Socket, Handler>::do_completion_impl,
1878
&accept_operation<Socket, Handler>::destroy_impl),
1879
io_service_(io_service),
1881
new_socket_(new_socket),
1883
protocol_(protocol),
1884
peer_endpoint_(peer_endpoint),
1885
work_(io_service.get_io_service()),
1886
enable_connection_aborted_(enable_connection_aborted),
1891
socket_type new_socket()
1893
return new_socket_.get();
1896
void* output_buffer()
1898
return output_buffer_;
1901
DWORD address_length()
1903
return sizeof(sockaddr_storage_type) + 16;
1907
static void do_completion_impl(operation* op, DWORD last_error, size_t)
1909
// Take ownership of the operation object.
1910
typedef accept_operation<Socket, Handler> op_type;
1911
op_type* handler_op(static_cast<op_type*>(op));
1912
typedef handler_alloc_traits<Handler, op_type> alloc_traits;
1913
handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
1915
// Map Windows error ERROR_NETNAME_DELETED to connection_aborted.
1916
if (last_error == ERROR_NETNAME_DELETED)
1918
last_error = WSAECONNABORTED;
1921
// Restart the accept operation if we got the connection_aborted error
1922
// and the enable_connection_aborted socket option is not set.
1923
if (last_error == WSAECONNABORTED
1924
&& !ptr.get()->enable_connection_aborted_)
1926
// Reset OVERLAPPED structure.
1927
ptr.get()->Internal = 0;
1928
ptr.get()->InternalHigh = 0;
1929
ptr.get()->Offset = 0;
1930
ptr.get()->OffsetHigh = 0;
1931
ptr.get()->hEvent = 0;
1933
// Create a new socket for the next connection, since the AcceptEx call
1934
// fails with WSAEINVAL if we try to reuse the same socket.
1935
asio::error_code ec;
1936
ptr.get()->new_socket_.reset();
1937
ptr.get()->new_socket_.reset(socket_ops::socket(
1938
ptr.get()->protocol_.family(), ptr.get()->protocol_.type(),
1939
ptr.get()->protocol_.protocol(), ec));
1940
if (ptr.get()->new_socket() != invalid_socket)
1942
// Accept a connection.
1943
DWORD bytes_read = 0;
1944
BOOL result = ::AcceptEx(ptr.get()->socket_, ptr.get()->new_socket(),
1945
ptr.get()->output_buffer(), 0, ptr.get()->address_length(),
1946
ptr.get()->address_length(), &bytes_read, ptr.get());
1947
last_error = ::WSAGetLastError();
1949
// Check if the operation completed immediately.
1950
if (!result && last_error != WSA_IO_PENDING)
1952
if (last_error == ERROR_NETNAME_DELETED
1953
|| last_error == WSAECONNABORTED)
1955
// Post this handler so that operation will be restarted again.
1956
ptr.get()->io_service_.post_completion(ptr.get(), last_error, 0);
1962
// Operation already complete. Continue with rest of this handler.
1967
// Asynchronous operation has been successfully restarted.
1974
// Get the address of the peer.
1975
endpoint_type peer_endpoint;
1976
if (last_error == 0)
1978
LPSOCKADDR local_addr = 0;
1979
int local_addr_length = 0;
1980
LPSOCKADDR remote_addr = 0;
1981
int remote_addr_length = 0;
1982
GetAcceptExSockaddrs(handler_op->output_buffer(), 0,
1983
handler_op->address_length(), handler_op->address_length(),
1984
&local_addr, &local_addr_length, &remote_addr, &remote_addr_length);
1985
if (static_cast<std::size_t>(remote_addr_length)
1986
> peer_endpoint.capacity())
1988
last_error = WSAEINVAL;
1992
using namespace std; // For memcpy.
1993
memcpy(peer_endpoint.data(), remote_addr, remote_addr_length);
1994
peer_endpoint.resize(static_cast<std::size_t>(remote_addr_length));
1998
// Need to set the SO_UPDATE_ACCEPT_CONTEXT option so that getsockname
1999
// and getpeername will work on the accepted socket.
2000
if (last_error == 0)
2002
SOCKET update_ctx_param = handler_op->socket_;
2003
asio::error_code ec;
2004
if (socket_ops::setsockopt(handler_op->new_socket_.get(),
2005
SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
2006
&update_ctx_param, sizeof(SOCKET), ec) != 0)
2008
last_error = ec.value();
2012
// If the socket was successfully accepted, transfer ownership of the
2013
// socket to the peer object.
2014
if (last_error == 0)
2016
asio::error_code ec;
2017
handler_op->peer_.assign(handler_op->protocol_,
2018
native_type(handler_op->new_socket_.get(), peer_endpoint), ec);
2020
last_error = ec.value();
2022
handler_op->new_socket_.release();
2025
// Pass endpoint back to caller.
2026
if (handler_op->peer_endpoint_)
2027
*handler_op->peer_endpoint_ = peer_endpoint;
2029
// Make a copy of the handler so that the memory can be deallocated before
2030
// the upcall is made.
2031
Handler handler(handler_op->handler_);
2033
// Free the memory associated with the handler.
2036
// Call the handler.
2037
asio::error_code ec(last_error,
2038
asio::error::get_system_category());
2039
asio_handler_invoke_helpers::invoke(
2040
detail::bind_handler(handler, ec), &handler);
2043
static void destroy_impl(operation* op)
2045
// Take ownership of the operation object.
2046
typedef accept_operation<Socket, Handler> op_type;
2047
op_type* handler_op(static_cast<op_type*>(op));
2048
typedef handler_alloc_traits<Handler, op_type> alloc_traits;
2049
handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
2051
// A sub-object of the handler may be the true owner of the memory
2052
// associated with the handler. Consequently, a local copy of the handler
2053
// is required to ensure that any owning sub-object remains valid until
2054
// after we have deallocated the memory here.
2055
Handler handler(handler_op->handler_);
2058
// Free the memory associated with the handler.
2062
win_iocp_io_service& io_service_;
2063
socket_type socket_;
2064
socket_holder new_socket_;
2066
protocol_type protocol_;
2067
endpoint_type* peer_endpoint_;
2068
asio::io_service::work work_;
2069
unsigned char output_buffer_[(sizeof(sockaddr_storage_type) + 16) * 2];
2070
bool enable_connection_aborted_;
2074
// Start an asynchronous accept. The peer and peer_endpoint objects
2075
// must be valid until the accept's handler is invoked.
2076
template <typename Socket, typename Handler>
2077
void async_accept(implementation_type& impl, Socket& peer,
2078
endpoint_type* peer_endpoint, Handler handler)
2080
// Check whether acceptor has been initialised.
2083
this->get_io_service().post(bind_handler(handler,
2084
asio::error::bad_descriptor));
2088
// Check that peer socket has not already been opened.
2091
this->get_io_service().post(bind_handler(handler,
2092
asio::error::already_open));
2096
#if defined(ASIO_ENABLE_CANCELIO)
2097
// Update the ID of the thread from which cancellation is safe.
2098
if (impl.safe_cancellation_thread_id_ == 0)
2099
impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId();
2100
else if (impl.safe_cancellation_thread_id_ != ::GetCurrentThreadId())
2101
impl.safe_cancellation_thread_id_ = ~DWORD(0);
2102
#endif // defined(ASIO_ENABLE_CANCELIO)
2104
// Create a new socket for the connection.
2105
asio::error_code ec;
2106
socket_holder sock(socket_ops::socket(impl.protocol_.family(),
2107
impl.protocol_.type(), impl.protocol_.protocol(), ec));
2108
if (sock.get() == invalid_socket)
2110
this->get_io_service().post(bind_handler(handler, ec));
2114
// Allocate and construct an operation to wrap the handler.
2115
typedef accept_operation<Socket, Handler> value_type;
2116
typedef handler_alloc_traits<Handler, value_type> alloc_traits;
2117
raw_handler_ptr<alloc_traits> raw_ptr(handler);
2118
socket_type new_socket = sock.get();
2119
bool enable_connection_aborted =
2120
(impl.flags_ & implementation_type::enable_connection_aborted);
2121
handler_ptr<alloc_traits> ptr(raw_ptr,
2122
iocp_service_, impl.socket_, new_socket, peer, impl.protocol_,
2123
peer_endpoint, enable_connection_aborted, handler);
2126
// Accept a connection.
2127
DWORD bytes_read = 0;
2128
BOOL result = ::AcceptEx(impl.socket_, ptr.get()->new_socket(),
2129
ptr.get()->output_buffer(), 0, ptr.get()->address_length(),
2130
ptr.get()->address_length(), &bytes_read, ptr.get());
2131
DWORD last_error = ::WSAGetLastError();
2133
// Check if the operation completed immediately.
2134
if (!result && last_error != WSA_IO_PENDING)
2136
if (!enable_connection_aborted
2137
&& (last_error == ERROR_NETNAME_DELETED
2138
|| last_error == WSAECONNABORTED))
2140
// Post handler so that operation will be restarted again. We do not
2141
// perform the AcceptEx again here to avoid the possibility of starving
2143
iocp_service_.post_completion(ptr.get(), last_error, 0);
2148
asio::io_service::work work(this->get_io_service());
2150
asio::error_code ec(last_error,
2151
asio::error::get_system_category());
2152
iocp_service_.post(bind_handler(handler, ec));
2161
// Connect the socket to the specified endpoint.
2162
asio::error_code connect(implementation_type& impl,
2163
const endpoint_type& peer_endpoint, asio::error_code& ec)
2167
ec = asio::error::bad_descriptor;
2171
// Perform the connect operation.
2172
socket_ops::connect(impl.socket_,
2173
peer_endpoint.data(), peer_endpoint.size(), ec);
2177
template <typename Handler>
2178
class connect_operation
2181
connect_operation(socket_type socket, bool user_set_non_blocking,
2182
asio::io_service& io_service, Handler handler)
2184
user_set_non_blocking_(user_set_non_blocking),
2185
io_service_(io_service),
2191
bool perform(asio::error_code& ec,
2192
std::size_t& bytes_transferred)
2194
// Check whether the operation was successful.
2198
// Get the error code from the connect operation.
2199
int connect_error = 0;
2200
size_t connect_error_len = sizeof(connect_error);
2201
if (socket_ops::getsockopt(socket_, SOL_SOCKET, SO_ERROR,
2202
&connect_error, &connect_error_len, ec) == socket_error_retval)
2205
// If connection failed then post the handler with the error code.
2208
ec = asio::error_code(connect_error,
2209
asio::error::get_system_category());
2213
// Revert socket to blocking mode unless the user requested otherwise.
2214
if (!user_set_non_blocking_)
2216
ioctl_arg_type non_blocking = 0;
2217
if (socket_ops::ioctl(socket_, FIONBIO, &non_blocking, ec))
2221
// Post the result of the successful connection operation.
2222
ec = asio::error_code();
2226
void complete(const asio::error_code& ec, std::size_t)
2228
io_service_.post(bind_handler(handler_, ec));
2232
socket_type socket_;
2233
bool user_set_non_blocking_;
2234
asio::io_service& io_service_;
2235
asio::io_service::work work_;
2239
// Start an asynchronous connect.
2240
template <typename Handler>
2241
void async_connect(implementation_type& impl,
2242
const endpoint_type& peer_endpoint, Handler handler)
2246
this->get_io_service().post(bind_handler(handler,
2247
asio::error::bad_descriptor));
2251
#if defined(ASIO_ENABLE_CANCELIO)
2252
// Update the ID of the thread from which cancellation is safe.
2253
if (impl.safe_cancellation_thread_id_ == 0)
2254
impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId();
2255
else if (impl.safe_cancellation_thread_id_ != ::GetCurrentThreadId())
2256
impl.safe_cancellation_thread_id_ = ~DWORD(0);
2257
#endif // defined(ASIO_ENABLE_CANCELIO)
2259
// Check if the reactor was already obtained from the io_service.
2260
reactor_type* reactor = static_cast<reactor_type*>(
2261
interlocked_compare_exchange_pointer(
2262
reinterpret_cast<void**>(&reactor_), 0, 0));
2265
reactor = &(asio::use_service<reactor_type>(
2266
this->get_io_service()));
2267
interlocked_exchange_pointer(
2268
reinterpret_cast<void**>(&reactor_), reactor);
2271
// Mark the socket as non-blocking so that the connection will take place
2273
ioctl_arg_type non_blocking = 1;
2274
asio::error_code ec;
2275
if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
2277
this->get_io_service().post(bind_handler(handler, ec));
2281
// Start the connect operation.
2282
if (socket_ops::connect(impl.socket_, peer_endpoint.data(),
2283
peer_endpoint.size(), ec) == 0)
2285
// Revert socket to blocking mode unless the user requested otherwise.
2286
if (!(impl.flags_ & implementation_type::user_set_non_blocking))
2289
socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec);
2292
// The connect operation has finished successfully so we need to post the
2293
// handler immediately.
2294
this->get_io_service().post(bind_handler(handler, ec));
2296
else if (ec == asio::error::in_progress
2297
|| ec == asio::error::would_block)
2299
// The connection is happening in the background, and we need to wait
2300
// until the socket becomes writeable.
2301
boost::shared_ptr<bool> completed(new bool(false));
2302
reactor->start_connect_op(impl.socket_, impl.reactor_data_,
2303
connect_operation<Handler>(
2305
(impl.flags_ & implementation_type::user_set_non_blocking) != 0,
2306
this->get_io_service(), handler));
2310
// Revert socket to blocking mode unless the user requested otherwise.
2311
if (!(impl.flags_ & implementation_type::user_set_non_blocking))
2314
asio::error_code ignored_ec;
2315
socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ignored_ec);
2318
// The connect operation has failed, so post the handler immediately.
2319
this->get_io_service().post(bind_handler(handler, ec));
2324
// Helper function to close a socket when the associated object is being
2326
void close_for_destruction(implementation_type& impl)
2330
// Check if the reactor was created, in which case we need to close the
2331
// socket on the reactor as well to cancel any operations that might be
2333
reactor_type* reactor = static_cast<reactor_type*>(
2334
interlocked_compare_exchange_pointer(
2335
reinterpret_cast<void**>(&reactor_), 0, 0));
2337
reactor->close_descriptor(impl.socket_, impl.reactor_data_);
2339
// The socket destructor must not block. If the user has changed the
2340
// linger option to block in the foreground, we will change it back to the
2341
// default so that the closure is performed in the background.
2342
if (impl.flags_ & implementation_type::close_might_block)
2347
asio::error_code ignored_ec;
2348
socket_ops::setsockopt(impl.socket_,
2349
SOL_SOCKET, SO_LINGER, &opt, sizeof(opt), ignored_ec);
2352
asio::error_code ignored_ec;
2353
socket_ops::close(impl.socket_, ignored_ec);
2354
impl.socket_ = invalid_socket;
2356
impl.cancel_token_.reset();
2357
#if defined(ASIO_ENABLE_CANCELIO)
2358
impl.safe_cancellation_thread_id_ = 0;
2359
#endif // defined(ASIO_ENABLE_CANCELIO)
2363
// Helper function to emulate InterlockedCompareExchangePointer functionality
2365
// - very old Platform SDKs; and
2366
// - platform SDKs where MSVC's /Wp64 option causes spurious warnings.
2367
void* interlocked_compare_exchange_pointer(void** dest, void* exch, void* cmp)
2369
#if defined(_M_IX86)
2370
return reinterpret_cast<void*>(InterlockedCompareExchange(
2371
reinterpret_cast<PLONG>(dest), reinterpret_cast<LONG>(exch),
2372
reinterpret_cast<LONG>(cmp)));
2374
return InterlockedCompareExchangePointer(dest, exch, cmp);
2378
// Helper function to emulate InterlockedExchangePointer functionality for:
2379
// - very old Platform SDKs; and
2380
// - platform SDKs where MSVC's /Wp64 option causes spurious warnings.
2381
void* interlocked_exchange_pointer(void** dest, void* val)
2383
#if defined(_M_IX86)
2384
return reinterpret_cast<void*>(InterlockedExchange(
2385
reinterpret_cast<PLONG>(dest), reinterpret_cast<LONG>(val)));
2387
return InterlockedExchangePointer(dest, val);
2391
// The IOCP service used for running asynchronous operations and dispatching
2393
win_iocp_io_service& iocp_service_;
2395
// The reactor used for performing connect operations. This object is created
2397
reactor_type* reactor_;
2399
// Mutex to protect access to the linked list of implementations.
2400
asio::detail::mutex mutex_;
2402
// The head of a linked list of all implementations.
2403
implementation_type* impl_list_;
2406
} // namespace detail
2409
#endif // defined(ASIO_HAS_IOCP)
2411
#include "asio/detail/pop_options.hpp"
2413
#endif // ASIO_DETAIL_WIN_IOCP_SOCKET_SERVICE_HPP