~ubuntu-branches/debian/sid/python-py-zipkin/sid

« back to all changes in this revision

Viewing changes to py_zipkin/zipkin.py

  • Committer: Package Import Robot
  • Author(s): Olivier Sallou
  • Date: 2017-08-16 12:51:25 UTC
  • Revision ID: package-import@ubuntu.com-20170816125125-hbry2c4hm32nuq91
Tags: upstream-0.9.0
ImportĀ upstreamĀ versionĀ 0.9.0

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- coding: utf-8 -*-
 
2
import functools
 
3
import random
 
4
import time
 
5
from collections import namedtuple
 
6
 
 
7
from py_zipkin.exception import ZipkinError
 
8
from py_zipkin.logging_helper import zipkin_logger
 
9
from py_zipkin.logging_helper import ZipkinLoggerHandler
 
10
from py_zipkin.logging_helper import ZipkinLoggingContext
 
11
from py_zipkin.thread_local import get_zipkin_attrs
 
12
from py_zipkin.thread_local import pop_zipkin_attrs
 
13
from py_zipkin.thread_local import push_zipkin_attrs
 
14
from py_zipkin.thrift import SERVER_ADDR_VAL
 
15
from py_zipkin.thrift import create_binary_annotation
 
16
from py_zipkin.thrift import create_endpoint
 
17
from py_zipkin.thrift import zipkin_core
 
18
from py_zipkin.util import generate_random_64bit_string
 
19
from py_zipkin.util import generate_random_128bit_string
 
20
 
 
21
 
 
22
"""
 
23
Holds the basic attributes needed to log a zipkin trace
 
24
 
 
25
:param trace_id: Unique trace id
 
26
:param span_id: Span Id of the current request span
 
27
:param parent_span_id: Parent span Id of the current request span
 
28
:param flags: stores flags header. Currently unused
 
29
:param is_sampled: pre-computed boolean whether the trace should be logged
 
30
"""
 
31
ZipkinAttrs = namedtuple(
 
32
    'ZipkinAttrs',
 
33
    ['trace_id', 'span_id', 'parent_span_id', 'flags', 'is_sampled'],
 
34
)
 
35
 
 
36
 
 
37
STANDARD_ANNOTATIONS = {
 
38
    'client': {'cs', 'cr'},
 
39
    'server': {'ss', 'sr'},
 
40
}
 
41
STANDARD_ANNOTATIONS_KEYS = frozenset(STANDARD_ANNOTATIONS.keys())
 
42
 
 
43
 
 
44
class zipkin_span(object):
 
45
    """Context manager/decorator for all of your zipkin tracing needs.
 
46
 
 
47
    Usage #1: Start a trace with a given sampling rate
 
48
 
 
49
    This begins the zipkin trace and also records the root span. The required
 
50
    params are service_name, transport_handler, and sample_rate.
 
51
 
 
52
    # Start a trace with do_stuff() as the root span
 
53
    def some_batch_job(a, b):
 
54
        with zipkin_span(
 
55
            service_name='my_service',
 
56
            span_name='my_span_name',
 
57
            transport_handler=some_handler,
 
58
            port=22,
 
59
            sample_rate=0.05,
 
60
        ):
 
61
            do_stuff()
 
62
 
 
63
    Usage #2: Trace a service call.
 
64
 
 
65
    The typical use case is instrumenting a framework like Pyramid or Django. Only
 
66
    ss and sr times are recorded for the root span. Required params are
 
67
    service_name, zipkin_attrs, transport_handler, and port.
 
68
 
 
69
    # Used in a pyramid tween
 
70
    def tween(request):
 
71
        zipkin_attrs = some_zipkin_attr_creator(request)
 
72
        with zipkin_span(
 
73
            service_name='my_service,'
 
74
            span_name='my_span_name',
 
75
            zipkin_attrs=zipkin_attrs,
 
76
            transport_handler=some_handler,
 
77
            port=22,
 
78
        ) as zipkin_context:
 
79
            response = handler(request)
 
80
            zipkin_context.update_binary_annotations(
 
81
                some_binary_annotations)
 
82
            return response
 
83
 
 
84
    Usage #3: Log a span within the context of a zipkin trace
 
85
 
 
86
    If you're already in a zipkin trace, you can use this to log a span inside. The
 
87
    only required param is service_name. If you're not in a zipkin trace, this
 
88
    won't do anything.
 
89
 
 
90
    # As a decorator
 
91
    @zipkin_span(service_name='my_service', span_name='my_function')
 
92
    def my_function():
 
93
        do_stuff()
 
94
 
 
95
    # As a context manager
 
96
    def my_function():
 
97
        with zipkin_span(service_name='my_service', span_name='do_stuff'):
 
98
            do_stuff()
 
99
    """
 
