cv_state_ana/reproject/analyzer.py

import cv2
import mediapipe as mp
import time
import numpy as np
import threading
import queue
import multiprocessing as mp_proc
from multiprocessing import shared_memory
from collections import deque
from geometry_utils import (
    calculate_ear,
    calculate_mar_simple,
    calculate_iris_pos,
    estimate_head_pose,
    LEFT_EYE,
    RIGHT_EYE,
    LEFT_EYE_GAZE_IDXS,
    RIGHT_EYE_GAZE_IDXS,
)

# from face_library import FaceLibrary
# try:
#     from new_emotion_test import analyze_emotion_with_hsemotion
#     HAS_EMOTION_MODULE = True
# except ImportError:
#     print("⚠️ new_emotion_test.py not found; emotion analysis will be unavailable")
#     HAS_EMOTION_MODULE = False


class MonitorSystem:
    def __init__(self, face_db):
        # Initialize MediaPipe
        self.mp_face_mesh = mp.solutions.face_mesh
        self.face_mesh = self.mp_face_mesh.FaceMesh(
            max_num_faces=1,
            refine_landmarks=True,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5,
        )
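        # Note (added): refine_landmarks=True makes FaceMesh emit the extra
        # iris landmarks (indices 468-477, for 478 points in total), which
        # the gaze-ratio code in process_frame depends on.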
        # Initialize the face database
        # self.face_lib = FaceLibrary(face_db)

        # --- Timing control ---
        self.last_identity_check_time = 0
        self.IDENTITY_CHECK_INTERVAL = 2.0
        self.last_emotion_check_time = 0
        self.EMOTION_CHECK_INTERVAL = 3.0

        # --- History buffers for smoothing ---
        self.HISTORY_LEN = 5
        self.ear_history = deque(maxlen=self.HISTORY_LEN)
        self.mar_history = deque(maxlen=self.HISTORY_LEN)
        self.iris_ratio_history = [
            deque(maxlen=self.HISTORY_LEN),
            deque(maxlen=self.HISTORY_LEN),
        ]
        # Cache the most recent detection results
        self.cached_emotion = {"label": "detecting...", "va": (0.0, 0.0)}
        self.current_user = None
        self.current_emotion = "Neutral"

        self.frame_shape = (720, 1280, 3)
        frame_size = int(np.prod(self.frame_shape))
        # Any stale segment left over from a previous run must be released
        # first (often unnecessary on Windows, but good hygiene). Best to
        # generate a random name so every run gets a brand-new block.
        import secrets

        auth_key = secrets.token_hex(4)
        shm_unique_name = f"monitor_shm_{auth_key}"
        try:
            self.shm = shared_memory.SharedMemory(
                create=True, size=frame_size, name=shm_unique_name
            )
        except FileExistsError:
            # If we are unlucky enough to hit a name collision, just attach
            # to the existing block.
            self.shm = shared_memory.SharedMemory(name=shm_unique_name)
        print(f"[Main] Shared memory created: {self.shm.name} (Size: {frame_size} bytes)")
        # Local numpy wrapper around the shared buffer
        self.shared_frame_array = np.ndarray(
            self.frame_shape, dtype=np.uint8, buffer=self.shm.buf
        )
        # Zero-fill so the first reads see black instead of garbage
        self.shared_frame_array.fill(0)
        # Cross-process queues. Tasks are (face_loc, task_type) tuples,
        # where task_type 0 = identity and 1 = emotion.
        self.task_queue = mp_proc.Queue(maxsize=2)
        self.result_queue = mp_proc.Queue(maxsize=2)  # a size of 1 would do; only the newest result matters

        # Start the worker process.
        # On Windows it is only safe to pass the SHM name string, not the handle.
        self.worker_proc = mp_proc.Process(
            target=background_worker_process,
            args=(
                self.shm.name,
                self.frame_shape,
                self.task_queue,
                self.result_queue,
                face_db,
            ),
        )
        self.worker_proc.daemon = True
        self.worker_proc.start()
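
    # --- Added sketch, not in the original file: explicit teardown. ---
    # The SharedMemory block created above is never released; a minimal
    # cleanup helper the owning code could call once on shutdown:
    def close(self):
        self.worker_proc.terminate()  # daemon worker; terminate is acceptable here
        self.shm.close()              # detach this process's mapping
        self.shm.unlink()             # free the block (creating process only)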

    def _get_smoothed_value(self, history, current_val):
        """Append the new sample and return the moving average."""
        history.append(current_val)
        # After the append the deque is never empty, so no zero-length
        # guard is needed.
        return sum(history) / len(history)
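
    # Example (added): with HISTORY_LEN = 5, feeding EAR samples
    # 0.30, 0.28, 0.26 returns 0.30, then (0.30 + 0.28) / 2 = 0.29,
    # then (0.30 + 0.28 + 0.26) / 3 = 0.28; once five samples are in,
    # the deque's maxlen drops the oldest one automatically.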

    def process_frame(self, frame):
        """Take a BGR image and return a dict of analysis results."""
        # Force the resolution to match the shape hard-coded in __init__
        # (1280x720) and resize if it differs. This step is critical;
        # otherwise the background process would read nothing but black frames.
        target_h, target_w = self.frame_shape[:2]
        if frame.shape[:2] != (target_h, target_w):
            frame = cv2.resize(frame, (target_w, target_h))
        h, w = frame.shape[:2]
        # Shapes are guaranteed to match now, so the write is safe.
        try:
            self.shared_frame_array[:] = frame[:]
        except Exception:
            # Extreme case: array shape mismatch (e.g. the channel count changed)
            pass
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = self.face_mesh.process(rgb_frame)
        analysis_data = {
            "has_face": False,
            "ear": 0.0,
            "mar": 0.0,
            "iris_ratio": (0.5, 0.5),  # 0 = far left/top, 1 = far right/bottom
            "pose": (0, 0, 0),
            "identity": self.current_user,
            "emotion_label": self.cached_emotion["label"],
            "emotion_va": self.cached_emotion["va"],
            "landmark": (0, w, h, 0),
            "frame": frame,
        }
        if not results.multi_face_landmarks:
            self.ear_history.clear()
            self.mar_history.clear()
            self.iris_ratio_history[0].clear()
            self.iris_ratio_history[1].clear()
            return analysis_data
        analysis_data["has_face"] = True
        landmarks = results.multi_face_landmarks[0].landmark
        # EAR (eye aspect ratio)
        left_ear = calculate_ear([landmarks[i] for i in LEFT_EYE], w, h)
        right_ear = calculate_ear([landmarks[i] for i in RIGHT_EYE], w, h)
        raw_ear = (left_ear + right_ear) / 2.0
        # MAR (mouth aspect ratio)
        top = np.array([landmarks[13].x * w, landmarks[13].y * h])
        bottom = np.array([landmarks[14].x * w, landmarks[14].y * h])
        left = np.array([landmarks[78].x * w, landmarks[78].y * h])
        right = np.array([landmarks[308].x * w, landmarks[308].y * h])
        raw_mar = calculate_mar_simple(top, bottom, left, right)
        # Iris position, averaged over both eyes
        left_iris_ratio = calculate_iris_pos(landmarks, LEFT_EYE_GAZE_IDXS, w, h)
        right_iris_ratio = calculate_iris_pos(landmarks, RIGHT_EYE_GAZE_IDXS, w, h)
        raw_iris_ratio = (
            (left_iris_ratio[0] + right_iris_ratio[0]) / 2.0,
            (left_iris_ratio[1] + right_iris_ratio[1]) / 2.0,
        )
        # --- Smooth the raw values over the history buffers ---
        smoothed_ear = self._get_smoothed_value(self.ear_history, raw_ear)
        smoothed_mar = self._get_smoothed_value(self.mar_history, raw_mar)
        smoothed_iris_ratio = (
            self._get_smoothed_value(self.iris_ratio_history[0], raw_iris_ratio[0]),
            self._get_smoothed_value(self.iris_ratio_history[1], raw_iris_ratio[1]),
        )
        # Head pose
        pitch, yaw, roll = estimate_head_pose(landmarks, w, h)
        analysis_data.update(
            {
                "ear": round(smoothed_ear, 4),
                "mar": round(smoothed_mar, 4),
                "iris_ratio": (
                    round(smoothed_iris_ratio[0], 4),
                    round(smoothed_iris_ratio[1], 4),
                ),
                "pose": (int(pitch), int(yaw), int(roll)),
            }
        )
        xs = [l.x for l in landmarks]
        ys = [l.y for l in landmarks]
        # Face bounding box as (top, right, bottom, left), padded by 10% of
        # the frame size plus a fixed pixel margin, clamped to the frame
        face_loc = (
            int(min(ys) * h - 0.1 * h),
            int(max(xs) * w + 0.1 * w),
            int(max(ys) * h + 0.1 * h),
            int(min(xs) * w - 0.1 * w),
        )
        pad = 30
        face_loc = (
            max(0, face_loc[0] - pad),
            min(w, face_loc[1] + pad),
            min(h, face_loc[2] + pad),
            max(0, face_loc[3] - pad),
        )
        analysis_data["landmark"] = face_loc
        # --- ROI handling (still deciding where this should be implemented) ---
        top = face_loc[0]
        right = face_loc[1]
        bottom = face_loc[2]
        left = face_loc[3]
        scale_factor = 10
        small_bg = cv2.resize(
            frame,
            (w // scale_factor, h // scale_factor),
            interpolation=cv2.INTER_LINEAR,
        )
        # INTER_NEAREST gives a mosaic (pixelated) effect;
        # INTER_LINEAR gives a frosted-glass blur.
        blurred_frame = cv2.resize(small_bg, (w, h), interpolation=cv2.INTER_LINEAR)
        # Paste the sharp face ROI back over the blurred background
        face_roi = frame[top:bottom, left:right]
        blurred_frame[top:bottom, left:right] = face_roi
        analysis_data["frame"] = blurred_frame
        now = time.time()
        # --- Identity recognition (throttled) ---
        if now - self.last_identity_check_time > self.IDENTITY_CHECK_INTERVAL:
            sface_loc = (
                int(min(ys) * h),
                int(max(xs) * w),
                int(max(ys) * h),
                int(min(xs) * w),
            )
            spad = 20
            sface_loc = (
                max(0, sface_loc[0] - spad),
                min(w, sface_loc[1] + spad),
                min(h, sface_loc[2] + spad),
                max(0, sface_loc[3] - spad),
            )
            # Drop the stale task if the queue is full, then enqueue
            if self.task_queue.full():
                self.task_queue.get()
            self.task_queue.put((sface_loc, 0))
            self.last_identity_check_time = now
# --- 情绪识别 ---
if (
now - self.last_emotion_check_time > self.EMOTION_CHECK_INTERVAL
):
# 计算裁剪坐标
x_min = int(min(xs) * w)
x_max = int(max(xs) * w)
y_min = int(min(ys) * h)
y_max = int(max(ys) * h)
pad_x = int((x_max - x_min) * 0.1)
pad_y = int((y_max - y_min) * 0.1)
x_min = max(0, x_min - pad_x)
x_max = min(w, x_max + pad_x)
y_min = max(0, y_min - pad_y)
y_max = min(h, y_max + pad_y)
face_loc = (y_min, x_max, y_max, x_min)
if self.task_queue.full():
self.task_queue.get()
self.task_queue.put((face_loc, 1))
self.last_emotion_check_time = now
        # Drain any results the worker has finished
        while not self.result_queue.empty():
            type_, data = self.result_queue.get()
            if type_ == "identity":
                self.current_user = data
            elif type_ == "emotion":
                self.cached_emotion["label"] = data.get("emotion", "unknown")
                self.cached_emotion["va"] = data.get("vaVal", (0.0, 0.0))
        analysis_data["identity"] = self.current_user
        analysis_data["emotion_label"] = self.cached_emotion["label"]
        analysis_data["emotion_va"] = self.cached_emotion["va"]
        return analysis_data

    # Legacy in-process worker, commented out; see background_worker_process
    # below for the multiprocessing version.
    # def _id_emo_loop(self):
    #     while True:
    #         try:
    #             frame, face_loc, task_type = self.task_queue.get()
    #             if task_type == 0:
    #                 match_result = self.face_lib.identify(frame, face_location=face_loc)
    #                 if match_result:
    #                     self.current_user = match_result["info"]
    #             elif task_type == 1 and HAS_EMOTION_MODULE:
    #                 face_crop = frame[
    #                     face_loc[0] : face_loc[2], face_loc[3] : face_loc[1]
    #                 ]
    #                 if face_crop.size > 0:
    #                     try:
    #                         emo_results = analyze_emotion_with_hsemotion(face_crop)
    #                         if emo_results:
    #                             top_res = emo_results[0]
    #                             self.cached_emotion["label"] = top_res.get(
    #                                 "emotion", "unknown"
    #                             )
    #                             self.cached_emotion["va"] = top_res.get(
    #                                 "vaVal", (0.0, 0.0)
    #                             )
    #                     except Exception as e:
    #                         print(f"Emotion analysis failed: {e}")
    #         except Exception as e:
    #             print(f"Thread worker error: {e}")


def background_worker_process(
    shm_name,      # name of the shared memory block
    frame_shape,   # frame dimensions (h, w, 3)
    task_queue,    # task queue (main -> worker)
    result_queue,  # result queue (worker -> main)
    face_db_data,  # face database payload used for initialization
):
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    # numpy view over the shared buffer; no data is copied
    shared_frame = np.ndarray(frame_shape, dtype=np.uint8, buffer=existing_shm.buf)
    print("[Worker] Loading models...")
    from face_library import FaceLibrary

    face_lib = FaceLibrary(face_db_data)
    try:
        from new_emotion_test import analyze_emotion_with_hsemotion
        has_emo = True
    except ImportError:
        has_emo = False
    print("[Worker] Models loaded")
    while True:
        try:
            # Block until a task arrives: (face_loc, task_type)
            face_loc, task_type = task_queue.get()
            # The frame itself is read from shared memory; nothing is sent
            # over the queue, and slicing the view is zero-copy as well.
            # We copy the frame out before processing so the main process's
            # writes cannot race with a read in progress; a fully decoupled
            # design would use a semaphore or two SHM blocks as a ping-pong
            # buffer (sketched above).
            # Simplified version: read straight from shared_frame. Because
            # the main process runs faster, the buffer may already hold the
            # next frame, but for identity recognition a drift of a frame or
            # two makes no difference at all. That is the whole point of
            # this optimization.
            current_frame_view = shared_frame.copy()  # copy() guards against read/write races
            if task_type == 0:  # Identity
                # Convert to RGB for the face library
                rgb = cv2.cvtColor(current_frame_view, cv2.COLOR_BGR2RGB)
                res = face_lib.identify(rgb, face_location=face_loc)
                if res:
                    result_queue.put(("identity", res["info"]))
            elif task_type == 1 and has_emo:  # Emotion
                # Crop the BGR frame directly
                roi = current_frame_view[
                    face_loc[0] : face_loc[2], face_loc[3] : face_loc[1]
                ]
                if roi.size > 0:
                    emo_res = analyze_emotion_with_hsemotion(roi)
                    if emo_res:
                        result_queue.put(("emotion", emo_res[0]))
        except Exception as e:
            print(f"[Worker Error] {e}")