~ubuntu-cloud-archive/ubuntu/precise/nova/trunk

« back to all changes in this revision

Viewing changes to nova/openstack/common/rpc/impl_qpid.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Chuck Short, Adam Gandelman
  • Date: 2012-06-22 12:39:57 UTC
  • mfrom: (1.1.57)
  • Revision ID: package-import@ubuntu.com-20120622123957-hbzwg84nt9rqwg8r
Tags: 2012.2~f2~20120621.14517-0ubuntu1
[ Chuck Short ]
* New upstream version.

[ Adam Gandelman ]
* debian/rules: Temporarily disable test suite while blocking
  tests are investigated. 
* debian/patches/kombu_tests_timeout.patch: Dropped.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
 
2
 
 
3
#    Copyright 2011 OpenStack LLC
 
4
#    Copyright 2011 - 2012, Red Hat, Inc.
 
5
#
 
6
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
 
7
#    not use this file except in compliance with the License. You may obtain
 
8
#    a copy of the License at
 
9
#
 
10
#         http://www.apache.org/licenses/LICENSE-2.0
 
11
#
 
12
#    Unless required by applicable law or agreed to in writing, software
 
13
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 
14
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 
15
#    License for the specific language governing permissions and limitations
 
16
#    under the License.
 
17
 
 
18
import functools
 
19
import itertools
 
20
import json
 
21
import logging
 
22
import time
 
23
import uuid
 
24
 
 
25
import eventlet
 
26
import greenlet
 
27
import qpid.messaging
 
28
import qpid.messaging.exceptions
 
29
 
 
30
from nova.openstack.common import cfg
 
31
from nova.openstack.common.gettextutils import _
 
32
from nova.openstack.common.rpc import amqp as rpc_amqp
 
33
from nova.openstack.common.rpc import common as rpc_common
 
34
 
 
35
LOG = logging.getLogger(__name__)

# Configuration options understood by the Qpid RPC driver.
#
# The integer qpid_reconnect_* options default to 0; Connection.__init__ only
# copies them onto the qpid connection when they are non-zero, so 0 means
# "leave the qpid library default in place".
qpid_opts = [
    # Broker location and credentials.
    cfg.StrOpt('qpid_hostname',
               default='localhost',
               help='Qpid broker hostname'),
    cfg.StrOpt('qpid_port',
               default='5672',
               help='Qpid broker port'),
    cfg.StrOpt('qpid_username',
               default='',
               help='Username for qpid connection'),
    cfg.StrOpt('qpid_password',
               default='',
               help='Password for qpid connection'),
    cfg.StrOpt('qpid_sasl_mechanisms',
               default='',
               help='Space separated list of SASL mechanisms to use for auth'),
    # Reconnection behaviour.
    cfg.BoolOpt('qpid_reconnect',
                default=True,
                help='Automatically reconnect'),
    cfg.IntOpt('qpid_reconnect_timeout',
               default=0,
               help='Reconnection timeout in seconds'),
    cfg.IntOpt('qpid_reconnect_limit',
               default=0,
               help='Max reconnections before giving up'),
    cfg.IntOpt('qpid_reconnect_interval_min',
               default=0,
               help='Minimum seconds between reconnection attempts'),
    cfg.IntOpt('qpid_reconnect_interval_max',
               default=0,
               help='Maximum seconds between reconnection attempts'),
    cfg.IntOpt('qpid_reconnect_interval',
               default=0,
               help='Equivalent to setting max and min to the same value'),
    # Transport tuning.
    cfg.IntOpt('qpid_heartbeat',
               default=5,
               help='Seconds between connection keepalive heartbeats'),
    cfg.StrOpt('qpid_protocol',
               default='tcp',
               help="Transport to use, either 'tcp' or 'ssl'"),
    cfg.BoolOpt('qpid_tcp_nodelay',
                default=True,
                help='Disable Nagle algorithm'),
    ]

# Register on the global config object at import time.
cfg.CONF.register_opts(qpid_opts)
 
