~stewart/percona-playback/cassert-header

« back to all changes in this revision

Viewing changes to src/queue.h

  • Committer: Oleg Tsarev
  • Date: 2011-05-04 21:38:59 UTC
  • mfrom: (108.1.18 sbt)
  • Revision ID: oleg.tsarev@percona.com-20110504213859-pw1mgjmuj9gz44vb
merge split_by_transaction

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
{
13
13
public:
14
14
  typedef queue_item_length result_type;
15
 
  Length() : m_length(0) {}
16
 
  template<typename T>
17
 
  void begin_compound(T const &/*value*/)
18
 
  {
19
 
  }
20
 
  template<typename T>
21
 
  void end_compound(T const &/*value*/)
22
 
  {
23
 
  }
24
 
  template<typename T>
25
 
  void superclass(T const &value)
26
 
  {
27
 
    Introspection<T>::apply(*this, value);
28
 
  }
29
 
  template<typename T>
30
 
  void member(const char* /*name*/, T const &value)
31
 
  {
32
 
    Introspection<T>::apply(*this, value);
33
 
  }
34
 
  template<typename T>
35
 
  void integral(T const &value)
 
15
Length() : m_length(0) {}
 
16
  template<typename T>
 
17
    void begin_compound(T const &/*value*/)
 
18
  {
 
19
  }
 
20
  template<typename T>
 
21
    void end_compound(T const &/*value*/)
 
22
  {
 
23
  }
 
24
  template<typename T>
 
25
    void superclass(T const &value)
 
26
  {
 
27
    Introspection<T>::apply(*this, value);
 
28
  }
 
29
  template<typename T>
 
30
    void member(const char* /*name*/, T const &value)
 
31
  {
 
32
    Introspection<T>::apply(*this, value);
 
33
  }
 
34
  template<typename T>
 
35
    void integral(T const &value)
36
36
  {
37
37
    add(sizeof(value));
38
38
  }
58
58
  friend class String;
59
59
  class Writer
60
60
  {
61
 
  public:
 
61
public:
62
62
    typedef Not_Safety_Queue< T > queue;
63
63
    typedef void       result_type;
64
64
    void result() const {}
65
 
    Writer(queue &queue) : m_queue(queue)
66
 
    {
67
 
    }
68
 
    template<typename V>
69
 
    void begin_compound(V const &/*value*/)
70
 
    {
71
 
    }
72
 
    template<typename V>
73
 
    void end_compound(V const &/*value*/)
74
 
    {
75
 
    }
76
 
    template<typename V>
77
 
    void superclass(V const &value)
78
 
    {
79
 
      Introspection<V>::apply(*this, value);
80
 
    }
81
 
    template<typename V>
82
 
    void member(const char* /*name*/, V const &value)
83
 
    {
84
 
      Introspection<V>::apply(*this, value);
85
 
    }
86
 
    template<typename V>
87
 
    void integral(V const &value)
 
65
Writer(queue &queue) : m_queue(queue)
 
66
    {
 
67
    }
 
68
    template<typename V>
 
69
      void begin_compound(V const &/*value*/)
 
70
    {
 
71
    }
 
72
    template<typename V>
 
73
      void end_compound(V const &/*value*/)
 
74
    {
 
75
    }
 
76
    template<typename V>
 
77
      void superclass(V const &value)
 
78
    {
 
79
      Introspection<V>::apply(*this, value);
 
80
    }
 
81
    template<typename V>
 
82
      void member(const char* /*name*/, V const &value)
 
83
    {
 
84
      Introspection<V>::apply(*this, value);
 
85
    }
 
86
    template<typename V>
 
87
      void integral(V const &value)
88
88
    {
89
89
      m_queue.write_integral(value);
90
90
    }
92
92
    {
93
93
      m_queue.write(data, size);
94
94
    }
95
 
  private:
 
95
private:
96
96
    queue &m_queue;
97
97
  };
98
98
  friend class Writer;
99
99
private:
100
100
  class Reader
101
101
  {
102
 
  public:
 
102
public:
103
103
    typedef Not_Safety_Queue< T > queue;
104
104
    typedef void result_type;
105
105
    void result() const {}
106
 
    Reader(queue &queue) : m_queue(queue)
107
 
    {
108
 
    }
109
 
    template<typename V>
110
 
    void begin_compound(V const &/*value*/)
111
 
    {
112
 
    }
113
 
    template<typename V>
114
 
    void end_compound(V const &/*value*/)
115
 
    {
116
 
    }
117
 
    template<typename V>
118
 
    void superclass(V &value)
119
 
    {
120
 
      Introspection<V>::apply(*this, value);
121
 
    }
122
 
    template<typename V>
123
 
    void member(const char* /*name*/, V &value)
124
 
    {
125
 
      Introspection<V>::apply(*this, value);
126
 
    }
127
 
    template<typename V>
128
 
    void integral(V &value)
 
106
Reader(queue &queue) : m_queue(queue)
 
107
    {
 
108
    }
 
109
    template<typename V>
 
110
      void begin_compound(V const &/*value*/)
 
111
    {
 
112
    }
 
113
    template<typename V>
 
114
      void end_compound(V const &/*value*/)
 
115
    {
 
116
    }
 
117
    template<typename V>
 
118
      void superclass(V &value)
 
119
    {
 
120
      Introspection<V>::apply(*this, value);
 
121
    }
 
122
    template<typename V>
 
123
      void member(const char* /*name*/, V &value)
 
124
    {
 
125
      Introspection<V>::apply(*this, value);
 
126
    }
 
127
    template<typename V>
 
128
      void integral(V &value)
129
129
    {
130
130
      m_queue.read_integral(value);
131
131
    }
133
133
    {
134
134
      m_queue.read(data, size);
135
135
    }
136
 
  private:
 
136
private:
137
137
    queue &m_queue;
138
138
  };
139
139
  friend class Reader;
140
140
private:
141
141
  enum State{ eData= 0, eFree= 1, ePad= 2 };
142
142
public:
143
 
  Not_Safety_Queue(queue_item_length a_buffer_size) :
144
 
    m_write_offset(0), m_write_pass(0),
 
143
Not_Safety_Queue(queue_item_length a_buffer_size) :
 
144
  m_write_offset(0), m_write_pass(0),
145
145
    m_read_offset(0), m_read_pass(0),
146
146
    m_padd_offset(a_buffer_size),
147
147
    m_buffer(0), m_buffer_size(a_buffer_size)
148
148
  {
149
149
    m_buffer= new char[m_buffer_size];
150
150
  }
 
151
  void reset()
 
152
  {
 
153
    m_write_offset= 0;
 
154
    m_write_pass= 0;
 
155
    m_read_offset= 0;
 
156
    m_read_pass= 0;
 
157
    m_padd_offset= m_buffer_size;
 
158
  };
151
159
  ~Not_Safety_Queue()
152
160
  {
153
161
    delete [] m_buffer;
176
184
      if(m_read_pass < m_write_pass)
177
185
      {
178
186
        // queue full
179
 
        return false; 
 
187
        return false;
180
188
      }
181
189
      // queue empty
182
190
      ASSERT(m_read_pass == m_write_pass);
203
211
    // TO DO think about exceptions
204
212
    if (!can_push(value))
205
213
      return false;
206
 
    #ifdef QUEUE_TRACE
 
214
#ifdef QUEUE_TRACE
207
215
    std::ostringstream s;
208
216
    s << "Before push to " << Introspection< Not_Safety_Queue<T> >::name();
209
217
    s << ":\n";
210
218
    trace(s, *this);
211
 
    #endif
 
219
#endif
212
220
    unsafety_push(value);
213
 
    #ifdef QUEUE_TRACE
 
221
#ifdef QUEUE_TRACE
214
222
    s << "After push to " << Introspection< Not_Safety_Queue<T> >::name();
215
223
    s << ":\n";
216
224
    trace(s, *this);
217
225
    s << "Value:\n>";
218
226
    trace(s, value);
219
227
    std::cerr << s.str() << std::flush;
220
 
    #endif    
 
228
#endif
221
229
    return true;
222
230
  }
223
231
  bool pop(T &value)
225
233
    // TO DO think about exceptions
226
234
    if (empty())
227
235
      return false;
228
 
    #ifdef QUEUE_TRACE
 
236
#ifdef QUEUE_TRACE
229
237
    std::ostringstream s;
230
238
    s << "Before pop from " << Introspection< Not_Safety_Queue<T> >::name();
231
239
    s << ":\n";
232
240
    trace(s, *this);
233
241
    std::cerr<< "Value:\n";
234
242
    trace(s, value);
235
 
    #endif
 
243
#endif
236
244
    unsafety_pop(value);
237
 
    #ifdef QUEUE_TRACE
 
245
#ifdef QUEUE_TRACE
238
246
    s << "After pop from " << Introspection< Not_Safety_Queue<T> >::name();
239
247
    s << ":\n";
240
248
    trace(s, *this);
241
249
    s << "Value:\n";
242
250
    trace(s, value);
243
251
    std::cerr << s.str() << std::flush;
244
 
    #endif   
245
 
    
 
252
#endif
 
253
 
246
254
  }
