~ubuntu-branches/ubuntu/quantal/ceph/quantal

« back to all changes in this revision

Viewing changes to src/leveldb/util/env_posix.cc

  • Committer: Package Import Robot
  • Author(s): James Page
  • Date: 2012-07-16 09:56:24 UTC
  • mfrom: (0.3.11)
  • mto: This revision was merged to the branch mainline in revision 17.
  • Revision ID: package-import@ubuntu.com-20120716095624-azr2w4hbhei1rxmx
Tags: upstream-0.48
ImportĀ upstreamĀ versionĀ 0.48

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2
 
// Use of this source code is governed by a BSD-style license that can be
3
 
// found in the LICENSE file. See the AUTHORS file for names of contributors.
4
 
 
5
 
#include <deque>
6
 
#include <dirent.h>
7
 
#include <errno.h>
8
 
#include <fcntl.h>
9
 
#include <pthread.h>
10
 
#include <stdio.h>
11
 
#include <stdlib.h>
12
 
#include <string.h>
13
 
#include <sys/mman.h>
14
 
#include <sys/stat.h>
15
 
#include <sys/time.h>
16
 
#include <sys/types.h>
17
 
#include <time.h>
18
 
#include <unistd.h>
19
 
#if defined(LEVELDB_PLATFORM_ANDROID)
20
 
#include <sys/stat.h>
21
 
#endif
22
 
#include "leveldb/env.h"
23
 
#include "leveldb/slice.h"
24
 
#include "port/port.h"
25
 
#include "util/logging.h"
26
 
#include "util/posix_logger.h"
27
 
 
28
 
namespace leveldb {
29
 
 
30
 
namespace {
31
 
 
32
 
static Status IOError(const std::string& context, int err_number) {
33
 
  return Status::IOError(context, strerror(err_number));
34
 
}
35
 
 
36
 
class PosixSequentialFile: public SequentialFile {
37
 
 private:
38
 
  std::string filename_;
39
 
  FILE* file_;
40
 
 
41
 
 public:
42
 
  PosixSequentialFile(const std::string& fname, FILE* f)
43
 
      : filename_(fname), file_(f) { }
44
 
  virtual ~PosixSequentialFile() { fclose(file_); }
45
 
 
46
 
  virtual Status Read(size_t n, Slice* result, char* scratch) {
47
 
    Status s;
48
 
    size_t r = fread_unlocked(scratch, 1, n, file_);
49
 
    *result = Slice(scratch, r);
50
 
    if (r < n) {
51
 
      if (feof(file_)) {
52
 
        // We leave status as ok if we hit the end of the file
53
 
      } else {
54
 
        // A partial read with an error: return a non-ok status
55
 
        s = IOError(filename_, errno);
56
 
      }
57
 
    }
58
 
    return s;
59
 
  }
60
 
 
61
 
  virtual Status Skip(uint64_t n) {
62
 
    if (fseek(file_, n, SEEK_CUR)) {
63
 
      return IOError(filename_, errno);
64
 
    }
65
 
    return Status::OK();
66
 
  }
67
 
};
68
 
 
69
 
class PosixRandomAccessFile: public RandomAccessFile {
70
 
 private:
71
 
  std::string filename_;
72
 
  int fd_;
73
 
 
74
 
 public:
75
 
  PosixRandomAccessFile(const std::string& fname, int fd)
76
 
      : filename_(fname), fd_(fd) { }
77
 
  virtual ~PosixRandomAccessFile() { close(fd_); }
78
 
 
79
 
  virtual Status Read(uint64_t offset, size_t n, Slice* result,
80
 
                      char* scratch) const {
81
 
    Status s;
82
 
    ssize_t r = pread(fd_, scratch, n, static_cast<off_t>(offset));
83
 
    *result = Slice(scratch, (r < 0) ? 0 : r);
84
 
    if (r < 0) {
85
 
      // An error: return a non-ok status
86
 
      s = IOError(filename_, errno);
87
 
    }
88
 
    return s;
89
 
  }
90
 
};
91
 
 
92
 
// We preallocate up to an extra megabyte and use memcpy to append new
93
 
// data to the file.  This is safe since we either properly close the
94
 
// file before reading from it, or for log files, the reading code
95
 
// knows enough to skip zero suffixes.
96
 
class PosixMmapFile : public WritableFile {
97
 
 private:
98
 
  std::string filename_;
99
 
  int fd_;
100
 
  size_t page_size_;
101
 