83
 
 
84
 
 
85
class ConsumerBase(object):
    """Consumer base class.

    Builds a Qpid address string from a node name plus node/link options,
    creates a receiver for that address on a session, and hands the content
    of each fetched message to a callback.
    """

    def __init__(self, session, callback, node_name, node_opts,
                 link_name, link_opts):
        """Declare a queue on an amqp session.

        'session' is the amqp session to use
        'callback' is the callback to call when messages are received
        'node_name' is the first part of the Qpid address string, before ';'
        'node_opts' will be applied to the "x-declare" section of "node"
                    in the address string.
        'link_name' goes into the "name" field of the "link" in the address
                    string
        'link_opts' will be applied to the "x-declare" section of "link"
                    in the address string.
        """
        self.callback = callback
        self.receiver = None
        self.session = None

        # Defaults; the caller-supplied node_opts/link_opts below override
        # individual "x-declare" entries.
        addr_opts = {
            "create": "always",
            "node": {
                "type": "topic",
                "x-declare": {
                    "durable": True,
                    "auto-delete": True,
                },
            },
            "link": {
                "name": link_name,
                "durable": True,
                "x-declare": {
                    "durable": False,
                    "auto-delete": True,
                    "exclusive": False,
                },
            },
        }
        addr_opts["node"]["x-declare"].update(node_opts)
        addr_opts["link"]["x-declare"].update(link_opts)

        # Address format: "<node name> ; <JSON-encoded options>".
        self.address = "%s ; %s" % (node_name, json.dumps(addr_opts))

        # Creates the receiver and records the session.
        self.reconnect(session)

    def reconnect(self, session):
        """Re-declare the receiver after a qpid reconnect"""
        self.session = session
        self.receiver = session.receiver(self.address)
        self.receiver.capacity = 1

    def consume(self):
        """Fetch the message and pass it to the callback object.

        Exceptions raised by the callback are logged and swallowed; the
        message is acknowledged on the session in either case.
        """
        message = self.receiver.fetch()
        try:
            self.callback(message.content)
        except Exception:
            LOG.exception(_("Failed to process message... skipping it."))
        finally:
            self.session.acknowledge(message)

    def get_receiver(self):
        """Return the receiver currently bound to this consumer."""
        return self.receiver
 
150
 
 
151
 
 
152
class DirectConsumer(ConsumerBase):
    """Queue/consumer class for 'direct'"""

    def __init__(self, conf, session, msg_id, callback):
        """Init a 'direct' queue.

        'conf' is accepted for signature parity with the other consumers
               and is not used here
        'session' is the amqp session to use
        'msg_id' is the msg_id to listen on
        'callback' is the callback to call when messages are received
        """

        # msg_id is used as both halves of the node name and as the link
        # name; the link is declared exclusive.
        super(DirectConsumer, self).__init__(session, callback,
                        "%s/%s" % (msg_id, msg_id),
                        {"type": "direct"},
                        msg_id,
                        {"exclusive": True})
 
168
 
 
169
 
 
170
class TopicConsumer(ConsumerBase):
    """Consumer class for 'topic'"""

    def __init__(self, conf, session, topic, callback, name=None):
        """Init a 'topic' queue.

        :param conf: configuration object; conf.control_exchange supplies
                     the exchange part of the node name
        :param session: the amqp session to use
        :param topic: is the topic to listen on
        :paramtype topic: str
        :param callback: the callback to call when messages are received
        :param name: optional queue name, defaults to topic
        """

        # No extra node or link "x-declare" options: the ConsumerBase
        # defaults are used as-is.
        super(TopicConsumer, self).__init__(session, callback,
                        "%s/%s" % (conf.control_exchange, topic), {},
                        name or topic, {})
 
186
 
 
187
 
 
188
class FanoutConsumer(ConsumerBase):
    """Consumer class for 'fanout'"""

    def __init__(self, conf, session, topic, callback):
        """Init a 'fanout' queue.

        'conf' is accepted for signature parity with the other consumers
               and is not used here
        'session' is the amqp session to use
        'topic' is the topic to listen on
        'callback' is the callback to call when messages are received
        """

        # Each consumer gets its own exclusive link whose name carries a
        # random uuid suffix, on a non-durable "<topic>_fanout" node.
        super(FanoutConsumer, self).__init__(session, callback,
                        "%s_fanout" % topic,
                        {"durable": False, "type": "fanout"},
                        "%s_fanout_%s" % (topic, uuid.uuid4().hex),
                        {"exclusive": True})
 
