~rbalint/update-manager/remove-autoremovable-kernels

« back to all changes in this revision

Viewing changes to UpdateManager/Core/LivePatchSocket.py

  • Committer: Brian Murray
  • Date: 2017-08-31 16:25:42 UTC
  • mfrom: (2762.1.10 update-manager)
  • Revision ID: brian@canonical.com-20170831162542-e0n273gins1y0w48
Tags: 1:17.10.7
releasing package update-manager version 1:17.10.7

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# LivePatchSocket.py
 
2
# -*- Mode: Python; indent-tabs-mode: nil; tab-width: 4; coding: utf-8 -*-
 
3
#
 
4
#  Copyright (c) 2017 Canonical
 
5
#
 
6
#  Author: Andrea Azzarone <andrea.azzarone@canonical.com>
 
7
#
 
8
#  This program is free software; you can redistribute it and/or
 
9
#  modify it under the terms of the GNU General Public License as
 
10
#  published by the Free Software Foundation; either version 2 of the
 
11
#  License, or (at your option) any later version.
 
12
#
 
13
#  This program is distributed in the hope that it will be useful,
 
14
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
 
15
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
16
#  GNU General Public License for more details.
 
17
#
 
18
#  You should have received a copy of the GNU General Public License
 
19
#  along with this program; if not, write to the Free Software
 
20
#  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
 
21
#  USA
 
22
 
 
23
from gi.repository import GLib
 
24
import http.client
 
25
import socket
 
26
import threading
 
27
import yaml
 
28
 
 
29
HOST_NAME = '/var/snap/canonical-livepatch/current/livepatchd.sock'
 
30
 
 
31
 
 
32
class UHTTPConnection(http.client.HTTPConnection):
 
33
 
 
34
    def __init__(self, path):
 
35
        http.client.HTTPConnection.__init__(self, 'localhost')
 
36
        self.path = path
 
37
 
 
38
    def connect(self):
 
39
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
 
40
        sock.connect(self.path)
 
41
        self.sock = sock
 
42
 
 
43
 
 
44
class LivePatchSocket(object):
 
45
 
 
46
    def __init__(self, http_conn=None):
 
47
        if http_conn is None:
 
48
            self.conn = UHTTPConnection(HOST_NAME)
 
49
        else:
 
50
            self.conn = http_conn
 
51
 
 
52
    def get_status(self, on_done):
 
53
 
 
54
        def do_call():
 
55
            try:
 
56
                self.conn.request('GET', '/status?verbose=True')
 
57
                r = self.conn.getresponse()
 
58
                active = r.status == 200
 
59
                data = yaml.safe_load(r.read())
 
60
            except Exception as e:
 
61
                active = False
 
62
                data = dict()
 
63
            check_state = LivePatchSocket.get_check_state(data)
 
64
            patch_state = LivePatchSocket.get_patch_state(data)
 
65
            fixes = LivePatchSocket.get_fixes(data)
 
66
            GLib.idle_add(lambda: on_done(
 
67
                active, check_state, patch_state, fixes))
 
68
 
 
69
        thread = threading.Thread(target=do_call)
 
70
        thread.start()
 
71
 
 
72
    @staticmethod
 
73
    def get_check_state(data):
 
74
        try:
 
75
            status = data['status']
 
76
            kernel = next((k for k in status if k['running']), None)
 
77
            return kernel['livepatch']['checkState']
 
78
        except Exception as e:
 
79
            return 'check-failed'
 
80
 
 
81
    @staticmethod
 
82
    def get_patch_state(data):
 
83
        try:
 
84
            status = data['status']
 
85
            kernel = next((k for k in status if k['running']), None)
 
86
            return kernel['livepatch']['patchState']
 
87
        except Exception as e:
 
88
            return 'unknown'
 
89
 
 
90
    @staticmethod
 
91
    def get_fixes(data):
 
92
        try:
 
93
            status = data['status']
 
94
            kernel = next((k for k in status if k['running']), None)
 
95
            fixes = kernel['livepatch']['fixes']
 
96
            return [LivePatchFix(f)
 
97
                    for f in fixes.replace('* ', '').split('\n') if len(f) > 0]
 
98
        except Exception as e:
 
99
            return list()
 
100
 
 
101
 
 
102
class LivePatchFix(object):
 
103
 
 
104
    def __init__(self, text):
 
105
        patched_pattern = ' (unpatched)'
 
106
        self.patched = text.find(patched_pattern) == -1
 
107
        self.name = text.replace(patched_pattern, '')
 
108
 
 
109
    def __eq__(self, other):
 
110
        if isinstance(other, LivePatchFix):
 
111
            return self.name == other.name and self.patched == other.patched
 
112
        return NotImplemented
 
113
 
 
114
    def __ne__(self, other):
 
115
        result = self.__eq__(other)
 
116
        if result is NotImplemented:
 
117
            return result
 
118
        return not result