~freenx-team/nx-x11/nxcomp-upstream

« back to all changes in this revision

Viewing changes to ReadBuffer.cpp

  • Committer: Marcelo Boveto Shima
  • Date: 2009-03-28 22:24:56 UTC
  • Revision ID: mshima@ufserv-20090328222456-rdtaq3oedfyq890c
Import nxcomp 3.3.0-3

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/**************************************************************************/
 
2
/*                                                                        */
 
3
/* Copyright (c) 2001, 2007 NoMachine, http://www.nomachine.com/.         */
 
4
/*                                                                        */
 
5
/* NXCOMP, NX protocol compression and NX extensions to this software     */
 
6
/* are copyright of NoMachine. Redistribution and use of the present      */
 
7
/* software is allowed according to terms specified in the file LICENSE   */
 
8
/* which comes in the source distribution.                                */
 
9
/*                                                                        */
 
10
/* Check http://www.nomachine.com/licensing.html for applicability.       */
 
11
/*                                                                        */
 
12
/* NX and NoMachine are trademarks of NoMachine S.r.l.                    */
 
13
/*                                                                        */
 
14
/* All rights reserved.                                                   */
 
15
/*                                                                        */
 
16
/**************************************************************************/
 
17
 
 
18
#include "ReadBuffer.h"
 
19
 
 
20
#include "Transport.h"
 
21
 
 
22
//
 
23
// Set the verbosity level.
 
24
//
 
25
 
 
26
#define PANIC
 
27
#define WARNING
 
28
#undef  TEST
 
29
#undef  DEBUG
 
30
 
 
31
ReadBuffer::ReadBuffer(Transport *transport)
 
32
 
 
33
  : transport_(transport)
 
34
{
 
35
  //
 
36
  // The read buffer will grow until
 
37
  // reaching the maximum buffer size
 
38
  // and then will remain stable at
 
39
  // that size.
 
40
  //
 
41
 
 
42
  initialReadSize_   = READ_BUFFER_DEFAULT_SIZE;
 
43
  maximumBufferSize_ = READ_BUFFER_DEFAULT_SIZE;
 
44
 
 
45
  size_   = 0;
 
46
  buffer_ = NULL;
 
47
 
 
48
  owner_  = 1;
 
49
  length_ = 0;
 
50
  start_  = 0;
 
51
 
 
52
  remaining_ = 0;
 
53
}
 
54
 
 
55
ReadBuffer::~ReadBuffer()
 
56
{
 
57
  if (owner_ == 1)
 
58
  {
 
59
    delete [] buffer_;
 
60
  }
 
61
}
 
62
 
 
63
void ReadBuffer::readMessage(const unsigned char *message, unsigned int length)
 
64
{
 
65
  //
 
66
  // To be here we must be the real owner
 
67
  // of the buffer and there must not be
 
68
  // pending bytes in the transport.
 
69
  //
 
70
 
 
71
  #ifdef TEST
 
72
 
 
73
  if (owner_ == 0)
 
74
  {
 
75
    *logofs << "ReadBuffer: PANIC! Class for FD#"
 
76
            << transport_ -> fd() << " doesn't "
 
77
            << "appear to be the owner of the buffer "
 
78
            << "while borrowing from the caller.\n"
 
79
            << logofs_flush;
 
80
 
 
81
    HandleCleanup();
 
82
  }
 
83
 
 
84
  #endif
 
85
 
 
86
  //
 
87
  // Be sure that any outstanding data from
 
88
  // the transport is appended to our own
 
89
  // byffer.
 
90
  //
 
91
 
 
92
  if (transport_ -> pending() != 0)
 
93
  {
 
94
    #ifdef WARNING
 
95
    *logofs << "ReadBuffer: WARNING! Class for FD#"
 
96
            << transport_ -> fd() << " has pending "
 
97
            << "data in the transport while "
 
98
            << "borrowing from the caller.\n"
 
99
            << logofs_flush;
 
100
    #endif
 
101
 
 
102
    readMessage();
 
103
 
 
104
    if (owner_ == 0)
 
105
    {
 
106
      convertBuffer();
 
107
    }
 
108
  }
 
109
 
 
110
  //
 
111
  // Can't borrow the buffer if there is data
 
112
  // from a partial message. In this case add
 
113
  // the new data to the end of our buffer.
 
114
  //
 
115
 
 
116
  if (length_ == 0)
 
117
  {
 
118
    #ifdef TEST
 
119
    *logofs << "ReadBuffer: Borrowing " << length
 
120
            << " bytes from the caller for FD#"
 
121
            << transport_ -> fd() << " with "
 
122
            << length_ << " bytes in the buffer.\n"
 
123
            << logofs_flush;
 
124
    #endif
 
125
 
 
126
    delete [] buffer_;
 
127
 
 
128
    buffer_ = (unsigned char *) message;
 
129
    size_   = length;
 
130
 
 
131
    length_ = length;
 
132
 
 
133
    owner_ = 0;
 
134
    start_ = 0;
 
135
  }
 
136
  else
 
137
  {
 
138
    #ifdef TEST
 
139
    *logofs << "ReadBuffer: Appending " << length
 
140
            << " bytes from the caller for FD#"
 
141
            << transport_ -> fd() << " with "
 
142
            << length_ << " bytes in the buffer.\n"
 
143
            << logofs_flush;
 
144
    #endif
 
145
 
 
146
    appendBuffer(message, length);
 
147
  }
 
148
}
 
149
 
 
150
int ReadBuffer::readMessage()
 
151
{
 
152
  int pendingLength = transport_ -> pending();
 
153
 
 
154
  if (pendingLength > 0)
 
155
  {
 
156
    //
 
157
    // Can't move the data in the borrowed buffer,
 
158
    // so use the tansport buffer only if we don't
 
159
    // have any partial message. This can happen
 
160
    // with the proxy where we need to deflate the
 
161
    // stream.
 
162
    //
 
163
 
 
164
    if (length_ == 0)
 
165
    {
 
166
      unsigned char *newBuffer;
 
167
 
 
168
      length_ = transport_ -> getPending(newBuffer);
 
169
 
 
170
      if (newBuffer == NULL)
 
171
      {
 
172
        #ifdef PANIC
 
173
        *logofs << "ReadBuffer: PANIC! Failed to borrow "
 
174
                << length_ << " bytes of memory for buffer "
 
175
                << "in context [A].\n" << logofs_flush;
 
176
        #endif
 
177
 
 
178
        cerr << "Error" << ": Failed to borrow memory for "
 
179
             << "read buffer in context [A].\n";
 
180
 
 
181
        HandleCleanup();
 
182
      }
 
183
 
 
184
      delete [] buffer_;
 
185
 
 
186
      buffer_ = newBuffer;
 
187
      size_   = length_;
 
188
 
 
189
      owner_ = 0;
 
190
      start_ = 0;
 
191
 
 
192
      #ifdef TEST
 
193
      *logofs << "ReadBuffer: Borrowed " << length_
 
194
              << " pending bytes for FD#" << transport_ ->
 
195
                 fd() << ".\n" << logofs_flush;
 
196
      #endif
 
197
 
 
198
      return length_;
 
199
    }
 
200
    #ifdef TEST
 
201
    else
 
202
    {
 
203
      *logofs << "ReadBuffer: WARNING! Cannot borrow "
 
204
              << pendingLength << " bytes for FD#"
 
205
              << transport_ -> fd() << " with "
 
206
              << length_ << " bytes in the buffer.\n"
 
207
              << logofs_flush;
 
208
    }
 
209
    #endif
 
210
  }
 
211
 
 
212
  unsigned int readLength = suggestedLength(pendingLength);
 
213
 
 
214
  #ifdef DEBUG
 
215
  *logofs << "ReadBuffer: Requested " << readLength
 
216
          << " bytes for FD#" << transport_ -> fd()
 
217
          << " with readable " << transport_ -> readable()
 
218
          << " remaining " << remaining_ << " pending "
 
219
          << transport_ -> pending() << ".\n"
 
220
          << logofs_flush;
 
221
  #endif
 
222
 
 
223
  if (readLength < initialReadSize_)
 
224
  {
 
225
    readLength = initialReadSize_;
 
226
  }
 
227
 
 
228
  #ifdef DEBUG
 
229
  *logofs << "ReadBuffer: Buffer size is " << size_
 
230
          << " length " << length_ << " and start "
 
231
          << start_  << ".\n" << logofs_flush;
 
232
  #endif
 
233
 
 
234
  //
 
235
  // We can't use the transport buffer
 
236
  // to read our own data in it.
 
237
  //
 
238
 
 
239
  #ifdef TEST
 
240
 
 
241
  if (owner_ == 0)
 
242
  {
 
243
    *logofs << "ReadBuffer: PANIC! Class for FD#"
 
244
            << transport_ -> fd() << " doesn't "
 
245
            << "appear to be the owner of the buffer "
 
246
            << "while reading.\n" << logofs_flush;
 
247
 
 
248
    HandleCleanup();
 
249
  }
 
250
 
 
251
  #endif
 
252
 
 
253
  //
 
254
  // Be sure that we have enough space
 
255
  // to store all the requested data.
 
256
  //
 
257
 
 
258
  if (buffer_ == NULL || length_ + readLength > size_)
 
259
  {
 
260
    unsigned int newSize = length_ + readLength;
 
261
 
 
262
    #ifdef TEST
 
263
    *logofs << "ReadBuffer: Resizing buffer for FD#"
 
264
            << transport_ -> fd() << " in read from "
 
265
            << size_ << " to " << newSize << " bytes.\n"
 
266
            << logofs_flush;
 
267
    #endif
 
268
 
 
269
    unsigned char *newBuffer = allocateBuffer(newSize);
 
270
 
 
271
    memcpy(newBuffer, buffer_ + start_, length_);
 
272
 
 
273
    delete [] buffer_;
 
274
 
 
275
    buffer_ = newBuffer;
 
276
    size_   = newSize;
 
277
 
 
278
    transport_ -> pendingReset();
 
279
 
 
280
    owner_ = 1;
 
281
  }
 
282
  else if (start_ != 0 && length_ != 0)
 
283
  {
 
284
    //
 
285
    // If any bytes are left due to a partial
 
286
    // message, shift them to the beginning
 
287
    // of the buffer.
 
288
    //
 
289
 
 
290
    #ifdef TEST
 
291
    *logofs << "ReadBuffer: Moving " << length_
 
292
            << " bytes of data " << "at beginning of "
 
293
            << "the buffer for FD#" << transport_ -> fd() 
 
294
            << ".\n" << logofs_flush;
 
295
    #endif
 
296
 
 
297
    memmove(buffer_, buffer_ + start_, length_);
 
298
  }
 
299
 
 
300
  start_ = 0;
 
301
 
 
302
  #ifdef DEBUG
 
303
  *logofs << "ReadBuffer: Buffer size is now " << size_ 
 
304
          << " length is " << length_ << " and start is " 
 
305
          << start_ << ".\n" << logofs_flush;
 
306
  #endif
 
307
 
 
308
  unsigned char *readData = buffer_ + length_;
 
309
 
 
310
  #ifdef DEBUG
 
311
  *logofs << "ReadBuffer: Going to read " << readLength 
 
312
          << " bytes from FD#" << transport_ -> fd() << ".\n"
 
313
          << logofs_flush;
 
314
  #endif
 
315
 
 
316
  int bytesRead = transport_ -> read(readData, readLength);
 
317
 
 
318
  if (bytesRead > 0)
 
319
  {
 
320
    #ifdef TEST
 
321
    *logofs << "ReadBuffer: Read " << bytesRead
 
322
            << " bytes from FD#" << transport_ -> fd()
 
323
            << ".\n" << logofs_flush;
 
324
    #endif
 
325
 
 
326
    length_ += bytesRead;
 
327
  }
 
328
  else if (bytesRead < 0)
 
329
  {
 
330
    //
 
331
    // Check if there is more data pending than the
 
332
    // size of the provided buffer. After reading
 
333
    // the requested amount, in fact, the transport
 
334
    // may have decompressed the data and produced
 
335
    // more input. This trick allows us to always
 
336
    // borrow the buffer from the transport, even
 
337
    // when the partial read would have prevented
 
338
    // that.
 
339
    //
 
340
 
 
341
    if (transport_ -> pending() > 0)
 
342
    {
 
343
      #ifdef TEST
 
344
      *logofs << "ReadBuffer: WARNING! Trying to read some "
 
345
              << "more with " << transport_ -> pending()
 
346
              << " bytes pending for FD#" << transport_ ->
 
347
                 fd() << ".\n" << logofs_flush;
 
348
      #endif
 
349
 
 
350
      return readMessage();
 
351
    }
 
352
 
 
353
    #ifdef TEST
 
354
    *logofs << "ReadBuffer: Error detected reading "
 
355
            << "from FD#" << transport_ -> fd()
 
356
            << ".\n" << logofs_flush;
 
357
    #endif
 
358
 
 
359
    return -1;
 
360
  }
 
361
  #ifdef TEST
 
362
  else
 
363
  {
 
364
    *logofs << "ReadBuffer: No data read from FD#"
 
365
            << transport_ -> fd() << " with remaining "
 
366
            << remaining_ << ".\n" << logofs_flush;
 
367
  }
 
368
  #endif
 
369
 
 
370
  return bytesRead;
 
371
}
 
