~ubuntu-branches/debian/sid/python-py-zipkin/sid

« back to all changes in this revision

Viewing changes to py_zipkin/logging_helper.py

  • Committer: Package Import Robot
  • Author(s): Olivier Sallou
  • Date: 2017-08-16 12:51:25 UTC
  • Revision ID: package-import@ubuntu.com-20170816125125-hbry2c4hm32nuq91
Tags: upstream-0.9.0
Import upstream version 0.9.0

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- coding: utf-8 -*-
 
2
import logging
 
3
import time
 
4
from collections import defaultdict
 
5
 
 
6
from py_zipkin.exception import ZipkinError
 
7
from py_zipkin.thrift import annotation_list_builder
 
8
from py_zipkin.thrift import binary_annotation_list_builder
 
9
from py_zipkin.thrift import copy_endpoint_with_new_service_name
 
10
from py_zipkin.thrift import create_span
 
11
from py_zipkin.thrift import thrift_objs_in_bytes
 
12
from py_zipkin.util import generate_random_64bit_string
 
13
 
 
14
 
 
15
try:  # Python 2.7+
    from logging import NullHandler
except ImportError:  # pragma: no cover
    class NullHandler(logging.Handler):
        """No-op fallback used when `logging.NullHandler` cannot be imported."""

        def emit(self, record):
            # Intentionally discard every record.
            pass

# Placeholder handler attached to the logger at import time.
# ZipkinLoggingContext.start() swaps it for a real handler and
# ZipkinLoggingContext.stop() puts it back.
null_handler = NullHandler()
zipkin_logger = logging.getLogger('py_zipkin.logger')
zipkin_logger.addHandler(null_handler)
zipkin_logger.setLevel(logging.DEBUG)

# Annotation key added to the root span by ZipkinLoggingContext.log_spans()
# when `add_logging_annotation` is enabled.
LOGGING_END_KEY = 'py_zipkin.logging_end'
 
