2
# -*- Mode: Python; indent-tabs-mode: nil; tab-width: 4; coding: utf-8 -*-
4
# Copyright (c) 2017 Canonical
6
# Author: Andrea Azzarone <andrea.azzarone@canonical.com>
8
# This program is free software; you can redistribute it and/or
9
# modify it under the terms of the GNU General Public License as
10
# published by the Free Software Foundation; either version 2 of the
11
# License, or (at your option) any later version.
13
# This program is distributed in the hope that it will be useful,
14
# but WITHOUT ANY WARRANTY; without even the implied warranty of
15
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16
# GNU General Public License for more details.
18
# You should have received a copy of the GNU General Public License
19
# along with this program; if not, write to the Free Software
20
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
23
from gi.repository import GLib
29
HOST_NAME = '/var/snap/canonical-livepatch/current/livepatchd.sock'
32
class UHTTPConnection(http.client.HTTPConnection):
34
def __init__(self, path):
35
http.client.HTTPConnection.__init__(self, 'localhost')
39
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
40
sock.connect(self.path)
44
class LivePatchSocket(object):
46
def __init__(self, http_conn=None):
48
self.conn = UHTTPConnection(HOST_NAME)
52
def get_status(self, on_done):
56
self.conn.request('GET', '/status?verbose=True')
57
r = self.conn.getresponse()
58
active = r.status == 200
59
data = yaml.safe_load(r.read())
60
except Exception as e:
63
check_state = LivePatchSocket.get_check_state(data)
64
patch_state = LivePatchSocket.get_patch_state(data)
65
fixes = LivePatchSocket.get_fixes(data)
66
GLib.idle_add(lambda: on_done(
67
active, check_state, patch_state, fixes))
69
thread = threading.Thread(target=do_call)
73
def get_check_state(data):
75
status = data['status']
76
kernel = next((k for k in status if k['running']), None)
77
return kernel['livepatch']['checkState']
78
except Exception as e:
82
def get_patch_state(data):
84
status = data['status']
85
kernel = next((k for k in status if k['running']), None)
86
return kernel['livepatch']['patchState']
87
except Exception as e:
93
status = data['status']
94
kernel = next((k for k in status if k['running']), None)
95
fixes = kernel['livepatch']['fixes']
96
return [LivePatchFix(f)
97
for f in fixes.replace('* ', '').split('\n') if len(f) > 0]
98
except Exception as e:
102
class LivePatchFix(object):
104
def __init__(self, text):
105
patched_pattern = ' (unpatched)'
106
self.patched = text.find(patched_pattern) == -1
107
self.name = text.replace(patched_pattern, '')
109
def __eq__(self, other):
110
if isinstance(other, LivePatchFix):
111
return self.name == other.name and self.patched == other.patched
112
return NotImplemented
114
def __ne__(self, other):
115
result = self.__eq__(other)
116
if result is NotImplemented: