~ahasenack/landscape-client/landscape-client-1.5.5-0ubuntu0.10.04.0

1.2.5 by Free Ekanayaka
Import upstream version 1.5.2.1
1
import time
2
3
try:
4
    import sqlite3
5
except ImportError:
6
    from pysqlite2 import dbapi2 as sqlite3
7
8
from landscape.tests.helpers import LandscapeTest
9
10
from landscape.broker.exchangestore import ExchangeStore
11
12
13
class ExchangeStoreTest(LandscapeTest):
    """Unit tests for the C{ExchangeStore}."""

    def setUp(self):
        """
        Create two C{ExchangeStore} instances backed by the same file, so
        that tests can write through one and read back through the other.
        """
        super(ExchangeStoreTest, self).setUp()

        self.filename = self.makeFile()
        self.store1 = ExchangeStore(self.filename)
        self.store2 = ExchangeStore(self.filename)

    def test_add_message_context(self):
        """Adding a message context works correctly."""
        now = time.time()
        self.store1.add_message_context(123, 'abc', 'change-packages')

        # Read the row straight from the database file instead of going
        # through the store API, and always release the connection so the
        # test does not leak an open handle on the file.
        db = sqlite3.connect(self.store2._filename)
        try:
            cursor = db.cursor()
            cursor.execute(
                "SELECT operation_id, secure_id, message_type, timestamp "
                "FROM message_context WHERE operation_id=?", (123,))
            results = cursor.fetchall()
        finally:
            db.close()

        self.assertEqual(1, len(results))
        [row] = results
        self.assertEqual(123, row[0])
        self.assertEqual('abc', row[1])
        self.assertEqual('change-packages', row[2])
        # The stored timestamp must not predate the call.  The comparison is
        # non-strict because two clock readings taken close together can be
        # identical.
        self.assertTrue(row[3] >= now)

    def test_add_message_context_with_duplicate_operation_id(self):
        """Only one message context with a given operation-id is permitted."""
        self.store1.add_message_context(123, "abc", "change-packages")
        # Either sqlite3 error type is accepted for the rejected insert.
        self.assertRaises(
            (sqlite3.IntegrityError, sqlite3.OperationalError),
            self.store1.add_message_context, 123, "def", "change-packages")

    def test_get_message_context(self):
        """
        Accessing a C{MessageContext} with an existing C{operation-id} works.
        """
        now = time.time()
        self.store1.add_message_context(234, 'bcd', 'change-packages')
        # Look the context up through the second store sharing the file.
        context = self.store2.get_message_context(234)
        self.assertEqual(234, context.operation_id)
        self.assertEqual('bcd', context.secure_id)
        self.assertEqual('change-packages', context.message_type)
        # Non-strict for the same reason as in test_add_message_context:
        # consecutive clock readings may be equal.
        self.assertTrue(context.timestamp >= now)

    def test_get_message_context_with_nonexistent_operation_id(self):
        """Attempts to access a C{MessageContext} with a non-existent
        C{operation-id} result in C{None}."""
        self.assertIs(None, self.store1.get_message_context(999))

    def test_message_context_remove(self):
        """C{MessageContext}s are deleted correctly."""
        context = self.store1.add_message_context(
            345, 'opq', 'change-packages')
        context.remove()
        self.assertIs(None, self.store1.get_message_context(345))

    def test_all_operation_ids_for_empty_database(self):
        """
        Calling C{all_operation_ids} on an empty database returns an empty
        list.
        """
        self.assertEqual([], self.store1.all_operation_ids())

    def test_all_operation_ids(self):
        """C{all_operation_ids} works correctly."""
        # Alternate which store writes and which one reads, so each store
        # is checked against ids added through the other.
        self.store1.add_message_context(456, 'cde', 'change-packages')
        self.assertEqual([456], self.store2.all_operation_ids())
        self.store2.add_message_context(567, 'def', 'change-packages')
        self.assertEqual([456, 567], self.store1.all_operation_ids())