  size_t map_size_;       // How much extra memory to map at a time
102
 
  char* base_;            // The mapped region
103
 
  char* limit_;           // Limit of the mapped region
104
 
  char* dst_;             // Where to write next  (in range [base_,limit_])
105
 
  char* last_sync_;       // Where have we synced up to
106
 
  uint64_t file_offset_;  // Offset of base_ in file
107
 
 
108
 
  // Have we done an munmap of unsynced data?
109
 
  bool pending_sync_;
110
 
 
111
 
  // Roundup x to a multiple of y
112
 
  static size_t Roundup(size_t x, size_t y) {
113
 
    return ((x + y - 1) / y) * y;
114
 
  }
115
 
 
116
 
  size_t TruncateToPageBoundary(size_t s) {
117
 
    s -= (s & (page_size_ - 1));
118
 
    assert((s % page_size_) == 0);
119
 
    return s;
120
 
  }
121
 
 
122
 
  bool UnmapCurrentRegion() {
123
 
    bool result = true;
124
 
    if (base_ != NULL) {
125
 
      if (last_sync_ < limit_) {
126
 
        // Defer syncing this data until next Sync() call, if any
127
 
        pending_sync_ = true;
128
 
      }
129
 
      if (munmap(base_, limit_ - base_) != 0) {
130
 
        result = false;
131
 
      }
132
 
      file_offset_ += limit_ - base_;
133
 
      base_ = NULL;
134
 
      limit_ = NULL;
135
 
      last_sync_ = NULL;
136
 
      dst_ = NULL;
137
 
 
138
 
      // Increase the amount we map the next time, but capped at 1MB
139
 
      if (map_size_ < (1<<20)) {
140
 
        map_size_ *= 2;
141
 
      }
142
 
    }
143
 
    return result;
144
 
  }
145
 
 
146
 
  bool MapNewRegion() {
147
 
    assert(base_ == NULL);
148
 
    if (ftruncate(fd_, file_offset_ + map_size_) < 0) {
149
 
      return false;
150
 
    }
151
 
    void* ptr = mmap(NULL, map_size_, PROT_READ | PROT_WRITE, MAP_SHARED,
152
 
                     fd_, file_offset_);
153
 
    if (ptr == MAP_FAILED) {
154
 
      return false;
155
 
    }
156
 
    base_ = reinterpret_cast<char*>(ptr);
157
 
    limit_ = base_ + map_size_;
158
 
    dst_ = base_;
159
 
    last_sync_ = base_;
160
 
    return true;
161
 
  }
162
 
 
163
 
 public:
164
 
  PosixMmapFile(const std::string& fname, int fd, size_t page_size)
165
 
      : filename_(fname),
166
 
        fd_(fd),
167
 
        page_size_(page_size),
168
 
        map_size_(Roundup(65536, page_size)),
169
 
        base_(NULL),
170
 
        limit_(NULL),
171
 
        dst_(NULL),
172
 
        last_sync_(NULL),
173
 
        file_offset_(0),
174
 
        pending_sync_(false) {
175
 
    assert((page_size & (page_size - 1)) == 0);
176
 
  }
177
 
 
178
 
 
179
 
  ~PosixMmapFile() {
180
 
    if (fd_ >= 0) {
181
 
      PosixMmapFile::Close();
182
 
    }
183
 
  }
184
 
 
185
 
  virtual Status Append(const Slice& data) {
186
 
    const char* src = data.data();
187
 
    size_t left = data.size();
188
 
    while (left > 0) {
189
 
      assert(base_ <= dst_);
190
 
      assert(dst_ <= limit_);
191
 
      size_t avail = limit_ - dst_;
192
 
      if (avail == 0) {
193
 
        if (!UnmapCurrentRegion() ||
194
 
            !MapNewRegion()) {
195
 
          return IOError(filename_, errno);
196
 
        }
197
 
      }
198
 
 
199
 
      size_t n = (left <= avail) ? left : avail;
200
 
      memcpy(dst_, src, n);
201
 
      dst_ += n;
202
 
      src += n;
203
 
      left -= n;
204
 
    }
205
 
    return Status::OK();
206
 
  }
207
 
 
208
 
