~ubuntu-branches/ubuntu/precise/fibranet/precise

« back to all changes in this revision

Viewing changes to gherkin.py

  • Committer: Bazaar Package Importer
  • Author(s): Varun Hiremath
  • Date: 2006-10-27 01:02:56 UTC
  • Revision ID: james.westby@ubuntu.com-20061027010256-n265y24bk98s9lbe
Tags: upstream-10
ImportĀ upstreamĀ versionĀ 10

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#Copyright (c) 2006 Simon Wittber
 
2
#
 
3
#Permission is hereby granted, free of charge, to any person
 
4
#obtaining a copy of this software and associated documentation files
 
5
#(the "Software"), to deal in the Software without restriction,
 
6
#including without limitation the rights to use, copy, modify, merge,
 
7
#publish, distribute, sublicense, and/or sell copies of the Software,
 
8
#and to permit persons to whom the Software is furnished to do so,
 
9
#subject to the following conditions:
 
10
#
 
11
#The above copyright notice and this permission notice shall be
 
12
#included in all copies or substantial portions of the Software.
 
13
#
 
14
#THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 
15
#EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 
16
#MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 
17
#NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 
18
#BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 
19
#ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 
20
#CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 
21
#SOFTWARE.
 
22
 
 
23
"""
 
24
 
 
25
Gherkin provides safe serialization for simple python types.
 
26
 
 
27
"""
 
28
 
 
29
from types import IntType,TupleType,StringType,FloatType,LongType,ListType,DictType,NoneType,BooleanType,UnicodeType
 
30
 
 
31
from struct import pack, unpack
 
32
from cStringIO import StringIO
 
33
 
 
34
SIZEOF_INT = 4
 
35
SIZEOF_FLOAT = 8
 
36
UNICODE_CODEC = 'utf-8'
 
37
 
 
38
def memoize(func):
 
39
    cache = {}
 
40
    def check_memo(*args):
 
41
        if args in cache:
 
42
            return cache[args]
 
43
        else:
 
44
            return cache.setdefault(args, func(*args))
 
45
    return check_memo
 
46
 
 
47
class Gherkin(object):
 
48
    def __init__(self):
 
49
        self.strings = {}
 
50
        self.header = 'GHE'
 
51
        self.version = 1
 
52
        self.protocol = {
 
53
            TupleType:"T",
 
54
            ListType:"L",
 
55
            DictType:"D",
 
56
            LongType:"B",
 
57
            IntType:"I",
 
58
            FloatType:"F",
 
59
            StringType:"S",
 
60
            NoneType:"N",
 
61
            BooleanType:"b",
 
62
            UnicodeType:"U",
 
63
            'HomogenousList':'H',
 
64
            'HomogenousTuple':'h'
 
65
        }
 
66
 
 
67
        self.int_size = SIZEOF_INT
 
68
        self.float_size = SIZEOF_FLOAT
 
69
 
 
70
        self.encoder = {
 
71
            DictType:self.enc_dict_type,
 
72
            ListType:self.enc_list_type,
 
73
            TupleType:self.enc_list_type,
 
74
            IntType:memoize(self.enc_int_type),
 
75
            FloatType:memoize(self.enc_float_type),
 
76
            LongType:memoize(self.enc_long_type),
 
77
            UnicodeType:memoize(self.enc_unicode_type),
 
78
            StringType:memoize(self.enc_string_type),
 
79
            NoneType:self.enc_none_type,
 
80
            BooleanType:memoize(self.enc_bool_type)
 
81
        }
 
82
 
 
83
        self.decoder = {
 
84
            self.protocol[TupleType]:self.dec_tuple_type,
 
85
            self.protocol[ListType]:self.dec_list_type,
 
86
            self.protocol[DictType]:self.dec_dict_type,
 
87
            self.protocol[LongType]:self.dec_long_type,
 
88
            self.protocol[StringType]:self.dec_string_type,
 
89
            self.protocol[FloatType]:self.dec_float_type,
 
90
            self.protocol[IntType]:self.dec_int_type,
 
91
            self.protocol[NoneType]:self.dec_none_type,
 
92
            self.protocol[BooleanType]:self.dec_bool_type,
 
93
            self.protocol[UnicodeType]:self.dec_unicode_type,
 
94
            self.protocol['HomogenousList']:self.dec_homogenous_list_type,
 
95
            self.protocol['HomogenousTuple']:self.dec_homogenous_tuple_type
 
96
                
 
97
        }
 
