Rails 最近增加了一个作业队列系统,让我们来看看如何使用。
Run, baby, run!
这个队列 API 非常简单,你将对象放到队列中,而这个对象需要提供一个名为 run 的方法,下面是个简单例子:
1 |
class TestJob |
2 |
def run |
3 |
puts "I am running!" |
4 |
end |
5 |
end |
6 |
7 |
Rails.queue.push(TestJob. new ) |
8 |
=> "I am running!" |
对大多数人来说,这已足够。队列是在一个独立的线程中运行,因此你的应用不会因为一些很复杂的作业而导致无响应
Rails 中的基本队列并不是一个长期的解决方案,其目的仅仅是提供一个通用的 API 可以用来支持更可靠的队列系统。例如当你需要从 Resque 切换到 Sidekiq,你不需要更改你应用代码,你只需关心对象进入队列以及编组。
你也可以编写自己的队列,下面是一个简单的队列实现:
1 |
class MyQueue |
2 |
def push(job) |
3 |
job.run |
4 |
end |
5 |
end |
要使用你自定义的队列,只需要在 application.rb 中设置:
1 |
config.queue = MyQueue |
上面例子来源于 Rails 的测试套件,它定义了一个非异步的队列,当作业被放到队列中便立即执行。下面让我们开发一个实际的作业,无需依赖于 Queue 类。
01 |
class MyQueue |
02 |
def initialize |
03 |
@queue = [] |
04 |
end |
05 |
06 |
def push(job) |
07 |
@queue .push(job) |
08 |
end |
09 |
10 |
def pop |
11 |
@queue .pop |
12 |
end |
13 |
end |
这个例子我们实现了一个简单的队列,接下来你需要告诉 Rails 的 QueueConsumer 来使用这个队列,可以在 application.rb 的 initializer 块中设置:
1 |
intializer 'start queue consumer' do |app| |
2 |
qpp.queue_consumer = config.queue_consumer.start(app.queue) |
3 |
at_exit { app.queue.consumer.shutdown } |
4 |
end |
然后我们将新的作业放到队列中:
1 |
Rails.queue.push(TestJob. new ) |
啥也没有,为什么?检查 QueueConsumer:
1 |
Rails.application.queue_consumer |
2 |
=> #<Rails::Queueing::ThreadedConsumer @queue=#<MyQueue @queue=[]>, @thread=#<Thread dead>> |
然后你会发现线程死了,我们可以强行要求队列处理:
1 |
Rails.application.queue_consumer.start |
2 |
=> "I am running!" |
回过头来看看到底发生了什么。首先我们找到 ThreadedConsumer#start
01 |
def start |
02 |
@thread = Thread . new do |
03 |
while job = @queue .pop |
04 |
begin |
05 |
job.run |
06 |
rescue Exception => e |
07 |
handle_exception e |
08 |
end |
09 |
end |
10 |
end |
11 |
self |
12 |
end |
这个线程只有在 @queue.pop 返回一个 true 值的时候才会运行,这不太合理,我们需要不断的将对象推到队列中,让我们来看看 Queue#pop 发生了什么:
01 |
# Retrieves data from the queue. If the queue is empty, the calling thread is |
02 |
# suspended until data is pushed onto the queue. If +non_block+ is true, the |
03 |
# thread isn't suspended, and an exception is raised. |
04 |
# |
05 |
def pop(non_block= false ) |
06 |
while true |
07 |
@mutex .synchronize do |
08 |
@waiting .delete( Thread .current) |
09 |
if @que .empty? |
10 |
raise ThreadError, "queue empty" if non_block |
11 |
@waiting .push Thread .current |
12 |
@resource .wait( @mutex ) |
13 |
else |
14 |
retval = @que .shift |
15 |
@resource .signal |
16 |
return retval |
17 |
end |
18 |
end |
19 |
end |
20 |
end |
终于有点明白了,Queue#pop 是一个无限的循环在等待其需要的内容。而我们简单的 MyQueue 类在 ThreadConsumer#start 调用的时候会返回 nil,因此队列里没对象,线程就结束了。甚至当我们往队列里放对象时,再次 pop 操作后也会结束。
简单起见,只需要让 MyQueue 继承 Queue 即可:
1 |
class MyQueue < Queue |
2 |
end |
现在我们可以:
1 |
Rails.queue.push(TestJob. new ) |
2 |
=> "I am running!" |
Rails 4.0 中的队列系统是一个非常简单的解决方案,我们期待正式版的发布,能够支持更多更好的后台作业处理库。
需要注意的是,目前的队列还是 beta 版本,API 可能还有有所更改。