Files
cv_state_ana/reproject/analyzer.py
2026-05-21 18:24:46 +08:00

398 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import cv2
import mediapipe as mp
import time
import numpy as np
import queue
import multiprocessing as mp_proc
from collections import deque
from geometry_utils import (
calculate_ear,
calculate_mar_simple,
calculate_iris_pos,
estimate_head_pose,
LEFT_EYE,
RIGHT_EYE,
LEFT_EYE_GAZE_IDXS,
RIGHT_EYE_GAZE_IDXS,
)
# from face_library import FaceLibrary
# try:
# from new_emotion_test import analyze_emotion_with_hsemotion
# HAS_EMOTION_MODULE = True
# except ImportError:
# print("⚠️ 未找到 new_emotion_test.py情绪功能将不可用")
# HAS_EMOTION_MODULE = False
class MonitorSystem:
def __init__(self, face_db):
# 初始化 MediaPipe
self.mp_face_mesh = mp.solutions.face_mesh
self.face_mesh = self.mp_face_mesh.FaceMesh(
max_num_faces=1,
refine_landmarks=True,
min_detection_confidence=0.5,
min_tracking_confidence=0.5,
)
# 初始化人脸底库
# self.face_lib = FaceLibrary(face_db)
# --- 时间控制 ---
self.last_identity_check_time = 0
self.IDENTITY_CHECK_INTERVAL = 2.0
self.last_emotion_check_time = 0
self.EMOTION_CHECK_INTERVAL = 3.0
# --- 历史数据 ---
self.HISTORY_LEN = 5
self.ear_history = deque(maxlen=self.HISTORY_LEN)
self.mar_history = deque(maxlen=self.HISTORY_LEN)
self.iris_ratio_history = [
deque(maxlen=self.HISTORY_LEN),
deque(maxlen=self.HISTORY_LEN),
]
# 缓存上一次的检测结果
self.cached_emotion = {"label": "detecting...", "va": (0.0, 0.0)}
self.current_user = None
self.current_emotion = "Neutral"
self.frame_shape = (720, 1280, 3)
# 使用 spawn 避免 fork 复制 OpenCV/MediaPipe/ONNXRuntime 等 C++ 运行时状态。
self.mp_ctx = mp_proc.get_context("spawn")
self.task_queue = self.mp_ctx.Queue(maxsize=2)
self.result_queue = self.mp_ctx.Queue(maxsize=2)
self.worker_proc = self.mp_ctx.Process(
target=background_worker_process,
args=(
self.task_queue,
self.result_queue,
face_db,
),
)
self.worker_proc.start()
def _get_smoothed_value(self, history, current_val):
"""内部函数:计算滑动平均值"""
history.append(current_val)
if len(history) == 0:
return current_val
return sum(history) / len(history)
def process_frame(self, frame):
"""
输入 BGR 图像,返回分析结果字典
"""
# 强制检查分辨率,如果不匹配则 Resize (对应 __init__ 中硬编码的 1280x720)
# 这一步至关重要,否则后台进程读到的全是黑屏
target_h, target_w = self.frame_shape[:2]
if frame.shape[:2] != (target_h, target_w):
frame = cv2.resize(frame, (target_w, target_h))
h, w = frame.shape[:2]
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
results = self.face_mesh.process(rgb_frame)
analysis_data = {
"has_face": False,
"ear": 0.0,
"mar": 0.0,
"iris_ratio": (0.5, 0.5), # 0最左/上1最右/下
"pose": (0, 0, 0),
"identity": self.current_user,
"emotion_label": self.cached_emotion["label"],
"emotion_va": self.cached_emotion["va"],
"landmark": (0, w, h, 0),
"frame": frame,
}
if not results.multi_face_landmarks:
self.ear_history.clear()
self.mar_history.clear()
self.iris_ratio_history[0].clear()
self.iris_ratio_history[1].clear()
return analysis_data
analysis_data["has_face"] = True
landmarks = results.multi_face_landmarks[0].landmark
# 计算 EAR
left_ear = calculate_ear([landmarks[i] for i in LEFT_EYE], w, h)
right_ear = calculate_ear([landmarks[i] for i in RIGHT_EYE], w, h)
raw_ear = (left_ear + right_ear) / 2.0
# 计算 MAR
top = np.array([landmarks[13].x * w, landmarks[13].y * h])
bottom = np.array([landmarks[14].x * w, landmarks[14].y * h])
left = np.array([landmarks[78].x * w, landmarks[78].y * h])
right = np.array([landmarks[308].x * w, landmarks[308].y * h])
raw_mar = calculate_mar_simple(top, bottom, left, right)
# 计算虹膜位置
left_iris_ratio = calculate_iris_pos(landmarks, LEFT_EYE_GAZE_IDXS, w, h)
right_iris_ratio = calculate_iris_pos(landmarks, RIGHT_EYE_GAZE_IDXS, w, h)
raw_iris_ratio = (
(left_iris_ratio[0] + right_iris_ratio[0]) / 2.0,
(left_iris_ratio[1] + right_iris_ratio[1]) / 2.0,
)
# --- 使用 History 进行数据平滑 ---
smoothed_ear = self._get_smoothed_value(self.ear_history, raw_ear)
smoothed_mar = self._get_smoothed_value(self.mar_history, raw_mar)
smoothed_iris_ratio = (
(self._get_smoothed_value(self.iris_ratio_history[0], raw_iris_ratio[0])),
(self._get_smoothed_value(self.iris_ratio_history[1], raw_iris_ratio[1])),
)
# 计算头部姿态
pitch, yaw, roll = estimate_head_pose(landmarks, w, h)
analysis_data.update(
{
"ear": round(smoothed_ear, 4),
"mar": round(smoothed_mar, 4),
"iris_ratio": (
round(smoothed_iris_ratio[0], 4),
round(smoothed_iris_ratio[1], 4),
),
"pose": (int(pitch), int(yaw), int(roll)),
}
)
xs = [l.x for l in landmarks]
ys = [l.y for l in landmarks]
# 计算人脸框
face_loc = (
int(min(ys) * h - 0.1 * h),
int(max(xs) * w + 0.1 * w),
int(max(ys) * h + 0.1 * h),
int(min(xs) * w - 0.1 * w),
)
pad = 30
face_loc = (
max(0, face_loc[0] - pad),
min(w, face_loc[1] + pad),
min(h, face_loc[2] + pad),
max(0, face_loc[3] - pad),
)
analysis_data["landmark"] = face_loc
# --- ROI处理(对比选择在哪里实现) ---
top = face_loc[0]
right = face_loc[1]
bottom = face_loc[2]
left = face_loc[3]
scale_factor = 10
small_bg = cv2.resize(
frame,
(w // scale_factor, h // scale_factor),
interpolation=cv2.INTER_LINEAR,
)
# 使用 INTER_NEAREST 马赛克效果
# 使用 INTER_LINEAR 毛玻璃模糊效果
blurred_frame = cv2.resize(small_bg, (w, h), interpolation=cv2.INTER_LINEAR)
face_roi = frame[top:bottom, left:right]
blurred_frame[top:bottom, left:right] = face_roi
analysis_data["frame"] = blurred_frame
now = time.time()
# --- 身份识别 ---
if now - self.last_identity_check_time > self.IDENTITY_CHECK_INTERVAL:
sface_loc = (
int(min(ys) * h),
int(max(xs) * w),
int(max(ys) * h),
int(min(xs) * w),
)
spad = 20
sface_loc = (
max(0, sface_loc[0] - spad),
min(w, sface_loc[1] + spad),
min(h, sface_loc[2] + spad),
max(0, sface_loc[3] - spad),
)
self._put_latest_task((0, frame.copy(), sface_loc))
self.last_identity_check_time = now
# --- 情绪识别 ---
if (
now - self.last_emotion_check_time > self.EMOTION_CHECK_INTERVAL
):
# 计算裁剪坐标
x_min = int(min(xs) * w)
x_max = int(max(xs) * w)
y_min = int(min(ys) * h)
y_max = int(max(ys) * h)
pad_x = int((x_max - x_min) * 0.1)
pad_y = int((y_max - y_min) * 0.1)
x_min = max(0, x_min - pad_x)
x_max = min(w, x_max + pad_x)
y_min = max(0, y_min - pad_y)
y_max = min(h, y_max + pad_y)
face_loc = (y_min, x_max, y_max, x_min)
face_crop = frame[y_min:y_max, x_min:x_max].copy()
if face_crop.size > 0:
self._put_latest_task((1, face_crop, None))
self.last_emotion_check_time = now
self._drain_results()
analysis_data["identity"] = self.current_user
analysis_data["emotion_label"] = self.cached_emotion["label"]
analysis_data["emotion_va"] = self.cached_emotion["va"]
return analysis_data
def _put_latest_task(self, task):
try:
if self.task_queue.full():
self.task_queue.get_nowait()
self.task_queue.put_nowait(task)
except queue.Full:
try:
self.task_queue.get_nowait()
self.task_queue.put_nowait(task)
except (queue.Empty, queue.Full):
pass
except queue.Empty:
pass
def _drain_results(self):
while True:
try:
type_, data = self.result_queue.get_nowait()
except queue.Empty:
break
if type_ == "identity":
self.current_user = data
elif type_ == "emotion":
self.cached_emotion["label"] = data.get("emotion", "unknown")
self.cached_emotion["va"] = data.get("vaVal", (0.0, 0.0))
def close(self):
try:
self._put_latest_task(None)
except Exception:
pass
if self.worker_proc.is_alive():
self.worker_proc.join(timeout=3)
if self.worker_proc.is_alive():
print("[Worker] 未正常退出,强制结束")
self.worker_proc.terminate()
self.worker_proc.join(timeout=2)
try:
self.face_mesh.close()
except Exception:
pass
# def _id_emo_loop(self):
# while True:
# try:
# frame, face_loc, task_type = self.task_queue.get()
# if task_type == 0:
# match_result = self.face_lib.identify(frame, face_location=face_loc)
# if match_result:
# self.current_user = match_result["info"]
# elif task_type == 1 and HAS_EMOTION_MODULE:
# face_crop = frame[
# face_loc[0] : face_loc[2], face_loc[3] : face_loc[1]
# ]
# if face_crop.size > 0:
# try:
# emo_results = analyze_emotion_with_hsemotion(face_crop)
# if emo_results:
# top_res = emo_results[0]
# self.cached_emotion["label"] = top_res.get(
# "emotion", "unknown"
# )
# self.cached_emotion["va"] = top_res.get(
# "vaVal", (0.0, 0.0)
# )
# except Exception as e:
# print(f"情绪分析出错: {e}")
# except Exception as e:
# print(f"线程处理出错: {e}")
def background_worker_process(
task_queue, # 任务队列 (主 -> 从)
result_queue, # 结果队列 (从 -> 主)
face_db_data, # 把人脸库数据传过去初始化
):
print("[Worker] 正在加载模型...")
from face_library import FaceLibrary
face_lib = FaceLibrary(face_db_data)
try:
from new_emotion_test import analyze_emotion_with_hsemotion
has_emo = True
except:
has_emo = False
print("[Worker] 模型加载完毕")
while True:
try:
task = task_queue.get()
if task is None:
break
task_type, frame_data, face_loc = task
if task_type == 0: # Identity
rgb = cv2.cvtColor(frame_data, cv2.COLOR_BGR2RGB)
res = face_lib.identify(rgb, face_location=face_loc)
if res:
_put_latest_result(result_queue, ("identity", res["info"]))
elif task_type == 1 and has_emo: # Emotion
if frame_data.size > 0:
emo_res = analyze_emotion_with_hsemotion(frame_data)
if emo_res:
_put_latest_result(result_queue, ("emotion", emo_res[0]))
except Exception as e:
print(f"[Worker Error] {e}")
def _put_latest_result(result_queue, result):
try:
if result_queue.full():
result_queue.get_nowait()
result_queue.put_nowait(result)
except queue.Full:
try:
result_queue.get_nowait()
result_queue.put_nowait(result)
except (queue.Empty, queue.Full):
pass
except queue.Empty:
pass