98
 
 
99
    def enc_dict_type(self, obj):
 
100
        data = "".join([self.encoder[type(i)](i) for i in obj.items()])
 
101
        return "%s%s%s" % (self.protocol[DictType], pack("!L", len(data)), data)
 
102
 
 
103
    def enc_list_type(self, obj):
 
104
        if len(set([type(i) for i in obj])) == 1:
 
105
            return self.enc_homogenous_list_type(obj)
 
106
        data = "".join([self.encoder[type(i)](i) for i in obj])
 
107
        return "%s%s%s" % (self.protocol[type(obj)], pack("!L", len(data)), data)
 
108
        
 
109
    def enc_homogenous_list_type(self, obj):
 
110
        data = "".join([self.encoder[type(i)](i)[1:] for i in obj])
 
111
        if type(obj) == type([]):
 
112
            prefix = self.protocol['HomogenousList']
 
113
        else:
 
114
            prefix = self.protocol['HomogenousTuple']
 
115
            
 
116
        return "%s%s%s%s" % (prefix, self.protocol[type(obj[0])], pack("!L", len(data)), data)
 
117
 
 
118
    def enc_int_type(self, obj):
 
119
        return "%s%s" % (self.protocol[IntType], pack("!i", obj))
 
120
 
 
121
    def enc_float_type(self, obj):
 
122
        return "%s%s" % (self.protocol[FloatType], pack("!d", obj))
 
123
 
 
124
    def enc_long_type(self, obj):
 
125
        obj = hex(obj)
 
126
        if obj[0] == "-":
 
127
            pre = "-"
 
128
            obj = obj[3:-1]
 
129
        else:
 
130
            pre = "+"
 
131
            obj = obj[2:-1]
 
132
        return "%s%s%s%s" % (self.protocol[LongType], pre, pack("!L", len(obj)), obj)
 
133
 
 
134
    def enc_unicode_type(self, obj):
 
135
        obj = obj.encode(UNICODE_CODEC)
 
136
        return "%s%s%s" % (self.protocol[UnicodeType], pack("!L", len(obj)), obj)
 
137
 
 
138
    def enc_string_type(self, obj):
 
139
        return "%s%s%s" % (self.protocol[StringType], pack("!L", len(obj)), obj)
 
140
 
 
141
    def enc_none_type(self, obj):
 
142
        return self.protocol[NoneType]
 
143
 
 
144
    def enc_bool_type(self, obj):
 
145
        return self.protocol[BooleanType] + str(int(obj))
 
146
 
 
147
    def dumps(self, obj):
 
148
        """
 
149
        Return the string that would be written to a file by dump(value, file). The value must be a supported type. Raise a ValueError exception if value has (or contains an object that has) an unsupported type.
 
150
        """
 
151
        options = "".join((hex(self.version)[2:],hex(SIZEOF_INT)[2:],hex(SIZEOF_FLOAT)[2:]))
 
152
        assert len(options) == 3
 
153
        try:
 
154
            data = self.encoder[type(obj)](obj)
 
155
        except KeyError, e:
 
156
            raise ValueError, "Type not supported. (%s)" % e
 
157
        header = "".join((self.header, options))
 
158
        assert len(header) == 6
 
159
        return "".join((header, data))
 
160
 
 
161
    def dump(self, obj, file):
 
162
        """
 
163
        Write the value on the open file. The value must be a supported type. The file must be an open file object such as sys.stdout or returned by open() or posix.popen(). It must be opened in binary mode ('wb' or 'w+b').
 
164
        If the value has (or contains an object that has) an unsupported type, a ValueError exception is raised
 
165
        """
 
166
        return file.write(self.dumps(obj))
 
167
    
 
168
    def build_sequence(self, data, cast=list):
 
169
        size = unpack('!L', data.read(SIZEOF_INT))[0]
 
