~ubuntu-branches/ubuntu/precise/mysql-5.1/precise

« back to all changes in this revision

Viewing changes to storage/ndb/src/common/util/socket_io.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Norbert Tretkowski
  • Date: 2010-03-17 14:56:02 UTC
  • Revision ID: james.westby@ubuntu.com-20100317145602-x7e30l1b2sb5s6w6
Tags: upstream-5.1.45
ImportĀ upstreamĀ versionĀ 5.1.45

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (C) 2003 MySQL AB
 
2
 
 
3
   This program is free software; you can redistribute it and/or modify
 
4
   it under the terms of the GNU General Public License as published by
 
5
   the Free Software Foundation; version 2 of the License.
 
6
 
 
7
   This program is distributed in the hope that it will be useful,
 
8
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
9
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
10
   GNU General Public License for more details.
 
11
 
 
12
   You should have received a copy of the GNU General Public License
 
13
   along with this program; if not, write to the Free Software
 
14
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
15
 
 
16
#include <ndb_global.h>
 
17
 
 
18
#include <NdbTCP.h>
 
19
#include <socket_io.h>
 
20
#include <NdbOut.hpp>
 
21
#include <NdbTick.h>
 
22
 
 
23
extern "C"
 
24
int
 
25
read_socket(NDB_SOCKET_TYPE socket, int timeout_millis, 
 
26
            char * buf, int buflen){
 
27
  if(buflen < 1)
 
28
    return 0;
 
29
  
 
30
  fd_set readset;
 
31
  FD_ZERO(&readset);
 
32
  FD_SET(socket, &readset);
 
33
  
 
34
  struct timeval timeout;
 
35
  timeout.tv_sec  = (timeout_millis / 1000);
 
36
  timeout.tv_usec = (timeout_millis % 1000) * 1000;
 
37
 
 
38
  const int selectRes = select(socket + 1, &readset, 0, 0, &timeout);
 
39
  if(selectRes == 0)
 
40
    return 0;
 
41
  
 
42
  if(selectRes == -1){
 
43
    return -1;
 
44
  }
 
45
 
 
46
  return recv(socket, &buf[0], buflen, 0);
 
47
}
 
48
 
 
49
extern "C"
 
50
int
 
51
readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis, int *time,
 
52
              char * buf, int buflen, NdbMutex *mutex){
 
53
  if(buflen <= 1)
 
54
    return 0;
 
55
 
 
56
  fd_set readset;
 
57
  FD_ZERO(&readset);
 
58
  FD_SET(socket, &readset);
 
59
 
 
60
  struct timeval timeout;
 
61
  timeout.tv_sec  = (timeout_millis / 1000);
 
62
  timeout.tv_usec = (timeout_millis % 1000) * 1000;
 
63
 
 
64
  if(mutex)
 
65
    NdbMutex_Unlock(mutex);
 
66
  Uint64 tick= NdbTick_CurrentMillisecond();
 
67
  const int selectRes = select(socket + 1, &readset, 0, 0, &timeout);
 
68
 
 
69
  *time= NdbTick_CurrentMillisecond() - tick;
 
70
  if(mutex)
 
71
    NdbMutex_Lock(mutex);
 
72
 
 
73
  if(selectRes == 0){
 
74
    return 0;
 
75
  }
 
76
 
 
77
  if(selectRes == -1){
 
78
    return -1;
 
79
  }
 
80
 
 
81
  char* ptr = buf;
 
82
  int len = buflen;
 
83
  do
 
84
  {
 
85
    int t;
 
86
    while((t = recv(socket, ptr, len, MSG_PEEK)) == -1 && errno == EINTR);
 
87
    
 
88
    if(t < 1)
 
89
    {
 
90
      return -1;
 
91
    }
 
92
 
 
93
    
 
94
    for(int i = 0; i<t; i++)
 
95
    {
 
96
      if(ptr[i] == '\n')
 
97
      {
 
98
        /**
 
99
         * Now consume
 
100
         */
 
101
        for (len = 1 + i; len; )
 
102
        {
 
103
          while ((t = recv(socket, ptr, len, 0)) == -1 && errno == EINTR);
 
104
          if (t < 1)
 
105
            return -1;
 
106
          ptr += t;
 
107
          len -= t;
 
108
        }
 
109
        if (i > 0 && buf[i-1] == '\r')
 
110
        {
 
111
          buf[i-1] = '\n';
 
112
          ptr--;
 
113
        }
 
114
        ptr[0]= 0;
 
115
        return ptr - buf;
 
116
      }
 
117
    }
 
118
    
 
119
    for (int tmp = t; tmp; )
 
120
    {
 
121
      while ((t = recv(socket, ptr, tmp, 0)) == -1 && errno == EINTR);
 
122
      if (t < 1)
 
123
      {
 
124
        return -1;
 
125
      }
 
126
      ptr += t;
 
127
      len -= t;
 
128
      tmp -= t;
 
129
    }
 
130
 
 
131
    FD_ZERO(&readset);
 
132
    FD_SET(socket, &readset);
 
133
    timeout.tv_sec  = ((timeout_millis - *time) / 1000);
 
134
    timeout.tv_usec = ((timeout_millis - *time) % 1000) * 1000;
 
135
 
 
136
    tick= NdbTick_CurrentMillisecond();
 
137
    const int selectRes = select(socket + 1, &readset, 0, 0, &timeout);
 
138
    *time= NdbTick_CurrentMillisecond() - tick;
 
139
 
 
140
    if(selectRes != 1){
 
141
      return -1;
 
142
    }
 
143
  } while (len > 0);
 
144
  
 
145
  return -1;
 
146
}
 
147
 
 
148
extern "C"
 
149
int
 
150
write_socket(NDB_SOCKET_TYPE socket, int timeout_millis, int *time,
 
151
             const char buf[], int len){
 
152
  fd_set writeset;
 
153
  FD_ZERO(&writeset);
 
154
  FD_SET(socket, &writeset);
 
155
  struct timeval timeout;
 
156
  timeout.tv_sec  = (timeout_millis / 1000);
 
157
  timeout.tv_usec = (timeout_millis % 1000) * 1000;
 
158
 
 
159
 
 
160
  Uint64 tick= NdbTick_CurrentMillisecond();
 
161
  const int selectRes = select(socket + 1, 0, &writeset, 0, &timeout);
 
162
  *time= NdbTick_CurrentMillisecond() - tick;
 
163
 
 
164
  if(selectRes != 1){
 
165
    return -1;
 
166
  }
 
167
 
 
168
  const char * tmp = &buf[0];
 
169
  while(len > 0){
 
170
    const int w = send(socket, tmp, len, 0);
 
171
    if(w == -1){
 
172
      return -1;
 
173
    }
 
174
    len -= w;
 
175
    tmp += w;
 
176
    
 
177
    if(len == 0)
 
178
      break;
 
179
    
 
180
    FD_ZERO(&writeset);
 
181
    FD_SET(socket, &writeset);
 
182
    timeout.tv_sec  = ((timeout_millis - *time) / 1000);
 
183
    timeout.tv_usec = ((timeout_millis - *time) % 1000) * 1000;
 
184
 
 
185
    Uint64 tick= NdbTick_CurrentMillisecond();
 
186
    const int selectRes2 = select(socket + 1, 0, &writeset, 0, &timeout);
 
187
    *time= NdbTick_CurrentMillisecond() - tick;
 
188
 
 
189
    if(selectRes2 != 1){
 
190
      return -1;
 
191
    }
 
192
  }
 
193
  
 
194
  return 0;
 
195
}
 
196
 
 
197
extern "C"
 
198
int
 
199
print_socket(NDB_SOCKET_TYPE socket, int timeout_millis, int *time,
 
200
             const char * fmt, ...){
 
201
  va_list ap;
 
202
  va_start(ap, fmt);
 
203
  int ret = vprint_socket(socket, timeout_millis, time, fmt, ap);
 
204
  va_end(ap);
 
205
 
 
206
  return ret;
 
207
}
 
208
 
 
209
extern "C"
 
210
int
 
211
println_socket(NDB_SOCKET_TYPE socket, int timeout_millis, int *time,
 
212
               const char * fmt, ...){
 
213
  va_list ap;
 
214
  va_start(ap, fmt);
 
215
  int ret = vprintln_socket(socket, timeout_millis, time, fmt, ap);
 
216
  va_end(ap);
 
217
  return ret;
 
218
}
 
219
 
 
220
extern "C"
 
221
int
 
