广告位联系
返回顶部
分享到

Golang实现CronJob(定时任务)的方法

Golang 来源:互联网 作者:佚名 发布时间:2023-04-05 20:15:41 人浏览
摘要

最近做了一个需求,是定时任务相关的。以前定时任务都是通过 linux crontab 去实现的,现在服务上云(k8s)了,尝试了 k8s 的 CronJob,由于公司提供的是界面化工具,使用、查看起来很不方

最近做了一个需求,是定时任务相关的。以前定时任务都是通过 linux crontab 去实现的,现在服务上云(k8s)了,尝试了 k8s 的 CronJob,由于公司提供的是界面化工具,使用、查看起来很不方便。于是有了本文,通过一个单 pod 去实现一个常驻服务,去跑定时任务。

经过筛选,选用了cron这个库,它支持 linux cronjob 语法取配置定时任务,还支持@every 10s、@hourly 等描述符去配置定时任务,完全满足我们要求,比如下面的例子:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

package main

 

import (

    "fmt"

 

    "github.com/natefinch/lumberjack"

    "github.com/robfig/cron/v3"

    "github.com/sirupsen/logrus"

)

 

type CronLogger struct {

    clog *logrus.Logger

}

 

func (l *CronLogger) Info(msg string, keysAndValues ...interface{}) {

    l.clog.WithFields(logrus.Fields{

        "data": keysAndValues,

    }).Info(msg)

}

 

func (l *CronLogger) Error(err error, msg string, keysAndValues ...interface{}) {

    l.clog.WithFields(logrus.Fields{

        "msg":  msg,

        "data": keysAndValues,

    }).Warn(err.Error())

}

 

func main() {

    logger := logrus.New()

    _logger := &lumberjack.Logger{

        Filename:   "./test.log",

        MaxSize:    50,

        MaxAge:     15,

        MaxBackups: 5,

    }

 

    logger.SetOutput(_logger)

    logger.SetFormatter(&logrus.JSONFormatter{

        DisableHTMLEscape: true,

    })

 

    c := cron.New(cron.WithLogger(&CronLogger{

        clog: logger,

    }))

    c.AddFunc("*/5 * * * *", func() {

        fmt.Println("你的流量包即将过期了")

    })

    c.AddFunc("*/2 * * * *", func() {

        fmt.Println("你的转码包即将过期了")

    })

    c.Start()

 

    for {

        select {}

    }

}

使用了 cronjob、并结合了 golang 的 log 组建,输出日志到文件,使用很方便。

但是,在使用过程中,发现还有些不足,缺少某些功能,比如我很想使用的查看任务列表。

类库介绍

扩展性强

此类库扩展性挺强,通过 JobWrapper 去包装一个任务,NewChain(w1, w2, w3).Then(job),相关实现如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

type JobWrapper func(Job) Job

type Chain struct {

    wrappers []JobWrapper

}

func NewChain(c ...JobWrapper) Chain {

    return Chain{c}

}

func (c Chain) Then(j Job) Job {

    for i := range c.wrappers {

        j = c.wrappers[len(c.wrappers)-i-1](j)

    }

    return j

}

比如当前脚本如果还没有执行完,下次任务时间又到了,就可以通过如下默认提供的 wrapper 去避免继续执行。可以看到最后执行的任务 j.Run() 被包装在了一个函数闭包中,并且根据闭包中的 channel 去判断是否执行,避免重复执行。首次执行的时候,容量为 1 的 channel 中已经有数据了,重复执行时,channel 无数据,默认跳过,等上次任务执行完成后,又像 channel 中写入一条数据,下次 channel 可以读出数据,又可以执行任务了:

1

2

3

4

5

6

7

8

9

10

11

12

13

func SkipIfStillRunning(j Job) Job {

    var ch = make(chan struct{}, 1)

    ch <- struct{}{}

    return FuncJob(func() {

        select {

        case v := <-ch:

            defer func() { ch <- v }()

            j.Run()

        default:

            // "skip"

        }

    })

}

主流程

