树莓派因为硬件配置低,在运行复杂计算的时候会比较吃力。为了解决这种瓶颈,能够考虑云计算。这篇文章分享下如何经过树莓派发送视频流到远程服务器作条形码识别。html
下载OpenCV
, scipy
和 pillow
:python
$ sudo apt-get install libopencv-dev python-opencv python-scipy $ python -m pip install pillow
下载条形码SDK,经过源码(https://github.com/dynamsoft-dbr/python)编译出Python条码识别模块。git
下载OpenCV
:github
pip install opencv-python
有人实现了 NumpySocket,能够直接拿来使用,主要解决了NumPy Array
(经过OpenCV获取的视频帧)的传输。json
经过Socket
发送接收JSON:服务器
import json def sendJSON(self, data): out = json.dumps(data) try: self.connection.sendall(out) except Exception: exit() def receiveJSON(self): try: chunk = self.socket.recv(1024) except Exception: exit() return json.loads(chunk)
建立用于Windows的pc.py
文件:socket
from numpysocket import NumpySocket import cv2 import time import json import dbr # Get the license of Dynamsoft Barcode Reader from https://www.dynamsoft.com/CustomerPortal/Portal/Triallicense.aspx dbr.initLicense('LICENSE KEY') npSocket = NumpySocket() npSocket.startServer(9999) # Receive frames for barcode detection while(True): try: frame = npSocket.recieveNumpy() # cv2.imshow('PC Reader', frame) results = dbr.decodeBuffer(frame, 0x3FF | 0x2000000 | 0x4000000 | 0x8000000 | 0x10000000) out = {} out['results'] = results # Send barcode results to Raspberry Pi npSocket.sendJSON({'results': results}) except: break # Press ESC to exit key = cv2.waitKey(20) if key == 27 or key == ord('q'): break npSocket.endServer() print('Closed')
这段代码主要是经过循环,不断接收视频帧作条形码识别。而后把结果用JSON格式打包发送。ide
在树莓派上建立一个rpi.py
文件。经过OpenCV的接口能够不断获取视频帧。使用queue
来保存:云计算
def read_barcode(): vc = cv2.VideoCapture(0) vc.set(3, 640) #set width vc.set(4, 480) #set height if not vc.isOpened(): print('Camera is not ready.') return host_ip = '192.168.8.84' npSocket = NumpySocket() npSocket.startClient(host_ip, 9999) socket_thread = SocketThread('SocketThread', npSocket) socket_thread.start() while vc.isOpened(): ret, f = vc.read() cv2.imshow("RPi Reader", f) frame = imresize(f, .5) key = cv2.waitKey(20) if key == 27 or key == ord('q'): socket_thread.isRunning = False socket_thread.join() break if not socket_thread.is_alive(): break try: frame_queue.put_nowait(frame) except: # Clear unused frames try: while True: frame_queue.get_nowait() except: pass frame_queue.close() frame_queue.join_thread() vc.release()
建立了一个线程用于收发视频帧数据和结果:.net
class SocketThread (threading.Thread): def __init__(self, name, npSocket): threading.Thread.__init__(self) self.name = name self.npSocket = npSocket self.isRunning = True def run(self): while self.isRunning: frame = frame_queue.get(timeout=100) try: start_time = time.time() self.npSocket.sendNumpy(frame) obj = self.npSocket.receiveJSON() print("--- %.2f ms seconds ---" % ((time.time() - start_time) * 1000)) data = obj['results'] if (len(data) > 0): for result in data: print("Type: " + result[0]) print("Value: " + result[1] + "\n") else: print('No barcode detected.') except: break self.npSocket.endClient()
在Windows和树莓派上分别运行对应的Python程序: