# cv_state_ana/reproject/analyzer.py
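"""Real-time face state analyzer.

Wraps MediaPipe FaceMesh to extract per-frame EAR, MAR, iris position and
head pose, smooths them over a short history, and offloads identity and
emotion recognition to a background worker thread via a small task queue.
"""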
import cv2
import mediapipe as mp
import time
import numpy as np
import threading
import queue
from collections import deque

from geometry_utils import (
    calculate_ear,
    calculate_mar_simple,
    calculate_iris_pos,
    estimate_head_pose,
    LEFT_EYE,
    RIGHT_EYE,
    LEFT_EYE_GAZE_IDXS,
    RIGHT_EYE_GAZE_IDXS,
)
from face_library import FaceLibrary

try:
    from new_emotion_test import analyze_emotion_with_hsemotion
    HAS_EMOTION_MODULE = True
except ImportError:
    print("⚠️ new_emotion_test.py not found; the emotion feature will be unavailable")
    HAS_EMOTION_MODULE = False


class MonitorSystem:
    def __init__(self, face_db):
        # Initialize MediaPipe FaceMesh
        self.mp_face_mesh = mp.solutions.face_mesh
        self.face_mesh = self.mp_face_mesh.FaceMesh(
            max_num_faces=1,
            refine_landmarks=True,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5,
        )
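        # Note: refine_landmarks=True makes FaceMesh output the extra iris
        # landmarks (indices 468-477), which calculate_iris_pos presumably
        # reads through the *_GAZE_IDXS index sets.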
        # Initialize the face database
        self.face_lib = FaceLibrary(face_db)
        # --- Timing control ---
        self.last_identity_check_time = 0
        self.IDENTITY_CHECK_INTERVAL = 2.0
        self.last_emotion_check_time = 0
        self.EMOTION_CHECK_INTERVAL = 3.0
        # --- History buffers ---
        self.HISTORY_LEN = 5
        self.ear_history = deque(maxlen=self.HISTORY_LEN)
        self.mar_history = deque(maxlen=self.HISTORY_LEN)
        self.iris_ratio_history = [
            deque(maxlen=self.HISTORY_LEN),
            deque(maxlen=self.HISTORY_LEN),
        ]
        # Cache the most recent detection results
        self.cached_emotion = {"label": "detecting...", "va": (0.0, 0.0)}
        self.task_queue = queue.Queue(maxsize=2)
        self.current_user = None
        self.current_emotion = "Neutral"
        self.id_emo_thread = threading.Thread(target=self._id_emo_loop, daemon=True)
        self.id_emo_thread.start()

    def _get_smoothed_value(self, history, current_val):
        """Internal helper: append the current value and return the sliding-window mean."""
        history.append(current_val)
        # After the append the deque is never empty, so the mean is always defined.
        return sum(history) / len(history)
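
    # Worked example for the smoothing above (illustrative values): with
    # HISTORY_LEN = 5 and incoming EAR readings 0.30, 0.28, 0.32, 0.29, 0.31,
    # the smoothed value is (0.30 + 0.28 + 0.32 + 0.29 + 0.31) / 5 = 0.30,
    # damping single-frame jitter before any downstream blink/yawn thresholding.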

    def process_frame(self, frame):
        """
        Take a BGR image and return a dict of analysis results.
        """
        h, w = frame.shape[:2]
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = self.face_mesh.process(rgb_frame)
        analysis_data = {
            "has_face": False,
            "ear": 0.0,
            "mar": 0.0,
            "iris_ratio": (0.5, 0.5),  # 0 = far left/top, 1 = far right/bottom
            "pose": (0, 0, 0),
            "identity": self.current_user,
            "emotion_label": self.cached_emotion["label"],
            "emotion_va": self.cached_emotion["va"],
            "landmark": (0, w, h, 0),
            "frame": frame,
        }
        if not results.multi_face_landmarks:
            self.ear_history.clear()
            self.mar_history.clear()
            self.iris_ratio_history[0].clear()
            self.iris_ratio_history[1].clear()
            return analysis_data
analysis_data["has_face"] = True
landmarks = results.multi_face_landmarks[0].landmark
# 计算 EAR
left_ear = calculate_ear([landmarks[i] for i in LEFT_EYE], w, h)
right_ear = calculate_ear([landmarks[i] for i in RIGHT_EYE], w, h)
raw_ear = (left_ear + right_ear) / 2.0
# 计算 MAR
top = np.array([landmarks[13].x * w, landmarks[13].y * h])
bottom = np.array([landmarks[14].x * w, landmarks[14].y * h])
left = np.array([landmarks[78].x * w, landmarks[78].y * h])
right = np.array([landmarks[308].x * w, landmarks[308].y * h])
raw_mar = calculate_mar_simple(top, bottom, left, right)
# 计算虹膜位置
left_iris_ratio = calculate_iris_pos(landmarks, LEFT_EYE_GAZE_IDXS, w, h)
right_iris_ratio = calculate_iris_pos(landmarks, RIGHT_EYE_GAZE_IDXS, w, h)
raw_iris_ratio = (
(left_iris_ratio[0] + right_iris_ratio[0]) / 2.0,
(left_iris_ratio[1] + right_iris_ratio[1]) / 2.0,
)
# --- 使用 History 进行数据平滑 ---
smoothed_ear = self._get_smoothed_value(self.ear_history, raw_ear)
smoothed_mar = self._get_smoothed_value(self.mar_history, raw_mar)
smoothed_iris_ratio = (
(self._get_smoothed_value(self.iris_ratio_history[0], raw_iris_ratio[0])),
(self._get_smoothed_value(self.iris_ratio_history[1], raw_iris_ratio[1])),
)
# 计算头部姿态
pitch, yaw, roll = estimate_head_pose(landmarks, w, h)
        analysis_data.update(
            {
                "ear": round(smoothed_ear, 4),
                "mar": round(smoothed_mar, 4),
                "iris_ratio": (
                    round(smoothed_iris_ratio[0], 4),
                    round(smoothed_iris_ratio[1], 4),
                ),
                "pose": (int(pitch), int(yaw), int(roll)),
            }
        )
        xs = [l.x for l in landmarks]
        ys = [l.y for l in landmarks]
        # Compute the face bounding box
        face_loc = (
            int(min(ys) * h - 0.1 * h),
            int(max(xs) * w + 0.1 * w),
            int(max(ys) * h + 0.1 * h),
            int(min(xs) * w - 0.1 * w),
        )
        pad = 30
        face_loc = (
            max(0, face_loc[0] - pad),
            min(w, face_loc[1] + pad),
            min(h, face_loc[2] + pad),
            max(0, face_loc[3] - pad),
        )
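        # face_loc is ordered (top, right, bottom, left), the convention used
        # by face_recognition-style APIs, which FaceLibrary.identify appears
        # to expect via its face_location argument.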
analysis_data["landmark"] = face_loc
# --- ROI处理(对比选择在哪里实现) ---
top = face_loc[0]
right = face_loc[1]
bottom = face_loc[2]
left = face_loc[3]
scale_factor = 10
small_bg = cv2.resize(
frame, (w // scale_factor, h // scale_factor), interpolation=cv2.INTER_LINEAR
)
# 使用 INTER_NEAREST 马赛克效果
# 使用 INTER_LINEAR 毛玻璃模糊效果
blurred_frame = cv2.resize(small_bg, (w, h), interpolation=cv2.INTER_LINEAR)
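        # For reference, the mosaic variant mentioned above only swaps the
        # upscale interpolation mode:
        # blurred_frame = cv2.resize(small_bg, (w, h), interpolation=cv2.INTER_NEAREST)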
        face_roi = frame[top:bottom, left:right]
        blurred_frame[top:bottom, left:right] = face_roi
        analysis_data["frame"] = blurred_frame
        now = time.time()
        # --- Identity recognition ---
        if now - self.last_identity_check_time > self.IDENTITY_CHECK_INTERVAL:
            sface_loc = (
                int(min(ys) * h), int(max(xs) * w),
                int(max(ys) * h), int(min(xs) * w),
            )
            spad = 20
            sface_loc = (
                max(0, sface_loc[0] - spad), min(w, sface_loc[1] + spad),
                min(h, sface_loc[2] + spad), max(0, sface_loc[3] - spad),
            )
            if self.task_queue.full():
                self.task_queue.get()  # drop the oldest task to make room
            self.task_queue.put((rgb_frame.copy(), sface_loc, 0))
            self.last_identity_check_time = now
        analysis_data["identity"] = self.current_user
        # --- Emotion recognition ---
        if HAS_EMOTION_MODULE and (
            now - self.last_emotion_check_time > self.EMOTION_CHECK_INTERVAL
        ):
            # Compute the crop coordinates
            x_min = int(min(xs) * w)
            x_max = int(max(xs) * w)
            y_min = int(min(ys) * h)
            y_max = int(max(ys) * h)
            pad_x = int((x_max - x_min) * 0.1)
            pad_y = int((y_max - y_min) * 0.1)
            x_min = max(0, x_min - pad_x)
            x_max = min(w, x_max + pad_x)
            y_min = max(0, y_min - pad_y)
            y_max = min(h, y_max + pad_y)
            face_loc = (y_min, x_max, y_max, x_min)
            if self.task_queue.full():
                self.task_queue.get()  # drop the oldest task to make room
            self.task_queue.put((frame.copy(), face_loc, 1))
            self.last_emotion_check_time = now
        analysis_data["emotion_label"] = self.cached_emotion["label"]
        analysis_data["emotion_va"] = self.cached_emotion["va"]
        return analysis_data
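
    # Background worker: consumes (frame, face_loc, task_type) tuples, where
    # face_loc is (top, right, bottom, left); task_type 0 = identity matching
    # on an RGB frame, task_type 1 = emotion analysis on a BGR frame.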
    def _id_emo_loop(self):
        while True:
            try:
                frame, face_loc, task_type = self.task_queue.get()
                if task_type == 0:
                    match_result = self.face_lib.identify(frame, face_location=face_loc)
                    if match_result:
                        self.current_user = match_result["info"]
                elif task_type == 1 and HAS_EMOTION_MODULE:
                    face_crop = frame[face_loc[0]:face_loc[2], face_loc[3]:face_loc[1]]
                    if face_crop.size > 0:
                        try:
                            emo_results = analyze_emotion_with_hsemotion(face_crop)
                            if emo_results:
                                top_res = emo_results[0]
                                self.cached_emotion["label"] = top_res.get(
                                    "emotion", "unknown"
                                )
                                self.cached_emotion["va"] = top_res.get("vaVal", (0.0, 0.0))
                        except Exception as e:
                            print(f"Emotion analysis error: {e}")
            except Exception as e:
                print(f"Worker thread error: {e}")