~nchohan/appscale/zk3.3.4

« back to all changes in this revision

Viewing changes to AppServer/google/appengine/api/files/records.py

  • Committer: Chris Bunch
  • Date: 2012-02-17 08:19:21 UTC
  • mfrom: (787.2.3 appscale-raj-merge)
  • Revision ID: cgb@cs.ucsb.edu-20120217081921-pakidyksaenlpzur
merged with main branch, gaining rabbitmq and upgrades for hbase, cassandra, and hypertable, as well as upgrading to gae 1.6.1 for python and go

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/env python
 
2
#
 
3
# Copyright 2007 Google Inc.
 
4
#
 
5
# Licensed under the Apache License, Version 2.0 (the "License");
 
6
# you may not use this file except in compliance with the License.
 
7
# You may obtain a copy of the License at
 
8
#
 
9
#     http://www.apache.org/licenses/LICENSE-2.0
 
10
#
 
11
# Unless required by applicable law or agreed to in writing, software
 
12
# distributed under the License is distributed on an "AS IS" BASIS,
 
13
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
14
# See the License for the specific language governing permissions and
 
15
# limitations under the License.
 
16
#
 
17
 
 
18
 
 
19
 
 
20
 
 
21
"""Lightweight record format.
 
22
 
 
23
This format implements log file format from leveldb:
 
24
http://leveldb.googlecode.com/svn/trunk/doc/log_format.txt
 
25
 
 
26
Full specification of format follows in case leveldb decides to change it.
 
27
 
 
28
 
 
29
The log file contents are a sequence of 32KB blocks.  The only
 
30
exception is that the tail of the file may contain a partial block.
 
31
 
 
32
Each block consists of a sequence of records:
 
33
   block := record* trailer?
 
34
   record :=
 
35
      checksum: uint32  // masked crc32c of type and data[]
 
36
      length: uint16
 
37
      type: uint8       // One of FULL, FIRST, MIDDLE, LAST
 
38
      data: uint8[length]
 
39
 
 
40
A record never starts within the last six bytes of a block (since it
 
41
won't fit).  Any leftover bytes here form the trailer, which must
 
42
consist entirely of zero bytes and must be skipped by readers.
 
43
 
 
44
Aside: if exactly seven bytes are left in the current block, and a new
 
45
non-zero length record is added, the writer must emit a FIRST record
 
46
(which contains zero bytes of user data) to fill up the trailing seven
 
47
bytes of the block and then emit all of the user data in subsequent
 
48
blocks.
 
49
 
 
50
More types may be added in the future.  Some Readers may skip record
 
51
types they do not understand, others may report that some data was
 
52
skipped.
 
53
 
 
54
FULL == 1
 
55
FIRST == 2
 
56
MIDDLE == 3
 
57
LAST == 4
 
58
 
 
59
The FULL record contains the contents of an entire user record.
 
60
 
 
61
FIRST, MIDDLE, LAST are types used for user records that have been
 
62
split into multiple fragments (typically because of block boundaries).
 
63
FIRST is the type of the first fragment of a user record, LAST is the
 
64
type of the last fragment of a user record, and MID is the type of all
 
65
interior fragments of a user record.
 
66
 
 
67
Example: consider a sequence of user records:
 
68
   A: length 1000
 
69
   B: length 97270
 
70
   C: length 8000
 
71
A will be stored as a FULL record in the first block.
 
72
 
 
73
B will be split into three fragments: first fragment occupies the rest
 
74
of the first block, second fragment occupies the entirety of the
 
75
second block, and the third fragment occupies a prefix of the third
 
76
block.  This will leave six bytes free in the third block, which will
 
77
be left empty as the trailer.
 
78
 
 
79
C will be stored as a FULL record in the fourth block.
 
80
 
 
81
"""
 
82
 
 
83
 
 
84
import struct
 
85
 
 
86
import google
 
87
from google.appengine.api.files import crc32c
 
88
 
 
89
 
 
90
 
 
91
BLOCK_SIZE = 32 * 1024
 
92
 
 
93
 
 
94
HEADER_FORMAT = '<IHB'
 
95
 
 
96
 
 
97
HEADER_LENGTH = struct.calcsize(HEADER_FORMAT)
 
98
 
 
99
 
 
100
RECORD_TYPE_NONE = 0
 
101
 
 
102
 
 
103
RECORD_TYPE_FULL = 1
 
104
 
 
105
 
 
106
RECORD_TYPE_FIRST = 2
 
107
 
 
108
 
 
109
RECORD_TYPE_MIDDLE = 3
 
110
 
 
111
 
 
112
RECORD_TYPE_LAST = 4
 
113
 
 
114
 
 
115
class Error(Exception):
 
116
  """Base class for exceptions in this module."""
 
117
 
 
118
 
 
119
class InvalidRecordError(Exception):
 
120
  """Raised when invalid record encountered."""
 
