29
29
static gearman_return_t _con_setsockopt(gearman_con_st *con);
32
* Read data from a connection.
34
static size_t _con_read(gearman_con_st *con, void *data, size_t data_size,
35
gearman_return_t *ret_ptr);
67
memset(con, 0, sizeof(gearman_con_st));
68
con->options|= GEARMAN_CON_ALLOCATED;
61
con->options= GEARMAN_CON_ALLOCATED;
71
memset(con, 0, sizeof(gearman_con_st));
74
con->created_id_next= 0;
75
con->send_buffer_size= 0;
76
con->send_data_size= 0;
77
con->send_data_offset= 0;
78
con->recv_buffer_size= 0;
79
con->recv_data_size= 0;
80
con->recv_data_offset= 0;
73
81
con->gearman= gearman;
75
82
GEARMAN_LIST_ADD(gearman->con, con,)
85
con->addrinfo_next= NULL;
78
86
con->send_buffer_ptr= con->send_buffer;
87
con->recv_packet= NULL;
88
con->recv_buffer_ptr= con->recv_buffer;
89
con->protocol_data= NULL;
90
con->protocol_data_free_fn= NULL;
92
con->recv_data_fn= NULL;
94
con->send_data_fn= NULL;
95
con->packet_pack_fn= gearman_packet_pack;
96
con->packet_unpack_fn= gearman_packet_unpack;
102
121
gearman_con_reset_addrinfo(con);
123
if (con->protocol_data != NULL && con->protocol_data_free_fn != NULL)
124
(*con->protocol_data_free_fn)(con, con->protocol_data);
104
126
GEARMAN_LIST_DEL(con->gearman->con, con,)
106
128
if (con->options & GEARMAN_CON_PACKET_IN_USE)
124
146
gearman_con_reset_addrinfo(con);
126
con->port= port == 0 ? GEARMAN_DEFAULT_TCP_PORT : port;
148
con->port= (in_port_t)(port == 0 ? GEARMAN_DEFAULT_TCP_PORT : port);
129
151
void gearman_con_set_options(gearman_con_st *con, gearman_con_options_t options,
211
233
gearman_packet_st *packet, bool flush)
213
235
gearman_return_t ret;
238
if (con->send_fn != NULL)
239
return (*con->send_fn)(con, packet, flush);
215
241
switch (con->send_state)
221
247
return GEARMAN_INVALID_PACKET;
224
/* Flush buffer now if args wont' fit in. */
225
if (packet->args_size > (GEARMAN_SEND_BUFFER_SIZE - con->send_buffer_size))
250
/* Pack first part of packet, which is everything but the payload. */
253
send_size= (*con->packet_pack_fn)(packet, con,
255
con->send_buffer_size,
256
GEARMAN_SEND_BUFFER_SIZE -
257
con->send_buffer_size,
259
if (ret == GEARMAN_SUCCESS)
261
con->send_buffer_size+= send_size;
264
else if (ret == GEARMAN_IGNORE_PACKET)
265
return GEARMAN_SUCCESS;
266
else if (ret != GEARMAN_FLUSH_DATA)
269
/* We were asked to flush when the buffer is already flushed! */
270
if (con->send_buffer_size == 0)
272
GEARMAN_ERROR_SET(con->gearman, "gearman_con_send",
273
"send buffer too small (%u)",
274
GEARMAN_SEND_BUFFER_SIZE)
275
return GEARMAN_SEND_BUFFER_TOO_SMALL;
278
/* Flush buffer now if first part of packet won't fit in. */
227
279
con->send_state= GEARMAN_CON_SEND_STATE_PRE_FLUSH;
229
281
case GEARMAN_CON_SEND_STATE_PRE_FLUSH:
235
if (packet->args_size > 0)
237
memcpy(con->send_buffer + con->send_buffer_size, packet->args,
239
con->send_buffer_size+= packet->args_size;
242
287
/* Return here if we have no data to send. */
243
288
if (packet->data_size == 0)
298
343
case GEARMAN_CON_SEND_STATE_FLUSH:
299
344
case GEARMAN_CON_SEND_STATE_FLUSH_DATA:
300
return gearman_con_flush(con);
345
ret= gearman_con_flush(con);
346
if (ret == GEARMAN_SUCCESS && con->options & GEARMAN_CON_CLOSE_AFTER_FLUSH)
348
gearman_con_close(con);
349
ret= GEARMAN_LOST_CONNECTION;
303
354
GEARMAN_ERROR_SET(con->gearman, "gearman_con_send", "unknown state: %u",
310
361
con->send_state= GEARMAN_CON_SEND_STATE_FLUSH;
311
return gearman_con_flush(con);
362
ret= gearman_con_flush(con);
363
if (ret == GEARMAN_SUCCESS && con->options & GEARMAN_CON_CLOSE_AFTER_FLUSH)
365
gearman_con_close(con);
366
ret= GEARMAN_LOST_CONNECTION;
314
371
con->send_state= GEARMAN_CON_SEND_STATE_NONE;
318
375
size_t gearman_con_send_data(gearman_con_st *con, const void *data,
319
376
size_t data_size, gearman_return_t *ret_ptr)
378
if (con->send_data_fn != NULL)
379
return (*con->send_data_fn)(con, data, data_size, ret_ptr);
321
381
if (con->send_state != GEARMAN_CON_SEND_STATE_FLUSH_DATA)
323
383
GEARMAN_ERROR_SET(con->gearman, "gearman_con_send_data", "not flushing")
480
540
write_size= write(con->fd, con->send_buffer_ptr, con->send_buffer_size);
481
541
if (write_size == 0)
483
if (con->gearman->log_fn == NULL ||
484
con->gearman->verbose >= GEARMAN_VERBOSE_DEBUG)
543
if (!(con->options & GEARMAN_CON_IGNORE_LOST_CONNECTION))
486
545
GEARMAN_ERROR_SET(con->gearman, "gearman_con_flush",
487
546
"lost connection to server (EOF)")
509
568
else if (errno == EINTR)
511
570
else if (errno == EPIPE || errno == ECONNRESET)
513
if (con->gearman->log_fn == NULL ||
514
con->gearman->verbose >= GEARMAN_VERBOSE_DEBUG)
572
if (!(con->options & GEARMAN_CON_IGNORE_LOST_CONNECTION))
516
574
GEARMAN_ERROR_SET(con->gearman, "gearman_con_flush",
517
575
"lost connection to server (%d)", errno)
527
585
return GEARMAN_ERRNO;
530
con->send_buffer_size-= write_size;
588
con->send_buffer_size-= (size_t)write_size;
531
589
if (con->send_state == GEARMAN_CON_SEND_STATE_FLUSH_DATA)
533
con->send_data_offset+= write_size;
591
con->send_data_offset+= (size_t)write_size;
534
592
if (con->send_data_offset == con->send_data_size)
536
594
con->send_data_size= 0;
589
647
if (gearman->sending == 0)
591
649
for (con= gearman->con_list; con != NULL; con= con->next)
593
651
ret= gearman_con_send(con, packet, true);
594
652
if (ret != GEARMAN_SUCCESS)
596
654
if (ret != GEARMAN_IO_WAIT)
598
656
gearman->options= options;
608
666
while (gearman->sending != 0)
610
668
while ((con= gearman_con_ready(gearman)) != NULL)
612
670
ret= gearman_con_send(con, packet, true);
613
671
if (ret != GEARMAN_SUCCESS)
615
673
if (ret != GEARMAN_IO_WAIT)
617
675
gearman->options= options;
652
710
size_t recv_size;
712
if (con->recv_fn != NULL)
713
return (*con->recv_fn)(con, packet, ret_ptr, recv_data);
654
715
switch (con->recv_state)
656
717
case GEARMAN_CON_RECV_STATE_NONE:
676
737
if (con->recv_buffer_size > 0)
678
recv_size= gearman_packet_parse(con->recv_packet, con->recv_buffer_ptr,
679
con->recv_buffer_size, ret_ptr);
739
recv_size= (*con->packet_unpack_fn)(con->recv_packet, con,
740
con->recv_buffer_ptr,
741
con->recv_buffer_size, ret_ptr);
680
742
con->recv_buffer_ptr+= recv_size;
681
743
con->recv_buffer_size-= recv_size;
682
744
if (*ret_ptr == GEARMAN_SUCCESS)
693
755
memmove(con->recv_buffer, con->recv_buffer_ptr, con->recv_buffer_size);
694
756
con->recv_buffer_ptr= con->recv_buffer;
696
recv_size= _con_read(con, con->recv_buffer + con->recv_buffer_size,
697
GEARMAN_RECV_BUFFER_SIZE - con->recv_buffer_size,
758
recv_size= gearman_con_read(con, con->recv_buffer + con->recv_buffer_size,
759
GEARMAN_RECV_BUFFER_SIZE - con->recv_buffer_size,
699
761
if (*ret_ptr != GEARMAN_SUCCESS)
767
829
size_t recv_size= 0;
831
if (con->recv_data_fn != NULL)
832
return (*con->recv_data_fn)(con, data, data_size, ret_ptr);
769
834
if (con->recv_data_size == 0)
771
836
*ret_ptr= GEARMAN_SUCCESS;
790
855
if (data_size != recv_size)
792
recv_size+= _con_read(con, ((uint8_t *)data) + recv_size,
793
data_size - recv_size, ret_ptr);
857
recv_size+= gearman_con_read(con, ((uint8_t *)data) + recv_size,
858
data_size - recv_size, ret_ptr);
794
859
con->recv_data_offset+= recv_size;
809
874
return recv_size;
877
size_t gearman_con_read(gearman_con_st *con, void *data, size_t data_size,
878
gearman_return_t *ret_ptr)
884
read_size= read(con->fd, data, data_size);
887
if (!(con->options & GEARMAN_CON_IGNORE_LOST_CONNECTION))
889
GEARMAN_ERROR_SET(con->gearman, "gearman_con_read",
890
"lost connection to server (EOF)")
892
gearman_con_close(con);
893
*ret_ptr= GEARMAN_LOST_CONNECTION;
896
else if (read_size == -1)
900
*ret_ptr= gearman_con_set_events(con, POLLIN);
901
if (*ret_ptr != GEARMAN_SUCCESS)
904
if (con->gearman->options & GEARMAN_NON_BLOCKING)
906
*ret_ptr= GEARMAN_IO_WAIT;
910
*ret_ptr= gearman_con_wait(con->gearman, -1);
911
if (*ret_ptr != GEARMAN_SUCCESS)
916
else if (errno == EINTR)
918
else if (errno == EPIPE || errno == ECONNRESET)
920
if (!(con->options & GEARMAN_CON_IGNORE_LOST_CONNECTION))
922
GEARMAN_ERROR_SET(con->gearman, "gearman_con_read",
923
"lost connection to server (%d)", errno)
925
*ret_ptr= GEARMAN_LOST_CONNECTION;
929
GEARMAN_ERROR_SET(con->gearman, "gearman_con_read", "read:%d", errno)
930
con->gearman->last_errno= errno;
931
*ret_ptr= GEARMAN_ERRNO;
934
gearman_con_close(con);
941
*ret_ptr= GEARMAN_SUCCESS;
942
return (size_t)read_size;
812
945
gearman_return_t gearman_con_wait(gearman_st *gearman, int timeout)
814
947
gearman_con_st *con;
815
948
struct pollfd *pfds;
819
952
if (gearman->pfds_size < gearman->con_count)
821
954
pfds= realloc(gearman->pfds, gearman->con_count * sizeof(struct pollfd));
822
955
if (pfds == NULL)
824
957
GEARMAN_ERROR_SET(gearman, "gearman_con_wait", "realloc")
825
958
return GEARMAN_MEMORY_ALLOCATION_FAILURE;
828
961
gearman->pfds= pfds;
829
962
gearman->pfds_size= gearman->con_count;
832
965
pfds= gearman->pfds;
981
1114
return GEARMAN_SUCCESS;
1117
void *gearman_con_protocol_data(gearman_con_st *con)
1119
return con->protocol_data;
1122
void gearman_con_set_protocol_data(gearman_con_st *con, void *data)
1124
con->protocol_data= data;
1127
void gearman_con_set_protocol_data_free_fn(gearman_con_st *con,
1128
gearman_con_protocol_data_free_fn *free_fn)
1130
con->protocol_data_free_fn= free_fn;
1133
void gearman_con_set_recv_fn(gearman_con_st *con, gearman_con_recv_fn recv_fn)
1135
con->recv_fn= recv_fn;
1138
void gearman_con_set_recv_data_fn(gearman_con_st *con,
1139
gearman_con_recv_data_fn recv_data_fn)
1141
con->recv_data_fn= recv_data_fn;
1144
void gearman_con_set_send_fn(gearman_con_st *con, gearman_con_send_fn send_fn)
1146
con->send_fn= send_fn;
1149
void gearman_con_set_send_data_fn(gearman_con_st *con,
1150
gearman_con_send_data_fn send_data_fn)
1152
con->send_data_fn= send_data_fn;
1155
void gearman_con_set_packet_pack_fn(gearman_con_st *con,
1156
gearman_packet_pack_fn packet_pack_fn)
1158
con->packet_pack_fn= packet_pack_fn;
1161
void gearman_con_set_packet_unpack_fn(gearman_con_st *con,
1162
gearman_packet_unpack_fn packet_unpack_fn)
1164
con->packet_unpack_fn= packet_unpack_fn;
985
1168
* Private definitions
1069
1252
return GEARMAN_SUCCESS;
1072
static size_t _con_read(gearman_con_st *con, void *data, size_t data_size,
1073
gearman_return_t *ret_ptr)
1079
read_size= read(con->fd, data, data_size);
1082
if (con->gearman->log_fn == NULL ||
1083
con->gearman->verbose >= GEARMAN_VERBOSE_DEBUG)
1085
GEARMAN_ERROR_SET(con->gearman, "_con_read",
1086
"lost connection to server (EOF)")
1088
gearman_con_close(con);
1089
*ret_ptr= GEARMAN_LOST_CONNECTION;
1092
else if (read_size == -1)
1094
if (errno == EAGAIN)
1096
*ret_ptr= gearman_con_set_events(con, POLLIN);
1097
if (*ret_ptr != GEARMAN_SUCCESS)
1100
if (con->gearman->options & GEARMAN_NON_BLOCKING)
1102
*ret_ptr= GEARMAN_IO_WAIT;
1106
*ret_ptr= gearman_con_wait(con->gearman, -1);
1107
if (*ret_ptr != GEARMAN_SUCCESS)
1112
else if (errno == EINTR)
1114
else if (errno == EPIPE || errno == ECONNRESET)
1116
if (con->gearman->log_fn == NULL ||
1117
con->gearman->verbose >= GEARMAN_VERBOSE_DEBUG)
1119
GEARMAN_ERROR_SET(con->gearman, "_con_read",
1120
"lost connection to server (%d)", errno)
1122
*ret_ptr= GEARMAN_LOST_CONNECTION;
1126
GEARMAN_ERROR_SET(con->gearman, "_con_read", "read:%d", errno)
1127
con->gearman->last_errno= errno;
1128
*ret_ptr= GEARMAN_ERRNO;
1131
gearman_con_close(con);
1138
*ret_ptr= GEARMAN_SUCCESS;