# -*- coding: utf-8 -*-
import functools
import random
import time
from collections import namedtuple

from py_zipkin.exception import ZipkinError
from py_zipkin.logging_helper import zipkin_logger
from py_zipkin.logging_helper import ZipkinLoggerHandler
from py_zipkin.logging_helper import ZipkinLoggingContext
from py_zipkin.thread_local import get_zipkin_attrs
from py_zipkin.thread_local import pop_zipkin_attrs
from py_zipkin.thread_local import push_zipkin_attrs
from py_zipkin.thrift import SERVER_ADDR_VAL
from py_zipkin.thrift import create_binary_annotation
from py_zipkin.thrift import create_endpoint
from py_zipkin.thrift import zipkin_core
from py_zipkin.util import generate_random_64bit_string
from py_zipkin.util import generate_random_128bit_string
# ZipkinAttrs holds the basic attributes needed to log a zipkin trace.
#
# :param trace_id: Unique trace id
# :param span_id: Span Id of the current request span
# :param parent_span_id: Parent span Id of the current request span
# :param flags: stores flags header. Currently unused
# :param is_sampled: pre-computed boolean whether the trace should be logged
ZipkinAttrs = namedtuple(
    'ZipkinAttrs',
    ['trace_id', 'span_id', 'parent_span_id', 'flags', 'is_sampled'],
)
# Maps each value accepted by zipkin_span's ``include`` argument to the
# timestamp annotations it enables: client spans record cs/cr and server
# spans record ss/sr.
STANDARD_ANNOTATIONS = {
    'client': {'cs', 'cr'},
    'server': {'ss', 'sr'},
}
# Immutable set of the valid ``include`` names, used for argument validation.
STANDARD_ANNOTATIONS_KEYS = frozenset(STANDARD_ANNOTATIONS.keys())
class zipkin_span(object):
    """Context manager/decorator for all of your zipkin tracing needs.

    Usage #1: Start a trace with a given sampling rate

    This begins the zipkin trace and also records the root span. The required
    params are service_name, transport_handler, and sample_rate.

    # Start a trace with do_stuff() as the root span
    def some_batch_job(a, b):
        with zipkin_span(
            service_name='my_service',
            span_name='my_span_name',
            transport_handler=some_handler,
            port=22,
            sample_rate=0.05,
        ):
            do_stuff()

    Usage #2: Trace a service call.

    The typical use case is instrumenting a framework like Pyramid or Django. Only
    ss and sr times are recorded for the root span. Required params are
    service_name, zipkin_attrs, transport_handler, and port.

    # Used in a pyramid tween
    def tween(request):
        zipkin_attrs = some_zipkin_attr_creator(request)
        with zipkin_span(
            service_name='my_service',
            span_name='my_span_name',
            zipkin_attrs=zipkin_attrs,
            transport_handler=some_handler,
            port=22,
        ) as zipkin_context:
            response = handler(request)
            zipkin_context.update_binary_annotations(
                some_binary_annotations)
            return response

    Usage #3: Log a span within the context of a zipkin trace

    If you're already in a zipkin trace, you can use this to log a span inside. The
    only required param is service_name. If you're not in a zipkin trace, this
    is a no-op.

    # As a decorator
    @zipkin_span(service_name='my_service', span_name='my_function')
    def my_function():
        do_stuff()

    # As a context manager
    def my_function():
        with zipkin_span(service_name='my_service', span_name='do_stuff'):
            do_stuff()
    """

    def __init__(
        self,
        service_name,
        span_name='span',
        zipkin_attrs=None,
        transport_handler=None,
        max_span_batch_size=None,
        annotations=None,
        binary_annotations=None,
        port=0,
        sample_rate=None,
        include=('client', 'server'),
        add_logging_annotation=False,
        report_root_timestamp=False,
        use_128bit_trace_id=False,
        host=None,
    ):
        """Logs a zipkin span. If this is the root span, then a zipkin
        trace is started as well.

        :param service_name: The name of the called service
        :type service_name: string
        :param span_name: Optional name of span, defaults to 'span'
        :type span_name: string
        :param zipkin_attrs: Optional set of zipkin attributes to be used
        :type zipkin_attrs: ZipkinAttrs
        :param transport_handler: Callback function that takes a message parameter
            and handles logging it
        :type transport_handler: function
        :param max_span_batch_size: Spans in a trace are sent in batches,
            max_span_batch_size defines max size of one batch
        :type max_span_batch_size: int
        :param annotations: Optional dict of str -> timestamp annotations
        :type annotations: dict of str -> int
        :param binary_annotations: Optional dict of str -> str span attrs
        :type binary_annotations: dict of str -> str
        :param port: The port number of the service. Defaults to 0.
        :type port: int
        :param sample_rate: Rate at which to sample; 0.0 - 100.0. If passed-in
            zipkin_attrs have is_sampled=False and the sample_rate param is > 0,
            a new span will be generated at this rate. This means that if you
            propagate sampling decisions to downstream services, but still have
            sample_rate > 0 in those services, the actual rate of generated
            spans for those services will be > sample_rate.
        :type sample_rate: float
        :param include: which annotations to include
            can be one of {'client', 'server'}
            corresponding to ('cs', 'cr') and ('ss', 'sr') respectively
        :type include: iterable
        :param add_logging_annotation: Whether to add a 'logging_end'
            annotation when py_zipkin finishes logging spans
        :type add_logging_annotation: boolean
        :param report_root_timestamp: Whether the span should report timestamp
            and duration. Only applies to "root" spans in this local context,
            so spans created inside other span contexts will always log
            timestamp/duration. Note that this is only an override for spans
            that have zipkin_attrs passed in. Spans that make their own
            sampling decisions (i.e. are the root spans of entire traces) will
            always report timestamp/duration.
        :type report_root_timestamp: boolean
        :param use_128bit_trace_id: If true, generate 128-bit trace_ids
        :type use_128bit_trace_id: boolean
        :param host: Contains the ipv4 value of the host. The ipv4 value isn't
            automatically determined in a docker environment
        :type host: string
        :raises ZipkinError: if a root span (zipkin_attrs given or sample_rate
            set) has no transport_handler, if sample_rate is outside
            0.0 - 100.0, or if include has a name other than 'client'/'server'.
        """
        self.service_name = service_name
        self.span_name = span_name
        self.zipkin_attrs = zipkin_attrs
        self.transport_handler = transport_handler
        self.max_span_batch_size = max_span_batch_size
        self.annotations = annotations or {}
        self.binary_annotations = binary_annotations or {}
        self.port = port
        self.logging_context = None
        self.sample_rate = sample_rate
        self.include = include
        self.add_logging_annotation = add_logging_annotation
        self.report_root_timestamp_override = report_root_timestamp
        self.use_128bit_trace_id = use_128bit_trace_id
        self.host = host
        self.logging_configured = False

        # Spans that log a 'cs' timestamp can additionally record
        # 'sa' binary annotations that show where the request is going.
        # This holds a list of 'sa' binary annotations.
        self.sa_binary_annotations = []

        # Validation checks
        if self.zipkin_attrs or self.sample_rate is not None:
            if self.transport_handler is None:
                raise ZipkinError(
                    'Root spans require a transport handler to be given')

        if self.sample_rate is not None and not (0.0 <= self.sample_rate <= 100.0):
            raise ZipkinError('Sample rate must be between 0.0 and 100.0')

        if not set(include).issubset(STANDARD_ANNOTATIONS_KEYS):
            raise ZipkinError(
                'Only %s are supported as annotations' %
                STANDARD_ANNOTATIONS_KEYS
            )

        # get a list of all of the mapped annotations
        self.annotation_filter = set()
        for include_name in include:
            self.annotation_filter.update(STANDARD_ANNOTATIONS[include_name])

    def __call__(self, f):
        """Use this span as a decorator: every call of ``f`` runs inside a
        fresh zipkin_span configured exactly like this one.

        :param f: function to wrap
        :returns: the wrapped function
        """
        @functools.wraps(f)
        def decorated(*args, **kwargs):
            # Forward every constructor option so decorated spans behave the
            # same as context-manager spans. The annotation dicts are copied
            # per call: the span mutates them in stop() and
            # update_binary_annotations(), and sharing this decorator's dicts
            # would leak annotations (e.g. a previous call's error) into
            # later calls.
            with zipkin_span(
                service_name=self.service_name,
                span_name=self.span_name,
                zipkin_attrs=self.zipkin_attrs,
                transport_handler=self.transport_handler,
                max_span_batch_size=self.max_span_batch_size,
                annotations=dict(self.annotations),
                binary_annotations=dict(self.binary_annotations),
                port=self.port,
                sample_rate=self.sample_rate,
                include=self.include,
                add_logging_annotation=self.add_logging_annotation,
                report_root_timestamp=self.report_root_timestamp_override,
                use_128bit_trace_id=self.use_128bit_trace_id,
                host=self.host,
            ):
                return f(*args, **kwargs)
        return decorated

    def __enter__(self):
        return self.start()

    def start(self):
        """Enter the new span context. All annotations logged inside this
        context will be attributed to this span. All new spans generated
        inside this context will have this span as their parent.

        In the unsampled case, this context still generates new span IDs and
        pushes them onto the threadlocal stack, so downstream service calls
        made will pass the correct headers. However, the logging handler is
        never attached in the unsampled case, so the spans are never logged.

        :returns: this zipkin_span instance
        """
        self.do_pop_attrs = False
        # If zipkin_attrs are passed in or this span is doing its own sampling,
        # it will need to actually log spans at __exit__.
        self.perform_logging = self.zipkin_attrs or self.sample_rate is not None
        report_root_timestamp = False

        if self.sample_rate is not None:
            if self.zipkin_attrs and not self.zipkin_attrs.is_sampled:
                # Re-sample an unsampled incoming trace, keeping its trace id.
                report_root_timestamp = True
                self.zipkin_attrs = create_attrs_for_span(
                    sample_rate=self.sample_rate,
                    trace_id=self.zipkin_attrs.trace_id,
                    use_128bit_trace_id=self.use_128bit_trace_id,
                )
            elif not self.zipkin_attrs:
                # Brand new trace: this span is the root of the whole trace.
                report_root_timestamp = True
                self.zipkin_attrs = create_attrs_for_span(
                    sample_rate=self.sample_rate,
                    use_128bit_trace_id=self.use_128bit_trace_id,
                )

        if not self.zipkin_attrs:
            # This span is inside the context of an existing trace
            existing_zipkin_attrs = get_zipkin_attrs()
            if existing_zipkin_attrs:
                self.zipkin_attrs = ZipkinAttrs(
                    trace_id=existing_zipkin_attrs.trace_id,
                    span_id=generate_random_64bit_string(),
                    parent_span_id=existing_zipkin_attrs.span_id,
                    flags=existing_zipkin_attrs.flags,
                    is_sampled=existing_zipkin_attrs.is_sampled,
                )

        # If zipkin_attrs are not set up by now, that means this span is not
        # configured to perform logging itself, and it's not in an existing
        # Zipkin trace. That means there's nothing else to do and it can exit
        # early.
        if not self.zipkin_attrs:
            return self

        push_zipkin_attrs(self.zipkin_attrs)
        self.do_pop_attrs = True

        self.start_timestamp = time.time()

        if self.perform_logging:
            # Don't set up any logging if we're not sampling
            if not self.zipkin_attrs.is_sampled:
                return self
            endpoint = create_endpoint(self.port, self.service_name, self.host)
            client_context = set(self.include) == {'client'}
            self.log_handler = ZipkinLoggerHandler(self.zipkin_attrs)
            self.logging_context = ZipkinLoggingContext(
                self.zipkin_attrs,
                endpoint,
                self.log_handler,
                self.span_name,
                self.transport_handler,
                report_root_timestamp or self.report_root_timestamp_override,
                binary_annotations=self.binary_annotations,
                add_logging_annotation=self.add_logging_annotation,
                client_context=client_context,
                max_span_batch_size=self.max_span_batch_size,
            )
            self.logging_context.start()
            self.logging_configured = True
            return self

        # In the sampled case, patch the ZipkinLoggerHandler.
        if self.zipkin_attrs.is_sampled:
            # Be defensive about logging setup. Since ZipkinAttrs are local to
            # the thread, multithreaded frameworks can get in strange states.
            # The logging is not going to be correct in these cases, so we
            # leave logging_configured False, which turns off logging on
            # __exit__.
            try:
                # Assume there's only a single handler, since all logging
                # should be set up in this package.
                log_handler = zipkin_logger.handlers[0]
            except IndexError:
                return self
            # Make sure it's not a NullHandler or something
            if not isinstance(log_handler, ZipkinLoggerHandler):
                return self
            # Put span ID on logging handler.
            self.log_handler = log_handler
            # Store the old parent_span_id, probably None, in case we have
            # nested zipkin_spans
            self.old_parent_span_id = self.log_handler.parent_span_id
            self.log_handler.parent_span_id = self.zipkin_attrs.span_id
            self.logging_configured = True

        return self

    def __exit__(self, _exc_type, _exc_value, _exc_traceback):
        self.stop(_exc_type, _exc_value, _exc_traceback)

    def stop(self, _exc_type=None, _exc_value=None, _exc_traceback=None):
        """Exit the span context. Zipkin attrs are pushed onto the
        threadlocal stack regardless of sampling, so they always need to be
        popped off. The actual logging of spans depends on sampling and that
        the logging was correctly set up.

        :param _exc_type: type of the exception raised inside the span, if any
        :param _exc_value: the exception instance, if any
        :param _exc_traceback: the traceback, if any
        """
        if self.do_pop_attrs:
            pop_zipkin_attrs()

        if not self.logging_configured:
            return

        # Add the error annotation if an exception occurred
        if any((_exc_type, _exc_value, _exc_traceback)):
            error_msg = '{0}: {1}'.format(_exc_type.__name__, _exc_value)
            self.update_binary_annotations({
                zipkin_core.ERROR: error_msg,
            })

        # Logging context is only initialized for "root" spans of the local
        # process (i.e. this zipkin_span not inside of any other local
        # zipkin_spans)
        if self.logging_context:
            self.logging_context.stop()
            self.logging_context = None
            return

        # If we've gotten here, that means that this span is a child span of
        # this context's root span (i.e. it's a zipkin_span inside another
        # zipkin_span).
        end_timestamp = time.time()

        # Restore the handler's parent so sibling spans attach correctly.
        self.log_handler.parent_span_id = self.old_parent_span_id

        # We are simulating a full two-part span locally, so set cs=sr and ss=cr
        full_annotations = {
            'cs': self.start_timestamp,
            'sr': self.start_timestamp,
            'ss': end_timestamp,
            'cr': end_timestamp,
        }
        # But we filter down if we only want to emit some of the annotations
        filtered_annotations = {
            k: v for k, v in full_annotations.items()
            if k in self.annotation_filter
        }

        self.annotations.update(filtered_annotations)

        self.log_handler.store_local_span(
            span_name=self.span_name,
            service_name=self.service_name,
            annotations=self.annotations,
            binary_annotations=self.binary_annotations,
            sa_binary_annotations=self.sa_binary_annotations,
            span_id=self.zipkin_attrs.span_id,
        )

    def update_binary_annotations(self, extra_annotations):
        """Updates the binary annotations for the current span.

        If this trace is not being sampled then this is a no-op.

        :param extra_annotations: binary annotations to merge in
        :type extra_annotations: dict of str -> str
        """
        if not self.zipkin_attrs:
            return
        if not self.zipkin_attrs.is_sampled:
            return
        if not self.logging_context:
            # This is not the root span, so binary annotations will be added
            # to the log handler when this span context exits.
            self.binary_annotations.update(extra_annotations)
        else:
            # Otherwise, we're in the context of the root span, so just update
            # the binary annotations for the logging context directly.
            self.logging_context.binary_annotations_dict.update(extra_annotations)

    def add_sa_binary_annotation(
        self,
        port=0,
        service_name='unknown',
        host='127.0.0.1',
    ):
        """Adds a 'sa' binary annotation to the current span.

        'sa' binary annotations are useful for situations where you need to log
        where a request is going but the destination doesn't support zipkin.

        Note that the span must have 'cs'/'cr' annotations.

        :param port: The port number of the destination
        :type port: int
        :param service_name: The name of the destination service
        :type service_name: str
        :param host: Host address of the destination
        :type host: str
        """
        if not self.zipkin_attrs or not self.zipkin_attrs.is_sampled:
            return

        if 'client' not in self.include:
            # TODO: trying to set a sa binary annotation for a non-client span
            # should result in a logged error
            return

        sa_endpoint = create_endpoint(
            port=port,
            service_name=service_name,
            host=host,
        )
        sa_binary_annotation = create_binary_annotation(
            key=zipkin_core.SERVER_ADDR,
            value=SERVER_ADDR_VAL,
            annotation_type=zipkin_core.AnnotationType.BOOL,
            host=sa_endpoint,
        )
        if not self.logging_context:
            # Child span: attached to the locally stored span in stop().
            self.sa_binary_annotations.append(sa_binary_annotation)
        else:
            # Root span: hand it straight to the logging context.
            self.logging_context.sa_binary_annotations.append(sa_binary_annotation)