  virtual Status Close() {
209
 
    Status s;
210
 
    size_t unused = limit_ - dst_;
211
 
    if (!UnmapCurrentRegion()) {
212
 
      s = IOError(filename_, errno);
213
 
    } else if (unused > 0) {
214
 
      // Trim the extra space at the end of the file
215
 
      if (ftruncate(fd_, file_offset_ - unused) < 0) {
216
 
        s = IOError(filename_, errno);
217
 
      }
218
 
    }
219
 
 
220
 
    if (close(fd_) < 0) {
221
 
      if (s.ok()) {
222
 
        s = IOError(filename_, errno);
223
 
      }
224
 
    }
225
 
 
226
 
    fd_ = -1;
227
 
    base_ = NULL;
228
 
    limit_ = NULL;
229
 
    return s;
230
 
  }
231
 
 
232
 
  virtual Status Flush() {
233
 
    return Status::OK();
234
 
  }
235
 
 
236
 
  virtual Status Sync() {
237
 
    Status s;
238
 
 
239
 
    if (pending_sync_) {
240
 
      // Some unmapped data was not synced
241
 
      pending_sync_ = false;
242
 
      if (fdatasync(fd_) < 0) {
243
 
        s = IOError(filename_, errno);
244
 
      }
245
 
    }
246
 
 
247
 
    if (dst_ > last_sync_) {
248
 
      // Find the beginnings of the pages that contain the first and last
249
 
      // bytes to be synced.
250
 
      size_t p1 = TruncateToPageBoundary(last_sync_ - base_);
251
 
      size_t p2 = TruncateToPageBoundary(dst_ - base_ - 1);
252
 
      last_sync_ = dst_;
253
 
      if (msync(base_ + p1, p2 - p1 + page_size_, MS_SYNC) < 0) {
254
 
        s = IOError(filename_, errno);
255
 
      }
256
 
    }
257
 
 
258
 
    return s;
259
 
  }
260
 
};
261
 
 
262
 
static int LockOrUnlock(int fd, bool lock) {
263
 
  errno = 0;
264
 
  struct flock f;
265
 
  memset(&f, 0, sizeof(f));
266
 
  f.l_type = (lock ? F_WRLCK : F_UNLCK);
267
 
  f.l_whence = SEEK_SET;
268
 
  f.l_start = 0;
269
 
  f.l_len = 0;        // Lock/unlock entire file
270
 
  return fcntl(fd, F_SETLK, &f);
271
 
}
272
 
 
273
 
class PosixFileLock : public FileLock {
274
 
 public:
275
 
  int fd_;
276
 
};
277
 
 
278
 
class PosixEnv : public Env {
279
 
 public:
280
 
  PosixEnv();
281
 
  virtual ~PosixEnv() {
282
 
    fprintf(stderr, "Destroying Env::Default()\n");
283
 
    exit(1);
284
 
  }
285
 
 
286
 
  virtual Status NewSequentialFile(const std::string& fname,
287
 
                                   SequentialFile** result) {
288
 
    FILE* f = fopen(fname.c_str(), "r");
289
 
    if (f == NULL) {
290
 
      *result = NULL;
291
 
      return IOError(fname, errno);
292
 
    } else {
293
 
      *result = new PosixSequentialFile(fname, f);
294
 
      return Status::OK();
295
 
    }
296
 
  }
297
 
 
298
 
  virtual Status NewRandomAccessFile(const std::string& fname,
299
 
                                     RandomAccessFile** result) {
300
 
    int fd = open(fname.c_str(), O_RDONLY);
301
 
    if (fd < 0) {
302
 
      *result = NULL;
303
 
      return IOError(fname, errno);
304
 
    }
305
 
    *result = new PosixRandomAccessFile(fname, fd);
306
 
    return Status::OK();
307
 
  }
308
 
 
309
 
