~j-pureftpd/gearmand/tokyo

« back to all changes in this revision

Viewing changes to libgearman/conn.c

  • Committer: Frank Denis
  • Date: 2009-06-22 14:50:52 UTC
  • mfrom: (50.1.1 gearmand)
  • Revision ID: j@jedi.devteam.orbus.fr-20090622145052-s40f87uem2zrmvdz
Sync with trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
28
28
 */
29
29
static gearman_return_t _con_setsockopt(gearman_con_st *con);
30
30
 
31
 
/**
32
 
 * Read data from a connection.
33
 
 */
34
 
static size_t _con_read(gearman_con_st *con, void *data, size_t data_size,
35
 
                        gearman_return_t *ret_ptr);
36
 
 
37
31
/** @} */
38
32
 
39
33
/*
64
58
      return NULL;
65
59
    }
66
60
 
67
 
    memset(con, 0, sizeof(gearman_con_st));
68
 
    con->options|= GEARMAN_CON_ALLOCATED;
 
61
    con->options= GEARMAN_CON_ALLOCATED;
69
62
  }
70
63
  else
71
 
    memset(con, 0, sizeof(gearman_con_st));
 
64
    con->options= 0;
72
65
 
 
66
  con->state= 0;
 
67
  con->send_state= 0;
 
68
  con->recv_state= 0;
 
69
  con->port= 0;
 
70
  con->events= 0;
 
71
  con->revents= 0;
 
72
  con->fd= -1;
 
73
  con->created_id= 0;
 
74
  con->created_id_next= 0;
 
75
  con->send_buffer_size= 0;
 
76
  con->send_data_size= 0;
 
77
  con->send_data_offset= 0;
 
78
  con->recv_buffer_size= 0;
 
79
  con->recv_data_size= 0;
 
80
  con->recv_data_offset= 0;
73
81
  con->gearman= gearman;
74
 
 
75
82
  GEARMAN_LIST_ADD(gearman->con, con,)
76
 
 
77
 
  con->fd= -1;
 
83
  con->data= NULL;
 
84
  con->addrinfo= NULL;
 
85
  con->addrinfo_next= NULL;
78
86
  con->send_buffer_ptr= con->send_buffer;
 
87
  con->recv_packet= NULL;
 
88
  con->recv_buffer_ptr= con->recv_buffer;
 
89
  con->protocol_data= NULL;
 
90
  con->protocol_data_free_fn= NULL;
 
91
  con->recv_fn= NULL;
 
92
  con->recv_data_fn= NULL;
 
93
  con->send_fn= NULL;
 
94
  con->send_data_fn= NULL;
 
95
  con->packet_pack_fn= gearman_packet_pack;
 
96
  con->packet_unpack_fn= gearman_packet_unpack;
 
97
  con->host[0]= 0;
79
98
 
80
99
  return con;
81
100
}
101
120
 
102
121
  gearman_con_reset_addrinfo(con);
103
122
 
 
123
  if (con->protocol_data != NULL && con->protocol_data_free_fn != NULL)
 
124
    (*con->protocol_data_free_fn)(con, con->protocol_data);
 
125
 
104
126
  GEARMAN_LIST_DEL(con->gearman->con, con,)
105
127
 
106
128
  if (con->options & GEARMAN_CON_PACKET_IN_USE)
123
145
{
124
146
  gearman_con_reset_addrinfo(con);
125
147
 
126
 
  con->port= port == 0 ? GEARMAN_DEFAULT_TCP_PORT : port;
 
148
  con->port= (in_port_t)(port == 0 ? GEARMAN_DEFAULT_TCP_PORT : port);
127
149
}
128
150
 
129
151
void gearman_con_set_options(gearman_con_st *con, gearman_con_options_t options,
157
179
{
158
180
  return con->data;
159
181
}
160
 
  
 
182
 
161
183
void gearman_con_set_data(gearman_con_st *con, void *data)
162
184
{
163
185
  con->data= data;
211
233
                                  gearman_packet_st *packet, bool flush)
212
234
{
213
235
  gearman_return_t ret;
 
236
  size_t send_size;
 
237
 
 
238
  if (con->send_fn != NULL)
 
239
    return (*con->send_fn)(con, packet, flush);
214
240
 
215
241
  switch (con->send_state)
216
242
  {
221
247
      return GEARMAN_INVALID_PACKET;
222
248
    }
223
249
 
224
 
    /* Flush buffer now if args wont' fit in. */
