From fd4d180eb10a9f4eccf44e844054894d6d7ed4a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=82=93=E6=99=BA=E8=88=AA?= <23373333@buaa.edu.cn> Date: Sat, 28 Feb 2026 18:11:03 +0800 Subject: [PATCH] alert finish --- .gitignore | 5 +- reproject/alert.py | 138 +++++++++++++++++++++++++++++++++++++ reproject/main.py | 69 +++++++++++++------ reproject/webrtc_server.py | 29 +++++++- 4 files changed, 217 insertions(+), 24 deletions(-) create mode 100644 reproject/alert.py diff --git a/.gitignore b/.gitignore index 496ee2c..7c26727 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,4 @@ -.DS_Store \ No newline at end of file +.DS_Store +/* +!/reproject/ +!/.gitignore \ No newline at end of file diff --git a/reproject/alert.py b/reproject/alert.py new file mode 100644 index 0000000..0a9fa2d --- /dev/null +++ b/reproject/alert.py @@ -0,0 +1,138 @@ + + +import os +import queue +import threading + +import av + +ALERT_QUEUE_MAXSIZE = 2000 + + +class Alert: + def __init__(self, name, client): + self.name = name + self.client = client + self.frame_count = 0 + self.frame_queue = queue.Queue(maxsize=ALERT_QUEUE_MAXSIZE) + self.encode_thread = None + self.upload_thread = None + self.stop_event = threading.Event() + self.dropped_frames = 0 + + def start(self, width=1920, height=1080, fps=30): + print(f"Starting alert with {width}x{height}") + read_fd, write_fd = os.pipe() + self.read_pipe = os.fdopen(read_fd, "rb", buffering=0) + self.write_pipe = os.fdopen(write_fd, "wb", buffering=0) + + def _upload() -> None: + try: + print(f"Upload thread starting for {self.name}") + self.client.put_object( + "atc", + self.name, + self.read_pipe, + length=-1, + part_size=10 * 1024 * 1024, + content_type="video/mp4", + ) + print(f"Upload completed for {self.name}") + except Exception as e: + print(f"Upload error: {e}") + + self.upload_thread = threading.Thread(target=_upload, daemon=False) + self.upload_thread.start() + + self.container = av.open( + self.write_pipe, + mode="w", + format="mp4", + options={"movflags": "frag_keyframe+empty_moov+default_base_moof"}, + ) + self.stream = self.container.add_stream("libx264", rate=fps) + self.stream.width = width + self.stream.height = height + self.stream.pix_fmt = "yuv420p" + # 使用更快的编码预设 + # self.stream.options = {"preset": "ultrafast", "crf": "23"} + print("AV container and stream initialized") + + # 启动编码线程 + def _encode() -> None: + try: + print("Encode thread starting") + while not self.stop_event.is_set() or not self.frame_queue.empty(): + try: + frame = self.frame_queue.get(timeout=0.1) + if frame is None: + break + av_frame = av.VideoFrame.from_ndarray(frame, format="bgr24") + for packet in self.stream.encode(av_frame): + self.container.mux(packet) + self.frame_count += 1 + except queue.Empty: + continue + except Exception as e: + print(f"Error encoding frame {self.frame_count}: {e}") + print(f"Encode thread finished, encoded {self.frame_count} frames") + except Exception as e: + print(f"Encode thread error: {e}") + + self.encode_thread = threading.Thread(target=_encode, daemon=False) + self.encode_thread.start() + + def provide_frame(self, frame): + try: + self.frame_queue.put(frame, block=True, timeout=0.05) + except queue.Full: + self.dropped_frames += 1 + # 每50帧打印一次 + if self.dropped_frames % 50 == 1: + print( + f"Warning: Frame queue full, dropped {self.dropped_frames} frames so far" + ) + + def end(self): + print( + f"Stopping alert, queued frames: {self.frame_queue.qsize()}, dropped frames: {self.dropped_frames}" + ) + self.frame_queue.put(None) + self.stop_event.set() + + # 等待编码线程完成 + if self.encode_thread: + self.encode_thread.join(timeout=30) + if self.encode_thread.is_alive(): + print("Warning: Encode thread still running after timeout") + else: + print(f"Encode thread completed with {self.frame_count} frames") + + # 完成编码,flush所有待处理的数据 + try: + for packet in self.stream.encode(): + self.container.mux(packet) + except Exception as e: + print(f"Error flushing encoder: {e}") + + # 关闭容器,确保所有数据已写入pipe + try: + self.container.close() + print("Container closed successfully") + except Exception as e: + print(f"Error closing container: {e}") + + # 关闭写端,通知上传线程EOF + self.write_pipe.close() + print("Write pipe closed, waiting for upload thread...") + + # 等待上传线程完成(最多30秒) + if self.upload_thread: + self.upload_thread.join(timeout=30) + if self.upload_thread.is_alive(): + print("Warning: Upload thread still running after timeout") + else: + print("Upload thread completed") + + # 关闭读端 + self.read_pipe.close() diff --git a/reproject/main.py b/reproject/main.py index fb87e4b..6142c62 100644 --- a/reproject/main.py +++ b/reproject/main.py @@ -14,9 +14,7 @@ from analyzer import MonitorSystem from webrtc_server import WebRTCServer from HeartRateMonitor import HeartRateMonitor -SERVER_HOST = "10.128.50.6" -SERVER_PORT = 65432 -API_URL = "http://10.128.50.6:5000/api/states" +API_URL = "http://10.128.48.48:5000/api/states" CAMERA_ID = 5 BASIC_FACE_DB = { @@ -122,10 +120,11 @@ def analysis_thread(): if ana_video_queue.full(): ana_video_queue.get_nowait() ana_video_queue.put(result["frame"]) + # print(f"[Analysis] {time.strftime('%Y-%m-%d %H:%M:%S')} - Frame processed") payload = { "seat_id": CAMERA_ID, - "timestamp": int(time.time()), + "timestamp": time.time(), "heart_rate": 0, "emo_v": 0, "emo_a": 0, @@ -142,7 +141,7 @@ def analysis_thread(): front_data = { "seat_id": CAMERA_ID, - "timestamp": int(time.time()), + "timestamp": time.time(), "label": "", "eye_close_freq": 0.0, "iris_ratio_x": 0, @@ -241,13 +240,10 @@ def analysis_thread(): print("[Analysis] 分析线程结束") -def video_stream_thread(): +def video_stream_thread(server): """ 发送线程:优化了 Socket 设置和压缩参数 """ - print(f"[Video] 准备连接服务器 {SERVER_HOST}:{SERVER_PORT} ...") - server = WebRTCServer(60, 5, "ws://10.128.50.6:5000") - server.start() fourcc = cv2.VideoWriter_fourcc(*'avc1') # # jetson-nvenc 编码器 # # ----------------------------------------------------------- @@ -419,7 +415,7 @@ def data_upload_thread(): print("[Data] 数据上报线程结束") -def alert_thread(): +def alert_thread(server): last_record_time = time.time() file_id = 0 @@ -442,16 +438,17 @@ def alert_thread(): "no_face_time": "长时间无人脸,存在离岗或睡岗可能性", } - fourcc1 = cv2.VideoWriter_fourcc(*'avc1') - alert_out = cv2.VideoWriter(VIDEO_FILE[file_id], fourcc1, 30.0, (1280, 720)) + buffered_frame = [] alert_time = time.time() emo_time = time.time() + + alert_status = False + alert_st = "" + level = 0 while not stop_event.is_set(): frame = ana_video_queue.get(timeout=1) - alert_out.write(frame) + buffered_frame.append(frame) now = time.time() - alert_status = False - alert_st = "" data = None if now - alert_time >= 1: data = ana_data_queue.get(timeout=1) @@ -492,41 +489,64 @@ def alert_thread(): alert_status = True alert_st += alert_info["sleep_time"] + "; " sleep_time = 0 + level += 1 if pianyi_time >= 20: alert_status = True alert_st += alert_info["pianyi_time"] + "; " pianyi_time = 0 + level += 1 if no_face_time >= 60: alert_status = True alert_st += alert_info["no_face_time"] + "; " no_face_time = 0 + level += 1 if now - emo_time >= 300: if down_emo_time > 150: alert_status = True alert_st += alert_info["down_emo_time"] + "; " + level += 1 emo_time = now down_emo_time = 0 if now - last_record_time >= 60: - file_id^= 1 - alert_out.release() heart_spe = heart_spe // heart_num if heart_num != 0 else 0 if haqian_time > 5: alert_status = True alert_st += alert_info["haqian_time"] + ";" - haqian_time = 0 + level += 1 if heart_spe < 60 or heart_spe > 120: alert_status = True alert_st += alert_info["heart_spe"] + ";" - heart_spe = 0 + level += 1 if eye_close: alert_status = True alert_st += alert_info["eye_close"] + ";" - eye_close = False + level += 1 #TODO: 发送警报 - alert_out = cv2.VideoWriter(VIDEO_FILE[file_id], fourcc1, 30.0, (1280, 720)) + haqian_time = 0 + heart_spe = 0 + heart_num = 0 + eye_close = False last_record_time = now + info_level = "" + if level >= 6: + info_level = "严重" + elif level >=4: + info_level = "中等" + else: + info_level = "轻微" + if alert_status: + print(f"警报: {alert_st}") + alert = server.alert(int(time.time()), alert_st, info_level) + alert.start(width=1280, height=720, fps=30) + for f in buffered_frame: + alert.provide_frame(f) + alert.end() + alert_status = False + alert_st = "" + buffered_frame = [] + level = 0 def draw_debug_info(frame, result): @@ -616,15 +636,19 @@ def draw_debug_info(frame, result): if __name__ == "__main__": + server = WebRTCServer(60, 5, "ws://10.128.48.48:5000") + server.start() t1 = threading.Thread(target=capture_thread, daemon=True) t2 = threading.Thread(target=analysis_thread, daemon=True) - t3 = threading.Thread(target=video_stream_thread, daemon=True) + t3 = threading.Thread(target=video_stream_thread, daemon=True, args=(server,)) t4 = threading.Thread(target=data_upload_thread, daemon=True) + t5 = threading.Thread(target=alert_thread, daemon=True, args=(server,)) t1.start() t2.start() t3.start() t4.start() + t5.start() try: while not stop_event.is_set(): @@ -649,3 +673,4 @@ if __name__ == "__main__": t2.join() t3.join() t4.join() + t5.join() diff --git a/reproject/webrtc_server.py b/reproject/webrtc_server.py index 58a2427..435d0f8 100644 --- a/reproject/webrtc_server.py +++ b/reproject/webrtc_server.py @@ -1,6 +1,7 @@ import threading import time import asyncio +from minio import Minio import socketio import numpy as np import cv2 @@ -18,11 +19,20 @@ from aiortc import ( from aiortc.mediastreams import VIDEO_CLOCK_RATE, VIDEO_TIME_BASE from dotenv import load_dotenv +from alert import Alert + load_dotenv() WEBRTC_SERVER_URL = os.getenv("WEBRTC_SERVER_URL", "ws://localhost:5000") WEBRTC_DATA_HISTORY_MAXLEN = int(os.getenv("WEBRTC_DATA_HISTORY_MAXLEN", 200)) +minio_client = Minio( + endpoint=os.getenv("MINIO_ENDPOINT"), + access_key=os.getenv("MINIO_ACCESS_KEY"), + secret_key=os.getenv("MINIO_SECRET_KEY"), + secure=False, +) + class RingBuffer: def __init__(self, maxlen): @@ -84,6 +94,7 @@ class WebRTCServer: self.fps = fps self.frameContainer = [None] self.hub = MonitoringHub() + self.sio = None self.background_loop = asyncio.new_event_loop() @@ -104,6 +115,7 @@ class WebRTCServer: async def _websocket_start(self): await self.hub.start() sio = socketio.AsyncClient() + self.sio = sio @sio.event async def connect(): @@ -172,6 +184,21 @@ class WebRTCServer: def send_data(self, data): self.hub.send_data(data) + def alert(self, timestamp, summary, level): + payload = { + "seat_id": self.seat, + "timestamp": timestamp, + "summary": summary, + "level": level, + } + if self.sio is not None and self.sio.connected: + asyncio.run_coroutine_threadsafe( + self.sio.emit("alert", payload), self.background_loop + ) + else: + print("Warning: sio is not connected, skip signaling alert emit") + return Alert(name=f"seat{self.seat}_{timestamp}.mp4", client=minio_client) + def stop(self): if self.background_loop.is_running(): self.background_loop.call_soon_threadsafe(self.background_loop.stop) @@ -228,4 +255,4 @@ def route_channel(channel): print(f"Latency: {now - pre}ms") case _: - print(f"Unknown Channel {channel.label}") + print(f"Unknown Channel {channel.label}") \ No newline at end of file