~ubuntu-branches/ubuntu/precise/ceph/precise-proposed

« back to all changes in this revision

Viewing changes to src/os/FileJournal.cc

  • Committer: Bazaar Package Importer
  • Author(s): Laszlo Boszormenyi (GCS)
  • Date: 2011-04-25 10:09:05 UTC
  • mfrom: (1.1.3 upstream) (0.1.5 sid)
  • Revision ID: james.westby@ubuntu.com-20110425100905-exm7dfvi2v5ick02
Tags: 0.27-1
New upstream release.

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
 * 
13
13
 */
14
14
 
15
 
#include "config.h"
 
15
#include "common/config.h"
16
16
#include "common/errno.h"
 
17
#include "common/safe_io.h"
17
18
#include "FileJournal.h"
18
19
#include "include/color.h"
 
20
#include "common/ProfLogger.h"
 
21
#include "os/ObjectStore.h"
19
22
 
 
23
#include <fcntl.h>
 
24
#include <sstream>
20
25
#include <stdio.h>
21
26
#include <sys/types.h>
22
27
#include <sys/stat.h>
23
28
#include <sys/mount.h>
24
 
#include <fcntl.h>
25
29
 
26
30
 
27
31
#define DOUT_SUBSYS journal
28
32
#undef dout_prefix
29
 
#define dout_prefix *_dout << dbeginl << "journal "
 
33
#define dout_prefix *_dout << "journal "
30
34
 
31
35
const static int64_t ONE_MEG(1 << 20);
32
36
 
36
40
 
37
41
  if (forwrite) {
38
42
    flags = O_RDWR;
39
 
    if (directio) flags |= O_DIRECT | O_SYNC;
 
43
    if (directio)
 
44
      flags |= O_DIRECT | O_SYNC;
40
45
  } else {
41
46
    flags = O_RDONLY;
42
47
  }
43
48
  if (create)
44
49
    flags |= O_CREAT;
45
50
  
46
 
  if (fd >= 0) 
47
 
    ::close(fd);
48
 
  fd = ::open(fn.c_str(), flags, 0644);
 
51
  if (fd >= 0) {
 
52
    if (TEMP_FAILURE_RETRY(::close(fd))) {
 
53
      int err = errno;
 
54
      derr << "FileJournal::_open: error closing old fd: "
 
55
           << cpp_strerror(err) << dendl;
 
56
    }
 
57
  }
 
58
  fd = TEMP_FAILURE_RETRY(::open(fn.c_str(), flags, 0644));
49
59
  if (fd < 0) {
50
60
    int err = errno;
51
 
    dout(2) << "_open failed " << cpp_strerror(err) << dendl;
52
 
    cerr << "unable to open journal " << fn << ": " << cpp_strerror(err) << std::endl;
 
61
    derr << "FileJournal::_open : unable to open journal: open() "
 
62
         << "failed: " << cpp_strerror(err) << dendl;
53
63
    return -err;
54
64
  }
55
65
 
57
67
  ret = ::fstat(fd, &st);
58
68
  if (ret) {
59
69
    int err = errno;
60
 
    dout(2) << "_open failed to fstat! " << cpp_strerror(err) << dendl;
 
70
    derr << "FileJournal::_open: unable to fstat journal: "
 
71
         << cpp_strerror(err) << dendl;
61
72
    return -err;
62
73
  }
63
74
 
77
88
  zero_buf = new char[header.alignment];
78
89
  memset(zero_buf, 0, header.alignment);
79
90
 
80
 
  dout(2) << "_open " << fn << " fd " << fd 
 
91
  dout(1) << "_open " << fn << " fd " << fd
81
92
          << ": " << max_size 
82
93
          << " bytes, block size " << block_size
83
94
          << " bytes, directio = " << directio << dendl;
138
149
  return 0;
139
150
}
140
151
 
 
152
static int get_kernel_version(int *a, int *b, int *c)
 
