返回顶部
分享到

Python快速实现定时器的五种常见方法

python 来源:互联网 作者:佚名 发布时间:2025-07-30 17:59:31 人浏览
摘要

1.sleep法(阻塞) 通过 while + sleep 实现定时任务 (1) 存在时间漂移(等待运行时间) 1 2 3 4 5 6 7 import time def loopMonitor(): while True: MonitorSystem() # 1min检查一次 time.sleep(60) loopMonitor() (2) 维护sleep时间

1.sleep法(阻塞)

通过 while + sleep 实现定时任务

(1) 存在时间漂移(等待运行时间)

1

2

3

4

5

6

7

import time

def loopMonitor():

    while True:

        MonitorSystem()

        # 1min检查一次

        time.sleep(60)

loopMonitor()

(2) 维护sleep时间

1

2

3

4

5

6

7

import time

def loopMonitor():

    while True:

        MonitorSystem()

        #2s检查一次

        time.sleep(60.0 - ((time.time() - starttime) % 60.0))

loopMonitor()

2.自定义法 (+线程、非阻塞)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

import threading

import time

  

class RepeatedTimer(object):

  def __init__(self, interval, function, *args, **kwargs):

    self._timer = None

    self.interval = interval

    self.function = function

    self.args = args

    self.kwargs = kwargs

    self.is_running = False

    self.next_call = time.time()

    self.start()

  

  def _run(self):

    self.is_running = False

    self.start()

    self.function(*self.args, **self.kwargs)

  

  def start(self):

    if not self.is_running:

      self.next_call += self.interval

      self._timer = threading.Timer(self.next_call - time.time(), self._run)

      self._timer.start()

      self.is_running = True

  

  def stop(self):

    self._timer.cancel()

    self.is_running = False

     

     

from time import sleep

def hello(name):

    print "Hello %s!" % name

  

print "starting..."

rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()

try:

    sleep(5) # your long-running job goes here...

finally:

    rt.stop() # better in a try/finally block to make sure the program ends!

3.Twisted

更加健壮,并且实现了许多功能:守护进程、日志记录或异常处理

1

2

3

4

5

6

7

8

9

10

11

12

from twisted.internet import task, reactor

  

timeout = 60.0 # Sixty seconds

  

def doWork():

    #do work here

    pass

  

l = task.LoopingCall(doWork)

l.start(timeout) # call every sixty seconds

  

reactor.run()

4.schedule (简单方便,推荐)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

import schedule

import time

  

def job():

    print("I'm working...")

  

#schedule.every(1)创建Job, seconds.do(func)按秒间隔查询并执行

schedule.every(1).seconds.do(func)

#添加任务按分执行

schedule.every(1).minutes.do(func)

#添加任务按天执行

schedule.every(1).days.do(func)

#添加任务按周执行

schedule.every().weeks.do(func)

#添加任务每周一执行,执行时间为下周一这一时刻时间

schedule.every().monday.do(func)

  

  

schedule.every().monday.at("12:00").do(job)

# 每隔10分钟执行一次任务

schedule.every(10).minutes.do(job)

# 每隔一小时执行一次任务

schedule.every().hour.do(job)

# 每天10:30执行一次任务

schedule.every().day.at("10:30").do(job)

# 每隔5到10分钟运行一次任务?

schedule.every(5).to(10).minutes.do(job)

# 每周一的这个时候执行一次任务

schedule.every().monday.do(job)

# 每周三 13:15执行一次任务

schedule.every().wednesday.at("13:15").do(job)

# # 每分钟的第17秒执行任务 

schedule.every().minute.at(":17").do(job)

  

while True:

    schedule.run_pending()   # 检测是否有到期任务

    time.sleep(1)  # 避免一直查询cpu过高。sleep时间不应该小于执行时间,否则会线程堆积。

5.Apscheduler (功能多,推荐)

调度器(scheduler)

BlockingScheduler: 调度器在当前进程的主线程中运行,会阻塞当前线程。

BackgroundScheduler: 调度器在后台线程中运行,不会阻塞当前线程。

AsyncIOScheduler: 结合asyncio模块一起使用。

GeventScheduler: 程序中使用gevent作为IO模型和GeventExecutor配合使用。

TornadoScheduler: 程序中使用Tornado的IO模型,用 ioloop.add_timeout 完成定时唤醒。

TwistedScheduler: 配合TwistedExecutor,用reactor.callLater完成定时唤醒。

QtScheduler: 应用是一个Qt应用,需使用QTimer完成定时唤醒。

触发器(trigger)

date是最基本的一种调度,作业任务只会执行一次。

interval触发器,固定时间间隔触发。

cron 触发器,在特定时间周期性地触发,和Linux crontab格式兼容。它是功能最强大的触发器。

作业存储(job store)

添加任务,有两种添加方法,一种add_job(), 另一种是scheduled_job()修饰器来修饰函数。

1

2

3

4

5

6

7

8

9

10

11

12

from datetime import datetime

from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()

  

# 第一种

@scheduler.scheduled_job(job_func, 'interval', seconds=10)

def timed_task():    

    print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

     

# 第二种

scheduler.add_job(timed_task, 'interval', seconds=5)

scheduler.start()

删除任务,两种方法:remove_job() 和 job.remove()。remove_job()是根据任务的id来移除,所以要在任务创建的时候指定一个 id。job.remove()则是对任务执行remove方法。

1

2

3

4

5

scheduler.add_job(job_func, 'interval', seconds=20, id='one')

scheduler.remove_job(one)

  

task = add_job(task_func, 'interval', seconds=2, id='job_one')

task.remvoe()

获取任务列表,通过scheduler.get_jobs()方法能够获取当前调度器中的所有任务的列表

1

tasks = scheduler.get_jobs()

关闭任务,使用scheduler.shutdown()默认情况下调度器会等待所有正在运行的作业完成后,关闭所有的调度器和作业存储。

1

2

scheduler.shutdown()

scheduler.shutdown(wait=false)

注意点

时区问题

报错:ValueError: Timezone offset does not match system offset: 0 != 28800. Please, check your config files.

分析:通过分析异常日志,发现APScheduler的默认timezone,而“0”是获取的系统环境变量的TZ时间28800对应timezone为“Asia/Shanghai”, 而0对应timezone为“UTC”,所以需将系统环境变量的时区与APScheduler的时区设置为一致。

解决:

1

2

3

4

5

6

7

8

#!/usr/bin/python

# -*- coding: utf-8 -*-

  

from apscheduler.schedulers.background import BackgroundScheduler   

import os

  

os.environ['TZ']= "Asia/Shanghai"

scheduler = BackgroundScheduler(timezone="Asia/Shanghai")

其他解决方案:

如果部署应用dockerfile配置,也可以在dockerfile中设定系统时区。


版权声明 : 本文内容来源于互联网或用户自行发布贡献,该文观点仅代表原作者本人。本站仅提供信息存储空间服务和不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权, 违法违规的内容, 请发送邮件至2530232025#qq.cn(#换@)举报,一经查实,本站将立刻删除。
原文链接 :
相关文章
  • 本站所有内容来源于互联网或用户自行发布,本站仅提供信息存储空间服务,不拥有版权,不承担法律责任。如有侵犯您的权益,请您联系站长处理!
  • Copyright © 2017-2022 F11.CN All Rights Reserved. F11站长开发者网 版权所有 | 苏ICP备2022031554号-1 | 51LA统计