372
 
 
373
const unsigned char *ReadBuffer::getMessage(unsigned int &controlLength,
 
374
                                                unsigned int &dataLength)
 
375
{
 
376
  #ifdef TEST
 
377
 
 
378
  if (transport_ -> pending() > 0)
 
379
  {
 
380
    *logofs << "ReadBuffer: PANIC! The transport "
 
381
            << "appears to have data pending.\n"
 
382
            << logofs_flush;
 
383
 
 
384
    HandleCleanup();
 
385
  }
 
386
 
 
387
  #endif
 
388
 
 
389
  if (length_ == 0)
 
390
  {
 
391
    #ifdef DEBUG
 
392
    *logofs << "ReadBuffer: No message can be located "
 
393
            << "for FD#" << transport_ -> fd() << ".\n"
 
394
            << logofs_flush;
 
395
    #endif
 
396
 
 
397
    if (owner_ == 0)
 
398
    {
 
399
      buffer_ = NULL;
 
400
      size_   = 0;
 
401
 
 
402
      transport_ -> pendingReset();
 
403
 
 
404
      owner_ = 1;
 
405
      start_ = 0;
 
406
    }
 
407
 
 
408
    return NULL;
 
409
  }
 
410
 
 
411
  unsigned int trailerLength;
 
412
 
 
413
  #ifdef DEBUG
 
414
  *logofs << "ReadBuffer: Going to locate message with "
 
415
          << "start at " << start_ << " and length "
 
416
          << length_ << " for FD#" << transport_ -> fd()
 
417
          << ".\n" << logofs_flush;
 
418
  #endif
 
419
 
 
420
  int located = locateMessage(buffer_ + start_, buffer_ + start_ + length_,
 
421
                                  controlLength, dataLength, trailerLength);
 
422
 
 
423
  if (located == 0)
 
424
  {
 
425
    //
 
426
    // No more complete messages are in
 
427
    // the buffer.
 
428
    //
 
429
 
 
430
    #ifdef DEBUG
 
431
    *logofs << "ReadBuffer: No message was located "
 
432
            << "for FD#" << transport_ -> fd()
 
433
            << ".\n" << logofs_flush;
 
434
    #endif
 
435
 
 
436
    if (owner_ == 0)
 
437
    {
 
438
      //
 
439
      // Must move the remaining bytes in
 
440
      // our own buffer.
 
441
      //
 
442
 
 
443
      convertBuffer();
 
444
    }
 
445
  
 
446
    return NULL;
 
447
  }
 
448
  else
 
449
  {
 
450
    const unsigned char *result = buffer_ + start_;
 
451
 
 
452
    if (dataLength > 0)
 
453
    {
 
454
      //
 
455
      // Message contains data, so go to the
 
456
      // first byte of payload.
 
457
      //
 
458
 
 
459
      result += trailerLength;
 
460
 
 
461
      start_  += (dataLength + trailerLength);
 
462
      length_ -= (dataLength + trailerLength);
 
463
    }
 
464
    else
 
465
    {
 
466
      //
 
467
      // It is a control message.
 
468
      //
 
469
 
 
470
      start_  += (controlLength + trailerLength);
 
471
      length_ -= (controlLength + trailerLength);
 
472
    }
 
473
 
 
474
    #ifdef DEBUG
 
475
    *logofs << "ReadBuffer: Located message for FD#"
 
476
            << transport_ -> fd() << " with control length "
 
477
            << controlLength << " and data length "
 
478
            << dataLength << ".\n" << logofs_flush;
 
479
    #endif
 
480
 
 
481
    remaining_ = 0;
 
482
 
 
483
    return result;
 
484
  }
 
485
}
 