153
{
 
154
  int ret;
 
155
  char buf[128];
 
156
  memset(buf, 0, sizeof(buf));
 
157
  int fd = TEMP_FAILURE_RETRY(::open("/proc/version", O_RDONLY));
 
158
  if (fd < 0) {
 
159
    ret = errno;
 
160
    derr << "get_kernel_version: failed to open /proc/version: "
 
161
         << cpp_strerror(ret) << dendl;
 
162
    goto out;
 
163
  }
 
164
  ret = safe_read(fd, buf, sizeof(buf) - 1);
 
165
  if (ret < 0) {
 
166
    derr << "get_kernel_version: failed to read from /proc/version: "
 
167
         << cpp_strerror(ret) << dendl;
 
168
    goto close_fd;
 
169
  }
 
170
 
 
171
  if (sscanf(buf, "Linux version %d.%d.%d", a, b, c) != 3) {
 
172
    derr << "get_kernel_version: failed to parse string: '"
 
173
         << buf << "'" << dendl;
 
174
    ret = EIO;
 
175
    goto close_fd;
 
176
  }
 
177
 
 
178
  dout(0) << " kernel version is " << *a <<"." << *b << "." << *c << dendl;
 
179
  ret = 0;
 
180
 
 
181
close_fd:
 
182
  TEMP_FAILURE_RETRY(::close(fd));
 
183
out:
 
184
  return ret;
 
185
}
 
186
 
141
187
void FileJournal::_check_disk_write_cache() const
142
188
{
 
189
  ostringstream hdparm_cmd;
 
190
  FILE *fp = NULL;
 
191
  int a, b, c;
 
192
 
143
193
  if (geteuid() != 0) {
144
 
    dout(10) << __func__ << ": not root, NOT checking disk write "
 
194
    dout(10) << "_check_disk_write_cache: not root, NOT checking disk write "
145
195
      << "cache on raw block device " << fn << dendl;
146
 
    return;
147
 
  }
148
 
 
149
 
  char cmd[4096];
150
 
  snprintf(cmd, sizeof(cmd), "/sbin/hdparm -W %s > /tmp/out.%d",
151
 
           fn.c_str(), getpid());
152
 
  int r = ::system(cmd);
153
 
  if (r != 0) {
154
 
    dout(10) << __func__ << ": failed to run '" << cmd
155
 
      << "', NOT checking disk write cache on " << fn << dendl;
156
 
    return;
157
 
  }
158
 
 
159
 
  snprintf(cmd, sizeof(cmd), "/tmp/out.%d", getpid());
160
 
  FILE *f = ::fopen(cmd, "r");
161
 
  if (!f) {
162
 
    dout(10) << "_open failed to read '" << cmd
163
 
      << "', NOT checking disk write cache on " << fn << dendl;
164
 
    ::unlink(cmd);
165
 
    return;
166
 
  }
167
 
 
168
 
  while (!feof(f)) {
169
 
    char s[100];
170
 
    fgets(s, sizeof(s), f);
 
196
    goto done;
 
197
  }
 
198
 
 
199
  hdparm_cmd << "/sbin/hdparm -W " << fn;
 
200
  fp = popen(hdparm_cmd.str().c_str(), "r");
 
201
  if (!fp) {
 
202
    dout(10) << "_check_disk_write_cache: failed to run /sbin/hdparm: NOT "
 
203
      << "checking disk write cache on raw block device " << fn << dendl;
 
204
    goto done;
 
205
  }
 
206
 
 
207
  while (true) {
 
208
    char buf[256];
 
209
    memset(buf, 0, sizeof(buf));
 
210
    char *line = fgets(buf, sizeof(buf) - 1, fp);
 
211
    if (!line) {
 
212
      if (ferror(fp)) {
 
213
        int ret = -errno;
 
214
        derr << "_check_disk_write_cache: fgets error: " << cpp_strerror(ret)
 
215
             << dendl;
 
216
        goto close_f;
 
217
      }
 
218
      else {
 
219
        // EOF.
 
220
        break;
 
221
      }
 
222
    }
 
223
 
171
224
    int on;
172
 
    if (sscanf(s, " write-caching =  %d", &on) == 1) {
173
 
      if (on) {
174
 
 
175
 
        // check kenrel version
176
 
        char buf[40];
177
 
        int fd = ::open("/proc/version", O_RDONLY);
178
 
        ::read(fd, buf, 39);
179
 
        buf[39] = 0;
180
 
        ::close(fd);
181
 
 
182
 
        int b, c;
183
 
        int r = sscanf(buf, "Linux version 2.%d.%d", &b, &c);
184
 
        dout(0) << " kernel version is 2." << b << "." << c << dendl;
185
 
        if (r == 2 &&
186
 
            b >= 6 &&
187
 
            c >= 33) {
188
 
          // a-ok
189
 
        } else {
190
 
          dout(0) << "WARNING: disk write cache is ON; journaling will not be reliable" << dendl;
191
 
          dout(0) << "         on kernels prior to 2.6.33 (recent kernels are safe)" << dendl;
192
 
          dout(0) << "         disable with 'hdparm -W 0 " << fn << "'" << dendl;
193
 
          cout << TEXT_RED
194
 
               << " ** WARNING: disk write cache is ON on " << fn << ".\n"
195
 
               << "    Journaling will not be reliable on kernels prior to 2.6.33\n"
196
 
               << "    (recent kernels are safe).  You can disable the write cache with\n"
197
 
               << "    'hdparm -W 0 " << fn << "'"
198
 
               << TEXT_NORMAL
199
 
               << std::endl;
200
 
        }
201
 
      } else {
202
 
        dout(10) << "_open disk write cache is off (good) on " << fn << dendl;
203
 
      }
204
 
      break;
205
 
    }
206
 
  }
207
 
  fclose(f);
208
 
  ::unlink(cmd);
 
225
    if (sscanf(line, " write-caching =  %d", &on) != 1)
 
226
      continue;
 
227
    if (!on) {
 
228
      dout(10) << "_check_disk_write_cache: disk write cache is off (good) on "
 
229
               << fn << dendl;
 
230
      break;
 
231
    }
 
232
 
 
233
    // is our kernel new enough?
 
234
    if (get_kernel_version(&a, &b, &c)) {
 
235
      dout(10) << "_check_disk_write_cache: failed to get kernel version."
 
236
               << dendl;
 
237
    }
 
238
    else if (a >= 2 && b >= 6 && c >= 33) {
 
239
      dout(20) << "_check_disk_write_cache: disk write cache is on, but your "
 
240
               << "kernel is new enough to handle it correctly. (fn:"
 
241
               << fn << ")" << dendl;
 
242
      break;
 
243
    }
 
244
    derr << TEXT_RED
 
245
         << " ** WARNING: disk write cache is ON on " << fn << ".\n"
 
246
         << "    Journaling will not be reliable on kernels prior to 2.6.33\n"
 
247
         << "    (recent kernels are safe).  You can disable the write cache with\n"
 
248
         << "    'hdparm -W 0 " << fn << "'"
 
249
         << TEXT_NORMAL
 
250
         << dendl;
 
251
    break;
 
252
  }
 
