1
# -*- coding: utf-8 -*-
4
from collections import defaultdict
6
from py_zipkin.exception import ZipkinError
7
from py_zipkin.thrift import annotation_list_builder
8
from py_zipkin.thrift import binary_annotation_list_builder
9
from py_zipkin.thrift import copy_endpoint_with_new_service_name
10
from py_zipkin.thrift import create_span
11
from py_zipkin.thrift import thrift_objs_in_bytes
12
from py_zipkin.util import generate_random_64bit_string
16
from logging import NullHandler
17
except ImportError: # pragma: no cover
18
class NullHandler(logging.Handler):
19
def emit(self, record):
22
# Annotation key stamped on the root span when `add_logging_annotation`
# is enabled on the logging context.
LOGGING_END_KEY = 'py_zipkin.logging_end'

# Placeholder handler attached to the logger by default; the logging
# context swaps it out for its own log handler while a trace is active
# and puts it back afterwards.
null_handler = NullHandler()

# Shared module-level logger. It is kept at DEBUG so that records sent
# through `zipkin_logger.debug()` reach whichever handler is attached.
zipkin_logger = logging.getLogger('py_zipkin.logger')
zipkin_logger.setLevel(logging.DEBUG)
zipkin_logger.addHandler(null_handler)
30
class ZipkinLoggingContext(object):
31
"""A logging context specific to a Zipkin trace. If the trace is sampled,
32
the logging context sends serialized Zipkin spans to a transport_handler.
33
The logging context sends root "server" or "client" span, as well as all
34
local child spans collected within this context.
36
This class should only be used by the main `zipkin_span` entrypoint.
46
report_root_timestamp,
47
binary_annotations=None,
48
add_logging_annotation=False,
50
max_span_batch_size=None,
52
self.zipkin_attrs = zipkin_attrs
53
self.thrift_endpoint = thrift_endpoint
54
self.log_handler = log_handler
55
self.span_name = span_name
56
self.transport_handler = transport_handler
57
self.response_status_code = 0
58
self.report_root_timestamp = report_root_timestamp
59
self.binary_annotations_dict = binary_annotations or {}
60
self.sa_binary_annotations = []
61
self.add_logging_annotation = add_logging_annotation
62
self.client_context = client_context
63
self.max_span_batch_size = max_span_batch_size
66
"""Actions to be taken before request is handled.
67
1) Attach `zipkin_logger` to :class:`ZipkinLoggerHandler` object.
68
2) Record the start timestamp.
70
zipkin_logger.removeHandler(null_handler)
71
zipkin_logger.addHandler(self.log_handler)
72
self.start_timestamp = time.time()
76
"""Actions to be taken post request handling.
77
1) Log the service annotations to scribe.
78
2) Detach `zipkin_logger` handler.
81
zipkin_logger.removeHandler(self.log_handler)
82
zipkin_logger.addHandler(null_handler)
85
"""Main function to log all the annotations stored during the entire
86
request. This is done if the request is sampled and the response was
87
a success. It also logs the service (`ss` and `sr`) or the client
88
(`cs` and `cr`) annotations.
90
if not self.zipkin_attrs.is_sampled:
93
span_sender = ZipkinBatchSender(self.transport_handler,
94
self.max_span_batch_size)
96
end_timestamp = time.time()
97
# Collect additional annotations from the logging handler
98
annotations_by_span_id = defaultdict(dict)
99
binary_annotations_by_span_id = defaultdict(dict)
100
for msg in self.log_handler.extra_annotations:
101
span_id = msg['parent_span_id'] or self.zipkin_attrs.span_id
102
# This should check if these are non-None
103
annotations_by_span_id[span_id].update(msg['annotations'])
104
binary_annotations_by_span_id[span_id].update(
105
msg['binary_annotations']
108
# Collect, annotate, and log client spans from the logging handler
109
for span in self.log_handler.client_spans:
110
# The parent_span_id is either the parent ID set in the
111
# logging handler or the current Zipkin context's span ID.
113
span['parent_span_id'] or
114
self.zipkin_attrs.span_id
116
# A new client span's span ID can be overridden
117
span_id = span['span_id'] or generate_random_64bit_string()
118
endpoint = copy_endpoint_with_new_service_name(
119
self.thrift_endpoint, span['service_name']
121
# Collect annotations both logged with the new spans and
122
# logged in separate log messages.
123
annotations = span['annotations']
124
annotations.update(annotations_by_span_id[span_id])
125
binary_annotations = span['binary_annotations']
126
binary_annotations.update(
127
binary_annotations_by_span_id[span_id])
129
timestamp, duration = get_local_span_timestamp_and_duration(
132
# Create serializable thrift objects of annotations
133
thrift_annotations = annotation_list_builder(
134
annotations, endpoint
136
thrift_binary_annotations = binary_annotation_list_builder(
137
binary_annotations, endpoint
139
if span.get('sa_binary_annotations'):
140
thrift_binary_annotations += span['sa_binary_annotations']
142
span_sender.add_span(
144
parent_span_id=parent_span_id,
145
trace_id=self.zipkin_attrs.trace_id,
146
span_name=span['span_name'],
147
annotations=thrift_annotations,
148
binary_annotations=thrift_binary_annotations,
149
timestamp_s=timestamp,
153
extra_annotations = annotations_by_span_id[
154
self.zipkin_attrs.span_id]
155
extra_binary_annotations = binary_annotations_by_span_id[
156
self.zipkin_attrs.span_id
159
k1, k2 = ('sr', 'ss')
160
if self.client_context:
161
k1, k2 = ('cs', 'cr')
162
annotations = {k1: self.start_timestamp, k2: end_timestamp}
163
annotations.update(extra_annotations)
165
if self.add_logging_annotation:
166
annotations[LOGGING_END_KEY] = time.time()
168
thrift_annotations = annotation_list_builder(
170
self.thrift_endpoint,
173
# Binary annotations can be set through debug messages or the
174
# set_extra_binary_annotations registry setting.
175
self.binary_annotations_dict.update(extra_binary_annotations)
176
thrift_binary_annotations = binary_annotation_list_builder(
177
self.binary_annotations_dict,
178
self.thrift_endpoint,
180
if self.sa_binary_annotations:
181
thrift_binary_annotations += self.sa_binary_annotations
183
if self.report_root_timestamp:
184
timestamp = self.start_timestamp
185
duration = end_timestamp - self.start_timestamp
187
timestamp = duration = None
189
span_sender.add_span(
190
span_id=self.zipkin_attrs.span_id,
191
parent_span_id=self.zipkin_attrs.parent_span_id,
192
trace_id=self.zipkin_attrs.trace_id,
193
span_name=self.span_name,
194
annotations=thrift_annotations,
195
binary_annotations=thrift_binary_annotations,
196
timestamp_s=timestamp,
201
def get_local_span_timestamp_and_duration(annotations):
    """Derive a span's start timestamp and duration from its annotations.

    A complete client pair ('cs'/'cr') is preferred; otherwise a complete
    server pair ('sr'/'ss') is used.

    :param annotations: mapping of annotation name -> timestamp
    :returns: `(timestamp, duration)` taken from the first complete pair,
        or `(None, None)` when neither pair is fully present
    """
    for start_key, end_key in (('cs', 'cr'), ('sr', 'ss')):
        if start_key in annotations and end_key in annotations:
            start = annotations[start_key]
            return start, annotations[end_key] - start
    # Always hand back a 2-tuple: the caller unpacks the result into
    # `timestamp, duration`, so an implicit bare None would raise TypeError.
    return None, None
209
class ZipkinLoggerHandler(logging.StreamHandler, object):
210
"""Logger Handler to log span annotations or additional client spans to
211
scribe. To connect to the handler, logger name must be
214
:param zipkin_attrs: ZipkinAttrs namedtuple object
217
def __init__(self, zipkin_attrs):
218
super(ZipkinLoggerHandler, self).__init__()
219
# If parent_span_id is set, the application is in a logging context
220
# where each additional client span logged has this span as its parent.
221
# This is to allow logging of hierarchies of spans instead of just
222
# single client spans. See the SpanContext class.
223
self.parent_span_id = None
224
self.zipkin_attrs = zipkin_attrs
225
self.client_spans = []
226
self.extra_annotations = []
228
def store_local_span(
234
sa_binary_annotations=None,
237
"""Convenience method for storing a local child span (a zipkin_span
238
inside other zipkin_spans) to be logged when the outermost zipkin_span
241
self.client_spans.append({
242
'span_name': span_name,
243
'service_name': service_name,
244
'parent_span_id': self.parent_span_id,
246
'annotations': annotations,
247
'binary_annotations': binary_annotations,
248
'sa_binary_annotations': sa_binary_annotations,
251
def emit(self, record):
252
"""Handle each record message. This function is called whenever
253
zipkin_logger.debug() is called.
255
:param record: object containing the `msg` object.
256
Structure of record.msg should be the following:
264
"binary_annotations": {
265
"http.uri": "/foo/bar",
268
"service_name": "myService",
272
- annotations: str -> timestamp annotations
273
- binary_annotations: str -> str binary annotations
274
(One of either annotations or binary_annotations is required)
275
- name: str of new span name; only used if service_name is also
277
- service_name: str of new client span's service name.
279
If service_name is specified, this log msg is considered to
280
represent a new client span. If service_name is omitted, this is
281
considered additional annotation for the currently active
282
"parent span" (either the server span or the parent client span
283
inside a SpanContext).
285
if not self.zipkin_attrs.is_sampled:
287
span_name = record.msg.get('name', 'span')
288
annotations = record.msg.get('annotations', {})
289
binary_annotations = record.msg.get('binary_annotations', {})
290
if not annotations and not binary_annotations:
292
"At least one of annotation/binary annotation has"
293
" to be provided for {0} span".format(span_name)
295
service_name = record.msg.get('service_name', None)
296
# Presence of service_name means this is to be a new local span.
297
if service_name is not None:
298
self.store_local_span(
300
service_name=service_name,
301
annotations=annotations,
302
binary_annotations=binary_annotations,
305
self.extra_annotations.append({
306
'annotations': annotations,
307
'binary_annotations': binary_annotations,
308
'parent_span_id': self.parent_span_id,
312
class ZipkinBatchSender(object):
314
MAX_PORTION_SIZE = 100
316
def __init__(self, transport_handler, max_portion_size=None):
317
self.transport_handler = transport_handler
318
self.max_portion_size = max_portion_size or self.MAX_PORTION_SIZE
324
def __exit__(self, _exc_type, _exc_value, _exc_traceback):
325
if any((_exc_type, _exc_value, _exc_traceback)):
326
error = '{0}: {1}'.format(_exc_type.__name__, _exc_value)
327
raise ZipkinError(error)
342
thrift_span = create_span(
353
self.queue.append(thrift_span)
354
if len(self.queue) >= self.max_portion_size:
358
if self.transport_handler and len(self.queue) > 0:
359
message = thrift_objs_in_bytes(self.queue)
360
self.transport_handler(message)