28
 
 
29
 
 
30
class ZipkinLoggingContext(object):
    """A logging context specific to a Zipkin trace. If the trace is sampled,
    the logging context sends serialized Zipkin spans to a transport_handler.
    The logging context sends root "server" or "client" span, as well as all
    local child spans collected within this context.

    This class should only be used by the main `zipkin_span` entrypoint.

    :param zipkin_attrs: object exposing `is_sampled`, `trace_id`, `span_id`
        and `parent_span_id` for the root span
    :param thrift_endpoint: endpoint passed to the thrift annotation builders
    :param log_handler: logging handler exposing `extra_annotations` and
        `client_spans`; attached to `zipkin_logger` between start() and stop()
    :param span_name: name of the root span
    :param transport_handler: callable handed to ZipkinBatchSender
    :param report_root_timestamp: if truthy, the root span is sent with a
        timestamp and duration; otherwise both are None
    :param binary_annotations: optional dict of binary annotations for the
        root span. NOTE: log_spans() updates this dict in place.
    :param add_logging_annotation: if truthy, adds a LOGGING_END_KEY
        annotation to the root span
    :param client_context: if truthy, the root span is annotated with
        'cs'/'cr' instead of 'sr'/'ss'
    :param max_span_batch_size: batch size handed to ZipkinBatchSender
    """

    def __init__(
        self,
        zipkin_attrs,
        thrift_endpoint,
        log_handler,
        span_name,
        transport_handler,
        report_root_timestamp,
        binary_annotations=None,
        add_logging_annotation=False,
        client_context=False,
        max_span_batch_size=None,
    ):
        self.zipkin_attrs = zipkin_attrs
        self.thrift_endpoint = thrift_endpoint
        self.log_handler = log_handler
        self.span_name = span_name
        self.transport_handler = transport_handler
        self.response_status_code = 0
        self.report_root_timestamp = report_root_timestamp
        self.binary_annotations_dict = binary_annotations or {}
        self.sa_binary_annotations = []
        self.add_logging_annotation = add_logging_annotation
        self.client_context = client_context
        self.max_span_batch_size = max_span_batch_size

    def start(self):
        """Actions to be taken before request is handled.
        1) Attach `zipkin_logger` to :class:`ZipkinLoggerHandler` object.
        2) Record the start timestamp.

        :returns: this context, so calls can be chained
        """
        zipkin_logger.removeHandler(null_handler)
        zipkin_logger.addHandler(self.log_handler)
        self.start_timestamp = time.time()
        return self

    def stop(self):
        """Actions to be taken post request handling.
        1) Send the collected spans (see :meth:`log_spans`).
        2) Detach `zipkin_logger` handler.

        The handler is detached even if sending the spans raises, so a
        failure does not leave this context's handler on the shared logger.
        """
        try:
            self.log_spans()
        finally:
            zipkin_logger.removeHandler(self.log_handler)
            zipkin_logger.addHandler(null_handler)

    def _collect_extra_annotations(self):
        """Group the handler's extra annotations by the span they belong to.

        Messages without a `parent_span_id` are attributed to the root span.
        A `None` value for 'annotations' or 'binary_annotations' is treated
        as empty instead of raising.

        :returns: tuple `(annotations_by_span_id,
            binary_annotations_by_span_id)` of defaultdict(dict)
        """
        annotations_by_span_id = defaultdict(dict)
        binary_annotations_by_span_id = defaultdict(dict)
        for msg in self.log_handler.extra_annotations:
            span_id = msg['parent_span_id'] or self.zipkin_attrs.span_id
            annotations_by_span_id[span_id].update(
                msg['annotations'] or {}
            )
            binary_annotations_by_span_id[span_id].update(
                msg['binary_annotations'] or {}
            )
        return annotations_by_span_id, binary_annotations_by_span_id

    def log_spans(self):
        """Main function to log all the annotations stored during the entire
        request. This is done only if the request is sampled. It also logs
        the service (`ss` and `sr`) or the client ('cs' and 'cr')
        annotations on the root span.
        """
        if not self.zipkin_attrs.is_sampled:
            return

        span_sender = ZipkinBatchSender(self.transport_handler,
                                        self.max_span_batch_size)
        with span_sender:
            end_timestamp = time.time()
            # Collect additional annotations from the logging handler
            (
                annotations_by_span_id,
                binary_annotations_by_span_id,
            ) = self._collect_extra_annotations()

            # Collect, annotate, and log client spans from the logging handler
            for span in self.log_handler.client_spans:
                # The parent_span_id is either the parent ID set in the
                # logging handler or the current Zipkin context's span ID.
                parent_span_id = (
                    span['parent_span_id'] or
                    self.zipkin_attrs.span_id
                )
                # A new client span's span ID can be overridden
                span_id = span['span_id'] or generate_random_64bit_string()
                endpoint = copy_endpoint_with_new_service_name(
                    self.thrift_endpoint, span['service_name']
                )
                # Collect annotations both logged with the new spans and
                # logged in separate log messages.
                annotations = span['annotations']
                annotations.update(annotations_by_span_id[span_id])
                binary_annotations = span['binary_annotations']
                binary_annotations.update(
                    binary_annotations_by_span_id[span_id])

                timestamp, duration = get_local_span_timestamp_and_duration(
                    annotations
                )
                # Create serializable thrift objects of annotations
                thrift_annotations = annotation_list_builder(
                    annotations, endpoint
                )
                thrift_binary_annotations = binary_annotation_list_builder(
                    binary_annotations, endpoint
                )
                if span.get('sa_binary_annotations'):
                    thrift_binary_annotations += span['sa_binary_annotations']

                span_sender.add_span(
                    span_id=span_id,
                    parent_span_id=parent_span_id,
                    trace_id=self.zipkin_attrs.trace_id,
                    span_name=span['span_name'],
                    annotations=thrift_annotations,
                    binary_annotations=thrift_binary_annotations,
                    timestamp_s=timestamp,
                    duration_s=duration,
                )

            # Annotations logged against the root span itself.
            extra_annotations = annotations_by_span_id[
                self.zipkin_attrs.span_id]
            extra_binary_annotations = binary_annotations_by_span_id[
                self.zipkin_attrs.span_id
            ]

            # Server contexts use sr/ss, client contexts use cs/cr.
            k1, k2 = ('sr', 'ss')
            if self.client_context:
                k1, k2 = ('cs', 'cr')
            annotations = {k1: self.start_timestamp, k2: end_timestamp}
            annotations.update(extra_annotations)

            if self.add_logging_annotation:
                annotations[LOGGING_END_KEY] = time.time()

            thrift_annotations = annotation_list_builder(
                annotations,
                self.thrift_endpoint,
            )

            # Binary annotations can be set through debug messages or the
            # set_extra_binary_annotations registry setting.
            self.binary_annotations_dict.update(extra_binary_annotations)
            thrift_binary_annotations = binary_annotation_list_builder(
                self.binary_annotations_dict,
                self.thrift_endpoint,
            )
            if self.sa_binary_annotations:
                thrift_binary_annotations += self.sa_binary_annotations

            if self.report_root_timestamp:
                timestamp = self.start_timestamp
                duration = end_timestamp - self.start_timestamp
            else:
                timestamp = duration = None

            span_sender.add_span(
                span_id=self.zipkin_attrs.span_id,
                parent_span_id=self.zipkin_attrs.parent_span_id,
                trace_id=self.zipkin_attrs.trace_id,
                span_name=self.span_name,
                annotations=thrift_annotations,
                binary_annotations=thrift_binary_annotations,
                timestamp_s=timestamp,
                duration_s=duration,
            )
 