247
255
protected:
248
256
  void unsafety_push(T const &value)
276
284
    ASSERT(m_read_offset < m_buffer_size);
277
285
    if(m_read_offset == m_write_offset) // queue is empty or full.
278
286
    {
279
 
      ASSERT((m_read_pass == m_write_pass) || (m_read_pass + 1 == m_write_pass)); 
 
287
      ASSERT((m_read_pass == m_write_pass) || (m_read_pass + 1 == m_write_pass));
280
288
      if(m_read_pass == m_write_pass)
281
289
        return m_buffer_size - m_write_offset; // queue is empty
282
290
      else
283
291
        return 0; // queue is full
284
292
    }
285
 
    if(m_write_offset > m_read_offset) 
 
293
    if(m_write_offset > m_read_offset)
286
294
      return m_buffer_size - m_write_offset; // [empty [read] data [write] empty]
287
295
    else
288
296
      return m_read_offset - m_write_offset; // [data [write] empty [read] data]
311
319
#endif // DEBUG
312
320
private:
313
321
  template<typename I>
314
 
  void write_integral(I const &value)
 
322
    void write_integral(I const &value)
315
323
  {
316
324
    write_check(sizeof(value));
317
325
    (*reinterpret_cast<I*>(m_buffer+m_write_offset))= value;
325
333
  }
