关于 schedule 使用的一个 bug

问题描述

最近在同步的时候老是遇到重复插入报错:python

发现是我同一个导入过程执行了两遍,我是使用 python 的任务调度模块 schedule 去执行的,模拟出任务的使用场景以下:数据库

# 测试调度模块是否会重复执行任务
import logging
import threading
import time

import schedule

logger = logging.getLogger()
NUM = list()


def main_run():
    part_run()


def run_threaded(job_func):
    # 对于每个任务 开启一个进程去执行
    job_thread = threading.Thread(target=job_func)
    job_thread.start()


def part_run():
    # 模拟每 30 s执行一次的定时任务
    schedule.every(10).seconds.do(run_threaded, new_finance_update)

    # 在一个有阻塞的主程序中去运行
    while True:
        # logger.info(schedule.jobs)
        print(schedule.jobs)
        schedule.run_pending()
        time.sleep(10)


def new_finance_update():
    # 真正执行的任务

    # 表示第几回执行到这里
    global NUM
    print("=="*10, NUM)
    for i in range(100):
        print(i, end=" ")
        NUM.append(i)
        time.sleep(30)
    for j in list("/Users/furuiyang/PyEnv/demo/bin/python3.7 /Users/furuiyang/code2/jztask/tasks/re_task.py"):
        print(j, end=" ")
        NUM.append(j)
        time.sleep(30)


if __name__ == "__main__":
    try:
        main_run()
    except KeyboardInterrupt:
        print(NUM)
复制代码

执行结果:多线程

问题分析

开启了多线程向数据库进行插入,没有在线程之间创建消息联系,形成一个线层已经插入的数据被另外的一个线程重复插入。app

解决方案01

去掉多线程,代码以下:测试

# 测试调度模块是否会重复执行任务
import logging
import threading
import time

import schedule

logger = logging.getLogger()
NUM = list()


def main_run():
    part_run()


# def run_threaded(job_func):
#     # 对于每个任务 开启一个进程去执行
#     job_thread = threading.Thread(target=job_func)
#     job_thread.start()


def part_run():
    # 模拟每 30 s执行一次的定时任务
    schedule.every(10).seconds.do(new_finance_update)

    # 在一个有阻塞的主程序中去运行
    while True:
        # logger.info(schedule.jobs)
        print(schedule.jobs)
        schedule.run_pending()
        time.sleep(10)


def new_finance_update():
    # 真正执行的任务

    # 表示第几回执行到这里
    global NUM
    print("=="*10, NUM)
    for i in range(100):
        print(i, end=" ")
        NUM.append(i)
        time.sleep(30)
    for j in list("/Users/furuiyang/PyEnv/demo/bin/python3.7 /Users/furuiyang/code2/jztask/tasks/re_task.py"):
        print(j, end=" ")
        NUM.append(j)
        time.sleep(30)


if __name__ == "__main__":
    try:
        main_run()
    except KeyboardInterrupt:
        print(NUM)
复制代码

中断时的运行结果:fetch

解决方案02

第二种是将具体要作的任务进行划分,而后放在一个队列中,每一个任务的完成之间互相不受到影响:ui

具体代码以下:spa

import threading
import time
from queue import Queue

import schedule

jobqueue = Queue()


def fetch_job():
    # 从队列中去获取任务
    job_func = jobqueue.get()
    return job_func


def cal_num(num):
    time.sleep(1)
    print(f"{num} 开始被计算")
    print()


def generate_job():
    # 根据具体的状况去生成任务
    for j in range(10):
        yield (cal_num, j)


def put_job():
    # 遍历生成的任务将其放入队列
    for job in generate_job():
        jobqueue.put(job)


def run_threaded():
    while not jobqueue.empty():
        # 取出而且执行
        job_func, j = fetch_job()
        # 对于每个任务 开启一个进程去执行 j 是任务参数
        job_thread = threading.Thread(target=job_func, args=(j,))
        job_thread.start()

    time.sleep(10)
    print("任务执行完毕")


def part_run():
    # 生成任务而且将其放入队列中
    put_job()

    # 每 10 s执行一次的定时任务 开启一个新的线程去执行
    schedule.every(10).seconds.do(run_threaded)

    # 在一个有阻塞的主程序中去运行
    while True:
        # logger.info(schedule.jobs)
        print(schedule.jobs)
        schedule.run_pending()
        time.sleep(10)


if __name__ == "__main__":
    part_run()
复制代码
相关文章
相关标签/搜索