121
 
 
122
 
 
123
class FileWriter(object):
 
124
  """Interface specification for writers to be used with records module."""
 
125
 
 
126
  def write(self, data):
 
127
    """Write data to the file.
 
128
 
 
129
    Args:
 
130
      data: byte array, string or iterable over bytes.
 
131
    """
 
132
    raise NotImplementedError()
 
133
 
 
134
 
 
135
class FileReader(object):
 
136
  """Interface specification for writers to be used with recordrecords module.
 
137
 
 
138
  FileReader defines a reader with position and efficient seek/position
 
139
  determining. All reads occur at current position.
 
140
  """
 
141
 
 
142
  def read(self, size):
 
143
    """Read data from file.
 
144
 
 
145
    Reads data from current position and advances position past the read data
 
146
    block.
 
147
 
 
148
    Args:
 
149
      size: number of bytes to read.
 
150
    Returns:
 
151
      iterable over bytes. If number of bytes read is less then 'size' argument,
 
152
      it is assumed that end of file was reached.
 
153
    """
 
154
    raise NotImplementedError()
 
155
 
 
156
  def tell(self):
 
157
    """Get current file position.
 
158
 
 
159
    Returns:
 
160
      current position as a byte offset in the file as integer.
 
161
    """
 
162
    raise NotImplementedError()
 
163
 
 
164
 
 
165
_CRC_MASK_DELTA = 0xa282ead8
 
166
 
 
167
def _mask_crc(crc):
 
168
  """Mask crc.
 
169
 
 
170
  Args:
 
171
    crc: integer crc.
 
172
  Returns:
 
173
    masked integer crc.
 
174
  """
 
175
  return (((crc >> 15) | (crc << 17)) + _CRC_MASK_DELTA) & 0xFFFFFFFFL
 
176
 
 
177
 
 
178
def _unmask_crc(masked_crc):
 
179
  """Unmask crc.
 
180
 
 
181
  Args:
 
182
    masked_crc: masked integer crc.
 
183
  Retruns:
 
184
    orignal crc.
 
185
  """
 
186
  rot = (masked_crc - _CRC_MASK_DELTA) & 0xFFFFFFFFL
 
187
  return ((rot >> 17) | (rot << 15)) & 0xFFFFFFFFL
 
188
 
 
189
 
 
190
class RecordsWriter(object):
 
191
  """A writer for records format.
 
192
 
 
193
  This writer should be used only inside with statement:
 
194
 
 
195
    with records.RecordsWriter(file) as writer:
 
196
      writer.write("record")
 
197
 
 
198
  RecordsWriter will pad last block with 0 when exiting with statement scope.
 
199
  """
 
200
 
 
201
  def __init__(self, writer, _pad_last_block=True):
 
202
    """Constructor.
 
203
 
 
204
    Args:
 
205
      writer: a writer to use. Should conform to FileWriter interface.
 
206
    """
 
207
    self.__writer = writer
 
208
    self.__position = 0
 
209
    self.__entered = False
 
210
    self.__pad_last_block = _pad_last_block
 
211
 
 
212
  def __write_record(self, record_type, data):
 
213
    """Write single physical record."""
 
214
    length = len(data)
 
215
 
 
216
    crc = crc32c.crc_update(crc32c.CRC_INIT, [record_type])
 
217
    crc = crc32c.crc_update(crc, data)
 
218
    crc = crc32c.crc_finalize(crc)
 
219
 
 
220
    self.__writer.write(
 
221
        struct.pack(HEADER_FORMAT, _mask_crc(crc), length, record_type))
 
222
    self.__writer.write(data)
 
223
    self.__position += HEADER_LENGTH + length
 
224
 
 
225
  def write(self, data):
 
226
    """Write single record.
 
227
 
 
228
    Args:
 
229
      data: record data to write as string, byte array or byte sequence.
 
230
    """
 
231
    if not self.__entered:
 
232
      raise Exception("RecordWriter should be used only with 'with' statement.")
 
233
    block_remaining = BLOCK_SIZE - self.__position % BLOCK_SIZE
 
234
 
 
235
    if block_remaining < HEADER_LENGTH:
 
236
 
 
237
      self.__writer.write('\x00' * block_remaining)
 
238
      self.__position += block_remaining
 
239
      block_remaining = BLOCK_SIZE
 
240
 
 
241
    if block_remaining < len(data) + HEADER_LENGTH:
 
242
      first_chunk = data[:block_remaining - HEADER_LENGTH]
 
243
      self.__write_record(RECORD_TYPE_FIRST, first_chunk)
 
244
      data = data[len(first_chunk):]
 
245
 
 
246
      while True:
 
247
        block_remaining = BLOCK_SIZE - self.__position % BLOCK_SIZE
 
248
        if block_remaining >= len(data) + HEADER_LENGTH:
 
249
          self.__write_record(RECORD_TYPE_LAST, data)
 
250
          break
 
251
        else:
 