204
 
 
205
 
 
206
class Publisher(object):
    """Base Publisher class.

    Builds a Qpid address string for a node and creates a sender for it on
    the given session.
    """

    def __init__(self, session, node_name, node_opts=None):
        """Init the Publisher class with the exchange_name, routing_key,
        and other options

        'session' is the amqp session to create the sender on
        'node_name' is the first part of the Qpid address string, before ';'
        'node_opts' if given, is applied to the "x-declare" section of
                    "node" in the address string
        """
        self.sender = None
        self.session = session

        addr_opts = {
            "create": "always",
            "node": {
                "type": "topic",
                "x-declare": {
                    "durable": False,
                    # auto-delete isn't implemented for exchanges in qpid,
                    # but put in here anyway
                    "auto-delete": True,
                },
            },
        }
        if node_opts:
            addr_opts["node"]["x-declare"].update(node_opts)

        # Address format: "<node name> ; <JSON-encoded options>".
        self.address = "%s ; %s" % (node_name, json.dumps(addr_opts))

        self.reconnect(session)

    def reconnect(self, session):
        """Re-establish the Sender after a reconnection"""
        self.sender = session.sender(self.address)

    def send(self, msg):
        """Send a message"""
        self.sender.send(msg)
 
242
 
 
243
 
 
244
class DirectPublisher(Publisher):
    """Publisher class for 'direct'"""

    def __init__(self, conf, session, msg_id):
        """Init a 'direct' publisher.

        'conf' is accepted for signature parity with the other publishers
               and is not used here
        'session' is the amqp session to use
        'msg_id' is used as the node name to publish to
        """
        # NOTE(review): the type is spelled "Direct" here but "direct" in
        # DirectConsumer -- confirm whether the broker treats exchange type
        # names case-sensitively before normalising.
        super(DirectPublisher, self).__init__(session, msg_id,
                                              {"type": "Direct"})
 
250
 
 
251
 
 
252
class TopicPublisher(Publisher):
    """Publisher class for 'topic'"""

    def __init__(self, conf, session, topic):
        """init a 'topic' publisher.

        'conf' supplies control_exchange, the exchange part of the node name
        'session' is the amqp session to use
        'topic' is the topic part of the node name
        """
        # No node_opts: the Publisher defaults are used unchanged.
        super(TopicPublisher, self).__init__(session,
                                "%s/%s" % (conf.control_exchange, topic))
 
259
 
 
260
 
 
261
class FanoutPublisher(Publisher):
    """Publisher class for 'fanout'"""

    def __init__(self, conf, session, topic):
        """init a 'fanout' publisher.

        'conf' is accepted for signature parity with the other publishers
               and is not used here
        'session' is the amqp session to use
        'topic' is the topic; messages go to the "<topic>_fanout" node
        """
        super(FanoutPublisher, self).__init__(session,
                                "%s_fanout" % topic, {"type": "fanout"})
 
268
 
 
269
 
 
270
class NotifyPublisher(Publisher):
    """Publisher class for notifications"""

    def __init__(self, conf, session, topic):
        """init a 'topic' publisher.

        Uses the same "<control_exchange>/<topic>" node name as
        TopicPublisher, but declares the node durable.

        'conf' supplies control_exchange, the exchange part of the node name
        'session' is the amqp session to use
        'topic' is the topic part of the node name
        """
        super(NotifyPublisher, self).__init__(session,
                                "%s/%s" % (conf.control_exchange, topic),
                                {"durable": True})
 
