1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
3
# Copyright 2010 United States Government as represented by the
4
# Administrator of the National Aeronautics and Space Administration.
5
# Copyright 2011 Justin Santa Barbara
8
# Licensed under the Apache License, Version 2.0 (the "License"); you may
9
# not use this file except in compliance with the License. You may obtain
10
# a copy of the License at
12
# http://www.apache.org/licenses/LICENSE-2.0
14
# Unless required by applicable law or agreed to in writing, software
15
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17
# License for the specific language governing permissions and limitations
20
"""Utilities and helper functions."""
41
import datetime
import os
import random
import sys
import time

from xml.sax import saxutils

from eventlet import event
from eventlet import greenthread
from eventlet import semaphore
from eventlet.green import subprocess

from nova import exception
from nova import flags
from nova import log as logging
from nova import version
55
LOG = logging.getLogger("nova.utils")
56
ISO_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
57
PERFECT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"
61
def import_class(import_str):
    """Returns a class from a string including module and class.

    :param import_str: dotted path, e.g. 'package.module.ClassName'
    :raises exception.ClassNotFound: if the module cannot be imported
        or the attribute does not exist on it.
    """
    mod_str, _sep, class_str = import_str.rpartition('.')
    try:
        # Import the module, then look the attribute up via sys.modules
        # so nested module paths resolve correctly.
        __import__(mod_str)
        return getattr(sys.modules[mod_str], class_str)
    except (ImportError, ValueError, AttributeError) as exc:
        LOG.debug(_('Inner Exception: %s'), exc)
        raise exception.ClassNotFound(class_name=class_str)
72
def import_object(import_str):
    """Returns an object including a module or module and class.

    If import_str names an importable module, the module itself is
    returned; otherwise it is treated as 'module.Class' and a new
    instance of that class is returned.
    """
    try:
        __import__(import_str)
        return sys.modules[import_str]
    except ImportError:
        # Not a plain module path -- fall back to class instantiation.
        cls = import_class(import_str)
        return cls()
