1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
|
# Copyright (C) 2009 Canonical
#
# Authors:
# Michael Vogt
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; version 3.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
import apt
import apt_pkg
import logging
import re
import urllib
import time
import xml.sax.saxutils
import gobject
import gio
import glib
from enums import USER_AGENT
# Additional entities for the unescape() helper in this module:
# xml.sax.saxutils.unescape() only converts '&amp;', '&lt;' and '&gt;'
# on its own, so the two quote entities have to be supplied explicitly.
ESCAPE_ENTITIES = {
    "&apos;": "'",
    "&quot;": '"',
}
class ExecutionTime(object):
    """
    Context manager that logs how long the enclosed block took to run.

    On leaving the block the elapsed wall-clock time in seconds is
    written at debug level to the "softwarecenter.performance" logger,
    prefixed with the "info" label given to the constructor, e.g.

        with ExecutionTime("db flush"):
            db.flush()
    """

    def __init__(self, info=""):
        # label that prefixes the logged duration
        self.info = info

    def __enter__(self):
        # remember when the block was entered
        self.now = time.time()

    def __exit__(self, type, value, stack):
        # nothing is returned, so an exception raised inside the
        # block keeps propagating after the timing has been logged
        perf_log = logging.getLogger("softwarecenter.performance")
        elapsed = time.time() - self.now
        perf_log.debug("%s: %s" % (self.info, elapsed))
# NOTE(review): Url404Error/Url403Error were raised below without being
# defined or imported anywhere in this module, so a 404/403 reply ended
# in a NameError. They derive from IOError, which is what urllib itself
# raises for HTTP errors, so existing "except IOError" handlers keep
# working.
class Url404Error(IOError):
    """Raised by GnomeProxyURLopener when the server answers 404."""


class Url403Error(IOError):
    """Raised by GnomeProxyURLopener when the server answers 403."""


class GnomeProxyURLopener(urllib.FancyURLopener):
    """A urllib.URLOpener that honors the gnome proxy settings

    The http proxy is read from gconf when the opener is created.
    HTTP 404 and 403 replies are turned into Url404Error and
    Url403Error exceptions.
    """

    def __init__(self, user_agent=USER_AGENT):
        """
        Create the opener.

        user_agent -- stored as the opener's "version" attribute, which
                      urllib sends as the User-Agent header
        """
        # an empty dict (rather than None) is passed when gconf has no
        # proxy configured
        proxies = {}
        http_proxy = get_http_proxy_string_from_gconf()
        if http_proxy:
            proxies = {"http": http_proxy}
        urllib.FancyURLopener.__init__(self, proxies)
        self.version = user_agent

    def http_error_404(self, url, fp, errcode, errmsg, headers):
        """Log the 404 reply and raise Url404Error."""
        logging.debug("http_error_404: %s %s %s", url, errcode, errmsg)
        raise Url404Error("404 %s" % url)

    def http_error_403(self, url, fp, errcode, errmsg, headers):
        """Log the 403 reply and raise Url403Error."""
        logging.debug("http_error_403: %s %s %s", url, errcode, errmsg)
        raise Url403Error("403 %s" % url)
