~divmod-dev/divmod.org/trunk

« back to all changes in this revision

Viewing changes to Epsilon/epsilon/juice.py

  • Committer: Jean-Paul Calderone
  • Date: 2014-06-29 20:33:04 UTC
  • mfrom: (2749.1.1 remove-epsilon-1325289)
  • Revision ID: exarkun@twistedmatrix.com-20140629203304-gdkmbwl1suei4m97
mergeĀ lp:~exarkun/divmod.org/remove-epsilon-1325289

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# -*- test-case-name: epsilon.test.test_juice -*-
2
 
# Copyright 2005 Divmod, Inc.  See LICENSE file for details
3
 
 
4
 
__metaclass__ = type
5
 
 
6
 
import warnings, pprint
7
 
 
8
 
from twisted.internet.main import CONNECTION_LOST
9
 
from twisted.internet.defer import Deferred, maybeDeferred, fail
10
 
from twisted.internet.protocol import ServerFactory, ClientFactory
11
 
from twisted.internet.ssl import Certificate
12
 
from twisted.python.failure import Failure
13
 
from twisted.python import log, filepath
14
 
 
15
 
from epsilon.liner import LineReceiver
16
 
from epsilon import extime
17
 
 
18
 
ASK = '_ask'
19
 
ANSWER = '_answer'
20
 
COMMAND = '_command'
21
 
ERROR = '_error'
22
 
ERROR_CODE = '_error_code'
23
 
ERROR_DESCRIPTION = '_error_description'
24
 
LENGTH = '_length'
25
 
BODY = 'body'
26
 
 
27
 
debug = False
28
 
 
29
 
class JuiceBox(dict):
30
 
    """ I am a packet in the JUICE protocol.  """
31
 
 
32
 
    def __init__(self, __body='', **kw):
33
 
        self.update(kw)
34
 
        if __body:
35
 
            assert isinstance(__body, str), "body must be a string: %r" % ( repr(__body),)
36
 
            self['body'] = __body
37
 
 
38
 
    def body():
39
 
        def get(self):
40
 
            warnings.warn("body attribute of boxes is now just a regular field",
41
 
                          stacklevel=2)
42
 
            return self['body']
43
 
        def set(self, newbody):
44
 
            warnings.warn("body attribute of boxes is now just a regular field",
45
 
                          stacklevel=2)
46
 
            self['body'] = newbody
47
 
        return get,set
48
 
    body = property(*body())
49
 
 
50
 
    def copy(self):
51
 
        newBox = self.__class__()
52
 
        newBox.update(self)
53
 
        return newBox
54
 
 
55
 
    def serialize(self,
56
 
                  delimiter='\r\n',
57
 
                  escaped='\r\n '):
58
 
        assert LENGTH not in self
59
 
 
60
 
        L = []
61
 
        for (k, v) in self.iteritems():
62
 
            if k == BODY:
63
 
                k = LENGTH
64
 
                v = str(len(self[BODY]))
65
 
            L.append(k.replace('_', '-').title())
66
 
            L.append(': ')
67
 
            L.append(v.replace(delimiter, escaped))
68
 
            L.append(delimiter)
69
 
 
70
 
        L.append(delimiter)
71
 
        if BODY in self:
72
 
            L.append(self[BODY])
73
 
 
74
 
        bytes = ''.join(L)
75
 
        return bytes
76
 
 
77
 
    def sendTo(self, proto):
78
 
        """
79
 
        Serialize and send this box to a Juice instance.  By the time it is
80
 
        being sent, several keys are required.  I must have exactly ONE of::
81
 
 
82
 
            -ask
83
 
            -answer
84
 
            -error
85
 
 
86
 
        If the '-ask' header is set, then the '-command' header must also be
87
 
        set.
88
 
        """
89
 
        proto.sendPacket(self)
90
 
 
91
 
# juice.Box => JuiceBox
92
 
 
93
 
Box = JuiceBox
94
 
 
95
 
class TLSBox(JuiceBox):
96
 
    def __repr__(self):
97
 
        return 'TLS(**%s)' % (super(TLSBox, self).__repr__(),)
98
 
 
99
 
 
100
 
    def __init__(self, __certificate, __verify=None, __sslstarted=None, **kw):
101
 
        super(TLSBox, self).__init__(**kw)
102
 
        self.certificate = __certificate
103
 
        self.verify = __verify
104
 
        self.sslstarted = __sslstarted
105
 
 
106
 
    def sendTo(self, proto):
107
 
        super(TLSBox, self).sendTo(proto)
108
 
        if self.verify is None:
109
 
            proto.startTLS(self.certificate)
110
 
        else:
111
 
            proto.startTLS(self.certificate, self.verify)
112
 
        if self.sslstarted is not None:
113
 
            self.sslstarted()
114
 
 
115
 
class QuitBox(JuiceBox):
116
 
    def __repr__(self):
117
 
        return 'Quit(**%s)' % (super(QuitBox, self).__repr__(),)
118
 
 
119
 
 
120
 
    def sendTo(self, proto):
121
 
        super(QuitBox, self).sendTo(proto)
122
 
        proto.transport.loseConnection()
123
 
 
124
 
class _SwitchBox(JuiceBox):
125
 
    def __repr__(self):