326
334
private:
327
335
  template<typename I>
328
 
  void read_integral(I &value)
 
336
    void read_integral(I &value)
329
337
  {
330
338
    read_check(sizeof(value));
331
339
    value = *reinterpret_cast<I const*>(m_buffer + m_read_offset);
383
391
    }
384
392
  }
385
393
private:
386
 
COMPOUND_BEGIN()
387
 
MEMBER(m_write_offset)
388
 
MEMBER(m_write_pass)
389
 
MEMBER(m_read_offset)
390
 
MEMBER(m_read_pass)
391
 
MEMBER(m_padd_offset)
392
 
MEMBER(m_buffer_size)
393
 
COMPOUND_END()
394
 
private:
 
394
  COMPOUND_BEGIN()
 
395
    MEMBER(m_write_offset)
 
396
    MEMBER(m_write_pass)
 
397
    MEMBER(m_read_offset)
 
398
    MEMBER(m_read_pass)
 
399
    MEMBER(m_padd_offset)
 
400
    MEMBER(m_buffer_size)
 
401
    COMPOUND_END()
 
402
    private:
395
403
  queue_item_length m_write_offset;
396
404
  queue_item_length m_write_pass;
397
405
private:
411
419
  typedef Not_Safety_Queue< T > base_type;
412
420
  typedef Mutex::Lock           lock;
413
421
public:
414
 
  Queue(queue_item_length a_buffer_size) :
415
 
    base_type(a_buffer_size),
 
422
Queue(queue_item_length a_buffer_size) :
 
423
  base_type(a_buffer_size),
416
424
    m_run(true),m_flush(false)
417
425
  {
418
426
  }
