-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathplot_image_only.py
82 lines (64 loc) · 2.25 KB
/
plot_image_only.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
from typing import Callable, Optional
import cv2
import numpy as np
import pyarrow as pa
from dora import DoraStatus
# Build an empty Arrow array at import time; the result is discarded.
# NOTE(review): presumably this forces pyarrow to initialise eagerly — confirm.
pa.array([])

# Value of the "CI" environment variable, or None when it is not set.
# The window display is skipped when this equals the string "true".
CI = os.getenv("CI")

# Frame dimensions (in pixels) used to reshape the flat "image" input.
CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480

# OpenCV font constant kept at module level for text drawing.
font = cv2.FONT_HERSHEY_SIMPLEX
class Operator:
    """
    Show the most recent camera image in an OpenCV window.

    Every "image" input is reshaped to (CAMERA_HEIGHT, CAMERA_WIDTH, 3),
    stored on the instance and counted. Unless the CI environment variable
    is "true", the stored image is displayed in a window named "frame" and
    pressing "q" in that window stops the operator.
    """

    def __init__(self):
        # Latest frame: an empty list until the first "image" input arrives,
        # afterwards the reshaped numpy array.
        self.image = []
        # Not used by any method of this class; kept so that existing
        # attribute access on instances keeps working.
        self.bboxs = []
        self.bounding_box_messages = 0
        # Number of "image" inputs received so far.
        self.image_messages = 0

    def on_event(
        self,
        dora_event: dict,
        send_output: Callable[[str, bytes | pa.Array, Optional[dict]], None],
    ) -> DoraStatus:
        """
        Dispatch a dora event.

        Args:
            dora_event (dict): Event to handle; only its "type" key is read here.
            send_output: Output callback, forwarded unchanged to `on_input`.

        Returns:
            DoraStatus: The result of `on_input` for "INPUT" events,
            `DoraStatus.CONTINUE` for every other event type.
        """
        if dora_event["type"] == "INPUT":
            return self.on_input(dora_event, send_output)
        return DoraStatus.CONTINUE

    def on_input(
        self,
        dora_input: dict,
        send_output: Callable[[str, bytes | pa.Array, Optional[dict]], None],
    ) -> DoraStatus:
        """
        Put image on cv2 window.

        Args:
            dora_input["id"] (str): Id of the dora_input declared in the yaml configuration
            dora_input["value"] (arrow array): message of the dora_input
            send_output Callable[[str, bytes | pa.Array, Optional[dict]], None]:
                Function for sending output to the dataflow (not called by
                this method):
                - First argument is the `output_id`
                - Second argument is the data as either bytes or `pa.Array`
                - Third argument is dora metadata dict
                e.g.: `send_output("bbox", pa.array([100], type=pa.uint8()), dora_event["metadata"])`

        Returns:
            DoraStatus: `DoraStatus.STOP` when "q" is pressed in the window,
            `DoraStatus.CONTINUE` otherwise.
        """
        if dora_input["id"] == "image":
            # Copy so the stored frame is an independent array rather than
            # a view on the incoming Arrow buffer.
            self.image = (
                dora_input["value"]
                .to_numpy()
                .reshape((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
                .copy()
            )
            self.image_messages += 1
            print(f"received {self.image_messages} images")

        # `self.image` is still the initial empty list until the first frame
        # arrives; only hand a real frame to OpenCV. `len()` is used because
        # the truth value of a multi-element numpy array is ambiguous.
        if CI != "true" and len(self.image) > 0:
            cv2.imshow("frame", self.image)
            # waitKey(1) lets the GUI process events; the mask keeps the low
            # byte of the key code for the comparison with "q".
            if cv2.waitKey(1) & 0xFF == ord("q"):
                return DoraStatus.STOP

        return DoraStatus.CONTINUE

    def __del__(self):
        # No window is ever opened when CI is "true", so there is nothing to
        # tear down in that case (and a GUI-less OpenCV build may reject the
        # call).
        if CI != "true":
            cv2.destroyAllWindows()