126
 
        return 'Switch(**%s)' % (super(_SwitchBox, self).__repr__(),)
127
 
 
128
 
 
129
 
    def __init__(self, __proto, **kw):
130
 
        super(_SwitchBox, self).__init__(**kw)
131
 
        self.innerProto = __proto
132
 
 
133
 
    def sendTo(self, proto):
134
 
        super(_SwitchBox, self).sendTo(proto)
135
 
        proto._switchTo(self.innerProto)
136
 
 
137
 
 
138
 
 
139
 
class NegotiateBox(JuiceBox):
140
 
    def __repr__(self):
141
 
        return 'Negotiate(**%s)' % (super(NegotiateBox, self).__repr__(),)
142
 
 
143
 
 
144
 
    def sendTo(self, proto):
145
 
        super(NegotiateBox, self).sendTo(proto)
146
 
        proto._setProtocolVersion(int(self['version']))
147
 
 
148
 
 
149
 
 
150
 
class JuiceError(Exception):
151
 
    pass
152
 
 
153
 
class RemoteJuiceError(JuiceError):
154
 
    """
155
 
    This error indicates that something went wrong on the remote end of the
156
 
    connection, and the error was serialized and transmitted to you.
157
 
    """
158
 
    def __init__(self, errorCode, description, fatal=False):
159
 
        """Create a remote error with an error code and description.
160
 
        """
161
 
        Exception.__init__(self, "Remote[%s]: %s" % (errorCode, description))
162
 
        self.errorCode = errorCode
163
 
        self.description = description
164
 
        self.fatal = fatal
165
 
 
166
 
class UnhandledRemoteJuiceError(RemoteJuiceError):
167
 
    def __init__(self, description):
168
 
        errorCode = "UNHANDLED"
169
 
        RemoteJuiceError.__init__(self, errorCode, description)
170
 
 
171
 
class JuiceBoxError(JuiceError):
172
 
    pass
173
 
 
174
 
class MalformedJuiceBox(JuiceBoxError):
175
 
    pass
176
 
 
177
 
class UnhandledCommand(JuiceError):
178
 
    pass
179
 
 
180
 
 
181
 
class IncompatibleVersions(JuiceError):
182
 
    pass
183
 
 
184
 
class _Transactor:
185
 
    def __init__(self, store, callable):
186
 
        self.store = store
187
 
        self.callable = callable
188
 
 
189
 
    def __call__(self, box):
190
 
        return self.store.transact(self.callable, box)
191
 
 
192
 
    def __repr__(self):
193
 
        return '<Transaction in: %s of: %s>' % (self.store, self.callable)
194
 
 
195
 
class DispatchMixin:
196
 
    baseDispatchPrefix = 'juice_'
197
 
    autoDispatchPrefix = 'command_'
198
 
 
199
 
    wrapper = None
200
 
 
201
 
    def _auto(self, aCallable, proto, namespace=None):
202
 
        if aCallable is None:
203
 
            return None
204
 
        command = aCallable.command
205
 
        if namespace not in command.namespaces:
206
 
            # if you're in the wrong namespace, you are very likely not allowed
207
 
            # to invoke the command you are trying to invoke.  some objects
208
 
            # have commands exposed in a separate namespace for security
209
 
            # reasons, since the security model is a role : namespace mapping.
210
 
            log.msg('WRONG NAMESPACE: %r, %r' % (namespace, command.namespaces))
211
 
            return None
212
 
        def doit(box):
213
 
            kw = stringsToObjects(box, command.arguments, proto)
214
 
            for name, extraArg in command.extra:
215
 
                kw[name] = extraArg.fromTransport(proto.transport)
216
 
#             def checkIsDict(result):
217
 
#                 if not isinstance(result, dict):
218
 
#                     raise RuntimeError("%r returned %r, not dictionary" % (
219
 
#                             aCallable, result))
220
 
#                 return result
221
 
            def checkKnownErrors(error):
222
 
                key = error.trap(*command.allErrors)
223
 
                code = command.allErrors[key]
224
 
                desc = str(error.value)
225
 
                return Failure(RemoteJuiceError(
226
 
                        code, desc, error in command.fatalErrors))
227
 
            return maybeDeferred(aCallable, **kw).addCallback(
228
 
                command.makeResponse, proto).addErrback(
229
 
                checkKnownErrors)
230
 
        return doit
231
 
 
232
 
    def _wrap(self, aCallable):
233
 
        if aCallable is None:
234
 
            return None
235
 
        wrap = self.wrapper
236
 
        if wrap is not None:
237
 
            return wrap(aCallable)
238
 
        else:
239
 
            return aCallable
240
 
 
241
 
    def normalizeCommand(self, cmd):
242
 
        """Return the canonical form of a command.
243
 
        """
244
 
        return cmd.upper().strip().replace('-', '_')
245
 
 
246
 
    def lookupFunction(self, proto, name, namespace):
247
 
        """Return a callable to invoke when executing the named command.
248
 
        """
249
 
        # Try to find a method to be invoked in a transaction first
250
 
        # Otherwise fallback to a "regular" method
251
 
        fName = self.autoDispatchPrefix + name
252
 
        fObj = getattr(self, fName, None)