253
 
 
254
close_f:
 
255
  if (::fclose(fp)) {
 
256
    int ret = -errno;
 
257
    derr << "_check_disk_write_cache: fclose error: " << cpp_strerror(ret)
 
258
         << dendl;
 
259
  }
 
260
done:
 
261
  ;
209
262
}
210
263
 
211
264
int FileJournal::_open_file(int64_t oldsize, blksize_t blksize,
216
269
  conf_journal_sz <<= 20;
217
270
 
218
271
  if ((g_conf.osd_journal_size == 0) && (oldsize < ONE_MEG)) {
219
 
    dout(0) << "I'm sorry, I don't know how large of a journal to create."
220
 
            << "Please specify a block device to use as the journal OR "
221
 
            << "set osd_journal_size in your ceph.conf" << dendl;
 
272
    derr << "I'm sorry, I don't know how large of a journal to create."
 
273
         << "Please specify a block device to use as the journal OR "
 
274
         << "set osd_journal_size in your ceph.conf" << dendl;
222
275
    return -EINVAL;
223
276
  }
224
277
 
228
281
    dout(10) << "_open extending to " << newsize << " bytes" << dendl;
229
282
    ret = ::ftruncate(fd, newsize);
230
283
    if (ret < 0) {
231
 
      dout(0) << __func__ << ": unable to extend journal to " << newsize
232
 
              << " bytes" << dendl;
233
 
      return -errno;
 
284
      int err = errno;
 
285
      derr << "FileJournal::_open_file : unable to extend journal to "
 
286
           << newsize << " bytes: " << cpp_strerror(err) << dendl;
 
287
      return -err;
234
288
    }
235
289
    max_size = newsize;
236
290
  }
