数据发送逻辑完善、警报识别逻辑完成
This commit is contained in:
@@ -5,7 +5,9 @@ import socketio
|
||||
import numpy as np
|
||||
import cv2
|
||||
import av
|
||||
import os
|
||||
|
||||
from collections import deque
|
||||
from aiortc import (
|
||||
MediaStreamError,
|
||||
RTCConfiguration,
|
||||
@@ -14,13 +16,74 @@ from aiortc import (
|
||||
VideoStreamTrack,
|
||||
)
|
||||
from aiortc.mediastreams import VIDEO_CLOCK_RATE, VIDEO_TIME_BASE
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv()
|
||||
|
||||
WEBRTC_SERVER_URL = os.getenv("WEBRTC_SERVER_URL", "ws://localhost:5000")
|
||||
WEBRTC_DATA_HISTORY_MAXLEN = int(os.getenv("WEBRTC_DATA_HISTORY_MAXLEN", 200))
|
||||
|
||||
|
||||
class RingBuffer:
|
||||
def __init__(self, maxlen):
|
||||
self._buf = deque(maxlen=maxlen)
|
||||
|
||||
def append(self, item):
|
||||
self._buf.append(item)
|
||||
|
||||
def snapshot(self):
|
||||
return list(self._buf)
|
||||
|
||||
|
||||
class MonitoringHub:
|
||||
def __init__(self, max_history=WEBRTC_DATA_HISTORY_MAXLEN):
|
||||
self._history = RingBuffer(maxlen=max_history)
|
||||
self._channels = set()
|
||||
self._queue = asyncio.Queue()
|
||||
self._task = None
|
||||
|
||||
async def start(self):
|
||||
if self._task is None:
|
||||
self._task = asyncio.create_task(self._broadcast_loop())
|
||||
|
||||
async def _broadcast_loop(self):
|
||||
while True:
|
||||
data = await self._queue.get()
|
||||
dead = []
|
||||
for ch in list(self._channels):
|
||||
if ch.readyState != "open":
|
||||
dead.append(ch)
|
||||
continue
|
||||
try:
|
||||
ch.send(data)
|
||||
except Exception:
|
||||
dead.append(ch)
|
||||
for ch in dead:
|
||||
self._channels.discard(ch)
|
||||
|
||||
def register_channel(self, channel):
|
||||
# 初次建立时补发历史
|
||||
print(f"Channel {channel.label} opened, sending history")
|
||||
self._channels.add(channel)
|
||||
for item in self._history.snapshot():
|
||||
try:
|
||||
channel.send(item)
|
||||
except Exception:
|
||||
self._channels.discard(channel)
|
||||
break
|
||||
|
||||
def send_data(self, data):
|
||||
str_data = str(data)
|
||||
self._history.append(str_data)
|
||||
self._queue.put_nowait(str_data)
|
||||
|
||||
|
||||
class WebRTCServer:
|
||||
def __init__(self, fps, seat, server="ws://localhost:5000"):
|
||||
def __init__(self, fps, seat, server=WEBRTC_SERVER_URL):
|
||||
self.pcs = set()
|
||||
self.fps = fps
|
||||
self.frameContainer = [None]
|
||||
self.hub = MonitoringHub()
|
||||
|
||||
self.background_loop = asyncio.new_event_loop()
|
||||
|
||||
@@ -39,6 +102,7 @@ class WebRTCServer:
|
||||
loop.run_forever()
|
||||
|
||||
async def _websocket_start(self):
|
||||
await self.hub.start()
|
||||
sio = socketio.AsyncClient()
|
||||
|
||||
@sio.event
|
||||
@@ -87,6 +151,13 @@ class WebRTCServer:
|
||||
)
|
||||
pc.addTrack(VideoFrameTrack(self.fps, self.frameContainer))
|
||||
|
||||
dc = pc.createDataChannel("monitoring")
|
||||
|
||||
@dc.on("open")
|
||||
def on_open():
|
||||
print("Monitoring data channel opened")
|
||||
self.hub.register_channel(dc)
|
||||
|
||||
answer = await pc.createAnswer()
|
||||
await pc.setLocalDescription(answer)
|
||||
print(f"Handle offer in {(time.time() - start)*1000:.2f}ms")
|
||||
@@ -98,6 +169,9 @@ class WebRTCServer:
|
||||
def provide_frame(self, frame):
|
||||
self.frameContainer[0] = frame
|
||||
|
||||
def send_data(self, data):
|
||||
self.hub.send_data(data)
|
||||
|
||||
def stop(self):
|
||||
if self.background_loop.is_running():
|
||||
self.background_loop.call_soon_threadsafe(self.background_loop.stop)
|
||||
|
||||
Reference in New Issue
Block a user