253
 
        if fObj is not None:
254
 
            # pass the namespace along
255
 
            return self._auto(fObj, proto, namespace)
256
 
 
257
 
        assert namespace is None, 'Old-style parsing'
258
 
        # Fall back to simplistic command dispatching - we probably want to get
259
 
        # rid of this eventually, there's no reason to do extra work and write
260
 
        # fewer docs all the time.
261
 
        fName = self.baseDispatchPrefix + name
262
 
        return getattr(self, fName, None)
263
 
 
264
 
    def dispatchCommand(self, proto, cmd, box, namespace=None):
265
 
        fObj = self.lookupFunction(proto, self.normalizeCommand(cmd), namespace)
266
 
        if fObj is None:
267
 
            return fail(UnhandledCommand(cmd))
268
 
        return maybeDeferred(self._wrap(fObj), box)
269
 
 
270
 
PYTHON_KEYWORDS = [
271
 
    'and', 'del', 'for', 'is', 'raise', 'assert', 'elif', 'from', 'lambda',
272
 
    'return', 'break', 'else', 'global', 'not', 'try', 'class', 'except',
273
 
    'if', 'or', 'while', 'continue', 'exec', 'import', 'pass', 'yield',
274
 
    'def', 'finally', 'in', 'print']
275
 
 
276
 
def normalizeKey(key):
277
 
    lkey = key.lower().replace('-', '_')
278
 
    if lkey in PYTHON_KEYWORDS:
279
 
        return lkey.title()
280
 
    return lkey
281
 
 
282
 
 
283
 
def parseJuiceHeaders(lines):
284
 
    """
285
 
    Create a JuiceBox from a list of header lines.
286
 
 
287
 
    @param lines: a list of lines.
288
 
    """
289
 
    b = JuiceBox()
290
 
    bodylen = 0
291
 
    key = None
292
 
    for L in lines:
293
 
        if L[0] == ' ':
294
 
            # continuation
295
 
            assert key is not None
296
 
            b[key] += '\r\n'+L[1:]
297
 
            continue
298
 
        parts = L.split(': ', 1)
299
 
        if len(parts) != 2:
300
 
            raise MalformedJuiceBox("Wrong number of parts: %r" % (L,))
301
 
        key, value = parts
302
 
        key = normalizeKey(key)
303
 
        b[key] = value
304
 
    return int(b.pop(LENGTH, 0)), b
305
 
 
306
 
class JuiceParserBase(DispatchMixin):
307
 
 
308
 
    def __init__(self):
309
 
        self._outstandingRequests = {}
310
 
 
311
 
    def _puke(self, failure):
312
 
        log.msg("Juice server or network failure "
313
 
                "unhandled by client application:")
314
 
        log.err(failure)
315
 
        log.msg(
316
 
            "Dropping connection!  "
317
 
            "To avoid, add errbacks to ALL remote commands!")
318
 
        if self.transport is not None:
319
 
            self.transport.loseConnection()
320
 
 
321
 
    _counter = 0L
322
 
 
323
 
    def _nextTag(self):
324
 
        self._counter += 1
325
 
        return '%x' % (self._counter,)
326
 
 
327
 
    def failAllOutgoing(self, reason):
328
 
        OR = self._outstandingRequests.items()
329
 
        self._outstandingRequests = None # we can never send another request
330
 
        for key, value in OR:
331
 
            value.errback(reason)
332
 
 
333
 
    def juiceBoxReceived(self, box):
334
 
        if debug:
335
 
            log.msg("Juice receive: %s" % pprint.pformat(dict(box.iteritems())))
336
 
 
337
 
        if ANSWER in box:
338
 
            question = self._outstandingRequests.pop(box[ANSWER])
339
 
            question.addErrback(self._puke)
340
 
            self._wrap(question.callback)(box)
341
 
        elif ERROR in box:
342
 
            question = self._outstandingRequests.pop(box[ERROR])
343
 
            question.addErrback(self._puke)
344
 
            self._wrap(question.errback)(
345
 
                Failure(RemoteJuiceError(box[ERROR_CODE],
346
 
                                         box[ERROR_DESCRIPTION])))
347
 
        elif COMMAND in box:
348
 
            cmd = box[COMMAND]
349
 
            def sendAnswer(answerBox):
350
 
                if ASK not in box:
351
 
                    return
352
 
                if self.transport is None:
353
 
                    return
354
 
                answerBox[ANSWER] = box[ASK]
355
 
                answerBox.sendTo(self)
356
 
            def sendError(error):
357
 
                if ASK not in box:
358
 
                    return error
359
 
                if error.check(RemoteJuiceError):
360
 
                    code = error.value.errorCode
361
 
                    desc = error.value.description
362
 
                    if error.value.fatal:
363
 
                        errorBox = QuitBox()
364
 
                    else:
365
 
                        errorBox = JuiceBox()
366
 
                else:
367
 
                    errorBox = QuitBox()
368
 
                    log.err(error) # here is where server-side logging happens
369
 
                                   # if the error isn't handled
370
 
                    code = 'UNHANDLED'
371
 
                    desc = "Unhandled Remote System Exception "
372
 
                errorBox[ERROR] = box[ASK]
373
 
                errorBox[ERROR_DESCRIPTION] = desc
