~ubuntu-branches/ubuntu/trusty/mysql-5.6/trusty

« back to all changes in this revision

Viewing changes to storage/ndb/src/ndbapi/TransporterFacade.hpp

  • Committer: Package Import Robot
  • Author(s): James Page
  • Date: 2014-02-12 11:54:27 UTC
  • Revision ID: package-import@ubuntu.com-20140212115427-oq6tfsqxl1wuwehi
Tags: upstream-5.6.15
ImportĀ upstreamĀ versionĀ 5.6.15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
   Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights reserved.
 
3
 
 
4
   This program is free software; you can redistribute it and/or modify
 
5
   it under the terms of the GNU General Public License as published by
 
6
   the Free Software Foundation; version 2 of the License.
 
7
 
 
8
   This program is distributed in the hope that it will be useful,
 
9
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
   GNU General Public License for more details.
 
12
 
 
13
   You should have received a copy of the GNU General Public License
 
14
   along with this program; if not, write to the Free Software
 
15
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
 
16
*/
 
17
 
 
18
#ifndef TransporterFacade_H
 
19
#define TransporterFacade_H
 
20
 
 
21
#include <kernel_types.h>
 
22
#include <ndb_limits.h>
 
23
#include <NdbThread.h>
 
24
#include <TransporterRegistry.hpp>
 
25
#include <NdbMutex.h>
 
26
#include "DictCache.hpp"
 
27
#include <BlockNumbers.h>
 
28
#include <mgmapi.h>
 
29
 
 
30
class ClusterMgr;
 
31
class ArbitMgr;
 
32
struct ndb_mgm_configuration;
 
33
 
 
34
class Ndb;
 
35
class NdbApiSignal;
 
36
class NdbWaiter;
 
37
class trp_client;
 
38
 
 
39
extern "C" {
 
40
  void* runSendRequest_C(void*);
 
41
  void* runReceiveResponse_C(void*);
 
42
}
 
43
 
 
44
class TransporterFacade : public TransporterCallback
 