457
def _validate_args(kwargs):
458
if 'include' in kwargs:
460
'"include" is not valid in this context. '
461
'You probably want to use zipkin_span()'
class zipkin_client_span(zipkin_span):
    """Logs a client-side zipkin span.

    Subclass of :class:`zipkin_span` using only annotations relevant to clients
    ('cs' and 'cr').
    """

    def __init__(self, *args, **kwargs):
        """Logs a zipkin span with client annotations.

        See :class:`zipkin_span` for arguments. ``include`` must not be
        passed; it is always ('client',).
        """
        _validate_args(kwargs)

        kwargs['include'] = ('client',)
        super(zipkin_client_span, self).__init__(*args, **kwargs)


class zipkin_server_span(zipkin_span):
    """Logs a server-side zipkin span.

    Subclass of :class:`zipkin_span` using only annotations relevant to servers
    ('ss' and 'sr').
    """

    def __init__(self, *args, **kwargs):
        """Logs a zipkin span with server annotations.

        See :class:`zipkin_span` for arguments. ``include`` must not be
        passed; it is always ('server',).
        """
        _validate_args(kwargs)

        kwargs['include'] = ('server',)
        super(zipkin_server_span, self).__init__(*args, **kwargs)


def create_attrs_for_span(
    sample_rate=100.0,
    trace_id=None,
    span_id=None,
    use_128bit_trace_id=False,
):
    """Creates a set of zipkin attributes for a span.

    :param sample_rate: Float between 0.0 and 100.0 to determine sampling rate
    :type sample_rate: float
    :param trace_id: Optional 16-character hex string representing a trace_id.
        If this is None, a random trace_id will be generated.
    :type trace_id: str
    :param span_id: Optional 16-character hex string representing a span_id.
        If this is None, a random span_id will be generated.
    :type span_id: str
    :param use_128bit_trace_id: If true, a generated trace_id is 128-bit
        instead of 64-bit. Ignored when trace_id is given.
    :type use_128bit_trace_id: boolean
    :returns: attributes for a new root span (parent_span_id is None)
    :rtype: ZipkinAttrs
    """
    if trace_id is None:
        if use_128bit_trace_id:
            trace_id = generate_random_128bit_string()
        else:
            trace_id = generate_random_64bit_string()
    if span_id is None:
        span_id = generate_random_64bit_string()

    # Calculate if this trace is sampled based on the sample rate
    if sample_rate == 0.0:
        is_sampled = False
    else:
        is_sampled = (random.random() * 100) < sample_rate

    return ZipkinAttrs(
        trace_id=trace_id,
        span_id=span_id,
        parent_span_id=None,
        flags='0',
        is_sampled=is_sampled,
    )


def create_http_headers_for_new_span():
    """
    Generate the headers for a new zipkin span.

    .. note::

        If the method is not called from within a zipkin_trace context,
        an empty dict will be returned.

    :returns: dict containing (X-B3-TraceId, X-B3-SpanId, X-B3-ParentSpanId,
              X-B3-Flags and X-B3-Sampled) keys OR an empty dict.
    """
    zipkin_attrs = get_zipkin_attrs()

    if not zipkin_attrs:
        return {}

    # The downstream request becomes a child of the current span: it gets a
    # fresh span id and the current span id as its parent.
    return {
        'X-B3-TraceId': zipkin_attrs.trace_id,
        'X-B3-SpanId': generate_random_64bit_string(),
        'X-B3-ParentSpanId': zipkin_attrs.span_id,
        'X-B3-Flags': '0',
        'X-B3-Sampled': '1' if zipkin_attrs.is_sampled else '0',
    }