374
 
                errorBox[ERROR_CODE] = code
375
 
                if self.transport is not None:
376
 
                    errorBox.sendTo(self)
377
 
                return None # intentionally stop the error here: don't log the
378
 
                            # traceback if it's handled, do log it (earlier) if
379
 
                            # it isn't
380
 
            self.dispatchCommand(self, cmd, box).addCallbacks(sendAnswer, sendError
381
 
                                                              ).addErrback(self._puke)
382
 
        else:
383
 
            raise RuntimeError(
384
 
                "Empty packet received over connection-oriented juice: %r" % (box,))
385
 
 
386
 
    def sendBoxCommand(self, command, box, requiresAnswer=True):
387
 
        """
388
 
        Send a command across the wire with the given C{juice.Box}.
389
 
 
390
 
        Returns a Deferred which fires with the response C{juice.Box} when it
391
 
        is received, or fails with a C{juice.RemoteJuiceError} if an error is
392
 
        received.
393
 
 
394
 
        If the Deferred fails and the error is not handled by the caller of
395
 
        this method, the failure will be logged and the connection dropped.
396
 
        """
397
 
        if self._outstandingRequests is None:
398
 
            return fail(CONNECTION_LOST)
399
 
        box[COMMAND] = command
400
 
        tag = self._nextTag()
401
 
        if requiresAnswer:
402
 
            box[ASK] = tag
403
 
            result = self._outstandingRequests[tag] = Deferred()
404
 
        else:
405
 
            result = None
406
 
        box.sendTo(self)
407
 
        return result
408
 
 
409
 
 
410
 
 
411
 
 
412
 
 
413
 
 
414
 
class Argument:
415
 
    optional = False
416
 
 
417
 
    def __init__(self, optional=False):
418
 
        self.optional = optional
419
 
 
420
 
    def retrieve(self, d, name):
421
 
        if self.optional:
422
 
            value = d.get(name)
423
 
            if value is not None:
424
 
                del d[name]
425
 
        else:
426
 
            value = d.pop(name)
427
 
        return value
428
 
 
429
 
    def fromBox(self, name, strings, objects, proto):
430
 
        st = self.retrieve(strings, name)
431
 
        if self.optional and st is None:
432
 
            objects[name] = None
433
 
        else:
434
 
            objects[name] = self.fromStringProto(st, proto)
435
 
 
436
 
    def toBox(self, name, strings, objects, proto):
437
 
        obj = self.retrieve(objects, name)
438
 
        if self.optional and obj is None:
439
 
            # strings[name] = None
440
 
            return
441
 
        else:
442
 
            strings[name] = self.toStringProto(obj, proto)
443
 
 
444
 
    def fromStringProto(self, inString, proto):
445
 
        return self.fromString(inString)
446
 
 
447
 
    def toStringProto(self, inObject, proto):
448
 
        return self.toString(inObject)
449
 
 
450
 
    def fromString(self, inString):
451
 
        raise NotImplementedError()
452
 
 
453
 
    def toString(self, inObject):
454
 
        raise NotImplementedError()
455
 
 
456
 
class JuiceList(Argument):
457
 
    def __init__(self, subargs):
458
 
        self.subargs = subargs
459
 
 
460
 
    def fromStringProto(self, inString, proto):
461
 
        boxes = parseString(inString)
462
 
        values = [stringsToObjects(box, self.subargs, proto)
463
 
                  for box in boxes]
464
 
        return values
465
 
 
466
 
    def toStringProto(self, inObject, proto):
467
 
        return ''.join([objectsToStrings(
468
 
                    objects, self.subargs, Box(), proto
469
 
                    ).serialize() for objects in inObject])
470
 
 
471
 
class ListOf(Argument):
472
 
    def __init__(self, subarg, delimiter=', '):
473
 
        self.subarg = subarg
474
 
        self.delimiter = delimiter
475
 
 
476
 
    def fromStringProto(self, inString, proto):
477
 
        strings = inString.split(self.delimiter)
478
 
        L = [self.subarg.fromStringProto(string, proto)
479
 
             for string in strings]
480
 
        return L
481
 
 
482
 
    def toStringProto(self, inObject, proto):
483
 
        L = []
484
 
        for inSingle in inObject:
485
 
            outString = self.subarg.toStringProto(inSingle, proto)
486
 
            assert self.delimiter not in outString
487
 
            L.append(outString)
488
 
        return self.delimiter.join(L)
489
 
 
490
 
class Integer(Argument):
491
 
    fromString = int
492
 
    def toString(self, inObject):
493
 
        return str(int(inObject))
494
 
 
495
 
class String(Argument):
496
 
    def toString(self, inObject):
497
 
        return inObject
498
 
    def fromString(self, inString):
499
 
        return inString
500
 
 
501
 
class EncodedString(Argument):
502
 
 
503
 
    def __init__(self, encoding):
504
 
        self.encoding = encoding
505
 
 
506
 
    def toString(self, inObject):
507
 
        return inObject.encode(self.encoding)
508
 
 
509
 
    def fromString(self, inString):
510
 
        return inString.decode(self.encoding)
511
 
 
512
 
# Temporary backwards compatibility for Exponent
513
 
 
514
 
