~ubuntu-branches/ubuntu/wily/pymongo/wily-proposed

« back to all changes in this revision

Viewing changes to test/test_monitor.py

  • Committer: Package Import Robot
  • Author(s): Federico Ceratto
  • Date: 2015-04-26 22:43:13 UTC
  • mfrom: (24.1.5 sid)
  • Revision ID: package-import@ubuntu.com-20150426224313-0hga2jphvf0rrmfe
Tags: 3.0.1-1
New upstream release.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright 2014-2015 MongoDB, Inc.
 
2
#
 
3
# Licensed under the Apache License, Version 2.0 (the "License");
 
4
# you may not use this file except in compliance with the License.
 
5
# You may obtain a copy of the License at
 
6
#
 
7
# http://www.apache.org/licenses/LICENSE-2.0
 
8
#
 
9
# Unless required by applicable law or agreed to in writing, software
 
10
# distributed under the License is distributed on an "AS IS" BASIS,
 
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
12
# See the License for the specific language governing permissions and
 
13
# limitations under the License.
 
14
 
 
15
"""Test the monitor module."""
 
16
 
 
17
import gc
 
18
import sys
 
19
from functools import partial
 
20
 
 
21
sys.path[0:0] = [""]
 
22
 
 
23
from pymongo.periodic_executor import _EXECUTORS
 
24
from test import unittest, port, host, IntegrationTest
 
25
from test.utils import single_client, one, connected, wait_until
 
26
 
 
27
 
 
28
def unregistered(ref):
 
29
    gc.collect()
 
30
    return ref not in _EXECUTORS
 
31
 
 
32
 
 
33
class TestMonitor(IntegrationTest):
 
34
    def test_atexit_hook(self):
 
35
        client = single_client(host, port)
 
36
        executor = one(client._topology._servers.values())._monitor._executor
 
37
        connected(client)
 
38
 
 
39
        # The executor stores a weakref to itself in _EXECUTORS.
 
40
        ref = one([r for r in _EXECUTORS.copy() if r() is executor])
 
41
 
 
42
        del executor
 
43
        del client
 
44
 
 
45
        wait_until(partial(unregistered, ref), 'unregister executor',
 
46
                   timeout=5)
 
47
 
 
48
 
 
49
if __name__ == "__main__":
 
50
    unittest.main()