python使用MQTT给硬件传输图片

任务简述

最近因须要用python写一个微服务来用MQTT给硬件传输图片,其中python用的是flask框架,大概流程以下:
python

image

协议为:
  • 须要将图片数据封装成多个消息进行传输,每一个消息传输的数据字节数为1400Byte。
  • 消息(MQTT Payload) 格式:Web服务器-------->BASE:
    image
  • 反馈:BASE---------> Web服务器:
    image
  • 若是Web服务器发送完一个“数据传输消息”后,5S内没有收到MQTT“反馈消息”或者收到的反馈中显示“数据包不完整”,则重发该“数据传输消息”。

程序流程图

根据上面的协议,能够获得以下的流程图:
json

image

代码以下:

# encoding:utf-8
from flask import Flask, jsonify
from flask_restful import Api, Resource, reqparse
from PIL import Image
from io import BytesIO
import requests
import os, logging, time
import paho.mqtt.client as mqtt
import struct
from flask_cors import *

# 日志配置信息
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s (runing by %(funcName)s',
)


class Mqtt(object):
    def __init__(self, img_data, size):
        self.MQTTHOST = '*******'
        self.MQTTPORT = "******"

        # 订阅和发送的主题
        self.topic_from_base = 'mqttTestSub'
        self.topic_to_base = 'mqttTestPub'

        self.client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
        self.client = mqtt.Client(self.client_id)

        # 完成连接后的回掉函数
        self.client.on_connect = self.on_connect
        # 图片大小
        self.size = size

        # 用于跳出死循环,结束任务
        self.finished = None

        # 包的编号
        self.index = 0

        # 将收到的图片数据按大小分红列表
        self.image_data_list = [img_data[x:x + 1400] for x in range(0, self.size, 1400)]

        # 记录发布后的数据,用于监控时延
        self.pub_time = 0


        self.header_to_base = 0xffffeeee
        self.header_from_base = 0xeeeeffff

        # 功能标识
        self.function_begin = 0x01
        self.function_doing = 0x02
        self.function_finished = 0x03

        # 包的完整和非完整状态
        self.whole_package = 0x01
        self.bad_package = 0x00

        # 头信息的格式,小端模式
        self.format_to_base = "<Lbhh"
        self.format_from_base = "<Lbhb"

        # 若是重发包时,用于检查是否重发第一个包
        self.first = True

        # 若是重发包时,用于检查是否重发最后一个包
        self.last = False

        self.begin_data = 'image.jpg;' + str(self.size)

    # 连接mqtt服务器函数
    def on_mqtt_connect(self):
        self.client.connect(self.MQTTHOST, self.MQTTPORT, 60)
        self.client.loop_start()

    # 连接完成后的回调函数
    def on_connect(self, client, userdata, flags, rc):
        logging.info("+++ Connected with result code {} +++".format(str(rc)))
        self.client.subscribe(self.topic_from_base)

    # 订阅函数
    def subscribe(self):
        self.client.subscribe(self.topic_from_base, 1)
        # 消息到来处理函数
        self.client.on_message = self.on_message

    # 接收到信息后的回调函数

    def on_message(self, client, userdata, msg):
        # 若是接受第一个包则不须要重发第一个
        self.first = False

        # 将接受到的包进行解压,获得一个元组
        base_tuple = struct.unpack(self.format_from_base, msg.payload)
        logging.info("+++ imageData's letgth is {}, base_tupe is {} +++".format(self.size, base_tuple))
        logging.info("+++ package_number is {}, package_status_from_base is {} +++"
                     .format(base_tuple[2], base_tuple[3]))

        # 检查接受到信息的头部是否正确
        if base_tuple[0] == self.header_from_base:
            logging.info("+++ function_from_base is {} +++".format(base_tuple[1]))

            # 是否完成传输,若是完成则退出
            if base_tuple[1] == self.function_finished:
                logging.info("+++ finish work +++")
                self.finished = 1
                self.client.disconnect()
            else:
                # 是不是最后一个包
                if self.index == len(self.image_data_list) - 1:
                    self.publish('finished', self.function_finished)
                    self.last = True
                    logging.info("+++ finished_data_to_base is finished+++")
                else:

                    # 若是接收到的包不是 0x03则进行传送数据
                    if base_tuple[1] == self.function_begin or base_tuple[1] == self.function_doing:
                        logging.info("+++ package_number is {}, package_status_from_base is {} +++"
                                     .format(base_tuple[2],base_tuple[3]))

                        # 若是数据的反馈中,包的状态是1则继续发下一个包
                        if base_tuple[3] == self.whole_package:
                            self.publish(self.index, self.function_doing)
                            logging.info("+++ data_to_base is finished+++")
                            self.index += 1

                        # 若是数据的反馈中,包的状态是0则重发数据包
                        elif base_tuple[3] == self.bad_package:
                            re_package_number = base_tuple[2]
                            self.publish(re_package_number-1, self.function_doing)
                            logging.info("+++ re_data_to_base is finished+++")
                        else:
                            logging.info("+++ package_status_from_base is not 0 or 1 +++")
                            self.client.disconnect()
                    else:
                        logging.info("+++ function_identifier is illegal +++")
                        self.client.disconnect()
        else:
            logging.info("+++ header_from_base is illegal +++")
            self.client.disconnect()

    # 数据发送函数
    def publish(self, index, fuc):
        # 看是不是最后一个包
        if index == 'finished':
            length = 0
            package_number = 0
            data = b''
        else:
            length = len(self.image_data_list[index])
            package_number = index
            data = self.image_data_list[index]

        # 打包数据头信息
        buffer = struct.pack(
            self.format_to_base,
            self.header_to_base,
            fuc,
            package_number,
            length
        )
        to_base_data = buffer + data

        # mqtt发送
        self.client.publish(
            self.topic_to_base,
            to_base_data
        )
        self.pub_time = time.time()

    # 发送第一个包函数
    def publish_begin(self):
        buffer = struct.pack(
            self.format_to_base,
            self.header_to_base,
            self.function_begin,
            0,
            len(self.begin_data.encode('utf-8')),
        )
        begin_data = buffer + self.begin_data.encode('utf-8')
        self.client.publish(self.topic_to_base, begin_data)

    # 控制函数
    def control(self):
        self.on_mqtt_connect()
        self.publish_begin()
        begin_time = time.time()
        self.pub_time = time.time()
        self.subscribe()
        while True:
            time.sleep(1)
            # 超过5秒重传
            date = time.time() - self.pub_time
            if date > 5:
                # 是否重传第一个包
                if self.first == True:
                    self.publish_begin()
                    logging.info('+++ this is timeout first_data +++')

                # 是否重传最后一个包
                elif self.last == True:
                    self.publish('finished', self.function_finished)
                    logging.info('+++ this is timeout last_data +++')
                else:
                    self.publish(self.index-1, self.function_doing)
                    logging.info('+++ this is timeout middle_data +++')
            if self.finished == 1:
                logging.info('+++ all works is finished+++')
                break

        print(str(time.time()-begin_time) + 'begin_time - end_time')

app = Flask(__name__)
api = Api(app)
CORS(app, supports_credentials=True)

# 接受参数
parser = reqparse.RequestParser()
parser.add_argument('url', help='mqttImage url', location='args', type=str)


class GetImage(Resource):
    # 获得参数并从图床下载到本地
    def get(self):
        args = parser.parse_args()
        url = args.get('url')
        response = requests.get(url)
        # 获取图片
        image = Image.open(BytesIO(response.content))
        # 存取图片
        add = os.path.join(os.path.abspath(''), 'image.jpg')
        image.save(add)
        # 获得图片大小
        size = os.path.getsize(add)
        f = open(add, 'rb')
        imageData = f.read()
        f.close()

        # 进行mqtt传输
        mqtt = Mqtt(imageData, size)
        mqtt.control()

        # 删除文件
        os.remove(add)
        logging.info('*** the result of control is {} ***'.format(1))
        return jsonify({
            "imageData": 1
        })


api.add_resource(GetImage, '/image')

if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0')

复制代码