This commit is contained in:
邓智航
2026-03-07 17:00:57 +08:00
parent e5c843334e
commit 5e2616471f
2 changed files with 51 additions and 18 deletions

View File

@@ -18,6 +18,7 @@ class Alert:
self.encode_thread = None self.encode_thread = None
self.upload_thread = None self.upload_thread = None
self.stop_event = threading.Event() self.stop_event = threading.Event()
self.abort_event = threading.Event() # 新增:用于超时强制中断
self.dropped_frames = 0 self.dropped_frames = 0
def start(self, width=1920, height=1080, fps=30): def start(self, width=1920, height=1080, fps=30):
@@ -54,15 +55,19 @@ class Alert:
self.stream.width = width self.stream.width = width
self.stream.height = height self.stream.height = height
self.stream.pix_fmt = "yuv420p" self.stream.pix_fmt = "yuv420p"
# 使用更快的编码预设 self.stream.options = {"preset": "ultrafast", "tune": "zerolatency", "crf": "23"}
# self.stream.options = {"preset": "ultrafast", "crf": "23"}
print("AV container and stream initialized") print("AV container and stream initialized")
# 启动编码线程 # 启动编码线程
def _encode() -> None: def _encode() -> None:
try: try:
print("Encode thread starting") print("Encode thread starting")
# 修改循环条件:增加 abort_event 检查
while not self.stop_event.is_set() or not self.frame_queue.empty(): while not self.stop_event.is_set() or not self.frame_queue.empty():
if self.abort_event.is_set():
print("Encode thread aborted by timeout (dropping remaining frames)")
break
try: try:
frame = self.frame_queue.get(timeout=0.1) frame = self.frame_queue.get(timeout=0.1)
if frame is None: if frame is None:
@@ -100,13 +105,10 @@ class Alert:
self.frame_queue.put(None) self.frame_queue.put(None)
self.stop_event.set() self.stop_event.set()
# 等待编码线程完成 # 等待编码线程完成(不设超时,确保所有帧都编码完成)
if self.encode_thread: if self.encode_thread:
self.encode_thread.join(timeout=30) self.encode_thread.join()
if self.encode_thread.is_alive(): print(f"Encode thread completed with {self.frame_count} frames")
print("Warning: Encode thread still running after timeout")
else:
print(f"Encode thread completed with {self.frame_count} frames")
# 完成编码flush所有待处理的数据 # 完成编码flush所有待处理的数据
try: try:

View File

@@ -1,5 +1,6 @@
from calendar import c from calendar import c
from tracemalloc import stop from tracemalloc import stop
from av import buffer
import cv2 import cv2
import threading import threading
import time import time
@@ -46,7 +47,7 @@ def capture_thread():
""" """
采集线程:优化了分发逻辑,对视频流进行降频处理 采集线程:优化了分发逻辑,对视频流进行降频处理
""" """
cap = cv2.VideoCapture(0) cap = cv2.VideoCapture(1)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280) cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
@@ -208,7 +209,7 @@ def analysis_thread():
payload["eye_close_freq"] = last_freq payload["eye_close_freq"] = last_freq
front_data["eye_close_freq"] = last_freq front_data["eye_close_freq"] = last_freq
bpm = heart_monitor.process_frame(frame, result["landmark"]) bpm = heart_monitor.process_frame(frame, result["landmark"])
if bpm != None: if bpm != None and result["has_face"]:
result["heart_rate"] = bpm result["heart_rate"] = bpm
payload["heart_rate"] = bpm payload["heart_rate"] = bpm
front_data["heart_rate"] = bpm front_data["heart_rate"] = bpm
@@ -446,6 +447,7 @@ def alert_thread(server):
alert_status = False alert_status = False
alert_st = "" alert_st = ""
level = 0 level = 0
timett = 0
while not stop_event.is_set(): while not stop_event.is_set():
try: try:
frame = ana_video_queue.get(timeout=1) frame = ana_video_queue.get(timeout=1)
@@ -484,6 +486,15 @@ def alert_thread(server):
# no_face_time # no_face_time
if data["label"] == "": if data["label"] == "":
no_face_time += 1 no_face_time += 1
sleep_time = 0
haqian_time = 0
heart_spe = 0
heart_num = 0
eye_close = False
down_emo_time = 0
pianyi_time = 0
else:
no_face_time = 0
alert_time = now alert_time = now
@@ -497,7 +508,7 @@ def alert_thread(server):
alert_st += alert_info["pianyi_time"] + "; " alert_st += alert_info["pianyi_time"] + "; "
pianyi_time = 0 pianyi_time = 0
level += 1 level += 1
if no_face_time >= 60: if no_face_time >= 30:
alert_status = True alert_status = True
alert_st += alert_info["no_face_time"] + "; " alert_st += alert_info["no_face_time"] + "; "
no_face_time = 0 no_face_time = 0
@@ -517,7 +528,7 @@ def alert_thread(server):
alert_status = True alert_status = True
alert_st += alert_info["haqian_time"] + ";" alert_st += alert_info["haqian_time"] + ";"
level += 1 level += 1
if heart_spe < 60 or heart_spe > 120: if (heart_spe < 60 and timett != 0 and heart_spe != 0) or heart_spe > 120:
alert_status = True alert_status = True
alert_st += alert_info["heart_spe"] + ";" alert_st += alert_info["heart_spe"] + ";"
level += 1 level += 1
@@ -543,18 +554,38 @@ def alert_thread(server):
alert = server.alert(int(time.time()), alert_st, info_level) alert = server.alert(int(time.time()), alert_st, info_level)
alert = HookMocker(alert, "http://10.128.48.48:5000/api/osshook") alert = HookMocker(alert, "http://10.128.48.48:5000/api/osshook")
alert.start(width=1280, height=720, fps=30) alert.start(width=1280, height=720, fps=30)
for f in buffered_frame: print(f"upload buffered frames... {buffered_frame.__len__()}")
alert.provide_frame(f) frames_to_upload = buffered_frame
alert.end() buffered_frame = []
upload_task = threading.Thread(
target=handle_alert_upload,
args=(frames_to_upload, alert),
daemon=True
)
upload_task.start()
alert_status = False alert_status = False
alert_st = "" alert_st = ""
buffered_frame = [] buffered_frame = []
level = 0 level = 0
timett += 1
except queue.Empty: except queue.Empty:
continue continue
except Exception as e: # except Exception as e:
print(f"[Alert] 错误: {e}") # print(f"[Alert] 错误: {e}")
continue # continue
def handle_alert_upload(frames, _alert_instance):
"""
后台线程:负责将缓存的帧喂给 alert 对象,并执行编码和上传
"""
try:
# print("后台上传线程启动...")
for f in frames:
_alert_instance.provide_frame(f)
_alert_instance.end() # 这里阻塞也没关系,因为是在后台线程里
# print("后台上传线程结束")
except Exception as e:
print(f"后台上传出错: {e}")
def draw_debug_info(frame, result): def draw_debug_info(frame, result):