Body = String
515
 
 
516
 
class Unicode(String):
517
 
    def toString(self, inObject):
518
 
        # assert isinstance(inObject, unicode)
519
 
        return String.toString(self, inObject.encode('utf-8'))
520
 
 
521
 
    def fromString(self, inString):
522
 
        # assert isinstance(inString, str)
523
 
        return String.fromString(self, inString).decode('utf-8')
524
 
 
525
 
class Path(Unicode):
526
 
    def fromString(self, inString):
527
 
        return filepath.FilePath(Unicode.fromString(self, inString))
528
 
 
529
 
    def toString(self, inObject):
530
 
        return Unicode.toString(self, inObject.path)
531
 
 
532
 
 
533
 
class Float(Argument):
534
 
    fromString = float
535
 
    toString = str
536
 
 
537
 
class Base64Binary(Argument):
538
 
    def toString(self, inObject):
539
 
        return inObject.encode('base64').replace('\n', '')
540
 
    def fromString(self, inString):
541
 
        return inString.decode('base64')
542
 
 
543
 
class Time(Argument):
544
 
    def toString(self, inObject):
545
 
        return inObject.asISO8601TimeAndDate()
546
 
    def fromString(self, inString):
547
 
        return extime.Time.fromISO8601TimeAndDate(inString)
548
 
 
549
 
class ExtraArg:
550
 
    def fromTransport(self, inTransport):
551
 
        raise NotImplementedError()
552
 
 
553
 
class Peer(ExtraArg):
554
 
    def fromTransport(self, inTransport):
555
 
        return inTransport.getQ2QPeer()
556
 
 
557
 
class PeerDomain(ExtraArg):
558
 
    def fromTransport(self, inTransport):
559
 
        return inTransport.getQ2QPeer().domain
560
 
 
561
 
class PeerUser(ExtraArg):
562
 
    def fromTransport(self, inTransport):
563
 
        return inTransport.getQ2QPeer().resource
564
 
 
565
 
class Host(ExtraArg):
566
 
    def fromTransport(self, inTransport):
567
 
        return inTransport.getQ2QHost()
568
 
 
569
 
class HostDomain(ExtraArg):
570
 
    def fromTransport(self, inTransport):
571
 
        return inTransport.getQ2QHost().domain
572
 
 
573
 
class HostUser(ExtraArg):
574
 
    def fromTransport(self, inTransport):
575
 
        return inTransport.getQ2QHost().resource
576
 
 
577
 
 
578
 
 
579
 
class Boolean(Argument):
580
 
    def fromString(self, inString):
581
 
        if inString == 'True':
582
 
            return True
583
 
        elif inString == 'False':
584
 
            return False
585
 
        else:
586
 
            raise RuntimeError("Bad boolean value: %r" % (inString,))
587
 
 
588
 
    def toString(self, inObject):
589
 
        if inObject:
590
 
            return 'True'
591
 
        else:
592
 
            return 'False'
593
 
 
594
 
class Command:
595
 
    class __metaclass__(type):
596
 
        def __new__(cls, name, bases, attrs):
597
 
            re = attrs['reverseErrors'] = {}
598
 
            er = attrs['allErrors'] = {}
599
 
            for v, k in attrs.get('errors',{}).iteritems():
600
 
                re[k] = v
601
 
                er[v] = k
602
 
            for v, k in attrs.get('fatalErrors',{}).iteritems():
603
 
                re[k] = v
604
 
                er[v] = k
605
 
            return type.__new__(cls, name, bases, attrs)
606
 
 
607
 
    arguments = []
608
 
    response = []
609
 
    extra = []
610
 
    namespaces = [None]         # This is set to [None] on purpose: None means
611
 
                                # "no namespace", not "empty list".  "empty
612
 
                                # list" will make your command invalid in _all_
613
 
                                # namespaces, effectively uncallable.
614
 
    errors = {}
615
 
    fatalErrors = {}
616
 
 
617
 
    commandType = Box
618
 
    responseType = Box
619
 
 
620
 
    def commandName():
621
 
        def get(self):
622
 
            return self.__class__.__name__
623
 
            raise NotImplementedError("Missing command name")
624
 
        return get,
625
 
    commandName = property(*commandName())
626
 
 
627
 
    def __init__(self, **kw):
628
 
        self.structured = kw
629
 
        givenArgs = [normalizeKey(k) for k in kw.keys()]
630
 
        forgotten = []
631
 
        for name, arg in self.arguments:
632
 
            if normalizeKey(name) not in givenArgs and not arg.optional:
633
 
                forgotten.append(normalizeKey(name))
634
 
#         for v in kw.itervalues():
635
 
#             if v is None:
636
 
#                 from pprint import pformat
637
 
#                 raise RuntimeError("ARGH: %s" % pformat(kw))
638
 
        if forgotten:
639
 
            if len(forgotten) == 1:
640
 
                plural = 'an argument'
641
 
            else:
642
 
                plural = 'some arguments'
643
 
            raise RuntimeError("You forgot %s to %r: %s" % (
644
 
                    plural, self.commandName, ', '.join(forgotten)))
645
 
        forgotten = []
646
 
 
647
 
    def makeResponse(cls, objects, proto):