278
 
 
279
 
 
280
class Connection(object):
    """Connection object.

    Wraps a qpid.messaging connection and session, keeps track of the
    consumers declared on it, and offers publish/consume helpers that go
    through ensure() so broker connection failures trigger a reconnect.
    """

    # Class-level attribute handed to rpc_amqp.cleanup() by cleanup().
    pool = None

    def __init__(self, conf, server_params=None):
        """Configure the qpid connection from conf and open it.

        'conf' is the configuration object carrying the qpid_* options
        'server_params' optionally overrides hostname, port, username and
                        password; missing keys fall back to conf
        """
        self.session = None
        self.consumers = {}
        self.consumer_thread = None
        self.conf = conf

        # Start from the configured defaults and overlay the caller's
        # values on a fresh dict so the caller's server_params is never
        # mutated.
        params = dict(hostname=self.conf.qpid_hostname,
                      port=self.conf.qpid_port,
                      username=self.conf.qpid_username,
                      password=self.conf.qpid_password)
        if server_params:
            params.update(server_params)

        self.broker = params['hostname'] + ":" + str(params['port'])
        # Create the connection - this does not open the connection
        self.connection = qpid.messaging.Connection(self.broker)

        # Check if flags are set and if so set them for the connection
        # before we call open
        self.connection.username = params['username']
        self.connection.password = params['password']
        self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms
        self.connection.reconnect = self.conf.qpid_reconnect
        # The reconnect tuning options are only applied when non-zero.
        if self.conf.qpid_reconnect_timeout:
            self.connection.reconnect_timeout = (
                    self.conf.qpid_reconnect_timeout)
        if self.conf.qpid_reconnect_limit:
            self.connection.reconnect_limit = self.conf.qpid_reconnect_limit
        if self.conf.qpid_reconnect_interval_max:
            self.connection.reconnect_interval_max = (
                    self.conf.qpid_reconnect_interval_max)
        if self.conf.qpid_reconnect_interval_min:
            self.connection.reconnect_interval_min = (
                    self.conf.qpid_reconnect_interval_min)
        if self.conf.qpid_reconnect_interval:
            self.connection.reconnect_interval = (
                    self.conf.qpid_reconnect_interval)
        # The attribute is 'heartbeat'; the earlier 'hearbeat' spelling set
        # an unrelated attribute, so qpid_heartbeat never took effect.
        self.connection.heartbeat = self.conf.qpid_heartbeat
        self.connection.protocol = self.conf.qpid_protocol
        self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay

        # Open is part of reconnect -
        # NOTE(WGH) not sure we need this with the reconnect flags
        self.reconnect()

    def _register_consumer(self, consumer):
        """Index a consumer by the string form of its current receiver."""
        self.consumers[str(consumer.get_receiver())] = consumer

    def _lookup_consumer(self, receiver):
        """Return the consumer registered for the given receiver."""
        return self.consumers[str(receiver)]

    def reconnect(self):
        """Handles reconnecting and re-establishing sessions and queues"""
        if self.connection.opened():
            try:
                self.connection.close()
            except qpid.messaging.exceptions.ConnectionError:
                pass

        # Retry forever, sleeping qpid_reconnect_interval (or 1s if unset)
        # between attempts.
        while True:
            try:
                self.connection.open()
            except qpid.messaging.exceptions.ConnectionError as e:
                LOG.error(_('Unable to connect to AMQP server: %s'), e)
                time.sleep(self.conf.qpid_reconnect_interval or 1)
            else:
                break

        LOG.info(_('Connected to AMQP server on %s'), self.broker)

        self.session = self.connection.session()

        # consumer.reconnect() replaces each consumer's receiver, and the
        # lookup table is keyed by receiver, so rebuild it from scratch;
        # otherwise _lookup_consumer() would miss the new receivers.
        consumers = self.consumers
        self.consumers = {}
        for consumer in consumers.values():
            consumer.reconnect(self.session)
            self._register_consumer(consumer)

        if self.consumers:
            LOG.debug(_("Re-established AMQP queues"))

    def ensure(self, error_callback, method, *args, **kwargs):
        """Call method(*args, **kwargs), reconnecting and retrying on error.

        On qpid Empty or ConnectionError, error_callback (if any) is invoked
        with the exception, the connection is re-established and the call is
        retried.  An exception raised by error_callback itself propagates
        and ends the retry loop.
        """
        while True:
            try:
                return method(*args, **kwargs)
            except (qpid.messaging.exceptions.Empty,
                    qpid.messaging.exceptions.ConnectionError) as e:
                if error_callback:
                    error_callback(e)
                self.reconnect()

    def close(self):
        """Close/release this connection"""
        self.cancel_consumer_thread()
        self.connection.close()
        self.connection = None

    def reset(self):
        """Reset a connection so it can be used again"""
        self.cancel_consumer_thread()
        self.session.close()
        self.session = self.connection.session()
        self.consumers = {}

    def declare_consumer(self, consumer_cls, topic, callback):
        """Create a Consumer using the class that was passed in and
        add it to our list of consumers
        """
        def _connect_error(exc):
            log_info = {'topic': topic, 'err_str': str(exc)}
            LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
                "%(err_str)s") % log_info)

        def _declare_consumer():
            consumer = consumer_cls(self.conf, self.session, topic, callback)
            self._register_consumer(consumer)
            return consumer

        return self.ensure(_connect_error, _declare_consumer)

    def iterconsume(self, limit=None, timeout=None):
        """Return an iterator that will consume from all queues/consumers.

        'limit' if truthy, is the maximum number of iterations
        'timeout' is passed to session.next_receiver(); a qpid Empty
                  exception is turned into rpc_common.Timeout
        """

        def _error_callback(exc):
            if isinstance(exc, qpid.messaging.exceptions.Empty):
                LOG.exception(_('Timed out waiting for RPC response: %s') %
                        str(exc))
                raise rpc_common.Timeout()
            else:
                LOG.exception(_('Failed to consume message from queue: %s') %
                        str(exc))

        def _consume():
            nxt_receiver = self.session.next_receiver(timeout=timeout)
            try:
                self._lookup_consumer(nxt_receiver).consume()
            except Exception:
                LOG.exception(_("Error processing message.  Skipping it."))

        for iteration in itertools.count(0):
            if limit and iteration >= limit:
                # A plain return ends the generator; raising StopIteration
                # inside a generator is an error under PEP 479.
                return
            yield self.ensure(_error_callback, _consume)

    def cancel_consumer_thread(self):
        """Cancel a consumer thread"""
        if self.consumer_thread is not None:
            self.consumer_thread.kill()
            try:
                self.consumer_thread.wait()
            except greenlet.GreenletExit:
                pass
            self.consumer_thread = None

    def publisher_send(self, cls, topic, msg):
        """Send to a publisher based on the publisher class"""

        def _connect_error(exc):
            log_info = {'topic': topic, 'err_str': str(exc)}
            LOG.exception(_("Failed to publish message to topic "
                "'%(topic)s': %(err_str)s") % log_info)

        def _publisher_send():
            # A new publisher is built for every attempt.
            publisher = cls(self.conf, self.session, topic)
            publisher.send(msg)

        return self.ensure(_connect_error, _publisher_send)

    def declare_direct_consumer(self, topic, callback):
        """Create a 'direct' queue.
        In nova's use, this is generally a msg_id queue used for
        responses for call/multicall
        """
        self.declare_consumer(DirectConsumer, topic, callback)

    def declare_topic_consumer(self, topic, callback=None, queue_name=None):
        """Create a 'topic' consumer."""
        self.declare_consumer(functools.partial(TopicConsumer,
                                                name=queue_name,
                                                ),
                              topic, callback)

    def declare_fanout_consumer(self, topic, callback):
        """Create a 'fanout' consumer"""
        self.declare_consumer(FanoutConsumer, topic, callback)

    def direct_send(self, msg_id, msg):
        """Send a 'direct' message"""
        self.publisher_send(DirectPublisher, msg_id, msg)

    def topic_send(self, topic, msg):
        """Send a 'topic' message"""
        self.publisher_send(TopicPublisher, topic, msg)

    def fanout_send(self, topic, msg):
        """Send a 'fanout' message"""
        self.publisher_send(FanoutPublisher, topic, msg)

    def notify_send(self, topic, msg, **kwargs):
        """Send a notify message on a topic.

        Extra keyword arguments are accepted and ignored.
        """
        self.publisher_send(NotifyPublisher, topic, msg)

    def consume(self, limit=None):
        """Consume from all queues/consumers"""
        # Drain the iterator; the work happens as a side effect of each
        # step, the yielded values are not used.
        for _unused in self.iterconsume(limit=limit):
            pass

    def consume_in_thread(self):
        """Consumer from all queues/consumers in a greenthread"""
        def _consumer_thread():
            try:
                self.consume()
            except greenlet.GreenletExit:
                return
        # Only one consumer greenthread is spawned per connection.
        if self.consumer_thread is None:
            self.consumer_thread = eventlet.spawn(_consumer_thread)
        return self.consumer_thread

    def create_consumer(self, topic, proxy, fanout=False):
        """Create a consumer that calls a method in a proxy object"""
        proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy,
                rpc_amqp.get_connection_pool(self.conf, Connection))

        if fanout:
            consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb)
        else:
            consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb)

        self._register_consumer(consumer)

        return consumer

    def create_worker(self, topic, proxy, pool_name):
        """Create a worker that calls a method in a proxy object.

        Same as a topic consumer, but pool_name is used as the queue
        (link) name instead of the topic.
        """
        proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy,
                rpc_amqp.get_connection_pool(self.conf, Connection))

        consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb,
                                 name=pool_name)

        self._register_consumer(consumer)

        return consumer
 