100
 
 
101
    def __init__(
 
102
        self,
 
103
        service_name,
 
104
        span_name='span',
 
105
        zipkin_attrs=None,
 
106
        transport_handler=None,
 
107
        max_span_batch_size=None,
 
108
        annotations=None,
 
109
        binary_annotations=None,
 
110
        port=0,
 
111
        sample_rate=None,
 
112
        include=('client', 'server'),
 
113
        add_logging_annotation=False,
 
114
        report_root_timestamp=False,
 
115
        use_128bit_trace_id=False,
 
116
        host=None
 
117
    ):
 
118
        """Logs a zipkin span. If this is the root span, then a zipkin
 
119
        trace is started as well.
 
120
 
 
121
        :param service_name: The name of the called service
 
122
        :type service_name: string
 
123
        :param span_name: Optional name of span, defaults to 'span'
 
124
        :type span_name: string
 
125
        :param zipkin_attrs: Optional set of zipkin attributes to be used
 
126
        :type zipkin_attrs: ZipkinAttrs
 
127
        :param transport_handler: Callback function that takes a message parameter
 
128
            and handles logging it
 
129
        :type transport_handler: function
 
130
        :param max_span_batch_size: Spans in a trace are sent in batches,
 
131
        max_span_batch_size defines max size of one batch
 
132
        :type max_span_batch_size: int
 
133
        :param annotations: Optional dict of str -> timestamp annotations
 
134
        :type annotations: dict of str -> int
 
135
        :param binary_annotations: Optional dict of str -> str span attrs
 
136
        :type binary_annotations: dict of str -> str
 
137
        :param port: The port number of the service. Defaults to 0.
 
138
        :type port: int
 
139
        :param sample_rate: Rate at which to sample; 0.0 - 100.0. If passed-in
 
140
            zipkin_attrs have is_sampled=False and the sample_rate param is > 0,
 
141
            a new span will be generated at this rate. This means that if you
 
142
            propagate sampling decisions to downstream services, but still have
 
143
            sample_rate > 0 in those services, the actual rate of generated
 
144
            spans for those services will be > sampling_rate.
 
145
        :type sample_rate: float
 
146
        :param include: which annotations to include
 
147
            can be one of {'client', 'server'}
 
148
            corresponding to ('cs', 'cr') and ('ss', 'sr') respectively
 
149
        :type include: iterable
 
150
        :param add_logging_annotation: Whether to add a 'logging_end'
 
151
            annotation when py_zipkin finishes logging spans
 
152
        :type add_logging_annotation: boolean
 
153
        :param report_root_timestamp: Whether the span should report timestamp
 
154
            and duration. Only applies to "root" spans in this local context,
 
155
            so spans created inside other span contexts will always log
 
156
            timestamp/duration. Note that this is only an override for spans
 
157
            that have zipkin_attrs passed in. Spans that make their own
 
158
            sampling decisions (i.e. are the root spans of entire traces) will
 
159
            always report timestamp/duration.
 
160
        :type report_root_timestamp: boolean
 
161
        :param use_128bit_trace_id: If true, generate 128-bit trace_ids
 
162
        :type use_128bit_trace_id: boolean
 
163
        :param host: Contains the ipv4 value of the host. The ipv4 value isn't
 
164
            automatically determined in a docker environment
 
165
        :type host: string
 
166
        """
 
167
        self.service_name = service_name
 
168
        self.span_name = span_name
 
169
        self.zipkin_attrs = zipkin_attrs
 
170
        self.transport_handler = transport_handler
 
171
        self.max_span_batch_size = max_span_batch_size
 
172
        self.annotations = annotations or {}
 
173
        self.binary_annotations = binary_annotations or {}
 
174
        self.port = port
 
175
        self.logging_context = None
 
