~ubuntu-branches/ubuntu/maverick/bzr/maverick

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# Copyright (C) 2008 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

import StringIO
from bzrlib import errors, filters
from bzrlib.filters import (
    ContentFilter,
    ContentFilterContext,
    filtered_input_file,
    filtered_output_bytes,
    _get_filter_stack_for,
    _get_registered_names,
    internal_size_sha_file_byname,
    register_filter_stack_map,
    )
from bzrlib.osutils import sha_string
from bzrlib.tests import TestCase, TestCaseInTempDir


# sample filter functions and the filter stacks built from them
def _swapcase(chunks, context=None):
    """Return a new list with the letter case of every chunk inverted.

    :param chunks: iterable of strings to convert
    :param context: accepted so the function can be called with or without
        a second argument; it is never read
    :return: list of the case-swapped strings, in the original order
    """
    swapped = []
    for chunk in chunks:
        swapped.append(chunk.swapcase())
    return swapped
def _addjunk(chunks):
    """Return a new list holding a 'junk' line followed by every chunk.

    :param chunks: iterable of strings; it is not modified
    :return: list starting with ``'junk\\n'`` and continuing with the
        items of ``chunks`` in order
    """
    prefixed = ['junk\n']
    prefixed.extend(chunks)
    return prefixed
def _deljunk(chunks, context):
    """Return a new list of every chunk except the first.

    :param chunks: sliceable sequence of strings
    :param context: required positional argument; it is never read
    :return: list of the items of ``chunks`` from index 1 onwards
    """
    remainder = chunks[1:]
    return list(remainder)
# One-filter stack: _swapcase is passed as both ContentFilter arguments, so
# the tests below see the same case inversion on input and on output.
_stack_1 = [
    ContentFilter(_swapcase, _swapcase),
    ]
# Two-filter stack: the case-swapping filter followed by one pairing
# _addjunk with _deljunk.  The tests below expect filtered_input_file() with
# this stack to yield the case-swapped text with a 'junk\n' chunk in front,
# and filtered_output_bytes() to turn that back into the original text.
_stack_2 = [
    ContentFilter(_swapcase, _swapcase),
    ContentFilter(_addjunk, _deljunk),
    ]

# sample data
# Unfiltered text as a list of line chunks; the input fed to
# filtered_input_file() and the expected result of filtered_output_bytes().
_sample_external = ['Hello\n', 'World\n']
# Counterpart of _sample_external under _stack_1 (case swapped).
_internal_1 = ['hELLO\n', 'wORLD\n']
# Counterpart of _sample_external under _stack_2 (case swapped, then a
# leading 'junk\n' chunk).
_internal_2 = ['junk\n', 'hELLO\n', 'wORLD\n']


class TestContentFilterContext(TestCase):
    """Tests for the relpath() accessor of ContentFilterContext."""

    def test_empty_filter_context(self):
        # A context built without arguments reports no relative path.
        ctx = ContentFilterContext()
        self.assertEqual(None, ctx.relpath())

    def test_filter_context_with_path(self):
        # The path given at construction comes back unchanged from relpath().
        # assertEqual is used instead of the deprecated assertEquals alias,
        # in line with every other assertion in this module.
        ctx = ContentFilterContext('foo/bar')
        self.assertEqual('foo/bar', ctx.relpath())


class TestFilteredInput(TestCase):
    """Tests for filtered_input_file() using the sample stacks above."""

    def test_filtered_input_file(self):
        external = ''.join(_sample_external)
        # (filter stack, text expected from reading the filtered file)
        cases = [
            # an empty stack returns the same result
            (None, external),
            # a single item filter stack
            (_stack_1, ''.join(_internal_1)),
            # a multi item filter stack
            (_stack_2, ''.join(_internal_2)),
            ]
        for stack, expected in cases:
            # Use a fresh file object per case so each read starts at 0.
            source = StringIO.StringIO(external)
            actual = filtered_input_file(source, stack).read()
            self.assertEqual(expected, actual)