534
 
 
535
 
 
536
def create_connection(conf, new=True):
    """Create a connection.

    Delegates to rpc_amqp.create_connection(), passing the connection pool
    obtained for this driver's Connection class.
    """
    return rpc_amqp.create_connection(conf, new,
            rpc_amqp.get_connection_pool(conf, Connection))
 
540
 
 
541
 
 
542
def multicall(conf, context, topic, msg, timeout=None):
    """Make a call that returns multiple times.

    Delegates to rpc_amqp.multicall(), passing the connection pool obtained
    for this driver's Connection class.
    """
    return rpc_amqp.multicall(conf, context, topic, msg, timeout,
            rpc_amqp.get_connection_pool(conf, Connection))
 
546
 
 
547
 
 
548
def call(conf, context, topic, msg, timeout=None):
    """Sends a message on a topic and wait for a response.

    Delegates to rpc_amqp.call(), passing the connection pool obtained for
    this driver's Connection class.
    """
    return rpc_amqp.call(conf, context, topic, msg, timeout,
            rpc_amqp.get_connection_pool(conf, Connection))
 
552
 
 
553
 
 
554
def cast(conf, context, topic, msg):
    """Sends a message on a topic without waiting for a response.

    Delegates to rpc_amqp.cast(), passing the connection pool obtained for
    this driver's Connection class.
    """
    return rpc_amqp.cast(conf, context, topic, msg,
            rpc_amqp.get_connection_pool(conf, Connection))
 
