Python3使用科大讯飞API接口实现音频文件转写

注意事项

科大讯飞语音转写 API 文档连接: https://www.xfyun.cn/doc/asr/lfasr/API.html.
科大讯飞语音转写Python3的demo下载连接:http://xfyun-doc.ufile.ucloud.com.cn/1564736425808301/weblfasr_python3_demo.zip
上一篇写了用百度智能云进行音频文件转写的博客,可是那个效果啊,有点惨不忍睹,至少个人识别结果是这样。而后转而使用了一下科大讯飞的,想着科大讯飞专门作语音相关的这一块,应该会好些。语音转写的Python3的demo代码确实很不错,函数接口很简洁,本文代码都是这个demo里面的。识别准确率仍是能够的,并且不须要像百度那样整点才开始识别,很快就返回了识别结果。
若是你的录音是不止一我的,而是像电话录音那种,想把转写结果中不一样人说的话分离出来,请按照下面这样添加预处理参数(demo中默认是没有添加这儿最后两个参数的,不添加的话,默认是不进行角色分离的):
在这里插入图片描述
这样的话,转写结果的speaker的值就不全是0了,而是根据不一样的人对转写结果进行分离:
在这里插入图片描述html

操做系统:Windows
Python:3.6
可用时长: 免费用户时长5小时,且用且珍惜。
音频属性: 采样率16k或8k、位长8bits或16bits、单声道&多声道
音频格式: wav/flac/opus/m4a/mp3
音频大小: 不超过500M
音频时长: 不超过5小时,建议5分钟以上
语言种类: 中文普通话、英文
转写结果保存时长 30天。(同一通录音不须要从新上传识别,若是你已经上传识别过了,以后只须要使用api.get_result_request(taskid)的方式便可再次获取识别结果,taskid是你第一次上传录音时给你分配的任务ID,避免重复上传浪费可用时长)python

APP_ID, SECRET_KEY的获取

讯飞的好像不须要API_KEY,开放受权的方式和其余大厂的相似:
一、页面右上方“控制台”点击进入,登陆讯飞帐号(没有就注册一个),进入讯飞开放平台。
二、左侧导航栏上方,依次选择 语音识别->语音转写->离线语音转写识别。
三、服务申请。点击“建立应用”,“接口选择”已默认勾选完成,如无其余需求,无需勾选,完成其余资料后,点击最下方“当即建立”按钮。本身能够手动领取5小时免费试用体验包。
四、应用成功则页面显示“建立完毕”,点击”返回应用列表”, 查看新建立应用详情,在服务接口认证信息窗口就能够看到返回的AppID,SecretKey。web

话很少说,直接上代码了

# -*- coding: utf-8 -*-
# 
# author: yanmeng2
# 
# 非实时转写调用demo

import base64
import hashlib
import hmac
import json
import os
import time

import requests

lfasr_host = 'http://raasr.xfyun.cn/api'

# 请求的接口名
api_prepare = '/prepare'
api_upload = '/upload'
api_merge = '/merge'
api_get_progress = '/getProgress'
api_get_result = '/getResult'
# 文件分片大小10M
file_piece_sice = 10485760

# ——————————————————转写可配置参数————————————————
# 参数可在官网界面(https://doc.xfyun.cn/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html)查看,根据需求可自行在gene_params方法里添加修改
# 转写类型
lfasr_type = 0
# 是否开启分词
has_participle = 'false'
has_seperate = 'true'
# 多候选词个数
max_alternatives = 0
# 子用户标识
suid = ''


class SliceIdGenerator:
    """slice id生成器"""

    def __init__(self):
        self.__ch = 'aaaaaaaaa`'

    def getNextSliceId(self):
        ch = self.__ch
        j = len(ch) - 1
        while j >= 0:
            cj = ch[j]
            if cj != 'z':
                ch = ch[:j] + chr(ord(cj) + 1) + ch[j + 1:]
                break
            else:
                ch = ch[:j] + 'a' + ch[j + 1:]
                j = j - 1
        self.__ch = ch
        return self.__ch


