~thekorn/python-osso-abook/trunk

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
import gobject
import ctypes

from libosso_abook import osso_abook, \
    OssoABookWaitableCallback, OssoABookContactPredicate
from enums import OssoABookAggregatorState, EnumResult
from utils import typed_glist
from libebook import FIELD_TYPES
 
class Contact(gobject.GObject):
    
    def __init__(self, OssoABookContact):
        super(Contact, self).__init__()
        self.__contact = OssoABookContact
        self.__roster_contacts = list(self.get_roster_contacts())
        
    def get_property(self, name):
        id = self.field_id(name)
        if id is None:
            raise AttributeError
        return self.get(id)
        
    def get_properties(self, *names):
        return tuple(self.get_property(name) for name in names)
        
    def set_property(self, name, value):
        raise NotImplementedError("all properties are readonly atm")
    
    @property
    def _data(self):
        return self.__contact
        
    def get_persistent_uid(self):
        contact_get_persistent_uid = osso_abook.osso_abook_contact_get_persistent_uid
        contact_get_persistent_uid.restype = ctypes.c_char_p
        result = contact_get_persistent_uid(self.__contact)
        return result
        
    def get_name(self):
        get_name = osso_abook.osso_abook_contact_get_name
        get_name.restype = ctypes.c_char_p
        result = get_name(self.__contact)
        return result
        
    def get(self, field_id):
        get = osso_abook.e_contact_get
        get.argtypes = [ctypes.c_int, ctypes.c_int]
        if field_id in FIELD_TYPES:
            get.restype = FIELD_TYPES[field_id]
        else:
            get.restype = ctypes.c_char_p
        result = get(self.__contact, int(field_id))
        return result
        
    def get_all(self, field_id):
        results = list()
        for contact in [self] + self.__roster_contacts:
            result = contact.get(field_id)
            if result is not None:
                if isinstance(result, list):
                    for x in result:
                        results.append(x)
                else:
                    results.append(result)
        if len(results) == 0:
            return None
        elif len(results) == 1:
            return results.pop()
        else:
            return results
        
    @staticmethod
    def field_name(field_id):
        """ name of a field/attribute """
        field_name = osso_abook.e_contact_field_name
        field_name.restype = ctypes.c_char_p
        return field_name(int(field_id))
        
    @staticmethod
    def pretty_name(field_id):
        """ userreadable field identifier """
        pretty_name = osso_abook.e_contact_pretty_name
        pretty_name.restype = ctypes.c_char_p
        return pretty_name(int(field_id))
        
    @staticmethod
    def field_id(field_name):
        """ id = Contact.field_id(Contact.field_name(id)) """
        field_id = osso_abook.e_contact_field_id
        field_id.argtypes = [ctypes.c_char_p]
        return field_id(str(field_name)) or None
        
    def is_roster_contact(self):
        return bool(osso_abook.osso_abook_contact_is_roster_contact(self.__contact))
        
    def has_roster_contacts(self):
        return bool(osso_abook.osso_abook_contact_has_roster_contacts(self.__contact))
        
    def get_roster_contacts(self):
        get_roster_contacts = osso_abook.osso_abook_contact_get_roster_contacts
        get_roster_contacts.restype = lambda x: typed_glist(Contact, x)
        return get_roster_contacts(self.__contact)
        