176
        self.sample_rate = sample_rate
 
177
        self.include = include
 
178
        self.add_logging_annotation = add_logging_annotation
 
179
        self.report_root_timestamp_override = report_root_timestamp
 
180
        self.use_128bit_trace_id = use_128bit_trace_id
 
181
        self.host = host
 
182
        self.logging_configured = False
 
183
 
 
184
        # Spans that log a 'cs' timestamp can additionally record
 
185
        # 'sa' binary annotations that show where the request is going.
 
186
        # This holds a list of 'sa' binary annotations.
 
187
        self.sa_binary_annotations = []
 
188
 
 
189
        # Validation checks
 
190
        if self.zipkin_attrs or self.sample_rate is not None:
 
191
            if self.transport_handler is None:
 
192
                raise ZipkinError(
 
193
                    'Root spans require a transport handler to be given')
 
194
 
 
195
        if self.sample_rate is not None and not (0.0 <= self.sample_rate <= 100.0):
 
196
            raise ZipkinError('Sample rate must be between 0.0 and 100.0')
 
197
 
 
198
        if not set(include).issubset(STANDARD_ANNOTATIONS_KEYS):
 
199
            raise ZipkinError(
 
200
                'Only %s are supported as annotations' %
 
201
                STANDARD_ANNOTATIONS_KEYS
 
202
            )
 
203
        else:
 
204
            # get a list of all of the mapped annotations
 
205
            self.annotation_filter = set()
 
206
            for include_name in include:
 
207
                self.annotation_filter.update(STANDARD_ANNOTATIONS[include_name])
 
208
 
 
209
    def __call__(self, f):
 
210
        @functools.wraps(f)
 
211
        def decorated(*args, **kwargs):
 
212
            with zipkin_span(
 
213
                service_name=self.service_name,
 
214
                span_name=self.span_name,
 
215
                zipkin_attrs=self.zipkin_attrs,
 
216
                transport_handler=self.transport_handler,
 
217
                annotations=self.annotations,
 
218
                binary_annotations=self.binary_annotations,
 
219
                port=self.port,
 
220
                sample_rate=self.sample_rate,
 
221
                include=self.include,
 
222
                host=self.host
 
223
            ):
 
224
                return f(*args, **kwargs)
 
225
        return decorated
 
226
 
 
227
    def __enter__(self):
 
228
        return self.start()
 
229
 
 
230
    def start(self):
 
231
        """Enter the new span context. All annotations logged inside this
 
232
        context will be attributed to this span. All new spans generated
 
233
        inside this context will have this span as their parent.
 
234
 
 
235
        In the unsampled case, this context still generates new span IDs and
 
236
        pushes them onto the threadlocal stack, so downstream services calls
 
237
        made will pass the correct headers. However, the logging handler is
 
238
        never attached in the unsampled case, so the spans are never logged.
 
239
        """
 
240
        self.do_pop_attrs = False
 
241
        # If zipkin_attrs are passed in or this span is doing its own sampling,
 
242
        # it will need to actually log spans at __exit__.
 
243
        self.perform_logging = self.zipkin_attrs or self.sample_rate is not None
 
244
        report_root_timestamp = False
 
245
 
 
246
        if self.sample_rate is not None:
 
247
            if self.zipkin_attrs and not self.zipkin_attrs.is_sampled:
 
248
                report_root_timestamp = True
 
249
                self.zipkin_attrs = create_attrs_for_span(
 
250
                    sample_rate=self.sample_rate,
 
251
                    trace_id=self.zipkin_attrs.trace_id,
 
252
                    use_128bit_trace_id=self.use_128bit_trace_id,
 
253
                )
 
254
            elif not self.zipkin_attrs:
 
255
                report_root_timestamp = True
 
256
                self.zipkin_attrs = create_attrs_for_span(
 
257
                    sample_rate=self.sample_rate,
 
258
                    use_128bit_trace_id=self.use_128bit_trace_id,
 
259
                )
 
260
 
 
261
        if not self.zipkin_attrs:
 
262
            # This span is inside the context of an existing trace
 
263
            existing_zipkin_attrs = get_zipkin_attrs()
 
264
            if existing_zipkin_attrs:
 
