1
/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3
* Gearmand client and server library.
5
* Copyright (C) 2011 Data Differential, http://datadifferential.com/
6
* Copyright (C) 2008 Brian Aker, Eric Day
9
* Redistribution and use in source and binary forms, with or without
10
* modification, are permitted provided that the following conditions are
13
* * Redistributions of source code must retain the above copyright
14
* notice, this list of conditions and the following disclaimer.
16
* * Redistributions in binary form must reproduce the above
17
* copyright notice, this list of conditions and the following disclaimer
18
* in the documentation and/or other materials provided with the
21
* * The names of its contributors may not be used to endorse or
22
* promote products derived from this software without specific prior
25
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41
* @brief Connection Definitions
44
#include <libgearman/common.h>
54
#if HAVE_NETINET_TCP_H
55
#include <netinet/tcp.h> /* for TCP_NODELAY */
61
// Forward declaration: the option setter is used by the constructor below
// before its definition appears later in this file.
static gearman_return_t gearman_connection_set_option(gearman_connection_st *connection,
62
gearman_connection_options_t options,
67
/**
 * Wait for an in-progress connect() on this connection's socket to finish.
 *
 * Polls the descriptor for POLLOUT and then reads SO_ERROR via getsockopt()
 * to learn how the connect attempt ended.
 *
 * @return GEARMAN_SUCCESS once the socket reports a completed connect,
 *         GEARMAN_TIMEOUT when universal.timeout is 0 or the wait times out,
 *         otherwise the value produced by gearman_perror() for the failure.
 */
gearman_return_t gearman_connection_st::connect_poll()
71
// Writability is what signals completion of a non-blocking connect().
fds[0].events= POLLOUT;
76
// A zero timeout means the caller is not willing to wait at all.
if (universal.timeout == 0)
78
return gearman_error(universal, GEARMAN_TIMEOUT, "not connected");
82
while (--loop_max) // Should only loop on cases of ERESTART or EINTR
84
// NOTE(review): poll() waits for GEARMAN_DEFAULT_CONNECT_TIMEOUT, not
// universal.timeout, although universal.timeout is consulted above -- confirm
// that this is intended.
int error= poll(fds, 1, GEARMAN_DEFAULT_CONNECT_TIMEOUT);
90
socklen_t len= sizeof (err);
91
// We replace errno with err if getsockopt() passes, but err has been
93
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0)
95
// We check the value to see what happened with the socket.
98
return GEARMAN_SUCCESS;
103
return gearman_perror(universal, "getsockopt() failed");
108
return gearman_error(universal, GEARMAN_TIMEOUT, "timeout occurred while trying to connect");
111
default: // A real error occurred and we need to completely bail
112
switch (get_socket_errno())
114
#ifdef TARGET_OS_LINUX
122
return gearman_perror(universal, "poll() failure");
125
return gearman_perror(universal, "RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid");
127
default: // This should not happen
128
// POLLERR: fetch the pending socket error so it can be reported.
if (fds[0].revents & POLLERR)
131
socklen_t len= sizeof (err);
132
(void)getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len);
137
errno= get_socket_errno();
140
assert_msg(fd != INVALID_SOCKET, "poll() was passed an invalid file descriptor");
142
return gearman_perror(universal, "socket error occurred");
147
// This should only be possible from ERESTART or EINTR;
148
// NOTE(review): the message below has an unbalanced '(' -- runtime text, left as is.
return gearman_perror(universal, "connection failed (error should be from either ERESTART or EINTR");
152
* @addtogroup gearman_connection_static Static Connection Declarations
153
* @ingroup gearman_connection
157
/**
 * Construct a connection that belongs to universal_arg.
 *
 * Starts every state machine in its idle state, applies the caller-supplied
 * options, links the object at the head of universal.con_list and points the
 * send/receive cursors at the start of their buffers.
 *
 * @param universal_arg  Owning universal structure (stored by reference).
 * @param options_args   Option list terminated by GEARMAN_CON_MAX.
 */
gearman_connection_st::gearman_connection_st(gearman_universal_st &universal_arg,
158
gearman_connection_options_t *options_args) :
159
state(GEARMAN_CON_UNIVERSAL_ADDRINFO),
160
send_state(GEARMAN_CON_SEND_STATE_NONE),
161
recv_state(GEARMAN_CON_RECV_UNIVERSAL_NONE),
175
universal(universal_arg)
177
options.ready= false;
178
options.packet_in_use= false;
182
// NOTE(review): callers in this file pass NULL for options_args; this loop
// dereferences the pointer, so it must be guarded by a NULL check -- confirm.
while (*options_args != GEARMAN_CON_MAX)
184
gearman_connection_set_option(this, *options_args, true);
189
// Push this connection onto the front of the universal's connection list.
if (universal.con_list)
191
universal.con_list->prev= this;
193
next= universal.con_list;
195
universal.con_list= this;
196
universal.con_count++;
201
// Both buffers start out empty: cursors point at the beginning.
send_buffer_ptr= send_buffer;
203
recv_buffer_ptr= recv_buffer;
207
/**
 * Allocate a new connection attached to universal.
 *
 * Uses the non-throwing form of new, so allocation failure shows up as a
 * NULL pointer, which is reported through gearman_perror().
 *
 * @param universal  Universal structure the connection will belong to.
 * @param options    Option list forwarded to the connection constructor.
 */
