~mingqiangzhuang/libmemcached/icc-memslap

« back to all changes in this revision

Viewing changes to clients/ms_conn.c

  • Committer: Brian Aker
  • Date: 2010-01-19 04:59:05 UTC
  • mfrom: (753.1.1 libmemcached)
  • Revision ID: brian@gaz-20100119045905-e6vjp6lcp0b7bymi
Merge memslap

Show diffs side-by-side

added added

removed removed

Lines of Context:
292
292
  /* for replication, each connection need connect all the server */
293
293
  if (ms_setting.rep_write_srv > 0)
294
294
  {
295
 
    c->total_sfds= ms_setting.srv_cnt;
 
295
    c->total_sfds= ms_setting.srv_cnt * ms_setting.sock_per_conn;
296
296
  }
297
297
  else
298
298
  {
361
361
  {
362
362
    c->protocol= binary_prot;
363
363
  }
364
 
  else if (is_udp)
365
 
  {
366
 
    c->protocol= ascii_udp_prot;
367
 
  }
368
364
  else
369
365
  {
370
366
    c->protocol= ascii_prot;
490
486
    if (ms_setting.rep_write_srv > 0)
491
487
    {
492
488
      /* for replication, each connection need connect all the server */
493
 
      srv_idx= i;
 
489
      srv_idx= i % ms_setting.srv_cnt;
494
490
    }
495
491
    else
496
492
    {
571
567
static int ms_conn_event_init(ms_conn_t *c)
572
568
{
573
569
  ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
574
 
  /* default event timeout 10 seconds */
575
 
  struct timeval t=
576
 
  {
577
 
    .tv_sec= EVENT_TIMEOUT, .tv_usec= 0
578
 
  };
579
570
  short event_flags= EV_WRITE | EV_PERSIST;
580
571
 
581
572
  event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *)c);
582
573
  event_base_set(ms_thread->base, &c->event);
583
574
  c->ev_flags= event_flags;
584
575
 
585
 
  if (c->total_sfds == 1)
586
 
  {
587
 
    if (event_add(&c->event, NULL) == -1)
588
 
    {
589
 
      return -1;
590
 
    }
591
 
  }
592
 
  else
593
 
  {
594
 
    if (event_add(&c->event, &t) == -1)
595
 
    {
596
 
      return -1;
597
 
    }
 
576
  if (event_add(&c->event, NULL) == -1)
 
577
  {
 
578
    return -1;
598
579
  }
599
580
 
600
581
  return 0;
918
899
 
919
900
  if (ms_setting.rep_write_srv > 0)
920
901
  {
921
 
    srv_idx= c->cur_idx;
922
 
    srv_conn_cnt= (int)ms_setting.nconns;
 
902
    srv_idx= c->cur_idx % ms_setting.srv_cnt;
 
903
    srv_conn_cnt= (int)(ms_setting.sock_per_conn  * ms_setting.nconns);
923
904
  }
924
905
  else
925
906
  {
982
963
        break;
983
964
      }
984
965
 
985
 
      if (c->total_sfds == 1)
 
966
      if (ms_setting.rep_write_srv == 0 && c->total_sfds > 0)
986
967
      {
987
968
        /* wait a second and reconnect */
988
969
        sleep(1);
989
970
      }
990
971
    }
991
 
    while (c->total_sfds == 1);
 
972
    while (ms_setting.rep_write_srv == 0 && c->total_sfds > 0);
992
973
  }
993
974
 
994
975
  if ((c->total_sfds > 1) && (c->tcpsfd[c->cur_idx] == 0))
1046
1027
 
1047
1028
      if (ms_setting.rep_write_srv > 0)
1048
1029
      {
1049
 
        srv_idx= i;
1050
 
        srv_conn_cnt= (int)ms_setting.nconns;
 
1030
        srv_idx= i % ms_setting.srv_cnt;
 
1031
        srv_conn_cnt= (int)(ms_setting.sock_per_conn * ms_setting.nconns);
1051
1032
      }
1052
1033
      else
1053
1034
      {
1307
1288
  c->ctnwrite= false;
1308
1289
  c->rbytes= 0;
1309
1290
  c->rcurr= c->rbuf;
 
1291
  c->msgcurr = 0;
 
1292
  c->msgused = 0;
 
1293
  c->iovused = 0;
1310
1294
  ms_conn_set_state(c, conn_write);
1311
1295
  memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t));    /* replicate command state */
1312
1296
 
1864
1848
{
1865
1849
  assert(c != NULL);
1866
1850
  assert(c->rbytes >= c->rvbytes);
1867
 
  assert(c->protocol == ascii_udp_prot || c->protocol == ascii_prot);
 
1851
  assert(c->protocol == ascii_prot);
1868
1852
  if (c->rvbytes > 2)
1869
1853
  {
1870
1854
    assert(
2004
1988
{
2005
1989
  assert(c != NULL);
2006
1990
  assert(c->rbytes >= c->rvbytes);
2007
 
  assert(c->protocol == ascii_udp_prot
2008
 
         || c->protocol == ascii_prot
 
1991
  assert(c->protocol == ascii_prot
2009
1992
         || c->protocol == binary_prot);
2010
1993
 
2011
1994
  if (c->protocol == binary_prot)
2419
2402
 */
2420
2403
static bool ms_update_event(ms_conn_t *c, const int new_flags)
2421
2404
{
2422
 
  /* default event timeout 10 seconds */
2423
 
  struct timeval t=
2424
 
  {
2425
 
    .tv_sec= EVENT_TIMEOUT, .tv_usec= 0
2426
 
  };
2427
 
 
2428
2405
  assert(c != NULL);
2429
2406
 
2430
2407
  struct event_base *base= c->event.ev_base;
2451
2428
  event_base_set(base, &c->event);
2452
2429
  c->ev_flags= (short)new_flags;
2453
2430
 
2454
 
  if (c->total_sfds == 1)
2455
 
  {
2456
 
    if (event_add(&c->event, NULL) == -1)
2457
 
    {
2458
 
      return false;
2459
 
    }
2460
 
  }
2461
 
  else
2462
 
  {
2463
 
    if (event_add(&c->event, &t) == -1)
2464
 
    {
2465
 
      return false;
2466
 
    }
 
2431
  if (event_add(&c->event, NULL) == -1)
 
2432
  {
 
2433
    return false;
2467
2434
  }
2468
2435
 
2469
2436
  return true;
2740
2707
  }
2741
2708
  assert(fd == c->sfd);
2742
2709
 
2743
 
  /* event timeout, close the current connection */
2744
 
  if (c->which == EV_TIMEOUT)
2745
 
  {
2746
 
    ms_conn_set_state(c, conn_closing);
2747
 
  }
2748
 
 
2749
2710
  ms_drive_machine(c);
2750
2711
 
2751
2712
  /* wait for next event */
3048
3009
 * @param c, pointer of the concurrency
3049
3010
 * @param item, pointer of task item which includes the object
3050
3011
 *            information
3051
 
 * @param verify, whether do verification
3052
3012
 *
3053
3013
 * @return int, if success, return 0, else return -1
3054
3014
 */
3055
 
int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item, bool verify)
 
3015
int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item)
3056
3016
{
3057
 
  /* verify not supported yet */
3058
 
  UNUSED_ARGUMENT(verify);
3059
 
 
3060
3017
  assert(c != NULL);
3061
3018
 
3062
3019
  c->currcmd.cmd= CMD_GET;