/* * File: ms_conn.h * Author: Mingqiang Zhuang * * Created on February 10, 2009 * * (c) Copyright 2009, Schooner Information Technology, Inc. * http://www.schoonerinfotech.com/ * */ #ifndef MS_CONN_H #define MS_CONN_H #include #include #include #include #include "ms_task.h" #include #ifdef __cplusplus extern "C" { #endif #define DATA_BUFFER_SIZE (1024 * 1024 + 2048) /* read buffer, 1M + 2k, enough for the max value(1M) */ #define WRITE_BUFFER_SIZE (32 * 1024) /* write buffer, 32k */ #define UDP_DATA_BUFFER_SIZE (1 * 1024 * 1024) /* read buffer for UDP, 1M */ #define UDP_MAX_PAYLOAD_SIZE 1400 /* server limit UDP payload size */ #define UDP_MAX_SEND_PAYLOAD_SIZE 1400 /* mtu size is 1500 */ #define UDP_HEADER_SIZE 8 /* UDP header size */ #define MAX_SENDBUF_SIZE (256 * 1024 * 1024) /* Maximum socket buffer size */ #define SOCK_WAIT_TIMEOUT 30 /* maximum waiting time of UDP, 30s */ #define MAX_UDP_PACKET (1 << 16) /* maximum UDP packets, 65536 */ /* Initial size of the sendmsg() scatter/gather array. */ #define IOV_LIST_INITIAL 400 /* Initial number of sendmsg() argument structures to allocate. */ #define MSG_LIST_INITIAL 10 /* High water marks for buffer shrinking */ #define READ_BUFFER_HIGHWAT (2 * DATA_BUFFER_SIZE) #define UDP_DATA_BUFFER_HIGHWAT (4 * UDP_DATA_BUFFER_SIZE) #define IOV_LIST_HIGHWAT 600 #define MSG_LIST_HIGHWAT 100 /* parse udp header */ #define HEADER_TO_REQID(ptr) ((uint16_t)*ptr * 256 \ + (uint16_t)*(ptr + 1)) #define HEADER_TO_SEQNUM(ptr) ((uint16_t)*(ptr \ + 2) * 256 \ + (uint16_t)*(ptr + 3)) #define HEADER_TO_PACKETS(ptr) ((uint16_t)*(ptr \ + 4) * 256 \ + (uint16_t)*(ptr + 5)) /* states of connection */ enum conn_states { conn_read, /* reading in a command line */ conn_write, /* writing out a simple response */ conn_closing, /* closing this connection */ }; /* returned states of memcached command */ enum mcd_ret { MCD_SUCCESS, /* command success */ MCD_FAILURE, /* command failure */ MCD_UNKNOWN_READ_FAILURE, /* unknown read failure */ MCD_PROTOCOL_ERROR, /* protocol error */ MCD_CLIENT_ERROR, /* client error, wrong command */ MCD_SERVER_ERROR, /* server error, server run command failed */ MCD_DATA_EXISTS, /* object is existent in server */ MCD_NOTSTORED, /* server doesn't set the object successfully */ MCD_STORED, /* server set the object successfully */ MCD_NOTFOUND, /* server not find the object */ MCD_END, /* end of the response of get command */ MCD_DELETED, /* server delete the object successfully */ MCD_STAT, /* response of stats command */ }; /* used to store the current or previous running command state */ typedef struct cmdstat { int cmd; /* command name */ int retstat; /* return state of this command */ bool isfinish; /* if it read all the response data */ uint64_t key_prefix; /* key prefix */ } ms_cmdstat_t; /* udp packet structure */ typedef struct udppkt { uint8_t *header; /* udp header of the packet */ char *data; /* udp data of the packet */ int rbytes; /* number of data in the packet */ int copybytes; /* number of copied data in the packet */ } ms_udppkt_t; /* three protocols supported */ enum protocol { ascii_prot = 3, /* ASCII protocol */ binary_prot, /* binary protocol */ }; /** * concurrency structure * * Each thread has a libevent to manage the events of network. * Each thread has one or more self-governed concurrencies; * each concurrency has one or more socket connections. This * concurrency structure includes all the private variables of * the concurrency. */ typedef struct conn { uint32_t conn_idx; /* connection index in the thread */ int sfd; /* current tcp sock handler of the connection structure */ int udpsfd; /* current udp sock handler of the connection structure*/ int state; /* state of the connection */ struct event event; /* event for libevent */ short ev_flags; /* event flag for libevent */ short which; /* which events were just triggered */ bool change_sfd; /* whether change sfd */ int *tcpsfd; /* TCP sock array */ uint32_t total_sfds; /* how many socks in the tcpsfd array */ uint32_t alive_sfds; /* alive socks */ uint32_t cur_idx; /* current sock index in tcpsfd array */ ms_cmdstat_t precmd; /* previous command state */ ms_cmdstat_t currcmd; /* current command state */ char *rbuf; /* buffer to read commands into */ char *rcurr; /* but if we parsed some already, this is where we stopped */ int rsize; /* total allocated size of rbuf */ int rbytes; /* how much data, starting from rcur, do we have unparsed */ bool readval; /* read value state, read known data size */ int rvbytes; /* total value size need to read */ char *wbuf; /* buffer to write commands out */ char *wcurr; /* for multi-get, where we stopped */ int wsize; /* total allocated size of wbuf */ bool ctnwrite; /* continue to write */ /* data for the mwrite state */ struct iovec *iov; int iovsize; /* number of elements allocated in iov[] */ int iovused; /* number of elements used in iov[] */ struct msghdr *msglist; int msgsize; /* number of elements allocated in msglist[] */ int msgused; /* number of elements used in msglist[] */ int msgcurr; /* element in msglist[] being transmitted now */ int msgbytes; /* number of bytes in current msg */ /* data for UDP clients */ bool udp; /* is this is a UDP "connection" */ uint32_t request_id; /* UDP request ID of current operation, if this is a UDP "connection" */ uint8_t *hdrbuf; /* udp packet headers */ int hdrsize; /* number of headers' worth of space is allocated */ struct sockaddr srv_recv_addr; /* Sent the most recent request to which server */ socklen_t srv_recv_addr_size; /* udp read buffer */ char *rudpbuf; /* buffer to read commands into for udp */ int rudpsize; /* total allocated size of rudpbuf */ int rudpbytes; /* how much data, starting from rudpbuf */ /* order udp packet */ ms_udppkt_t *udppkt; /* the offset of udp packet in rudpbuf */ int packets; /* number of total packets need to read */ int recvpkt; /* number of received packets */ int pktcurr; /* current packet in rudpbuf being ordered */ int ordcurr; /* current ordered packet */ ms_task_item_t *item_win; /* task sequence */ int win_size; /* current task window size */ uint64_t set_cursor; /* current set item index in the item window */ ms_task_t curr_task; /* current running task */ ms_mlget_task_t mlget_task; /* multi-get task */ int warmup_num; /* to run how many warm up operations*/ int remain_warmup_num; /* left how many warm up operations to run */ int64_t exec_num; /* to run how many task operations */ int64_t remain_exec_num; /* how many remained task operations to run */ /* response time statistic and time out control */ struct timeval start_time; /* start time of current operation(s) */ struct timeval end_time; /* end time of current operation(s) */ /* Binary protocol stuff */ protocol_binary_response_header binary_header; /* local temporary binary header */ enum protocol protocol; /* which protocol this connection speaks */ } ms_conn_t; /* used to generate the key prefix */ uint64_t ms_get_key_prefix(void); /** * setup a connection, each connection structure of each * thread must call this function to initialize. */ int ms_setup_conn(ms_conn_t *c); /* after one operation completes, reset the connection */ void ms_reset_conn(ms_conn_t *c, bool timeout); /** * reconnect several disconnected socks in the connection * structure, the ever-1-second timer of the thread will check * whether some socks in the connections disconnect. if * disconnect, reconnect the sock. */ int ms_reconn_socks(ms_conn_t *c); /* used to send set command to server */ int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item); /* used to send the get command to server */ int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item); /* used to send the multi-get command to server */ int ms_mcd_mlget(ms_conn_t *c); #ifdef __cplusplus } #endif #endif /* end of MS_CONN_H */