gearman_connection_st *gearman_connection_create(gearman_universal_st &universal,
208
gearman_connection_options_t *options)
210
gearman_connection_st *connection= new (std::nothrow) gearman_connection_st(universal, options);
211
if (connection == NULL)
213
gearman_perror(universal, "gearman_connection_st new");
220
/**
 * Create a connection for the given host and port.
 *
 * Builds the connection with no extra options, records the host/port through
 * set_host() and then resolves the address with lookup().
 *
 * @param universal  Universal structure the connection will belong to.
 * @param host       Host name handed to set_host().
 * @param port       Port handed to set_host().
 */
gearman_connection_st *gearman_connection_create_args(gearman_universal_st& universal,
221
const char *host, in_port_t port)
223
gearman_connection_st *connection= gearman_connection_create(universal, NULL);
224
if (connection == NULL)
229
connection->set_host(host, port);
231
// Resolve eagerly so that a bad host name is detected at creation time.
if (gearman_failed(connection->lookup()))
240
/**
 * Create a new connection in universal that mirrors the settings of from.
 *
 * Copies the ready and packet_in_use option flags, the host string and the
 * port; the new object is otherwise freshly constructed.
 *
 * @param universal  Universal structure the copy will belong to.
 * @param from       Connection whose settings are copied.
 */
gearman_connection_st *gearman_connection_copy(gearman_universal_st& universal,
241
const gearman_connection_st& from)
243
gearman_connection_st *connection= gearman_connection_create(universal, NULL);
245
if (connection == NULL)
250
connection->options.ready= from.options.ready;
251
// @todo Is this right?
252
connection->options.packet_in_use= from.options.packet_in_use;
254
// Assumes both host buffers have the same capacity and from.host is
// NUL-terminated (set_host() enforces termination) -- TODO confirm.
strcpy(connection->host, from.host);
255
connection->port= from.port;
260
/**
 * Tear down the connection: handle a still-open socket, unlink the object
 * from the universal's connection list, decrement the connection count and
 * release the private packet if one is in use.
 */
