眨眼频率、眼动分析、心率、视频录制

This commit is contained in:
邓智航
2026-01-25 15:19:01 +08:00
parent 6e882d2aa4
commit b3997c2646
7 changed files with 977 additions and 188 deletions

View File

@@ -3,17 +3,28 @@ import mediapipe as mp
import time
import numpy as np
from collections import deque
from geometry_utils import calculate_ear, calculate_mar_simple, estimate_head_pose, LEFT_EYE, RIGHT_EYE
from geometry_utils import (
calculate_ear,
calculate_mar_simple,
calculate_iris_pos,
estimate_head_pose,
LEFT_EYE,
RIGHT_EYE,
LEFT_EYE_GAZE_IDXS,
RIGHT_EYE_GAZE_IDXS,
)
from face_library import FaceLibrary
try:
from new_emotion_test import analyze_emotion_with_hsemotion
HAS_EMOTION_MODULE = True
except ImportError:
print("⚠️ 未找到 new_emotion_test.py情绪功能将不可用")
HAS_EMOTION_MODULE = False
class MonitorSystem:
def __init__(self, face_db):
# 初始化 MediaPipe
@@ -22,32 +33,33 @@ class MonitorSystem:
max_num_faces=1,
refine_landmarks=True,
min_detection_confidence=0.5,
min_tracking_confidence=0.5
min_tracking_confidence=0.5,
)
# 初始化人脸底库
self.face_lib = FaceLibrary(face_db)
# 状态变量
self.current_user = None
# --- 时间控制 ---
self.last_identity_check_time = 0
self.IDENTITY_CHECK_INTERVAL = 2.0
self.IDENTITY_CHECK_INTERVAL = 2.0
self.last_emotion_check_time = 0
self.EMOTION_CHECK_INTERVAL = 3.0
self.EMOTION_CHECK_INTERVAL = 3.0
# --- 历史数据 ---
self.HISTORY_LEN = 5
self.ear_history = deque(maxlen=self.HISTORY_LEN)
self.mar_history = deque(maxlen=self.HISTORY_LEN)
# 缓存上一次的检测结果
self.cached_emotion = {
"label": "detecting...",
"va": (0.0, 0.0)
}
self.iris_ratio_history = [
deque(maxlen=self.HISTORY_LEN),
deque(maxlen=self.HISTORY_LEN),
]
# 缓存上一次的检测结果
self.cached_emotion = {"label": "detecting...", "va": (0.0, 0.0)}
def _get_smoothed_value(self, history, current_val):
"""内部函数:计算滑动平均值"""
@@ -62,32 +74,37 @@ class MonitorSystem:
"""
h, w = frame.shape[:2]
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
results = self.face_mesh.process(rgb_frame)
analysis_data = {
"has_face": False,
"ear": 0.0,
"ear": 0.0,
"mar": 0.0,
"iris_ratio": (0.5, 0.5), # 0最左/上1最右/下
"pose": (0, 0, 0),
"identity": self.current_user,
"emotion_label": self.cached_emotion["label"],
"emotion_va": self.cached_emotion["va"]
"emotion_va": self.cached_emotion["va"],
"landmark": (0, w, h, 0),
"frame": frame,
}
if not results.multi_face_landmarks:
self.ear_history.clear()
self.mar_history.clear()
self.iris_ratio_history[0].clear()
self.iris_ratio_history[1].clear()
return analysis_data
analysis_data["has_face"] = True
landmarks = results.multi_face_landmarks[0].landmark
# 计算 EAR
# 计算 EAR
left_ear = calculate_ear([landmarks[i] for i in LEFT_EYE], w, h)
right_ear = calculate_ear([landmarks[i] for i in RIGHT_EYE], w, h)
raw_ear = (left_ear + right_ear) / 2.0
# 计算 MAR
top = np.array([landmarks[13].x * w, landmarks[13].y * h])
bottom = np.array([landmarks[14].x * w, landmarks[14].y * h])
@@ -95,79 +112,136 @@ class MonitorSystem:
right = np.array([landmarks[308].x * w, landmarks[308].y * h])
raw_mar = calculate_mar_simple(top, bottom, left, right)
# 计算虹膜位置
left_iris_ratio = calculate_iris_pos(landmarks, LEFT_EYE_GAZE_IDXS, w, h)
right_iris_ratio = calculate_iris_pos(landmarks, RIGHT_EYE_GAZE_IDXS, w, h)
raw_iris_ratio = (
(left_iris_ratio[0] + right_iris_ratio[0]) / 2.0,
(left_iris_ratio[1] + right_iris_ratio[1]) / 2.0,
)
# --- 使用 History 进行数据平滑 ---
smoothed_ear = self._get_smoothed_value(self.ear_history, raw_ear)
smoothed_mar = self._get_smoothed_value(self.mar_history, raw_mar)
smoothed_iris_ratio = (
(self._get_smoothed_value(self.iris_ratio_history[0], raw_iris_ratio[0])),
(self._get_smoothed_value(self.iris_ratio_history[1], raw_iris_ratio[1])),
)
# 计算头部姿态
pitch, yaw, roll = estimate_head_pose(landmarks, w, h)
analysis_data.update({
"ear": round(smoothed_ear, 4),
"mar": round(smoothed_mar, 4),
"pose": (int(pitch), int(yaw), int(roll))
})
analysis_data.update(
{
"ear": round(smoothed_ear, 4),
"mar": round(smoothed_mar, 4),
"iris_ratio": (
round(smoothed_iris_ratio[0], 4),
round(smoothed_iris_ratio[1], 4),
),
"pose": (int(pitch), int(yaw), int(roll)),
}
)
xs = [l.x for l in landmarks]
ys = [l.y for l in landmarks]
# 计算人脸框
face_loc = (
int(min(ys) * h - 0.1 * h),
int(max(xs) * w + 0.1 * w),
int(max(ys) * h + 0.1 * h),
int(min(xs) * w - 0.1 * w),
)
pad = 30
face_loc = (
max(0, face_loc[0] - pad),
min(w, face_loc[1] + pad),
min(h, face_loc[2] + pad),
max(0, face_loc[3] - pad),
)
analysis_data["landmark"] = face_loc
# --- ROI处理(对比选择在哪里实现) ---
top = face_loc[0]
right = face_loc[1]
bottom = face_loc[2]
left = face_loc[3]
scale_factor = 10
small_bg = cv2.resize(
frame, (w // scale_factor, h // scale_factor), interpolation=cv2.INTER_LINEAR
)
# 使用 INTER_NEAREST 马赛克效果
# 使用 INTER_LINEAR 毛玻璃模糊效果
blurred_frame = cv2.resize(small_bg, (w, h), interpolation=cv2.INTER_LINEAR)
face_roi = frame[top:bottom, left:right]
blurred_frame[top:bottom, left:right] = face_roi
analysis_data["frame"] = blurred_frame
now = time.time()
# --- 身份识别 ---
if now - self.last_identity_check_time > self.IDENTITY_CHECK_INTERVAL:
xs = [l.x for l in landmarks]
ys = [l.y for l in landmarks]
# 计算人脸框
face_loc = (
int(min(ys) * h), int(max(xs) * w),
int(max(ys) * h), int(min(xs) * w)
)
pad = 20
face_loc = (max(0, face_loc[0]-pad), min(w, face_loc[1]+pad),
min(h, face_loc[2]+pad), max(0, face_loc[3]-pad))
# xs = [l.x for l in landmarks]
# ys = [l.y for l in landmarks]
# # 计算人脸框
# face_loc = (
# int(min(ys) * h), int(max(xs) * w),
# int(max(ys) * h), int(min(xs) * w)
# )
# pad = 20
# face_loc = (max(0, face_loc[0]-pad), min(w, face_loc[1]+pad),
# min(h, face_loc[2]+pad), max(0, face_loc[3]-pad))
match_result = self.face_lib.identify(rgb_frame, face_location=face_loc)
if match_result:
self.current_user = match_result["info"]
self.last_identity_check_time = now
analysis_data["identity"] = self.current_user
# --- 情绪识别 ---
if HAS_EMOTION_MODULE and (now - self.last_emotion_check_time > self.EMOTION_CHECK_INTERVAL):
if HAS_EMOTION_MODULE and (
now - self.last_emotion_check_time > self.EMOTION_CHECK_INTERVAL
):
if results.multi_face_landmarks:
landmarks = results.multi_face_landmarks[0].landmark
xs = [l.x for l in landmarks]
ys = [l.y for l in landmarks]
# 计算裁剪坐标
x_min = int(min(xs) * w)
x_max = int(max(xs) * w)
y_min = int(min(ys) * h)
y_max = int(max(ys) * h)
pad_x = int((x_max - x_min) * 0.2)
pad_y = int((y_max - y_min) * 0.2)
x_min = max(0, x_min - pad_x)
x_max = min(w, x_max + pad_x)
y_min = max(0, y_min - pad_y)
y_max = min(h, y_max + pad_y)
face_crop = frame[y_min:y_max, x_min:x_max]
if face_crop.size > 0:
try:
emo_results = analyze_emotion_with_hsemotion(face_crop)
if emo_results:
top_res = emo_results[0]
self.cached_emotion["label"] = top_res.get("emotion", "unknown")
self.cached_emotion["label"] = top_res.get(
"emotion", "unknown"
)
self.cached_emotion["va"] = top_res.get("vaVal", (0.0, 0.0))
except Exception as e:
print(f"情绪分析出错: {e}")
self.last_emotion_check_time = now
analysis_data["emotion_label"] = self.cached_emotion["label"]
analysis_data["emotion_va"] = self.cached_emotion["va"]
return analysis_data
return analysis_data