def htmlize_package_desc(desc):
    """
    Generator that turns a plain-text package description into HTML
    fragments, one yielded string per tag or text line.

    - runs of non-empty lines are wrapped in <p tabindex="0"> ... </p>
    - a line starting with optional whitespace followed by "-" or "*"
      opens a <li>; following lines stay inside that item as long as
      they are indented at least as far as the bullet marker
    - an empty line closes any open <li> and <p>

    The text itself is yielded stripped but otherwise unmodified (no
    HTML escaping is done here) and no enclosing list tag is emitted.
    """
    # optional leading whitespace followed by a bullet marker
    bullet_re = re.compile(r"^(\s*[-*])")
    inside_p = False
    inside_li = False
    indent_len = None
    for line in desc.splitlines():
        stripped_line = line.strip()
        if not stripped_line:
            # a blank line terminates whatever is currently open
            if inside_li:
                yield "</li>"
                inside_li = False
            if inside_p:
                yield "</p>"
                inside_p = False
            continue
        match = bullet_re.match(line)
        # plain text outside of any paragraph/list item starts a paragraph
        if not (inside_p or inside_li or match):
            yield '<p tabindex="0">'
            inside_p = True
        if match:
            # a new bullet implicitly closes the previous one
            if inside_li:
                yield "</li>"
            yield "<li>"
            inside_li = True
            # remember how wide "whitespace + marker" is, continuation
            # lines must be indented at least that much
            indent_len = len(match.group(1))
            yield line[indent_len:].strip()
            continue
        if inside_li and not line.startswith(" " * indent_len):
            # dedented text ends the current list item
            yield "</li>"
            inside_li = False
        yield stripped_line
    # close anything still open at the end of the description
    if inside_li:
        yield "</li>"
    if inside_p:
        yield "</p>"
def get_http_proxy_string_from_gconf():
    """Helper that gets the http proxy from gconf

    Returns: string with http://auth:pw@proxy:port/ or None

    None is returned when the proxy is disabled, when no proxy host is
    configured, or when gconf cannot be queried (the error is logged).
    """
    try:
        # imported here so that a missing gconf only disables the proxy
        # lookup instead of breaking the import of this module
        import gconf
        client = gconf.client_get_default()
        if client.get_bool("/system/http_proxy/use_http_proxy"):
            host = client.get_string("/system/http_proxy/host")
            if not host:
                # proxy switched on but no host set: there is nothing
                # sensible to build a proxy url from
                return None
            authentication = ""
            if client.get_bool("/system/http_proxy/use_authentication"):
                user = client.get_string(
                    "/system/http_proxy/authentication_user")
                password = client.get_string(
                    "/system/http_proxy/authentication_password")
                authentication = "%s:%s@" % (user, password)
            port = client.get_int("/system/http_proxy/port")
            return "http://%s%s:%s/" % (authentication, host, port)
    except Exception:
        # best effort: any gconf problem means "no proxy"
        logging.exception("failed to get proxy from gconf")
    return None
def encode_for_xml(unicode_data, encoding="ascii"):
    """
    Encode "unicode_data" with "encoding" (ascii by default); characters
    the encoding cannot represent are written as XML numeric character
    references instead of raising an error.
    """
    # 'xmlcharrefreplace' is the codec error handler that emits &#NNNN;
    xml_safe = unicode_data.encode(encoding, 'xmlcharrefreplace')
    return xml_safe
def decode_xml_char_reference(s):
    """ takes a string like
        'Search&#x2026;'
    and converts it to a unicode string in which every hexadecimal
    character reference is replaced by the character it names, i.e.
    'Search' followed by U+2026 (HORIZONTAL ELLIPSIS).

    Only hexadecimal references (&#x...;) are decoded; everything else,
    including backslashes and decimal references, is passed through
    untouched. A reference that does not name a valid code point is
    left as it is. Byte strings are interpreted as latin-1.
    """
    try:
        to_char = unichr  # Python 2
    except NameError:
        to_char = chr  # Python 3, chr() already returns text

    def _replace(match):
        # turn the hex digits into the character they stand for
        try:
            return to_char(int(match.group(1), 16))
        except ValueError:
            # outside the range this interpreter can represent
            return match.group(0)

    if isinstance(s, bytes):
        # always hand back text, never a byte string
        s = s.decode("latin-1")
    # the digits are hexadecimal, so a-f have to be accepted as well
    return re.sub(r"&#x([0-9a-fA-F]{1,6});", _replace, s)
def unescape(text):
    """
    Return "text" with its XML entities converted back to characters.

    This is xml.sax.saxutils.unescape() called with the module-level
    ESCAPE_ENTITIES table as the set of additional entities.
    """
    plain_text = xml.sax.saxutils.unescape(text, ESCAPE_ENTITIES)
    return plain_text
