-
Notifications
You must be signed in to change notification settings - Fork 0
/
k_nearest_neighbors.py
143 lines (117 loc) · 5.04 KB
/
k_nearest_neighbors.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# Author: Hezekiah Branch
# Tufts CS
import numpy as np
def split_into_train_and_test(x_all_LF, frac_test=0.5, random_state=None):
    '''
    Divide provided array into train and test set along first dimension

    User can provide a random number generator object to ensure reproducibility.

    Args
    ----
    x_all_LF : 2D array, shape = (n_total_examples, n_features) (L, F)
        Each row is a feature vector
    frac_test : float, fraction between 0 and 1
        Indicates fraction of all L examples to allocate to the "test" set
    random_state : np.random.RandomState instance or integer or None
        If int, code will create RandomState instance with provided value as seed
        If None, defaults to the current numpy random number generator np.random

    Returns
    -------
    x_train_MF : 2D array, shape = (n_train_examples, n_features) (M, F)
        Each row is a feature vector
        Should be a separately allocated array, NOT a view of any input array
    x_test_NF : 2D array, shape = (n_test_examples, n_features) (N, F)
        Each row is a feature vector
        Should be a separately allocated array, NOT a view of any input array

    Post Condition
    --------------
    This function should be side-effect free. The provided input array x_all_LF
    should not change at all (not be shuffled, etc.)

    Examples
    --------
    >>> x_LF = np.eye(10)
    >>> xcopy_LF = x_LF.copy() # preserve what input was before the call
    >>> train_MF, test_NF = split_into_train_and_test(
    ...     x_LF, frac_test=0.3, random_state=np.random.RandomState(0))
    >>> train_MF.shape
    (7, 10)
    >>> test_NF.shape
    (3, 10)
    >>> print(train_MF)
    [[0. 0. 1. 0. 0. 0. 0. 0. 0. 0.]
     [0. 0. 0. 0. 0. 0. 0. 0. 1. 0.]
     [0. 0. 0. 0. 1. 0. 0. 0. 0. 0.]
     [0. 0. 0. 0. 0. 0. 0. 0. 0. 1.]
     [0. 1. 0. 0. 0. 0. 0. 0. 0. 0.]
     [0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
     [0. 0. 0. 0. 0. 0. 0. 1. 0. 0.]]
    >>> print(test_NF)
    [[0. 0. 0. 1. 0. 0. 0. 0. 0. 0.]
     [1. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
     [0. 0. 0. 0. 0. 1. 0. 0. 0. 0.]]

    ## Verify that input array did not change due to function call
    >>> np.allclose(x_LF, xcopy_LF)
    True

    References
    ----------
    For more about RandomState, see:
    https://stackoverflow.com/questions/28064634/random-state-pseudo-random-numberin-scikit-learn
    '''
    # Normalize random_state into a generator object.
    # BUG FIX: the original left the generator as None when a RandomState
    # instance was passed (the documented usage, and what the doctest does),
    # which crashed with AttributeError on .choice(). Now the instance is
    # used directly.
    if random_state is None:
        rng = np.random
    elif isinstance(random_state, int):
        rng = np.random.RandomState(random_state)
    else:
        rng = random_state

    L = x_all_LF.shape[0]
    # Test set size rounds UP so frac_test=0.3 of 10 rows yields N=3.
    N = int(np.ceil(L * frac_test))
    M = L - N

    # Permute all row indices once; first M go to train, last N to test.
    # This reproduces the doctest's expected split for RandomState(0) and
    # avoids the original O(L*N) "idx not in test_indices" scan.
    shuffled_ids_L = rng.permutation(L)

    # .copy() guarantees separately allocated arrays (never views), and the
    # input array itself is never touched (side-effect free post condition).
    x_train_MF = x_all_LF[shuffled_ids_L[:M]].copy()
    x_test_NF = x_all_LF[shuffled_ids_L[M:]].copy()
    return x_train_MF, x_test_NF
def calc_k_nearest_neighbors(data_NF, query_QF, K=1):
    ''' Compute and return k-nearest neighbors under Euclidean distance

    Any ties in distance may be broken arbitrarily.

    Args
    ----
    data_NF : 2D array, shape = (n_examples, n_features) aka (N, F)
        Each row is a feature vector for one example in dataset
    query_QF : 2D array, shape = (n_queries, n_features) aka (Q, F)
        Each row is a feature vector whose neighbors we want to find
    K : int, positive (must be >= 1)
        Number of neighbors to find per query vector

    Returns
    -------
    neighb_QKF : 3D array, (n_queries, n_neighbors, n_feats) (Q, K, F)
        Entry q,k is feature vector of the k-th neighbor of the q-th query
    '''
    # Ensure float ndarrays so broadcasting and the float return dtype hold.
    data_NF = np.asarray(data_NF, dtype=float)
    query_QF = np.asarray(query_QF, dtype=float)

    # Pairwise SQUARED Euclidean distances via broadcasting, shape (Q, N).
    # The sqrt is omitted on purpose: sqrt is monotone increasing, so it
    # cannot change the neighbor ranking, and skipping it is cheaper.
    # This one vectorized expression replaces the original pure-Python
    # triple loop over (Q, N, F), and it sidesteps the original p_norm
    # lambda, which was latently wrong for odd p (no abs on differences).
    diff_QNF = query_QF[:, None, :] - data_NF[None, :, :]
    sq_dists_QN = np.sum(diff_QNF ** 2, axis=2)

    # Stable sort mirrors the original tie-breaking (lowest data index
    # first); keep only the K closest per query.
    nearest_ids_QK = np.argsort(sq_dists_QN, axis=1, kind='stable')[:, :K]

    # Fancy indexing gathers the neighbor feature vectors: (Q, K, F).
    return data_NF[nearest_ids_QK]