648
 
        try:
649
 
            return objectsToStrings(objects, cls.response, cls.responseType(), proto)
650
 
        except:
651
 
            log.msg("Exception in %r.makeResponse" % (cls,))
652
 
            raise
653
 
    makeResponse = classmethod(makeResponse)
654
 
 
655
 
    def do(self, proto, namespace=None, requiresAnswer=True):
656
 
        if namespace is not None:
657
 
            cmd = namespace + ":" + self.commandName
658
 
        else:
659
 
            cmd = self.commandName
660
 
        def _massageError(error):
661
 
            error.trap(RemoteJuiceError)
662
 
            rje = error.value
663
 
            return Failure(self.reverseErrors.get(rje.errorCode, UnhandledRemoteJuiceError)(rje.description))
664
 
 
665
 
        d = proto.sendBoxCommand(
666
 
            cmd, objectsToStrings(self.structured, self.arguments, self.commandType(),
667
 
                                  proto),
668
 
            requiresAnswer)
669
 
 
670
 
        if requiresAnswer:
671
 
            d.addCallback(stringsToObjects, self.response, proto)
672
 
            d.addCallback(self.addExtra, proto.transport)
673
 
            d.addErrback(_massageError)
674
 
 
675
 
        return d
676
 
 
677
 
    def addExtra(self, d, transport):
678
 
        for name, extraArg in self.extra:
679
 
            d[name] = extraArg.fromTransport(transport)
680
 
        return d
681
 
 
682
 
 
683
 
class ProtocolSwitchCommand(Command):
684
 
    """Use this command to switch from something Juice-derived to a different
685
 
    protocol mid-connection.  This can be useful to use juice as the
686
 
    connection-startup negotiation phase.  Since TLS is a different layer
687
 
    entirely, you can use Juice to negotiate the security parameters of your
688
 
    connection, then switch to a different protocol, and the connection will
689
 
    remain secured.
690
 
    """
691
 
 
692
 
    def __init__(self, __protoToSwitchToFactory, **kw):
693
 
        self.protoToSwitchToFactory = __protoToSwitchToFactory
694
 
        super(ProtocolSwitchCommand, self).__init__(**kw)
695
 
 
696
 
    def makeResponse(cls, innerProto, proto):
697
 
        return _SwitchBox(innerProto)
698
 
 
699
 
    makeResponse = classmethod(makeResponse)
700
 
 
701
 
    def do(self, proto, namespace=None):
702
 
        d = super(ProtocolSwitchCommand, self).do(proto)
703
 
        proto._lock()
704
 
        def switchNow(ign):
705
 
            innerProto = self.protoToSwitchToFactory.buildProtocol(proto.transport.getPeer())
706
 
            proto._switchTo(innerProto, self.protoToSwitchToFactory)
707
 
            return ign
708
 
        def die(ign):
709
 
            proto.transport.loseConnection()
710
 
            return ign
711
 
        def handle(ign):
712
 
            self.protoToSwitchToFactory.clientConnectionFailed(None, Failure(CONNECTION_LOST))
713
 
            return ign
714
 
        return d.addCallbacks(switchNow, handle).addErrback(die)
715
 
 
716
 
class Negotiate(Command):
717
 
    commandName = 'Negotiate'
718
 
 
719
 
    arguments = [('versions', ListOf(Integer()))]
720
 
    response = [('version', Integer())]
721
 
 
722
 
    responseType = NegotiateBox
723
 
 
724
 
 
725
 
class Juice(LineReceiver, JuiceParserBase):
726
 
    """
727
 
    JUICE (JUice Is Concurrent Events) is a simple connection-oriented
728
 
    request/response protocol.  Packets, or "boxes", are collections of
729
 
    RFC2822-inspired headers, plus a body.  Note that this is NOT a literal
730
 
    interpretation of any existing RFC, 822, 2822 or otherwise, but a simpler
731
 
    version that does not do line continuations, does not specify any
732
 
    particular format for header values, dispatches semantic meanings of most
733
 
    headers on the -Command header rather than giving them global meaning, and
734
 
    allows multiple sets of headers (messages, or JuiceBoxes) on a connection.
735
 
 
736
 
    All headers whose names begin with a dash ('-') are reserved for use by the
737
 
    protocol.  All others are for application use - their meaning depends on
738
 
    the value of the "-Command" header.
739
 
    """
740
 
 
741
 
    protocolName = 'juice-base'
742
 
 
743
 
    hostCertificate = None
744
 
 
745
 
    MAX_LENGTH = 1024 * 1024
746
 
 
747
 
    isServer = property(lambda self: self._issueGreeting,
748
 
                        doc="""
749
 
                        True if this is a juice server, e.g. it is going to
750
 
                        issue or has issued a server greeting upon
751
 
                        connection.
752
 
                        """)
753
 
 
754
 
    isClient = property(lambda self: not self._issueGreeting,
755
 
                        doc="""
756
 
                        True if this is a juice server, e.g. it is not going to
757
 
                        issue or did not issue a server greeting upon
758
 
                        connection.
759
 
                        """)
760
 
 
761
 
    def __init__(self, issueGreeting):
762
 
        """
763
 
        @param issueGreeting: whether to issue a greeting when connected.  This
764
 
        should be set on server-side Juice protocols.
765
 
        """