225
 
    if (packet->args_size > (GEARMAN_SEND_BUFFER_SIZE - con->send_buffer_size))
 
250
    /* Pack first part of packet, which is everything but the payload. */
 
251
    while (1)
226
252
    {
 
253
      send_size= (*con->packet_pack_fn)(packet, con,
 
254
                                        con->send_buffer +
 
255
                                        con->send_buffer_size,
 
256
                                        GEARMAN_SEND_BUFFER_SIZE -
 
257
                                        con->send_buffer_size,
 
258
                                        &ret);
 
259
      if (ret == GEARMAN_SUCCESS)
 
260
      {
 
261
        con->send_buffer_size+= send_size;
 
262
        break;
 
263
      }
 
264
      else if (ret == GEARMAN_IGNORE_PACKET)
 
265
        return GEARMAN_SUCCESS;
 
266
      else if (ret != GEARMAN_FLUSH_DATA)
 
267
        return ret;
 
268
 
 
269
      /* We were asked to flush when the buffer is already flushed! */
 
270
      if (con->send_buffer_size == 0)
 
271
      {
 
272
        GEARMAN_ERROR_SET(con->gearman, "gearman_con_send",
 
273
                          "send buffer too small (%u)",
 
274
                          GEARMAN_SEND_BUFFER_SIZE)
 
275
        return GEARMAN_SEND_BUFFER_TOO_SMALL;
 
276
      }
 
277
 
 
278
      /* Flush buffer now if first part of packet won't fit in. */
227
279
      con->send_state= GEARMAN_CON_SEND_STATE_PRE_FLUSH;
228
280
 
229
281
  case GEARMAN_CON_SEND_STATE_PRE_FLUSH:
232
284
        return ret;
233
285
    }
234
286
 
235
 
    if (packet->args_size > 0)
236
 
    {
237
 
      memcpy(con->send_buffer + con->send_buffer_size, packet->args,
238
 
             packet->args_size);
239
 
      con->send_buffer_size+= packet->args_size;
240
 
    }
241
 
 
242
287
    /* Return here if we have no data to send. */
243
288
    if (packet->data_size == 0)
244
289
      break;
297
342
 
298
343
  case GEARMAN_CON_SEND_STATE_FLUSH:
299
344
  case GEARMAN_CON_SEND_STATE_FLUSH_DATA:
300
 
    return gearman_con_flush(con);
 
345
    ret= gearman_con_flush(con);
 
346
    if (ret == GEARMAN_SUCCESS && con->options & GEARMAN_CON_CLOSE_AFTER_FLUSH)
 
347
    {
 
348
      gearman_con_close(con);
 
349
      ret= GEARMAN_LOST_CONNECTION;
 
350
    }
 
351
    return ret;
301
352
 
302
353
  default:
