11
11
from ReducePyDoNothing import ReducePyDoNothing
12
12
from OutputPyJSON import OutputPyJSON
14
from Go import Go, get_possible_dataflows
16
16
class FakeMap(): #pylint: disable = W0232, R0903
17
17
"""Map mock-up that always fails to birth"""
31
31
Make sure get_possible_dataflows() doesn't return nonsense
33
keys = get_possible_dataflows().keys()
33
keys = Go.get_possible_dataflows().keys()
34
34
self.assertTrue('pipeline_single_thread' in keys)
36
36
def input_birth_test(self):
109
109
command_line_args = False)
112
def test_type_of_dataflow_good(self):
113
"""Check that Go executes okay with good dataflow"""
112
def test_dataflow_single_thread(self):
114
Check that Go executes okay with pipeline_single_thread dataflow.
114
116
inputer = InputPyEmptyDocument(1)
115
117
transformer = MapPyDoNothing()
116
118
merger = ReducePyDoNothing()
121
123
Go(inputer, transformer, merger, outputer, config, \
122
124
command_line_args = False)
126
def test_dataflow_multi_process(self):
128
Check that Go executes okay with multi_process dataflow.
130
inputer = InputPyEmptyDocument(1)
131
transformer = MapPyDoNothing()
132
merger = ReducePyDoNothing()
133
outputer = OutputPyJSON(open(self.tmp_file, 'w'))
134
for map_red_type in ['multi_process']:
135
config = StringIO(u"""type_of_dataflow="%s" """ % map_red_type)
137
with self.assertRaises(Exception):
138
Go(inputer, transformer, merger, outputer, config, \
139
command_line_args = False)
124
141
def test_dataflow_not_implemented(self):
125
142
"""Check that Go notifies user of unimplemented dataflow"""
126
143
inputer = InputPyEmptyDocument(1)
127
144
transformer = MapPyDoNothing()
128
145
merger = ReducePyDoNothing()
129
146
outputer = OutputPyJSON(open(self.tmp_file, 'w'))
130
for map_red_type in ['many_local_threads','control_room_style']:
147
for map_red_type in ['many_local_threads']:
131
148
config = StringIO(u"""type_of_dataflow="%s" """ % map_red_type)
133
150
with self.assertRaises(NotImplementedError):