Mqtt 协议实现在线视频会议

news/2024/7/10 23:40:01 标签: 人工智能, 安全, gpt, YOLO, 深度学习

简介

该教程介绍如何使用 Python 和 MQTT 实现一个简单的在线视频会议系统,其中包括推送端和订阅端。推送端通过摄像头捕获视频帧,使用 MQTT 将图像数据实时推送到指定主题。订阅端订阅相同的 MQTT 主题,接收图像并在本地显示,同时计算并显示帧率。

推送端

1. 准备环境

确保已经安装必要的 Python 库:

pip install opencv-python numpy paho-mqtt

2. 推送端代码

# pusher.py
import base64
import time
import multiprocessing
import paho.mqtt.client as mqtt
import numpy as np
import cv2

# MQTT配置
mqtt_broker = "localhost"
mqtt_port = 1883
mqtt_topic = "your_mqtt_topic"

# 初始化MQTT客户端
client = mqtt.Client()
client.connect(mqtt_broker, mqtt_port, 60)

def read_and_publish(output_queue):
    cap = cv2.VideoCapture(2)
    try:
        while True:
            ret, frame = cap.read()
            if ret:
                _, img_encoded = cv2.imencode('.jpg', frame)
                img_base64 = base64.b64encode(img_encoded.tobytes()).decode('utf-8')
                # 将base64图像放入队列
                output_queue.put(img_base64)
    except Exception as e:
        print(f"Error: {e}")
    finally:
        # Release the capture object when done
        cap.release()

def publish_to_mqtt(input_queue):
    while True:
        if not input_queue.empty():
            img_base64 = input_queue.get()
            # 发布到MQTT通道
            client.publish(mqtt_topic, img_base64)

def main():
    # 使用队列在主进程和子进程之间传递数据
    output_queue = multiprocessing.Queue()
    input_queue = multiprocessing.Queue()

    # 启动子进程
    process_read_publish = multiprocessing.Process(target=read_and_publish, args=(output_queue,))
    process_mqtt_publish = multiprocessing.Process(target=publish_to_mqtt, args=(input_queue,))

    # 启动子进程
    process_read_publish.start()
    process_mqtt_publish.start()

    while True:
        start = time.time()

        # 从队列中获取数据
        while not output_queue.empty():
            img_base64 = output_queue.get()
            # 将数据放入用于MQTT发布的队列
            input_queue.put(img_base64)

        end = time.time()
        print(end - start)

    # 不要忘记在结束时释放资源
    process_read_publish.terminate()
    process_mqtt_publish.terminate()
    process_read_publish.join()
    process_mqtt_publish.join()

if __name__ == "__main__":
    main()

订阅端

1. 准备环境

确保已经安装必要的 Python 库:

pip install opencv-python numpy paho-mqtt

2. 订阅端代码

# subscriber.py
import time
import cv2
import numpy as np
import base64
import paho.mqtt.client as mqtt
import concurrent.futures
from multiprocessing import Process, Queue

# MQTT配置
mqtt_broker = "localhost"
mqtt_port = 1883
mqtt_topic = "your_mqtt_topic"

# 共享变量,使用multiprocessing.Value来确保同步
global_fps = Value('f', 0)
start_time = time.time()
end_time = time.time()