45
{
 
46
public:
 
47
  /**
 
48
   * Max number of Ndb objects.  
 
49
   * (Ndb objects should not be shared by different threads.)
 
50
   */
 
51
  STATIC_CONST( MAX_NO_THREADS = 4711 );
 
52
  TransporterFacade(GlobalDictCache *cache);
 
53
  virtual ~TransporterFacade();
 
54
 
 
55
  int start_instance(NodeId, const ndb_mgm_configuration*);
 
56
  void stop_instance();
 
57
 
 
58
  /*
 
59
    (Re)configure the TransporterFacade
 
60
    to a specific configuration
 
61
  */
 
62
  bool configure(NodeId, const ndb_mgm_configuration *);
 
63
 
 
64
  /**
 
65
   * Register this block for sending/receiving signals
 
66
   * @blockNo block number to use, -1 => any blockNumber
 
67
   * @return BlockNumber or -1 for failure
 
68
   */
 
69
  Uint32 open_clnt(trp_client*, int blockNo = -1);
 
70
  int close_clnt(trp_client*);
 
71
 
 
72
  Uint32 get_active_ndb_objects() const;
 
73
 
 
74
  // Only sends to nodes which are alive
 
75
private:
 
76
  int sendSignal(const NdbApiSignal * signal, NodeId nodeId);
 
77
  int sendSignal(const NdbApiSignal*, NodeId,
 
78
                 const LinearSectionPtr ptr[3], Uint32 secs);
 
79
  int sendSignal(const NdbApiSignal*, NodeId,
 
80
                 const GenericSectionPtr ptr[3], Uint32 secs);
 
81
  int sendFragmentedSignal(const NdbApiSignal*, NodeId,
 
82
                           const LinearSectionPtr ptr[3], Uint32 secs);
 
83
  int sendFragmentedSignal(const NdbApiSignal*, NodeId,
 
84
                           const GenericSectionPtr ptr[3], Uint32 secs);
 
85
public:
 
86
 
 
87
  /**
 
88
   * These are functions used by ndb_mgmd
 
89
   */
 
90
  void ext_set_max_api_reg_req_interval(Uint32 ms);
 
91
  void ext_update_connections();
 
92
  struct in_addr ext_get_connect_address(Uint32 nodeId);
 
93
  void ext_forceHB();
 
94
  bool ext_isConnected(NodeId aNodeId);
 
95
  void ext_doConnect(int aNodeId);
 
96
 
 
97
  // Is node available for running transactions
 
98
private:
 
99
  bool   get_node_alive(NodeId nodeId) const;
 
100
  bool   getIsNodeSendable(NodeId nodeId) const;
 
101
 
 
102
public:
 
103
  Uint32 getMinDbNodeVersion() const;
 
104
 
 
105
  // My own processor id
 
106
  NodeId ownId() const;
 
107
 
 
108
  void connected();
 
109
 
 
110
  void doConnect(int NodeId);
 
111
  void reportConnected(int NodeId);
 
112
  void doDisconnect(int NodeId);
 
113
  void reportDisconnected(int NodeId);
 
114
 
 
115
  NodeId get_an_alive_node();
 
116
  void trp_node_status(NodeId, Uint32 event);
 
117
 
 
118
  /**
 
119
   * Send signal to each registered object
 
120
   */
 
121
  void for_each(trp_client* clnt,
 
122
                const NdbApiSignal* aSignal, const LinearSectionPtr ptr[3]);
 
123
  
 
124
  void lock_mutex();
 
125
  void unlock_mutex();
 
126
 
 
127
  // Improving the API performance
 
128
  void forceSend(Uint32 block_number);
 
129
  void checkForceSend(Uint32 block_number);
 
130
 
 
131
  TransporterRegistry* get_registry() { return theTransporterRegistry;};
 
132
 
 
133
/*
 
134
  When a thread has sent its signals and is ready to wait for reception
 
135
  of these it does normally always wait on a conditional mutex and
 
136
  the actual reception is handled by the receiver thread in the NDB API.
 
137
  With the below new methods and variables each thread has the possibility
 
138
  of becoming owner of the "right" to poll for signals. Effectually this
 
139
  means that the thread acts temporarily as a receiver thread.
 
140
  For the thread that succeeds in grabbing this "ownership" it will avoid
 
141
  a number of expensive calls to conditional mutex and even more expensive
 
142
  context switches to wake up.
 
143
  When an owner of the poll "right" has completed its own task it is likely
 
144
  that there are others still waiting. In this case we pick one of the
 
145
  threads as new owner of the poll "right". Since we want to switch owner
 
146
  as seldom as possible we always pick the last thread which is likely to
 
147
  be the last to complete its reception.
 
148
*/
 
149
  void start_poll(trp_client*);
 
150
  void do_poll(trp_client* clnt, Uint32 wait_time);
 
151
  void complete_poll(trp_client*);
 
152
  void wakeup(trp_client*);
 
153
 
 
154
  void external_poll(Uint32 wait_time);
 
155
 
 
156
  trp_client* get_poll_owner(bool) const { return m_poll_owner;}
 
157
  trp_client* remove_last_from_poll_queue();
 
158
  void add_to_poll_queue(trp_client* clnt);
 
159
  void remove_from_poll_queue(trp_client* clnt);
 
160
 
 
161
  trp_client * m_poll_owner;
 
162
  trp_client * m_poll_queue_head; // First in queue
 
163
  trp_client * m_poll_queue_tail; // Last in queue
 
164
  /* End poll owner stuff */
 
165
 
 
166
  // heart beat received from a node (e.g. a signal came)
 
167
  void hb_received(NodeId n);
 
168
  void set_auto_reconnect(int val);
 
169
  int get_auto_reconnect() const;
 
170
 
 
171
  /* TransporterCallback interface. */
 
172
  void deliver_signal(SignalHeader * const header,
 
173
                      Uint8 prio,
 
174
                      Uint32 * const signalData,
 
175
                      LinearSectionPtr ptr[3]);
 
176
  int checkJobBuffer();
 
177
  void reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes);
 
178
  void reportReceiveLen(NodeId nodeId, Uint32 count, Uint64 bytes);
 
179
  void reportConnect(NodeId nodeId);
 
180
  void reportDisconnect(NodeId nodeId, Uint32 errNo);
 
181
  void reportError(NodeId nodeId, TransporterError errorCode,
 
182
                   const char *info = 0);
 
183
  void transporter_recv_from(NodeId node);
 
184
  Uint32 get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max)
 
185
  {
 
186
    return theTransporterRegistry->get_bytes_to_send_iovec(node, dst, max);
 
187
  }
 
188
  Uint32 bytes_sent(NodeId node, Uint32 bytes)
 
189
  {
 
190
    return theTransporterRegistry->bytes_sent(node, bytes);
 
191
  }
 
192
  bool has_data_to_send(NodeId node)
 
193
  {
 
194
    return theTransporterRegistry->has_data_to_send(node);
 
195
  }
 
196
  void reset_send_buffer(NodeId node, bool should_be_empty)
 
197
  {
 
198
    theTransporterRegistry->reset_send_buffer(node, should_be_empty);
 
199
  }
 
200
 
 
201
private:
 
202
 
 
203
  friend class trp_client;
 