170
        items = []
 
171
        data_tell = data.tell
 
172
        items_append = items.append
 
173
        self_decoder = self.decoder
 
174
        data_read = data.read
 
175
        start_position = data.tell()
 
176
        while (data_tell() - start_position) < size:
 
177
            T = data_read(1)
 
178
            value = self_decoder[T](data)
 
179
            items_append(value)
 
180
        return cast(items)
 
181
 
 
182
    def build_homogenous_sequence(self, data, cast=list):
 
183
        T = data.read(1)
 
184
        size = unpack('!L', data.read(SIZEOF_INT))[0]
 
185
        items = []
 
186
        data_tell = data.tell
 
187
        items_append = items.append
 
188
        self_decoder = self.decoder
 
189
        data_read = data.read
 
190
        start_position = data.tell()
 
191
        while (data_tell() - start_position) < size:
 
192
            value = self_decoder[T](data)
 
193
            items_append(value)
 
194
        return cast(items)
 
195
 
 
196
    def dec_tuple_type(self, data):
 
197
        return self.build_sequence(data, cast=tuple)
 
198
 
 
199
    def dec_list_type(self, data):
 
200
        return self.build_sequence(data, cast=list)
 
201
 
 
202
    def dec_homogenous_list_type(self, data):
 
203
        return self.build_homogenous_sequence(data, cast=list)
 
204
    
 
205
    def dec_homogenous_tuple_type(self, data):
 
206
        return self.build_homogenous_sequence(data, cast=tuple)
 
207
 
 
208
    def dec_dict_type(self, data):
 
209
        return self.build_sequence(data, cast=dict)
 
210
 
 
211
    def dec_long_type(self, data):
 
212
        pre = data.read(1)
 
213
        size = unpack('!L', data.read(self.int_size))[0]
 
214
        value = long(data.read(size),16)
 
215
        if pre == "-": value = -value
 
216
        return value
 
217
 
 
218
    def dec_string_type(self, data):
 
219
        size = unpack('!L', data.read(self.int_size))[0]
 
220
        value = str(data.read(size))
 
221
        return value
 
222
 
 
223
    def dec_float_type(self, data):
 
224
        value = unpack('!d', data.read(self.float_size))[0]
 
225
        return value
 
226
 
 
227
    def dec_int_type(self, data):
 
228
        value = unpack('!i', data.read(self.int_size))[0]
 
229
        return value
 
230
 
 
231
    def dec_none_type(self, data):
 
232
        return None
 
233
 
 
234
    def dec_bool_type(self, data):
 
235
        value = int(data.read(1))
 
236
        return bool(value)
 
237
 
 
238
    def dec_unicode_type(self, data):
 
239
        size = unpack('!L', data.read(self.int_size))[0]
 
240
        value = data.read(size).decode(UNICODE_CODEC)
 
241
        return value
 
242
 
 
243
    def loads(self, data):
 
244
        """
 
245
        Convert the string to a value. If no valid value is found, raise EOFError, ValueError or TypeError. Extra characters in the string are ignored.
 
246
        """
 
247
        self.strings = {}
 
248
        buffer = StringIO(data)
 
249
        header = buffer.read(len(self.header))
 
250
        assert header == self.header
 
251
        assert self.version <= int(buffer.read(1), 10)
 
252
        self.int_size = int(buffer.read(1), 10)
 
253
        self.float_size = int(buffer.read(1), 10)
 
254
        try:
 
255
            value = self.decoder[buffer.read(1)](buffer)
 
256
        except KeyError, e:
 
257
            raise ValueError, "Type prefix not supported. (%s)" % (e)
 
258
        return value
 
259
 
 
260
    def load(self, file):
 
261
        """
 
262
        Read one value from the open file and return it. If no valid value is read, raise EOFError, ValueError or TypeError. The file must be an open file object opened in binary mode ('rb' or 'r+b').
 
263
        """
 
264
        return self.loads(file.read())
 
265
 
 
266
 
 
267
__gherk = Gherkin()
 
268
dumps = __gherk.dumps
 
269
loads = __gherk.loads
 
270
dump = __gherk.dump
 
271
load = __gherk.load
 
272