~stolowski/+junk/symbols-test

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
/*
 * Copyright (C) 2013 Canonical Ltd
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License version 3 as
 * published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 * Authored by: Michi Henning <michi.henning@canonical.com>
 */

#include <unity/scopes/internal/zmq_middleware/ZmqObjectProxy.h>

#include <unity/scopes/internal/RuntimeImpl.h>
#include <unity/scopes/internal/zmq_middleware/Util.h>
#include <unity/scopes/internal/zmq_middleware/ZmqException.h>
#include <unity/scopes/internal/zmq_middleware/ZmqReceiver.h>
#include <unity/scopes/internal/zmq_middleware/ZmqRegistry.h>
#include <unity/scopes/internal/zmq_middleware/ZmqSender.h>
#include <unity/scopes/ScopeExceptions.h>

#include <capnp/serialize.h>
#include <zmqpp/poller.hpp>
#include <zmqpp/socket.hpp>

using namespace std;

namespace unity
{

namespace scopes
{

namespace internal
{

namespace zmq_middleware
{

namespace
{
    // A single file-scope mutex that is shared by every ZmqObjectProxy instance
    // (it is not a per-object lock). It protects all members of all ZmqObjectProxies:
    // the accessors and request helpers in this file acquire it before touching
    // endpoint_, identity_, category_, mode_ or timeout_.
    // It is a plain (non-recursive) std::mutex, so code that already holds it must not
    // call another ZmqObjectProxy method that locks it again.
    std::mutex shared_mutex;
}

// Endpoint that the IPC monitor publish socket connects to. It is referenced only
// from code that is compiled when ENABLE_IPC_MONITOR is defined.
#define MONITOR_ENDPOINT "ipc:///tmp/scopes-monitor"

// Constructs a proxy for the object with the given identity (and optional category)
// at the given endpoint. The mode must not be Unknown and the timeout must be -1 or
// greater (both are asserted); the endpoint is checked by throw_if_bad_endpoint().
// If either the endpoint or the identity is empty, the proxy is treated as a null
// proxy and endpoint, identity and category are all stored as empty strings.

ZmqObjectProxy::ZmqObjectProxy(ZmqMiddleware* mw_base,
                               string const& endpoint,
                               string const& identity,
                               string const& category,
                               RequestMode m,
                               int64_t timeout) :
    MWObjectProxy(mw_base),
    endpoint_(endpoint),
    identity_(identity),
    category_(category),
    mode_(m),
    timeout_(timeout)
{
    assert(m != Unknown);
    assert(timeout >= -1);
    throw_if_bad_endpoint(endpoint);

    // A proxy lacking either an endpoint or an identity is a null proxy;
    // normalise it so that all three string fields agree.
    bool const is_null_proxy = endpoint.empty() || identity.empty();
    if (is_null_proxy)
    {
        endpoint_.clear();
        identity_.clear();
        category_.clear();
    }
}

// Nothing to release explicitly; the members clean up after themselves.
ZmqObjectProxy::~ZmqObjectProxy() = default;

// Returns the middleware held by the MWObjectProxy base, down-cast to ZmqMiddleware.
// The result is nullptr if the base middleware is not a ZmqMiddleware.

ZmqMiddleware* ZmqObjectProxy::mw_base() const noexcept
{
    auto* const base_middleware = MWObjectProxy::mw_base();
    return dynamic_cast<ZmqMiddleware*>(base_middleware);
}

// Returns a copy of the proxy's endpoint, taken while holding the shared proxy mutex.

string ZmqObjectProxy::endpoint() const
{
    std::lock_guard<std::mutex> guard(shared_mutex);
    string snapshot(endpoint_);
    return snapshot;
}

// Returns a copy of the proxy's identity, taken while holding the shared proxy mutex.

string ZmqObjectProxy::identity() const
{
    std::lock_guard<std::mutex> guard(shared_mutex);
    string snapshot(identity_);
    return snapshot;
}

// Returns a copy of the proxy's category, taken while holding the shared proxy mutex.

string ZmqObjectProxy::target_category() const
{
    std::lock_guard<std::mutex> guard(shared_mutex);
    string snapshot(category_);
    return snapshot;
}

// Returns the proxy's current timeout value, read while holding the shared proxy mutex.

int64_t ZmqObjectProxy::timeout() const noexcept
{
    std::lock_guard<std::mutex> guard(shared_mutex);
    int64_t const current = timeout_;
    return current;
}

// Returns the stringified form of this proxy.
// A proxy with an empty endpoint or identity yields "nullproxy:". Otherwise the result
// is "<endpoint>#<identity>", followed by "!c=<category>" if the category is non-empty,
// "!m=o" if the proxy is oneway, and "!t=<timeout>" if the timeout is not -1.

string ZmqObjectProxy::to_string() const
{
    std::lock_guard<std::mutex> guard(shared_mutex);

    bool const is_null_proxy = endpoint_.empty() || identity_.empty();
    if (is_null_proxy)
    {
        return "nullproxy:";
    }

    // From here on both the endpoint and the identity are known to be non-empty.
    string result(endpoint_);
    result.append("#").append(identity_);
    if (!category_.empty())
    {
        result.append("!c=").append(category_);
    }
    if (mode_ == RequestMode::Oneway)
    {
        result.append("!m=o");
    }
    if (timeout_ != -1)
    {
        result.append("!t=").append(std::to_string(timeout_));
    }
    return result;
}

void ZmqObjectProxy::ping()
{
    capnp::MallocMessageBuilder request_builder;
    make_request_(request_builder, "ping");

    auto future = mw_base()->twoway_pool()->submit([&] { return this->invoke_twoway_(request_builder); });

    auto out_params = future.get();
    auto response = out_params.reader->getRoot<capnproto::Response>();
    throw_if_runtime_exception(response);
}

// Returns the proxy's request mode, read while holding the shared proxy mutex.

RequestMode ZmqObjectProxy::mode() const
{
    std::lock_guard<std::mutex> guard(shared_mutex);
    RequestMode const current = mode_;
    return current;
}

// Returns a request message with the mode, operation name, identity, and category set for this proxy.

// Initializes the root of the message builder b as a capnproto::Request and fills in
// the request mode (ONEWAY or TWOWAY, according to this proxy's mode), the operation
// name, and this proxy's identity and category. The proxy fields are read while
// holding the shared proxy mutex. Returns the builder for the initialized request.

capnproto::Request::Builder ZmqObjectProxy::make_request_(capnp::MessageBuilder& b, std::string const& operation_name) const
{
    std::lock_guard<std::mutex> guard(shared_mutex);

    auto req = b.initRoot<capnproto::Request>();
    if (mode_ == RequestMode::Oneway)
    {
        req.setMode(capnproto::RequestMode::ONEWAY);
    }
    else
    {
        req.setMode(capnproto::RequestMode::TWOWAY);
    }
    req.setOpName(operation_name.c_str());
    req.setId(identity_.c_str());
    req.setCat(category_.c_str());
    return req;
}

#ifdef ENABLE_IPC_MONITOR
// Creates a publish socket connected to MONITOR_ENDPOINT and registers it with the
// given pool. This happens at most once per calling thread; later calls on the same
// thread return immediately.
// NOTE(review): the "already done" flag is per thread, not per pool, yet this file
// passes two different thread-local pools to this function. Only the first pool seen
// by a thread gets the socket registered -- confirm that ConnectionPool::find() copes
// with that for the other pool.
void register_monitor_socket(ConnectionPool& pool, zmqpp::context_t const& context)
{
    thread_local static bool monitor_initialized = false;
    if (monitor_initialized)
    {
        return;
    }
    monitor_initialized = true;

    zmqpp::socket publisher(context, zmqpp::socket_type::publish);
    // Linger of zero: do not hold on to unsent monitor messages when the socket closes.
    publisher.set(zmqpp::socket_option::linger, 0);
    publisher.connect(MONITOR_ENDPOINT);
    pool.register_socket(MONITOR_ENDPOINT, move(publisher));
}
#endif

// Get a socket to the endpoint for this proxy and write the request on the wire.

void ZmqObjectProxy::invoke_oneway_(capnp::MessageBuilder& in_params)
{
    // zmq sockets are not thread-safe, so connections are pooled per calling thread.
    thread_local static ConnectionPool pool(*mw_base()->context());

    std::lock_guard<std::mutex> guard(shared_mutex);

    assert(mode_ == RequestMode::Oneway);
    zmqpp::socket& out_socket = pool.find(endpoint_);
    ZmqSender sender(out_socket);
    auto wire_segments = in_params.getSegmentsForOutput();
    if (!sender.send(wire_segments, ZmqSender::DontWait))
    {
        // The non-blocking send failed, so nothing is at the other end:
        // drop the message and throw away the pooled socket.
        pool.remove(endpoint_);
        return;
    }

#ifdef ENABLE_IPC_MONITOR
    {
        // Publish a flat copy of the request on the monitor socket as well.
        register_monitor_socket(pool, *mw_base()->context());
        zmqpp::socket& monitor_socket = pool.find(MONITOR_ENDPOINT);
        auto flat_words = capnp::messageToFlatArray(wire_segments);
        monitor_socket.send_raw(reinterpret_cast<char*>(&flat_words[0]), flat_words.size() * sizeof(capnp::word));
    }
#endif
}

// Performs a twoway invocation of the request in in_params using this proxy's own
// timeout as the twoway timeout. Returns whatever the three-argument overload returns
// and propagates any exception it throws.

ZmqObjectProxy::TwowayOutParams ZmqObjectProxy::invoke_twoway_(capnp::MessageBuilder& in_params)
{
    // Read the timeout through the locking accessor rather than touching timeout_
    // directly: the three-argument overload may overwrite timeout_ (under the shared
    // mutex) from another thread, so an unlocked read here would be a data race.
    // The mutex is released again before the call below, which itself locks it.
    int64_t const twoway_timeout = timeout();
    return invoke_twoway_(in_params, twoway_timeout);
}

// Performs a twoway invocation of the request in in_params.
//
// If a registry is configured, and this proxy does not itself denote the registry or
// the smartscopes registry, the registry is first asked to locate this proxy's identity
// and the endpoint, identity, category and timeout stored in this proxy are replaced
// with those of the proxy the registry returns.
//
// in_params:      the marshaled request.
// twoway_timeout: timeout passed on to invoke_twoway__() for the invocation itself.
// locate_timeout: timeout passed to the registry's locate(); -1 selects the locate()
//                 overload that takes no explicit timeout.
//
// Returns the result of invoke_twoway__(). A NotFoundException from locate() is
// swallowed (the invocation is then attempted with the current proxy data); any other
// exception propagates to the caller.

ZmqObjectProxy::TwowayOutParams ZmqObjectProxy::invoke_twoway_(capnp::MessageBuilder& in_params,
                                                               int64_t twoway_timeout,
                                                               int64_t locate_timeout)
{
    auto registry_proxy = mw_base()->registry_proxy();
    auto ss_registry_proxy = mw_base()->ss_registry_proxy();

    // Take one snapshot of our identity so that the comparisons and the locate() call
    // below all see the same value (each identity() call locks the shared mutex).
    std::string const this_identity = identity();

    // TODO: HACK: this builds knowledge about the smartscopes proxy running permanently into the run time.
    bool this_is_registry = registry_proxy && this_identity == registry_proxy->identity();
    bool this_is_ss_registry = ss_registry_proxy && this_identity == ss_registry_proxy->identity();

    // If a registry is configured and this object is not a registry itself,
    // attempt to locate the scope before invoking it.
    // The registry_proxy check matters: without a configured registry both flags above
    // are false, and calling locate() would dereference a null proxy.
    if (registry_proxy && !this_is_registry && !this_is_ss_registry)
    {
        try
        {
            ObjectProxy new_proxy;
            if (locate_timeout != -1)
            {
                new_proxy = registry_proxy->locate(this_identity, locate_timeout);
            }
            else
            {
                new_proxy = registry_proxy->locate(this_identity);
            }
            // update our proxy with the newly received data
            // (we need to first store values in local variables outside of the mutex,
            // otherwise we will deadlock on the following ZmqObjectProxy methods)
            std::string new_endpoint = new_proxy->endpoint();
            std::string new_identity = new_proxy->identity();
            std::string new_category = new_proxy->target_category();
            int64_t new_timeout = new_proxy->timeout();
            {
                lock_guard<mutex> lock(shared_mutex);
                endpoint_ = new_endpoint;
                identity_ = new_identity;
                category_ = new_category;
                timeout_ = new_timeout;
            }
        }
        catch (NotFoundException const&)
        {
            // Ignore a failed locate() for scopes unknown to the registry
        }
    }

    // Try the invocation
    return invoke_twoway__(in_params, twoway_timeout);
}

// Get a socket to the endpoint for this proxy and write the request on the wire.
// Poll for the reply with the given timeout.
// Return a reader for the response or throw if the timeout expires.

ZmqObjectProxy::TwowayOutParams ZmqObjectProxy::invoke_twoway__(capnp::MessageBuilder& in_params, int64_t timeout)
{
    // Copy the fields we need while holding the shared mutex; everything after
    // this works on the copies only.
    RequestMode current_mode;
    std::string target_endpoint;
    {
        std::lock_guard<std::mutex> guard(shared_mutex);
        current_mode = mode_;
        target_endpoint = endpoint_;
    }

    assert(current_mode == RequestMode::Twoway);

    zmqpp::socket request_socket(*mw_base()->context(), zmqpp::socket_type::request);
    // Keep the linger time short so we don't hang indefinitely if the peer goes away.
    request_socket.set(zmqpp::socket_option::linger, 50);
    // Reconnect attempts start at 20 ms so that we reach the peer quickly if it has
    // not finished binding to its endpoint yet after being exec'd. The interval backs
    // off exponentially up to half the call timeout; if we still are not connected by
    // then, the poll below times out anyway. For an infinite timeout, the cap is one
    // second.
    int reconnect_max = 1000;
    if (timeout != -1)
    {
        reconnect_max = timeout / 2;
    }
    request_socket.set(zmqpp::socket_option::reconnect_interval, 20);
    request_socket.set(zmqpp::socket_option::reconnect_interval_max, reconnect_max);
    request_socket.connect(target_endpoint);
    ZmqSender sender(request_socket);
    auto wire_segments = in_params.getSegmentsForOutput();
    sender.send(wire_segments);

#ifdef ENABLE_IPC_MONITOR
    // zmq sockets are not thread-safe, so connections are pooled per calling thread.
    thread_local static ConnectionPool pool(*mw_base()->context());

    {
        // Publish a flat copy of the request on the monitor socket as well.
        register_monitor_socket(pool, *mw_base()->context());
        zmqpp::socket& monitor_socket = pool.find(MONITOR_ENDPOINT);
        auto flat_words = capnp::messageToFlatArray(wire_segments);
        monitor_socket.send_raw(reinterpret_cast<char*>(&flat_words[0]), flat_words.size() * sizeof(capnp::word));
    }
#endif

    zmqpp::poller reply_poller;
    reply_poller.add(request_socket);

    // A timeout of -1 means wait without limit; anything else is a limit in milliseconds.
    if (timeout != -1)
    {
        reply_poller.poll(timeout);
    }
    else
    {
        reply_poller.poll();
    }

    if (!reply_poller.has_input(request_socket))
    {
        throw TimeoutException("Request timed out after " + std::to_string(timeout) + " milliseconds");
    }

    // The ZmqReceiver owns the memory of the unmarshaling buffer, so the receiver
    // and the capnp reader travel together in one struct.
    ZmqObjectProxy::TwowayOutParams result;
    result.receiver.reset(new ZmqReceiver(request_socket));
    auto reply_segments = result.receiver->receive();
    result.reader.reset(new capnp::SegmentArrayMessageReader(reply_segments));
    return result;
    // The outgoing twoway socket is closed here, when request_socket goes out of scope.
}

} // namespace zmq_middleware

} // namespace internal

} // namespace scopes

} // namespace unity