class TestFilteredOutput(TestCase):
    """Tests for filtered_output_bytes() using the sample stacks above."""

    def test_filtered_output_bytes(self):
        # (internal chunks, filter stack); every case must give back
        # _sample_external.
        cases = [
            # an empty stack returns the same result
            (_sample_external, None),
            # a single item filter stack
            (_internal_1, _stack_1),
            # a multi item filter stack
            (_internal_2, _stack_2),
            ]
        for internal, stack in cases:
            external = list(filtered_output_bytes(internal, stack))
            self.assertEqual(_sample_external, external)


class TestFilteredSha(TestCaseInTempDir):
    """Test the size and sha computed for a file read through filters.

    The test writes to the relative path 'a'; the base class name suggests
    the working directory is a per-test temporary directory.
    """

    def test_filtered_size_sha(self):
        # check that the size and sha matches what's expected
        text = 'Foo Bar Baz\n'
        a = open('a', 'wb')
        try:
            a.write(text)
        finally:
            # Close the handle even if the write fails so it is not leaked.
            a.close()
        # The expected values describe the content after the filter's
        # conversion, not the bytes stored on disk.
        post_filtered_content = ''.join(_swapcase([text], None))
        expected_len = len(post_filtered_content)
        expected_sha = sha_string(post_filtered_content)
        self.assertEqual((expected_len, expected_sha),
            internal_size_sha_file_byname('a',
            [ContentFilter(_swapcase, _swapcase)]))


class TestFilterStackMaps(TestCase):
    """Tests for registering filter stack maps and looking up stacks.

    Each test resets the registry through filters._reset_registry(), keeps
    the value that call returns, and hands it back in a finally clause.
    """

    def _register_map(self, pref, stk1, stk2):
        # Register a lookup for preference `pref` that answers stk1 for the
        # value 'v1', stk2 for 'v2' and None for any other value.
        stacks_by_value = {'v1': stk1, 'v2': stk2}

        def lookup(key):
            return stacks_by_value.get(key)

        register_filter_stack_map(pref, lookup)

    def test_filter_stack_maps(self):
        # Save the current registry
        saved_registry = filters._reset_registry()
        try:
            one_filter = [ContentFilter('b', 'c')]
            two_filters = [ContentFilter('y', 'x'), ContentFilter('w', 'v')]
            # Test registration
            self._register_map('foo', one_filter, two_filters)
            self.assertEqual(['foo'], _get_registered_names())
            self._register_map('bar', two_filters, one_filter)
            self.assertEqual(['bar', 'foo'], _get_registered_names())
            # Test re-registration raises an error
            self.assertRaises(
                errors.BzrError, self._register_map, 'foo', [], [])
        finally:
            # Restore the real registry
            filters._reset_registry(saved_registry)

    def test_get_filter_stack_for(self):
        # Save the current registry
        saved_registry = filters._reset_registry()
        try:
            a_stack = [ContentFilter('b', 'c')]
            d_stack = [ContentFilter('d', 'D')]
            z_stack = [ContentFilter('y', 'x'), ContentFilter('w', 'v')]
            self._register_map('foo', a_stack, z_stack)
            self._register_map('bar', d_stack, z_stack)

            def check(expected, *prefs):
                # prefs arrives as a tuple of (name, value) pairs.
                self.assertEqual(expected, _get_filter_stack_for(prefs))

            # Test filter stack lookup
            check(a_stack, ('foo', 'v1'))
            check(z_stack, ('foo', 'v2'))
            check(a_stack + d_stack, ('foo', 'v1'), ('bar', 'v1'))
            # Test an unknown preference
            check([], ('baz', 'v1'))
            # Test an unknown value
            check([], ('foo', 'v3'))
            # Test a value of None is skipped
            check(d_stack, ('foo', None), ('bar', 'v1'))
        finally:
            # Restore the real registry
            filters._reset_registry(saved_registry)