最近在同步的时候老是遇到重复插入报错:python
发现是我同一个导入过程执行了两遍,我是使用 python 的任务调度模块 schedule 去执行的,模拟出任务的使用场景以下:数据库
# 测试调度模块是否会重复执行任务
import logging
import threading
import time
import schedule
logger = logging.getLogger()
NUM = list()
def main_run():
part_run()
def run_threaded(job_func):
# 对于每个任务 开启一个进程去执行
job_thread = threading.Thread(target=job_func)
job_thread.start()
def part_run():
# 模拟每 30 s执行一次的定时任务
schedule.every(10).seconds.do(run_threaded, new_finance_update)
# 在一个有阻塞的主程序中去运行
while True:
# logger.info(schedule.jobs)
print(schedule.jobs)
schedule.run_pending()
time.sleep(10)
def new_finance_update():
# 真正执行的任务
# 表示第几回执行到这里
global NUM
print("=="*10, NUM)
for i in range(100):
print(i, end=" ")
NUM.append(i)
time.sleep(30)
for j in list("/Users/furuiyang/PyEnv/demo/bin/python3.7 /Users/furuiyang/code2/jztask/tasks/re_task.py"):
print(j, end=" ")
NUM.append(j)
time.sleep(30)
if __name__ == "__main__":
try:
main_run()
except KeyboardInterrupt:
print(NUM)
复制代码
执行结果:多线程
开启了多线程向数据库进行插入,没有在线程之间创建消息联系,形成一个线层已经插入的数据被另外的一个线程重复插入。app
去掉多线程,代码以下:测试
# 测试调度模块是否会重复执行任务
import logging
import threading
import time
import schedule
logger = logging.getLogger()
NUM = list()
def main_run():
part_run()
# def run_threaded(job_func):
# # 对于每个任务 开启一个进程去执行
# job_thread = threading.Thread(target=job_func)
# job_thread.start()
def part_run():
# 模拟每 30 s执行一次的定时任务
schedule.every(10).seconds.do(new_finance_update)
# 在一个有阻塞的主程序中去运行
while True:
# logger.info(schedule.jobs)
print(schedule.jobs)
schedule.run_pending()
time.sleep(10)
def new_finance_update():
# 真正执行的任务
# 表示第几回执行到这里
global NUM
print("=="*10, NUM)
for i in range(100):
print(i, end=" ")
NUM.append(i)
time.sleep(30)
for j in list("/Users/furuiyang/PyEnv/demo/bin/python3.7 /Users/furuiyang/code2/jztask/tasks/re_task.py"):
print(j, end=" ")
NUM.append(j)
time.sleep(30)
if __name__ == "__main__":
try:
main_run()
except KeyboardInterrupt:
print(NUM)
复制代码
中断时的运行结果:fetch
第二种是将具体要作的任务进行划分,而后放在一个队列中,每一个任务的完成之间互相不受到影响:ui
具体代码以下:spa
import threading
import time
from queue import Queue
import schedule
jobqueue = Queue()
def fetch_job():
# 从队列中去获取任务
job_func = jobqueue.get()
return job_func
def cal_num(num):
time.sleep(1)
print(f"{num} 开始被计算")
print()
def generate_job():
# 根据具体的状况去生成任务
for j in range(10):
yield (cal_num, j)
def put_job():
# 遍历生成的任务将其放入队列
for job in generate_job():
jobqueue.put(job)
def run_threaded():
while not jobqueue.empty():
# 取出而且执行
job_func, j = fetch_job()
# 对于每个任务 开启一个进程去执行 j 是任务参数
job_thread = threading.Thread(target=job_func, args=(j,))
job_thread.start()
time.sleep(10)
print("任务执行完毕")
def part_run():
# 生成任务而且将其放入队列中
put_job()
# 每 10 s执行一次的定时任务 开启一个新的线程去执行
schedule.every(10).seconds.do(run_threaded)
# 在一个有阻塞的主程序中去运行
while True:
# logger.info(schedule.jobs)
print(schedule.jobs)
schedule.run_pending()
time.sleep(10)
if __name__ == "__main__":
part_run()
复制代码