252
          chunk = data[:block_remaining - HEADER_LENGTH]
 
253
          self.__write_record(RECORD_TYPE_MIDDLE, chunk)
 
254
          data = data[len(chunk):]
 
255
    else:
 
256
      self.__write_record(RECORD_TYPE_FULL, data)
 
257
 
 
258
  def __enter__(self):
 
259
    self.__entered = True
 
260
    return self
 
261
 
 
262
  def __exit__(self, atype, value, traceback):
 
263
    self.close()
 
264
 
 
265
  def close(self):
 
266
    if self.__pad_last_block:
 
267
      pad_length = BLOCK_SIZE - self.__position % BLOCK_SIZE
 
268
      if pad_length and pad_length != BLOCK_SIZE:
 
269
        self.__writer.write('\x00' * pad_length)
 
270
 
 
271
 
 
272
class RecordsReader(object):
 
273
  """A reader for records format."""
 
274
 
 
275
  def __init__(self, reader):
 
276
    self.__reader = reader
 
277
 
 
278
  def __try_read_record(self):
 
279
    """Try reading a record.
 
280
 
 
281
    Returns:
 
282
      (data, record_type) tuple.
 
283
    Raises:
 
284
      EOFError: when end of file was reached.
 
285
      InvalidRecordError: when valid record could not be read.
 
286
    """
 
287
    block_remaining = BLOCK_SIZE - self.__reader.tell() % BLOCK_SIZE
 
288
    if block_remaining < HEADER_LENGTH:
 
289
      return ('', RECORD_TYPE_NONE)
 
290
 
 
291
    header = self.__reader.read(HEADER_LENGTH)
 
292
    if len(header) != HEADER_LENGTH:
 
293
      raise EOFError('Read %s bytes instead of %s' %
 
294
                     (len(header), HEADER_LENGTH))
 
295
 
 
296
    (masked_crc, length, record_type) = struct.unpack(HEADER_FORMAT, header)
 
297
    crc = _unmask_crc(masked_crc)
 
298
 
 
299
    if length + HEADER_LENGTH > block_remaining:
 
300
 
 
301
      raise InvalidRecordError('Length is too big')
 
302
 
 
303
    data = self.__reader.read(length)
 
304
    if len(data) != length:
 
305
      raise EOFError('Not enough data read. Expected: %s but got %s' %
 
306
                     (length, len(data)))
 
307
 
 
308
    if record_type == RECORD_TYPE_NONE:
 
309
      return ('', record_type)
 
310
 
 
311
    actual_crc = crc32c.crc_update(crc32c.CRC_INIT, [record_type])
 
312
    actual_crc = crc32c.crc_update(actual_crc, data)
 
313
    actual_crc = crc32c.crc_finalize(actual_crc)
 
314
 
 
315
    if actual_crc != crc:
 
316
      raise InvalidRecordError('Data crc does not match')
 
317
    return (data, record_type)
 
318
 
 
319
  def __sync(self):
 
320
    """Skip reader to the block boundary."""
 
321
    pad_length = BLOCK_SIZE - self.__reader.tell() % BLOCK_SIZE
 
322
    if pad_length and pad_length != BLOCK_SIZE:
 
323
      self.__reader.read(pad_length)
 
324
 
 
325
  def read(self):
 
326
    """Reads record from current position in reader."""
 
327
    data = None
 
328
    while True:
 
329
      try:
 
330
        (chunk, record_type) = self.__try_read_record()
 
331
        if record_type == RECORD_TYPE_NONE:
 
332
          self.__sync()
 
333
        elif record_type == RECORD_TYPE_FULL:
 
334
          return chunk
 
335
        elif record_type == RECORD_TYPE_FIRST:
 
336
          if data is not None:
 
337
            raise InvalidRecordError()
 
338
          data = chunk
 
339
        elif record_type == RECORD_TYPE_MIDDLE:
 
340
          if data is None:
 
341
            raise InvalidRecordError()
 
342
          data += chunk
 
343
        elif record_type == RECORD_TYPE_LAST:
 
344
          if data is None:
 
345
            raise InvalidRecordError()
 
346
          result = data + chunk
 
347
          data = None
 
348
          return result
 
349
        else:
 
350
          raise InvalidRecordError('Unsupported record type: %s' %
 
351
                                   (record_type))
 
352
      except InvalidRecordError:
 
353
        self.__sync()
 
354
 
 
355
  def __iter__(self):
 
356
    try:
 
357
      while True:
 
358
        yield self.read()
 
359
    except EOFError:
 
360
      pass
 
361
 
 
362
  def tell(self):
 
363
    """Return file's current position."""
 
364
    return self.__reader.tell()
 
365
 
 
366
  def seek(self, *args, **kwargs):
 
367
    """Set the file's current position.
 
368
 
 
369
    Arguments are passed directly to the underlying reader.
 
370
    """
 
371
    return self.__reader.seek(*args, **kwargs)