def display_fps(frame):
    global start_time
    global end_time
    end_time = time.time()
    elapsed_time = end_time - start_time
    print(end_time)
    global_fps.value = 1 / elapsed_time

    # 在图像上显示帧率
    cv2.putText(frame, f"FPS: {global_fps.value:.2f}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)
    start_time = end_time

def image_receiver(output_queue):
    # 初始化MQTT客户端
    client = mqtt.Client()
    client.connect(mqtt_broker, mqtt_port, 60)

    # 设置消息接收回调
    def on_message(client, userdata, msg):
        global start_time
        try:
            # 解码base64图像
            start_time = time.time()
            img_base64 = msg.payload.decode('utf-8')
            img_decoded = base64.b64decode(img_base64)
            img_np = np.frombuffer(img_decoded, dtype=np.uint8)
            img = cv2.imdecode(img_np, cv2.IMREAD_COLOR)

            display_fps(img, start_time)

            # 将图像放入队列
            output_queue.put(img)

        except Exception as e:
            print(f"Error: {e}")

    client.on_message = on_message

    # 订阅MQTT主题
    client.subscribe(mqtt_topic)

    # 开始循环,等待消息
    client.loop_forever()

def main():
    # 使用队列在主进程和子进程之间传递数据
    output_queue = Queue()

    # 创建子进程
    receiver_process = Process(target=image_receiver, args=(output_queue,))
    receiver_process.start()

    while True:
        start = time.time()

        # 从队列中获取图像
        while not output_queue.empty():
            img = output_queue.get()

            # 显示图像
            cv2.imshow('Received Image', img)
            cv2.waitKey(1)

        end = time.time()
        print(end - start)

    # 等待子进程结束
    receiver_process.join()

if __name__ == "__main__":
    main()

运行

  1. 在推送端运行 pusher.py
  2. 在订阅端运行 subscriber.py

现在,您应该能够在订阅端实时接收并显示推送端的视频流,同时计算并显示帧率。


http://www.niftyadmin.cn/n/5244845.html

相关文章

怎么压缩过大的GIF动图?三种方法随心选!

GIF图片由于其图片格式,本身就会很大,但是微信QQ还有一些其他的社交平台对上传的表情包是有限制的,这个时候就需要借助一些图片处理工具对GIF进行压缩。 下面就向大家介绍三种好用的方法并展示具体的操作步骤。 一、使用嗨格式压缩大师进行压…

docker安装一主一从MySQL数据库步骤

安装MySQL主从复制 ①宿主机创建以下目录 /mydata/mysql-master/log 命令:mkdir -p /mydata/mysql-master/log /mydata/mysql-master/data 命令:mkdir -p /mydata/mysql-master/data /mydata/mysql-master/conf 命令:mkdir -p /mydata/mysql-…

【集合篇】Map 集合详解

Map 集合详解 HashMap 和 Hashtable 的区别 线程安全性: HashMap 是非线程安全的,Hashtable 是线程安全的,因为 Hashtable 内部的方法基本都经过synchronized 修饰。(如果你要保证线程安全的话就使用 ConcurrentHashMap 吧!&am…

使用PCSS实现的实时阴影效果

PCSS的技术可以使得阴影呈现出近硬远软的效果,并且能够实时实现。 其核心理念是通过模拟光源的面积来产生更自然、更柔和的阴影边缘。 具体步骤: 1、生成shadowmap 2、在进行阴影的比较时候进行平均,并非之前的shadow map 或者之后完全的阴影…

10 单词接龙

题目描述 单词接龙的规则是: 可用于接龙的单词首字母必须要前一个单词的尾字母相同; 当存在多个首字母相同的单词时,取长度最长的单词,如果长度也相等,则取字典序最小的单词;已经参与接龙的单词不能重复使用。 现给定一组全部由小写字母组成…

【项目】学生信息管理系统

概述 本系统总耗时 6 6 6 天,系统包括 学生发展与数据驱动平台6.2.cpp、学生信息.txt、用户账号.txt、注意事项.txt。由于代码对文件的调用使用的是相对路径,所以要求这 4 4 4 个文件都需要在同一目录。使用代码前先仔细看 注意事项。 如图&#xff1…

Python-基本的输入和输出(input函数和print函数用法)

输入 输出 控制台:一种人和计算机交互最基础的方式 什么是控制台?下面这个东西就是控制台。 但是生活中常用的是图形化界面,最初始的计算机使用的就是这样的控制台进行输入和输出的。 所以原来会使用计算机的都是科学家或者高级知识分子。 直到图形化界…

pgsql 判空并设置默认值

在 PostgreSQL 中,可以使用 COALESCE 函数来判断值是否为空并设置一个默认值。 例如,假设有一个表格 users,其中有一个列 username。如果 username 为空,则设置默认值为 ‘guest’,可以使用以下查询: SEL…