237
291
  else {
238
292
    max_size = oldsize;
239
293
  }
240
 
  block_size = MAX(blksize, PAGE_SIZE);
 
294
  block_size = MAX(blksize, (blksize_t)PAGE_SIZE);
241
295
 
242
296
  dout(10) << "_open journal is not a block device, NOT checking disk "
243
297
           << "write cache on '" << fn << "'" << dendl;
247
301
 
248
302
int FileJournal::create()
249
303
{
250
 
  char buf[80];
 
304
  void *buf = 0;
 
305
  int64_t needed_space;
 
306
  int ret;
 
307
  buffer::ptr bp;
251
308
  dout(2) << "create " << fn << dendl;
252
309
 
253
 
  int err = _open(true, true);
254
 
  if (err < 0)
255
 
    return err;
 
310
  ret = _open(true, true);
 
311
  if (ret < 0)
 
312
    goto done;
256
313
 
257
314
  // write empty header
258
315
  memset(&header, 0, sizeof(header));
267
324
  header.start = get_top();
268
325
  print_header();
269
326
 
270
 
  buffer::ptr bp = prepare_header();
271
 
  int r = ::pwrite(fd, bp.c_str(), bp.length(), 0);
272
 
  if (r < 0) {
273
 
    dout(0) << "create write header error " << errno << " " << strerror_r(errno, buf, sizeof(buf)) << dendl;
274
 
    return -errno;
 
327
  bp = prepare_header();
 
328
  if (TEMP_FAILURE_RETRY(::pwrite(fd, bp.c_str(), bp.length(), 0)) < 0) {
 
329
    ret = errno;
 
330
    derr << "FileJournal::create : create write header error "
 
331
         << cpp_strerror(ret) << dendl;
 
332
    goto close_fd;
275
333
  }
276
334
 
277
335
  // zero first little bit, too.
278
 
  char z[block_size];
279
 
  memset(z, 0, block_size);
280
 
  ::pwrite(fd, z, block_size, get_top());
281
 
 
282
 
  ::close(fd);
283
 
  fd = -1;
 
336
  ret = posix_memalign(&buf, block_size, block_size);
 
337
  if (ret) {
 
338
    derr << "FileJournal::create: failed to allocate " << block_size
 
339
         << " bytes of memory: " << cpp_strerror(ret) << dendl;
 
340
    goto close_fd;
 
341
  }
 
342
  memset(buf, 0, block_size);
 
343
  if (TEMP_FAILURE_RETRY(::pwrite(fd, buf, block_size, get_top())) < 0) {
 
344
    ret = errno;
 
345
    derr << "FileJournal::create: error zeroing first " << block_size
 
346
         << " bytes " << cpp_strerror(ret) << dendl;
 
347
    goto free_buf;
 
348
  }
 
349
 
 
350
  needed_space = g_conf.osd_max_write_size << 20;
 
351
  needed_space += (2 * sizeof(entry_header_t)) + get_top();
 
352
  if (header.max_size - header.start < needed_space) {
 
353
    derr << "FileJournal::create: OSD journal is not large enough to hold "
 
354
         << "osd_max_write_size bytes!" << dendl;
 
355
    ret = -ENOSPC;
 
356
    goto free_buf;
 
357
  }
 
358
 
284
359
  dout(2) << "create done" << dendl;
285
 
  return 0;
 
360
  ret = 0;
 
361
 
 
362
free_buf:
 
363
  free(buf);
 
364
  buf = 0;
 
365
close_fd:
 
366
  if (TEMP_FAILURE_RETRY(::close(fd)) < 0) {
 
367
    ret = errno;
 
368
    derr << "FileJournal::create: error closing fd: " << cpp_strerror(ret)
 
369
         << dendl;
 
370
    goto done;
 
371
  }
 
372
done:
 
373
  fd = -1;
 
374
  return ret;
286
375
}
287
376
 
288
377
int FileJournal::open(uint64_t next_seq)
305
394
    //<< " vs expected fsid = " << fsid 
306
395
           << dendl;
307
396
  if (header.fsid != fsid) {
308
 
    dout(2) << "open fsid doesn't match, invalid (someone else's?) journal" << dendl;
309
 
    err = -EINVAL;
310
 
  } 
 
397
    derr << "FileJournal::open: open fsid doesn't match, invalid "
 
398
         << "(someone else's?) journal" << dendl;
 
399
    return -EINVAL;
 
400
  }