  virtual Status NewWritableFile(const std::string& fname,
310
 
                                 WritableFile** result) {
311
 
    Status s;
312
 
    const int fd = open(fname.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
313
 
    if (fd < 0) {
314
 
      *result = NULL;
315
 
      s = IOError(fname, errno);
316
 
    } else {
317
 
      *result = new PosixMmapFile(fname, fd, page_size_);
318
 
    }
319
 
    return s;
320
 
  }
321
 
 
322
 
  virtual bool FileExists(const std::string& fname) {
323
 
    return access(fname.c_str(), F_OK) == 0;
324
 
  }
325
 
 
326
 
  virtual Status GetChildren(const std::string& dir,
327
 
                             std::vector<std::string>* result) {
328
 
    result->clear();
329
 
    DIR* d = opendir(dir.c_str());
330
 
    if (d == NULL) {
331
 
      return IOError(dir, errno);
332
 
    }
333
 
    struct dirent* entry;
334
 
    while ((entry = readdir(d)) != NULL) {
335
 
      result->push_back(entry->d_name);
336
 
    }
337
 
    closedir(d);
338
 
    return Status::OK();
339
 
  }
340
 
 
341
 
  virtual Status DeleteFile(const std::string& fname) {
342
 
    Status result;
343
 
    if (unlink(fname.c_str()) != 0) {
344
 
      result = IOError(fname, errno);
345
 
    }
346
 
    return result;
347
 
  };
348
 
 
349
 
  virtual Status CreateDir(const std::string& name) {
350
 
    Status result;
351
 
    if (mkdir(name.c_str(), 0755) != 0) {
352
 
      result = IOError(name, errno);
353
 
    }
354
 
    return result;
355
 
  };
356
 
 
357
 
  virtual Status DeleteDir(const std::string& name) {
358
 
    Status result;
359
 
    if (rmdir(name.c_str()) != 0) {
360
 
      result = IOError(name, errno);
361
 
    }
362
 
    return result;
363
 
  };
364
 
 
365
 
  virtual Status GetFileSize(const std::string& fname, uint64_t* size) {
366
 
    Status s;
367
 
    struct stat sbuf;
368
 
    if (stat(fname.c_str(), &sbuf) != 0) {
369
 
      *size = 0;
370
 
      s = IOError(fname, errno);
371
 
    } else {
372
 
      *size = sbuf.st_size;
373
 
    }
374
 
    return s;
375
 
  }
376
 
 
377
 
  virtual Status RenameFile(const std::string& src, const std::string& target) {
378
 
    Status result;
379
 
    if (rename(src.c_str(), target.c_str()) != 0) {
380
 
      result = IOError(src, errno);
381
 
    }
382
 
    return result;
383
 
  }
384
 
 
385
 
  virtual Status LockFile(const std::string& fname, FileLock** lock) {
386
 
    *lock = NULL;
387
 
    Status result;
388
 
    int fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644);
389
 
    if (fd < 0) {
390
 
      result = IOError(fname, errno);
391
 
    } else if (LockOrUnlock(fd, true) == -1) {
392
 
      result = IOError("lock " + fname, errno);
393
 
      close(fd);
394
 
    } else {
395
 
      PosixFileLock* my_lock = new PosixFileLock;
396
 
      my_lock->fd_ = fd;
397
 
      *lock = my_lock;
398
 
    }
399
 
    return result;
400
 
  }
401
 
 
402
 
  virtual Status UnlockFile(FileLock* lock) {
403
 
    PosixFileLock* my_lock = reinterpret_cast<PosixFileLock*>(lock);
404
 
    Status result;
405
 
    if (LockOrUnlock(my_lock->fd_, false) == -1) {
406
 
      result = IOError("unlock", errno);
407
 
    }
408
 
    close(my_lock->fd_);
409
 
    delete my_lock;
410
 
    return result;
411
 
  }
412
 
 
413
 
  virtual void Schedule(void (*function)(void*), void* arg);
414
 
 
415
 
  virtual void StartThread(void (*function)(void* arg), void* arg);
416
 
 
417
 
  virtual Status GetTestDirectory(std::string* result) {
418
 
    const char* env = getenv("TEST_TMPDIR");
419
 
    if (env && env[0] != '\0') {
420
 
      *result = env;
421
 
    } else {
422
 
      char buf[100];
423
 
      snprintf(buf, sizeof(buf), "/tmp/leveldbtest-%d", int(geteuid()));
424
 
      *result = buf;
425
 
    }
426
 
    // Directory may already exist
427
 
    CreateDir(*result);
428
 
    return Status::OK();
429
 
  }
430
 
 
431
 
  static uint64_t gettid() {
432
 
    pthread_t tid = pthread_self();
433
 
    uint64_t thread_id = 0;
434
 
    memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
435
 
    return thread_id;
436
 
  }
437
 
 
438
 
  virtual Status NewLogger(const std::string& fname, Logger** result) {
439
 
    FILE* f = fopen(fname.c_str(), "w");
440
 
    if (f == NULL) {
441
 
      *result = NULL;
442
 
      return IOError(fname, errno);
443
 
    } else {
444
 
      *result = new PosixLogger(f, &PosixEnv::gettid);
445
 
      return Status::OK();
446
 
    }
447
 
  }
448
 
 
449
 