766
 
        JuiceParserBase.__init__(self)
767
 
        self._issueGreeting = issueGreeting
768
 
 
769
 
    def __repr__(self):
770
 
        return '<%s %s/%s at 0x%x>' % (self.__class__.__name__, self.isClient and 'client' or 'server', self.innerProtocol, id(self))
771
 
 
772
 
    __locked = False
773
 
 
774
 
    def _lock(self):
775
 
        """ Lock this Juice instance so that no further Juice traffic may be sent.
776
 
        This is used when sending a request to switch underlying protocols.
777
 
        You probably want to subclass ProtocolSwitchCommand rather than calling
778
 
        this directly.
779
 
        """
780
 
        self.__locked = True
781
 
 
782
 
    innerProtocol = None
783
 
 
784
 
    def _switchTo(self, newProto, clientFactory=None):
785
 
        """ Switch this Juice instance to a new protocol.  You need to do this
786
 
        'simultaneously' on both ends of a connection; the easiest way to do
787
 
        this is to use a subclass of ProtocolSwitchCommand.
788
 
        """
789
 
 
790
 
        assert self.innerProtocol is None, "Protocol can only be safely switched once."
791
 
        self.setRawMode()
792
 
        self.innerProtocol = newProto
793
 
        self.innerProtocolClientFactory = clientFactory
794
 
        newProto.makeConnection(self.transport)
795
 
 
796
 
    innerProtocolClientFactory = None
797
 
 
798
 
    def juiceBoxReceived(self, box):
799
 
        if self.__locked and COMMAND in box and ASK in box:
800
 
            # This is a command which will trigger an answer, and we can no
801
 
            # longer answer anything, so don't bother delivering it.
802
 
            return
803
 
        return super(Juice, self).juiceBoxReceived(box)
804
 
 
805
 
    def sendPacket(self, completeBox):
806
 
        """
807
 
        Send a juice.Box to my peer.
808
 
 
809
 
        Note: transport.write is never called outside of this method.
810
 
        """
811
 
        assert not self.__locked, "You cannot send juice packets when a connection is locked"
812
 
        if self._startingTLSBuffer is not None:
813
 
            self._startingTLSBuffer.append(completeBox)
814
 
        else:
815
 
            if debug:
816
 
                log.msg("Juice send: %s" % pprint.pformat(dict(completeBox.iteritems())))
817
 
 
818
 
            self.transport.write(completeBox.serialize())
819
 
 
820
 
    def sendCommand(self, command, __content='', __answer=True, **kw):
821
 
        box = JuiceBox(__content, **kw)
822
 
        return self.sendBoxCommand(command, box, requiresAnswer=__answer)
823
 
 
824
 
    _outstandingRequests = None
825
 
    _justStartedTLS = False
826
 
 
827
 
    def makeConnection(self, transport):
828
 
        self._transportPeer = transport.getPeer()
829
 
        self._transportHost = transport.getHost()
830
 
        log.msg("%s %s connection established (HOST:%s PEER:%s)" % (self.isClient and "client" or "server",
831
 
                                                                    self.__class__.__name__,
832
 
                                                                    self._transportHost,
833
 
                                                                    self._transportPeer))
834
 
        self._outstandingRequests = {}
835
 
        self._requestBuffer = []
836
 
        LineReceiver.makeConnection(self, transport)
837
 
 
838
 
    _startingTLSBuffer = None
839
 
 
840
 
    def prepareTLS(self):
841
 
        self._startingTLSBuffer = []
842
 
 
843
 
    def startTLS(self, certificate, *verifyAuthorities):
844
 
        if self.hostCertificate is None:
845
 
            self.hostCertificate = certificate
846
 
            self._justStartedTLS = True
847
 
            self.transport.startTLS(certificate.options(*verifyAuthorities))
848
 
            stlsb = self._startingTLSBuffer
849
 
            if stlsb is not None:
850
 
                self._startingTLSBuffer = None
851
 
                for box in stlsb:
852
 
                    self.sendPacket(box)
853
 
        else:
854
 
            raise RuntimeError(
855
 
                "Previously authenticated connection between %s and %s "
856
 
                "is trying to re-establish as %s" % (
857
 
                    self.hostCertificate,
858
 
                    Certificate.peerFromTransport(self.transport),
859
 
                    (certificate, verifyAuthorities)))
860
 
 
861
 
    def dataReceived(self, data):
862
 
        # If we successfully receive any data after TLS has been started, that
863
 
        # means the connection was secured properly.  Make a note of that fact.
864
 
        if self._justStartedTLS:
865
 
            self._justStartedTLS = False
866
 
        return LineReceiver.dataReceived(self, data)
867
 
 
868
 
    def connectionLost(self, reason):
869
 
        log.msg("%s %s connection lost (HOST:%s PEER:%s)" % (
870
 
                self.isClient and 'client' or 'server',
871
 
                self.__class__.__name__,
872
 
                self._transportHost,
873
 
                self._transportPeer))
874
 
        self.failAllOutgoing(reason)
875
 
        if self.innerProtocol is not None:
876
 
            self.innerProtocol.connectionLost(reason)
877
 
            if self.innerProtocolClientFactory is not None:
878
 
                self.innerProtocolClientFactory.clientConnectionLost(None, reason)
879
 
 
880
 
    def lineReceived(self, line):
881
 
        if line:
882
 
            self._requestBuffer.append(line)
883
 
        else:
884
 
            buf = self._requestBuffer
885
 
            self._requestBuffer = []
886
 
            bodylen, b = parseJuiceHeaders(buf)
887
 
            if bodylen:
888
 
                self._bodyRemaining = bodylen
889
 
                self._bodyBuffer = []
890
 
                self._pendingBox = b
891
 
                self.setRawMode()
892
 
            else:
893
 
                self.juiceBoxReceived(b)
894
 
 
895
 
    def rawDataReceived(self, data):
896
 
        if self.innerProtocol is not None:
897
 
            self.innerProtocol.dataReceived(data)
898
 
            return
899
 
        self._bodyRemaining -= len(data)
900
 
        if self._bodyRemaining <= 0:
901
 
            if self._bodyRemaining < 0:
902
 
                self._bodyBuffer.append(data[:self._bodyRemaining])
903
 
                extraData = data[self._bodyRemaining:]
904
 
            else:
905
 
                self._bodyBuffer.append(data)
906
 
                extraData = ''
907
 
            self._pendingBox['body'] = ''.join(self._bodyBuffer)
908
 
            self._bodyBuffer = None
909
 
            b, self._pendingBox = self._pendingBox, None
910
 
            self.juiceBoxReceived(b)
911
 
            if self.innerProtocol is not None:
912
 
                self.innerProtocol.makeConnection(self.transport)
913
 
                if extraData:
914
 
                    self.innerProtocol.dataReceived(extraData)
915
 
            else:
916
 
                self.setLineMode(extraData)
917
 
        else:
918
 
            self._bodyBuffer.append(data)
919
 
 
920
 
    protocolVersion = 0
921
 
 
922
 
    def _setProtocolVersion(self, version):
923
 
        # if we ever want to actually mangle encodings, this is the place to do
924
 
        # it!
925
 
        self.protocolVersion = version
926
 
        return version
927
 
 
928
 
    def renegotiateVersion(self, newVersion):
929
 
        assert newVersion in VERSIONS, (
930
 
            "This side of the connection doesn't support version %r"
931
 
            % (newVersion,))
932
 
        v = VERSIONS[:]
933
 
        v.remove(newVersion)
934
 
        return Negotiate(versions=[newVersion]).do(self).addCallback(
935
 
            lambda ver: self._setProtocolVersion(ver['version']))
936
 
 
937
 
    def command_NEGOTIATE(self, versions):
938
 
        for version in versions:
939
 
            if version in VERSIONS:
940
 
                return dict(version=version)
941
 
        raise IncompatibleVersions()
942
 
    command_NEGOTIATE.command = Negotiate
943
 
 
944
 
 
945
 
VERSIONS = [1]
946
 
 
947
 
from cStringIO import StringIO
948
 
class _ParserHelper(Juice):
949
 
    def __init__(self):
950
 
        Juice.__init__(self, False)
951
 
        self.boxes = []
952
 
        self.results = Deferred()
953
 
 
954
 
    def getPeer(self):
955
 
        return 'string'
956
 
 
957
 
    def getHost(self):
958
 
        return 'string'
959
 
 
960
 
    disconnecting = False
961
 
 
962
 
    def juiceBoxReceived(self, box):
963
 
        self.boxes.append(box)
964
 
 
965
 
    # Synchronous helpers
966
 
    def parse(cls, fileObj):
967
 
        p = cls()
968
 
        p.makeConnection(p)
969
 
        p.dataReceived(fileObj.read())
970
 
        return p.boxes
971
 
    parse = classmethod(parse)
972
 
 
973
 
    def parseString(cls, data):
974
 
        return cls.parse(StringIO(data))
975
 
    parseString = classmethod(parseString)
976
 
 
977
 
parse = _ParserHelper.parse
978
 
parseString = _ParserHelper.parseString
979
 
 
980
 
def stringsToObjects(strings, arglist, proto):
981
 
    objects = {}
982
 
    myStrings = strings.copy()
983
 
    for argname, argparser in arglist:
984
 
        argparser.fromBox(argname, myStrings, objects, proto)
985
 
    return objects
986
 
 
987
 
def objectsToStrings(objects, arglist, strings, proto):
988
 
    myObjects = {}
989
 
    for (k, v) in objects.items():
990
 
        myObjects[normalizeKey(k)] = v
991
 
 
992
 
    for argname, argparser in arglist:
993
 
        argparser.toBox(argname, strings, myObjects, proto)
994
 
    return strings
995
 
 
996
 
class JuiceServerFactory(ServerFactory):
997
 
    protocol = Juice
998
 
    def buildProtocol(self, addr):
999
 
        prot = self.protocol(True)
1000
 
        prot.factory = self
1001
 
        return prot
1002
 
 
1003
 
class JuiceClientFactory(ClientFactory):
1004
 
    protocol = Juice
1005
 
    def buildProtocol(self, addr):
1006
 
        prot = self.protocol(False)
1007
 
        prot.factory = self
1008
 
        return prot
1009