265
                self.zipkin_attrs = ZipkinAttrs(
 
266
                    trace_id=existing_zipkin_attrs.trace_id,
 
267
                    span_id=generate_random_64bit_string(),
 
268
                    parent_span_id=existing_zipkin_attrs.span_id,
 
269
                    flags=existing_zipkin_attrs.flags,
 
270
                    is_sampled=existing_zipkin_attrs.is_sampled,
 
271
                )
 
272
 
 
273
        # If zipkin_attrs are not set up by now, that means this span is not
 
274
        # configured to perform logging itself, and it's not in an existing
 
275
        # Zipkin trace. That means there's nothing else to do and it can exit
 
276
        # early.
 
277
        if not self.zipkin_attrs:
 
278
            return self
 
279
 
 
280
        push_zipkin_attrs(self.zipkin_attrs)
 
281
        self.do_pop_attrs = True
 
282
 
 
283
        self.start_timestamp = time.time()
 
284
 
 
285
        if self.perform_logging:
 
286
            # Don't set up any logging if we're not sampling
 
287
            if not self.zipkin_attrs.is_sampled:
 
288
                return self
 
289
            endpoint = create_endpoint(self.port, self.service_name, self.host)
 
290
            client_context = set(self.include) == {'client'}
 
291
            self.log_handler = ZipkinLoggerHandler(self.zipkin_attrs)
 
292
            self.logging_context = ZipkinLoggingContext(
 
293
                self.zipkin_attrs,
 
294
                endpoint,
 
295
                self.log_handler,
 
296
                self.span_name,
 
297
                self.transport_handler,
 
298
                report_root_timestamp or self.report_root_timestamp_override,
 
299
                binary_annotations=self.binary_annotations,
 
300
                add_logging_annotation=self.add_logging_annotation,
 
301
                client_context=client_context,
 
302
                max_span_batch_size=self.max_span_batch_size,
 
303
            )
 
304
            self.logging_context.start()
 
305
            self.logging_configured = True
 
306
            return self
 
307
        else:
 
308
            # In the sampled case, patch the ZipkinLoggerHandler.
 
309
            if self.zipkin_attrs.is_sampled:
 
310
                # Be defensive about logging setup. Since ZipkinAttrs are local to
 
311
                # the thread, multithreaded frameworks can get in strange states.
 
312
                # The logging is not going to be correct in these cases, so we set
 
313
                # a flag that turns off logging on __exit__.
 
314
                try:
 
315
                    # Assume there's only a single handler, since all logging
 
316
                    # should be set up in this package.
 
317
                    log_handler = zipkin_logger.handlers[0]
 
318
                except IndexError:
 
319
                    return self
 
320
                # Make sure it's not a NullHandler or something
 
321
                if not isinstance(log_handler, ZipkinLoggerHandler):
 
322
                    return self
 
323
                # Put span ID on logging handler.
 
324
                self.log_handler = zipkin_logger.handlers[0]
 
325
                # Store the old parent_span_id, probably None, in case we have
 
326
                # nested zipkin_spans
 
327
                self.old_parent_span_id = self.log_handler.parent_span_id
 
328
                self.log_handler.parent_span_id = self.zipkin_attrs.span_id
 
329
                self.logging_configured = True
 
330
 
 
331
            return self
 
332
 
 
333
    def __exit__(self, _exc_type, _exc_value, _exc_traceback):
 
334
        self.stop(_exc_type, _exc_value, _exc_traceback)
 
335
 
 
336
    def stop(self, _exc_type=None, _exc_value=None, _exc_traceback=None):
 
337
        """Exit the span context. Zipkin attrs are pushed onto the
 
338
        threadlocal stack regardless of sampling, so they always need to be
 
339
        popped off. The actual logging of spans depends on sampling and that
 
340
        the logging was correctly set up.
 
341
        """
 
342
        if self.do_pop_attrs:
 
343
            pop_zipkin_attrs()
 
344
 
 
345
        if not self.logging_configured:
 
346
            return
 
347
 
 
348
        # Add the error annotation if an exception occurred
 
349
        if any((_exc_type, _exc_value, _exc_traceback)):
 
350
            error_msg = '{0}: {1}'.format(_exc_type.__name__, _exc_value)
 
351
            self.update_binary_annotations({
 
352
                zipkin_core.ERROR: error_msg,
 
353
            })
 