311
401
  if (header.max_size > max_size) {
312
402
    dout(2) << "open journal size " << header.max_size << " > current " << max_size << dendl;
313
 
    err = -EINVAL;
 
403
    return -EINVAL;
314
404
  }
315
405
  if (header.block_size != block_size) {
316
406
    dout(2) << "open journal block size " << header.block_size << " != current " << block_size << dendl;
317
 
    err = -EINVAL;
 
407
    return -EINVAL;
318
408
  }
319
409
  if (header.max_size % header.block_size) {
320
410
    dout(2) << "open journal max size " << header.max_size
321
411
            << " not a multiple of block size " << header.block_size << dendl;
322
 
    err = -EINVAL;
 
412
    return -EINVAL;
323
413
  }
324
414
  if (header.alignment != block_size && directio) {
325
 
    derr(0) << "open journal alignment " << header.alignment << " does not match block size " 
 
415
    dout(0) << "open journal alignment " << header.alignment << " does not match block size " 
326
416
            << block_size << " (required for direct_io journal mode)" << dendl;
327
 
    err = -EINVAL;
 
417
    return -EINVAL;
328
418
  }
329
419
  if ((header.alignment % PAGE_SIZE) && directio) {
330
 
    derr(0) << "open journal alignment " << header.alignment << " is not multiple of page size " << PAGE_SIZE
 
420
    dout(0) << "open journal alignment " << header.alignment << " is not multiple of page size " << PAGE_SIZE
331
421
            << " (required for direct_io journal mode)" << dendl;
332
 
    err = -EINVAL;
 
422
    return -EINVAL;
333
423
  }
334
 
  if (err)
335
 
    return err;
336
424
 
337
425
  // looks like a valid header.
338
426
  write_pos = 0;  // not writeable yet
376
464
 
377
465
  // close
378
466
  assert(writeq.empty());
379
 
  assert(fd > 0);
380
 
  ::close(fd);
 
467
  assert(fd >= 0);
 
468
  TEMP_FAILURE_RETRY(::close(fd));
381
469
  fd = -1;
382
470
}
383
471
 
511
599
        // throw out what we have so far
512
600
        full_state = FULL_FULL;
513
601
        while (!writeq.empty()) {
514
 
          dout(30) << "XXX throttle put " << writeq.front().bl.length() << dendl;
515
 
          throttle_ops.put(1);
516
 
          throttle_bytes.put(writeq.front().bl.length());
 
602
          put_throttle(1, writeq.front().bl.length());
517
603
          writeq.pop_front();
518
604
        }  
519
605
        print_header();
633
719
  return 0;
634
720
}
635
721
 
636
 
void FileJournal::write_bl(off64_t& pos, bufferlist& bl)
 