486
 
 
487
int ReadBuffer::setSize(int initialReadSize, int maximumBufferSize)
 
488
{
 
489
  initialReadSize_   = initialReadSize;
 
490
  maximumBufferSize_ = maximumBufferSize;
 
491
 
 
492
  #ifdef TEST
 
493
  *logofs << "ReadBuffer: WARNING! Set buffer parameters to "
 
494
          << initialReadSize_ << "/" << maximumBufferSize_
 
495
          << " for object at "<< this << ".\n"
 
496
          << logofs_flush;
 
497
  #endif
 
498
 
 
499
  return 1;
 
500
}
 
501
 
 
502
void ReadBuffer::fullReset()
 
503
{
 
504
  #ifdef TEST
 
505
 
 
506
  if (owner_ == 0)
 
507
  {
 
508
    *logofs << "ReadBuffer: PANIC! Class for FD#"
 
509
            << transport_ -> fd() << " doesn't "
 
510
            << "appear to be the owner of the buffer "
 
511
            << "in reset.\n" << logofs_flush;
 
512
 
 
513
    HandleCleanup();
 
514
  }
 
515
 
 
516
  #endif
 
517
 
 
518
  if (length_ == 0 && size_ > maximumBufferSize_)
 
519
  {
 
520
    #ifdef TEST
 
521
    *logofs << "ReadBuffer: Resizing buffer for FD#"
 
522
            << transport_ -> fd() << " in reset from "
 
523
            << size_ << " to " << maximumBufferSize_
 
524
            << " bytes.\n" << logofs_flush;
 
525
    #endif
 
526
 
 
527
    delete [] buffer_;
 
528
 
 
529
    int newSize = maximumBufferSize_;
 
530
 
 
531
    unsigned char *newBuffer = allocateBuffer(newSize);
 
532
 
 
533
    buffer_ = newBuffer;
 
534
    size_   = newSize;
 
535
 
 
536
    transport_ -> pendingReset();
 
537
 
 
538
    owner_ = 1;
 
539
    start_ = 0;
 
540
  }
 
541
}
 
