~axwalk/pushy/0.2

« back to all changes in this revision

Viewing changes to pushy/protocol/connection.py

  • Committer: Andrew Wilkins
  • Date: 2009-04-02 13:20:50 UTC
  • Revision ID: axwalk@gmail.com-20090402132050-gcayoq3mhgapuklo
LP353622: Making connections thread-safe

Show diffs side-by-side

added

removed

Lines of Context:
1
 
# Copyright (c) 2008 Andrew Wilkins <axwalk@gmail.com>
 
1
# Copyright (c) 2008, 2009 Andrew Wilkins <axwalk@gmail.com>
2
2
3
3
# Permission is hereby granted, free of charge, to any person
4
4
# obtaining a copy of this software and associated documentation
21
21
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
22
22
# OTHER DEALINGS IN THE SOFTWARE.
23
23
 
24
 
import marshal, os, struct
 
24
import marshal, os, struct, threading
25
25
from pushy.protocol.message import Message, MessageType
26
26
from pushy.protocol.proxy import Proxy, ProxyType, get_opmask
27
27
 
49
49
        self.log.flush()
50
50
        return data
51
51
 
 
52
 
52
53
class Connection:
53
54
    def __init__(self, istream, ostream, initiator=True):
54
55
        self.__logfile   = None
55
56
        self.__istream   = istream
56
57
        self.__ostream   = ostream
57
58
        self.__initiator = initiator
 
59
        self.__lock      = threading.RLock()
58
60
 
59
61
        # Uncomment the following for debugging.
60
62
        #self.__logfile = open("pushy.%d.log" % os.getpid(), "w")
61
 
        #self.__istream = LoggingFile(istream, open("%d.in" % os.getpid(), "wb"))
62
 
        #self.__ostream = LoggingFile(ostream, open("%d.out" % os.getpid(), "wb"))
 
63
        #self.__istream = LoggingFile(istream, open("%d.in"%os.getpid(),"wb"))
 
64
        #self.__ostream = LoggingFile(ostream, open("%d.out"%os.getpid(),"wb"))
63
65
 
64
66
        self.__outstandingRequests = 0
65
67
        # (Client) Contains mapping of id(obj) -> proxy
77
79
        self.__handle(self.__recv())
78
80
 
79
81
    def eval(self, expression):
80
 
        expression = self.__marshal(expression)
81
 
        self.__outstandingRequests += 1
82
 
        self.__send(Message(MessageType.evaluate, expression))
83
 
        return self.__waitForResponse()
 
82
        self.__lock.acquire()
 
83
        try:
 
84
            expression = self.__marshal(expression)
 
85
            self.__outstandingRequests += 1
 
86
            self.__send(Message(MessageType.evaluate, expression))
 
87
            return self.__waitForResponse()
 
88
        finally:
 
89
            self.__lock.release()
84
90
 
85
91
    def operator(self, type_, id_, args, kwargs):
86
 
        parameters = self.__marshal((id_, args, kwargs))
87
 
        self.__outstandingRequests += 1
88
 
        self.__send(Message(type_, parameters))
89
 
        return self.__waitForResponse()
 
92
        self.__lock.acquire()
 
93
        try:
 
94
            parameters = self.__marshal((id_, args, kwargs))
 
95
            self.__outstandingRequests += 1
 
96
            self.__send(Message(type_, parameters))
 
97
            return self.__waitForResponse()
 
98
        finally:
 
99
            self.__lock.release()
90
100
 
91
101
    def getattr(self, id_, name):
92
 
        parameters = self.__marshal((id_, name))
93
 
        self.__outstandingRequests += 1
94
 
        self.__send(Message(MessageType.getattr, parameters))
95
 
        return self.__waitForResponse()
 
102
        self.__lock.acquire()
 
103
        try:
 
104
            parameters = self.__marshal((id_, name))
 
105
            self.__outstandingRequests += 1
 
106
            self.__send(Message(MessageType.getattr, parameters))
 
107
            return self.__waitForResponse()
 
108
        finally:
 
109
            self.__lock.release()
96
110
 
97
111
    def setattr(self, id_, name, value):
98
 
        parameters = self.__marshal((id_, name, value))
99
 
        self.__outstandingRequests += 1
100
 
        self.__send(Message(MessageType.setattr, parameters))
101
 
        return self.__waitForResponse()
 
112
        self.__lock.acquire()
 
113
        try:
 
114
            parameters = self.__marshal((id_, name, value))
 
115
            self.__outstandingRequests += 1
 
116
            self.__send(Message(MessageType.setattr, parameters))
 
117
            return self.__waitForResponse()
 
118
        finally:
 
119
            self.__lock.release()
102
120
 
103
121
    def getstr(self, id_):
104
 
        self.__outstandingRequests += 1
105
 
        self.__send(Message(MessageType.getstr, self.__marshal(id_)))
106
 
        return self.__waitForResponse()
 
122
        self.__lock.acquire()
 
123
        try:
 
124
            self.__outstandingRequests += 1
 
125
            self.__send(Message(MessageType.getstr, self.__marshal(id_)))
 
126
            return self.__waitForResponse()
 
127
        finally:
 
128
            self.__lock.release()
107
129
 
108
130
    def getrepr(self, id_):
109
 
        self.__outstandingRequests += 1
110
 
        self.__send(Message(MessageType.getrepr, self.__marshal(id_)))
111
 
        return self.__waitForResponse()
 
131
        self.__lock.acquire()
 
132
        try:
 
133
            self.__outstandingRequests += 1
 
134
            self.__send(Message(MessageType.getrepr, self.__marshal(id_)))
 
135
            return self.__waitForResponse()
 
136
        finally:
 
137
            self.__lock.release()
112
138
 
113
139
    def __waitForResponse(self):
114
140
        res = None