def get_current_arch():
    """Return the value of the "Apt::Architecture" apt config key."""
    arch = apt_pkg.config.find("Apt::Architecture")
    return arch
def human_readable_name_from_ppa_uri(ppa_uri):
    """ takes a PPA uri and returns a human readable name for it

    The name is the path component of the uri with a trailing
    "/ubuntu" (optionally followed by slashes) removed, e.g.
    "http://ppa.launchpad.net/mvo/ppa/ubuntu" gives "/mvo/ppa".
    Paths that do not end in "/ubuntu" are returned unchanged.
    """
    try:
        from urlparse import urlsplit  # Python 2
    except ImportError:
        from urllib.parse import urlsplit
    suffix = "/ubuntu"
    path = urlsplit(ppa_uri).path
    # tolerate ".../ubuntu/" as well as ".../ubuntu"
    trimmed = path.rstrip("/")
    if trimmed.endswith(suffix):
        return trimmed[:-len(suffix)]
    return path
def sources_filename_from_ppa_entry(entry):
    """
    takes a PPA SourceEntry and returns a filename suitable for sources.list.d

    The name is the entry's uri converted with apt_pkg.URItoFileName()
    plus a ".list" extension; only the "uri" attribute of the entry is
    read.
    """
    return "%s.list" % apt_pkg.URItoFileName(entry.uri)
# FIXME: why not call it a generic downloader?
class ImageDownloader(gobject.GObject):
    """
    Asynchronous downloader built on gio.

    Signals:
      image-url-reachable (bool)    -- emitted once the url has been
                                       probed; True if the file info
                                       query succeeded
      image-download-complete (str) -- emitted with the destination
                                       path after the content has been
                                       written to disk
    """

    __gsignals__ = {
        "image-url-reachable"     : (gobject.SIGNAL_RUN_LAST,
                                     gobject.TYPE_NONE,
                                     (bool,),),
        "image-download-complete" : (gobject.SIGNAL_RUN_LAST,
                                     gobject.TYPE_NONE,
                                     (str,),),
        }

    def download_image(self, url, dest_file_path):
        """
        Start fetching "url" into "dest_file_path".

        The call only kicks off the asynchronous work; progress is
        reported through the signals of this object.
        """
        self.url = url
        self.dest_file_path = dest_file_path
        f = gio.File(url)
        # first check if the url is reachable
        f.query_info_async(gio.FILE_ATTRIBUTE_STANDARD_SIZE,
                           self._check_url_reachable_and_then_download_cb)

    def _check_url_reachable_and_then_download_cb(self, f, result):
        """Report reachability and, if reachable, start the download."""
        try:
            # only called for its side effect: raises if the query failed
            f.query_info_finish(result)
            self.emit('image-url-reachable', True)
            # url is reachable, now download the icon file
            f.load_contents_async(self._icon_download_complete_cb)
        except glib.GError:
            self.emit('image-url-reachable', False)

    def _icon_download_complete_cb(self, f, result, path=None):
        """Write the downloaded content to disk and announce completion."""
        try:
            # The result from the download is actually a tuple with three
            # elements (content, size, etag?)
            # The first element is the actual content so let's grab that
            content = f.load_contents_finish(result)[0]
        except glib.GError:
            # the transfer itself failed; nothing is written and no
            # completion signal is sent
            logging.exception("failed to download %s", self.url)
            return
        # the content is raw image data, so write it in binary mode; the
        # with block closes the file even if the write fails
        with open(self.dest_file_path, "wb") as outputfile:
            outputfile.write(content)
        self.emit('image-download-complete', self.dest_file_path)
if __name__ == "__main__":
    # manual smoke test: the character reference has to be spelled out
    # in the literal, otherwise there is nothing to decode
    s = decode_xml_char_reference('Search&#x2026;')
    print(s)
    print(type(s))
    print(unicode(s))
|