diff --git a/reproject/main.py b/reproject/main.py
index f9e1dee..39e9155 100644
--- a/reproject/main.py
+++ b/reproject/main.py
@@ -23,68 +24,22 @@ BASIC_FACE_DB = {
     "Yaoyu": {"name": "Yaoyu Zhang", "age": 20, "image-path": "yaoyu.jpg"},
 }
 
-mp_face_mesh_main = mp.solutions.face_mesh
-face_mesh_main = mp_face_mesh_main.FaceMesh(
-    max_num_faces=1,
-    refine_landmarks=True,
-    min_detection_confidence=0.5,
-    min_tracking_confidence=0.5,
-)
-
-
-def apply_soft_roi(frame):
-    h, w = frame.shape[:2]
-    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
-    black_bg = np.zeros((h, w, 3), dtype=np.uint8)
-
-    results = face_mesh_main.process(rgb_frame)
-    if not results.multi_face_landmarks:
-        return frame
-    landmarks = results.multi_face_landmarks[0].landmark
-    xs = [l.x for l in landmarks]
-    ys = [l.y for l in landmarks]
-    # Compute the face bounding box
-    face_loc = (
-        int(min(ys) * h - 0.1 * h),
-        int(max(xs) * w + 0.1 * w),
-        int(max(ys) * h + 0.1 * h),
-        int(min(xs) * w - 0.1 * w),
-    )
-    pad = 30
-    face_loc = (
-        max(0, face_loc[0] - pad),
-        min(w, face_loc[1] + pad),
-        min(h, face_loc[2] + pad),
-        max(0, face_loc[3] - pad),
-    )
-    top = face_loc[0]
-    right = face_loc[1]
-    bottom = face_loc[2]
-    left = face_loc[3]
-    scale_factor = 10
-    small_bg = cv2.resize(
-        frame, (w // scale_factor, h // scale_factor), interpolation=cv2.INTER_LINEAR
-    )
-    # Use INTER_NEAREST for a mosaic (pixelated) effect
-    # Use INTER_LINEAR for a frosted-glass blur effect
-    blurred_frame = cv2.resize(small_bg, (w, h), interpolation=cv2.INTER_LINEAR)
-
-    face_roi = frame[top:bottom, left:right]
-
-    blurred_frame[top:bottom, left:right] = face_roi
-    black_bg[top:bottom, left:right] = face_roi
-
-    return blurred_frame
-
+VIDEO_FILE = ["video_temp1.mp4", "video_temp2.mp4"]
 
 frame_queue = queue.Queue(maxsize=2)
 video_queue = queue.Queue(maxsize=10)
+ana_video_queue = queue.Queue(maxsize=10)
+
 data_queue = queue.Queue(maxsize=10)
 show_queue = queue.Queue(maxsize=10)
+front_data_queue = queue.Queue(maxsize=10)
+
+ana_data_queue = queue.Queue(maxsize=10)
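+
+# Queue fan-out (summary of how this patch wires the threads together):
+#   video_queue      -> video_stream_thread: WebRTC live view + output1.mp4
+#   ana_video_queue  -> alert_thread: rolling one-minute evidence clips
+#   data_queue       -> data_upload_thread: backend burst upload
+#   ana_data_queue   -> alert_thread: rule evaluation at roughly 1 Hz
+#   front_data_queue -> video_stream_thread: overlay data for the data channel
+# Producers drop the oldest item when a queue is full, so a slow consumer
+# never blocks the analysis loop.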
"pose_0": result["pose"][0], + "pose_1": result["pose"][1], + "pose_2": result["pose"][2], + "label": result["emotion_label"], + "emo_v": result["emotion_va"][0], + "emo_a": result["emotion_va"][1], } ) - elif result["has_face"]: - payload.update( + front_data.update( { - "name": "Unknown", - "ear": result["ear"], - "mar": result["mar"], - "iris_ratio": result["iris_ratio"], - "pose": result["pose"], - "emo_label": result["emotion_label"], - "emo_va": result["emotion_va"], + "label": result["emotion_label"], + "iris_ratio_x": result["iris_ratio"][0], + "iris_ratio_y": result["iris_ratio"][1], + "pose_0": result["pose"][0], + "pose_1": result["pose"][1], + "pose_2": result["pose"][2], } ) + # elif result["has_face"]: + # payload.update( + # { + # "name": "Unknown", + # "ear": result["ear"], + # "mar": result["mar"], + # "iris_ratio": result["iris_ratio"], + # "pose": result["pose"], + # "emo_label": result["emotion_label"], + # "emo_va": result["emotion_va"], + # } + # ) if result["has_face"] and result["ear"] < 0.2: if status == 0: @@ -220,18 +206,34 @@ def analysis_thread(): last_time = time.time() result["eye_close_freq"] = last_freq payload["eye_close_freq"] = last_freq + front_data["eye_close_freq"] = last_freq bpm = heart_monitor.process_frame(frame, result["landmark"]) if bpm != None: - result["heart_rate_bpm"] = bpm - payload["heart_rate_bpm"] = bpm + result["heart_rate"] = bpm + payload["heart_rate"] = bpm + front_data["heart_rate"] = bpm if data_queue.full(): try: _ = data_queue.get_nowait() except queue.Empty: pass + if front_data_queue.full(): + try: + _ = front_data_queue.get_nowait() + except queue.Empty: + pass + if ana_data_queue.full(): + try: + _ = ana_data_queue.get_nowait() + except queue.Empty: + pass data_queue.put(payload) + ana_data_queue.put(payload) + + front_data_queue.put(front_data) + show_queue.put((result["frame"], result)) # draw_debug_info(frame, result) # cv2.imshow("Monitor Client", frame) @@ -247,55 +249,58 @@ def video_stream_thread(): server = WebRTCServer(60, 5, "ws://10.128.50.6:5000") server.start() fourcc = cv2.VideoWriter_fourcc(*'avc1') - # jetson-nvenc 编码器 - # ----------------------------------------------------------- - # 1. 定义参数 - filename = 'output.mp4' - fps = 30.0 - width = 1280 - height = 720 +# # jetson-nvenc 编码器 +# # ----------------------------------------------------------- +# # 1. 定义参数 +# filename = 'output.mp4' +# fps = 30.0 +# width = 1280 +# height = 720 - # 2. 构建 GStreamer 软编码管道 - # 核心思路:BGR (OpenCV) -> I420 (YUV) -> x264enc (CPU编码) -> MP4 - gst_pipeline = ( - f"appsrc ! " - f"video/x-raw, format=BGR, width={width}, height={height}, framerate={int(fps)}/1 ! " - f"queue ! " - f"videoconvert ! " - f"video/x-raw, format=I420 ! " # 转换颜色空间给编码器 - f"x264enc speed-preset=ultrafast tune=zerolatency bitrate=2000 ! " # bitrate=2000 即 2Mbps,体积很小 - f"h264parse ! " - f"qtmux ! " - f"filesink location={filename} " - ) +# # 2. 构建 GStreamer 软编码管道 +# # 核心思路:BGR (OpenCV) -> I420 (YUV) -> x264enc (CPU编码) -> MP4 +# gst_pipeline = ( +# f"appsrc ! " +# f"video/x-raw, format=BGR, width={width}, height={height}, framerate={int(fps)}/1 ! " +# f"queue ! " +# f"videoconvert ! " +# f"video/x-raw, format=I420 ! " # 转换颜色空间给编码器 +# f"x264enc speed-preset=ultrafast tune=zerolatency bitrate=2000 ! " # bitrate=2000 即 2Mbps,体积很小 +# f"h264parse ! " +# f"qtmux ! " +# f"filesink location={filename} " +# ) - print(f"[Video] 尝试启动管道: {gst_pipeline}") +# print(f"[Video] 尝试启动管道: {gst_pipeline}") - # 3. 
-        if result["has_face"] and result["identity"]:
+        if result["has_face"]:  # and result["identity"]:
             payload.update(
                 {
-                    "name": result["identity"]["name"],
                     "ear": result["ear"],
                     "mar": result["mar"],
-                    "iris_ratio": result["iris_ratio"],
-                    "pose": result["pose"],
-                    "emo_label": result["emotion_label"],
-                    "emo_va": result["emotion_va"],
+                    "iris_ratio_x": result["iris_ratio"][0],
+                    "iris_ratio_y": result["iris_ratio"][1],
+                    "pose_0": result["pose"][0],
+                    "pose_1": result["pose"][1],
+                    "pose_2": result["pose"][2],
+                    "label": result["emotion_label"],
+                    "emo_v": result["emotion_va"][0],
+                    "emo_a": result["emotion_va"][1],
                 }
             )
-        elif result["has_face"]:
-            payload.update(
+            front_data.update(
                 {
-                    "name": "Unknown",
-                    "ear": result["ear"],
-                    "mar": result["mar"],
-                    "iris_ratio": result["iris_ratio"],
-                    "pose": result["pose"],
-                    "emo_label": result["emotion_label"],
-                    "emo_va": result["emotion_va"],
+                    "label": result["emotion_label"],
+                    "iris_ratio_x": result["iris_ratio"][0],
+                    "iris_ratio_y": result["iris_ratio"][1],
+                    "pose_0": result["pose"][0],
+                    "pose_1": result["pose"][1],
+                    "pose_2": result["pose"][2],
                 }
             )
+        # elif result["has_face"]:
+        #     payload.update(
+        #         {
+        #             "name": "Unknown",
+        #             "ear": result["ear"],
+        #             "mar": result["mar"],
+        #             "iris_ratio": result["iris_ratio"],
+        #             "pose": result["pose"],
+        #             "emo_label": result["emotion_label"],
+        #             "emo_va": result["emotion_va"],
+        #         }
+        #     )
 
         if result["has_face"] and result["ear"] < 0.2:
             if status == 0:
@@ -220,18 +206,34 @@ def analysis_thread():
                 last_time = time.time()
         result["eye_close_freq"] = last_freq
         payload["eye_close_freq"] = last_freq
+        front_data["eye_close_freq"] = last_freq
 
         bpm = heart_monitor.process_frame(frame, result["landmark"])
-        if bpm != None:
-            result["heart_rate_bpm"] = bpm
-            payload["heart_rate_bpm"] = bpm
+        if bpm is not None:
+            result["heart_rate"] = bpm
+            payload["heart_rate"] = bpm
+            front_data["heart_rate"] = bpm
 
         if data_queue.full():
             try:
                 _ = data_queue.get_nowait()
             except queue.Empty:
                 pass
+        if front_data_queue.full():
+            try:
+                _ = front_data_queue.get_nowait()
+            except queue.Empty:
+                pass
+        if ana_data_queue.full():
+            try:
+                _ = ana_data_queue.get_nowait()
+            except queue.Empty:
+                pass
         data_queue.put(payload)
+        ana_data_queue.put(payload)
+
+        front_data_queue.put(front_data)
+
         show_queue.put((result["frame"], result))
         # draw_debug_info(frame, result)
         # cv2.imshow("Monitor Client", frame)
@@ -247,55 +249,58 @@ def video_stream_thread():
     server = WebRTCServer(60, 5, "ws://10.128.50.6:5000")
     server.start()
     fourcc = cv2.VideoWriter_fourcc(*'avc1')
-    # jetson-nvenc encoder
-    # -----------------------------------------------------------
-    # 1. Define the parameters
-    filename = 'output.mp4'
-    fps = 30.0
-    width = 1280
-    height = 720
+#     # jetson-nvenc encoder
+#     # -----------------------------------------------------------
+#     # 1. Define the parameters
+#     filename = 'output.mp4'
+#     fps = 30.0
+#     width = 1280
+#     height = 720
 
-    # 2. Build the GStreamer software-encoding pipeline
-    # Core idea: BGR (OpenCV) -> I420 (YUV) -> x264enc (CPU encoding) -> MP4
-    gst_pipeline = (
-        f"appsrc ! "
-        f"video/x-raw, format=BGR, width={width}, height={height}, framerate={int(fps)}/1 ! "
-        f"queue ! "
-        f"videoconvert ! "
-        f"video/x-raw, format=I420 ! "  # convert the color space for the encoder
-        f"x264enc speed-preset=ultrafast tune=zerolatency bitrate=2000 ! "  # bitrate=2000 means 2 Mbps, so files stay small
-        f"h264parse ! "
-        f"qtmux ! "
-        f"filesink location={filename} "
-    )
+#     # 2. Build the GStreamer software-encoding pipeline
+#     # Core idea: BGR (OpenCV) -> I420 (YUV) -> x264enc (CPU encoding) -> MP4
+#     gst_pipeline = (
+#         f"appsrc ! "
+#         f"video/x-raw, format=BGR, width={width}, height={height}, framerate={int(fps)}/1 ! "
+#         f"queue ! "
+#         f"videoconvert ! "
+#         f"video/x-raw, format=I420 ! "  # convert the color space for the encoder
+#         f"x264enc speed-preset=ultrafast tune=zerolatency bitrate=2000 ! "  # bitrate=2000 means 2 Mbps, so files stay small
+#         f"h264parse ! "
+#         f"qtmux ! "
+#         f"filesink location={filename} "
+#     )
 
-    print(f"[Video] Trying to start pipeline: {gst_pipeline}")
+#     print(f"[Video] Trying to start pipeline: {gst_pipeline}")
 
-    # 3. Initialize the VideoWriter (must use CAP_GSTREAMER)
-    out = cv2.VideoWriter(gst_pipeline, cv2.CAP_GSTREAMER, 0, fps, (width, height))
+#     # 3. Initialize the VideoWriter (must use CAP_GSTREAMER)
+#     out = cv2.VideoWriter(gst_pipeline, cv2.CAP_GSTREAMER, 0, fps, (width, height))
 
-    # 4. Strict check
-    if not out.isOpened():
-        print("❌ [Fatal Error] Failed to start video recording!")
-        print("Possible causes:")
-        print("1. Your OpenCV build has no GStreamer support (check cv2.getBuildInformation())")
-        print("2. Missing system plugins (try: sudo apt install gstreamer1.0-plugins-ugly)")
+#     # 4. Strict check
+#     if not out.isOpened():
+#         print("❌ [Fatal Error] Failed to start video recording!")
+#         print("Possible causes:")
+#         print("1. Your OpenCV build has no GStreamer support (check cv2.getBuildInformation())")
+#         print("2. Missing system plugins (try: sudo apt install gstreamer1.0-plugins-ugly)")
 
-        # --- Last-resort fallback: if everything above failed, use MPEG-4 ---
-        print("⚠️ Falling back to mp4v (MPEG-4) encoding...")
-        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
-        out = cv2.VideoWriter(filename, fourcc, fps, (width, height))
-    else:
-        print("✅ [Video] H.264 software encoding started! Video will be saved as MP4.")
+#         # --- Last-resort fallback: if everything above failed, use MPEG-4 ---
+#         print("⚠️ Falling back to mp4v (MPEG-4) encoding...")
+#         fourcc = cv2.VideoWriter_fourcc(*'mp4v')
+#         out = cv2.VideoWriter(filename, fourcc, fps, (width, height))
+#     else:
+#         print("✅ [Video] H.264 software encoding started! Video will be saved as MP4.")
     # -----------------------------------------------------------
-    # out1 = cv2.VideoWriter('output1.mp4', fourcc, 30.0, (1280, 720))
+    out1 = cv2.VideoWriter('output1.mp4', fourcc, 30.0, (1280, 720))
     # out2 = cv2.VideoWriter('output2.mp4', fourcc, 30.0, (1280, 720))
     while not stop_event.is_set():
         try:
             frame = video_queue.get(timeout=1)
             server.provide_frame(frame)
-            out.write(frame)
+            try:
+                data = front_data_queue.get_nowait()
+                server.send_data(json.dumps(data))
+            except queue.Empty:
+                pass  # no fresh overlay data yet; still record the frame
+
+            out1.write(frame)
             # out2.write(frame)
         except queue.Empty:
             continue
@@ -346,7 +351,7 @@ def video_stream_thread():
     # except Exception as e:
    #     print(f"[Video] Reconnecting... {e}")
    #     time.sleep(3)
-    out.release()
+    out1.release()
     # out2.release()
     print("[Video] video thread stopped")
@@ -396,9 +401,9 @@ def data_upload_thread():
                 pass
 
             # Log the upload
-            name_info = data["name"] if data["name"] else "NO-FACE"
+            id_info = data.get("seat_id", "Unknown")
             print(
-                f"[Data Upload {i+1}/{BURST_COUNT}] {name_info} | Time:{data['time']}"
+                f"[Data Upload {i+1}/{BURST_COUNT}] {id_info} | Time:{data['timestamp']}"
             )
 
         except Exception as e:
@@ -413,6 +418,116 @@ def data_upload_thread():
     print("[Data] upload thread stopped")
 
 
+def alert_thread():
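+    """Evaluate fatigue and attention alert rules and record rolling
+    one-minute evidence clips, alternating between the two VIDEO_FILE paths.
+
+    Continuous counters (checked roughly every second): prolonged eye
+    closure, head deviation, missing face.  Per-minute roll-ups: yawning,
+    average heart rate, blink frequency.  Per-five-minute roll-up: low mood.
+    Dispatching the alert itself is still a TODO at the end of the loop.
+    """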
{e}") # time.sleep(3) - out.release() + out1.release() # out2.release() print("[Video] 线程结束") @@ -396,9 +401,9 @@ def data_upload_thread(): pass # 打印日志 - name_info = data["name"] if data["name"] else "NO-FACE" + id_info = data.get("seat_id", "Unknown") print( - f"[Data Upload {i+1}/{BURST_COUNT}] {name_info} | Time:{data['time']}" + f"[Data Upload {i+1}/{BURST_COUNT}] {id_info} | Time:{data['time']}" ) except Exception as e: @@ -413,6 +418,116 @@ def data_upload_thread(): print("[Data] 数据上报线程结束") +def alert_thread(): + last_record_time = time.time() + file_id = 0 + + sleep_time = 0 # 实时 + haqian_time = 0 # 每分钟跟随检测 + heart_spe = 0 # 每分钟跟随检测 + heart_num = 0 + eye_close = False # 每分钟跟随检测 + down_emo_time = 0 # 每五分钟跟随检测 + pianyi_time = 0 # 实时 + no_face_time = 0 # 实时 + + alert_info = { + "sleep_time": "长时间闭眼,存在睡觉可能性", + "haqian_time": "频繁打哈欠,存在疲劳可能性", + "heart_spe": "心率不正常,存在紧张或疲劳可能性", + "eye_close": "频繁闭眼,存在疲劳可能性", + "down_emo_time": "情绪低落,存在不适可能性", + "pianyi_time": "频繁偏头,存在注意力不集中可能性", + "no_face_time": "长时间无人脸,存在离岗或睡岗可能性", + } + + fourcc1 = cv2.VideoWriter_fourcc(*'avc1') + alert_out = cv2.VideoWriter(VIDEO_FILE[file_id], fourcc1, 30.0, (1280, 720)) + alert_time = time.time() + emo_time = time.time() + while not stop_event.is_set(): + frame = ana_video_queue.get(timeout=1) + alert_out.write(frame) + now = time.time() + alert_status = False + alert_st = "" + data = None + if now - alert_time >= 1: + data = ana_data_queue.get(timeout=1) + + # sleep_time + if data["ear"] < 0.2: + sleep_time += 1 + else: + sleep_time = 0 + + # haqian_time + if data["mar"] > 0.95: + haqian_time += 1 + + # heart_spe + heart_spe += data["heart_rate"] + heart_num += 1 + + # eye_close + if data["eye_close_freq"] > 20: + eye_close = True + + # down_emo_time + if data["label"] in ["sad", "bored", "sleepy", "angry", "annoying"]: + down_emo_time += 1 + + # pianyi_time + if data["pose_0"] > 25 or data["pose_0"] < -10 or data["pose_1"] > 50 or data["pose_1"] < -50: + pianyi_time += 1 + + # no_face_time + if data["label"] == "": + no_face_time += 1 + + alert_time = now + + if sleep_time >= 20: + alert_status = True + alert_st += alert_info["sleep_time"] + "; " + sleep_time = 0 + if pianyi_time >= 20: + alert_status = True + alert_st += alert_info["pianyi_time"] + "; " + pianyi_time = 0 + if no_face_time >= 60: + alert_status = True + alert_st += alert_info["no_face_time"] + "; " + no_face_time = 0 + + if now - emo_time >= 300: + if down_emo_time > 150: + alert_status = True + alert_st += alert_info["down_emo_time"] + "; " + emo_time = now + down_emo_time = 0 + + if now - last_record_time >= 60: + file_id^= 1 + alert_out.release() + heart_spe = heart_spe // heart_num if heart_num != 0 else 0 + if haqian_time > 5: + alert_status = True + alert_st += alert_info["haqian_time"] + ";" + haqian_time = 0 + if heart_spe < 60 or heart_spe > 120: + alert_status = True + alert_st += alert_info["heart_spe"] + ";" + heart_spe = 0 + if eye_close: + alert_status = True + alert_st += alert_info["eye_close"] + ";" + eye_close = False + #TODO: 发送警报 + alert_out = cv2.VideoWriter(VIDEO_FILE[file_id], fourcc1, 30.0, (1280, 720)) + last_record_time = now + + def draw_debug_info(frame, result): """在画面上画出即时数据""" if not result["has_face"]: diff --git a/reproject/webrtc_server.py b/reproject/webrtc_server.py index 31818dc..58a2427 100644 --- a/reproject/webrtc_server.py +++ b/reproject/webrtc_server.py @@ -5,7 +5,9 @@ import socketio import numpy as np import cv2 import av +import os +from collections import deque from aiortc import ( MediaStreamError, 
     RTCConfiguration,
@@ -14,13 +16,74 @@ from aiortc import (
     VideoStreamTrack,
 )
 from aiortc.mediastreams import VIDEO_CLOCK_RATE, VIDEO_TIME_BASE
+from dotenv import load_dotenv
+
+load_dotenv()
+
+WEBRTC_SERVER_URL = os.getenv("WEBRTC_SERVER_URL", "ws://localhost:5000")
+WEBRTC_DATA_HISTORY_MAXLEN = int(os.getenv("WEBRTC_DATA_HISTORY_MAXLEN", 200))
+
+
+class RingBuffer:
+    def __init__(self, maxlen):
+        self._buf = deque(maxlen=maxlen)
+
+    def append(self, item):
+        self._buf.append(item)
+
+    def snapshot(self):
+        return list(self._buf)
+
+
+class MonitoringHub:
+    def __init__(self, max_history=WEBRTC_DATA_HISTORY_MAXLEN):
+        self._history = RingBuffer(maxlen=max_history)
+        self._channels = set()
+        self._queue = asyncio.Queue()
+        self._task = None
+
+    async def start(self):
+        if self._task is None:
+            self._task = asyncio.create_task(self._broadcast_loop())
+
+    async def _broadcast_loop(self):
+        while True:
+            data = await self._queue.get()
+            dead = []
+            for ch in list(self._channels):
+                if ch.readyState != "open":
+                    dead.append(ch)
+                    continue
+                try:
+                    ch.send(data)
+                except Exception:
+                    dead.append(ch)
+            for ch in dead:
+                self._channels.discard(ch)
+
+    def register_channel(self, channel):
+        # Replay the buffered history when a channel first opens
+        print(f"Channel {channel.label} opened, sending history")
+        self._channels.add(channel)
+        for item in self._history.snapshot():
+            try:
+                channel.send(item)
+            except Exception:
+                self._channels.discard(channel)
+                break
+
+    def send_data(self, data):
+        # Runs on the hub's event loop; see WebRTCServer.send_data
+        str_data = str(data)
+        self._history.append(str_data)
+        self._queue.put_nowait(str_data)
 
 
 class WebRTCServer:
-    def __init__(self, fps, seat, server="ws://localhost:5000"):
+    def __init__(self, fps, seat, server=WEBRTC_SERVER_URL):
         self.pcs = set()
         self.fps = fps
         self.frameContainer = [None]
+        self.hub = MonitoringHub()
 
         self.background_loop = asyncio.new_event_loop()
@@ -39,6 +102,7 @@ class WebRTCServer:
         loop.run_forever()
 
     async def _websocket_start(self):
+        await self.hub.start()
         sio = socketio.AsyncClient()
 
         @sio.event
@@ -87,6 +151,13 @@ class WebRTCServer:
         )
         pc.addTrack(VideoFrameTrack(self.fps, self.frameContainer))
 
+        dc = pc.createDataChannel("monitoring")
+
+        @dc.on("open")
+        def on_open():
+            print("Monitoring data channel opened")
+            self.hub.register_channel(dc)
+
         answer = await pc.createAnswer()
         await pc.setLocalDescription(answer)
         print(f"Handle offer in {(time.time() - start)*1000:.2f}ms")
@@ -98,6 +169,9 @@ class WebRTCServer:
     def provide_frame(self, frame):
         self.frameContainer[0] = frame
 
+    def send_data(self, data):
+        # Called from the capture thread, so hop onto the background event
+        # loop: asyncio.Queue is not thread-safe.
+        self.background_loop.call_soon_threadsafe(self.hub.send_data, data)
+
     def stop(self):
         if self.background_loop.is_running():
             self.background_loop.call_soon_threadsafe(self.background_loop.stop)
diff --git a/reproject/角度说明.png b/reproject/角度说明.png
new file mode 100644
index 0000000..d59f728
Binary files /dev/null and b/reproject/角度说明.png differ