Files
cv_state_ana/reproject/main.py
2026-02-07 17:01:37 +08:00

516 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from calendar import c
import cv2
import threading
import time
import queue
import socket
import json
import urllib.request
import struct
import numpy as np
import mediapipe as mp
from analyzer import MonitorSystem
from webrtc_server import WebRTCServer
from HeartRateMonitor import HeartRateMonitor
# Backend address. In the visible code SERVER_HOST/SERVER_PORT are only used in a
# log message printed by video_stream_thread.
SERVER_HOST = "10.128.50.6"
SERVER_PORT = 65432
# HTTP endpoint that data_upload_thread POSTs JSON state payloads to.
API_URL = "http://10.128.50.6:5000/api/states"
# Sent as the "id" field of every state payload.
CAMERA_ID = "23373333"
# Identity table handed to MonitorSystem in analysis_thread.
# NOTE(review): presumably "image-path" is the reference photo used for face
# recognition -- confirm against analyzer.MonitorSystem.
BASIC_FACE_DB = {
    "Zhihang": {"name": "Zhihang Deng", "age": 20, "image-path": "zhihang.png"},
    "Yaoyu": {"name": "Yaoyu Zhang", "age": 20, "image-path": "yaoyu.jpg"},
}
# Module-level MediaPipe FaceMesh instance used by apply_soft_roi; created once at
# import time so it is not rebuilt for every frame.
mp_face_mesh_main = mp.solutions.face_mesh
face_mesh_main = mp_face_mesh_main.FaceMesh(
    max_num_faces=1,
    refine_landmarks=True,
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5,
)
def apply_soft_roi(frame):
    """Return a copy of ``frame`` that is blurred everywhere except the face box.

    The face box is the bounding box of the first face's MediaPipe landmarks,
    grown by 10% of the frame size plus 30 pixels on every side and clamped to
    the frame. The blur is produced by shrinking the frame by a factor of 10
    and scaling it back up with linear interpolation.

    Args:
        frame: BGR image (it is converted with ``cv2.COLOR_BGR2RGB`` before
            being handed to the module-level ``face_mesh_main``).

    Returns:
        The blurred image with the sharp face region pasted back in, or the
        original ``frame`` object unchanged when no face is detected.
    """
    h, w = frame.shape[:2]
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = face_mesh_main.process(rgb_frame)
    if not results.multi_face_landmarks:
        return frame

    # Landmark coordinates are multiplied by the frame size below, i.e. they
    # are treated as fractions of the frame width/height.
    landmarks = results.multi_face_landmarks[0].landmark
    xs = [lm.x for lm in landmarks]
    ys = [lm.y for lm in landmarks]

    # Face bounding box: landmark extent + 10% of the frame + fixed padding,
    # clamped so the slices below never leave the image.
    pad = 30
    top = max(0, int(min(ys) * h - 0.1 * h) - pad)
    right = min(w, int(max(xs) * w + 0.1 * w) + pad)
    bottom = min(h, int(max(ys) * h + 0.1 * h) + pad)
    left = max(0, int(min(xs) * w - 0.1 * w) - pad)

    # Down-scale then up-scale to blur. max(1, ...) keeps cv2.resize from
    # receiving a zero-sized target when the frame is smaller than the factor.
    scale_factor = 10
    small_size = (max(1, w // scale_factor), max(1, h // scale_factor))
    small_bg = cv2.resize(frame, small_size, interpolation=cv2.INTER_LINEAR)
    # INTER_NEAREST here would give a mosaic look; INTER_LINEAR gives a
    # frosted-glass blur.
    blurred_frame = cv2.resize(small_bg, (w, h), interpolation=cv2.INTER_LINEAR)

    # Paste the untouched face region over the blurred background.
    blurred_frame[top:bottom, left:right] = frame[top:bottom, left:right]
    return blurred_frame
# Hand-off queues between the worker threads.
# capture_thread -> analysis_thread: raw camera frames.
frame_queue = queue.Queue(maxsize=2)
# analysis_thread -> video_stream_thread: the frame returned by MonitorSystem.process_frame.
video_queue = queue.Queue(maxsize=10)
# analysis_thread -> data_upload_thread: state payload dicts.
data_queue = queue.Queue(maxsize=10)
# analysis_thread -> main loop: (frame, result) tuples for the preview window.
show_queue = queue.Queue(maxsize=10)
# Set to ask every thread (and the main loop) to leave its loop.
stop_event = threading.Event()
def capture_thread():
    """Read frames from camera 0 and publish the newest ones to ``frame_queue``.

    The camera is asked for 1280x720. Each frame is pushed without blocking;
    when the queue is full the oldest frame is evicted first so the consumer
    always sees recent frames. The loop ends when ``stop_event`` is set or a
    read fails, and the camera is released in every case.
    """
    cap = cv2.VideoCapture(0)
    try:
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
        print("[Capture] 摄像头启动...")
        while not stop_event.is_set():
            ret, frame = cap.read()
            if not ret:
                break
            # Make room by discarding the oldest frame. The consumer may have
            # emptied the queue in the meantime, which is fine.
            if frame_queue.full():
                try:
                    frame_queue.get_nowait()
                except queue.Empty:
                    pass
            # Never block the capture loop on a slow consumer.
            try:
                frame_queue.put_nowait(frame)
            except queue.Full:
                pass
    finally:
        # Release the device even if the loop raised.
        cap.release()
    print("[Capture] 线程结束")
def _put_latest(q, item):
    """Enqueue ``item`` without blocking, evicting the oldest entry while ``q`` is full."""
    while True:
        try:
            q.put_nowait(item)
            return
        except queue.Full:
            try:
                q.get_nowait()
            except queue.Empty:
                # A consumer emptied the queue first; just retry the put.
                pass


def analysis_thread():
    """Analyse frames from ``frame_queue`` and fan the results out.

    For every frame this thread:
      1. runs ``MonitorSystem.process_frame`` and forwards the returned frame
         to ``video_queue``;
      2. builds a state payload -- sent even when no face is present, in which
         case the measurement fields stay empty strings;
      3. tracks open/closed eye transitions (EAR threshold 0.2) and, once per
         60 s window, stores half the transition count as ``eye_close_freq``;
      4. asks ``HeartRateMonitor`` for a BPM estimate;
      5. publishes the payload to ``data_queue`` and ``(frame, result)`` to
         ``show_queue``.

    All queue writes are non-blocking and drop the oldest entry when full, so
    this thread can neither be killed by a ``queue.Empty`` race nor hang on a
    consumer that has stopped (e.g. during shutdown).
    """
    monitor = MonitorSystem(BASIC_FACE_DB)
    print("[Analysis] 分析系统启动...")

    ear_closed_threshold = 0.2
    window_seconds = 60
    transitions = 0  # open<->closed state changes seen in the current window
    eyes_closed = False
    window_start = time.time()
    last_freq = 0  # value computed for the most recently completed window
    heart_monitor = HeartRateMonitor()

    while not stop_event.is_set():
        try:
            frame = frame_queue.get(timeout=1)
        except queue.Empty:
            continue

        # Core analysis.
        result = monitor.process_frame(frame)
        result["heart_rate_bpm"] = 0
        _put_latest(video_queue, result["frame"])

        payload = {
            "id": CAMERA_ID,
            "time": time.strftime("%Y-%m-%d %H:%M:%S"),
            "name": "",
            "ear": "",
            "mar": "",
            "iris_ratio": "",
            "eye_close_freq": "",
            "pose": "",
            "emo_label": "",
            "emo_va": "",
            "heart_rate_bpm": "",
        }

        if result["has_face"]:
            identity = result["identity"]
            payload.update(
                {
                    "name": identity["name"] if identity else "Unknown",
                    "ear": result["ear"],
                    "mar": result["mar"],
                    "iris_ratio": result["iris_ratio"],
                    "pose": result["pose"],
                    "emo_label": result["emotion_label"],
                    "emo_va": result["emotion_va"],
                }
            )

            # Count each open->closed and closed->open transition once.
            ear = result["ear"]
            if ear < ear_closed_threshold:
                if not eyes_closed:
                    transitions += 1
                    eyes_closed = True
            elif ear >= ear_closed_threshold:
                if eyes_closed:
                    transitions += 1
                    eyes_closed = False

        now = time.time()
        if now - window_start >= window_seconds:
            # Two transitions (close + reopen) make up one eye closure.
            last_freq = transitions / 2
            transitions = 0
            window_start = now
        result["eye_close_freq"] = last_freq
        payload["eye_close_freq"] = last_freq

        bpm = heart_monitor.process_frame(frame, result["landmark"])
        # Identity test: `!= None` would compare element-wise if bpm were an array.
        if bpm is not None:
            result["heart_rate_bpm"] = bpm
            payload["heart_rate_bpm"] = bpm

        _put_latest(data_queue, payload)
        _put_latest(show_queue, (result["frame"], result))

    print("[Analysis] 分析线程结束")
def video_stream_thread():
    """Forward frames from ``video_queue`` to the WebRTC server and record them.

    Every frame is passed to ``WebRTCServer.provide_frame`` and written to
    ``output.mp4`` through a GStreamer pipeline that uses the Jetson hardware
    H.264 encoder (``nvv4l2h264enc``). Errors on individual frames are logged
    and skipped. The writer is released when the loop ends, including when it
    ends through an exception.
    """
    print(f"[Video] 准备连接服务器 {SERVER_HOST}:{SERVER_PORT} ...")
    # NOTE(review): the meaning of the first two positional arguments (60, 5)
    # is defined in webrtc_server.WebRTCServer -- confirm there.
    server = WebRTCServer(60, 5, "ws://10.128.50.6:5000")
    server.start()

    # Jetson NVENC encoder settings.
    width = 1280
    height = 720
    fps = 30
    bitrate = 2000000  # 2 Mbps or 4 Mbps recommended; 1 Mbps looks blurry at 720p
    filename = "output.mp4"
    gst_pipeline = (
        "appsrc ! "
        # 1. Declare the input caps explicitly.
        f"video/x-raw, format=BGR, width={width}, height={height}, framerate={fps}/1 ! "
        "queue ! "
        # 2. CPU conversion (BGR -> BGRx); nvvidconv handles BGRx better than BGR.
        "videoconvert ! "
        "video/x-raw, format=BGRx ! "
        # 3. Hardware conversion into NVMM memory; the encoder only accepts NV12 in NVMM.
        "nvvidconv ! "
        "video/x-raw(memory:NVMM), format=NV12 ! "
        f"nvv4l2h264enc bitrate={bitrate} control-rate=1 profile=High ! "
        "h264parse ! "
        "qtmux ! "
        f"filesink location={filename} "
    )
    # cv2.CAP_GSTREAMER is required for a pipeline-string writer.
    out = cv2.VideoWriter(gst_pipeline, cv2.CAP_GSTREAMER, 0, float(fps), (width, height))
    if not out.isOpened():
        # Writing to an unopened writer does nothing, so surface the failure.
        print(f"[Video] Warning: could not open GStreamer writer for {filename}")

    try:
        while not stop_event.is_set():
            try:
                frame = video_queue.get(timeout=1)
                server.provide_frame(frame)
                out.write(frame)
            except queue.Empty:
                continue
            except Exception as e:
                print(f"[Video] 发送错误: {e}")
                continue
    finally:
        # Finalise the MP4 container even if the loop raised.
        # NOTE(review): WebRTCServer is never shut down here; add a stop call
        # if that class provides one.
        out.release()
    print("[Video] 线程结束")
def data_upload_thread():
    """Upload state payloads to ``API_URL`` in periodic bursts.

    Cycle: wait 30 seconds, discard whatever is queued in ``data_queue``,
    then make up to 5 upload attempts spaced 1 second apart. Each attempt
    takes one payload from the queue (waiting up to 1.5 s) and POSTs it as
    JSON. Upload failures and empty-queue timeouts are logged and do not end
    the burst. Setting ``stop_event`` ends the thread.
    """
    print("[Data] 数据上报线程启动 (周期模式: 休眠30s -> 连发5次)")
    idle_seconds = 30
    burst_size = 5
    burst_interval = 1

    while not stop_event.is_set():
        # Phase 1: long sleep; wait() returns True as soon as a stop is requested.
        if stop_event.wait(idle_seconds):
            break

        # Phase 2: burst upload.
        print(f"[Data] 开始上报周期 (连发 {burst_size} 次)...")
        # Throw away stale payloads so the burst starts from fresh data.
        try:
            while not data_queue.empty():
                data_queue.get_nowait()
        except queue.Empty:
            pass
        time.sleep(0.1)

        for attempt in range(1, burst_size + 1):
            if stop_event.is_set():
                break
            try:
                data = data_queue.get(timeout=1.5)
            except queue.Empty:
                print(f"[Data] 队列为空,跳过第 {attempt} 次发送")
            else:
                try:
                    request = urllib.request.Request(
                        url=API_URL,
                        data=json.dumps(data).encode("utf-8"),
                        headers={"Content-Type": "application/json"},
                        method="POST",
                    )
                    with urllib.request.urlopen(request, timeout=2):
                        pass
                    # Log the upload; an empty name means no face was present.
                    who = data["name"] or "NO-FACE"
                    print(
                        f"[Data Upload {attempt}/{burst_size}] {who} | Time:{data['time']}"
                    )
                except Exception as e:
                    print(f"[Data] Upload Error: {e}")
            # Pause between attempts, but not after the last one.
            if attempt < burst_size:
                stop_event.wait(burst_interval)

    print("[Data] 数据上报线程结束")
def draw_debug_info(frame, result):
    """Draw the current analysis readings onto ``frame`` in place.

    Args:
        frame: image to annotate; it is modified directly.
        result: analysis dict. ``has_face`` is always read; when it is truthy
            ``identity``, ``ear``, ``mar``, ``iris_ratio``, ``eye_close_freq``,
            ``heart_rate_bpm`` and ``pose`` are read as well, while
            ``emotion_label`` / ``emotion_va`` fall back to defaults.
    """
    font = cv2.FONT_HERSHEY_SIMPLEX

    def stamp(text, origin, scale, color, thickness):
        cv2.putText(frame, text, origin, font, scale, color, thickness)

    if not result["has_face"]:
        stamp("NO FACE", (50, 50), 1, (0, 0, 255), 2)
        return

    # Identity: green for a recognised person, yellow for an unknown one.
    identity = result["identity"]
    if identity:
        who, who_color = identity["name"], (0, 255, 0)
    else:
        who, who_color = "Unknown", (0, 255, 255)
    stamp(f"User: {who}", (20, 40), 0.7, who_color, 2)

    # Measurements.
    stamp(f"EAR: {result['ear']}", (20, 70), 0.6, (255, 255, 0), 1)
    stamp(f"MAR: {result['mar']}", (20, 95), 0.6, (255, 255, 0), 1)
    stamp(f"Iris Ratio: {result['iris_ratio']}", (20, 190), 0.6, (255, 255, 0), 1)
    stamp(
        f"Eye Close Freq: {result['eye_close_freq']}", (20, 170), 0.6, (255, 0, 255), 1
    )
    stamp(
        f"Heart Rate BPM: {result['heart_rate_bpm']}", (20, 210), 0.6, (0, 165, 255), 1
    )

    # Large red warning while the EAR is below 0.15.
    if result["ear"] < 0.15:
        stamp("EYE CLOSE", (250, 250), 1, (0, 0, 255), 2)

    pose_p, pose_y, pose_r = result["pose"]
    stamp(f"Pose: P{pose_p} Y{pose_y} R{pose_r}", (20, 120), 0.6, (0, 255, 255), 1)

    # Display format: "Emo: <label> (<va[0]>, <va[1]>)".
    emo = result.get("emotion_label", "N/A")
    va = result.get("emotion_va", (0, 0))
    stamp(
        f"Emo: {emo} ({va[0]:.2f}, {va[1]:.2f})", (20, 145), 0.6, (0, 165, 255), 1
    )
if __name__ == "__main__":
    # Start the four workers, then run the preview window on the main thread
    # (the OpenCV window calls stay on the main thread).
    workers = [
        threading.Thread(target=capture_thread, daemon=True),
        threading.Thread(target=analysis_thread, daemon=True),
        threading.Thread(target=video_stream_thread, daemon=True),
        threading.Thread(target=data_upload_thread, daemon=True),
    ]
    capture_worker = workers[0]
    for worker in workers:
        worker.start()

    try:
        while not stop_event.is_set():
            try:
                frame, result = show_queue.get(timeout=1)
            except queue.Empty:
                # Nothing to show: if the capture thread has exited (e.g. the
                # camera read failed) no frame will ever arrive, so shut down
                # instead of spinning forever.
                if not capture_worker.is_alive():
                    print("[Main] capture thread stopped; shutting down")
                    stop_event.set()
                continue
            # Draw on a copy so the frame object shared with video_queue stays clean.
            display_frame = frame.copy()
            draw_debug_info(display_frame, result)
            cv2.imshow("Monitor Client", display_frame)
            # Press "q" in the preview window to quit.
            if cv2.waitKey(1) & 0xFF == ord("q"):
                stop_event.set()
    except KeyboardInterrupt:
        print("停止程序...")
    finally:
        # Runs on normal exit, Ctrl+C and unexpected errors alike.
        stop_event.set()
        cv2.destroyAllWindows()
        # Bounded joins: the workers are daemon threads, so a stuck one must
        # not keep the process from exiting.
        for worker in workers:
            worker.join(timeout=5)