Sidekiq (8700✨)html
git : https://github.com/mperham/sidekiq mysql
https://www.cnblogs.com/richard1234/p/3829074.html (一篇文章,讲的比较清楚)git
https://wdxtub.com/2016/07/06/sidekiq-guide/ (wiki的翻译)github
进阶精华贴:https://ruby-china.org/topics/31470; https://ruby-china.org/topics/36825 web
Sidekiq 是Ruby社区最受欢迎的多线程的异步任务框架之一,几乎是Rails项目标配。redis
很是优秀的后台任务处理软件,其依赖Redis实现队列任务的增长,重试和调度等。 sql
在Web请求中,有不少任务是能够放到后台执行的,好比用户购买商品付款成功后,就能够直接向用户购买成功,相应的短信发送,物流通知等等就能够放到后台任务去作,不用在用户购买的同时当即执行,这些任务也称做异步任务。数据库
基本概念: api
Job
缓存
在 Sidekiq 中的 Job 指的是某一个任务的一次执行, 注意与咱们一般意义上说 “一个 Job” 指的是一类 Job.(一类任务)
Worker
由于 Sidekiq
是使用 Celluoid 来完成其多线程的控制的, 而 Celluoid 是 Ruby 中的多线程模式 Actor 模式的实现, 因此在 Sidekiq 中的 Worker 咱们以拟人的方式去理解. 我拥有一个工人, 不过有一点要区分的是这些 Worker 都是按照”操做手册”在执行任务, 因此他不会被限制在某一类任务上.
Queue
队列的意义在于区分任务而且让任务排队, Sidekiq 中将每一类的任务使用一个 Queue 来区分开.
按照视频步骤操做失败。 谷歌中文的帖子,最后成功。
- 根据教材安装gem 'sidekiq'
- 进行相关配置,下载的rails模版已经在sidekiq.rb中配置,并在routs.rb中加入 monitoring监控。
- rails g sidekiq:worker Hard
- 而后打开在方法perform(参数)内设置如puts("xxx")等等,
- 在控制器action,models中建立一个job。job是指某一个任务的一次执行,一般咱们说一个job🈯️的是一类job。
class HomeController < ApplicationController
def index
HardWorker.perform_async('北京天安门', 5)
end
end
⚠️ :也能够建立一个将来处理的job:HardWorker.perform_in(1.minutes, '北京王府', 5)
6. 启动服务器rails server。启动bundle exec sidekiq 还要启动redis-server.
7. 打开网页localhost:3000,及监控localhost:3000/sidekiq
8. 在terminal的bundle(fserver_watch),就是打开sidekig的那个窗,可看到每次处理job时的输出:
北京天安门: 5
2018-06-25T12:51:36.332Z 8891 TID-ovn7u3uq7 HardWorker JID-93c778e31d4cecfee0e3a3cd INFO: done: 0.022 sec
❌,从rails console中进行操做,提示错误 (已解决✅缘由多是没有rails db:migrate)。
)
> HardWorker.perform_async('bob', 5)
Traceback (most recent call last):
1: from (irb):1
NameError (uninitialized constant HardWorker)
✅返回 => "350788261bbeb33e6a7b187c"
如何访问 http://localhost:3000/sidekiq 进入监控页面
(点击查看Monitoring文档)
步骤:在routes.rb中设置:
- require 'sidekiq/web'
- 在Rails.application.routes.draw的块内,添加mount Sidekiq::Web => '/sidekiq'
Rails.application.routes.draw do
require 'sidekiq/web'
mount Sidekiq::Web => '/sidekiq'
root to: 'addresses#index'
resources :addresses
end
Sidekiq 管理 UI
在实际网站经营中,只有管理员才可以看监控,所以须要加上管理员的权限验证。在第2步:
config/routes.rb
Rails.application.routes.draw do
+ require 'sidekiq/web'
+ authenticate :user, lambda { |u| u.is_admin? } do + mount Sidekiq::Web => '/sidekiq'
+ end
is_admin?是user.rb中的验证方法。
mount(app, options=nil)是增长一个Rack-based application。例子:
mount(SomeRackApp => "some_route", as: '客制化helper's name')
新增config/sidekiq.yml
The Sidekiq configuration file is a YAML file that Sidekiq server uses to configure itself, by default located at config/sidekiq.yml
.
只有在使用高级功能时才须要建立这个文件,好比并发池such as concurrency pool size, 命名named queues, PID file location, etc.
---
:concurrency: 5 staging: :concurrency: 10 production: :concurrency: 20 :queues: - critical - default - low
Activejob能够和Sidekiq一块儿使用
之前的博客: https://www.cnblogs.com/chentianwei/p/9123322.html
操做文档: https://github.com/mperham/sidekiq/wiki/Active+Job
首先:
而后在config/application.rb中的 Application类中添加:
class Application < Rails::Application
# ...
config.active_job.queue_adapter = :sidekiq
end
或者在config/environments/developments或者 production.rb中添加。
也能够直接在在每一个job Basis内添加self.active_job = XXX
最后,在任意位置添加job queue.
ExamleJob.perform_later(参数)
(见之前的博客,enqueue, callback )
Activejob补充:
入队一个等待performed的job,当queueing system空闲的时候,就会当即执行:
CompanyJob.perform_later(参数)
入队一个在将来某个时间执行的job:
CompanyJob.set(wait_until: Date.tomorrow.noon).perform_later(参数)
当即执行一个job
CompanyJob.perform_now(参数)
set()方法:建立一个预制选项的job。根据选择中的参数来调用perform_later方法入队job。
option:
- wait: 指定的延长多少时间后执行入队job,如wait: 5.minutes
- wait_until: 指定到将来的某个时间点时,入队job
- priority: 10 入队这个job并设定优先级
- queue: :some_queue 入队这个job到指定的队列queue, 写队列名字
例子:
VideoJob.set(queue: :some_queue, wait: 5.minutes, priority: 10).perform_later(Video.last)
queue_as :default : 设置队列的名字,默认在jobs/XXX_job.rb中自动生成,能够改为类名。
queue_name: 这个方法会调用队列的名字,如CompanyJob.queue_name返回:default
Active Job Callbacks (API)
6个钩子,after, before, around. 针对enqueue和perform
retry_job
ActiveJob::Exceptions中的实例方法。配合rescue_from使用。当从你的job中,营救一个exception,你可让Active Job再尝试执行这个job.⚠️和set()方法的option同样。
客制化 错误处理
Activejob不支持丰富的Sidekiq's retry feature.
它使用一个简单的抽象方式来解析retries,基于遭遇的指定的例外。
⚠️,rails server后,在网页上出现❌提示pasting
NameError in HomeController#index
uninitialized constant ExampleJob::ErrorLoadingSite
从rescue_frome()中,去掉ErrorLoadingSite参数,则能够执行。
class ExampleJob < ApplicationJob::base
queue_as :default
rescue_from(ErrorLoadingSite) do
retry_job wait: 5.minutes, queue: :low_priority
end
def perform(args)
puts "#{args} "
end
end
限制
Rails容许对象做为参数传入perform方法。
def perform(user)
user.send_welcome_email!
end
可是若是在job 入队后,perform方法调用以前,user记录被删除的状况下,会升起raise
所以:须要加上rescue_from
rescue_from ActiveJob::DeserializationError do |exception|
# handle a deleted user record
end
✅使用标准的Sidekiq写法,则是:
def perform(user_id)
user = User.find_by(id: user_id)
if user
user.send_welcome_email!
else
# handle a deleted user record
end
end
Action Mailer(没看)
Job ID (不知道用处)
ActiveJob有本身的Job ID。Rails5,能够获得Sidekiq's JID
> com = CompanyJob.perform_later
Enqueued CompanyJob (Job ID: 0c4eb163-60fc-4b91-a4d5-1098f50c8c65) to Sidekiq(default)
=>
#<CompanyJob:0x00007f858ef6c190 @arguments=[], @job_id="0c4eb163-60fc-4b91-a4d5-1098f50c8c65", @queue_name="default", @priority=nil, @executions=0, @provider_job_id="5c8f31d1e02d6deca5371bfa">
> com.provider_job_id =>"5c8f31d1e02d6deca5371bfa"
provider_job_id就是Sidekiq's JID
❌Activejob没有perform_in()方法
我不知道如何使用,延时功能。
Sidekiq是一个后台任务进程的框架。 经过在后台perform work,它让你测量你的app。
这须要3部分:
Client
Sidekiq客户端运行在Ruby应用进程中, 容许咱们建立jobs以后,过一段时间再进行处理。
2个方法来建立一个job:
MyWorker.perform_async(
1,
2,
3)
Sidekiq::Client.push(
'class' => MyWorker,
'args' => [
1,
2,
3])
|
2个方法等价,建立一个Hash表明这个job,序列化Hash为一个JSON string,并把string入栈(in Redis)。这意味参数只能是简单的JSON 数据类型,复杂的Ruby对象(Date,Time, ActiveRecord)会出现没法正常序列化sserialize的问题。
⚠️使用Activejob的话,能够传递记录对象做为参数。不过设置上要当心。
Redis
It holds all the job data along with runtime and historical data to power Sidekiq's Web UI. Redis 为Sidekiq提供数据存储。
See Using Redis for info about connecting to Redis.(暂时忽略)
Rails5.2提供了Redis Cache Store内建支持(使用哪一个缓存存储器由使用者决定。点击看博客
Redis 是一种小型数据结构key-value结构,做为搭配用的数据库来使用。咱们在百宝箱用 sidekiq 实做异步时看过它。
Server
每一个Sidekiq服务进程会从在Redis中的队列拉任务,并处理它们。和你的web进程同样,Sidekiq boots Rails,这样全部的jobs和workers就能使用所有的Rails API。服务器将实例化the worker并调用其中的perform方法及其参数。
1.让参数小而简单,若是使用Activejob,就能传递复杂对象。
2让job知足事物性。⚠️不明白
3.拥抱并发性。
Sidekiq是并发设计的,能够同时执行大量jobs.
你可使用链接池来限制总共的链接数量,若是你的Sidekqi进程占用太多流量。
Concurrency 并发
https://github.com/mperham/sidekiq/wiki/Advanced-Options#concurrency
默认一个进程建立25个threads。若是crushing你的机器i/o,你能够调整它:
sidekiq -c 10
另外不要设高于50的threads。在config/database.yml中,设置链接pool的数量最好和threads的数量相等。
production:
adapter: mysql2
database: foo_production
pool: 25
提供一些初始设置,给queue换个名字,设置retry的最大次数
- queue : use a named queue for this Worker, default 'default'
- retry : enable the RetryJobs middleware for this Worker, default true. Alternatively, you can specify the max. number of times a job is retried (ie. :retry => 3)
- backtrace : whether to save any error backtrace in the retry payload to display in web UI, can be true, false or an integer number of lines to save, default false. (不太懂)
class HardWorker
include Sidekiq::Worker
sidekiq_options :queue => :crawler, :retry => false, :backtrace => true
def perform(name, count)
end
end
Redis的使用: (点击看详细文档)
Sidekiq 使用 Redis 来保存全部的 job 和操做数据。默认会去链接位于 localhost:6379
的 Redis 服务器。可是在生产环境中须要自定义地址。
具体须要修改 config/initializers/sidekiq.rb
文件
若是要使用云服务器的话:具体看文档。
数据汇出也是一个经常使用异步来处理的任务,流程和汇入其实87分像:
- 创建 RegistraionExport model,这个 model 会纪录是那个 user 作汇出、是哪一个 event 要汇出,以及存储最后汇出的档案
- 创建一个 RegistrationExports controller,这个 controller 让用户能够新增汇出纪录,以及浏览汇出纪录
- 创建 ExportWorkerJob,这个异步任务会执行汇出操做,并将汇出的档案放到 RegistraionExport model 上
- 异步任务最后完成时,能够寄 E-mail 通知用户汇出的档案已经准备好了
最后,汇出和汇入的功能要完整实作的话,还须要考虑档案存储的位置。咱们用 carrierwave 上传的档案,默认是公开的。可是汇出和汇入的档案,应该也必需要检查有没有权限才行。这部分的实做牵扯到咱们使用哪一种档案服务器:
报名15分钟内没完成自动取消
异步处理能够设定延迟时间set方法
rails g job check_registration
app/jobs/check_registration_job.rb
class CheckRegistrationJob < ApplicationJob
queue_as :default
+ def perform(registration_id) + registration = Registration.find(registration_id) + + if registration.status != "confirmed" + registration.status = "cancalled" + registration.save! + end
app/controllers/registrations_controller.rb
class RegistrationsController < ApplicationController
before_action :find_event def create @registration = @event.registrations.new(registration_params) @registration.ticket = @event.tickets.find( params[:registration][:ticket_id] ) @registration.status = "pending" @registration.user = current_user @registration.current_step = 1 if @registration.save
flash[:notice] = "报名成功,请在15分钟内完成操做!" + CheckRegistrationJob.set( wait: 15.minutes ).perform_later(@registration.id)
注册过程被分解为3step,第一步是create,此时提示用户在"15分钟内完成操做。由于15分钟后会执行队列中的CheckRegistrationJob类中的方法perform。根据注册的状态status来判断是否确认。见⬆️。