Merge branch 'master' of ssh://188.166.251.33:2222/colden/cv_state_ana

This commit is contained in:
2026-05-21 18:31:16 +08:00
2 changed files with 53 additions and 16 deletions

View File

@@ -1,5 +1,3 @@
import os import os
import queue import queue
import threading import threading
@@ -19,9 +17,14 @@ class Alert:
self.upload_thread = None self.upload_thread = None
self.stop_event = threading.Event() self.stop_event = threading.Event()
self.dropped_frames = 0 self.dropped_frames = 0
self.accepting_frames = False
self.ended = False
def start(self, width=1920, height=1080, fps=30): def start(self, width=1920, height=1080, fps=30):
print(f"Starting alert with {width}x{height}") print(f"Starting alert with {width}x{height}")
self.accepting_frames = True
self.ended = False
self.stop_event.clear()
read_fd, write_fd = os.pipe() read_fd, write_fd = os.pipe()
self.read_pipe = os.fdopen(read_fd, "rb", buffering=0) self.read_pipe = os.fdopen(read_fd, "rb", buffering=0)
self.write_pipe = os.fdopen(write_fd, "wb", buffering=0) self.write_pipe = os.fdopen(write_fd, "wb", buffering=0)
@@ -65,8 +68,6 @@ class Alert:
while not self.stop_event.is_set() or not self.frame_queue.empty(): while not self.stop_event.is_set() or not self.frame_queue.empty():
try: try:
frame = self.frame_queue.get(timeout=0.1) frame = self.frame_queue.get(timeout=0.1)
if frame is None:
break
av_frame = av.VideoFrame.from_ndarray(frame, format="bgr24") av_frame = av.VideoFrame.from_ndarray(frame, format="bgr24")
for packet in self.stream.encode(av_frame): for packet in self.stream.encode(av_frame):
self.container.mux(packet) self.container.mux(packet)
@@ -83,6 +84,8 @@ class Alert:
self.encode_thread.start() self.encode_thread.start()
def provide_frame(self, frame): def provide_frame(self, frame):
if not self.accepting_frames or self.ended:
return
try: try:
self.frame_queue.put(frame, block=True, timeout=0.05) self.frame_queue.put(frame, block=True, timeout=0.05)
except queue.Full: except queue.Full:
@@ -94,17 +97,20 @@ class Alert:
) )
def end(self): def end(self):
if self.ended:
return
self.ended = True
self.accepting_frames = False
print( print(
f"Stopping alert, queued frames: {self.frame_queue.qsize()}, dropped frames: {self.dropped_frames}" f"Stopping alert, queued frames: {self.frame_queue.qsize()}, dropped frames: {self.dropped_frames}"
) )
self.frame_queue.put(None)
self.stop_event.set() self.stop_event.set()
# 等待编码线程完成 # 等待编码线程完成
if self.encode_thread: if self.encode_thread:
self.encode_thread.join(timeout=30) self.encode_thread.join()
if self.encode_thread.is_alive(): if self.encode_thread.is_alive():
print("Warning: Encode thread still running after timeout") print("Warning: Encode thread still running")
else: else:
print(f"Encode thread completed with {self.frame_count} frames") print(f"Encode thread completed with {self.frame_count} frames")

View File

@@ -1,5 +1,6 @@
from calendar import c from calendar import c
from tracemalloc import stop from tracemalloc import stop
from av import buffer
import cv2 import cv2
import threading import threading
import time import time
@@ -61,7 +62,7 @@ def capture_thread():
""" """
采集线程:优化了分发逻辑,对视频流进行降频处理 采集线程:优化了分发逻辑,对视频流进行降频处理
""" """
cap = cv2.VideoCapture(0) cap = cv2.VideoCapture(1)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280) cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
@@ -438,6 +439,7 @@ def alert_thread(server):
alert_status = False alert_status = False
alert_st = "" alert_st = ""
level = 0 level = 0
timett = 0
while not stop_event.is_set(): while not stop_event.is_set():
try: try:
frame = ana_video_queue.get(timeout=1) frame = ana_video_queue.get(timeout=1)
@@ -476,6 +478,15 @@ def alert_thread(server):
# no_face_time # no_face_time
if data["label"] == "": if data["label"] == "":
no_face_time += 1 no_face_time += 1
sleep_time = 0
haqian_time = 0
heart_spe = 0
heart_num = 0
eye_close = False
down_emo_time = 0
pianyi_time = 0
else:
no_face_time = 0
alert_time = now alert_time = now
@@ -489,7 +500,7 @@ def alert_thread(server):
alert_st += alert_info["pianyi_time"] + "; " alert_st += alert_info["pianyi_time"] + "; "
pianyi_time = 0 pianyi_time = 0
level += 1 level += 1
if no_face_time >= 60: if no_face_time >= 30:
alert_status = True alert_status = True
alert_st += alert_info["no_face_time"] + "; " alert_st += alert_info["no_face_time"] + "; "
no_face_time = 0 no_face_time = 0
@@ -509,7 +520,7 @@ def alert_thread(server):
alert_status = True alert_status = True
alert_st += alert_info["haqian_time"] + ";" alert_st += alert_info["haqian_time"] + ";"
level += 1 level += 1
if heart_spe < 60 or heart_spe > 120: if (heart_spe < 60 and timett != 0 and heart_spe != 0) or heart_spe > 120:
alert_status = True alert_status = True
alert_st += alert_info["heart_spe"] + ";" alert_st += alert_info["heart_spe"] + ";"
level += 1 level += 1
@@ -535,18 +546,38 @@ def alert_thread(server):
alert = server.alert(int(time.time()), alert_st, info_level) alert = server.alert(int(time.time()), alert_st, info_level)
alert = HookMocker(alert, "http://10.128.48.204:5000/api/osshook") alert = HookMocker(alert, "http://10.128.48.204:5000/api/osshook")
alert.start(width=1280, height=720, fps=30) alert.start(width=1280, height=720, fps=30)
for f in buffered_frame: print(f"upload buffered frames... {buffered_frame.__len__()}")
alert.provide_frame(f) frames_to_upload = buffered_frame
alert.end() buffered_frame = []
upload_task = threading.Thread(
target=handle_alert_upload,
args=(frames_to_upload, alert),
daemon=True
)
upload_task.start()
alert_status = False alert_status = False
alert_st = "" alert_st = ""
buffered_frame = [] buffered_frame = []
level = 0 level = 0
timett += 1
except queue.Empty: except queue.Empty:
continue continue
except Exception as e: # except Exception as e:
print(f"[Alert] 错误: {e}") # print(f"[Alert] 错误: {e}")
continue # continue
def handle_alert_upload(frames, _alert_instance):
"""
后台线程:负责将缓存的帧喂给 alert 对象,并执行编码和上传
"""
try:
# print("后台上传线程启动...")
for f in frames:
_alert_instance.provide_frame(f)
_alert_instance.end() # 这里阻塞也没关系,因为是在后台线程里
# print("后台上传线程结束")
except Exception as e:
print(f"后台上传出错: {e}")
def draw_debug_info(frame, result): def draw_debug_info(frame, result):