204
  friend class ClusterMgr;
 
205
  friend class ArbitMgr;
 
206
  friend class Ndb_cluster_connection;
 
207
  friend class Ndb_cluster_connection_impl;
 
208
 
 
209
  bool isConnected(NodeId aNodeId);
 
210
  void doStop();
 
211
 
 
212
  TransporterRegistry* theTransporterRegistry;
 
213
  SocketServer m_socket_server;
 
214
  int sendPerformedLastInterval;
 
215
  NodeId theOwnId;
 
216
  NodeId theStartNodeId;
 
217
 
 
218
  ClusterMgr* theClusterMgr;
 
219
  
 
220
  // Improving the API response time
 
221
  int checkCounter;
 
222
  Uint32 currentSendLimit;
 
223
  
 
224
  void calculateSendLimit();
 
225
 
 
226
  // Declarations for the receive and send thread
 
227
  int  theStopReceive;
 
228
 
 
229
  void threadMainSend(void);
 
230
  NdbThread* theSendThread;
 
231
  void threadMainReceive(void);
 
232
  NdbThread* theReceiveThread;
 
233
 
 
234
  friend void* runSendRequest_C(void*);
 
235
  friend void* runReceiveResponse_C(void*);
 
236
 
 
237
  bool do_connect_mgm(NodeId, const ndb_mgm_configuration*);
 
238
 
 
239
  /**
 
240
   * Block number handling
 
241
   */
 
242
private:
 
243
 
 
244
  struct ThreadData {
 
245
    STATIC_CONST( ACTIVE = (1 << 16) | 1 );
 
246
    STATIC_CONST( INACTIVE = (1 << 16) );
 
247
    STATIC_CONST( END_OF_LIST = MAX_NO_THREADS + 1 );
 
248
    
 
249
    ThreadData(Uint32 initialSize = 32);
 
250
    
 
251
    Uint32 m_use_cnt;
 
252
    Uint32 m_firstFree;
 
253
    Vector<Uint32> m_statusNext;
 
254
    Vector<trp_client*> m_objectExecute;
 
255
    
 
256
    int open(trp_client*);
 
257
    int close(int number);
 
258
    void expand(Uint32 size);
 
259
 
 
260
    inline trp_client* get(Uint16 blockNo) const {
 
261
      blockNo -= MIN_API_BLOCK_NO;
 
262
      if(likely (blockNo < m_objectExecute.size()))
 
263
      {
 
264
        return m_objectExecute.getBase()[blockNo];
 
265
      }
 
266
      return 0;
 
267
    }
 
268
  } m_threads;
 
269
 
 
270
  Uint32 m_fixed2dynamic[NO_API_FIXED_BLOCKS];
 
271
  Uint32 m_fragmented_signal_id;
 
272
 
 
273
public:
 
274
  NdbMutex* theMutexPtr;
 
275
 
 
276
public:
 
277
  GlobalDictCache *m_globalDictCache;
 
278
};
 
279
 
 
280
inline
 
281
void 
 
282
TransporterFacade::lock_mutex()
 
283
{
 
284
  NdbMutex_Lock(theMutexPtr);
 
285
}
 
286
 
 
287
inline
 
288
void 
 
289
TransporterFacade::unlock_mutex()
 
290
{
 
291
  NdbMutex_Unlock(theMutexPtr);
 
292
}
 
293
 
 
294
#include "ClusterMgr.hpp"
 
295
#include "ndb_cluster_connection_impl.hpp"
 
296
 
 
297
inline
 
298
unsigned Ndb_cluster_connection_impl::get_connect_count() const
 
299
{
 
300
  if (m_transporter_facade->theClusterMgr)
 
301
    return m_transporter_facade->theClusterMgr->m_connect_count;
 
302
  return 0;
 
303
}
 
304
 
 
305
inline
 
306
unsigned Ndb_cluster_connection_impl::get_min_db_version() const
 
307
{
 
308
  return m_transporter_facade->getMinDbNodeVersion();
 
309
}
 
310
 
 
311
inline
 
312
bool
 
313
TransporterFacade::get_node_alive(NodeId n) const {
 
314
  if (theClusterMgr)
 
315
  {
 
316
    return theClusterMgr->getNodeInfo(n).m_alive;
 
317
  }
 
318
  return 0;
 
319
}
 
320
 
 
321
inline
 
322
void
 
323
TransporterFacade::hb_received(NodeId n) {
 
324
  theClusterMgr->hb_received(n);
 
325
}
 
326
 
 
327
inline
 
328
Uint32
 
329
TransporterFacade::getMinDbNodeVersion() const
 