542
 
 
543
unsigned char *ReadBuffer::allocateBuffer(unsigned int newSize)
 
544
{
 
545
  unsigned char *newBuffer = new unsigned char[newSize];
 
546
 
 
547
  if (newBuffer == NULL)
 
548
  {
 
549
    #ifdef PANIC
 
550
    *logofs << "ReadBuffer: PANIC! Can't allocate "
 
551
            << newSize << " bytes of memory for buffer "
 
552
            << "in context [B].\n" << logofs_flush;
 
553
    #endif
 
554
 
 
555
    cerr << "Error" << ": Can't allocate memory for "
 
556
         << "read buffer in context [B].\n";
 
557
 
 
558
    HandleCleanup();
 
559
  }
 
560
 
 
561
  #ifdef VALGRIND
 
562
 
 
563
  memset(newBuffer, '\0', newSize);
 
564
 
 
565
  #endif
 
566
 
 
567
  return newBuffer;
 
568
}
 
569
 
 
570
void ReadBuffer::appendBuffer(const unsigned char *message, unsigned int length)
 
571
{
 
572
  if (start_ + length_ + length > size_)
 
573
  {
 
574
    unsigned int newSize = length_ + length + initialReadSize_;
 
575
 
 
576
    #ifdef TEST
 
577
    *logofs << "ReadBuffer: WARNING! Resizing buffer "
 
578
            << "for FD#" << transport_ -> fd()
 
579
            << " from " << size_ << " to " << newSize
 
580
            << " bytes.\n" << logofs_flush;
 
581
    #endif
 
582
 
 
583
    unsigned char *newBuffer = allocateBuffer(newSize);
 
584
 
 
585
    memcpy(newBuffer, buffer_ + start_, length_);
 
586
 
 
587
    delete [] buffer_;
 
588
 
 
589
    buffer_ = newBuffer;
 
590
    size_   = newSize;
 
591
 
 
592
    start_ = 0;
 
593
  }
 
594
 
 
595
  memcpy(buffer_ + start_ + length_, message, length);
 
596
 
 
597
  length_ += length;
 
598
 
 
599
  transport_ -> pendingReset();
 
600
 
 
601
  owner_ = 1;
 
602
}
 
603
 
 
604
void ReadBuffer::convertBuffer()
 
605
{
 
606
  unsigned int newSize = length_ + initialReadSize_;
 
607
 
 
608
  #ifdef TEST
 
609
  *logofs << "ReadBuffer: WARNING! Converting "
 
610
          << length_ << " bytes to own buffer "
 
611
          << "for FD#" << transport_ -> fd()
 
612
          << " with new size " << newSize
 
613
          << " bytes.\n" << logofs_flush;
 
614
  #endif
 
615
 
 
616
  unsigned char *newBuffer = allocateBuffer(newSize);
 
617
 
 
618
  memcpy(newBuffer, buffer_ + start_, length_);
 
619
 
 
620
  buffer_ = newBuffer;
 
621
  size_   = newSize;
 
622
 
 
623
  transport_ -> pendingReset();
 
624
 
 
625
  owner_ = 1;
 
626
  start_ = 0;
 
627
}