303
354
    GEARMAN_ERROR_SET(con->gearman, "gearman_con_send", "unknown state: %u",
308
359
  if (flush)
309
360
  {
310
361
    con->send_state= GEARMAN_CON_SEND_STATE_FLUSH;
311
 
    return gearman_con_flush(con);
 
362
    ret= gearman_con_flush(con);
 
363
    if (ret == GEARMAN_SUCCESS && con->options & GEARMAN_CON_CLOSE_AFTER_FLUSH)
 
364
    {
 
365
      gearman_con_close(con);
 
366
      ret= GEARMAN_LOST_CONNECTION;
 
367
    }
 
368
    return ret;
312
369
  }
313
370
 
314
371
  con->send_state= GEARMAN_CON_SEND_STATE_NONE;
318
375
size_t gearman_con_send_data(gearman_con_st *con, const void *data,
319
376
                             size_t data_size, gearman_return_t *ret_ptr)
320
377
{
 
378
  if (con->send_data_fn != NULL)
 
379
    return (*con->send_data_fn)(con, data, data_size, ret_ptr);
 
380
 
321
381
  if (con->send_state != GEARMAN_CON_SEND_STATE_FLUSH_DATA)
322
382
  {
323
383
    GEARMAN_ERROR_SET(con->gearman, "gearman_con_send_data", "not flushing")
480
540
        write_size= write(con->fd, con->send_buffer_ptr, con->send_buffer_size);
481
541
        if (write_size == 0)
482
542
        {
483
 
          if (con->gearman->log_fn == NULL ||
484
 
              con->gearman->verbose >= GEARMAN_VERBOSE_DEBUG)
 
543
          if (!(con->options & GEARMAN_CON_IGNORE_LOST_CONNECTION))
485
544
          {
486
545
            GEARMAN_ERROR_SET(con->gearman, "gearman_con_flush",
487
546
                              "lost connection to server (EOF)")
492
551
        else if (write_size == -1)
493
552
        {
494
553
          if (errno == EAGAIN)
495
 
          { 
 
554
          {
496
555
            gret= gearman_con_set_events(con, POLLOUT);
497
556
            if (gret != GEARMAN_SUCCESS)
498
557
              return gret;
505
564
              return gret;
506
565
 
507
566
            continue;
508
 
          } 
 
567
          }
509
568
          else if (errno == EINTR)
510
569
            continue;
511
570
          else if (errno == EPIPE || errno == ECONNRESET)
512
571
          {
513
 
            if (con->gearman->log_fn == NULL ||
514
 
                con->gearman->verbose >= GEARMAN_VERBOSE_DEBUG)
 
572
            if (!(con->options & GEARMAN_CON_IGNORE_LOST_CONNECTION))
515
573
            {
516
574
              GEARMAN_ERROR_SET(con->gearman, "gearman_con_flush",
517
575
                                "lost connection to server (%d)", errno)
527
585
          return GEARMAN_ERRNO;
528
586
        }
529
587
 
530
 
        con->send_buffer_size-= write_size;
 
588
        con->send_buffer_size-= (size_t)write_size;
531
589
        if (con->send_state == GEARMAN_CON_SEND_STATE_FLUSH_DATA)
532
590
        {
533
 
          con->send_data_offset+= write_size;
 
591
          con->send_data_offset+= (size_t)write_size;
534
592
          if (con->send_data_offset == con->send_data_size)
535
593
          {
536
594
            con->send_data_size= 0;
589
647
  if (gearman->sending == 0)
590
648
  {
591
649
    for (con= gearman->con_list; con != NULL; con= con->next)
592
 
    { 
 
650
    {
593
651
      ret= gearman_con_send(con, packet, true);
594
652
      if (ret != GEARMAN_SUCCESS)
595
 
      { 
 
653
      {
596
654
        if (ret != GEARMAN_IO_WAIT)
597
655
        {
598
656
          gearman->options= options;
608
666
  while (gearman->sending != 0)
609
667
  {
610
668
    while ((con= gearman_con_ready(gearman)) != NULL)
611
 
    { 
 
669
    {
612
670
      ret= gearman_con_send(con, packet, true);
613
671
      if (ret != GEARMAN_SUCCESS)
614
 
      { 
 
672
      {
615
673
        if (ret != GEARMAN_IO_WAIT)
616
674
        {
617
675
          gearman->options= options;
651
709
{
652
710
  size_t recv_size;
653
711
 
 
712
  if (con->recv_fn != NULL)
 
713
    return (*con->recv_fn)(con, packet, ret_ptr, recv_data);
 
714
 
654
715
  switch (con->recv_state)
655
716
  {
656
717
  case GEARMAN_CON_RECV_STATE_NONE:
675
736
    {
676
737
      if (con->recv_buffer_size > 0)
677
738
      {
678
 
        recv_size= gearman_packet_parse(con->recv_packet, con->recv_buffer_ptr,
679
 
                                        con->recv_buffer_size, ret_ptr);
 
739
        recv_size= (*con->packet_unpack_fn)(con->recv_packet, con,
 
740
                                            con->recv_buffer_ptr,
 
741
                                            con->recv_buffer_size, ret_ptr);
680
742
        con->recv_buffer_ptr+= recv_size;
681
743
        con->recv_buffer_size-= recv_size;
682
744
        if (*ret_ptr == GEARMAN_SUCCESS)
693
755
        memmove(con->recv_buffer, con->recv_buffer_ptr, con->recv_buffer_size);
694
756
      con->recv_buffer_ptr= con->recv_buffer;
695
757
 
696
 
      recv_size= _con_read(con, con->recv_buffer + con->recv_buffer_size,
697
 
                           GEARMAN_RECV_BUFFER_SIZE - con->recv_buffer_size,
698
 
                           ret_ptr);
 
758
      recv_size= gearman_con_read(con, con->recv_buffer + con->recv_buffer_size,
 
759
                               GEARMAN_RECV_BUFFER_SIZE - con->recv_buffer_size,
 
760
                               ret_ptr);
699
761
      if (*ret_ptr != GEARMAN_SUCCESS)
700
762
        return NULL;
701
763
 
766
828
{
767
829
  size_t recv_size= 0;
768
830
 
 
831
  if (con->recv_data_fn != NULL)
 
832
    return (*con->recv_data_fn)(con, data, data_size, ret_ptr);
 
833
 
769
834
  if (con->recv_data_size == 0)
770
835
  {
771
836
    *ret_ptr= GEARMAN_SUCCESS;
789
854
 
790
855
  if (data_size != recv_size)
791
856
  {
792
 
    recv_size+= _con_read(con, ((uint8_t *)data) + recv_size,
793
 
                          data_size - recv_size, ret_ptr);
 
857
    recv_size+= gearman_con_read(con, ((uint8_t *)data) + recv_size,
 
858
                                 data_size - recv_size, ret_ptr);
794
859
    con->recv_data_offset+= recv_size;
795
860
  }
796
861
  else
809
874
  return recv_size;
810
875
}
811
876
 
 
877
size_t gearman_con_read(gearman_con_st *con, void *data, size_t data_size,
 
878
                        gearman_return_t *ret_ptr)
 
879
{
 
880
  ssize_t read_size;
 
881
 
 
882
  while (1)
 
883
  {
 
884
    read_size= read(con->fd, data, data_size);
 
885
    if (read_size == 0)
 
886
    {
 
887
      if (!(con->options & GEARMAN_CON_IGNORE_LOST_CONNECTION))
 
888
      {
 
889
        GEARMAN_ERROR_SET(con->gearman, "gearman_con_read",
 
890
                          "lost connection to server (EOF)")
 
891
      }
 
892
      gearman_con_close(con);
 
893
      *ret_ptr= GEARMAN_LOST_CONNECTION;
 
894
      return 0;
 
895
    }
 
896
    else if (read_size == -1)
 
897
    {
 
898
      if (errno == EAGAIN)
 
899
      {
 
900
        *ret_ptr= gearman_con_set_events(con, POLLIN);
 
901
        if (*ret_ptr != GEARMAN_SUCCESS)
 
902
          return 0;
 
903
 
 
904
        if (con->gearman->options & GEARMAN_NON_BLOCKING)
 
905
        {
 
906
          *ret_ptr= GEARMAN_IO_WAIT;
 
907
          return 0;
 
908
        }
 
909
 
 
910
        *ret_ptr= gearman_con_wait(con->gearman, -1);
 
911
        if (*ret_ptr != GEARMAN_SUCCESS)
 
912
          return 0;
 
913
 
 
914
        continue;
 
915
      }
 
916
      else if (errno == EINTR)
 
917
        continue;
 
918
      else if (errno == EPIPE || errno == ECONNRESET)
 
919
      {
 
920
        if (!(con->options & GEARMAN_CON_IGNORE_LOST_CONNECTION))
 
921
        {
 
922
          GEARMAN_ERROR_SET(con->gearman, "gearman_con_read",
 
923
                            "lost connection to server (%d)", errno)
 
924
        }
 
925
        *ret_ptr= GEARMAN_LOST_CONNECTION;
 
926
      }
 
927
      else
 
928
      {
 
929
        GEARMAN_ERROR_SET(con->gearman, "gearman_con_read", "read:%d", errno)
 
930
        con->gearman->last_errno= errno;
 
931
        *ret_ptr= GEARMAN_ERRNO;
 
932
      }
 
933
 
 
934
      gearman_con_close(con);
 
935
      return 0;
 
936
    }
 
937
 
 
938
    break;
 
939
  }
 
940
 
 
941
  *ret_ptr= GEARMAN_SUCCESS;
 
942
  return (size_t)read_size;
 
943
}
 
944
 
812
945
gearman_return_t gearman_con_wait(gearman_st *gearman, int timeout)
813
946
{
814
947
  gearman_con_st *con;
815
948
  struct pollfd *pfds;
816
 
  int x; 
 
949
  nfds_t x;
817
950
  int ret;
818
951
 
819
952
  if (gearman->pfds_size < gearman->con_count)
820
953
  {
821
954
    pfds= realloc(gearman->pfds, gearman->con_count * sizeof(struct pollfd));
822
955
    if (pfds == NULL)
823
 
    { 
 
956
    {
824
957
      GEARMAN_ERROR_SET(gearman, "gearman_con_wait", "realloc")
825
958
      return GEARMAN_MEMORY_ALLOCATION_FAILURE;
826
959
    }
827
960
 
828
961
    gearman->pfds= pfds;
829
962
    gearman->pfds_size= gearman->con_count;
830
 
  }  
 
963
  }
831
964
  else
832
965
    pfds= gearman->pfds;
833
966
 
842
975
    pfds[x].revents= 0;
843
976
    x++;
844
977
  }
845
 
  
 
978
 
846
979
  if (x == 0)
847
980
  {
848
981
    GEARMAN_ERROR_SET(gearman, "gearman_con_wait", "no active file descriptors")
864
997
 
865
998
    break;
866
999
  }
867
 
  
 
1000
 
868
1001
  x= 0;
869
1002
  for (con= gearman->con_list; con != NULL; con= con->next)
870
1003
  {
874
1007
    gearman_con_set_revents(con, pfds[x].revents);
875
1008
    x++;
876
1009
  }
877
 
  
 
1010
 
878
1011
  return GEARMAN_SUCCESS;
879
1012
}
880
1013
 
925
1058
      return con;
926
1059
    }
927
1060
  }
928
 
  
 
1061
 
929
1062
  return NULL;
930
1063
}
931
1064
 
981
1114
  return GEARMAN_SUCCESS;
982
1115
}
983
1116
 
 
1117
void *gearman_con_protocol_data(gearman_con_st *con)
 
1118
{
 
1119
  return con->protocol_data;
 
1120
}
 
1121
 
 
1122
void gearman_con_set_protocol_data(gearman_con_st *con, void *data)
 
1123
{
 
1124
  con->protocol_data= data;
 
1125
}
 
1126
 
 
1127
void gearman_con_set_protocol_data_free_fn(gearman_con_st *con,
 
1128
                                    gearman_con_protocol_data_free_fn *free_fn)
 
1129
{
 
1130
  con->protocol_data_free_fn= free_fn;
 
1131
}
 
1132
 
 
1133
void gearman_con_set_recv_fn(gearman_con_st *con, gearman_con_recv_fn recv_fn)
 
1134
{
 
1135
  con->recv_fn= recv_fn;
 
1136
}
 
1137
 
 
1138
void gearman_con_set_recv_data_fn(gearman_con_st *con,
 
1139
                                  gearman_con_recv_data_fn recv_data_fn)
 
1140
{
 
1141
  con->recv_data_fn= recv_data_fn;
 
1142
}
 
1143
 
 
1144
void gearman_con_set_send_fn(gearman_con_st *con, gearman_con_send_fn send_fn)
 
1145
{
 
1146
  con->send_fn= send_fn;
 
1147
}
 
1148
 
 
1149
void gearman_con_set_send_data_fn(gearman_con_st *con,
 
1150
                                  gearman_con_send_data_fn send_data_fn)
 
1151
{
 
1152
  con->send_data_fn= send_data_fn;
 
1153
}
 
1154
 
 
1155
void gearman_con_set_packet_pack_fn(gearman_con_st *con,
 
1156
                                    gearman_packet_pack_fn packet_pack_fn)
 
1157
{
 
1158
  con->packet_pack_fn= packet_pack_fn;
 
1159
}
 
1160
 
 
1161
void gearman_con_set_packet_unpack_fn(gearman_con_st *con,
 
1162
                                     gearman_packet_unpack_fn packet_unpack_fn)
 
1163
{
 
1164
  con->packet_unpack_fn= packet_unpack_fn;
 
1165
}
 
1166
 
984
1167
/*
985
1168
 * Private definitions
986
1169
 */
1068
1251
 
1069
1252
  return GEARMAN_SUCCESS;
1070
1253
}
1071
 
 
1072
 
static size_t _con_read(gearman_con_st *con, void *data, size_t data_size,
1073
 
                        gearman_return_t *ret_ptr)
1074
 
{
1075
 
  ssize_t read_size;
1076
 
 
1077
 
  while (1)
1078
 
  {
1079
 
    read_size= read(con->fd, data, data_size);
1080
 
    if (read_size == 0)
1081
 
    {
1082
 
      if (con->gearman->log_fn == NULL ||
1083
 
          con->gearman->verbose >= GEARMAN_VERBOSE_DEBUG)
1084
 
      {
1085
 
        GEARMAN_ERROR_SET(con->gearman, "_con_read",
1086
 
                          "lost connection to server (EOF)")
1087
 
      }
1088
 
      gearman_con_close(con);
1089
 
      *ret_ptr= GEARMAN_LOST_CONNECTION;
1090
 
      return 0;
1091
 
    }
1092
 
    else if (read_size == -1)
1093
 
    {
1094
 
      if (errno == EAGAIN)
1095
 
      {
1096
 
        *ret_ptr= gearman_con_set_events(con, POLLIN);
1097
 
        if (*ret_ptr != GEARMAN_SUCCESS)
1098
 
          return 0;
1099
 
 
1100
 
        if (con->gearman->options & GEARMAN_NON_BLOCKING)
1101
 
        {
1102
 
          *ret_ptr= GEARMAN_IO_WAIT;
1103
 
          return 0;
1104
 
        }
1105
 
 
1106
 
        *ret_ptr= gearman_con_wait(con->gearman, -1);
1107
 
        if (*ret_ptr != GEARMAN_SUCCESS)
1108
 
          return 0;
1109
 
 
1110
 
        continue;
1111
 
      }
1112
 
      else if (errno == EINTR)
1113
 
        continue;
1114
 
      else if (errno == EPIPE || errno == ECONNRESET)
1115
 
      {
1116
 
        if (con->gearman->log_fn == NULL ||
1117
 
            con->gearman->verbose >= GEARMAN_VERBOSE_DEBUG)
1118
 
        {
1119
 
          GEARMAN_ERROR_SET(con->gearman, "_con_read",
1120
 
                            "lost connection to server (%d)", errno)
1121
 
        }
1122
 
        *ret_ptr= GEARMAN_LOST_CONNECTION;
1123
 
      }
1124
 
      else
1125
 
      {
1126
 
        GEARMAN_ERROR_SET(con->gearman, "_con_read", "read:%d", errno)
1127
 
        con->gearman->last_errno= errno;
1128
 
        *ret_ptr= GEARMAN_ERRNO;
1129
 
      }
1130
 
 
1131
 
      gearman_con_close(con);
1132
 
      return 0;
1133
 
    }
1134
 
 
1135
 
    break;
1136
 
  }
1137
 
 
1138
 
  *ret_ptr= GEARMAN_SUCCESS;
1139
 
  return read_size;
1140
 
}