-
Notifications
You must be signed in to change notification settings - Fork 0
/
hausdorff.py
293 lines (225 loc) · 8.21 KB
/
hausdorff.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
"""
Raw Hausdorff distance and metric calculation implementation for point sets `X` and `Y`.
"""
import multiprocessing as mp
import numpy as np
import numba as nb
from typing import List, Tuple, Dict, Optional, Callable, Union, Sequence
@nb.jit(nopython=True, fastmath=True, parallel=True)
def pairwise_distances(X: np.ndarray, Y: np.ndarray, *,
metric: Callable) -> np.ndarray:
card_X = X.shape[0]
card_Y = Y.shape[0]
distances = np.zeros(card_X*card_Y, dtype=np.float32)
for i in range(card_X):
for j in range(card_Y):
dist = metric(X[i, ...], Y[j, ...])
distances[i*j] = dist
return distances
@nb.jit(nopython=True, fastmath=True, parallel=False)
def dir_hd_distcs(X: np.ndarray, Y: np.ndarray, *, metric: Callable) -> np.ndarray:
"""
Compute the directed Hausdorff distances between the sets `X` and `Y`.
Return an array of distances:
For every element x € X, the minimum distance d_min = metric(x, y_min)
with y_min € Y is provided.
Hint: X and Y should be provided as positional arguments only.
Parameters
----------
X: numpy.ndarray
First set for dirH(X, Y)
Y: numpy.ndarray
Second set for dirH(X, Y)
metric: Callable
The metric function used to calculate
dist = metric(x, y) with x € X and y € Y.
Returns
-------
distances: numpy.ndarray
The 1D array of minimal distances with a length
of X.shape[0].
"""
card_X = X.shape[0]
card_Y = Y.shape[0]
distances = np.zeros(card_X, dtype=np.float32)
for i in range(card_X):
mindist = np.inf
for j in range(card_Y):
dist = metric(X[i, ...], Y[j, ...])
if dist < mindist:
mindist = dist
distances[i] = mindist
return distances
@nb.jit(nopython=True, fastmath=True)
def dir_hd_dist(X: np.ndarray, Y: np.ndarray, *, metric: Callable) -> float:
"""
Compute the directed Hausdorff distance.
Hint: X and Y should be provided as positional arguments only.
Parameters
----------
X: numpy.ndarray
First set for dirH(X, Y)
Y: numpy.ndarray
Second set for dirH(X, Y)
metric: Callable
The metric function used to calculate
dist = metric(x, y) with x € X and y € Y.
Returns
-------
hd_distance: float
The directed Hausdorff distance dH(X, Y).
"""
card_X = X.shape[0]
card_Y = Y.shape[0]
maxdist = 0.0
for i in range(card_X):
mindist = np.inf
for j in range(card_Y):
dist = metric(X[i, ...], Y[j, ...])
if dist < mindist:
mindist = dist
if maxdist < mindist:
maxdist = mindist
return maxdist
@nb.jit(nopython=True, fastmath=True, parallel=False)
def argtr_dir_hd_distcs(X: np.ndarray, Y: np.ndarray, *, metric: Callable) -> Tuple[np.ndarray]:
"""
Computes the argument-tracked directed Hausdorff distances between `X` and `Y`.
This means that the distance and the
corresponding index of the element of `Y` that gives the minimal distance
is calculated:
dist_i = metric(x_i, y*_j) where y*_j minimizes the distances over all y € Y.
Hint: X and Y should be provided as positional arguments only.
Parameters
----------
X: numpy.ndarray
First set for dirH(X, Y)
Y: numpy.ndarray
Second set for dirH(X, Y)
metric: Callable
The metric function used to calculate
dist = metric(x, y) with x € X and y € Y.
Returns
-------
(distances, indices): 2-tuple of numpy.ndarray
The tuple of distances and corresponding indices of elements
of Y that minimize the distance.
"""
card_X = X.shape[0]
card_Y = Y.shape[0]
distances = np.zeros((card_X), dtype=np.float32)
indices = np.full(card_X, np.nan, dtype=np.int32)
for i in range(card_X):
mindist = np.inf
minidx = np.nan
for j in range(card_Y):
dist = metric(X[i, ...], Y[j, ...])
if dist < mindist:
mindist = dist
minidx = j
distances[i] = mindist
indices[i] = minidx
assert np.all(~np.isnan(indices)), 'WTF'
return (distances, indices)
@nb.jit(nopython=True, fastmath=True)
def hd_distcs(X: np.ndarray, Y: np.ndarray, *, metric: Callable) -> np.ndarray:
"""
Compute the Hausdorff distances between `X` and `Y` as well as the mirror
case `Y` and `X`.
Hint: X and Y should be provided as positional arguments only.
Parameters
----------
X: numpy.ndarray
First set.
Y: numpy.ndarray
Second set.
metric: Callable
The metric function used to calculate
dist = metric(x, y) with x € X and y € Y.
Returns
-------
(hdists_XY, hdists_YX) : 2-tuple of np.ndarray
Tuple of both directed Hausdorff distances dH(X, Y)
and dH(Y, X).
"""
hdists_XY = dir_hd_distcs(X, Y, metric=metric)
hdists_YX = dir_hd_distcs(Y, X, metric=metric)
return (hdists_XY, hdists_YX)
def hd_distcs_pll(X: np.ndarray, Y: np.ndarray, *, metric: Callable):
"""
Parallelized Hausdorff distances calculation of (dH(X, Y), dH(Y, X))
distributed across two processes.
Hint: X and Y should be provided as positional arguments only.
Parameters
----------
X: numpy.ndarray
First set.
Y: numpy.ndarray
Second set.
metric: Callable
The metric function used to calculate
dist = metric(x, y) with x € X and y € Y.
Returns
-------
(hdists_XY, hdists_YX) : 2-tuple of np.ndarray
Tuple of both directed Hausdorff distances dH(X, Y)
and dH(Y, X).
"""
queue = mp.Queue(3)
worker_XY = mp.Process(target=_worker,
args=(dir_hd_distcs, (X, Y), {'metric' : metric}, queue, 0))
worker_YX = mp.Process(target=_worker,
args=(dir_hd_distcs, (Y, X), {'metric' : metric}, queue, 1))
results = []
workers = [worker_XY, worker_YX]
for w in workers:
w.start()
# collect results from queue BEFORE joining to circumvent eternal .join()
# hang ... be aware that this loop runs forever if a worker fucks up
while len(results) < 2:
results.append(queue.get(block=True))
for w in workers:
w.join()
results.sort(key=lambda tpl : tpl[0])
return tuple(r[1] for r in results)
def _worker(func: Callable, fargs: Tuple, fkwargs: dict, queue: mp.Queue, pos: int) -> None:
"""
Worker template for multiprocessing implementation in `hd_distcs_pll`.
The callable `func` is evaluated with `fargs` and `fkwargs`, its results are placed in
the provided queue as a tuple with the `pos` integer as a canonical
ordering index.
"""
result = func(*fargs, **fkwargs)
queue.put((pos, result))
def avg_hd_dist(X: np.ndarray, Y: np.ndarray, *, metric: Callable, parallel=True) -> float:
"""
Hint: X and Y should be provided as positional arguments only.
"""
if parallel:
dists = hd_distcs_pll(X, Y, metric=metric)
else:
dists = hd_distcs(X, Y, metric=metric)
# accumulate the mean of the directed Hausdorff distances
dir_mean_acc = 0.0
for dir_dists in dists:
dir_mean_acc = np.mean(dir_dists)
# average again
return dir_mean_acc / 2
def hd_metric(X: np.ndarray, Y: np.ndarray, *, metric: Callable, parallel=True) -> float:
"""
Compute the Hausdorff metric between the sets `X` and `Y`.
Hint: X and Y should be provided as positional arguments only.
"""
if parallel:
return np.max(np.concatenate(hd_distcs_pll(X, Y, metric=metric), axis=0))
else:
return np.max(np.concatenate(hd_distcs(X, Y, metric=metric), axis=0))
def percentile_hd_metric(X: np.ndarray, Y: np.ndarray, *, metric: Callable,
q: Union[float, Sequence[float]],
**kwargs) -> float:
"""
Convenience function to compute q-th percentile of Hausdorff distances between
`X` and `Y`.
"""
distances = np.concatenate(hd_distcs(X, Y, metric=metric), axis=0)
return np.percentile(distances, q=q, **kwargs)