~unity-api-team/unity-scopes-api/child-scopes-option

« back to all changes in this revision

Viewing changes to src/internal/zmq_middleware/ZmqMiddleware.cpp

  • Committer: Michi Henning
  • Date: 2013-11-19 02:51:46 UTC
  • mto: This revision was merged to the branch mainline in revision 62.
  • Revision ID: michi.henning@canonical.com-20131119025146-kglcalpphl4ozzk7
Fixed scoperegistry to use the new scoperunner and to determine from the config files which scopes to run.
Still to do:
- deal with overrides and OEM scopes, particularly the grouping aspect.
- SignalThread needs to invoke a callback for clean shut-down on receipt of SIGINT.

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
 
19
19
#include <scopes/internal/zmq_middleware/ZmqMiddleware.h>
20
20
 
 
21
#include <scopes/internal/RuntimeImpl.h>
21
22
#include <scopes/internal/zmq_middleware/ConnectionPool.h>
22
23
#include <scopes/internal/zmq_middleware/ObjectAdapter.h>
23
24
#include <scopes/internal/zmq_middleware/QueryI.h>
64
65
try :
65
66
    MiddlewareBase(runtime),
66
67
    server_name_(server_name),
67
 
    state_(Stopped)
 
68
    state_(Stopped),
 
69
    config_(configfile)
68
70
{
69
71
    assert(!server_name.empty());
70
 
 
71
 
    // TODO: read config from file (thread pool size, invocation timeout, etc.
72
72
}
73
73
catch (zmqpp::exception const& e)
74
74
{
178
178
    return proxy;
179
179
}
180
180
 
 
181
MWScopeProxy ZmqMiddleware::create_scope_proxy(string const& identity)
 
182
{
 
183
    MWScopeProxy proxy;
 
184
    try
 
185
    {
 
186
        string endpoint = "ipc://" + config_.private_dir() + "/" + identity;
 
187
        proxy.reset(new ZmqScope(this, endpoint, identity));
 
188
    }
 
189
    catch (zmqpp::exception const& e)
 
190
    {
 
191
        rethrow_zmq_ex(e);
 
192
    }
 
193
    return proxy;
 
194
}
 
195
 
181
196
MWScopeProxy ZmqMiddleware::create_scope_proxy(string const& identity, string const& endpoint)
182
197
{
183
198
    MWScopeProxy proxy;
200
215
    try
201
216
    {
202
217
        shared_ptr<QueryCtrlI> qci(make_shared<QueryCtrlI>(ctrl));
203
 
        auto adapter = find_adapter(server_name_ + ctrl_suffix);
 
218
        auto adapter = find_adapter(server_name_ + ctrl_suffix, config_.private_dir());
204
219
        function<void()> df;
205
220
        auto proxy = safe_add(df, adapter, "", qci);
206
221
        ctrl->set_disconnect_function(df);
221
236
    try
222
237
    {
223
238
        shared_ptr<QueryI> qi(make_shared<QueryI>(query));
224
 
        auto adapter = find_adapter(server_name_ + query_suffix);
 
239
        auto adapter = find_adapter(server_name_ + query_suffix, config_.private_dir());
225
240
        function<void()> df;
226
241
        auto proxy = safe_add(df, adapter, "", qi);
227
242
        query->set_disconnect_function(df);
244
259
    try
245
260
    {
246
261
        shared_ptr<RegistryI> ri(make_shared<RegistryI>(registry));
247
 
        auto adapter = find_adapter(server_name_);
 
262
        auto adapter = find_adapter(server_name_, runtime()->registry_endpointdir());
248
263
        function<void()> df;
249
264
        auto proxy = safe_add(df, adapter, identity, ri);
250
265
        registry->set_disconnect_function(df);
265
280
    try
266
281
    {
267
282
        shared_ptr<ReplyI> ri(make_shared<ReplyI>(reply));
268
 
        auto adapter = find_adapter(server_name_ + reply_suffix);
 
283
        auto adapter = find_adapter(server_name_ + reply_suffix, config_.public_dir());
269
284
        function<void()> df;
270
285
        auto proxy = safe_add(df, adapter, "", ri);
271
286
        reply->set_disconnect_function(df);
287
302
    try
288
303
    {
289
304
        shared_ptr<ScopeI> si(make_shared<ScopeI>(scope));
290
 
        auto adapter = find_adapter(server_name_);
 
305
        auto adapter = find_adapter(server_name_, config_.private_dir());
291
306
        function<void()> df;
292
307
        auto proxy = safe_add(df, adapter, identity, si);
293
308
        scope->set_disconnect_function(df);
326
341
 
327
342
} // namespace
328
343
 
329
 
shared_ptr<ObjectAdapter> ZmqMiddleware::find_adapter(string const& name)
 
344
shared_ptr<ObjectAdapter> ZmqMiddleware::find_adapter(string const& name, string const& endpoint_dir)
330
345
{
331
346
    lock_guard<mutex> lock(mutex_);
332
347
 
341
356
    RequestType type;
342
357
    if (has_suffix(name, query_suffix))
343
358
    {
344
 
        // The query adapter is single or multi-thread and supports oneway operations only.
 
359
        // The query adapter is single or multi-threaded and supports oneway operations only.
345
360
        // TODO: get pool size from config
346
361
        pool_size = 1;
347
362
        type = RequestType::Oneway;
366
381
        pool_size = 1;
367
382
        type = RequestType::Twoway;
368
383
    }
369
 
    // TODO: get directory of adapter from config
 
384
 
370
385
    // The query adapter is always inproc.
371
 
    string endpoint = (has_suffix(name, query_suffix) ? "inproc://" : "ipc://") + name;
 
386
    string endpoint;
 
387
    if (has_suffix(name, query_suffix))
 
388
    {
 
389
        endpoint = "inproc://" + name;
 
390
    }
 
391
    else
 
392
    {
 
393
        endpoint = "ipc://" + endpoint_dir + "/" + name;
 
394
    }
372
395
 
373
396
    shared_ptr<ObjectAdapter> a(new ObjectAdapter(*this, name, endpoint, type, pool_size));
374
397
    a->activate();