diff --git a/reproject/alert.py b/reproject/alert.py index 0a9fa2d..ccc54ee 100644 --- a/reproject/alert.py +++ b/reproject/alert.py @@ -18,6 +18,7 @@ class Alert: self.encode_thread = None self.upload_thread = None self.stop_event = threading.Event() + self.abort_event = threading.Event() # 新增:用于超时强制中断 self.dropped_frames = 0 def start(self, width=1920, height=1080, fps=30): @@ -54,15 +55,19 @@ class Alert: self.stream.width = width self.stream.height = height self.stream.pix_fmt = "yuv420p" - # 使用更快的编码预设 - # self.stream.options = {"preset": "ultrafast", "crf": "23"} + self.stream.options = {"preset": "ultrafast", "tune": "zerolatency", "crf": "23"} print("AV container and stream initialized") # 启动编码线程 def _encode() -> None: try: print("Encode thread starting") + # 修改循环条件:增加 abort_event 检查 while not self.stop_event.is_set() or not self.frame_queue.empty(): + if self.abort_event.is_set(): + print("Encode thread aborted by timeout (dropping remaining frames)") + break + try: frame = self.frame_queue.get(timeout=0.1) if frame is None: @@ -100,13 +105,10 @@ class Alert: self.frame_queue.put(None) self.stop_event.set() - # 等待编码线程完成 + # 等待编码线程完成(不设超时,确保所有帧都编码完成) if self.encode_thread: - self.encode_thread.join(timeout=30) - if self.encode_thread.is_alive(): - print("Warning: Encode thread still running after timeout") - else: - print(f"Encode thread completed with {self.frame_count} frames") + self.encode_thread.join() + print(f"Encode thread completed with {self.frame_count} frames") # 完成编码,flush所有待处理的数据 try: diff --git a/reproject/main.py b/reproject/main.py index 45f04b3..d10b3a1 100644 --- a/reproject/main.py +++ b/reproject/main.py @@ -1,5 +1,6 @@ from calendar import c from tracemalloc import stop +from av import buffer import cv2 import threading import time @@ -46,7 +47,7 @@ def capture_thread(): """ 采集线程:优化了分发逻辑,对视频流进行降频处理 """ - cap = cv2.VideoCapture(0) + cap = cv2.VideoCapture(1) cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720) @@ -208,7 +209,7 @@ def analysis_thread(): payload["eye_close_freq"] = last_freq front_data["eye_close_freq"] = last_freq bpm = heart_monitor.process_frame(frame, result["landmark"]) - if bpm != None: + if bpm != None and result["has_face"]: result["heart_rate"] = bpm payload["heart_rate"] = bpm front_data["heart_rate"] = bpm @@ -446,6 +447,7 @@ def alert_thread(server): alert_status = False alert_st = "" level = 0 + timett = 0 while not stop_event.is_set(): try: frame = ana_video_queue.get(timeout=1) @@ -484,6 +486,15 @@ def alert_thread(server): # no_face_time if data["label"] == "": no_face_time += 1 + sleep_time = 0 + haqian_time = 0 + heart_spe = 0 + heart_num = 0 + eye_close = False + down_emo_time = 0 + pianyi_time = 0 + else: + no_face_time = 0 alert_time = now @@ -497,7 +508,7 @@ def alert_thread(server): alert_st += alert_info["pianyi_time"] + "; " pianyi_time = 0 level += 1 - if no_face_time >= 60: + if no_face_time >= 30: alert_status = True alert_st += alert_info["no_face_time"] + "; " no_face_time = 0 @@ -517,7 +528,7 @@ def alert_thread(server): alert_status = True alert_st += alert_info["haqian_time"] + ";" level += 1 - if heart_spe < 60 or heart_spe > 120: + if (heart_spe < 60 and timett != 0 and heart_spe != 0) or heart_spe > 120: alert_status = True alert_st += alert_info["heart_spe"] + ";" level += 1 @@ -543,18 +554,38 @@ def alert_thread(server): alert = server.alert(int(time.time()), alert_st, info_level) alert = HookMocker(alert, "http://10.128.48.48:5000/api/osshook") alert.start(width=1280, height=720, fps=30) - for f in buffered_frame: - alert.provide_frame(f) - alert.end() + print(f"upload buffered frames... {buffered_frame.__len__()}") + frames_to_upload = buffered_frame + buffered_frame = [] + upload_task = threading.Thread( + target=handle_alert_upload, + args=(frames_to_upload, alert), + daemon=True + ) + upload_task.start() alert_status = False alert_st = "" buffered_frame = [] level = 0 + timett += 1 except queue.Empty: continue - except Exception as e: - print(f"[Alert] 错误: {e}") - continue + # except Exception as e: + # print(f"[Alert] 错误: {e}") + # continue + +def handle_alert_upload(frames, _alert_instance): + """ + 后台线程:负责将缓存的帧喂给 alert 对象,并执行编码和上传 + """ + try: + # print("后台上传线程启动...") + for f in frames: + _alert_instance.provide_frame(f) + _alert_instance.end() # 这里阻塞也没关系,因为是在后台线程里 + # print("后台上传线程结束") + except Exception as e: + print(f"后台上传出错: {e}") def draw_debug_info(frame, result):