cron 主流程是启动一个协程,里面有双重 for 循环,下面我们来一起分析一下。

定时器

第一层循环,首先计算下次最早执行任务的时间跟当前时间间隔 gap,然后设置定时器为 gap,这里很巧妙,定时器间隔不是 1s/次,而是跟下次任务的时间相关,这样就避免了无用的定时器循环,也让执行时间更精准,不存在设置小了浪费资源,设置大了误差大的情况。接下来进入第二层循环。

1

2

sort.Sort(byTime(c.entries))

timer = time.NewTimer(c.entries[0].Next.Sub(now))

事件循环

事件循环中,包含了很多事件,比如 添加任务、停止、移除任务,当 cron 启动之后,这些任务都是异步的。比如添加任务,不会直接将任务信息写入内存中,而是进入事件循环,加入之后,重新计算第一二层循环,避免了正在修改任务信息,又执行任务信息,然后出错的情况。

有人可能会问了,为何不在事件中加锁,这样也能避免内存竞争。我想说,我们执行的是脚本任务,有的事件可能很长,可能会阻塞有些事件,所以这些事件都放在循环中,避免了加锁,也满足了要求。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

for {

    select {

    case now = <-timer.C:

        // 执行任务

    case newEntry := <-c.add:

        // 添加任务

    case replyChan := <-c.snapshot:

        // 获取任务信息

    case <-c.stop:

        //  停止任务

    case id := <-c.remove:

        // 移除任务

    }

    break

}

类库改造

在了解了项目的基本情况之后,对项目做了部分改造,方便使用。

打印任务列表信息

在主循环汇总加入了信号量监听,当触发信号量 SIGUSR1,将任务信息输出到日志:

1

2

3

4

5

6

7

8

9

10

11

usrSig := make(chan os.Signal, 1)

signal.Notify(usrSig, syscall.SIGUSR1)

 

for {

    select {

    case <-usrSig:

        // 启动单独的协程去打印定时任务执行信息

        continue

    }

    break

}

根据名称移除脚本

目前脚本只能根据脚本 id 去移除要执行的任务,执行过程中,也不能通过命令去移除任务,不是太方便。比如有个脚本马上要执行了,但是该脚本发现问题了,这时候生产环境的话,就需要更新代码,然后重启服务去下线脚本任务,这时候,黄花菜可能都凉了。

所以我也是通过信号量,来处理运行之后,运行中移除任务的问题,收到信号量之后,读取文件中的内容,根据命令去处理 runing 中的内存:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

usrSig2 := make(chan os.Signal, 1)

signal.Notify(usrSig2, syscall.SIGUSR2)

 

......

case <-usrSig2:

    actionByte, err := os.ReadFile("/tmp/cron.action")

    ...... //校验命令正确性

    action := strings.Fields(string(actionByte))

    switch action[0] {

    case "removeTag":

        timer.Stop()

        now = c.now()

        c.removeEntryByTag(action[1])

        c.logger.Info("removedByTag", "tag", action[1])

    }

......

改造效果

由于原项目已经 2 年多没有个更新过了,就算发起 pr 估计也不会被处理,所以 fork 一份放在了这里aizuyan/cron进行改造,下面是改进之后的代码:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

package main

 

import (

    // 加载配置文件

 

    "fmt"

 

    "github.com/aizuyan/cron/v3"

)

 

func main() {

    c := cron.New(cron.WithLogger(cron.DefaultLogger))

    c.AddFuncWithTag("流量包过期", "*/5 * * * *", func() {

        fmt.Println("你的流量包即将过期了")

    })

    c.AddFuncWithTag("转码包过期", "*/2 * * * *", func() {

        fmt.Println("你的转码包即将过期了")

    })

    c.Start()

 

    for {

        select {}

    }

}

对每个定时任务增加了一个名称标识,当任务启动后,当我们执行 kill -SIGUSR1 <pid> 的时候,会看到 stdout 输出了运行的任务列表信息:

+----+------------+-------------+---------------------+---------------------+
| ID |    TAG     |    SPEC     |        PREV         |        NEXT         |
+----+------------+-------------+---------------------+---------------------+
|  2 | 转码包过期 | */2 * * * * | 0001-01-01 00:00:00 | 2023-04-02 17:22:00 |
|  1 | 流量包过期 | */5 * * * * | 0001-01-01 00:00:00 | 2023-04-02 17:25:00 |
+----+------------+-------------+---------------------+---------------------+

执行 kill -SIGUSR2 <pid>,移除转码包过期任务,避免了使用 ID 容易出错的问题。

1

2

3

cat /tmp/cron.action

removeTag 转码包过期

// {"data":["tag","转码包过期"],"level":"info","msg":"removedByTag","time":"2023-04-02T18:32:56+08:00"}

放目前为止,是不是更好用了,基本能满足我们的需求了,也可以自己去再做各种扩展。


版权声明 : 本文内容来源于互联网或用户自行发布贡献,该文观点仅代表原作者本人。本站仅提供信息存储空间服务和不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权, 违法违规的内容, 请发送邮件至2530232025#qq.cn(#换@)举报,一经查实,本站将立刻删除。
原文链接 : https://www.cnblogs.com/iforever/p/17283156.html
相关文章
  • GO利用channel协调协程的实现介绍
    go 当中的并发编程是通过goroutine来实现的,利用channel(管道)可以在协程之间传递数据,实现协程的协调与同步。 使用 新建一个管道,使
  • Golang实现CronJob(定时任务)的方法
    最近做了一个需求,是定时任务相关的。以前定时任务都是通过 linux crontab 去实现的,现在服务上云(k8s)了,尝试了 k8s 的 CronJob,由于公司
  • goland中导包报红和go mod问题

    goland中导包报红和go mod问题
    goland导包报红 1. 原理 import的包有两类: (1)在go.mod中有地址的,这种需要拉到gopath/pkg下 (2)没在go.mod的,这种在项目里,import的路径需
  • Hugo游乐场内容初始化示例介绍
    使用Hugo构建站点的体验很棒。 首先是构建速度快,其次是使用起来简单,一个hugo命令,我们的站点就已经就绪。 在构建过程中,Hugo提供了
  • 使用Golang快速构建出命令行应用程序
    在日常开发中,大家对命令行工具(CLI)想必特别熟悉了,如果说你不知道命令工具,那你可能是个假开发。每天都会使用大量的命令行工
  • go开源Hugo站点渲染之模板词法解析

    go开源Hugo站点渲染之模板词法解析
    Deps在准备好NewPathSpec,NewSpec,NewContentSpec,NewSourceSpec后,调用onCreate正式创建HugoSites,并在最后一步,加载模板执行器。 模板执行器只是提
  • 向Rust学习Go考虑简单字符串插值特性示例解析
    fmt.Printf或fmt.Sprintf写拼装字符串业务 在日常开发 Go 工程中,我们经常会用fmt.Printf或fmt.Sprintf去写类似的拼装字符串的业务。 如下代码:
  • Golang实现将中文转化为拼音

    Golang实现将中文转化为拼音
    导语:新用户入职 创建一系列账号比较麻烦,打算通过接口传入姓名进行初始化。想把姓名转化成拼音。因为有些账号即需要中文也需要英
  • Go语言实现猜谜小游戏

    Go语言实现猜谜小游戏
    本文是介绍用Go实现一个猜谜小游戏,就是程序先生成一个随机数num,然后让用户来猜生成的数是多少,如果猜的数比num小就提示用户猜测的
  • go微服务PolarisMesh源码解析服务端启动流程
    polaris-server作为PolarisMesh的控制面,该进程主要负责服务数据、配置数据、治理规则的管理以及下发至北极星SDK以及实现了xDS的客户端。 po
  • 本站所有内容来源于互联网或用户自行发布,本站仅提供信息存储空间服务,不拥有版权,不承担法律责任。如有侵犯您的权益,请您联系站长处理!
  • Copyright © 2017-2022 F11.CN All Rights Reserved. F11站长开发者网 版权所有 | 苏ICP备2022031554号-1 | 51LA统计