354
 
 
355
        # Logging context is only initialized for "root" spans of the local
 
356
        # process (i.e. this zipkin_span not inside of any other local
 
357
        # zipkin_spans)
 
358
        if self.logging_context:
 
359
            self.logging_context.stop()
 
360
            self.logging_context = None
 
361
            return
 
362
 
 
363
        # If we've gotten here, that means that this span is a child span of
 
364
        # this context's root span (i.e. it's a zipkin_span inside another
 
365
        # zipkin_span).
 
366
        end_timestamp = time.time()
 
367
 
 
368
        self.log_handler.parent_span_id = self.old_parent_span_id
 
369
 
 
370
        # We are simulating a full two-part span locally, so set cs=sr and ss=cr
 
371
        full_annotations = {
 
372
            'cs': self.start_timestamp,
 
373
            'sr': self.start_timestamp,
 
374
            'ss': end_timestamp,
 
375
            'cr': end_timestamp,
 
376
        }
 
377
        # But we filter down if we only want to emit some of the annotations
 
378
        filtered_annotations = {
 
379
            k: v for k, v in full_annotations.items()
 
380
            if k in self.annotation_filter
 
381
        }
 
382
 
 
383
        self.annotations.update(filtered_annotations)
 
384
 
 
385
        self.log_handler.store_local_span(
 
386
            span_name=self.span_name,
 
387
            service_name=self.service_name,
 
388
            annotations=self.annotations,
 
389
            binary_annotations=self.binary_annotations,
 
390
            sa_binary_annotations=self.sa_binary_annotations,
 
391
            span_id=self.zipkin_attrs.span_id,
 
392
        )
 
393
 
 
394
    def update_binary_annotations(self, extra_annotations):
 
395
        """Updates the binary annotations for the current span.
 
396
 
 
397
        If this trace is not being sampled then this is a no-op.
 
398
        """
 
399
        if not self.zipkin_attrs:
 
400
            return
 
401
        if not self.zipkin_attrs.is_sampled:
 
402
            return
 
403
        if not self.logging_context:
 
404
            # This is not the root span, so binary annotations will be added
 
405
            # to the log handler when this span context exits.
 
406
            self.binary_annotations.update(extra_annotations)
 
407
        else:
 
408
            # Otherwise, we're in the context of the root span, so just update
 
409
            # the binary annotations for the logging context directly.
 
410
            self.logging_context.binary_annotations_dict.update(extra_annotations)
 
411
 
 
412
    def add_sa_binary_annotation(
 
413
        self,
 
414
        port=0,
 
415
        service_name='unknown',
 
416
        host='127.0.0.1',
 
417
    ):
 
418
        """Adds a 'sa' binary annotation to the current span.
 
419
 
 
420
        'sa' binary annotations are useful for situations where you need to log
 
421
        where a request is going but the destination doesn't support zipkin.
 
422
 
 
423
        Note that the span must have 'cs'/'cr' annotations.
 
424
 
 
425
        :param port: The port number of the destination
 
426
        :type port: int
 
427
        :param service_name: The name of the destination service
 
428
        :type service_name: str
 
429
        :param host: Host address of the destination
 
430
        :type host: str
 
431
        """
 
432
        if not self.zipkin_attrs or not self.zipkin_attrs.is_sampled:
 
433
            return
 
434
 
 
435
        if 'client' not in self.include:
 
436
            # TODO: trying to set a sa binary annotation for a non-client span
 
437
            # should result in a logged error
 
438
            return
 
439
 
 
440
        sa_endpoint = create_endpoint(
 
441
            port=port,
 
442
            service_name=service_name,
 
443
            host=host,
 
444
        )
 
445
        sa_binary_annotation = create_binary_annotation(
 
446
            key=zipkin_core.SERVER_ADDR,
 
447
            value=SERVER_ADDR_VAL,
 
448
            annotation_type=zipkin_core.AnnotationType.BOOL,
 
449
            host=sa_endpoint,
 
450
        )
 
451
        if not self.logging_context:
 
452
            self.sa_binary_annotations.append(sa_binary_annotation)
 
453
        else:
 
454
            self.logging_context.sa_binary_annotations.append(sa_binary_annotation)
 