199
 
 
200
 
 
201
def get_local_span_timestamp_and_duration(annotations):
    """Derive a span's start timestamp and duration from its annotations.

    The client pair ('cs', 'cr') is preferred; the server pair ('sr', 'ss')
    is used only when the client pair is incomplete.

    :param annotations: mapping of annotation name -> timestamp
    :returns: `(timestamp, duration)` where duration is end minus start, or
        `(None, None)` when neither pair is fully present
    """
    for start_key, end_key in (('cs', 'cr'), ('sr', 'ss')):
        if start_key in annotations and end_key in annotations:
            start = annotations[start_key]
            return start, annotations[end_key] - start
    return None, None
 
207
 
 
208
 
 
209
class ZipkinLoggerHandler(logging.StreamHandler, object):
    """Logger Handler that collects span annotations and additional client
    spans so they can be sent later. To connect to the handler, logger name
    must be 'py_zipkin.logger'.

    :param zipkin_attrs: ZipkinAttrs namedtuple object
    """

    def __init__(self, zipkin_attrs):
        super(ZipkinLoggerHandler, self).__init__()
        # If parent_span_id is set, the application is in a logging context
        # where each additional client span logged has this span as its parent.
        # This is to allow logging of hierarchies of spans instead of just
        # single client spans. See the SpanContext class.
        self.parent_span_id = None
        self.zipkin_attrs = zipkin_attrs
        # Dicts describing new local/client spans, in arrival order.
        self.client_spans = []
        # Dicts of annotations to merge into an already-known span.
        self.extra_annotations = []

    def store_local_span(
        self,
        span_name,
        service_name,
        annotations,
        binary_annotations,
        sa_binary_annotations=None,
        span_id=None,
    ):
        """Convenience method for storing a local child span (a zipkin_span
        inside other zipkin_spans) to be logged when the outermost zipkin_span
        exits.

        The span is recorded with the handler's current `parent_span_id`.

        :param span_name: name of the new span
        :param service_name: service name for the new span
        :param annotations: timestamp annotations for the span
        :param binary_annotations: binary annotations for the span
        :param sa_binary_annotations: optional extra binary annotations,
            stored as given
        :param span_id: optional explicit span ID, stored as given
        """
        self.client_spans.append({
            'span_name': span_name,
            'service_name': service_name,
            'parent_span_id': self.parent_span_id,
            'span_id': span_id,
            'annotations': annotations,
            'binary_annotations': binary_annotations,
            'sa_binary_annotations': sa_binary_annotations,
        })

    def emit(self, record):
        """Handle each record message. This function is called whenever
        zipkin_logger.debug() is called.

        :param record: object containing the `msg` object.
            Structure of record.msg should be the following:
            ::

            {
                "annotations": {
                    "cs": ts1,
                    "cr": ts2,
                },
                "binary_annotations": {
                    "http.uri": "/foo/bar",
                },
                "name": "foo_span",
                "service_name": "myService",
            }

            Keys:
            - annotations: str -> timestamp annotations
            - binary_annotations: str -> str binary annotations
              (One of either annotations or binary_annotations is required)
            - name: str of new span name; only used if service-name is also
              specified.
            - service_name: str of new client span's service name.

            If service_name is specified, this log msg is considered to
            represent a new client span. If service_name is omitted, this is
            considered additional annotation for the currently active
            "parent span" (either the server span or the parent client span
            inside a SpanContext).

        :raises ZipkinError: if both annotations and binary_annotations are
            missing or empty
        """
        if not self.zipkin_attrs.is_sampled:
            return
        span_name = record.msg.get('name', 'span')
        # Treat an explicit None the same as a missing key so later dict
        # merges never receive None.
        annotations = record.msg.get('annotations') or {}
        binary_annotations = record.msg.get('binary_annotations') or {}
        if not annotations and not binary_annotations:
            raise ZipkinError(
                "At least one of annotation/binary annotation has"
                " to be provided for {0} span".format(span_name)
            )
        service_name = record.msg.get('service_name', None)
        # Presence of service_name means this is to be a new local span.
        if service_name is not None:
            self.store_local_span(
                span_name=span_name,
                service_name=service_name,
                annotations=annotations,
                binary_annotations=binary_annotations,
            )
        else:
            self.extra_annotations.append({
                'annotations': annotations,
                'binary_annotations': binary_annotations,
                'parent_span_id': self.parent_span_id,
            })
 
