machinery入门看这一篇(异步任务队列)

前言

哈喽,你们好,我是asong,此次给你们介绍一个go的异步任务框架machinery。使用过python的同窗们都知道Celery框架,machinery框架就相似于Celery框架。下面咱们就来学习一下machinery的基本使用。python

本身翻译一个粗略版的machinery中文文档,有须要的伙伴们公众号自取无水印版:后台回复:machinery便可领取。git

或者github下载:https://github.com/asong2020/Golang_Dream/tree/master/machinerygithub

抛砖引玉

咱们在使用某些APP时,登录系统后通常会收到一封邮件或者一个短信提示咱们在某个时间某某地点登录了。而邮件或短信都是在咱们已经登录后才收到,这里就是采用的异步机制。你们有没有想过这里为何没有使用同步机制实现呢?咱们来分析一下。假设咱们如今采用同步的方式实现,用户在登陆时,首先会去检验一下帐号密码是否正确,验证经过后去给用户发送登录提示信息,假如在这一步出错了,那么就会致使用户登录失败,这样是大大影响用户的体验感的,一个登录提示的优先级别并非很高,因此咱们彻底能够采用异步的机制实现,即便失败了也不会影响用户的体验。前面说了这么多,那么异步机制该怎么实现呢?对,没错,就是machinery框架,据说大家还不会使用它,今天我就写一个小例子,咱们一块儿来学习一下他吧。golang

特性

上面只是简单举了个例子,任务队列有着普遍的应用场景,好比大批量的计算任务,当有大量数据插入,经过拆分并分批插入任务队列,从而实现串行链式任务处理或者实现分组并行任务处理,提升系统鲁棒性,提升系统并发度;或者对数据进行预处理,按期的从后端存储将数据同步到到缓存系统,从而在查询请求发生时,直接去缓存系统中查询,提升查询请求的响应速度。适用任务队列的场景有不少,这里就不一一列举了。回归本文主题,既然咱们要学习machinery,就要先了解一下他都有哪些特性呢?web

  • 任务重试机制
  • 延迟任务支持
  • 任务回调机制
  • 任务结果记录
  • 支持Workflow模式:Chain,Group,Chord
  • 多Brokers支持:Redis, AMQP, AWS SQS
  • 多Backends支持:Redis, Memcache, AMQP, MongoDB

架构

任务队列,简而言之就是一个放大的生产者消费者模型,用户请求会生成任务,任务生产者不断的向队列中插入任务,同时,队列的处理器程序充当消费者不断的消费任务。基于这种框架设计思想,咱们来看下machinery的简单设计结构图例:面试


  • Sender:业务推送模块,生成具体任务,可根据业务逻辑中,按交互进行拆分;
  • Broker:存储具体序列化后的任务,machinery中目前支持到Redis, AMQP,和SQS;
  • Worker:工做进程,负责消费者功能,处理具体的任务;
  • Backend:后端存储,用于存储任务执行状态的数据;

e.g

学习一门新东西,我都习惯先写一个demo,先学会了走,再学会跑。因此先来看一个例子,功能很简单,异步计算1到10的和。redis

先看一下配置文件代码:后端

broker: redis://localhost:6379

default_queue: "asong"

result_backend: redis://localhost:6379

redis:
  max_idle: 3
  max_active: 3
  max_idle_timeout: 240
  wait: true
  read_timeout: 15
  write_timeout: 15
  connect_timeout: 15
  normal_tasks_poll_period: 1000
  delayed_tasks_poll_period: 500
  delayed_tasks_key: "asong"

这里brokerresult_backend来实现。设计模式

主代码,完整版github获取:缓存


func main()  {

 cnf,err := config.NewFromYaml("./config.yml",false)
 if err != nil{
  log.Println("config failed",err)
  return
 }

 server,err := machinery.NewServer(cnf)
 if err != nil{
  log.Println("start server failed",err)
  return
 }

 // 注册任务
 err = server.RegisterTask("sum",Sum)
 if err != nil{
  log.Println("reg task failed",err)
  return
 }

 worker := server.NewWorker("asong"1)
 go func() {
  err = worker.Launch()
  if err != nil {
   log.Println("start worker error",err)
   return
  }
 }()

 //task signature
 signature := &tasks.Signature{
  Name: "sum",
  Args: []tasks.Arg{
   {
    Type:  "[]int64",
    Value: []int64{1,2,3,4,5,6,7,8,9,10},
   },
  },
 }

 asyncResult, err := server.SendTask(signature)
 if err != nil {
  log.Fatal(err)
 }
 res, err := asyncResult.Get(1)
 if err != nil {
  log.Fatal(err)
 }
 log.Printf("get res is %v\n", tasks.HumanReadableResults(res))

}

