3
from gi.repository import GObject
7
4
from mock import patch
5
from gi.repository import GObject
9
from testutils import setup_test_env
7
from tests.utils import (
12
13
from softwarecenter.backend.recagent import RecommenderAgent
14
from softwarecenter.testutils import (
18
16
class TestRecommenderAgent(unittest.TestCase):
19
17
""" tests the recommender agent """
22
20
self.loop = GObject.MainLoop(GObject.main_context_default())
24
self.orig_host = os.environ.get("SOFTWARE_CENTER_RECOMMENDER_HOST")
25
if not "SOFTWARE_CENTER_RECOMMENDER_HOST" in os.environ:
26
server = "https://rec.staging.ubuntu.com"
27
#server = "https://rec.ubuntu.com"
28
os.environ["SOFTWARE_CENTER_RECOMMENDER_HOST"] = server
31
if self.orig_host is None:
32
del os.environ["SOFTWARE_CENTER_RECOMMENDER_HOST"]
22
if "SOFTWARE_CENTER_RECOMMENDER_HOST" in os.environ:
23
orig_host = os.environ.get("SOFTWARE_CENTER_RECOMMENDER_HOST")
24
self.addCleanup(os.environ.__setitem__,
25
"SOFTWARE_CENTER_RECOMMENDER_HOST", orig_host)
34
os.environ["SOFTWARE_CENTER_RECOMMENDER_HOST"] = self.orig_host
27
self.addCleanup(os.environ.pop, "SOFTWARE_CENTER_RECOMMENDER_HOST")
28
server = "https://rec.staging.ubuntu.com"
29
os.environ["SOFTWARE_CENTER_RECOMMENDER_HOST"] = server
36
31
@patch('softwarecenter.backend.recagent.SpawnHelper'
37
32
'.run_generic_piston_helper')
38
33
def test_mocked_recagent_post_submit_profile(self, mock_spawn_helper_run):
39
34
def _patched_on_submit_profile_data(*args, **kwargs):
40
35
piston_submit_profile = {}
41
recommender_agent.emit("submit-profile-finished",
36
recommender_agent.emit("submit-profile-finished",
42
37
piston_submit_profile)
43
38
mock_spawn_helper_run.side_effect = _patched_on_submit_profile_data
44
recommender_agent = RecommenderAgent()
39
recommender_agent = RecommenderAgent(needs_auth=False)
45
40
recommender_agent.connect("submit-profile-finished", self.on_query_done)
46
41
recommender_agent.connect("error", self.on_query_error)
47
42
recommender_agent._calc_profile_id = lambda profile: "i-am-random"
52
47
self.assertNotEqual(kwargs['data'][0]['package_list'], [])
54
49
def on_query_done(self, recagent, data):
55
print "query done, data: '%s'" % data
50
# print "query done, data: '%s'" % data
58
53
def on_query_error(self, recagent, error):
59
print "query error received: ", error
54
# print "query error received: ", error
63
58
def test_recagent_query_server_status(self):
64
59
# NOTE: This requires a working recommender host that is reachable
65
recommender_agent = RecommenderAgent()
60
recommender_agent = RecommenderAgent(needs_auth=False)
66
61
recommender_agent.connect("server-status", self.on_query_done)
67
62
recommender_agent.connect("error", self.on_query_error)
68
63
recommender_agent.query_server_status()
70
65
self.assertFalse(self.error)
72
# FIXME: disabled for now as the server is not quite working
67
# FIXME: disabled for now as the server is not quite working
73
68
def disabled_test_recagent_post_submit_profile(self):
74
69
# NOTE: This requires a working recommender host that is reachable
75
recommender_agent = RecommenderAgent()
70
recommender_agent = RecommenderAgent(needs_auth=False)
76
71
recommender_agent.connect("submit-profile-finished", self.on_query_done)
77
72
recommender_agent.connect("error", self.on_query_error)
81
76
self.assertFalse(self.error)
82
77
#print mock_request._post
84
79
# NOTE: this server call is currently not needed and not used
85
80
# def disabled_test_recagent_query_submit_anon_profile(self):
86
81
# # NOTE: This requires a working recommender host that is reachable
87
# recommender_agent = RecommenderAgent()
82
# recommender_agent = RecommenderAgent(needs_auth=False)
88
83
# recommender_agent.connect("submit-anon-profile", self.on_query_done)
89
84
# recommender_agent.connect("error", self.on_query_error)
90
85
# recommender_agent.query_submit_anon_profile(
95
90
# self.assertFalse(self.error)
97
92
# FIXME: disabled for now as the server is not quite working
98
93
def disabled_test_recagent_query_profile(self):
99
94
# NOTE: This requires a working recommender host that is reachable
100
recommender_agent = RecommenderAgent()
95
recommender_agent = RecommenderAgent(needs_auth=False)
101
96
recommender_agent.connect("profile", self.on_query_done)
102
97
recommender_agent.connect("error", self.on_query_error)
103
98
recommender_agent.query_profile(pkgnames=["pitivi", "fretsonfire"])
107
102
# FIXME: disabled for now as the server is not quite working
108
103
def disabled_test_recagent_query_recommend_me(self):
109
104
# NOTE: This requires a working recommender host that is reachable
110
recommender_agent = RecommenderAgent()
105
recommender_agent = RecommenderAgent(needs_auth=False)
111
106
recommender_agent.connect("recommend-me", self.on_query_done)
112
107
recommender_agent.connect("error", self.on_query_error)
113
108
recommender_agent.query_recommend_me()
117
112
def test_recagent_query_recommend_app(self):
118
113
# NOTE: This requires a working recommender host that is reachable
119
recommender_agent = RecommenderAgent()
114
recommender_agent = RecommenderAgent(needs_auth=False)
120
115
recommender_agent.connect("recommend-app", self.on_query_done)
121
116
recommender_agent.connect("error", self.on_query_error)
122
117
recommender_agent.query_recommend_app("pitivi")
126
121
# disabled for now (2012-03-20) as the server is returning 504
127
122
def disabled_test_recagent_query_recommend_all_apps(self):
128
123
# NOTE: This requires a working recommender host that is reachable
129
recommender_agent = RecommenderAgent()
124
recommender_agent = RecommenderAgent(needs_auth=False)
130
125
recommender_agent.connect("recommend-all-apps", self.on_query_done)
131
126
recommender_agent.connect("error", self.on_query_error)
132
127
recommender_agent.query_recommend_all_apps()
134
129
self.assertFalse(self.error)
136
131
def test_recagent_query_recommend_top(self):
137
132
# NOTE: This requires a working recommender host that is reachable
138
recommender_agent = RecommenderAgent()
133
recommender_agent = RecommenderAgent(needs_auth=False)
139
134
recommender_agent.connect("recommend-top", self.on_query_done)
140
135
recommender_agent.connect("error", self.on_query_error)
141
136
recommender_agent.query_recommend_top()
143
138
self.assertFalse(self.error)
145
140
def test_recagent_query_error(self):
146
141
# NOTE: This tests the error condition itself! it simply forces an error
147
142
# 'cuz there definitely isn't a server here :)
148
os.environ["SOFTWARE_CENTER_RECOMMENDER_HOST"] = "https://test-no-server-here.staging.ubuntu.com"
149
recommender_agent = RecommenderAgent()
143
fake_server = "https://test-no-server-here.staging.ubuntu.com"
144
os.environ["SOFTWARE_CENTER_RECOMMENDER_HOST"] = fake_server
145
recommender_agent = RecommenderAgent(needs_auth=False)
150
146
recommender_agent.connect("recommend-top", self.on_query_done)
151
147
recommender_agent.connect("error", self.on_query_error)
152
148
recommender_agent.query_recommend_top()