558
 
 
559
 
 
560
def fanout_cast(conf, context, topic, msg):
    """Sends a message on a fanout exchange without waiting for a response.

    Delegates to rpc_amqp.fanout_cast(), passing the connection pool
    obtained for this driver's Connection class.
    """
    return rpc_amqp.fanout_cast(conf, context, topic, msg,
            rpc_amqp.get_connection_pool(conf, Connection))
 
564
 
 
565
 
 
566
def cast_to_server(conf, context, server_params, topic, msg):
    """Sends a message on a topic to a specific server.

    Delegates to rpc_amqp.cast_to_server(), passing server_params through
    together with the connection pool obtained for this driver's
    Connection class.
    """
    return rpc_amqp.cast_to_server(conf, context, server_params, topic, msg,
            rpc_amqp.get_connection_pool(conf, Connection))
 
570
 
 
571
 
 
572
def fanout_cast_to_server(conf, context, server_params, topic, msg):
    """Sends a message on a fanout exchange to a specific server.

    Delegates to rpc_amqp.fanout_cast_to_server(), passing server_params
    through together with the connection pool obtained for this driver's
    Connection class.
    """
    return rpc_amqp.fanout_cast_to_server(conf, context, server_params, topic,
            msg, rpc_amqp.get_connection_pool(conf, Connection))
 
576
 
 
577
 
 
578
def notify(conf, context, topic, msg):
    """Sends a notification event on a topic.

    Delegates to rpc_amqp.notify(), passing the connection pool obtained
    for this driver's Connection class.
    """
    return rpc_amqp.notify(conf, context, topic, msg,
            rpc_amqp.get_connection_pool(conf, Connection))
 
582
 
 
583
 
 
584
def cleanup():
    """Hand Connection.pool to rpc_amqp.cleanup() and return its result."""
    return rpc_amqp.cleanup(Connection.pool)