455
 
 
456
 
 
457
def _validate_args(kwargs):
 
458
    if 'include' in kwargs:
 
459
        raise ValueError(
 
460
            '"include" is not valid in this context. '
 
461
            'You probably want to use zipkin_span()'
 
462
        )
 
463
 
 
464
 
 
465
class zipkin_client_span(zipkin_span):
 
466
    """Logs a client-side zipkin span.
 
467
 
 
468
    Subclass of :class:`zipkin_span` using only annotations relevant to clients
 
469
    """
 
470
 
 
471
    def __init__(self, *args, **kwargs):
 
472
        """Logs a zipkin span with client annotations.
 
473
 
 
474
        See :class:`zipkin_span` for arguments
 
475
        """
 
476
        _validate_args(kwargs)
 
477
 
 
478
        kwargs['include'] = ('client',)
 
479
        super(zipkin_client_span, self).__init__(*args, **kwargs)
 
480
 
 
481
 
 
482
class zipkin_server_span(zipkin_span):
 
483
    """Logs a server-side zipkin span.
 
484
 
 
485
    Subclass of :class:`zipkin_span` using only annotations relevant to servers
 
486
    """
 
487
 
 
488
    def __init__(self, *args, **kwargs):
 
489
        """Logs a zipkin span with server annotations.
 
490
 
 
491
        See :class:`zipkin_span` for arguments
 
492
        """
 
493
        _validate_args(kwargs)
 
494
 
 
495
        kwargs['include'] = ('server',)
 
496
        super(zipkin_server_span, self).__init__(*args, **kwargs)
 
497
 
 
498
 
 
499
def create_attrs_for_span(
 
500
    sample_rate=100.0,
 
501
    trace_id=None,
 
502
    span_id=None,
 
503
    use_128bit_trace_id=False,
 
504
):
 
505
    """Creates a set of zipkin attributes for a span.
 
506
 
 
507
    :param sample_rate: Float between 0.0 and 100.0 to determine sampling rate
 
508
    :type sample_rate: float
 
509
    :param trace_id: Optional 16-character hex string representing a trace_id.
 
510
                    If this is None, a random trace_id will be generated.
 
511
    :type trace_id: str
 
512
    :param span_id: Optional 16-character hex string representing a span_id.
 
513
                    If this is None, a random span_id will be generated.
 
514
    :type span_id: str
 
515
    """
 
516
    # Calculate if this trace is sampled based on the sample rate
 
517
    if trace_id is None:
 
518
        if use_128bit_trace_id:
 
519
            trace_id = generate_random_128bit_string()
 
520
        else:
 
521
            trace_id = generate_random_64bit_string()
 
522
    if span_id is None:
 
523
        span_id = generate_random_64bit_string()
 
524
    if sample_rate == 0.0:
 
525
        is_sampled = False
 
526
    else:
 
527
        is_sampled = (random.random() * 100) < sample_rate
 
528
 
 
529
    return ZipkinAttrs(
 
530
        trace_id=trace_id,
 
531
        span_id=span_id,
 
532
        parent_span_id=None,
 
533
        flags='0',
 
534
        is_sampled=is_sampled,
 
535
    )
 
536
 
 
537
 
 
538
def create_http_headers_for_new_span():
 
539
    """
 
540
    Generate the headers for a new zipkin span.
 
541
 
 
542
    .. note::
 
543
 
 
544
        If the method is not called from within a zipkin_trace conext,
 
545
        empty dict will be returned back.
 
546
 
 
547
    :returns: dict containing (X-B3-TraceId, X-B3-SpanId, X-B3-ParentSpanId,
 
548
                X-B3-Flags and X-B3-Sampled) keys OR an empty dict.
 
549
    """
 
550
    zipkin_attrs = get_zipkin_attrs()
 
551
 
 
552
    if not zipkin_attrs:
 
553
        return {}
 
554
 
 
555
    return {
 
556
        'X-B3-TraceId': zipkin_attrs.trace_id,
 
557
        'X-B3-SpanId': generate_random_64bit_string(),
 
558
        'X-B3-ParentSpanId': zipkin_attrs.span_id,
 
559
        'X-B3-Flags': '0',
 
560
        'X-B3-Sampled': '1' if zipkin_attrs.is_sampled else '0',
 
561
    }