722
int FileJournal::write_bl(off64_t& pos, bufferlist& bl)
637
723
{
638
724
  // make sure list segments are page aligned
639
725
  if (directio && (!bl.is_page_aligned() ||
647
733
  }
648
734
 
649
735
  ::lseek64(fd, pos, SEEK_SET);
650
 
  int err = bl.write_fd(fd);
651
 
  if (err) {
652
 
    char buf[80];
653
 
    derr(0) << "write_bl failed with " << err << " " << strerror_r(-err, buf, sizeof(buf)) 
654
 
            << dendl;
 
736
  int ret = bl.write_fd(fd);
 
737
  if (ret) {
 
738
    derr << "FileJournal::write_bl : write_fd failed: "
 
739
         << cpp_strerror(ret) << dendl;
 
740
    return ret;
655
741
  }
656
742
  pos += bl.length();
 
743
  return 0;
657
744
}
658
745
 
659
746
void FileJournal::do_write(bufferlist& bl)
694
781
    dout(10) << "do_write wrapping, first bit at " << pos << " len " << first.length()
695
782
             << " second bit len " << second.length() << " (orig len " << bl.length() << ")" << dendl;
696
783
 
697
 
    write_bl(pos, first);
 
784
    if (write_bl(pos, first)) {
 
785
      derr << "FileJournal::do_write: write_bl(pos=" << pos
 
786
           << ") failed" << dendl;
 
787
      ceph_abort();
 
788
    }
698
789
    assert(pos == header.max_size);
699
790
    if (hbp.length()) {
700
791
      // be sneaky: include the header in the second fragment
702
793
      pos = 0;          // we included the header
703
794
    } else
704
795
      pos = get_top();  // no header, start after that
705
 
    write_bl(pos, second);
 
796
    if (write_bl(pos, second)) {
 
797
      derr << "FileJournal::do_write: write_bl(pos=" << pos
 
798
           << ") failed" << dendl;
 
799
      ceph_abort();
 
800
    }
706
801
  } else {
707
802
    // header too?
708
 
    if (hbp.length())
709
 
      ::pwrite(fd, hbp.c_str(), hbp.length(), 0);
 
803
    if (hbp.length()) {
 
804
      if (TEMP_FAILURE_RETRY(::pwrite(fd, hbp.c_str(), hbp.length(), 0)) < 0) {
 
805
        int err = errno;
 
806
        derr << "FileJournal::do_write: pwrite(fd=" << fd
 
807
             << ", hbp.length=" << hbp.length() << ") failed :"
 
808
             << cpp_strerror(err) << dendl;
 
809
        ceph_abort();
 
810
      }
 
811
    }
710
812
 
711
 
    write_bl(pos, bl);
 
813
    if (write_bl(pos, bl)) {
 
814
      derr << "FileJournal::do_write: write_bl(pos=" << pos
 
815
           << ") failed" << dendl;
 
816
      ceph_abort();
 
817
    }
712
818
  }
713
819
 
714
820
  if (!directio) {
768
874
{
769
875
  write_lock.Lock();
770
876
  while ((!writeq.empty() || writing) && !write_stop) {
771
 
    dout(10) << "flush waiting for writeq to empty and writes to complete" << dendl;
 
877
    dout(5) << "flush waiting for writeq to empty and writes to complete" << dendl;
772
878
    write_empty_cond.Wait(write_lock);
773
879
  }
774
880
  write_lock.Unlock();
775
 
  dout(10) << "flush waiting for finisher" << dendl;
 
881
  dout(5) << "flush waiting for finisher" << dendl;
776
882
  finisher->wait_for_empty();
777
 
  dout(10) << "flush done" << dendl;
 
883
  dout(5) << "flush done" << dendl;
778
884
}
779
885
 
780
886
 
808
914
    assert(r == 0);
809
915
    do_write(bl);
810
916
    
811
 
    dout(30) << "XXX throttle put " << orig_bytes << dendl;
812
 
    uint64_t new_ops = throttle_ops.put(orig_ops);
813
 
    uint64_t new_bytes = throttle_bytes.put(orig_bytes);
814
 
    dout(10) << "write_thread throttle finished " << orig_ops << " ops and " 
815
 
             << orig_bytes << " bytes, now "
816
 
             << new_ops << " ops and " << new_bytes << " bytes"
817
 
             << dendl;
 
917
    put_throttle(orig_ops, orig_bytes);
818
918
  }
819
919
  write_empty_cond.Signal();
820
920
  write_lock.Unlock();
827
927
  Mutex::Locker locker(write_lock);  // ** lock **
828
928
 
829
929
  // dump on queue
830
 
  dout(10) << "submit_entry seq " << seq
 
930
  dout(5) << "submit_entry seq " << seq
831
931
           << " len " << e.length()
832
932
           << " (" << oncommit << ")" << dendl;
833
933
 
840
940
    throttle_ops.take(1);
841
941
    throttle_bytes.take(e.length());
842
942
 
 
943
    if (logger) {
 
944
      logger->set(l_os_jq_max_ops, throttle_ops.get_max());
 
945
      logger->set(l_os_jq_max_bytes, throttle_bytes.get_max());
 
946
      logger->set(l_os_jq_ops, throttle_ops.get_current());
 
947
      logger->set(l_os_jq_bytes, throttle_bytes.get_current());
 
948
    }
 
949
 
843
950
    writeq.push_back(write_item(seq, e, alignment));
844
951
    write_cond.Signal();
845
952
  } else {
875
982
  Mutex::Locker locker(write_lock);
876
983
 
877
984
  if (seq < last_committed_seq) {
878
 
    dout(10) << "committed_thru " << seq << " < last_committed_seq " << last_committed_seq << dendl;
 
985
    dout(5) << "committed_thru " << seq << " < last_committed_seq " << last_committed_seq << dendl;
879
986
    assert(seq >= last_committed_seq);
880
987
    return;
881
988
  }
882
989
  if (seq == last_committed_seq) {
883
 
    dout(10) << "committed_thru " << seq << " == last_committed_seq " << last_committed_seq << dendl;
 
990
    dout(5) << "committed_thru " << seq << " == last_committed_seq " << last_committed_seq << dendl;
884
991
    return;
885
992
  }
886
993
 
887
 
  dout(10) << "committed_thru " << seq << " (last_committed_seq " << last_committed_seq << ")" << dendl;
 
994
  dout(5) << "committed_thru " << seq << " (last_committed_seq " << last_committed_seq << ")" << dendl;
888
995
  last_committed_seq = seq;
889
996
 
890
997
  // adjust start pointer
912
1019
    dout(15) << " dropping committed but unwritten seq " << writeq.front().seq 
913
1020
             << " len " << writeq.front().bl.length()
914
1021
             << dendl;
915
 
    dout(30) << "XXX throttle put " << writeq.front().bl.length() << dendl;
916
 
    throttle_ops.put(1);
917
 
    throttle_bytes.put(writeq.front().bl.length());
 
1022
    put_throttle(1, writeq.front().bl.length());
918
1023
    writeq.pop_front();  
919
1024
  }