  virtual uint64_t NowMicros() {
450
 
    struct timeval tv;
451
 
    gettimeofday(&tv, NULL);
452
 
    return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
453
 
  }
454
 
 
455
 
  virtual void SleepForMicroseconds(int micros) {
456
 
    usleep(micros);
457
 
  }
458
 
 
459
 
 private:
460
 
  void PthreadCall(const char* label, int result) {
461
 
    if (result != 0) {
462
 
      fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
463
 
      exit(1);
464
 
    }
465
 
  }
466
 
 
467
 
  // BGThread() is the body of the background thread
468
 
  void BGThread();
469
 
  static void* BGThreadWrapper(void* arg) {
470
 
    reinterpret_cast<PosixEnv*>(arg)->BGThread();
471
 
    return NULL;
472
 
  }
473
 
 
474
 
  size_t page_size_;
475
 
  pthread_mutex_t mu_;
476
 
  pthread_cond_t bgsignal_;
477
 
  pthread_t bgthread_;
478
 
  bool started_bgthread_;
479
 
 
480
 
  // Entry per Schedule() call
481
 
  struct BGItem { void* arg; void (*function)(void*); };
482
 
  typedef std::deque<BGItem> BGQueue;
483
 
  BGQueue queue_;
484
 
};
485
 
 
486
 
PosixEnv::PosixEnv() : page_size_(getpagesize()),
487
 
                       started_bgthread_(false) {
488
 
  PthreadCall("mutex_init", pthread_mutex_init(&mu_, NULL));
489
 
  PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, NULL));
490
 
}
491
 
 
492
 
void PosixEnv::Schedule(void (*function)(void*), void* arg) {
493
 
  PthreadCall("lock", pthread_mutex_lock(&mu_));
494
 
 
495
 
  // Start background thread if necessary
496
 
  if (!started_bgthread_) {
497
 
    started_bgthread_ = true;
498
 
    PthreadCall(
499
 
        "create thread",
500
 
        pthread_create(&bgthread_, NULL,  &PosixEnv::BGThreadWrapper, this));
501
 
  }
502
 
 
503
 
  // If the queue is currently empty, the background thread may currently be
504
 
  // waiting.
505
 
  if (queue_.empty()) {
506
 
    PthreadCall("signal", pthread_cond_signal(&bgsignal_));
507
 
  }
508
 
 
509
 
  // Add to priority queue
510
 
  queue_.push_back(BGItem());
511
 
  queue_.back().function = function;
512
 
  queue_.back().arg = arg;
513
 
 
514
 
  PthreadCall("unlock", pthread_mutex_unlock(&mu_));
515
 
}
516
 
 
517
 
void PosixEnv::BGThread() {
518
 
  while (true) {
519
 
    // Wait until there is an item that is ready to run
520
 
    PthreadCall("lock", pthread_mutex_lock(&mu_));
521
 
    while (queue_.empty()) {
522
 
      PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
523
 
    }
524
 
 
525
 
    void (*function)(void*) = queue_.front().function;
526
 
    void* arg = queue_.front().arg;
527
 
    queue_.pop_front();
528
 
 
529
 
    PthreadCall("unlock", pthread_mutex_unlock(&mu_));
530
 
    (*function)(arg);
531
 
  }
532
 
}
533
 
 
534
 
namespace {
535
 
struct StartThreadState {
536
 
  void (*user_function)(void*);
537
 
  void* arg;
538
 
};
539
 
}
540
 
static void* StartThreadWrapper(void* arg) {
541
 
  StartThreadState* state = reinterpret_cast<StartThreadState*>(arg);
542
 
  state->user_function(state->arg);
543
 
  delete state;
544
 
  return NULL;
545
 
}
546
 
 
547
 
void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
548
 
  pthread_t t;
549
 
  StartThreadState* state = new StartThreadState;
550
 
  state->user_function = function;
551
 
  state->arg = arg;
552
 
  PthreadCall("start thread",
553
 
              pthread_create(&t, NULL,  &StartThreadWrapper, state));
554
 
}
555
 
 
556
 
}  // namespace
557
 
 
558
 
static pthread_once_t once = PTHREAD_ONCE_INIT;
559
 
static Env* default_env;
560
 
static void InitDefaultEnv() { default_env = new PosixEnv; }
561
 
 
562
 
Env* Env::Default() {
563
 
  pthread_once(&once, InitDefaultEnv);
564
 
  return default_env;
565
 
}
566
 
 
567
 
}  // namespace leveldb