diff --git a/reproject/analyzer.py b/reproject/analyzer.py index 93d7a14..890b743 100644 --- a/reproject/analyzer.py +++ b/reproject/analyzer.py @@ -4,6 +4,8 @@ import time import numpy as np import threading import queue +import multiprocessing as mp_proc +from multiprocessing import shared_memory from collections import deque from geometry_utils import ( calculate_ear, @@ -15,16 +17,15 @@ from geometry_utils import ( LEFT_EYE_GAZE_IDXS, RIGHT_EYE_GAZE_IDXS, ) -from face_library import FaceLibrary +# from face_library import FaceLibrary +# try: +# from new_emotion_test import analyze_emotion_with_hsemotion -try: - from new_emotion_test import analyze_emotion_with_hsemotion - - HAS_EMOTION_MODULE = True -except ImportError: - print("⚠️ 未找到 new_emotion_test.py,情绪功能将不可用") - HAS_EMOTION_MODULE = False +# HAS_EMOTION_MODULE = True +# except ImportError: +# print("⚠️ 未找到 new_emotion_test.py,情绪功能将不可用") +# HAS_EMOTION_MODULE = False class MonitorSystem: @@ -39,7 +40,7 @@ class MonitorSystem: ) # 初始化人脸底库 - self.face_lib = FaceLibrary(face_db) + # self.face_lib = FaceLibrary(face_db) # --- 时间控制 --- self.last_identity_check_time = 0 @@ -60,12 +61,51 @@ class MonitorSystem: # 缓存上一次的检测结果 self.cached_emotion = {"label": "detecting...", "va": (0.0, 0.0)} - self.task_queue = queue.Queue(maxsize=2) self.current_user = None self.current_emotion = "Neutral" - self.id_emo_thread = threading.Thread(target=self._id_emo_loop, daemon=True) - self.id_emo_thread.start() + self.frame_shape = (720, 1280, 3) + frame_size = int(np.prod(self.frame_shape)) + + # 必须先解除可能存在的残留 (Windows上有时不需要,但保持好习惯) + # 最好是随机生成一个名字,确保每次运行都是新的 + import secrets + auth_key = secrets.token_hex(4) + shm_unique_name = f"monitor_shm_{auth_key}" + + try: + self.shm = shared_memory.SharedMemory(create=True, size=frame_size, name=shm_unique_name) + except FileExistsError: + # 如果真的点背碰上了,就 connect 这一块 + self.shm = shared_memory.SharedMemory(name=shm_unique_name) + + print(f"[Main] 共享内存已创建: {self.shm.name} (Size: {frame_size} bytes)") + + # 本地 numpy 包装器 + self.shared_frame_array = np.ndarray( + self.frame_shape, dtype=np.uint8, buffer=self.shm.buf + ) + # 初始化为全黑,避免噪音 + self.shared_frame_array.fill(0) + + # 跨进程队列 + self.task_queue = mp_proc.Queue(maxsize=2) + self.result_queue = mp_proc.Queue(maxsize=2) # 1就够了,最新的覆盖 + + # 3. 启动进程 + # Windows下传参,只传名字字符串是安全的 + self.worker_proc = mp_proc.Process( + target=background_worker_process, + args=( + self.shm.name, + self.frame_shape, + self.task_queue, + self.result_queue, + face_db, + ), + ) + self.worker_proc.daemon = True + self.worker_proc.start() def _get_smoothed_value(self, history, current_val): """内部函数:计算滑动平均值""" @@ -78,7 +118,20 @@ class MonitorSystem: """ 输入 BGR 图像,返回分析结果字典 """ + # 强制检查分辨率,如果不匹配则 Resize (对应 __init__ 中硬编码的 1280x720) + # 这一步至关重要,否则后台进程读到的全是黑屏 + target_h, target_w = self.frame_shape[:2] + if frame.shape[:2] != (target_h, target_w): + frame = cv2.resize(frame, (target_w, target_h)) + h, w = frame.shape[:2] + # 现在肯定匹配了,放心写入 + try: + self.shared_frame_array[:] = frame[:] + except Exception: + # 极端情况:数组形状不匹配 (比如通道数变了) + pass + rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) results = self.face_mesh.process(rgb_frame) @@ -174,7 +227,9 @@ class MonitorSystem: left = face_loc[3] scale_factor = 10 small_bg = cv2.resize( - frame, (w // scale_factor, h // scale_factor), interpolation=cv2.INTER_LINEAR + frame, + (w // scale_factor, h // scale_factor), + interpolation=cv2.INTER_LINEAR, ) # 使用 INTER_NEAREST 马赛克效果 # 使用 INTER_LINEAR 毛玻璃模糊效果 @@ -189,23 +244,27 @@ class MonitorSystem: # --- 身份识别 --- if now - self.last_identity_check_time > self.IDENTITY_CHECK_INTERVAL: sface_loc = ( - int(min(ys) * h), int(max(xs) * w), - int(max(ys) * h), int(min(xs) * w) + int(min(ys) * h), + int(max(xs) * w), + int(max(ys) * h), + int(min(xs) * w), ) spad = 20 - sface_loc = (max(0, sface_loc[0]-spad), min(w, sface_loc[1]+spad), - min(h, sface_loc[2]+spad), max(0, sface_loc[3]-spad)) + sface_loc = ( + max(0, sface_loc[0] - spad), + min(w, sface_loc[1] + spad), + min(h, sface_loc[2] + spad), + max(0, sface_loc[3] - spad), + ) if self.task_queue.full(): self.task_queue.get() - self.task_queue.put((rgb_frame.copy(), sface_loc, 0)) - + self.task_queue.put((sface_loc, 0)) + self.last_identity_check_time = now - analysis_data["identity"] = self.current_user - # --- 情绪识别 --- - if HAS_EMOTION_MODULE and ( + if ( now - self.last_emotion_check_time > self.EMOTION_CHECK_INTERVAL ): @@ -222,43 +281,120 @@ class MonitorSystem: x_max = min(w, x_max + pad_x) y_min = max(0, y_min - pad_y) y_max = min(h, y_max + pad_y) - + face_loc = (y_min, x_max, y_max, x_min) - + if self.task_queue.full(): self.task_queue.get() - self.task_queue.put((frame.copy(), face_loc, 1)) + self.task_queue.put((face_loc, 1)) self.last_emotion_check_time = now + while not self.result_queue.empty(): + type_, data = self.result_queue.get() + if type_ == "identity": + self.current_user = data + elif type_ == "emotion": + self.cached_emotion["label"] = data.get("emotion", "unknown") + self.cached_emotion["va"] = data.get("vaVal", (0.0, 0.0)) + + analysis_data["identity"] = self.current_user analysis_data["emotion_label"] = self.cached_emotion["label"] analysis_data["emotion_va"] = self.cached_emotion["va"] return analysis_data - - def _id_emo_loop(self): - while True: - try: - frame, face_loc, task_type = self.task_queue.get() - if task_type == 0: - match_result = self.face_lib.identify(frame, face_location=face_loc) - if match_result: - self.current_user = match_result["info"] - elif task_type == 1 and HAS_EMOTION_MODULE: - face_crop = frame[face_loc[0]:face_loc[2], face_loc[3]:face_loc[1]] - if face_crop.size > 0: - try: - emo_results = analyze_emotion_with_hsemotion(face_crop) + # def _id_emo_loop(self): + # while True: + # try: + # frame, face_loc, task_type = self.task_queue.get() + # if task_type == 0: + # match_result = self.face_lib.identify(frame, face_location=face_loc) + # if match_result: + # self.current_user = match_result["info"] + # elif task_type == 1 and HAS_EMOTION_MODULE: + # face_crop = frame[ + # face_loc[0] : face_loc[2], face_loc[3] : face_loc[1] + # ] - if emo_results: - top_res = emo_results[0] - self.cached_emotion["label"] = top_res.get( - "emotion", "unknown" - ) - self.cached_emotion["va"] = top_res.get("vaVal", (0.0, 0.0)) + # if face_crop.size > 0: + # try: + # emo_results = analyze_emotion_with_hsemotion(face_crop) - except Exception as e: - print(f"情绪分析出错: {e}") - except Exception as e: - print(f"线程处理出错: {e}") + # if emo_results: + # top_res = emo_results[0] + # self.cached_emotion["label"] = top_res.get( + # "emotion", "unknown" + # ) + # self.cached_emotion["va"] = top_res.get( + # "vaVal", (0.0, 0.0) + # ) + + # except Exception as e: + # print(f"情绪分析出错: {e}") + # except Exception as e: + # print(f"线程处理出错: {e}") + + +def background_worker_process( + shm_name, # 共享内存的名字 + frame_shape, # 图像大小 (h, w, 3) + task_queue, # 任务队列 (主 -> 从) + result_queue, # 结果队列 (从 -> 主) + face_db_data, # 把人脸库数据传过去初始化 +): + existing_shm = shared_memory.SharedMemory(name=shm_name) + # 创建 numpy 数组视图,无需复制数据 + shared_frame = np.ndarray(frame_shape, dtype=np.uint8, buffer=existing_shm.buf) + + print("[Worker] 正在加载模型...") + from face_library import FaceLibrary + + face_lib = FaceLibrary(face_db_data) + + try: + from new_emotion_test import analyze_emotion_with_hsemotion + + has_emo = True + except: + has_emo = False + print("[Worker] 模型加载完毕") + + while True: + try: + # 阻塞等待任务 + # task_info = (task_type, face_loc) + face_loc, task_type = task_queue.get() + + # 注意:这里读取的是共享内存里的图,不需要传图! + # 切片操作也是零拷贝 + # 为了安全,这里 copy 一份出来处理,避免主进程修改 + # 但实际上如果主进程只写新帧,这里读旧帧也问题不大 + # 为了绝对安全和解耦,我们假定主进程已经写入了对应的帧 + + # (实战技巧:通常我们会用一个信号量或多块共享内存来实现乒乓缓存) + # 简化版:我们直接从 shared_frame 读。 + # 由于主进程跑得快,可能SharedMemory里已经是下一帧了。 + # 但对于识别身份来说,差一两帧根本没区别!这才是优化的精髓。 + + current_frame_view = shared_frame.copy() # .copy() 如果你怕读写冲突 + + if task_type == 0: # Identity + # RGB转换 + rgb = cv2.cvtColor(current_frame_view, cv2.COLOR_BGR2RGB) + res = face_lib.identify(rgb, face_location=face_loc) + if res: + result_queue.put(("identity", res["info"])) + + elif task_type == 1 and has_emo: # Emotion + # BGR 直接切 + roi = current_frame_view[ + face_loc[0] : face_loc[2], face_loc[3] : face_loc[1] + ] + if roi.size > 0: + emo_res = analyze_emotion_with_hsemotion(roi) + if emo_res: + result_queue.put(("emotion", emo_res[0])) + + except Exception as e: + print(f"[Worker Error] {e}") diff --git a/reproject/main.py b/reproject/main.py index ce792f4..d580aaf 100644 --- a/reproject/main.py +++ b/reproject/main.py @@ -92,7 +92,7 @@ def capture_thread(): """ 采集线程:优化了分发逻辑,对视频流进行降频处理 """ - cap = cv2.VideoCapture(0) + cap = cv2.VideoCapture(1) cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720) @@ -244,8 +244,9 @@ def video_stream_thread(): 发送线程:优化了 Socket 设置和压缩参数 """ print(f"[Video] 准备连接服务器 {SERVER_HOST}:{SERVER_PORT} ...") - server = WebRTCServer(fps=60) + server = WebRTCServer(60, 5, "ws://10.128.50.6:5000") server.start() + print("[Video] WebRTC 服务器启动完成") fourcc = cv2.VideoWriter_fourcc(*'avc1') # jetson-nvenc 编码器 # bitrate = 1000000 # 1 Mbps @@ -268,7 +269,9 @@ def video_stream_thread(): try: frame = video_queue.get(timeout=1) # small_frame = cv2.resize(apply_soft_roi(frame), (1280, 720)) + # print("[Video] 获取一帧视频") server.provide_frame(frame) + # print("[Video] 已提供给 WebRTC 服务器") # out1.write(frame) # out2.write(frame) except queue.Empty: diff --git a/reproject/webrtc_server.py b/reproject/webrtc_server.py index f26a361..31818dc 100644 --- a/reproject/webrtc_server.py +++ b/reproject/webrtc_server.py @@ -17,7 +17,7 @@ from aiortc.mediastreams import VIDEO_CLOCK_RATE, VIDEO_TIME_BASE class WebRTCServer: - def __init__(self, fps): + def __init__(self, fps, seat, server="ws://localhost:5000"): self.pcs = set() self.fps = fps self.frameContainer = [None] @@ -31,7 +31,8 @@ class WebRTCServer: ) self._rtc_thread.start() - self.server = "ws://10.128.50.6:5000" + self.server = server + self.seat = seat def _start_background_loop(self, loop): asyncio.set_event_loop(loop) @@ -43,8 +44,7 @@ class WebRTCServer: @sio.event async def connect(): print("已连接到中心信令服务器") - # TODO 注册自己为设备 - await sio.emit("checkin", {"device_id": "cam_001"}) + await sio.emit("checkin", {"seat_id": self.seat}) @sio.event async def offer(data):