package com.aros.apron.manager; import android.os.Handler; import android.os.Looper; import androidx.annotation.NonNull; import com.aros.apron.tools.LogUtil; import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; import java.net.SocketException; import java.net.UnknownHostException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import dji.sdk.keyvalue.value.common.ComponentIndexType; import dji.v5.manager.datacenter.MediaDataCenter; import dji.v5.manager.datacenter.camera.StreamInfo; import dji.v5.manager.interfaces.ICameraStreamManager; /** * UDP视频流推送管理类 * 负责从视频监听器获取码流数据并通过UDP协议推送 */ public class UdpStreamManager { private static final String TAG = "UdpStreamManager"; // 线程池和主线程Handler private final ExecutorService udpExecutor = Executors.newSingleThreadExecutor(); private final Handler mainHandler = new Handler(Looper.getMainLooper()); // 单例模式 private static class UdpStreamHolder { private static final UdpStreamManager INSTANCE = new UdpStreamManager(); } public static UdpStreamManager getInstance() { return UdpStreamHolder.INSTANCE; } // UDP配置 private static final String DEFAULT_HOST = "127.0.0.1"; private static final int DEFAULT_PORT_1_PORT = 8080; private static final int DEFAULT_FPV_PORT = 8081; private String targetHost = DEFAULT_HOST; private int port1Port = DEFAULT_PORT_1_PORT; private int fpvPort = DEFAULT_FPV_PORT; // 状态管理 private final AtomicBoolean isStreaming = new AtomicBoolean(false); private final AtomicBoolean isInitialized = new AtomicBoolean(false); // UDP相关 private DatagramSocket port1UdpSocket; private DatagramSocket fpvUdpSocket; private InetAddress targetAddress; // 视频码流监听器 private ICameraStreamManager.ReceiveStreamListener port1StreamListener; private ICameraStreamManager.ReceiveStreamListener fpvStreamListener; private UdpStreamManager() { } /** * 初始化UDP流管理器 * @param host 目标主机地址 * @param port1Port PORT_1相机的目标端口 * @param fpvPort FPV相机的目标端口 */ public void init(String host, int port1Port, int fpvPort) { if (isInitialized.get()) { LogUtil.log(TAG, "UDP流管理器已初始化"); return; } targetHost = host != null ? host : DEFAULT_HOST; this.port1Port = port1Port > 0 ? port1Port : DEFAULT_PORT_1_PORT; this.fpvPort = fpvPort > 0 ? fpvPort : DEFAULT_FPV_PORT; udpExecutor.execute(() -> { try { // 初始化UDP socket port1UdpSocket = new DatagramSocket(); fpvUdpSocket = new DatagramSocket(); targetAddress = InetAddress.getByName(targetHost); // 初始化视频码流监听器 initStreamListeners(); isInitialized.set(true); LogUtil.log(TAG, "UDP流管理器初始化成功,目标地址: " + targetHost + ", PORT_1端口: " + this.port1Port + ", FPV端口: " + this.fpvPort); } catch (SocketException e) { LogUtil.log(TAG, "UDP socket初始化失败: " + e.getMessage()); e.printStackTrace(); } catch (UnknownHostException e) { LogUtil.log(TAG, "无法解析目标地址: " + e.getMessage()); e.printStackTrace(); } }); } /** * 初始化UDP流管理器(使用默认端口) * @param host 目标主机地址 */ public void init(String host) { init(host, DEFAULT_PORT_1_PORT, DEFAULT_FPV_PORT); } /** * 初始化UDP流管理器(使用默认地址和端口) */ public void init() { init(DEFAULT_HOST, DEFAULT_PORT_1_PORT, DEFAULT_FPV_PORT); } /** * 初始化视频码流监听器 */ private void initStreamListeners() { ICameraStreamManager cameraManager = MediaDataCenter.getInstance().getCameraStreamManager(); if (cameraManager == null) { LogUtil.log(TAG, "CameraStreamManager获取失败"); return; } port1StreamListener=new ICameraStreamManager.ReceiveStreamListener() { @Override public void onReceiveStream(@NonNull byte[] data, int offset, int length, @NonNull StreamInfo info) { if (isStreaming.get()) { // 截取有效数据 byte[] streamData = new byte[length]; System.arraycopy(data, offset, streamData, 0, length); sendStreamData(streamData, ComponentIndexType.PORT_1); } } }; // FPV相机码流监听器 fpvStreamListener=new ICameraStreamManager.ReceiveStreamListener() { @Override public void onReceiveStream(@NonNull byte[] data, int offset, int length, @NonNull StreamInfo info) { if (isStreaming.get()) { // 截取有效数据 byte[] streamData = new byte[length]; System.arraycopy(data, offset, streamData, 0, length); sendStreamData(streamData, ComponentIndexType.FPV); } } }; // 注册监听器 cameraManager.addReceiveStreamListener(ComponentIndexType.PORT_1, port1StreamListener); cameraManager.addReceiveStreamListener(ComponentIndexType.FPV, fpvStreamListener); LogUtil.log(TAG, "视频码流监听器注册成功"); } /** * 发送码流数据 * @param streamData 码流数据 * @param componentIndex 相机组件索引 */ private void sendStreamData(byte[] streamData, ComponentIndexType componentIndex) { if (targetAddress == null) { return; } try { // 选择对应的UDP socket和端口 DatagramSocket socket = null; int port = 0; if (componentIndex == ComponentIndexType.PORT_1) { socket = port1UdpSocket; port = port1Port; } else if (componentIndex == ComponentIndexType.FPV) { socket = fpvUdpSocket; port = fpvPort; } if (socket == null) { return; } // 构造UDP数据包 // 前4字节表示相机索引(1: PORT_1, 2: FPV) // 接下来4字节表示数据长度 // 剩余部分为码流数据 int headerSize = 8; byte[] packetData = new byte[headerSize + streamData.length]; // 写入相机索引 int cameraIndex = componentIndex == ComponentIndexType.PORT_1 ? 1 : 2; packetData[0] = (byte) (cameraIndex >> 24); packetData[1] = (byte) (cameraIndex >> 16); packetData[2] = (byte) (cameraIndex >> 8); packetData[3] = (byte) cameraIndex; // 写入数据长度 packetData[4] = (byte) (streamData.length >> 24); packetData[5] = (byte) (streamData.length >> 16); packetData[6] = (byte) (streamData.length >> 8); packetData[7] = (byte) streamData.length; // 写入码流数据 System.arraycopy(streamData, 0, packetData, headerSize, streamData.length); // 发送UDP数据包 DatagramPacket packet = new DatagramPacket( packetData, packetData.length, targetAddress, port ); socket.send(packet); } catch (IOException e) { LogUtil.log(TAG, "发送UDP数据包失败: " + e.getMessage()); e.printStackTrace(); } } /** * 开始UDP流推送 */ public void startStreaming() { if (!isInitialized.get()) { LogUtil.log(TAG, "UDP流管理器未初始化,请先调用init方法"); return; } isStreaming.set(true); LogUtil.log(TAG, "UDP流推送已启动"); } /** * 停止UDP流推送 */ public void stopStreaming() { isStreaming.set(false); LogUtil.log(TAG, "UDP流推送已停止"); } /** * 关闭UDP流管理器 */ public void close() { stopStreaming(); // 移除视频码流监听器 ICameraStreamManager cameraManager = MediaDataCenter.getInstance().getCameraStreamManager(); if (cameraManager != null) { if (port1StreamListener != null) { cameraManager.removeReceiveStreamListener(port1StreamListener); } if (fpvStreamListener != null) { cameraManager.removeReceiveStreamListener(fpvStreamListener); } } // 关闭UDP socket if (port1UdpSocket != null) { port1UdpSocket.close(); port1UdpSocket = null; } if (fpvUdpSocket != null) { fpvUdpSocket.close(); fpvUdpSocket = null; } isInitialized.set(false); LogUtil.log(TAG, "UDP流管理器已关闭"); } /** * 设置目标地址 * @param host 目标主机地址 * @param port1Port PORT_1相机的目标端口 * @param fpvPort FPV相机的目标端口 */ public void setTargetAddress(String host, int port1Port, int fpvPort) { if (host != null) { targetHost = host; } if (port1Port > 0) { this.port1Port = port1Port; } if (fpvPort > 0) { this.fpvPort = fpvPort; } udpExecutor.execute(() -> { try { targetAddress = InetAddress.getByName(targetHost); LogUtil.log(TAG, "目标地址已更新: " + targetHost + ", PORT_1端口: " + this.port1Port + ", FPV端口: " + this.fpvPort); } catch (UnknownHostException e) { LogUtil.log(TAG, "无法解析新目标地址: " + e.getMessage()); e.printStackTrace(); } }); } /** * 获取UDP流推送状态 */ public boolean isStreaming() { return isStreaming.get(); } /** * 获取UDP流管理器初始化状态 */ public boolean isInitialized() { return isInitialized.get(); } /** * 获取当前目标地址 */ public String getTargetAddress() { return targetHost + ", PORT_1端口: " + port1Port + ", FPV端口: " + fpvPort; } }