Files
cv_state_ana/reproject/webrtc_server.py
2026-02-28 18:11:03 +08:00

258 lines
7.4 KiB
Python

import threading
import time
import asyncio
from minio import Minio
import socketio
import numpy as np
import cv2
import av
import os
from collections import deque
from aiortc import (
MediaStreamError,
RTCConfiguration,
RTCPeerConnection,
RTCSessionDescription,
VideoStreamTrack,
)
from aiortc.mediastreams import VIDEO_CLOCK_RATE, VIDEO_TIME_BASE
from dotenv import load_dotenv
from alert import Alert
load_dotenv()
WEBRTC_SERVER_URL = os.getenv("WEBRTC_SERVER_URL", "ws://localhost:5000")
WEBRTC_DATA_HISTORY_MAXLEN = int(os.getenv("WEBRTC_DATA_HISTORY_MAXLEN", 200))
minio_client = Minio(
endpoint=os.getenv("MINIO_ENDPOINT"),
access_key=os.getenv("MINIO_ACCESS_KEY"),
secret_key=os.getenv("MINIO_SECRET_KEY"),
secure=False,
)
class RingBuffer:
def __init__(self, maxlen):
self._buf = deque(maxlen=maxlen)
def append(self, item):
self._buf.append(item)
def snapshot(self):
return list(self._buf)
class MonitoringHub:
def __init__(self, max_history=WEBRTC_DATA_HISTORY_MAXLEN):
self._history = RingBuffer(maxlen=max_history)
self._channels = set()
self._queue = asyncio.Queue()
self._task = None
async def start(self):
if self._task is None:
self._task = asyncio.create_task(self._broadcast_loop())
async def _broadcast_loop(self):
while True:
data = await self._queue.get()
dead = []
for ch in list(self._channels):
if ch.readyState != "open":
dead.append(ch)
continue
try:
ch.send(data)
except Exception:
dead.append(ch)
for ch in dead:
self._channels.discard(ch)
def register_channel(self, channel):
# 初次建立时补发历史
print(f"Channel {channel.label} opened, sending history")
self._channels.add(channel)
for item in self._history.snapshot():
try:
channel.send(item)
except Exception:
self._channels.discard(channel)
break
def send_data(self, data):
str_data = str(data)
self._history.append(str_data)
self._queue.put_nowait(str_data)
class WebRTCServer:
def __init__(self, fps, seat, server=WEBRTC_SERVER_URL):
self.pcs = set()
self.fps = fps
self.frameContainer = [None]
self.hub = MonitoringHub()
self.sio = None
self.background_loop = asyncio.new_event_loop()
self._rtc_thread = threading.Thread(
target=self._start_background_loop,
args=(self.background_loop,),
daemon=True,
)
self._rtc_thread.start()
self.server = server
self.seat = seat
def _start_background_loop(self, loop):
asyncio.set_event_loop(loop)
loop.run_forever()
async def _websocket_start(self):
await self.hub.start()
sio = socketio.AsyncClient()
self.sio = sio
@sio.event
async def connect():
print("已连接到中心信令服务器")
await sio.emit("checkin", {"seat_id": self.seat})
@sio.event
async def offer(data):
# data 包含: { sdp: "...", type: "offer", "sid": "..." }
print("收到 WebRTC Offer")
localDescription = await self._handle_offer(data)
await sio.emit(
"answer",
{
"sdp": localDescription.sdp,
"type": localDescription.type,
"sid": data["sid"],
},
)
await sio.connect(self.server)
print(self.server, "connected")
await sio.wait()
async def _handle_offer(self, offer):
pc = RTCPeerConnection(RTCConfiguration(iceServers=[]))
self.pcs.add(pc)
start = time.time()
@pc.on("connectionstatechange")
async def on_state_change():
if pc.connectionState == "failed" or pc.connectionState == "closed":
await pc.close()
print("Connection closed")
self.pcs.discard(pc)
@pc.on("datachannel")
def on_data_channel(channel):
route_channel(channel)
await pc.setRemoteDescription(
RTCSessionDescription(offer["sdp"], offer.get("type", "offer"))
)
pc.addTrack(VideoFrameTrack(self.fps, self.frameContainer))
dc = pc.createDataChannel("monitoring")
@dc.on("open")
def on_open():
print("Monitoring data channel opened")
self.hub.register_channel(dc)
answer = await pc.createAnswer()
await pc.setLocalDescription(answer)
print(f"Handle offer in {(time.time() - start)*1000:.2f}ms")
return pc.localDescription
def start(self):
asyncio.run_coroutine_threadsafe(self._websocket_start(), self.background_loop)
def provide_frame(self, frame):
self.frameContainer[0] = frame
def send_data(self, data):
self.hub.send_data(data)
def alert(self, timestamp, summary, level):
payload = {
"seat_id": self.seat,
"timestamp": timestamp,
"summary": summary,
"level": level,
}
if self.sio is not None and self.sio.connected:
asyncio.run_coroutine_threadsafe(
self.sio.emit("alert", payload), self.background_loop
)
else:
print("Warning: sio is not connected, skip signaling alert emit")
return Alert(name=f"seat{self.seat}_{timestamp}.mp4", client=minio_client)
def stop(self):
if self.background_loop.is_running():
self.background_loop.call_soon_threadsafe(self.background_loop.stop)
if self._rtc_thread.is_alive():
self._rtc_thread.join(timeout=2)
class VideoFrameTrack(VideoStreamTrack):
def __init__(self, fps, fc):
super().__init__()
self.fps = fps
self.frameContainer = fc
async def next_timestamp(self):
"""
重写父类方法,去除帧率限制
"""
if self.readyState != "live":
raise MediaStreamError
if hasattr(self, "_timestamp"):
self._timestamp += int(1 / self.fps * VIDEO_CLOCK_RATE)
wait = self._start + (self._timestamp / VIDEO_CLOCK_RATE) - time.time()
await asyncio.sleep(wait)
else:
self._start = time.time()
self._timestamp = 0
return self._timestamp, VIDEO_TIME_BASE
async def recv(self):
pts, time_base = await self.next_timestamp()
frame = self.frameContainer[0]
if frame is None:
frame = np.zeros((480, 640, 3), dtype=np.uint8)
else:
frame = self.frameContainer[0]
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
video_frame = av.VideoFrame.from_ndarray(frame, format="rgb24")
video_frame.pts = pts
video_frame.time_base = time_base
return video_frame
def route_channel(channel):
match channel.label:
case "latency":
@channel.on("message")
def on_message(message):
now = int(time.time() * 1000 + 0.5)
channel.send(str(now))
pre = int(message)
print(f"Latency: {now - pre}ms")
case _:
print(f"Unknown Channel {channel.label}")