class AddressBook(gobject.GObject):
    
    @classmethod
    def get_default(cls):
        aggregator = osso_abook.osso_abook_aggregator_get_default(0)
        return cls(aggregator)
        
    def __init__(self, aggregator):
        super(AddressBook, self).__init__()
        self.__aggregator = aggregator
        
    def get_state(self):
        get_state = osso_abook.osso_abook_aggregator_get_state
        get_state.restype = lambda x: EnumResult(OssoABookAggregatorState, x-1)
        return get_state(self.__aggregator)
        
    def lookup(self, uid, callback, *user_data):
        """ callback gets a generator of contacts as only argument """
        
        def func(aggregator, error, data):
            aggregator_lookup = osso_abook.osso_abook_aggregator_lookup
            aggregator_lookup.restype = lambda x: typed_glist(Contact, x)
            result = aggregator_lookup(aggregator, uid)
            OssoABookWaitableCallback.unregister(cb_proto)
            callback(result, *user_data)
        
        # Keep prototype object reference so it is not destroyed
        cb_proto = OssoABookWaitableCallback(func)
        osso_abook.osso_abook_waitable_call_when_ready(self.__aggregator, cb_proto, 0, 0)
        
    def resolve_master_contacts(self, contact, callback, *user_data):
        """ callback gets a generator of contacts as only argument """
        
        def func(aggregator, error, data):
            aggregator_resolve_master_contacts = osso_abook.osso_abook_aggregator_resolve_master_contacts
            aggregator_resolve_master_contacts.restype = lambda x: typed_glist(Contact, x)
            result = aggregator_resolve_master_contacts(aggregator, contact._data)
            OssoABookWaitableCallback.unregister(cb_proto)
            callback(result, *user_data)
        
        # Keep prototype object reference so it is not destroyed
        cb_proto = OssoABookWaitableCallback(func)
        osso_abook.osso_abook_waitable_call_when_ready(self.__aggregator, cb_proto, 0, 0)
        
    def list_master_contacts(self, callback, *user_data):
        """ callback gets a generator of contacts as only argument """
        
        def func(aggregator, error, data):
            list_master_contacts = osso_abook.osso_abook_aggregator_list_master_contacts
            list_master_contacts.restype = lambda x: typed_glist(Contact, x)
            result = list_master_contacts(aggregator)
            OssoABookWaitableCallback.unregister(cb_proto)
            callback(result, *user_data)
        
        # Keep prototype object reference so it is not destroyed
        cb_proto = OssoABookWaitableCallback(func)
        osso_abook.osso_abook_waitable_call_when_ready(self.__aggregator, cb_proto, 0, 0)
        
    def list_roster_contacts(self, callback, *user_data):
        """ callback gets a generator of contacts as only argument """
        
        def func(aggregator, error, data):
            list_roster_contacts = osso_abook.osso_abook_aggregator_list_roster_contacts
            list_roster_contacts.restype = lambda x: typed_glist(Contact, x)
            result = list_roster_contacts(aggregator)
            OssoABookWaitableCallback.unregister(cb_proto)
            callback(result, *user_data)
        
        # Keep prototype object reference so it is not destroyed
        cb_proto = OssoABookWaitableCallback(func)
        osso_abook.osso_abook_waitable_call_when_ready(self.__aggregator, cb_proto, 0, 0)
        
    def get_master_contact_count(self, callback, *user_data):
        """ callback gets a generator of contacts as only argument """
        
        def func(aggregator, error, data):
            get_master_contact_count = osso_abook.osso_abook_aggregator_get_master_contact_count
            result = get_master_contact_count(aggregator)
            OssoABookWaitableCallback.unregister(cb_proto)
            callback(result, *user_data)
        
        # Keep prototype object reference so it is not destroyed
        cb_proto = OssoABookWaitableCallback(func)
        osso_abook.osso_abook_waitable_call_when_ready(self.__aggregator, cb_proto, 0, 0)
        
    def find_contacts_full(self, predicate, callback, *user_data):
        """ callback gets a generator of contacts as only argument """
        
        def func(aggregator, error, data):
            find_contacts_full = osso_abook.osso_abook_aggregator_find_contacts_full
            find_contacts_full.restype = lambda x: typed_glist(Contact, x)
            
            cb_predicate = OssoABookContactPredicate(lambda c: predicate(Contact(c)))
            
            result = find_contacts_full(aggregator, cb_predicate)
            OssoABookWaitableCallback.unregister(cb_proto)
            OssoABookContactPredicate.unregister(cb_predicate)
            callback(result, *user_data)
        
        # Keep prototype object reference so it is not destroyed
        cb_proto = OssoABookWaitableCallback(func)
        osso_abook.osso_abook_waitable_call_when_ready(self.__aggregator, cb_proto, 0, 0)
        
    def find_contacts(self, equery, callback, *user_data):
        """ callback gets a generator of contacts as only argument """
        # FIXME: it's somehow not working, all queries return an empty result set
        
        def func(aggregator, error, data):
            find_contacts = osso_abook.osso_abook_aggregator_find_contacts
            find_contacts.restype = lambda x: typed_glist(Contact, x)
            
            result = find_contacts(aggregator, hash(equery))
            OssoABookWaitableCallback.unregister(cb_proto)
            callback(result, *user_data)
        
        # Keep prototype object reference so it is not destroyed
        cb_proto = OssoABookWaitableCallback(func)
        osso_abook.osso_abook_waitable_call_when_ready(self.__aggregator, cb_proto, 0, 0)
        
    def find_contacts_for_phone_number(self, phone_number, fuzzy_match, callback, *user_data):
        """ callback gets a generator of contacts as only argument """
        
        def func(aggregator, error, data):
            find_contacts = osso_abook.osso_abook_aggregator_find_contacts_for_phone_number
            find_contacts.argtypes = [ctypes.c_int, ctypes.c_char_p, ctypes.c_int]
            find_contacts.restype = lambda x: typed_glist(Contact, x)
            
            result = find_contacts(aggregator, str(phone_number), bool(fuzzy_match))
            OssoABookWaitableCallback.unregister(cb_proto)
            callback(result, *user_data)
        
        # Keep prototype object reference so it is not destroyed
        cb_proto = OssoABookWaitableCallback(func)
        osso_abook.osso_abook_waitable_call_when_ready(self.__aggregator, cb_proto, 0, 0)
        
    def find_contacts_for_im_contact(self, username, account, callback, *user_data):
        """ callback gets a generator of contacts as only argument """
        # FIXME: it's not working and McAccount is not implemented
        if account is not None:
            raise NotImplementedError("account is not implemented")
        
        def func(aggregator, error, data):
            find_contacts = osso_abook.osso_abook_aggregator_find_contacts_for_im_contact
            find_contacts.argtypes = [ctypes.c_int, ctypes.c_char_p, ctypes.c_int]
            find_contacts.restype = lambda x: typed_glist(Contact, x)
            
            result = find_contacts(aggregator, str(username), 0)
            OssoABookWaitableCallback.unregister(cb_proto)
            callback(result, *user_data)
        
        # Keep prototype object reference so it is not destroyed
        cb_proto = OssoABookWaitableCallback(func)
        osso_abook.osso_abook_waitable_call_when_ready(self.__aggregator, cb_proto, 0, 0)