222
vprint_socket(NDB_SOCKET_TYPE socket, int timeout_millis, int *time,
 
223
              const char * fmt, va_list ap){
 
224
  char buf[1000];
 
225
  char *buf2 = buf;
 
226
  size_t size;
 
227
 
 
228
  if (fmt != 0 && fmt[0] != 0) {
 
229
    size = BaseString::vsnprintf(buf, sizeof(buf), fmt, ap);
 
230
    /* Check if the output was truncated */
 
231
    if(size > sizeof(buf)) {
 
232
      buf2 = (char *)malloc(size);
 
233
      if(buf2 == NULL)
 
234
        return -1;
 
235
      BaseString::vsnprintf(buf2, size, fmt, ap);
 
236
    }
 
237
  } else
 
238
    return 0;
 
239
 
 
240
  int ret = write_socket(socket, timeout_millis, time, buf2, size);
 
241
  if(buf2 != buf)
 
242
    free(buf2);
 
243
  return ret;
 
244
}
 
245
 
 
246
extern "C"
 
247
int
 
248
vprintln_socket(NDB_SOCKET_TYPE socket, int timeout_millis, int *time,
 
249
                const char * fmt, va_list ap){
 
250
  char buf[1000];
 
251
  char *buf2 = buf;
 
252
  size_t size;
 
253
 
 
254
  if (fmt != 0 && fmt[0] != 0) {
 
255
    size = BaseString::vsnprintf(buf, sizeof(buf), fmt, ap)+1;// extra byte for '/n'
 
256
    /* Check if the output was truncated */
 
257
    if(size > sizeof(buf)) {
 
258
      buf2 = (char *)malloc(size);
 
259
      if(buf2 == NULL)
 
260
        return -1;
 
261
      BaseString::vsnprintf(buf2, size, fmt, ap);
 
262
    }
 
263
  } else {
 
264
    size = 1;
 
265
  }
 
266
  buf2[size-1]='\n';
 
267
 
 
268
  int ret = write_socket(socket, timeout_millis, time, buf2, size);
 
269
  if(buf2 != buf)
 
270
    free(buf2);
 
271
  return ret;
 
272
}
 
273
 
 
274
#ifdef NDB_WIN32
 
275
 
 
276
class INIT_WINSOCK2
 
277
{
 
278
public:
 
279
    INIT_WINSOCK2(void);
 
280
    ~INIT_WINSOCK2(void);
 
281
 
 
282
private:
 
283
    bool m_bAcceptable;
 
284
};
 
285
 
 
286
INIT_WINSOCK2 g_init_winsock2;
 
287
 
 
288
INIT_WINSOCK2::INIT_WINSOCK2(void)
 
289
: m_bAcceptable(false)
 
290
{
 
291
    WORD wVersionRequested;
 
292
    WSADATA wsaData;
 
293
    int err;
 
294
    
 
295
    wVersionRequested = MAKEWORD( 2, 2 );
 
296
    
 
297
    err = WSAStartup( wVersionRequested, &wsaData );
 
298
    if ( err != 0 ) {
 
299
        /* Tell the user that we could not find a usable */
 
300
        /* WinSock DLL.                                  */
 
301
        m_bAcceptable = false;
 
302
    }
 
303
    
 
304
    /* Confirm that the WinSock DLL supports 2.2.*/
 
305
    /* Note that if the DLL supports versions greater    */
 
306
    /* than 2.2 in addition to 2.2, it will still return */
 
307
    /* 2.2 in wVersion since that is the version we      */
 
308
    /* requested.                                        */
 
309
    
 
310
    if ( LOBYTE( wsaData.wVersion ) != 2 ||
 
311
        HIBYTE( wsaData.wVersion ) != 2 ) {
 
312
        /* Tell the user that we could not find a usable */
 
313
        /* WinSock DLL.                                  */
 
314
        WSACleanup( );
 
315
        m_bAcceptable = false; 
 
316
    }
 
317
    
 
318
    /* The WinSock DLL is acceptable. Proceed. */
 
319
    m_bAcceptable = true;
 
320
}
 
321
 
 
322
INIT_WINSOCK2::~INIT_WINSOCK2(void)
 
323
{
 
324
    if(m_bAcceptable)
 
325
    {
 
326
        m_bAcceptable = false;
 
327
        WSACleanup();
 
328
    }
 
329
}
 
330
 
 
331
#endif
 
332