运行结果:

INFO: 2020/10/31 11:32:15 file.go:19 Successfully loaded config from file ./config.yml
INFO: 2020/10/31 11:32:15 worker.go:58 Launching a worker with the following settings:
INFO: 2020/10/31 11:32:15 worker.go:59 - Broker: redis://localhost:6379
INFO: 2020/10/31 11:32:15 worker.go:61 - DefaultQueue: asong
INFO: 2020/10/31 11:32:15 worker.go:65 - ResultBackend: redis://localhost:6379
INFO: 2020/10/31 11:32:15 redis.go:100 [*] Waiting for messages. To exit press CTRL+C
DEBUG: 2020/10/31 11:32:16 redis.go:342 Received new message: {"UUID":"task_9f01be1f-3237-49f1-8464-eecca2e50597","Name":"sum","RoutingKey":"asong","ETA":null,"GroupUUID":"","GroupTaskCount":0,"Args":[{"Name":"","Type":"[]int64","Value":[1,2,3,4,5,6,7,8,9,10]}],"Headers":{},"Priority":0,"Immutable":false,"RetryCount":0,"RetryTimeout":0,"OnSuccess":null,"OnError":null,"ChordCallback":null,"BrokerMessageGroupId":"","SQSReceiptHandle":"","StopTaskDeletionOnError":false,"IgnoreWhenTaskNotRegistered":false}
DEBUG: 2020/10/31 11:32:16 worker.go:261 Processed task task_9f01be1f-3237-49f1-8464-eecca2e50597. Results = 55
2020/10/31 11:32:16 get res is 55

好啦,如今咱们开始讲一讲上面的代码流程,

  • 读取配置文件,这一步是为了配置 brokerresult_backend,这里我都选择的是 redis,由于电脑正好有这个环境,就直接用了。
  • Machinery 库必须在使用前实例化。实现方法是建立一个 Server实例。 ServerMachinery配置和注册任务的基本对象。
  • 在你的 workders能消费一个任务前,你须要将它注册到服务器。这是经过给任务分配一个惟一的名称来实现的。
  • 为了消费任务,你需有有一个或多个worker正在运行。运行worker所须要的只是一个具备已注册任务的 Server实例。每一个worker将只使用已注册的任务。对于队列中的每一个任务,Worker.Process()方法将在一个goroutine中运行。可使用 server.NewWorker的第二参数来限制并发运行的worker.Process()调用的数量(每一个worker)。
  • 能够经过将 Signature实例传递给 Server实例来调用任务。
  • 调用 HumanReadableResults这个方法能够处理反射值,获取到最终的结果。

多功能

1. 延时任务

上面的代码只是一个简单machinery使用示例,其实machiney也支持延时任务的,能够经过在任务signature上设置ETA时间戳字段来延迟任务。

eta := time.Now().UTC().Add(time.Second * 20)
 signature.ETA = &eta

2. 重试任务

在将任务声明为失败以前,能够设置屡次重试尝试。斐波那契序列将用于在一段时间内分隔重试请求。这里可使用两种方法,第一种直接对tsak signature中的retryTimeoutRetryCount字段进行设置,就能够,重试时间将按照斐波那契数列进行叠加。

//task signature
 signature := &tasks.Signature{
  Name: "sum",
  Args: []tasks.Arg{
   {
    Type:  "[]int64",
    Value: []int64{1,2,3,4,5,6,7,8,9,10},
   },
  },
  RetryTimeout: 100,
  RetryCount: 3,
 }

或者,你可使用return.tasks.ErrRetryTaskLater 返回任务并指定重试的持续时间。

func Sum(args []int64) (int64, error) {
 sum := int64(0)
 for _, arg := range args {
  sum += arg
 }

 return sum, tasks.NewErrRetryTaskLater("我说他错了"4 * time.Second)

}

3. 工做流

上面咱们讲的都是运行一个异步任务,可是咱们每每作项目时,一个需求是须要多个异步任务以编排好的方式执行的,因此咱们就可使用machinery的工做流来完成。