330
{
 
331
  if (theClusterMgr)
 
332
    return theClusterMgr->minDbVersion;
 
333
  else
 
334
    return 0;
 
335
}
 
336
 
 
337
inline
 
338
const trp_node &
 
339
trp_client::getNodeInfo(Uint32 nodeId) const
 
340
{
 
341
  return m_facade->theClusterMgr->getNodeInfo(nodeId);
 
342
}
 
343
 
 
344
/** 
 
345
 * LinearSectionIterator
 
346
 *
 
347
 * This is an implementation of GenericSectionIterator 
 
348
 * that iterates over one linear section of memory.
 
349
 * The iterator is used by the transporter at signal
 
350
 * send time to obtain all of the relevant words for the
 
351
 * signal section
 
352
 */
 
353
class LinearSectionIterator: public GenericSectionIterator
 
354
{
 
355
private :
 
356
  const Uint32* data;
 
357
  Uint32 len;
 
358
  bool read;
 
359
public :
 
360
  LinearSectionIterator(const Uint32* _data, Uint32 _len)
 
361
  {
 
362
    data= (_len == 0)? NULL:_data;
 
363
    len= _len;
 
364
    read= false;
 
365
  }
 
366
 
 
367
  ~LinearSectionIterator()
 
368
  {};
 
369
  
 
370
  void reset()
 
371
  {
 
372
    /* Reset iterator */
 
373
    read= false;
 
374
  }
 
375
 
 
376
  const Uint32* getNextWords(Uint32& sz)
 
377
  {
 
378
    if (likely(!read))
 
379
    {
 
380
      read= true;
 
381
      sz= len;
 
382
      return data;
 
383
    }
 
384
    sz= 0;
 
385
    return NULL;
 
386
  }
 
387
};
 
388
 
 
389
 
 
390
/** 
 
391
 * SignalSectionIterator
 
392
 *
 
393
 * This is an implementation of GenericSectionIterator 
 
394
 * that uses chained NdbApiSignal objects to store a 
 
395
 * signal section.
 
396
 * The iterator is used by the transporter at signal
 
397
 * send time to obtain all of the relevant words for the
 
398
 * signal section
 
399
 */
 
400
class SignalSectionIterator: public GenericSectionIterator
 
401
{
 
402
private :
 
403
  NdbApiSignal* firstSignal;
 
404
  NdbApiSignal* currentSignal;
 
405
public :
 
406
  SignalSectionIterator(NdbApiSignal* signal)
 
407
  {
 
408
    firstSignal= currentSignal= signal;
 
409
  }
 
410
 
 
411
  ~SignalSectionIterator()
 
412
  {};
 
413
  
 
414
  void reset()
 
415
  {
 
416
    /* Reset iterator */
 
417
    currentSignal= firstSignal;
 
418
  }
 
419
 
 
420
  const Uint32* getNextWords(Uint32& sz);
 
421
};
 
422
 
 
423
/*
 
424
 * GenericSectionIteratorReader
 
425
 * Helper class to simplify reading data from 
 
426
 * GenericSectionIterator implementations
 
427
 */
 
428
 
 
429
class GSIReader
 
430
{
 
431
private :
 
432
  GenericSectionIterator* gsi;
 
433
  const Uint32* chunkPtr;
 
434
  Uint32 chunkRemain;
 
435
public :
 
436
  GSIReader(GenericSectionIterator* _gsi)
 
437
  {
 
438
    gsi = _gsi;
 
439
    chunkPtr = NULL;
 
440
    chunkRemain = 0;
 
441
  }
 
442
 
 
443
  void copyNWords(Uint32* dest, Uint32 n)
 
444
  {
 
445
    while (n)
 
446
    {
 
447
      if (chunkRemain == 0)
 
448
      {
 
449
        /* Get next contiguous stretch of words from
 
450
         * the iterator
 
451
         */
 
452
        chunkPtr = gsi->getNextWords(chunkRemain);
 
453
        if (!chunkRemain)
 
454
          abort(); // Must have the words the caller asks for
 
455
      }
 
456
      else
 
457
      {
 
458
        /* Have some words from the iterator, copy some/
 
459
         * all of them
 
460
         */
 
461
        Uint32 wordsToCopy = MIN(chunkRemain, n);
 
462
        memcpy(dest, chunkPtr, wordsToCopy << 2);
 
463
        chunkPtr += wordsToCopy;
 
464
        chunkRemain -= wordsToCopy;
 
465
 
 
466
        dest += wordsToCopy;
 
467
        n -= wordsToCopy;
 
468
      }
 
469
    }
 
470
  }
 
471
};
 
472
 
 
473
  
 
474
 
 
475
 
 
476
#endif // TransporterFacade_H