82
def vpn_ping(address, port, timeout=0.05, session_id=None):
83
"""Sends a vpn negotiation packet and returns the server session.
85
Returns False on a failure. Basic packet structure is below.
87
Client packet (14 bytes)::
92
x = packet identifier 0x38
93
cli_id = 64 bit identifier
94
? = unknown, probably flags/padding
96
Server packet (26 bytes)::
98
+-+--------+-----+--------+----+
99
|x| srv_id |?????| cli_id |????|
100
+-+--------+-----+--------+----+
101
x = packet identifier 0x40
102
cli_id = 64 bit identifier
103
? = unknown, probably flags/padding
104
bit 9 was 1 and the rest were 0 in testing
107
if session_id is None:
108
session_id = random.randint(0, 0xffffffffffffffff)
109
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
110
data = struct.pack('!BQxxxxxx', 0x38, session_id)
111
sock.sendto(data, (address, port))
112
sock.settimeout(timeout)
114
received = sock.recv(2048)
115
except socket.timeout:
119
fmt = '!BQxxxxxQxxxx'
120
if len(received) != struct.calcsize(fmt):
121
print struct.calcsize(fmt)
123
(identifier, server_sess, client_sess) = struct.unpack(fmt, received)
124
if identifier == 0x40 and client_sess == session_id:
128
def fetchfile(url, target):
    """Download *url* to the local path *target* using curl.

    Uses ``--fail`` so a non-2xx HTTP response makes curl exit
    non-zero, which execute() turns into ProcessExecutionError.
    """
    LOG.debug(_('Fetching %s') % url)
    execute('curl', '--fail', url, '-o', target)
133
def execute(*cmd, **kwargs):
135
Helper method to execute command with optional retry.
137
:cmd Passed to subprocess.Popen.
138
:process_input Send to opened process.
139
:check_exit_code Defaults to 0. Raise exception.ProcessExecutionError
140
unless program exits with this code.
141
:delay_on_retry True | False. Defaults to True. If set to True, wait a
142
short amount of time before retrying.
143
:attempts How many times to retry cmd.
144
:run_as_root True | False. Defaults to False. If set to True,
145
the command is prefixed by the command specified
146
in the root_helper FLAG.
148
:raises exception.Error on receiving unknown arguments
149
:raises exception.ProcessExecutionError
152
process_input = kwargs.pop('process_input', None)
153
check_exit_code = kwargs.pop('check_exit_code', 0)
154
delay_on_retry = kwargs.pop('delay_on_retry', True)
155
attempts = kwargs.pop('attempts', 1)
156
run_as_root = kwargs.pop('run_as_root', False)
158
raise exception.Error(_('Got unknown keyword args '
159
'to utils.execute: %r') % kwargs)
162
cmd = shlex.split(FLAGS.root_helper) + list(cmd)
168
LOG.debug(_('Running cmd (subprocess): %s'), ' '.join(cmd))
169
_PIPE = subprocess.PIPE # pylint: disable=E1101
170
obj = subprocess.Popen(cmd,
176
if process_input is not None:
177
result = obj.communicate(process_input)
179
result = obj.communicate()
180
obj.stdin.close() # pylint: disable=E1101
181
_returncode = obj.returncode # pylint: disable=E1101
183
LOG.debug(_('Result was %s') % _returncode)
184
if type(check_exit_code) == types.IntType \
185
and _returncode != check_exit_code:
186
(stdout, stderr) = result
187
raise exception.ProcessExecutionError(
188
exit_code=_returncode,
193
except exception.ProcessExecutionError:
197
LOG.debug(_('%r failed. Retrying.'), cmd)
199
greenthread.sleep(random.randint(20, 200) / 100.0)
201
# NOTE(termie): this appears to be necessary to let the subprocess
202
# call clean something up in between calls, without
203
# it two execute calls in a row hangs the second one
207
def ssh_execute(ssh, cmd, process_input=None,
208
addl_env=None, check_exit_code=True):
209
LOG.debug(_('Running cmd (SSH): %s'), ' '.join(cmd))
211
raise exception.Error(_('Environment not supported over SSH'))
214
# This is (probably) fixable if we need it...
215
raise exception.Error(_('process_input not supported over SSH'))
217
stdin_stream, stdout_stream, stderr_stream = ssh.exec_command(cmd)
218
channel = stdout_stream.channel
220
#stdin.write('process_input would go here')
223
# NOTE(justinsb): This seems suspicious...
224
# ...other SSH clients have buffering issues with this approach
225
stdout = stdout_stream.read()
226
stderr = stderr_stream.read()
229
exit_status = channel.recv_exit_status()
231
# exit_status == -1 if no exit code was returned
232
if exit_status != -1:
233
LOG.debug(_('Result was %s') % exit_status)
234
if check_exit_code and exit_status != 0:
235
raise exception.ProcessExecutionError(exit_code=exit_status,
240
return (stdout, stderr)
244
return os.path.join(os.path.dirname(__file__), s)
249
return os.path.abspath(nova.__file__).split('nova/__init__.py')[0]
252
def default_flagfile(filename='nova.conf', args=None):
256
if arg.find('flagfile') != -1:
259
if not os.path.isabs(filename):
260
# turn relative filename into an absolute path
261
script_dir = os.path.dirname(inspect.stack()[-1][1])
262
filename = os.path.abspath(os.path.join(script_dir, filename))
263
if not os.path.exists(filename):
264
filename = "./nova.conf"
265
if not os.path.exists(filename):
266
filename = '/etc/nova/nova.conf'
267
if os.path.exists(filename):
268
flagfile = '--flagfile=%s' % filename
269
args.insert(1, flagfile)
273
LOG.debug(_('debug in callback: %s'), arg)
277
def runthis(prompt, *cmd, **kwargs):
    """Log *cmd* and run it via execute().

    NOTE(review): *prompt* is accepted but never used; it is kept in
    the signature for backward compatibility with existing callers.
    """
    LOG.debug(_('Running %s'), (' '.join(cmd)))
    rv, err = execute(*cmd, **kwargs)
282
def generate_uid(topic, size=8):
    """Return an id of the form '<topic>-<random chars>'.

    :param topic: prefix for the id, e.g. an entity type name
    :param size: number of random characters in the suffix

    Bug fix: the old alphabet string started '01234567890...' -- the
    digit 0 appeared twice, biasing random.choice towards '0'.
    """
    characters = '0123456789abcdefghijklmnopqrstuvwxyz'
    choices = [random.choice(characters) for _x in range(size)]
    return '%s-%s' % (topic, ''.join(choices))
288
# Default symbols to use for passwords. Avoids visually confusing characters:
# 0/1 removed from digits, I/O removed from uppercase, l removed from
# lowercase.
DEFAULT_PASSWORD_SYMBOLS = ('23456789'
                            'ABCDEFGHJKLMNPQRSTUVWXYZ'
                            'abcdefghijkmnopqrstuvwxyz')

# Uppercase-only variant (plus safe digits) for passwords that humans must
# transcribe; same confusing characters removed as above.
EASIER_PASSWORD_SYMBOLS = ('23456789'
                           'ABCDEFGHJKLMNPQRSTUVWXYZ')
300
def usage_from_instance(instance_ref, **kw):
302
project_id=instance_ref['project_id'],
303
user_id=instance_ref['user_id'],
304
instance_id=instance_ref['id'],
305
instance_type=instance_ref['instance_type']['name'],
306
instance_type_id=instance_ref['instance_type_id'],
307
display_name=instance_ref['display_name'],
308
created_at=str(instance_ref['created_at']),
309
launched_at=str(instance_ref['launched_at']) \
310
if instance_ref['launched_at'] else '',
311
image_ref=instance_ref['image_ref'])
312
usage_info.update(kw)
316
def generate_password(length=20, symbols=DEFAULT_PASSWORD_SYMBOLS):
    """Generate a random password from the supplied symbols.

    Believed to be reasonably secure (with a reasonable password length!)
    since it uses random.SystemRandom (the OS CSPRNG) rather than the
    default Mersenne Twister generator.
    """
    r = random.SystemRandom()
    return ''.join([r.choice(symbols) for _i in range(length)])
326
def last_octet(address):
    """Return the final dotted component of an IPv4 address as an int."""
    return int(address.split('.')[-1])
330
def get_my_linklocal(interface):
332
if_str = execute('ip', '-f', 'inet6', '-o', 'addr', 'show', interface)
333
condition = '\s+inet6\s+([0-9a-f:]+)/\d+\s+scope\s+link'
334
links = [re.search(condition, x) for x in if_str[0].split('\n')]
335
address = [w.group(1) for w in links if w is not None]
336
if address[0] is not None:
339
raise exception.Error(_('Link Local address is not found.:%s')
341
except Exception as ex:
342
raise exception.Error(_("Couldn't get Link Local IP of %(interface)s"
343
" :%(ex)s") % locals())
347
"""Overridable version of utils.utcnow."""
348
if utcnow.override_time:
349
return utcnow.override_time
350
return datetime.datetime.utcnow()
353
utcnow.override_time = None
356
def is_older_than(before, seconds):
    """Return True if before is older than seconds.

    :param before: a datetime to compare against utcnow()
    :param seconds: age threshold in seconds
    """
    return utcnow() - before > datetime.timedelta(seconds=seconds)
362
"""Timestamp version of our utcnow function."""
363
return time.mktime(utcnow().timetuple())
366
def set_time_override(override_time=None):
    """Override utils.utcnow to return a constant time.

    :param override_time: the datetime to freeze time at; defaults to
        the current utcnow(), captured at call time.

    Bug fix: the default used to be ``datetime.datetime.utcnow()`` in
    the signature, which Python evaluates once at import -- every
    no-arg call froze time at module-load time instead of "now".
    """
    utcnow.override_time = override_time or utcnow()
371
def advance_time_delta(timedelta):
    """Advance overriden time using a datetime.timedelta.

    An override must already be active (see set_time_override).
    """
    assert utcnow.override_time is not None
    utcnow.override_time += timedelta
377
def advance_time_seconds(seconds):
    """Advance overriden time by seconds."""
    advance_time_delta(datetime.timedelta(seconds=seconds))
382
def clear_time_override():
    """Remove the overridden time so utcnow() tracks real time again."""
    utcnow.override_time = None
387
def strtime(at=None, fmt=PERFECT_TIME_FORMAT):
    """Returns formatted utcnow.

    :param at: datetime to format; defaults to utcnow()
    :param fmt: strftime format string
    """
    if not at:
        at = utcnow()
    return at.strftime(fmt)
394
def parse_strtime(timestr, fmt=PERFECT_TIME_FORMAT):
    """Turn a formatted time back into a datetime.

    Inverse of strtime(); raises ValueError if timestr does not match
    fmt.
    """
    return datetime.datetime.strptime(timestr, fmt)
399
def isotime(at=None):
    """Returns iso formatted utcnow (or *at* if given)."""
    return strtime(at, ISO_TIME_FORMAT)
404
def parse_isotime(timestr):
    """Turn an iso formatted time back into a datetime."""
    return parse_strtime(timestr, ISO_TIME_FORMAT)
409
def parse_mailmap(mailmap='.mailmap'):
    """Parse a git-style .mailmap file.

    :param mailmap: path to the mailmap file
    :returns: dict mapping alias email -> canonical email; empty dict
        if the file does not exist.

    Each non-comment line is expected to be exactly
    '<canonical> <alias>' separated by a single space.
    """
    mapping = {}
    if os.path.exists(mailmap):
        # Fix: use a context manager so the file handle is not leaked.
        with open(mailmap, 'r') as fp:
            for line in fp:
                line = line.strip()
                if not line.startswith('#') and ' ' in line:
                    canonical_email, alias = line.split(' ')
                    mapping[alias] = canonical_email
    return mapping
421
def str_dict_replace(s, mapping):
    """Apply each key -> value substring replacement in mapping to s.

    NOTE: iteration order of mapping matters when replacements overlap.
    """
    for s1, s2 in mapping.items():
        s = s.replace(s1, s2)
    return s
427
class LazyPluggable(object):
428
"""A pluggable backend loaded lazily based on some value."""
430
def __init__(self, pivot, **backends):
431
self.__backends = backends
433
self.__backend = None
435
def __get_backend(self):
436
if not self.__backend:
437
backend_name = self.__pivot.value
438
if backend_name not in self.__backends:
439
raise exception.Error(_('Invalid backend: %s') % backend_name)
441
backend = self.__backends[backend_name]
442
if type(backend) == type(tuple()):
444
fromlist = backend[1]
449
self.__backend = __import__(name, None, None, fromlist)
450
LOG.debug(_('backend %s'), self.__backend)
451
return self.__backend
453
def __getattr__(self, key):
454
backend = self.__get_backend()
455
return getattr(backend, key)
458
class LoopingCallDone(Exception):
    """Exception to break out and stop a LoopingCall.

    The poll-function passed to LoopingCall can raise this exception to
    break out of the loop normally. This is somewhat analogous to
    StopIteration.

    An optional return-value can be included as the argument to the exception;
    this return-value will be returned by LoopingCall.wait()
    """

    def __init__(self, retvalue=True):
        """:param retvalue: Value that LoopingCall.wait() should return."""
        self.retvalue = retvalue
475
class LoopingCall(object):
476
def __init__(self, f=None, *args, **kw):
480
self._running = False
482
def start(self, interval, now=True):
488
greenthread.sleep(interval)
491
self.f(*self.args, **self.kw)
492
if not self._running:
494
greenthread.sleep(interval)
495
except LoopingCallDone, e:
497
done.send(e.retvalue)
499
logging.exception('in looping call')
500
done.send_exception(*sys.exc_info())
507
greenthread.spawn(_inner)
511
self._running = False
514
return self.done.wait()
517
def xhtml_escape(value):
    """Escapes a string so it is valid within XML or XHTML.

    Code is directly from the utf8 function in
    http://github.com/facebook/tornado/blob/master/tornado/escape.py

    Bug fix: the extra-entities map read {'"': '"'} (a no-op); the
    intended mapping is double quote -> '&quot;' so the result is also
    safe inside attribute values.
    """
    return saxutils.escape(value, {'"': '&quot;'})
528
"""Try to turn a string into utf-8 if possible.
530
Code is directly from the utf8 function in
531
http://github.com/facebook/tornado/blob/master/tornado/escape.py
534
if isinstance(value, unicode):
535
return value.encode('utf-8')
536
assert isinstance(value, str)
540
def to_primitive(value, convert_instances=False, level=0):
541
"""Convert a complex object into primitives.
543
Handy for JSON serialization. We can optionally handle instances,
544
but since this is a recursive function, we could have cyclical
547
To handle cyclical data structures we could track the actual objects
548
visited in a set, but not all objects are hashable. Instead we just
549
track the depth of the object inspections and don't go too deep.
551
Therefore, convert_instances=True is lossy ... be aware.
554
nasty = [inspect.ismodule, inspect.isclass, inspect.ismethod,
555
inspect.isfunction, inspect.isgeneratorfunction,
556
inspect.isgenerator, inspect.istraceback, inspect.isframe,
557
inspect.iscode, inspect.isbuiltin, inspect.isroutine,
561
return unicode(value)
566
# The try block may not be necessary after the class check above,
567
# but just in case ...
569
if type(value) is type([]) or type(value) is type((None,)):
572
o.append(to_primitive(v, convert_instances=convert_instances,
575
elif type(value) is type({}):
577
for k, v in value.iteritems():
578
o[k] = to_primitive(v, convert_instances=convert_instances,
581
elif isinstance(value, datetime.datetime):
583
elif hasattr(value, 'iteritems'):
584
return to_primitive(dict(value.iteritems()),
585
convert_instances=convert_instances,
587
elif hasattr(value, '__iter__'):
588
return to_primitive(list(value), level)
589
elif convert_instances and hasattr(value, '__dict__'):
590
# Likely an instance of something. Watch for cycles.
591
# Ignore class member vars.
592
return to_primitive(value.__dict__,
593
convert_instances=convert_instances,
598
# Class objects are tricky since they may define something like
599
# __iter__ defined but it isn't callable as list().
600
return unicode(value)
605
return json.dumps(value)
608
return json.dumps(to_primitive(value))
620
anyjson._modules.append(("nova.utils", "dumps", TypeError,
621
"loads", ValueError))
622
anyjson.force_implementation("nova.utils")
628
class _NoopContextManager(object):
    """Context manager that does nothing; stands in for a lock when
    external locking is disabled (see synchronized())."""

    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Returning None (falsy) means exceptions are not suppressed.
        pass
636
def synchronized(name, external=False):
637
"""Synchronization decorator.
639
Decorating a method like so:
640
@synchronized('mylock')
641
def foo(self, *args):
644
ensures that only one thread will execute the bar method at a time.
646
Different methods can share the same lock:
647
@synchronized('mylock')
648
def foo(self, *args):
651
@synchronized('mylock')
652
def bar(self, *args):
655
This way only one of either foo or bar can be executing at a time.
657
The external keyword argument denotes whether this lock should work across
658
multiple processes. This means that if two different workers both run a
659
a method decorated with @synchronized('mylock', external=True), only one
660
of them will execute at a time.
666
def inner(*args, **kwargs):
667
# NOTE(soren): If we ever go natively threaded, this will be racy.
668
# See http://stackoverflow.com/questions/5390569/dyn\
669
# amically-allocating-and-destroying-mutexes
670
if name not in _semaphores:
671
_semaphores[name] = semaphore.Semaphore()
672
sem = _semaphores[name]
673
LOG.debug(_('Attempting to grab semaphore "%(lock)s" for method '
674
'"%(method)s"...' % {'lock': name,
675
'method': f.__name__}))
678
LOG.debug(_('Attempting to grab file lock "%(lock)s" for '
679
'method "%(method)s"...' %
680
{'lock': name, 'method': f.__name__}))
681
lock_file_path = os.path.join(FLAGS.lock_path,
682
'nova-%s.lock' % name)
683
lock = lockfile.FileLock(lock_file_path)
685
lock = _NoopContextManager()
688
retval = f(*args, **kwargs)
690
# If no-one else is waiting for it, delete it.
691
# See note about possible raciness above.
692
if not sem.balance < 1:
693
del _semaphores[name]
700
def get_from_path(items, path):
701
"""Returns a list of items matching the specified path.
703
Takes an XPath-like expression e.g. prop1/prop2/prop3, and for each item
704
in items, looks up items[prop1][prop2][prop3]. Like XPath, if any of the
705
intermediate results are lists it will treat each list item individually.
706
A 'None' in items or any child expressions will be ignored, this function
707
will not throw because of None (anywhere) in items. The returned list
708
will contain no None values.
712
raise exception.Error('Invalid mini_xpath')
714
(first_token, sep, remainder) = path.partition('/')
716
if first_token == '':
717
raise exception.Error('Invalid mini_xpath')
724
if not isinstance(items, types.ListType):
725
# Wrap single objects in a list
731
get_method = getattr(item, 'get', None)
732
if get_method is None:
734
child = get_method(first_token)
737
if isinstance(child, types.ListType):
738
# Flatten intermediate lists
742
results.append(child)
748
return get_from_path(results, remainder)
751
def flatten_dict(dict_, flattened=None):
752
"""Recursively flatten a nested dictionary."""
753
flattened = flattened or {}
754
for key, value in dict_.iteritems():
755
if hasattr(value, 'iteritems'):
756
flatten_dict(value, flattened)
758
flattened[key] = value
762
def partition_dict(dict_, keys):
    """Return two dicts, one with `keys` the other with everything else."""
    intersection = {}
    difference = {}
    for key, value in dict_.items():
        if key in keys:
            intersection[key] = value
        else:
            difference[key] = value
    return intersection, difference
774
def map_dict_keys(dict_, key_map):
    """Return a dict in which the dictionaries keys are mapped to new keys.

    Keys absent from key_map are carried over unchanged.
    """
    mapped = {}
    for key, value in dict_.items():
        mapped_key = key_map[key] if key in key_map else key
        mapped[mapped_key] = value
    return mapped
783
def subset_dict(dict_, keys):
    """Return a dict that only contains a subset of keys."""
    subset = partition_dict(dict_, keys)[0]
    return subset
789
def check_isinstance(obj, cls):
    """Checks that obj is of type cls, and lets PyLint infer types.

    :returns: obj unchanged when the check passes
    :raises Exception: when obj is not an instance of cls
    """
    if isinstance(obj, cls):
        return obj
    raise Exception(_('Expected object of type: %s') % (str(cls)))
    # TODO(justinsb): Can we make this better??
    return cls()  # Ugly PyLint hack -- unreachable at runtime
798
def parse_server_string(server_str):
800
Parses the given server_string and returns a list of host and port.
801
If it's not a combination of host part and port, the port element
802
is a null string. If the input is invalid expression, return a null
806
# First of all, exclude pure IPv6 address (w/o port).
807
if netaddr.valid_ipv6(server_str):
808
return (server_str, '')
810
# Next, check if this is IPv6 address with a port number combination.
811
if server_str.find("]:") != -1:
812
(address, port) = server_str.replace('[', '', 1).split(']:')
813
return (address, port)
815
# Third, check if this is a combination of an address and a port
816
if server_str.find(':') == -1:
817
return (server_str, '')
819
# This must be a combination of an address and a port
820
(address, port) = server_str.split(':')
821
return (address, port)
824
LOG.debug(_('Invalid server_string: %s' % server_str))
832
def is_uuid_like(val):
    """For our purposes, a UUID is a string in canonical form:

        aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa

    Only the length and dash count are checked, not the hex digits.
    """
    if not isinstance(val, basestring):
        return False
    return (len(val) == 36) and (val.count('-') == 4)
842
def bool_from_str(val):
    """Convert a string representation of a bool into a bool value"""
    # Empty/None input is treated as False.
    if not val:
        return False
    try:
        # Numeric strings: any non-zero integer is True.
        return True if int(val) else False
    except ValueError:
        # Non-numeric strings: only (case-insensitive) 'true' is True.
        return val.lower() == 'true'
853
def is_valid_ipv4(address):
854
"""valid the address strictly as per format xxx.xxx.xxx.xxx.
855
where xxx is a value between 0 and 255.
857
parts = address.split(".")
862
if not 0 <= int(item) <= 255:
870
""" If the Flags.monkey_patch set as True,
871
this functuion patches a decorator
872
for all functions in specified modules.
873
You can set decorators for each modules
874
using FLAGS.monkey_patch_modules.
875
The format is "Module path:Decorator function".
876
Example: 'nova.api.ec2.cloud:nova.notifier.api.notify_decorator'
878
Parameters of the decorator is as follows.
879
(See nova.notifier.api.notify_decorator)
881
name - name of the function
882
function - object of the function
884
# If FLAGS.monkey_patch is not True, this function do nothing.
885
if not FLAGS.monkey_patch:
887
# Get list of modules and decorators
888
for module_and_decorator in FLAGS.monkey_patch_modules:
889
module, decorator_name = module_and_decorator.split(':')
890
# import decorator function
891
decorator = import_class(decorator_name)
893
# Retrieve module information using pyclbr
894
module_data = pyclbr.readmodule_ex(module)
895
for key in module_data.keys():
896
# set the decorator for the class methods
897
if isinstance(module_data[key], pyclbr.Class):
898
clz = import_class("%s.%s" % (module, key))
899
for method, func in inspect.getmembers(clz, inspect.ismethod):
900
setattr(clz, method,\
901
decorator("%s.%s.%s" % (module, key, method), func))
902
# set the decorator for the function
903
if isinstance(module_data[key], pyclbr.Function):
904
func = import_class("%s.%s" % (module, key))
905
setattr(sys.modules[module], key,\
906
decorator("%s.%s" % (module, key), func))
909
def convert_to_list_dict(lst, label):
    """Convert a value or list into a list of dicts

    :returns: None for falsy input; otherwise [{label: item}, ...]
    """
    if not lst:
        return None
    if not isinstance(lst, list):
        lst = [lst]
    return [{label: x} for x in lst]
917
class RingBuffer(object):
918
"""Generic userspace on-disk ringbuffer implementation."""
919
_header_max_int = (2 ** (struct.calcsize('I') * BITS_PER_BYTE)) - 1
920
_header_format = 'II'
921
_header_size = struct.calcsize(_header_format)
923
def __init__(self, backing_file, max_size=65536):
924
# We need one extra byte as the buffer is kept with
925
# one byte free to avoid the head==tail full/empty
929
if not 0 < max_size <= RingBuffer._header_max_int:
930
raise ValueError(_('RingBuffer size out of range'))
931
had_already_existed = os.path.exists(backing_file)
932
self.f = self._open(backing_file)
933
if had_already_existed:
934
file_size = os.fstat(self.f.fileno()).st_size
936
current_size = file_size - self._header_size
937
if not 0 < current_size <= RingBuffer._header_max_int:
939
raise ValueError(_('Disk RingBuffer size out of range'))
940
self.max_size = current_size
942
# If the file doesn't contain a header, assume it is corrupt
944
if file_size < self._header_size:
945
self._write_header(0, 0) # initialise to empty
947
# If head or tail point beyond the file then bomb out
948
head, tail = self._read_header()
949
if head >= current_size or tail >= current_size:
951
raise ValueError(_('RingBuffer %s is corrupt') %
954
# File is zero bytes: treat as new file
955
self.max_size = max_size
956
self._initialise_empty_file()
958
self.max_size = max_size
959
self._initialise_empty_file()
961
def _initialise_empty_file(self):
962
os.ftruncate(self.f.fileno(), self.max_size + self._header_size)
963
self._write_header(0, 0) # head == tail means no data
967
"""Open a file without truncating it for both reading and writing in
969
# Built-in open() cannot open in read/write mode without truncating.
970
fd = os.open(filename, os.O_RDWR | os.O_CREAT, 0666)
971
return os.fdopen(fd, 'w+')
973
def _read_header(self):
975
return struct.unpack(self._header_format,
976
self.f.read(self._header_size))
978
def _write_header(self, head, tail):
980
self.f.write(struct.pack(self._header_format, head, tail))
982
def _seek(self, pos):
983
"""Seek to pos in data (ignoring header)."""
984
self.f.seek(self._header_size + pos)
986
def _read_slice(self, start, end):
991
return self.f.read(end - start)
993
def _write_slice(self, pos, data):
998
"""Read the entire ringbuffer without consuming it."""
999
head, tail = self._read_header()
1002
before_wrap = self._read_slice(tail, self.max_size)
1003
after_wrap = self._read_slice(0, head)
1004
return before_wrap + after_wrap
1006
# Just from here to head
1007
return self._read_slice(tail, head)
1009
def write(self, data):
1010
"""Write some amount of data to the ringbuffer, discarding the oldest
1011
data as max_size is exceeded."""
1012
head, tail = self._read_header()
1014
# Amount of data to be written on this pass
1015
len_to_write = min(len(data), self.max_size - head)
1017
# Where head will be after this write
1018
new_head = head + len_to_write
1020
# In the next comparison, new_head may be self.max_size which is
1021
# logically the same point as tail == 0 and must still be within
1023
unwrapped_tail = tail if tail else self.max_size
1025
if head < unwrapped_tail <= new_head:
1026
# Write will go past tail so tail needs to be pushed back
1027
tail = new_head + 1 # one past head to indicate full
1028
tail %= self.max_size
1029
self._write_header(head, tail)
1032
self._write_slice(head, data[:len_to_write])
1033
data = data[len_to_write:] # data now left
1037
head %= self.max_size
1038
self._write_header(head, tail)