920
1025
  
924
1029
}
925
1030
 
926
1031
 
 
1032
void FileJournal::put_throttle(uint64_t ops, uint64_t bytes)
 
1033
{
 
1034
  uint64_t new_ops = throttle_ops.put(ops);
 
1035
  uint64_t new_bytes = throttle_bytes.put(bytes);
 
1036
  dout(5) << "put_throttle finished " << ops << " ops and " 
 
1037
           << bytes << " bytes, now "
 
1038
           << new_ops << " ops and " << new_bytes << " bytes"
 
1039
           << dendl;
 
1040
 
 
1041
  if (logger) {
 
1042
    logger->inc(l_os_j_ops, ops);
 
1043
    logger->inc(l_os_j_bytes, bytes);
 
1044
    logger->set(l_os_jq_ops, new_ops);
 
1045
    logger->set(l_os_jq_bytes, new_bytes);
 
1046
    logger->set(l_os_jq_max_ops, throttle_ops.get_max());
 
1047
    logger->set(l_os_jq_max_bytes, throttle_bytes.get_max());
 
1048
  }
 
1049
}
 
1050
 
927
1051
void FileJournal::make_writeable()
928
1052
{
929
1053
  _open(true);
957
1081
#endif
958
1082
    
959
1083
    bufferptr bp = buffer::create(len);
960
 
    int r = ::read(fd, bp.c_str(), len);
961
 
    assert(r == len);
 
1084
    int r = safe_read_exact(fd, bp.c_str(), len);
 
1085
    if (r) {
 
1086
      derr << "FileJournal::wrap_read_bl: safe_read_exact returned "
 
1087
           << r << dendl;
 
1088
      ceph_abort();
 
1089
    }
962
1090
    bl.push_back(bp);
963
1091
    pos += len;
964
1092
    olen -= len;