310
 
 
311
 
 
312
class ZipkinBatchSender(object):
    """Queue thrift spans and hand them to a transport handler in batches.

    Intended to be used as a context manager: entering resets the queue and
    a clean exit flushes whatever is still queued.

    :param transport_handler: callable receiving the serialized batch; when
        falsy, flush() sends nothing
    :param max_portion_size: number of queued spans that triggers a flush;
        falls back to MAX_PORTION_SIZE when falsy
    """

    MAX_PORTION_SIZE = 100

    def __init__(self, transport_handler, max_portion_size=None):
        self.transport_handler = transport_handler
        self.max_portion_size = max_portion_size or self.MAX_PORTION_SIZE
        # Created here as well as in __enter__ so add_span()/flush() do not
        # fail with AttributeError when used outside a `with` block.
        self.queue = []

    def __enter__(self):
        self.queue = []
        return self

    def __exit__(self, _exc_type, _exc_value, _exc_traceback):
        """Flush on a clean exit; re-raise any exception as ZipkinError.

        :raises ZipkinError: if the `with` body raised; the message is
            "<exception class name>: <exception value>"
        """
        if any((_exc_type, _exc_value, _exc_traceback)):
            error = '{0}: {1}'.format(_exc_type.__name__, _exc_value)
            raise ZipkinError(error)
        else:
            self.flush()

    def add_span(
        self,
        span_id,
        parent_span_id,
        trace_id,
        span_name,
        annotations,
        binary_annotations,
        timestamp_s,
        duration_s,
    ):
        """Build a thrift span from the arguments and queue it.

        The queue is flushed as soon as it holds `max_portion_size` spans.
        """
        thrift_span = create_span(
            span_id,
            parent_span_id,
            trace_id,
            span_name,
            annotations,
            binary_annotations,
            timestamp_s,
            duration_s,
        )

        self.queue.append(thrift_span)
        if len(self.queue) >= self.max_portion_size:
            self.flush()

    def flush(self):
        """Serialize the queued spans, send them, and empty the queue.

        Does nothing when there is no transport handler or the queue is
        empty; in the former case queued spans are left in place.
        """
        if self.transport_handler and len(self.queue) > 0:
            message = thrift_objs_in_bytes(self.queue)
            self.transport_handler(message)
            self.queue = []