gearman_connection_st::~gearman_connection_st()
262
// An open descriptor still needs attention before the object goes away
// (presumably via close_socket() -- TODO confirm).
if (fd != INVALID_SOCKET)
269
{ // Remove from universal list
270
// If this is the list head, the head moves on to the next connection.
if (universal.con_list == this)
272
universal.con_list= next;
285
universal.con_count--;
288
free_private_packet();
291
/**
 * Release the connection's private packet (_packet) if it is marked as in
 * use, and clear the packet_in_use flag afterwards.
 */
void gearman_connection_st::free_private_packet()
293
if (options.packet_in_use)
295
gearman_packet_free(&_packet);
296
options.packet_in_use= false;
300
/**
 * Set or clear a single option flag on a connection.
 *
 * GEARMAN_CON_READY and GEARMAN_CON_PACKET_IN_USE map onto the matching
 * members of connection->options. The remaining enumerators are listed so the
 * switch stays exhaustive; options this function cannot set yield
 * GEARMAN_INVALID_COMMAND.
 *
 * @return GEARMAN_SUCCESS when the option was applied,
 *         GEARMAN_INVALID_COMMAND otherwise.
 */
gearman_return_t gearman_connection_set_option(gearman_connection_st *connection,
301
gearman_connection_options_t options,
306
case GEARMAN_CON_READY:
307
connection->options.ready= value;
310
case GEARMAN_CON_PACKET_IN_USE:
311
connection->options.packet_in_use= value;
314
case GEARMAN_CON_IGNORE_LOST_CONNECTION:
317
case GEARMAN_CON_CLOSE_AFTER_FLUSH:
320
case GEARMAN_CON_EXTERNAL_FD:
321
case GEARMAN_CON_MAX:
323
return GEARMAN_INVALID_COMMAND;
326
return GEARMAN_SUCCESS;
330
* Set socket options for a connection.
332
// Forward declaration; flush() calls this right after creating the socket.
static gearman_return_t _con_setsockopt(gearman_connection_st *connection);
340
/**
 * Record the host name and port this connection should use.
 *
 * @param host_arg  Host name; NULL selects GEARMAN_DEFAULT_TCP_HOST.
 * @param port_arg  TCP port; 0 selects GEARMAN_DEFAULT_TCP_PORT.
 */
void gearman_connection_st::set_host(const char *host_arg, const in_port_t port_arg)
344
strncpy(host, host_arg == NULL ? GEARMAN_DEFAULT_TCP_HOST : host_arg, GEARMAN_NI_MAXHOST);
345
// strncpy() does not terminate when the source fills the buffer, so force it.
host[GEARMAN_NI_MAXHOST - 1]= 0;
347
port= in_port_t(port_arg == 0 ? GEARMAN_DEFAULT_TCP_PORT : port_arg);
350
/**
 * Shut down and close the socket, then return the connection to its initial
 * state: connection/send/receive state machines are reset, the packet being
 * received is freed and both buffer cursors are rewound.
 *
 * Failures from shutdown() and closesocket() are reported through
 * gearman_perror() but do not stop the reset.
 */
void gearman_connection_st::close_socket()
352
// Nothing to close.
if (fd == INVALID_SOCKET)
357
/* in case of death shutdown to avoid blocking at close_socket() */
358
// ENOTCONN is expected when the peer is already gone, so it is not reported.
if (shutdown(fd, SHUT_RDWR) == SOCKET_ERROR && get_socket_errno() != ENOTCONN)
361
gearman_perror(universal, "shutdown");
362
// NOTE(review): this reads errno while the check above uses
// get_socket_errno() -- confirm both refer to the same value on all platforms.
assert(errno != ENOTSOCK);
367
if (closesocket(fd) == SOCKET_ERROR)
370
gearman_perror(universal, "close");
375
// The next flush() starts again from address lookup.
state= GEARMAN_CON_UNIVERSAL_ADDRINFO;
380
send_state= GEARMAN_CON_SEND_STATE_NONE;
381
send_buffer_ptr= send_buffer;
386
recv_state= GEARMAN_CON_RECV_UNIVERSAL_NONE;
389
gearman_packet_free(recv_packet);
393
recv_buffer_ptr= recv_buffer;
397
/**
 * Release the address list held in addrinfo with freeaddrinfo().
 */
void gearman_connection_st::reset_addrinfo()
401
freeaddrinfo(addrinfo);
408
/**
 * Queue a packet on the connection and optionally push it to the socket.
 *
 * Implemented as a state machine over send_state so that a call interrupted
 * by a failed flush() can be resumed:
 *  - STATE_NONE:   pack the packet header/arguments into send_buffer.
 *  - PRE_FLUSH:    flush() because the packed part did not fit.
 *  - (payload):    copy as much of packet_arg.data as fits into the buffer.
 *  - FORCE_FLUSH:  flush() so the rest can be sent from the data buffer.
 *  - FLUSH / FLUSH_DATA: final flush handling, then back to STATE_NONE.
 *
 * @param packet_arg    Packet to send; must be marked complete.
 * @param flush_buffer  Whether buffered data should be flushed at the end
 *                      (presumably; its use is in the final states).
 * @return GEARMAN_SUCCESS, GEARMAN_INVALID_PACKET for an incomplete packet,
 *         GEARMAN_SEND_BUFFER_TOO_SMALL, or an error from packing/flushing.
 */
gearman_return_t gearman_connection_st::send_packet(const gearman_packet_st& packet_arg, const bool flush_buffer)
412
case GEARMAN_CON_SEND_STATE_NONE:
413
if (packet_arg.options.complete == false)
415
return gearman_error(universal, GEARMAN_INVALID_PACKET, "packet not complete");
418
/* Pack first part of packet, which is everything but the payload. */
422
{ // Scoping to shut compiler up about switch/case jump
423
size_t send_size= gearman_packet_pack(packet_arg,
424
send_buffer +send_buffer_size,
425
GEARMAN_SEND_BUFFER_SIZE -send_buffer_size, rc);
427
if (gearman_success(rc))
429
send_buffer_size+= send_size;
432
// The packer asked for this packet to be dropped; that is not an error.
else if (rc == GEARMAN_IGNORE_PACKET)
434
return GEARMAN_SUCCESS;
436
// Anything other than "buffer full, flush first" is a genuine failure.
else if (rc != GEARMAN_FLUSH_DATA)
442
/* We were asked to flush when the buffer is already flushed! */
443
if (send_buffer_size == 0)
445
return gearman_universal_set_error(universal, GEARMAN_SEND_BUFFER_TOO_SMALL, __func__, AT,
446
"send buffer too small (%u)", GEARMAN_SEND_BUFFER_SIZE);
449
/* Flush buffer now if first part of packet won't fit in. */
450
send_state= GEARMAN_CON_SEND_UNIVERSAL_PRE_FLUSH;
452
case GEARMAN_CON_SEND_UNIVERSAL_PRE_FLUSH:
454
gearman_return_t ret= flush();
455
if (gearman_failed(ret))
462
/* Return here if we have no data to send. */
463
if (packet_arg.data_size == 0)
468
/* If there is any room in the buffer, copy in data. */
469
if (packet_arg.data and (GEARMAN_SEND_BUFFER_SIZE - send_buffer_size) > 0)
471
// Copy min(free space, payload size) bytes of the payload.
send_data_offset= GEARMAN_SEND_BUFFER_SIZE - send_buffer_size;
472
if (send_data_offset > packet_arg.data_size)
474
send_data_offset= packet_arg.data_size;
477
memcpy(send_buffer + send_buffer_size, packet_arg.data, send_data_offset);
478
send_buffer_size+= send_data_offset;
480
/* Return if all data fit in the send buffer. */
481
if (send_data_offset == packet_arg.data_size)
488
/* Flush buffer now so we can start writing directly from data buffer. */
489
send_state= GEARMAN_CON_SEND_UNIVERSAL_FORCE_FLUSH;
491
case GEARMAN_CON_SEND_UNIVERSAL_FORCE_FLUSH:
493
gearman_return_t ret= flush();
494
if (gearman_failed(ret))
500
send_data_size= packet_arg.data_size;
502
/* If this is NULL, then gearman_connection_send_data function will be used. */
503
if (not packet_arg.data)
505
send_state= GEARMAN_CON_SEND_UNIVERSAL_FLUSH_DATA;
506
return GEARMAN_SUCCESS;
509
/* Copy into the buffer if it fits, otherwise flush from packet buffer. */
510
send_buffer_size= packet_arg.data_size - send_data_offset;
511
if (send_buffer_size < GEARMAN_SEND_BUFFER_SIZE)
514
static_cast<char *>(const_cast<void *>(packet_arg.data)) + send_data_offset,
521
// Too large to buffer: send straight out of the caller's payload memory.
send_buffer_ptr= static_cast<char *>(const_cast<void *>(packet_arg.data)) + send_data_offset;
522
send_state= GEARMAN_CON_SEND_UNIVERSAL_FLUSH_DATA;
524
case GEARMAN_CON_SEND_UNIVERSAL_FLUSH:
525
case GEARMAN_CON_SEND_UNIVERSAL_FLUSH_DATA:
531
send_state= GEARMAN_CON_SEND_UNIVERSAL_FLUSH;
535
send_state= GEARMAN_CON_SEND_STATE_NONE;
537
return GEARMAN_SUCCESS;
540
/**
 * Send a chunk of payload data for a packet whose header has already been
 * sent, writing directly from the caller's buffer.
 *
 * Only valid while send_state is GEARMAN_CON_SEND_UNIVERSAL_FLUSH_DATA, and
 * data_size may not exceed the payload bytes still outstanding
 * (send_data_size - send_data_offset).
 *
 * @param data       Bytes to send.
 * @param data_size  Number of bytes in data.
 * @param ret_ptr    Receives the status of the operation.
 * @return Number of bytes consumed (data_size minus what is left unsent).
 *
 * NOTE(review): the two early returns below hand a gearman_return_t back
 * through a size_t return type, so the caller sees an error code as a byte
 * count, and no visible statement stores anything in *ret_ptr on those paths
 * -- confirm callers are not relying on *ret_ptr there.
 */
size_t gearman_connection_st::send_and_flush(const void *data, size_t data_size, gearman_return_t *ret_ptr)
542
if (send_state != GEARMAN_CON_SEND_UNIVERSAL_FLUSH_DATA)
544
return gearman_error(universal, GEARMAN_NOT_FLUSHING, "not flushing");
547
if (data_size > (send_data_size - send_data_offset))
549
return gearman_error(universal, GEARMAN_DATA_TOO_LARGE, "data too large");
552
// Point the send cursor at the caller's memory instead of send_buffer.
send_buffer_ptr= static_cast<char *>(const_cast<void *>(data));
553
send_buffer_size= data_size;
557
return data_size -send_buffer_size;
560
/**
 * Resolve host/port into an address list with getaddrinfo().
 *
 * Any previous list is released first. On success addrinfo_next is set to the
 * first result and the connection state advances to
 * GEARMAN_CON_UNIVERSAL_CONNECT.
 *
 * @return GEARMAN_SUCCESS, or GEARMAN_GETADDRINFO (with the gai_strerror()
 *         text recorded on universal) when resolution fails.
 */
gearman_return_t gearman_connection_st::lookup()
564
freeaddrinfo(addrinfo);
568
// getaddrinfo() takes the service as a string.
char port_str[GEARMAN_NI_MAXSERV];
569
snprintf(port_str, GEARMAN_NI_MAXSERV, "%hu", uint16_t(port));
572
// Hints: TCP stream sockets only; address family is left unspecified (0).
memset(&ai, 0, sizeof(struct addrinfo));
573
ai.ai_socktype= SOCK_STREAM;
574
ai.ai_protocol= IPPROTO_TCP;
577
if ((ret= getaddrinfo(host, port_str, &ai, &(addrinfo))))
579
return gearman_universal_set_error(universal, GEARMAN_GETADDRINFO, AT, "getaddrinfo:%s", gai_strerror(ret));
582
addrinfo_next= addrinfo;
583
state= GEARMAN_CON_UNIVERSAL_CONNECT;
585
return GEARMAN_SUCCESS;
588
/**
 * Bring the connection up if necessary and write out pending send data.
 *
 * Driven by the connection state:
 *  - ADDRINFO:   resolve the host with lookup().
 *  - CONNECT:    create a socket for addrinfo_next, apply socket options and
 *                call connect(); refused/unreachable/timed-out addresses
 *                advance to the next entry in the address list.
 *  - CONNECTING: inspect revents for the outcome of a pending connect, or
 *                wait (GEARMAN_IO_WAIT when non-blocking).
 *  - CONNECTED:  send() until send_buffer_size reaches zero.
 *
 * @return GEARMAN_SUCCESS when the buffer has been written,
 *         GEARMAN_IO_WAIT when non-blocking and the socket is not ready,
 *         GEARMAN_COULD_NOT_CONNECT when no address could be connected,
 *         or another error recorded on universal.
 */
gearman_return_t gearman_connection_st::flush()
594
case GEARMAN_CON_UNIVERSAL_ADDRINFO:
596
gearman_return_t ret= lookup();
598
if (gearman_failed(ret))
604
case GEARMAN_CON_UNIVERSAL_CONNECT:
605
if (fd != INVALID_SOCKET)
610
// Address list exhausted: start over from lookup next time and report.
if (addrinfo_next == NULL)
612
state= GEARMAN_CON_UNIVERSAL_ADDRINFO;
613
return gearman_universal_set_error(universal, GEARMAN_COULD_NOT_CONNECT, GEARMAN_AT, "%s:%hu", host, uint16_t(port));
616
fd= socket(addrinfo_next->ai_family, addrinfo_next->ai_socktype, addrinfo_next->ai_protocol);
617
if (fd == INVALID_SOCKET)
619
state= GEARMAN_CON_UNIVERSAL_ADDRINFO;
620
return gearman_perror(universal, "socket");
624
gearman_return_t gret= _con_setsockopt(this);
625
if (gearman_failed(gret))
634
// connect() returning 0 means the connection completed immediately.
if (connect(fd, addrinfo_next->ai_addr, addrinfo_next->ai_addrlen) == 0)
636
state= GEARMAN_CON_UNIVERSAL_CONNECTED;
641
if (errno == EAGAIN || errno == EINTR)
646
// The socket is non-blocking, so EINPROGRESS is the normal "still
// connecting" answer; wait for the result in connect_poll().
if (errno == EINPROGRESS)
648
gearman_return_t gret= connect_poll();
649
if (gearman_failed(gret))
651
assert_msg(universal.error.rc != GEARMAN_SUCCESS, "Programmer error, connect_poll() returned an error, but it was not set");
656
state= GEARMAN_CON_UNIVERSAL_CONNECTING;
660
// These errors are specific to this address: try the next one.
if (errno == ECONNREFUSED || errno == ENETUNREACH || errno == ETIMEDOUT)
662
state= GEARMAN_CON_UNIVERSAL_CONNECT;
663
addrinfo_next= addrinfo_next->ai_next;
667
gearman_perror(universal, "connect");
669
return GEARMAN_COULD_NOT_CONNECT;
672
if (state != GEARMAN_CON_UNIVERSAL_CONNECTING)
677
case GEARMAN_CON_UNIVERSAL_CONNECTING:
680
// A poll error on the socket means this address failed: move to the next.
if (revents & (POLLERR | POLLHUP | POLLNVAL))
682
state= GEARMAN_CON_UNIVERSAL_CONNECT;
683
addrinfo_next= addrinfo_next->ai_next;
686
// Writable: the pending connect has completed.
else if (revents & POLLOUT)
688
state= GEARMAN_CON_UNIVERSAL_CONNECTED;
694
// Non-blocking callers get GEARMAN_IO_WAIT and are expected to call again.
if (gearman_universal_is_non_blocking(universal))
696
state= GEARMAN_CON_UNIVERSAL_CONNECTING;
697
return gearman_gerror(universal, GEARMAN_IO_WAIT);
700
gearman_return_t gret= gearman_wait(universal);
701
if (gearman_failed(gret))
707
if (state != GEARMAN_CON_UNIVERSAL_CONNECTED)
712
case GEARMAN_CON_UNIVERSAL_CONNECTED:
713
while (send_buffer_size != 0)
715
// MSG_NOSIGNAL is always passed; MSG_DONTWAIT is added in non-blocking mode.
ssize_t write_size= ::send(fd, send_buffer_ptr, send_buffer_size,
716
gearman_universal_is_non_blocking(universal) ? MSG_NOSIGNAL| MSG_DONTWAIT : MSG_NOSIGNAL);
718
if (write_size == 0) // Zero value on send()
720
else if (write_size == -1)
726
if (gearman_universal_is_non_blocking(universal))
728
gearman_gerror(universal, GEARMAN_IO_WAIT);
729
return GEARMAN_IO_WAIT;
732
gearman_return_t gret= gearman_wait(universal);
733
if (gearman_failed(gret))
740
else if (errno == EINTR)
744
// The peer went away while we were writing.
else if (errno == EPIPE || errno == ECONNRESET || errno == EHOSTDOWN)
746
gearman_return_t ret= gearman_perror(universal, "lost connection to server during send");
751
gearman_return_t ret= gearman_perror(universal, "send");
757
// Account for the bytes the kernel accepted.
send_buffer_size-= size_t(write_size);
758
// While streaming payload data, also track progress through the payload.
if (send_state == GEARMAN_CON_SEND_UNIVERSAL_FLUSH_DATA)
760
send_data_offset+= size_t(write_size);
761
if (send_data_offset == send_data_size)
768
if (send_buffer_size == 0)
770
return GEARMAN_SUCCESS;
773
else if (send_buffer_size == 0)
778
send_buffer_ptr+= write_size;
781
// Everything written: reset the send side for the next packet.
send_state= GEARMAN_CON_SEND_STATE_NONE;
782
send_buffer_ptr= send_buffer;
784
return GEARMAN_SUCCESS;
789
/**
 * Receive one packet from the connection into packet_arg.
 *
 * State machine over recv_state:
 *  - NONE:      require a connected socket and set up recv_packet.
 *  - READ:      unpack the packet from recv_buffer, reading more from the
 *               socket with recv_socket() until the unpack succeeds.
 *  - READ_DATA: allocate the payload buffer and fill it through the
 *               data-receiving overload.
 *
 * @param packet_arg  Packet structure that receives the result.
 * @param ret         Receives the status (e.g. GEARMAN_NOT_CONNECTED,
 *                    GEARMAN_MEMORY_ALLOCATION_FAILURE, GEARMAN_IO_WAIT).
 * @param recv_data   Controls whether the payload is read here
 *                    (presumably; TODO confirm against the full body).
 * @return The received packet pointer (recv_packet).
 */
gearman_packet_st *gearman_connection_st::receiving(gearman_packet_st& packet_arg,
790
gearman_return_t& ret,
791
const bool recv_data)
795
case GEARMAN_CON_RECV_UNIVERSAL_NONE:
796
if (state != GEARMAN_CON_UNIVERSAL_CONNECTED)
798
gearman_error(universal, GEARMAN_NOT_CONNECTED, "not connected");
799
ret= GEARMAN_NOT_CONNECTED;
803
recv_packet= gearman_packet_create(universal, &packet_arg);
804
if (recv_packet == NULL)
806
ret= GEARMAN_MEMORY_ALLOCATION_FAILURE;
810
recv_state= GEARMAN_CON_RECV_UNIVERSAL_READ;
812
case GEARMAN_CON_RECV_UNIVERSAL_READ:
815
// Try to parse a packet out of whatever is already buffered.
if (recv_buffer_size > 0)
817
size_t recv_size= gearman_packet_unpack(*recv_packet,
819
recv_buffer_size, ret);
820
// Consume the bytes the unpacker used.
recv_buffer_ptr+= recv_size;
821
recv_buffer_size-= recv_size;
823
if (gearman_success(ret))
827
// GEARMAN_IO_WAIT only means more bytes are needed; anything else is fatal.
else if (ret != GEARMAN_IO_WAIT)
834
/* Shift buffer contents if needed. */
835
if (recv_buffer_size > 0)
837
memmove(recv_buffer, recv_buffer_ptr, recv_buffer_size);
839
recv_buffer_ptr= recv_buffer;
841
// Append fresh socket data after the bytes still held in the buffer.
size_t recv_size= recv_socket(recv_buffer + recv_buffer_size, GEARMAN_RECV_BUFFER_SIZE - recv_buffer_size, ret);
842
if (gearman_failed(ret))
847
recv_buffer_size+= recv_size;
850
// No payload: the packet is complete.
if (packet_arg.data_size == 0)
852
recv_state= GEARMAN_CON_RECV_UNIVERSAL_NONE;
856
recv_data_size= packet_arg.data_size;
860
recv_state= GEARMAN_CON_RECV_STATE_READ_DATA;
864
assert(packet_arg.universal);
865
packet_arg.data= gearman_malloc((*packet_arg.universal), packet_arg.data_size);
866
if (not packet_arg.data)
868
ret= GEARMAN_MEMORY_ALLOCATION_FAILURE;
873
// The packet now owns the buffer and must free it.
packet_arg.options.free_data= true;
874
recv_state= GEARMAN_CON_RECV_STATE_READ_DATA;
876
case GEARMAN_CON_RECV_STATE_READ_DATA:
877
// The data overload resets recv_data_size to end this loop once the whole
// payload has arrived.
while (recv_data_size)
879
(void)receiving(static_cast<uint8_t *>(const_cast<void *>(packet_arg.data)) +
881
packet_arg.data_size -recv_data_offset, ret);
882
if (gearman_failed(ret))
888
recv_state= GEARMAN_CON_RECV_UNIVERSAL_NONE;
892
gearman_packet_st *tmp_packet_arg= recv_packet;
895
return tmp_packet_arg;
898
/**
 * Read payload bytes for the packet currently being received.
 *
 * Bytes already sitting in recv_buffer are handed out first; anything still
 * missing is read from the socket with recv_socket(). The request is clamped
 * to the number of payload bytes outstanding.
 *
 * @param data       Destination buffer.
 * @param data_size  Capacity of data in bytes.
 * @param ret        Receives the status of the read.
 * @return Number of bytes stored in data.
 */
size_t gearman_connection_st::receiving(void *data, size_t data_size, gearman_return_t& ret)
902
// No payload pending: nothing to do.
if (recv_data_size == 0)
904
ret= GEARMAN_SUCCESS;
908
// Never read past the end of the current packet's payload.
if ((recv_data_size - recv_data_offset) < data_size)
909
data_size= recv_data_size - recv_data_offset;
911
// Serve from the receive buffer first: min(buffered, requested) bytes.
if (recv_buffer_size > 0)
913
if (recv_buffer_size < data_size)
914
recv_size= recv_buffer_size;
916
recv_size= data_size;
918
memcpy(data, recv_buffer_ptr, recv_size);
919
recv_buffer_ptr+= recv_size;
920
recv_buffer_size-= recv_size;
923
// The buffer did not cover the request: get the remainder from the socket.
if (data_size != recv_size)
925
recv_size+= recv_socket(static_cast<uint8_t *>(const_cast<void *>(data)) + recv_size, data_size - recv_size, ret);
926
recv_data_offset+= recv_size;
930
// NOTE(review): presumably the alternative branch of the check above, so the
// offset is advanced only once per call -- TODO confirm.
recv_data_offset+= recv_size;
931
ret= GEARMAN_SUCCESS;
934
// Whole payload received: return the receive side to idle.
if (recv_data_size == recv_data_offset)
938
recv_state= GEARMAN_CON_RECV_UNIVERSAL_NONE;
944
/**
 * Read up to data_size bytes from the socket into data.
 *
 * Retries after waiting when the socket is not ready (or returns
 * GEARMAN_IO_WAIT in non-blocking mode), and maps end-of-file and
 * connection-reset style errors to GEARMAN_LOST_CONNECTION.
 *
 * @param data       Destination buffer.
 * @param data_size  Maximum number of bytes to read.
 * @param ret        Receives GEARMAN_SUCCESS or the failure status.
 * @return Number of bytes read on success.
 */
size_t gearman_connection_st::recv_socket(void *data, size_t data_size, gearman_return_t& ret)
950
read_size= ::recv(fd, data, data_size, 0);
953
// End of stream: the server closed the connection.
gearman_error(universal, GEARMAN_LOST_CONNECTION, "lost connection to server (EOF)");
955
ret= GEARMAN_LOST_CONNECTION;
959
else if (read_size == -1)
965
// Non-blocking callers are told to come back later instead of waiting here.
if (gearman_universal_is_non_blocking(universal))
967
gearman_gerror(universal, GEARMAN_IO_WAIT);
968
ret= GEARMAN_IO_WAIT;
972
ret= gearman_wait(universal);
974
// A graceful shutdown request is turned into an interrupt via gearman_kill();
// if that fails the result becomes a hard GEARMAN_SHUTDOWN.
if (ret == GEARMAN_SHUTDOWN_GRACEFUL)
976
ret= gearman_kill(gearman_universal_id(universal), GEARMAN_INTERRUPT);
978
if (gearman_failed(ret))
980
ret= GEARMAN_SHUTDOWN;
983
else if (ret == GEARMAN_SHUTDOWN)
989
if (gearman_failed(ret))
996
else if (errno == EINTR)
1000
// The peer went away while we were reading.
else if (errno == EPIPE || errno == ECONNRESET || errno == EHOSTDOWN)
1002
gearman_perror(universal, "lost connection to server during read");
1003
ret= GEARMAN_LOST_CONNECTION;
1007
gearman_perror(universal, "read");
1018
ret= GEARMAN_SUCCESS;
1019
return size_t(read_size);
1022
/**
 * Add the poll event bits in arg to the set this connection is waiting for.
 *
 * @param arg  Poll event bits (for example POLLIN or POLLOUT).
 */
void gearman_connection_st::set_events(short arg)
1024
// True when every bit in arg is already present in events.
if ((events | arg) == events)
1030
/**
 * Record poll results for this connection: marks the connection ready and
 * removes the reported bits from the set of events being waited for.
 *
 * @param arg  Returned poll event bits.
 */
void gearman_connection_st::set_revents(short arg)
1033
options.ready= true;
1036
// These events have fired, so stop waiting for them.
events&= short(~arg);
1040
* Static Definitions
1043
/**
 * Apply the library's standard socket options to connection->fd.
 *
 * In order: TCP_NODELAY, SO_LINGER, SO_SNDTIMEO, SO_KEEPALIVE, SO_RCVTIMEO,
 * SO_SNDBUF, SO_NOSIGPIPE (macOS/FreeBSD only, failure is not fatal),
 * SO_RCVBUF, and finally O_NONBLOCK through fcntl().
 *
 * Each failure is reported through gearman_perror() with the name of the
 * option that failed. Options the platform does not support (EOPNOTSUPP /
 * ENOPROTOOPT where checked) are tolerated.
 *
 * @param connection  Connection whose socket is configured.
 * @return GEARMAN_SUCCESS, or GEARMAN_ERRNO when a required option could not
 *         be set.
 */
static gearman_return_t _con_setsockopt(gearman_connection_st *connection)
1046
struct linger linger;
1047
struct timeval waittime;
1050
ret= setsockopt(connection->fd, IPPROTO_TCP, TCP_NODELAY, &ret,
1051
socklen_t(sizeof(int)));
1052
if (ret == -1 && errno != EOPNOTSUPP)
1054
gearman_perror(connection->universal, "setsockopt(TCP_NODELAY)");
1055
return GEARMAN_ERRNO;
1059
linger.l_linger= GEARMAN_DEFAULT_SOCKET_TIMEOUT;
1060
ret= setsockopt(connection->fd, SOL_SOCKET, SO_LINGER, &linger,
1061
socklen_t(sizeof(struct linger)));
1064
gearman_perror(connection->universal, "setsockopt(SO_LINGER)");
1065
return GEARMAN_ERRNO;
1068
// The same timeout value is used for both the send and receive timeouts.
waittime.tv_sec= GEARMAN_DEFAULT_SOCKET_TIMEOUT;
1069
waittime.tv_usec= 0;
1070
ret= setsockopt(connection->fd, SOL_SOCKET, SO_SNDTIMEO, &waittime,
1071
socklen_t(sizeof(struct timeval)));
1072
if (ret == -1 && errno != ENOPROTOOPT)
1074
gearman_perror(connection->universal, "setsockopt(SO_SNDTIMEO)");
1075
return GEARMAN_ERRNO;
1080
ret= setsockopt(connection->fd, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval));
1081
if (ret == -1 && errno != ENOPROTOOPT)
1083
// Report the option that actually failed (this used to say SO_RCVTIMEO).
gearman_perror(connection->universal, "setsockopt(SO_KEEPALIVE)");
1084
return GEARMAN_ERRNO;
1089
ret= setsockopt(connection->fd, SOL_SOCKET, SO_RCVTIMEO, &waittime,
1090
socklen_t(sizeof(struct timeval)));
1091
if (ret == -1 && errno != ENOPROTOOPT)
1093
gearman_perror(connection->universal, "setsockopt(SO_RCVTIMEO)");
1094
return GEARMAN_ERRNO;
1097
// ret doubles as the option value going in and the result coming out.
ret= GEARMAN_DEFAULT_SOCKET_SEND_SIZE;
1098
ret= setsockopt(connection->fd, SOL_SOCKET, SO_SNDBUF, &ret, socklen_t(sizeof(int)));
1101
gearman_perror(connection->universal, "setsockopt(SO_SNDBUF)");
1102
return GEARMAN_ERRNO;
1105
#if defined(__MACH__) && defined(__APPLE__) || defined(__FreeBSD__)
1108
setsockopt(connection->fd, SOL_SOCKET, SO_NOSIGPIPE, static_cast<void *>(&ret), sizeof(int));
1110
// This is not considered a fatal error
1113
gearman_perror(connection->universal, "setsockopt(SO_NOSIGPIPE)");
1118
ret= GEARMAN_DEFAULT_SOCKET_RECV_SIZE;
1119
ret= setsockopt(connection->fd, SOL_SOCKET, SO_RCVBUF, &ret, socklen_t(sizeof(int)));
1122
gearman_perror(connection->universal, "setsockopt(SO_RCVBUF)");
1123
return GEARMAN_ERRNO;
1126
// Switch the descriptor to non-blocking mode, preserving its other flags.
ret= fcntl(connection->fd, F_GETFL, 0);
1129
gearman_perror(connection->universal, "fcntl(F_GETFL)");
1130
return GEARMAN_ERRNO;
1133
ret= fcntl(connection->fd, F_SETFL, ret | O_NONBLOCK);
1136
gearman_perror(connection->universal, "fcntl(F_SETFL)");
1137
return GEARMAN_ERRNO;
1140
return GEARMAN_SUCCESS;