Files
cv_state_ana/reproject/main.py
邓智航 17d0fd920a test2
2026-02-07 22:03:29 +08:00

536 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from calendar import c
import cv2
import threading
import time
import queue
import socket
import json
import urllib.request
import struct
import numpy as np
import mediapipe as mp
from analyzer import MonitorSystem
from webrtc_server import WebRTCServer
from HeartRateMonitor import HeartRateMonitor
# Host and port of the video server. In the visible code they are only used
# in the start-up log line of video_stream_thread (and in its disabled
# raw-socket sender).
SERVER_HOST = "10.128.50.6"
SERVER_PORT = 65432
# HTTP endpoint that data_upload_thread POSTs JSON state records to.
API_URL = "http://10.128.50.6:5000/api/states"
# Sent as the "id" field of every state record built by analysis_thread.
CAMERA_ID = "23373333"
# Known-face table handed to MonitorSystem in analysis_thread.
# NOTE(review): presumably "image-path" is the reference photo used for
# identity matching -- confirm against analyzer.MonitorSystem.
BASIC_FACE_DB = {
"Zhihang": {"name": "Zhihang Deng", "age": 20, "image-path": "zhihang.png"},
"Yaoyu": {"name": "Yaoyu Zhang", "age": 20, "image-path": "yaoyu.jpg"},
}
# Module-level MediaPipe FaceMesh instance used by apply_soft_roi.
# It is created once at import time and tracks at most one face.
mp_face_mesh_main = mp.solutions.face_mesh
face_mesh_main = mp_face_mesh_main.FaceMesh(
max_num_faces=1,
refine_landmarks=True,
min_detection_confidence=0.5,
min_tracking_confidence=0.5,
)
def apply_soft_roi(frame, scale_factor=10, pad=30):
    """Blur everything in *frame* except a padded box around the detected face.

    The frame is run through the module-level ``face_mesh_main`` detector.
    When no face is found the input frame is returned unchanged (not blurred).
    Otherwise a blurred copy is produced by downscaling and upscaling the
    frame, and the sharp face region is pasted back on top of it.

    Args:
        frame: BGR image array (it is converted with ``COLOR_BGR2RGB`` before
            detection).
        scale_factor: Downscale divisor used to create the blur; larger
            values blur more.
        pad: Extra pixels added on every side of the face box, on top of the
            10%-of-frame margin.

    Returns:
        The original ``frame`` when no face is detected, otherwise a new
        image of the same size with a blurred background.
    """
    h, w = frame.shape[:2]
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = face_mesh_main.process(rgb_frame)
    if not results.multi_face_landmarks:
        return frame

    landmarks = results.multi_face_landmarks[0].landmark
    xs = [lm.x for lm in landmarks]
    ys = [lm.y for lm in landmarks]

    # Landmark coordinates are scaled by the frame size to get pixels; the
    # box is then grown by 10% of the frame plus a fixed pixel padding.
    top = int(min(ys) * h - 0.1 * h) - pad
    right = int(max(xs) * w + 0.1 * w) + pad
    bottom = int(max(ys) * h + 0.1 * h) + pad
    left = int(min(xs) * w - 0.1 * w) - pad

    # Clamp every edge to the frame on BOTH sides. A negative bound would
    # otherwise be interpreted by NumPy as "count from the end" and select
    # the wrong region when landmarks fall outside the image.
    top = min(max(top, 0), h)
    bottom = min(max(bottom, 0), h)
    left = min(max(left, 0), w)
    right = min(max(right, 0), w)

    # Never ask cv2.resize for a zero-sized image on very small frames.
    small_size = (max(1, w // scale_factor), max(1, h // scale_factor))
    small_bg = cv2.resize(frame, small_size, interpolation=cv2.INTER_LINEAR)
    # INTER_NEAREST here would give a mosaic look; INTER_LINEAR gives a
    # frosted-glass blur.
    blurred_frame = cv2.resize(small_bg, (w, h), interpolation=cv2.INTER_LINEAR)

    # Paste the untouched face pixels back over the blurred background.
    blurred_frame[top:bottom, left:right] = frame[top:bottom, left:right]
    return blurred_frame
# Bounded hand-off queues between the worker threads.
# frame_queue: raw camera frames, capture_thread -> analysis_thread.
frame_queue = queue.Queue(maxsize=2)
# video_queue: result["frame"] images, analysis_thread -> video_stream_thread.
video_queue = queue.Queue(maxsize=10)
# data_queue: state payload dicts, analysis_thread -> data_upload_thread.
data_queue = queue.Queue(maxsize=10)
# show_queue: (frame, result) tuples, analysis_thread -> main display loop.
show_queue = queue.Queue(maxsize=10)
# Set to ask every thread (and the main loop) to finish.
stop_event = threading.Event()
def capture_thread():
    """Read frames from camera 0 and publish the newest one on ``frame_queue``.

    The camera is asked for 1280x720. Each frame is pushed without blocking;
    when the queue is full the oldest frame is discarded so the consumer
    always sees recent data. The loop ends when ``stop_event`` is set or a
    read fails, and the camera is released even if an exception is raised.
    """
    cap = cv2.VideoCapture(0)
    try:
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
        print("[Capture] 摄像头启动...")

        while not stop_event.is_set():
            ret, frame = cap.read()
            if not ret:
                # Make the reason for stopping visible instead of exiting
                # the loop silently.
                print("[Capture] 读取摄像头帧失败,停止采集")
                break

            try:
                frame_queue.put_nowait(frame)
            except queue.Full:
                # Drop the oldest frame to make room for the newest one.
                try:
                    frame_queue.get_nowait()
                except queue.Empty:
                    pass  # the consumer drained it in the meantime
                try:
                    frame_queue.put_nowait(frame)
                except queue.Full:
                    pass  # still full: skip this frame rather than block
    finally:
        cap.release()
    print("[Capture] 线程结束")
def _put_drop_oldest(q, item):
    """Put *item* on *q* without blocking, discarding the oldest entry if full."""
    try:
        q.put_nowait(item)
        return
    except queue.Full:
        pass
    try:
        q.get_nowait()
    except queue.Empty:
        pass  # a consumer emptied the queue in the meantime
    try:
        q.put_nowait(item)
    except queue.Full:
        pass  # lost the race again: drop this item rather than block


def analysis_thread():
    """Analyse frames from ``frame_queue`` and fan the results out.

    For every frame:
      1. ``MonitorSystem.process_frame`` is run and its ``result["frame"]``
         is forwarded to ``video_queue``.
      2. A state payload is built. It is sent even when no face is present,
         in which case the measurement fields stay empty strings.
      3. Eye open/closed transitions (EAR threshold 0.2) are counted; once
         every 60 seconds the count is halved and published as
         ``eye_close_freq``.
      4. ``HeartRateMonitor.process_frame`` is called and a non-``None``
         value is stored as ``heart_rate_bpm``.
      5. The payload goes to ``data_queue`` and ``(frame, result)`` to
         ``show_queue``.

    All three output queues drop their oldest entry when full, so this
    thread never blocks on a slow or stopped consumer.
    """
    monitor = MonitorSystem(BASIC_FACE_DB)
    print("[Analysis] 分析系统启动...")
    heart_monitor = HeartRateMonitor()

    ear_threshold = 0.2   # EAR below this value is treated as "eye closed"
    window_seconds = 60   # length of one blink-counting window
    transitions = 0       # open<->closed changes seen in the current window
    eyes_closed = False
    window_start = time.time()
    last_freq = 0         # value published until the next window completes

    while not stop_event.is_set():
        try:
            frame = frame_queue.get(timeout=1)
        except queue.Empty:
            continue

        result = monitor.process_frame(frame)
        result["eye_close_freq"] = 0
        result["heart_rate_bpm"] = 0
        _put_drop_oldest(video_queue, result["frame"])

        payload = {
            "id": CAMERA_ID,
            "time": time.strftime("%Y-%m-%d %H:%M:%S"),
            "name": "",
            "ear": "",
            "mar": "",
            "iris_ratio": "",
            "eye_close_freq": "",
            "pose": "",
            "emo_label": "",
            "emo_va": "",
            "heart_rate_bpm": "",
        }

        if result["has_face"]:
            identity = result["identity"]
            payload.update(
                {
                    # A detected but unrecognised face is reported as "Unknown".
                    "name": identity["name"] if identity else "Unknown",
                    "ear": result["ear"],
                    "mar": result["mar"],
                    "iris_ratio": result["iris_ratio"],
                    "pose": result["pose"],
                    "emo_label": result["emotion_label"],
                    "emo_va": result["emotion_va"],
                }
            )
            # Count each change of eye state; two changes make one blink.
            closed_now = result["ear"] < ear_threshold
            if closed_now != eyes_closed:
                transitions += 1
                eyes_closed = closed_now

        now = time.time()
        if now - window_start >= window_seconds:
            last_freq = transitions / 2
            transitions = 0
            window_start = now
        result["eye_close_freq"] = last_freq
        payload["eye_close_freq"] = last_freq

        bpm = heart_monitor.process_frame(frame, result["landmark"])
        # Identity test: "!= None" misbehaves if bpm is ever an array-like.
        if bpm is not None:
            result["heart_rate_bpm"] = bpm
            payload["heart_rate_bpm"] = bpm

        _put_drop_oldest(data_queue, payload)
        # Non-blocking on purpose: once the display loop stops consuming, a
        # blocking put on a full queue would keep this thread from exiting.
        _put_drop_oldest(show_queue, (result["frame"], result))
    print("[Analysis] 分析线程结束")
def video_stream_thread():
    """Forward analysed frames to the WebRTC server and record them to disk.

    A ``WebRTCServer`` is started and a ``cv2.VideoWriter`` is opened on a
    GStreamer software H.264 pipeline (BGR -> I420 -> x264enc -> MP4). If
    that pipeline cannot be opened, the writer falls back to OpenCV's
    ``mp4v`` encoder. Frames are then taken from ``video_queue`` until
    ``stop_event`` is set; each one is handed to the server and written to
    ``output.mp4``. The writer is always released so the file is finalised.
    """
    print(f"[Video] 准备连接服务器 {SERVER_HOST}:{SERVER_PORT} ...")
    # NOTE(review): the meaning of the positional arguments 60 and 5 is
    # defined by webrtc_server.WebRTCServer -- confirm there.
    server = WebRTCServer(60, 5, "ws://10.128.50.6:5000")
    server.start()

    # Recording parameters.
    filename = 'output.mp4'
    fps = 30.0
    width = 1280
    height = 720

    # Software-encoding pipeline: BGR (OpenCV) -> I420 -> x264enc -> MP4.
    gst_pipeline = (
        f"appsrc ! "
        f"video/x-raw, format=BGR, width={width}, height={height}, framerate={int(fps)}/1 ! "
        f"queue ! "
        f"videoconvert ! "
        f"video/x-raw, format=I420 ! "  # colour space expected by the encoder
        f"x264enc speed-preset=ultrafast tune=zerolatency bitrate=2000 ! "  # 2000 kbit/s
        f"h264parse ! "
        f"qtmux ! "
        f"filesink location={filename} "
    )
    print(f"[Video] 尝试启动管道: {gst_pipeline}")

    # A pipeline string requires the CAP_GSTREAMER backend.
    out = cv2.VideoWriter(gst_pipeline, cv2.CAP_GSTREAMER, 0, fps, (width, height))
    if not out.isOpened():
        print("❌ [Fatal Error] 视频录制启动失败!")
        print("可能原因:")
        print("1. 你的 OpenCV 没有开启 GStreamer 支持 (cv2.getBuildInformation() 查看)")
        print("2. 系统缺少插件 (尝试 sudo apt install gstreamer1.0-plugins-ugly)")
        # Last resort: plain MPEG-4 through OpenCV's default backend.
        print("⚠️ 正在回退到 mp4v (MPEG-4) 编码...")
        out.release()
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(filename, fourcc, fps, (width, height))
        if not out.isOpened():
            # Streaming can still work; only the local recording is lost.
            print("❌ [Video] 回退编码器也无法打开,视频将不会被录制")
    else:
        print("✅ [Video] H.264 软编码启动成功!视频将保存为 MP4。")

    try:
        while not stop_event.is_set():
            try:
                frame = video_queue.get(timeout=1)
            except queue.Empty:
                continue
            try:
                server.provide_frame(frame)
                # The writer was opened for (width, height); frames of any
                # other size are resized first so they are not rejected.
                if frame.shape[1] != width or frame.shape[0] != height:
                    out.write(cv2.resize(frame, (width, height)))
                else:
                    out.write(frame)
            except Exception as e:
                # Best effort: one bad frame must not stop the stream.
                print(f"[Video] 发送错误: {e}")
    finally:
        # Always finalise the container, even if the loop raised.
        out.release()
    print("[Video] 线程结束")
def data_upload_thread():
    """Upload state records to ``API_URL`` in periodic bursts.

    Each cycle waits 30 seconds, discards whatever is queued in
    ``data_queue`` so that only fresh records are sent, and then makes up to
    five POST attempts roughly one second apart. Upload failures and empty
    queue reads are logged and do not end the thread; setting ``stop_event``
    does.
    """
    print("[Data] 数据上报线程启动 (周期模式: 休眠30s -> 连发5次)")
    idle_seconds = 30
    burst_size = 5
    burst_interval = 1

    while not stop_event.is_set():
        # Phase 1: long idle wait; wakes early (and exits) on shutdown.
        if stop_event.wait(idle_seconds):
            break

        # Phase 2: burst upload.
        print(f"[Data] 开始上报周期 (连发 {burst_size} 次)...")
        # Throw away stale records accumulated during the idle phase.
        while True:
            try:
                data_queue.get_nowait()
            except queue.Empty:
                break
        time.sleep(0.1)

        for attempt in range(1, burst_size + 1):
            if stop_event.is_set():
                break

            try:
                record = data_queue.get(timeout=1.5)
            except queue.Empty:
                print(f"[Data] 队列为空,跳过第 {attempt} 次发送")
            else:
                try:
                    body = json.dumps(record).encode("utf-8")
                    request = urllib.request.Request(
                        url=API_URL,
                        data=body,
                        headers={"Content-Type": "application/json"},
                        method="POST",
                    )
                    # The response body is not needed; just complete the call.
                    with urllib.request.urlopen(request, timeout=2):
                        pass
                    who = record["name"] or "NO-FACE"
                    print(
                        f"[Data Upload {attempt}/{burst_size}] {who} | Time:{record['time']}"
                    )
                except Exception as e:
                    print(f"[Data] Upload Error: {e}")

            # Pause between attempts, but not after the last one.
            if attempt < burst_size:
                stop_event.wait(burst_interval)
    print("[Data] 数据上报线程结束")
def draw_debug_info(frame, result):
    """Draw the live analysis values from *result* onto *frame* in place.

    When ``result["has_face"]`` is false only a "NO FACE" banner is drawn.
    Otherwise the identity, EAR, MAR, iris ratio, eye-close frequency, heart
    rate, head pose and emotion are written as text lines, plus an
    "EYE CLOSE" warning while EAR is below 0.15.
    """
    font = cv2.FONT_HERSHEY_SIMPLEX

    def put(text, origin, scale, colour, thickness):
        # Thin wrapper so each overlay line reads as a single call.
        cv2.putText(frame, text, origin, font, scale, colour, thickness)

    if not result["has_face"]:
        put("NO FACE", (50, 50), 1, (0, 0, 255), 2)
        return

    # Identity: one colour for a recognised user, another for unknown.
    identity = result["identity"]
    if identity:
        id_text = identity["name"]
        color = (0, 255, 0)
    else:
        id_text = "Unknown"
        color = (0, 255, 255)
    put(f"User: {id_text}", (20, 40), 0.7, color, 2)

    # Numeric measurements.
    put(f"EAR: {result['ear']}", (20, 70), 0.6, (255, 255, 0), 1)
    put(f"MAR: {result['mar']}", (20, 95), 0.6, (255, 255, 0), 1)
    put(f"Iris Ratio: {result['iris_ratio']}", (20, 190), 0.6, (255, 255, 0), 1)
    put(
        f"Eye Close Freq: {result['eye_close_freq']}",
        (20, 170),
        0.6,
        (255, 0, 255),
        1,
    )
    put(
        f"Heart Rate BPM: {result['heart_rate_bpm']}",
        (20, 210),
        0.6,
        (0, 165, 255),
        1,
    )

    if result["ear"] < 0.15:
        put("EYE CLOSE", (250, 250), 1, (0, 0, 255), 2)

    p, y, r = result["pose"]
    put(f"Pose: P{p} Y{y} R{r}", (20, 120), 0.6, (0, 255, 255), 1)

    # Emotion label followed by its two "emotion_va" components.
    emo = result.get("emotion_label", "N/A")
    va = result.get("emotion_va", (0, 0))
    emo_text = f"Emo: {emo} ({va[0]:.2f}, {va[1]:.2f})"
    put(emo_text, (20, 145), 0.6, (0, 165, 255), 1)
if __name__ == "__main__":
    # Start the four workers, then run the display loop on the main thread.
    workers = [
        threading.Thread(target=capture_thread, daemon=True),
        threading.Thread(target=analysis_thread, daemon=True),
        threading.Thread(target=video_stream_thread, daemon=True),
        threading.Thread(target=data_upload_thread, daemon=True),
    ]
    for worker in workers:
        worker.start()

    try:
        while not stop_event.is_set():
            try:
                frame, result = show_queue.get(timeout=1)
            except queue.Empty:
                continue
            # Draw on a copy: the same frame object is also queued for the
            # video thread.
            display_frame = frame.copy()
            draw_debug_info(display_frame, result)
            cv2.imshow("Monitor Client", display_frame)
            if cv2.waitKey(1) & 0xFF == ord("q"):
                stop_event.set()
    except KeyboardInterrupt:
        print("停止程序...")
    finally:
        # Runs on every exit path, including unexpected exceptions. Without
        # it the daemon workers would be killed at interpreter exit and the
        # video writer would never be released.
        stop_event.set()
        cv2.destroyAllWindows()
        for worker in workers:
            # Bounded wait: the workers are daemons, so a stuck one cannot
            # keep the process alive after the timeout.
            worker.join(timeout=5)