diff --git a/reproject/alert.py b/reproject/alert.py index 0a9fa2d..4df414e 100644 --- a/reproject/alert.py +++ b/reproject/alert.py @@ -1,5 +1,3 @@ - - import os import queue import threading @@ -19,9 +17,14 @@ class Alert: self.upload_thread = None self.stop_event = threading.Event() self.dropped_frames = 0 + self.accepting_frames = False + self.ended = False def start(self, width=1920, height=1080, fps=30): print(f"Starting alert with {width}x{height}") + self.accepting_frames = True + self.ended = False + self.stop_event.clear() read_fd, write_fd = os.pipe() self.read_pipe = os.fdopen(read_fd, "rb", buffering=0) self.write_pipe = os.fdopen(write_fd, "wb", buffering=0) @@ -65,8 +68,6 @@ class Alert: while not self.stop_event.is_set() or not self.frame_queue.empty(): try: frame = self.frame_queue.get(timeout=0.1) - if frame is None: - break av_frame = av.VideoFrame.from_ndarray(frame, format="bgr24") for packet in self.stream.encode(av_frame): self.container.mux(packet) @@ -83,6 +84,8 @@ class Alert: self.encode_thread.start() def provide_frame(self, frame): + if not self.accepting_frames or self.ended: + return try: self.frame_queue.put(frame, block=True, timeout=0.05) except queue.Full: @@ -94,17 +97,20 @@ class Alert: ) def end(self): + if self.ended: + return + self.ended = True + self.accepting_frames = False print( f"Stopping alert, queued frames: {self.frame_queue.qsize()}, dropped frames: {self.dropped_frames}" ) - self.frame_queue.put(None) self.stop_event.set() # 等待编码线程完成 if self.encode_thread: - self.encode_thread.join(timeout=30) + self.encode_thread.join() if self.encode_thread.is_alive(): - print("Warning: Encode thread still running after timeout") + print("Warning: Encode thread still running") else: print(f"Encode thread completed with {self.frame_count} frames") diff --git a/reproject/main.py b/reproject/main.py index bd3fc92..a776acd 100644 --- a/reproject/main.py +++ b/reproject/main.py @@ -1,5 +1,6 @@ from calendar import c from tracemalloc import stop +from av import buffer import cv2 import threading import time @@ -61,7 +62,7 @@ def capture_thread(): """ 采集线程:优化了分发逻辑,对视频流进行降频处理 """ - cap = cv2.VideoCapture(0) + cap = cv2.VideoCapture(1) cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720) @@ -438,6 +439,7 @@ def alert_thread(server): alert_status = False alert_st = "" level = 0 + timett = 0 while not stop_event.is_set(): try: frame = ana_video_queue.get(timeout=1) @@ -476,6 +478,15 @@ def alert_thread(server): # no_face_time if data["label"] == "": no_face_time += 1 + sleep_time = 0 + haqian_time = 0 + heart_spe = 0 + heart_num = 0 + eye_close = False + down_emo_time = 0 + pianyi_time = 0 + else: + no_face_time = 0 alert_time = now @@ -489,7 +500,7 @@ def alert_thread(server): alert_st += alert_info["pianyi_time"] + "; " pianyi_time = 0 level += 1 - if no_face_time >= 60: + if no_face_time >= 30: alert_status = True alert_st += alert_info["no_face_time"] + "; " no_face_time = 0 @@ -509,7 +520,7 @@ def alert_thread(server): alert_status = True alert_st += alert_info["haqian_time"] + ";" level += 1 - if heart_spe < 60 or heart_spe > 120: + if (heart_spe < 60 and timett != 0 and heart_spe != 0) or heart_spe > 120: alert_status = True alert_st += alert_info["heart_spe"] + ";" level += 1 @@ -535,18 +546,38 @@ def alert_thread(server): alert = server.alert(int(time.time()), alert_st, info_level) alert = HookMocker(alert, "http://10.128.48.204:5000/api/osshook") alert.start(width=1280, height=720, fps=30) - for f in buffered_frame: - alert.provide_frame(f) - alert.end() + print(f"upload buffered frames... {buffered_frame.__len__()}") + frames_to_upload = buffered_frame + buffered_frame = [] + upload_task = threading.Thread( + target=handle_alert_upload, + args=(frames_to_upload, alert), + daemon=True + ) + upload_task.start() alert_status = False alert_st = "" buffered_frame = [] level = 0 + timett += 1 except queue.Empty: continue - except Exception as e: - print(f"[Alert] 错误: {e}") - continue + # except Exception as e: + # print(f"[Alert] 错误: {e}") + # continue + +def handle_alert_upload(frames, _alert_instance): + """ + 后台线程:负责将缓存的帧喂给 alert 对象,并执行编码和上传 + """ + try: + # print("后台上传线程启动...") + for f in frames: + _alert_instance.provide_frame(f) + _alert_instance.end() # 这里阻塞也没关系,因为是在后台线程里 + # print("后台上传线程结束") + except Exception as e: + print(f"后台上传出错: {e}") def draw_debug_info(frame, result):