5
Copyright 2014-2016 Andreas Würl
7
Licensed under the Apache License, Version 2.0 (the "License");
8
you may not use this file except in compliance with the License.
9
You may obtain a copy of the License at
11
http://www.apache.org/licenses/LICENSE-2.0
13
Unless required by applicable law or agreed to in writing, software
14
distributed under the License is distributed on an "AS IS" BASIS,
15
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
See the License for the specific language governing permissions and
17
limitations under the License.
27
from injector import singleton
29
from blitzortung import data, builder, types
33
class SignalVelocity(object):
35
class for time/distance conversion regarding the reduced speed of light
38
# speed of light in m / ns
41
__c_reduction_permille = 2.5
43
__c = (1 - 0.001 * __c_reduction_permille) * __c0
45
def get_distance_time(self, distance):
46
""" return the time in nanoseconds for a given distance in meters """
47
return int(distance / self.__c)
49
def get_time_distance(self, time_ns):
50
""" return the distance in meters for a given time interval in nanoseconds """
51
return time_ns * self.__c
54
class SimulatedData(object):
55
def __init__(self, x_coord_or_point, y_coord=None):
56
self.strike_location = types.Point(x_coord_or_point, y_coord)
57
self.signal_velocity = SignalVelocity()
58
self.event_builder = builder.Event()
59
self.timestamp = pd.Timestamp(datetime.datetime.utcnow())
61
def get_timestamp(self):
64
def get_event_at(self, x_coord_or_point, y_coord=None, distance_offset=0.0):
65
event_location = types.Point(x_coord_or_point, y_coord)
66
distance = self.strike_location.distance_to(event_location)
68
nanosecond_offset = self.signal_velocity.get_distance_time(distance + distance_offset)
70
self.event_builder.set_x(x_coord_or_point)
71
self.event_builder.set_y(y_coord)
72
self.event_builder.set_timestamp(self.timestamp, nanosecond_offset)
74
return self.event_builder.build()
76
def get_source_event(self):
77
self.event_builder.set_x(self.strike_location.x)
78
self.event_builder.set_y(self.strike_location.y)
79
self.event_builder.set_timestamp(self.timestamp)
80
return self.event_builder.build()
83
class ThreePointSolution(data.Event):
84
def __init__(self, reference_event, azimuth, distance, signal_velocity):
85
location = reference_event.geodesic_shift(azimuth, distance)
86
distance = reference_event.distance_to(location)
88
total_nanoseconds = reference_event.timestamp.value
89
total_nanoseconds -= signal_velocity.get_distance_time(distance)
90
self.signal_velocity = signal_velocity
92
super(ThreePointSolution, self).__init__(pd.Timestamp(total_nanoseconds), location)
94
def get_residual_time_at(self, event):
95
distance = self.distance_to(event)
96
distance_runtime = self.signal_velocity.get_distance_time(distance)
98
measured_runtime = event.timestamp.value - self.timestamp.value
100
return (measured_runtime - distance_runtime) / 1000.0
102
def get_total_residual_time_of(self, events):
103
residual_time_sum = 0.0
105
residual_time = self.get_residual_time_at(event)
107
residual_time_sum += residual_time * residual_time
109
return math.sqrt(residual_time_sum) / len(events)
112
return super(ThreePointSolution, self).__str__()
115
class ThreePointSolver(object):
117
calculates the exact coordinates of the intersection of two hyperbola defined by three event points/times
119
The solution is calculated in a polar coordinate system which has its origin in the location
120
of the first event point
123
def __init__(self, events):
125
raise ValueError("ThreePointSolution requires three events")
128
from . import INJECTOR
130
self.signal_velocity = INJECTOR.get(SignalVelocity)
132
distance_0_1 = events[0].distance_to(events[1])
133
distance_0_2 = events[0].distance_to(events[2])
135
azimuth_0_1 = events[0].azimuth_to(events[1])
136
azimuth_0_2 = events[0].azimuth_to(events[2])
138
time_difference_0_1 = events[0].ns_difference_to(events[1])
139
time_difference_0_2 = events[0].ns_difference_to(events[2])
141
time_distance_0_1 = self.signal_velocity.get_time_distance(time_difference_0_1)
142
time_distance_0_2 = self.signal_velocity.get_time_distance(time_difference_0_2)
144
self.solutions = self.solve(time_distance_0_1, distance_0_1, azimuth_0_1, time_distance_0_2,
145
distance_0_2, azimuth_0_2)
147
def get_solution(self):
148
solution_count = len(self.solutions)
150
if solution_count > 1:
152
for solution in self.solutions:
153
solutions[solution] = solution.get_total_residual_time_of(self.events)
154
print(" 3PointSolution:", solution, solutions[solution])
155
return min(solutions, key=solutions.get)
156
elif solution_count == 1:
157
return self.solutions[0]
161
def solve(self, d1, g1, azimuth1, d2, g2, azimuth2):
163
phi1 = self.azimuth_to_angle(azimuth1)
164
phi2 = self.azimuth_to_angle(azimuth2)
166
(p1, q1) = self.calculate_p_q(d1, g1)
167
(p2, q2) = self.calculate_p_q(d2, g2)
169
(cosine, sine) = self.calculate_angle_projection(phi1, phi2)
171
denominator = q1 * q1 + q2 * q2 - 2 * q1 * q2 * cosine
173
root_argument = q2 * q2 * (-self.square(p1 - p2) + denominator) * sine * sine
177
if root_argument < 0.0 or denominator == 0.0:
178
print("%.1f %.1f %.1f°, %.1f %.1f %.1f°" % (d1, g1, azimuth1, d2, g2, azimuth2))
181
part_1 = (-p1 * q1 + p2 * q1 + (p1 - p2) * q2 * cosine) / denominator
182
part_2 = math.sqrt(root_argument) / denominator
185
if abs(part_1 + part_2) <= 1.0:
186
solution_angles.append(math.acos(part_1 + part_2))
187
solution_angles.append(-math.acos(part_1 + part_2))
188
if abs(part_1 - part_2) <= 1.0:
189
solution_angles.append(math.acos(part_1 - part_2))
190
solution_angles.append(-math.acos(part_1 - part_2))
192
for solution_angle in solution_angles:
193
if self.is_angle_valid_for_hyperbola(solution_angle, d1, g1, phi1) and \
194
self.is_angle_valid_for_hyperbola(solution_angle, d2, g2, phi2):
196
solution_distance = self.hyperbola_radius(solution_angle, d1, g1, 0)
197
solution_azimuth = self.angle_to_azimuth(solution_angle + phi1)
198
solution_a = ThreePointSolution(self.events[0], solution_azimuth, solution_distance,
199
self.signal_velocity)
201
solution_distance = self.hyperbola_radius(solution_angle, d2, g2, phi2 - phi1)
202
solution_azimuth = self.angle_to_azimuth(solution_angle + phi1)
203
solution_b = ThreePointSolution(self.events[0], solution_azimuth, solution_distance,
204
self.signal_velocity)
206
if solution_a.has_same_location(solution_b):
207
solutions.append(solution_a)
212
def calculate_angle_projection(phi1, phi2):
214
return math.cos(angle), math.sin(angle)
217
def hyperbola_radius(theta, d, g, phi):
218
return 0.5 * (g * g - d * d) / (d + g * math.cos(theta - phi))
220
def is_angle_valid_for_hyperbola(self, angle, d, g, phi):
228
asymptotic_angle = self.calculate_asymptotic_angle(d, g)
230
return -asymptotic_angle < angle < asymptotic_angle
233
def calculate_asymptotic_angle(d, g):
234
return math.acos(-d / g)
237
def calculate_p_q(d, g):
238
denominator = g * g - d * d
239
return d / denominator, g / denominator
246
def azimuth_to_angle(azimuth):
247
return math.pi / 2 - azimuth
250
def angle_to_azimuth(angle):
251
return math.pi / 2 - angle
254
class FitSeed(object):
255
def __init__(self, events, signal_velocity):
257
self.signal_velocity = signal_velocity
261
def iterate_combinations(self):
263
for combination in itertools.combinations(self.events[1:5], 2):
264
self.find_three_point_solution([self.events[0]] + list(combination))
266
def find_three_point_solution(self, selected_events):
267
three_point_solver = ThreePointSolver(selected_events)
269
solution = three_point_solver.get_solution()
271
self.solutions[solution] = solution.get_total_residual_time_of(self.events)
273
def get_seed_event(self):
275
self.iterate_combinations()
278
for solution, residual_time in self.solutions.iteritems():
279
print("%.1f %s" % (residual_time, str(solution)))
280
return min(self.solutions, key=self.solutions.get)
286
Time, Longitude, Latitude = range(3)
292
class LeastSquareFit(object):
295
def __init__(self, three_point_solution, events, signal_velocity):
297
self.m_dim = len(events)
299
self.time_reference = events[0].timestamp
300
self.signal_velocity = signal_velocity
302
self.parameters = collections.OrderedDict()
303
self.parameters[FitParameter.Time] = self.calculate_time_value(three_point_solution.timestamp)
304
self.parameters[FitParameter.Longitude] = three_point_solution.x
305
self.parameters[FitParameter.Latitude] = three_point_solution.y
307
self.a_matrix = np.zeros((self.m_dim, self.n_dim))
308
self.b_vector = np.zeros(self.m_dim)
309
self.residuals = np.zeros(self.m_dim)
311
self.least_square_sum = None
312
self.previous_least_square_sum = None
314
self.successful = False
315
self.iteration_count = 0
317
def get_parameter(self, parameter):
318
return self.parameters[parameter]
320
def calculate_time_value(self, timestamp):
321
return (timestamp.value - self.time_reference.value) / self.TIME_FACTOR
323
def calculate_partial_derivative(self, event_index, parameter_index):
327
self.parameters[parameter_index] += delta
329
slope = (self.residuals[event_index] - self.get_residual_time_at(self.events[event_index])) / delta
331
self.parameters[parameter_index] -= delta
335
def perform_fit_step(self):
337
self.iteration_count += 1
339
self.initialize_data()
341
(solution, residues, rank, singular_values) = np.linalg.lstsq(self.a_matrix, self.b_vector)
343
self.update_parameters(solution)
345
self.calculate_least_square_sum()
347
def initialize_data(self):
349
for index, event in enumerate(self.events):
350
self.residuals[index] = self.get_residual_time_at(event)
352
for event_index in range(self.m_dim):
353
for parameter_index in self.parameters:
354
self.a_matrix[event_index][parameter_index] = self.calculate_partial_derivative(event_index,
356
self.b_vector[event_index] = self.get_residual_time_at(self.events[event_index])
358
def update_parameters(self, solution):
359
for parameter_index, delta in enumerate(solution):
360
self.parameters[parameter_index] += delta
362
def calculate_least_square_sum(self):
363
residual_time_sum = 0.0
365
for event in self.events:
366
residual_time = self.get_residual_time_at(event)
368
residual_time_sum += residual_time * residual_time
370
overdetermination_factor = max(1, self.m_dim - self.n_dim)
372
residual_time_sum /= overdetermination_factor
374
self.previous_least_square_sum = self.least_square_sum
375
self.least_square_sum = residual_time_sum
377
return residual_time_sum
379
def get_residual_time_at(self, event):
380
location = self.get_location()
381
distance = location.distance_to(event)
382
distance_runtime = self.signal_velocity.get_distance_time(distance) / self.TIME_FACTOR
384
measured_runtime = self.calculate_time_value(event.timestamp) - self.parameters[FitParameter.Time]
386
return measured_runtime - distance_runtime
388
def get_location(self):
389
return types.Point(self.parameters[FitParameter.Longitude], self.parameters[FitParameter.Latitude])
391
def get_timestamp(self):
392
return pd.Timestamp(self.time_reference.value + int(round(self.parameters[FitParameter.Time] * 1000, 0)))
394
def get_least_square_sum(self):
395
return self.least_square_sum
397
def get_least_square_change(self):
398
if self.least_square_sum and self.previous_least_square_sum:
399
return self.least_square_sum / self.previous_least_square_sum - 1.0
402
def requires_another_iteration(self):
403
if self.least_square_sum and self.previous_least_square_sum:
404
if self.least_square_sum / self.previous_least_square_sum > 1.1:
406
if abs(self.get_least_square_change()) < 10e-4:
407
self.successful = True
410
if self.iteration_count >= 20:
415
def is_successful(self):
416
return self.successful
418
def get_solution(self):
419
event_builder = builder.Event()
421
location = self.get_location()
422
event_builder.set_x(location.x())
423
event_builder.set_y(location.y())
424
event_builder.set_timestamp(self.get_timestamp())
426
return event_builder.build()
428
def get_number_of_iterations(self):
429
return self.iteration_count