3.1 Groups

Group 是一组任务,它们将相互独立地并行执行。仍是画个图吧,这样看起来更明了:

一块儿来看一个简单的例子:

 // group
 group,err :=tasks.NewGroup(signature1,signature2,signature3)
 if err != nil{
  log.Println("add group failed",err)
 }

 asyncResults, err :=server.SendGroupWithContext(context.Background(),group,10)
 if err != nil {
  log.Println(err)
 }
 for _, asyncResult := range asyncResults{
  results,err := asyncResult.Get(1)
  if err != nil{
   log.Println(err)
   continue
  }
  log.Printf(
   "%v  %v  %v\n",
   asyncResult.Signature.Args[0].Value,
   tasks.HumanReadableResults(results),
  )
 }

group中的任务是并行执行的。

3.2 chords

咱们在作项目时,每每会有一些回调场景,machiney也为咱们考虑到了这一点,Chord容许你定一个回调任务在groups中的全部任务执行结束后被执行。

来看一段代码:

callback := &tasks.Signature{
  Name: "call",
 }



 group, err := tasks.NewGroup(signature1, signature2, signature3)
 if err != nil {

  log.Printf("Error creating group: %s", err.Error())
  return
 }

 chord, err := tasks.NewChord(group, callback)
 if err != nil {
  log.Printf("Error creating chord: %s", err)
  return
 }

 chordAsyncResult, err := server.SendChordWithContext(context.Background(), chord, 0)
 if err != nil {
  log.Printf("Could not send chord: %s", err.Error())
  return
 }

 results, err := chordAsyncResult.Get(time.Duration(time.Millisecond * 5))
 if err != nil {
  log.Printf("Getting chord result failed with error: %s", err.Error())
  return
 }
 log.Printf("%v\n", tasks.HumanReadableResults(results))

上面的例子并行执行task一、task二、task3,聚合它们的结果并将它们传递给callback任务。

3.3 chains

chain就是一个接一个执行的任务集,每一个成功的任务都会触发chain中的下一个任务。

看这样一段代码:

//chain
 chain,err := tasks.NewChain(signature1,signature2,signature3,callback)
 if err != nil {

  log.Printf("Error creating group: %s", err.Error())
  return
 }
 chainAsyncResult, err := server.SendChainWithContext(context.Background(), chain)
 if err != nil {
  log.Printf("Could not send chain: %s", err.Error())
  return
 }

 results, err := chainAsyncResult.Get(time.Duration(time.Millisecond * 5))
 if err != nil {
  log.Printf("Getting chain result failed with error: %s", err.Error())
 }
 log.Printf(" %v\n", tasks.HumanReadableResults(results))

上面的例子执行task1,而后是task2,而后是task3。当一个任务成功完成时,结果被附加到chain中下一个任务的参数列表的末尾,最终执行callback任务。

文中代码地址:https://github.com/asong2020/Golang_Dream/tree/master/machinery/example

总结

这一篇文章到这里就结束了,machinery还有不少用法,好比定时任务、定时任务组等等,就不在这一篇文章介绍了。更多使用方法解锁能够观看machinery文档。由于machiney没有中文文档,因此我在学习的过程本身翻译了一篇中文文档,须要的小伙伴们自取。

获取步骤:关注公众号【Golang梦工厂】,后台回复:machiney便可获取无水印版~~~

好啦,这一篇文章到这就结束了,咱们下期见~~。但愿对大家有用,又不对的地方欢迎指出,可添加个人golang交流群,咱们一块儿学习交流。

结尾给你们发一个小福利吧,最近我在看[微服务架构设计模式]这一本书,讲的很好,本身也收集了一本PDF,有须要的小伙能够到自行下载。获取方式:关注公众号:[Golang梦工厂],后台回复:[微服务],便可获取。

我翻译了一份GIN中文文档,会按期进行维护,有须要的小伙伴后台回复[gin]便可下载。

我是asong,一名普普统统的程序猿,让gi我一块儿慢慢变强吧。我本身建了一个golang交流群,有须要的小伙伴加我vx,我拉你入群。欢迎各位的关注,咱们下期见~~~

推荐往期文章:


本文分享自微信公众号 - Golang梦工厂(AsongDream)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。

相关文章
相关标签/搜索