import time
from collections import deque

import cv2
import mediapipe as mp
import numpy as np

from face_library import FaceLibrary
from geometry_utils import (
    LEFT_EYE,
    RIGHT_EYE,
    calculate_ear,
    calculate_mar_simple,
    estimate_head_pose,
)

# The emotion module is optional: without it the emotion fields of every
# result simply keep their initial placeholder values.
try:
    from new_emotion_test import analyze_emotion_with_hsemotion
    HAS_EMOTION_MODULE = True
except ImportError:
    print("⚠️ 未找到 new_emotion_test.py,情绪功能将不可用")
    HAS_EMOTION_MODULE = False


class MonitorSystem:
    """Per-frame face monitor built on MediaPipe FaceMesh.

    For every frame it reports smoothed EAR / MAR values (as computed by
    ``calculate_ear`` / ``calculate_mar_simple``; presumably eye and mouth
    aspect ratios), the head pose, and -- at throttled intervals -- the
    identity returned by ``FaceLibrary.identify`` and the emotion returned by
    ``analyze_emotion_with_hsemotion``.
    """

    # FaceMesh landmark indices used as the mouth's top / bottom / left /
    # right reference points for the MAR computation.
    MOUTH_TOP = 13
    MOUTH_BOTTOM = 14
    MOUTH_LEFT = 78
    MOUTH_RIGHT = 308

    # Fixed pixel margin added around the face box for identification.
    IDENTITY_PAD_PX = 20
    # Proportional margin (fraction of box size) added for the emotion crop.
    EMOTION_PAD_RATIO = 0.2

    def __init__(self, face_db):
        """Create the FaceMesh detector and the face library.

        Args:
            face_db: Passed unchanged to ``FaceLibrary``.
        """
        # Initialise MediaPipe.
        self.mp_face_mesh = mp.solutions.face_mesh
        self.face_mesh = self.mp_face_mesh.FaceMesh(
            max_num_faces=1,
            refine_landmarks=True,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5,
        )

        # Initialise the face library.
        self.face_lib = FaceLibrary(face_db)

        # State: the last successfully identified user (None until a match).
        self.current_user = None

        # --- Timing control (seconds, measured on the monotonic clock) ---
        # Timers start at -inf so the first processed frame always triggers
        # both checks, whatever the monotonic clock's origin is.
        self.last_identity_check_time = float("-inf")
        self.IDENTITY_CHECK_INTERVAL = 2.0
        self.last_emotion_check_time = float("-inf")
        self.EMOTION_CHECK_INTERVAL = 3.0

        # --- History used for smoothing ---
        self.HISTORY_LEN = 5
        self.ear_history = deque(maxlen=self.HISTORY_LEN)
        self.mar_history = deque(maxlen=self.HISTORY_LEN)

        # Cache of the most recent emotion result, reported on every frame
        # between two emotion checks.
        self.cached_emotion = {
            "label": "detecting...",
            "va": (0.0, 0.0),
        }

    def _get_smoothed_value(self, history, current_val):
        """Append ``current_val`` to ``history`` and return the window mean.

        Args:
            history: Deque holding the most recent raw values (mutated).
            current_val: Newest raw value.

        Returns:
            Mean of the values currently held in ``history``; ``current_val``
            itself when the deque cannot hold anything (``maxlen == 0``).
        """
        history.append(current_val)
        if not history:
            # Only reachable with a zero-length window.
            return current_val
        return sum(history) / len(history)

    @staticmethod
    def _face_bbox(landmarks, w, h):
        """Return the landmark bounding box in pixels as (x_min, y_min, x_max, y_max)."""
        xs = [lm.x for lm in landmarks]
        ys = [lm.y for lm in landmarks]
        return (
            int(min(xs) * w),
            int(min(ys) * h),
            int(max(xs) * w),
            int(max(ys) * h),
        )

    def _update_identity(self, rgb_frame, bbox, w, h):
        """Run face identification and update ``current_user`` on a match."""
        x_min, y_min, x_max, y_max = bbox
        pad = self.IDENTITY_PAD_PX
        # Box order is (top, right, bottom, left), clamped to the frame.
        face_loc = (
            max(0, y_min - pad),
            min(w, x_max + pad),
            min(h, y_max + pad),
            max(0, x_min - pad),
        )
        match_result = self.face_lib.identify(rgb_frame, face_location=face_loc)
        if match_result:
            # No match keeps the previously identified user.
            self.current_user = match_result["info"]

    def _update_emotion(self, frame, bbox, w, h):
        """Run emotion analysis on the padded face crop and refresh the cache."""
        x_min, y_min, x_max, y_max = bbox
        pad_x = int((x_max - x_min) * self.EMOTION_PAD_RATIO)
        pad_y = int((y_max - y_min) * self.EMOTION_PAD_RATIO)

        # Clamp every bound into the frame: a negative slice bound would be
        # interpreted by NumPy as "from the end" and yield a wrong crop.
        left = min(w, max(0, x_min - pad_x))
        right = max(0, min(w, x_max + pad_x))
        top = min(h, max(0, y_min - pad_y))
        bottom = max(0, min(h, y_max + pad_y))

        face_crop = frame[top:bottom, left:right]
        if face_crop.size == 0:
            return

        # Best effort: a failing emotion model must not break frame analysis,
        # so the previous cached result is kept on any error.
        try:
            emo_results = analyze_emotion_with_hsemotion(face_crop)
            if emo_results:
                top_res = emo_results[0]
                self.cached_emotion["label"] = top_res.get("emotion", "unknown")
                self.cached_emotion["va"] = top_res.get("vaVal", (0.0, 0.0))
        except Exception as e:
            print(f"情绪分析出错: {e}")

    def process_frame(self, frame):
        """Analyse one BGR frame and return a result dictionary.

        Args:
            frame: BGR image array. ``None`` or an empty array (e.g. a failed
                capture read) is treated as a frame without a face.

        Returns:
            dict with keys ``has_face``, ``ear``, ``mar``, ``pose``
            (pitch, yaw, roll as ints), ``identity``, ``emotion_label`` and
            ``emotion_va``. When no face is found, ``ear``/``mar``/``pose``
            are zero and identity/emotion carry the last known values.
        """
        analysis_data = {
            "has_face": False,
            "ear": 0.0,
            "mar": 0.0,
            "pose": (0, 0, 0),
            "identity": self.current_user,
            "emotion_label": self.cached_emotion["label"],
            "emotion_va": self.cached_emotion["va"],
        }

        results = None
        if frame is not None and frame.size > 0:
            h, w = frame.shape[:2]
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = self.face_mesh.process(rgb_frame)

        if results is None or not results.multi_face_landmarks:
            # Drop the smoothing windows so stale values from before the gap
            # are not averaged into the next detection.
            self.ear_history.clear()
            self.mar_history.clear()
            return analysis_data

        analysis_data["has_face"] = True
        landmarks = results.multi_face_landmarks[0].landmark

        # EAR: mean of both eyes.
        left_ear = calculate_ear([landmarks[i] for i in LEFT_EYE], w, h)
        right_ear = calculate_ear([landmarks[i] for i in RIGHT_EYE], w, h)
        raw_ear = (left_ear + right_ear) / 2.0

        # MAR: from the four mouth reference points, in pixel coordinates.
        mouth_top, mouth_bottom, mouth_left, mouth_right = (
            np.array([landmarks[idx].x * w, landmarks[idx].y * h])
            for idx in (
                self.MOUTH_TOP,
                self.MOUTH_BOTTOM,
                self.MOUTH_LEFT,
                self.MOUTH_RIGHT,
            )
        )
        raw_mar = calculate_mar_simple(
            mouth_top, mouth_bottom, mouth_left, mouth_right
        )

        # --- Smooth with the history windows ---
        smoothed_ear = self._get_smoothed_value(self.ear_history, raw_ear)
        smoothed_mar = self._get_smoothed_value(self.mar_history, raw_mar)

        # Head pose.
        pitch, yaw, roll = estimate_head_pose(landmarks, w, h)

        analysis_data.update({
            "ear": round(smoothed_ear, 4),
            "mar": round(smoothed_mar, 4),
            "pose": (int(pitch), int(yaw), int(roll)),
        })

        # Monotonic clock: the intervals must not be affected by wall-clock
        # adjustments (NTP sync, manual changes).
        now = time.monotonic()
        identity_due = (
            now - self.last_identity_check_time > self.IDENTITY_CHECK_INTERVAL
        )
        emotion_due = HAS_EMOTION_MODULE and (
            now - self.last_emotion_check_time > self.EMOTION_CHECK_INTERVAL
        )

        if identity_due or emotion_due:
            bbox = self._face_bbox(landmarks, w, h)

            # --- Identity recognition ---
            if identity_due:
                self._update_identity(rgb_frame, bbox, w, h)
                # Throttle whether or not a match was found.
                self.last_identity_check_time = now
                analysis_data["identity"] = self.current_user

            # --- Emotion recognition ---
            if emotion_due:
                self._update_emotion(frame, bbox, w, h)
                # Throttle even after a failed or skipped analysis.
                self.last_emotion_check_time = now

        analysis_data["emotion_label"] = self.cached_emotion["label"]
        analysis_data["emotion_va"] = self.cached_emotion["va"]

        return analysis_data