forked from fullstackenviormentss/client-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
opa_client_apis.py
316 lines (255 loc) · 9.66 KB
/
opa_client_apis.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
# coding=utf-8
"""
Open Policy Client module.
Represents a class with fabric methods to create policy and data objects.
"""
import os
import json
import urllib
from http import HTTPStatus
from multiprocessing import Process
from requests.exceptions import ChunkedEncodingError
from functools import singledispatch
from magen_rest_apis.rest_client_apis import RestClientApis
from magen_rest_apis.rest_return_api import RestReturn
from opa_rest_client.opa_watch import OpaWatch
from opa_rest_client.policy import Policy
from opa_rest_client.config import INVALID_REGO_ERROR
DEBUG_QUERY_STRING = "?explain=full&pretty"
WATCH_QUERY_STRING = "?watch&pretty=true"
def read_from_file(file_path: str) -> str:
"""
Read from file and return a blob of data
:param file_path: path to a file
:type file_path: str
:return: data blob
:rtype: str
"""
with open(file_path) as file:
blob = file.read()
return blob.encode()
@singledispatch
def get_policy_blob():
"""
Single Dispatch for several policy types.
This dispatch works with:
- str
- file
# :param policy: policy of a certain type
:return: policy blob
"""
raise NotImplementedError('Unsupported policy type')
@get_policy_blob.register(str)
def _(policy: str):
if os.path.isfile(policy):
return read_from_file(policy)
return policy.encode()
@get_policy_blob.register(bytes)
def _(policy: object):
return policy
@get_policy_blob.register(object)
def _(policy: object):
return policy.read().encode()
class OPAValidationError(Exception):
""" Exception custom class """
pass
class OPAClient(object):
""" Open Policy Agent connection client """
def __init__(self, server='localhost', port=8181, **kwargs):
self.server = server
self.port = str(port)
api_version = kwargs.pop('api_version', None) or 'v1'
self.base = 'http://' self.server ':' self.port '/' api_version '/'
def create_policy(self, name: str, rego_policy) -> Policy:
"""
Policy creation invokes PUT request to an OPA server and returns a policy object
:param name: name of policy; allows '/'
:type name: str
:param rego_policy: blob policy in REGO language
:type rego_policy: str or file handler
:return: policy object
:rtype: Policy
"""
def validate():
"""
validation function for name parameter
:rtype: void
"""
words = name.split('/')
valid_words = [x for x in words if x.isalpha() or '_' in x]
if len(words) != len(valid_words):
raise OPAValidationError('only slashes, underscores and letters are acceptable in policy name')
validate()
policy_url = self.base 'policies/' name
rego_policy = get_policy_blob(rego_policy)
rest_response = RestClientApis.http_put_and_check_success(policy_url,
rego_policy, headers={'Content-Type': 'text/plain'})
rest_json = rest_response.json_body
if rest_response.http_status == HTTPStatus.BAD_REQUEST and rest_json.get('code') == INVALID_REGO_ERROR:
raise OPAValidationError(str(rest_json.get('errors', 'Invalid or Empty REGO')))
if rest_response.http_status == HTTPStatus.OK and not rest_json:
return Policy(policy_name=name, policy_path=policy_url, rego_contents=rego_policy)
def create_opa_policy(url: str, policy):
"""
This function creates a OPA policy on the server
:param url: url address where policy is placed
:param file_path: .rego file path or a raw rego string
:return: RestReturn
"""
if os.path.isfile(policy):
with open(policy, 'r') as file:
policy_data = file.read()
policy_resp = RestClientApis.http_put_and_check_success(url,
policy_data, headers={'Content-Type': 'text/plain'})
return policy_resp
else:
# we are going to treat it as a raw string and hope for the best.
policy_resp = RestClientApis.http_put_and_check_success(url,
policy, headers={'Content-Type': 'text/plain'})
return policy_resp.success, policy_resp.message
def create_base_doc(url, json_data):
"""
This function creates a OPA policy on the server
:param url: url address where base document is placed
:param json_data: Json data for Base Document
:return: success, message
:rtype: tuple
"""
resp = RestClientApis.http_put_and_check_success(url, json_data)
return resp.success, resp.message
def patch_base_doc(url: str, json_data: str) -> (bool, str):
"""
Patches a base document.
:param url: URL of resource to be patched
:param json_data: JSON data as string
:return: success and message
:rtype: tuple
"""
resp = RestClientApis.http_patch_and_check_success(url, json_data,
headers={'Content-Type': 'application/json-patch json'})
return resp.success, resp.message
def delete_all_base_data_doc(url: str, debug=False):
"""
This function deletes all OPA base docs on the server
:param url: url address where base document is placed
:return: success, message
:rtype: tuple
"""
if debug:
url = url DEBUG_QUERY_STRING
data = json.dumps(dict())
resp = RestClientApis.http_put_and_check_success(url, data)
return resp.success, resp.message
def delete_base_doc(url: str, debug=False):
"""
This function deletes an OPA base doc on the server
:param url: url address where base document is placed
:return: success, message
:rtype: tuple
"""
if debug:
url = url DEBUG_QUERY_STRING
resp = RestClientApis.http_delete_and_check_success(url)
return resp.success, resp.message
def get_base_doc(url: str, debug=False) -> (object):
"""
This function gets an OPA base doc on the server
:param url: url address where base document is placed
:return: RestReturn Obj
:rtype: RestReturn
"""
if debug:
url = url DEBUG_QUERY_STRING
resp = RestClientApis.http_get_and_check_success(url)
return resp
def delete_policy(url: str, debug=False) -> (bool, str, dict):
if debug:
url = url DEBUG_QUERY_STRING
resp = RestClientApis.http_delete_and_check_success(url)
return resp.success, resp.message, resp.json_body
def delete_all_policies(url: str, debug=False):
"""
This function deletes all OPA base docs on the server
:param url: url address where base document is placed
:return: success, message
:rtype: tuple
"""
if debug:
url = url DEBUG_QUERY_STRING
data = json.dumps(dict())
resp = RestClientApis.http_put_and_check_success(url, data)
return resp.success, resp.message
def read_chunks(resp):
buf = ""
try:
for chunk in resp.iter_content(chunk_size=None):
if chunk.endswith(b"\n"):
buf = chunk.decode("utf-8")
yield buf
buf = ""
else:
buf = chunk
except ChunkedEncodingError as e:
# Nothing to do, connection terminated.
pass
def process_watch_stream(resp):
with open("watch_stream.txt", "wb ") as f:
f.seek(0)
for buf in read_chunks(resp):
f.write(buf.encode("utf-8"))
f.flush()
def create_watch(url: str) -> (bool, str, OpaWatch):
"""
Creates a watch in OPA. Watches are persistent connections and changes to the watch points
are streamed back through chunked-encoding.
:param url: URL for resource to watch
:return: success, message, OpaWatch class
:rtype: tuple(bool, message, OpaWatch)
"""
orig_url = url
url = url WATCH_QUERY_STRING
resp_obj = RestClientApis.http_get_and_check_success(url, stream=True, timeout=(2.0, None))
p = Process(target=process_watch_stream, args=(resp_obj.response_object,))
p.start()
opa_watch = OpaWatch(orig_url, p, p.pid)
return resp_obj.success, resp_obj.message, opa_watch
def destroy_watch(opa_watch_cls: OpaWatch):
opa_watch_cls.proc.terminate()
opa_watch_cls.proc.join()
def execute_query(url: str, json_data: str) -> (bool, str, dict):
"""
:param url: URL for query
:param json_data: JSON in string format.
:return: success, message and json body in dict
:rtype: tuple(bool, str, dict)
"""
resp_obj = RestClientApis.http_post_and_check_success(url, json_data, location=False)
return resp_obj.success, resp_obj.message, resp_obj.json_body
def execute_adhoc_query(url: str, query_string: str=None) -> (bool, str, dict):
"""
Executed an ad-hoc query
:param query_string: Everything after the ?=
:param url: URL for query that includes the query string
:return: success, message and json body as dict
:rtype: tuple(bool, str, dict)
"""
if query_string:
enc_query_string = urllib.parse.quote_plus(query_string)
url = url "?q=" enc_query_string
resp_obj = RestClientApis.http_get_and_check_success(url)
return resp_obj.success, resp_obj.message, resp_obj.json_body
policy_rego = '''
package appguard
# HTTP API request
import input as http_api
default allow = false
# Allow configuration that matches data portion
allow {
http_api.zone = data.appguard_config[i]["example.com/zone"]
http_api.classification = data.appguard_config[i]["example.com/classification"]
}
'''
if __name__ == '__main__':
opa = OPAClient()
result = opa.create_policy('mypolicy', policy_rego)
print(result)