1
# Copyright 2015, Google Inc.
4
# Redistribution and use in source and binary forms, with or without
5
# modification, are permitted provided that the following conditions are
8
# * Redistributions of source code must retain the above copyright
9
# notice, this list of conditions and the following disclaimer.
10
# * Redistributions in binary form must reproduce the above
11
# copyright notice, this list of conditions and the following disclaimer
12
# in the documentation and/or other materials provided with the
14
# * Neither the name of Google Inc. nor the names of its
15
# contributors may be used to endorse or promote products derived from
16
# this software without specific prior written permission.
18
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30
"""State and behavior for operation termination."""
34
from grpc.framework.base import _constants
35
from grpc.framework.base import _interfaces
36
from grpc.framework.base import interfaces
37
from grpc.framework.foundation import callable_util
39
_CALLBACK_EXCEPTION_LOG_MESSAGE = 'Exception calling termination callback!'
43
class _Requirement(enum.Enum):
44
"""Symbols indicating events required for termination."""
47
TRANSMISSION = 'transmission'
48
INGESTION = 'ingestion'
50
_FRONT_NOT_LISTENING_REQUIREMENTS = (_Requirement.TRANSMISSION,)
51
_BACK_NOT_LISTENING_REQUIREMENTS = (
52
_Requirement.EMISSION, _Requirement.INGESTION,)
53
_LISTENING_REQUIREMENTS = (
54
_Requirement.TRANSMISSION, _Requirement.INGESTION,)
57
class _TerminationManager(_interfaces.TerminationManager):
58
"""An implementation of _interfaces.TerminationManager."""
61
self, work_pool, utility_pool, action, requirements, local_failure):
65
work_pool: A thread pool in which customer work will be done.
66
utility_pool: A thread pool in which work utility work will be done.
67
action: An action to call on operation termination.
68
requirements: A combination of _Requirement values identifying what
69
must finish for the operation to be considered completed.
70
local_failure: An interfaces.Outcome specifying what constitutes local
71
failure of customer work.
73
self._work_pool = work_pool
74
self._utility_pool = utility_pool
76
self._local_failure = local_failure
77
self._has_locally_failed = False
78
self._expiration_manager = None
80
self._outstanding_requirements = set(requirements)
84
def set_expiration_manager(self, expiration_manager):
85
self._expiration_manager = expiration_manager
87
def _terminate(self, outcome):
88
"""Terminates the operation.
91
outcome: An interfaces.Outcome describing the outcome of the operation.
93
self._expiration_manager.abort()
94
self._outstanding_requirements = None
95
callbacks = list(self._callbacks)
96
self._callbacks = None
97
self._outcome = outcome
99
act = callable_util.with_exceptions_logged(
100
self._action, _constants.INTERNAL_ERROR_LOG_MESSAGE)
102
if self._has_locally_failed:
103
self._utility_pool.submit(act, outcome)
105
def call_callbacks_and_act(callbacks, outcome):
106
for callback in callbacks:
107
callback_outcome = callable_util.call_logging_exceptions(
108
callback, _CALLBACK_EXCEPTION_LOG_MESSAGE, outcome)
109
if callback_outcome.exception is not None:
110
outcome = self._local_failure
112
self._utility_pool.submit(act, outcome)
114
self._work_pool.submit(callable_util.with_exceptions_logged(
115
call_callbacks_and_act,
116
_constants.INTERNAL_ERROR_LOG_MESSAGE),
120
"""See _interfaces.TerminationManager.is_active for specification."""
121
return self._outstanding_requirements is not None
123
def add_callback(self, callback):
124
"""See _interfaces.TerminationManager.add_callback for specification."""
125
if not self._has_locally_failed:
126
if self._outstanding_requirements is None:
127
self._work_pool.submit(
128
callable_util.with_exceptions_logged(
129
callback, _CALLBACK_EXCEPTION_LOG_MESSAGE), self._outcome)
131
self._callbacks.append(callback)
133
def emission_complete(self):
134
"""See superclass method for specification."""
135
if self._outstanding_requirements is not None:
136
self._outstanding_requirements.discard(_Requirement.EMISSION)
137
if not self._outstanding_requirements:
138
self._terminate(interfaces.Outcome.COMPLETED)
140
def transmission_complete(self):
141
"""See superclass method for specification."""
142
if self._outstanding_requirements is not None:
143
self._outstanding_requirements.discard(_Requirement.TRANSMISSION)
144
if not self._outstanding_requirements:
145
self._terminate(interfaces.Outcome.COMPLETED)
147
def ingestion_complete(self):
148
"""See superclass method for specification."""
149
if self._outstanding_requirements is not None:
150
self._outstanding_requirements.discard(_Requirement.INGESTION)
151
if not self._outstanding_requirements:
152
self._terminate(interfaces.Outcome.COMPLETED)
154
def abort(self, outcome):
155
"""See _interfaces.TerminationManager.abort for specification."""
156
if outcome is self._local_failure:
157
self._has_failed_locally = True
158
if self._outstanding_requirements is not None:
159
self._terminate(outcome)
162
def front_termination_manager(
163
work_pool, utility_pool, action, subscription_kind):
164
"""Creates a TerminationManager appropriate for front-side use.
167
work_pool: A thread pool in which customer work will be done.
168
utility_pool: A thread pool in which work utility work will be done.
169
action: An action to call on operation termination.
170
subscription_kind: An interfaces.ServicedSubscription.Kind value.
173
A TerminationManager appropriate for front-side use.
175
if subscription_kind is interfaces.ServicedSubscription.Kind.NONE:
176
requirements = _FRONT_NOT_LISTENING_REQUIREMENTS
178
requirements = _LISTENING_REQUIREMENTS
180
return _TerminationManager(
181
work_pool, utility_pool, action, requirements,
182
interfaces.Outcome.SERVICED_FAILURE)
185
def back_termination_manager(work_pool, utility_pool, action, subscription_kind):
186
"""Creates a TerminationManager appropriate for back-side use.
189
work_pool: A thread pool in which customer work will be done.
190
utility_pool: A thread pool in which work utility work will be done.
191
action: An action to call on operation termination.
192
subscription_kind: An interfaces.ServicedSubscription.Kind value.
195
A TerminationManager appropriate for back-side use.
197
if subscription_kind is interfaces.ServicedSubscription.Kind.NONE:
198
requirements = _BACK_NOT_LISTENING_REQUIREMENTS
200
requirements = _LISTENING_REQUIREMENTS
202
return _TerminationManager(
203
work_pool, utility_pool, action, requirements,
204
interfaces.Outcome.SERVICER_FAILURE)