419
427
  ~Queue()
420
428
  {
421
429
  }
 
430
  void reset()
 
431
  {
 
432
    lock lk(m_mutex);
 
433
    m_run= true;
 
434
    m_flush= false;
 
435
    base_type::reset();
 
436
  }
422
437
  bool pop(T& value)
423
438
  {
424
439
    lock lk(m_mutex);
425
440
    if (!m_run)
426
441
      return false;
427
442
    while(base_type::empty())
428
 
      {
429
 
        if (m_flush)
430
 
          return false;
431
 
        m_not_empty.wait(lk);   
432
 
        if (!m_run)
433
 
          return false;
434
 
      }
 
443
    {
 
444
      if (m_flush)
 
445
        return false;
 
446
      m_not_empty.wait(lk);
 
447
      if (!m_run)
 
448
        return false;
 
449
    }
435
450
    ASSERT(m_run);
436
451
    ASSERT(!base_type::empty());
437
452
    /*#ifdef QUEUE_TRACE
438
 
    std::ostringstream s;
439
 
    s << "Before pop from " << Introspection< Queue<T> >::name();
440
 
    s << ":\n";
441
 
    trace(s,*this);
442
 
    std::cerr<< "Value:\n";
443
 
    trace(s,value);
444
 
    #endif*/
445
 
    base_type::unsafety_pop(value); 
 
453
      std::ostringstream s;
 
454
      s << "Before pop from " << Introspection< Queue<T> >::name();
 
455
      s << ":\n";
 
456
      trace(s,*this);
 
457
      std::cerr<< "Value:\n";
 
458
      trace(s,value);
 
459
      #endif*/
 
460
    base_type::unsafety_pop(value);
446
461
    /*#ifdef QUEUE_TRACE
447
 
    s << "After pop from " << Introspection< Queue<T> >::name();
448
 
    s << ":\n";
449
 
    trace(s,*this);
450
 
    s << "Value:\n";
451
 
    trace(s,value);
452
 
    std::cerr << s.str() << std::flush;
453
 
    #endif  */
 
462
      s << "After pop from " << Introspection< Queue<T> >::name();
 
463
      s << ":\n";
 
464
      trace(s,*this);
 
465
      s << "Value:\n";
 
466
      trace(s,value);
 
467
      std::cerr << s.str() << std::flush;
 
468
      #endif  */
454
469
    m_not_full.notify_one();
455
470
    return true;
456
471
  }
463
478
    {
464
479
      m_not_full.wait(lk);
465
480
      if (!m_run || m_flush)
466
 
        return false;
 
481
        return false;
467
482
    }
468
483
    ASSERT(base_type::can_push(value));
469
 
    #ifdef QUEUE_TRACE
 
484
#ifdef QUEUE_TRACE
470
485
    std::ostringstream s;
471
486
    s << "Before push to " << Introspection< Queue<T> >::name();
472
487
    s << ":\n";
473
488
    trace(s,*this);
474
 
    #endif
 
489
#endif
475
490
    base_type::push(value);
476
 
    #ifdef QUEUE_TRACE
 
491
#ifdef QUEUE_TRACE
477
492
    s << "After push to " << Introspection< Queue<T> >::name();
478
493
    s << ":\n";
479
494
    trace(s,*this);
480
495
    s << "Value:\n>";
481
496
    trace(s,value);
482
497
    std::cerr << s.str() << std::flush;
483
 
    #endif    
 
498
#endif
484
499
    m_not_empty.notify_one();
485
500
    return true;
486
501
  }
502
517
  }
503
518
private:
504
519
  COMPOUND_BEGIN()
505
 
  SUPERCLASS(base_type)
506
 
  MEMBER(m_run)
507
 
  MEMBER(m_flush)
508
 
  COMPOUND_END()
509
 
private:
 
520
    SUPERCLASS(base_type)
 
521
    MEMBER(m_run)
 
522
    MEMBER(m_flush)
 
523
    COMPOUND_END()
 
524
    private:
510
525
  bool m_run;
511
526
  bool m_flush;
512
527
  Mutex m_mutex;