class RequestApi(object):
    def __init__(self, appid, secret_key, upload_file_path):
        self.appid = appid
        self.secret_key = secret_key
        self.upload_file_path = upload_file_path

    # 根据不一样的apiname生成不一样的参数,本示例中未使用所有参数您可在官网(https://doc.xfyun.cn/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html)查看后选择适合业务场景的进行更换
    def gene_params(self, apiname, taskid=None, slice_id=None):
        appid = self.appid
        secret_key = self.secret_key
        upload_file_path = self.upload_file_path
        ts = str(int(time.time()))
        m2 = hashlib.md5()
        m2.update((appid + ts).encode('utf-8'))
        md5 = m2.hexdigest()
        md5 = bytes(md5, encoding='utf-8')
        # 以secret_key为key, 上面的md5为msg, 使用hashlib.sha1加密结果为signa
        signa = hmac.new(secret_key.encode('utf-8'), md5, hashlib.sha1).digest()
        signa = base64.b64encode(signa)
        signa = str(signa, 'utf-8')
        file_len = os.path.getsize(upload_file_path)
        file_name = os.path.basename(upload_file_path)
        param_dict = {}

        if apiname == api_prepare:
            # slice_num是指分片数量,若是您使用的音频都是较短音频也能够不分片,直接将slice_num指定为1便可
            slice_num = int(file_len / file_piece_sice) + (0 if (file_len % file_piece_sice == 0) else 1)
            param_dict['app_id'] = appid
            param_dict['signa'] = signa
            param_dict['ts'] = ts
            param_dict['file_len'] = str(file_len)
            param_dict['file_name'] = file_name
            param_dict['slice_num'] = str(slice_num)
        elif apiname == api_upload:
            param_dict['app_id'] = appid
            param_dict['signa'] = signa
            param_dict['ts'] = ts
            param_dict['task_id'] = taskid
            param_dict['slice_id'] = slice_id
        elif apiname == api_merge:
            param_dict['app_id'] = appid
            param_dict['signa'] = signa
            param_dict['ts'] = ts
            param_dict['task_id'] = taskid
            param_dict['file_name'] = file_name
        elif apiname == api_get_progress or apiname == api_get_result:
            param_dict['app_id'] = appid
            param_dict['signa'] = signa
            param_dict['ts'] = ts
            param_dict['task_id'] = taskid
        return param_dict

    # 请求和结果解析,结果中各个字段的含义可参考:https://doc.xfyun.cn/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html
    def gene_request(self, apiname, data, files=None, headers=None):
        response = requests.post(lfasr_host + apiname, data=data, files=files, headers=headers)
        result = json.loads(response.text)
        if result["ok"] == 0:
            print("{} success:".format(apiname) + str(result))
            return result
        else:
            print("{} error:".format(apiname) + str(result))
            exit(0)
            return result

    # 预处理
    def prepare_request(self):
        return self.gene_request(apiname=api_prepare,
                                 data=self.gene_params(api_prepare))

    # 上传
    def upload_request(self, taskid, upload_file_path):
        file_object = open(upload_file_path, 'rb')
        try:
            index = 1
            sig = SliceIdGenerator()
            while True:
                content = file_object.read(file_piece_sice)
                if not content or len(content) == 0:
                    break
                files = {
                    "filename": self.gene_params(api_upload).get("slice_id"),
                    "content": content
                }
                response = self.gene_request(api_upload,
                                             data=self.gene_params(api_upload, taskid=taskid,
                                                                   slice_id=sig.getNextSliceId()),
                                             files=files)
                if response.get('ok') != 0:
                    # 上传分片失败
                    print('upload slice fail, response: ' + str(response))
                    return False
                print('upload slice ' + str(index) + ' success')
                index += 1
        finally:
            'file index:' + str(file_object.tell())
            file_object.close()
        return True

    # 合并
    def merge_request(self, taskid):
        return self.gene_request(api_merge, data=self.gene_params(api_merge, taskid=taskid))

    # 获取进度
    def get_progress_request(self, taskid):
        return self.gene_request(api_get_progress, data=self.gene_params(api_get_progress, taskid=taskid))

    # 获取结果
    def get_result_request(self, taskid):
        return self.gene_request(api_get_result, data=self.gene_params(api_get_result, taskid=taskid))

    def all_api_request(self):
        # 1. 预处理
        pre_result = self.prepare_request()
        taskid = pre_result["data"]
        # 2 . 分片上传
        self.upload_request(taskid=taskid, upload_file_path=self.upload_file_path)
        # 3 . 文件合并
        self.merge_request(taskid=taskid)
        # 4 . 获取任务进度
        while True:
            # 每隔20秒获取一次任务进度
            progress = self.get_progress_request(taskid)
            progress_dic = progress
            if progress_dic['err_no'] != 0 and progress_dic['err_no'] != 26605:
                print('task error: ' + progress_dic['failed'])
                return
            else:
                data = progress_dic['data']
                task_status = json.loads(data)
                if task_status['status'] == 9:
                    print('task ' + taskid + ' finished')
                    break
                print('The task ' + taskid + ' is in processing, task status: ' + str(data))

            # 每次获取进度间隔20S
            time.sleep(20)
        # 5 . 获取结果
        self.get_result_request(taskid=taskid)


# 注意:若是出现requests模块报错:"NoneType" object has no attribute 'read', 请尝试将requests模块更新到2.20.0或以上版本(本demo测试版本为2.20.0)
# 输入讯飞开放平台的appid,secret_key和待转写的文件路径

if __name__ == '__main__':
    APP_ID = "***"
    SECRET_KEY = "****"
    file_path = r"***.wav"
    api = RequestApi(appid=APP_ID, secret_key=SECRET_KEY, upload_file_path=file_path)
    api.all_api_request()

运行结果像这样的

固然,你能够根据本身的需求对demo进行改进,好比你想并发识别录音,你能够添加多线程执行的函数,为了获取taskid方便,我在class的初始化里边添加了self.taskid = “None”,并在预处理结果返回以后从新对taskid赋值。
在这里插入图片描述json

def thread_func(wav_file_path, txt_file_path):  # 线程函数,方便并发识别录音
    doc = open(txt_file_path, 'w', encoding='utf-8')
    # doc.close()
    api = RequestApi(appid=APP_ID, secret_key=SECRET_KEY, upload_file_path=wav_file_path)
    api.all_api_request()  # demo中这个函数是完整过程执行,但我把提取结果的模块提出来了
    print('taskid is: ' + api.taskid, file=doc)
    result = api.get_result_request(api.taskid)
    result = eval(result['data'])
    # print(result)
    for x in result:
        print(x)
        print(x, file=doc)
    doc.close()

#主函数写成相似这种
if __name__ == '__main__':
    file_list = [
        "o2019082112552587460156",
        "o2019082115552587460127"
    ]
    APP_ID = "***"
    SECRET_KEY = "***"
    file_read_path = r"D:\MyProject\Python\Voice_SDK\20190820\\"
    file_save_path = r"D:\MyProject\Python\Voice_SDK\20190820_xunfei\\"
    for file in file_list: #多并发批量执行
        wav_file_path = file_read_path + file + ".wav"
        txt_file_path = file_save_path + file + ".txt"
        t = threading.Thread(target=thread_func, args=(wav_file_path, txt_file_path))
        t.start()