Rails 4.0 先睹为快:作业队列

来源:开源中国社区 作者:红薯
  

Rails 最近增加了一个作业队列系统,让我们来看看如何使用。

Run, baby, run!

这个队列 API 非常简单,你将对象放到队列中,而这个对象需要提供一个名为 run 的方法,下面是个简单例子:

 
1 class TestJob
2 def run
3 puts "I am running!"
4 end
5 end
6  
7 Rails.queue.push(TestJob.new)
8 => "I am running!"

 

对大多数人来说,这已足够。队列是在一个独立的线程中运行,因此你的应用不会因为一些很复杂的作业而导致无响应

Rails 中的基本队列并不是一个长期的解决方案,其目的仅仅是提供一个通用的 API 可以用来支持更可靠的队列系统。例如当你需要从 Resque 切换到 Sidekiq,你不需要更改你应用代码,你只需关心对象进入队列以及编组。

你也可以编写自己的队列,下面是一个简单的队列实现:

 
1 class MyQueue
2 def push(job)
3 job.run
4 end
5 end

 

 

要使用你自定义的队列,只需要在 application.rb 中设置:

 
1 config.queue = MyQueue

 

上面例子来源于 Rails 的测试套件,它定义了一个非异步的队列,当作业被放到队列中便立即执行。下面让我们开发一个实际的作业,无需依赖于 Queue 类。

 
01 class MyQueue
02 def initialize
03 @queue = []
04 end
05  
06 def push(job)
07 @queue.push(job)
08 end
09  
10 def pop
11 @queue.pop
12 end
13 end

 

这个例子我们实现了一个简单的队列,接下来你需要告诉 Rails 的 QueueConsumer 来使用这个队列,可以在 application.rb 的 initializer 块中设置:

 
1 intializer 'start queue consumer' do |app|
2 qpp.queue_consumer = config.queue_consumer.start(app.queue)
3 at_exit { app.queue.consumer.shutdown }
4 end

然后我们将新的作业放到队列中:

 
1 Rails.queue.push(TestJob.new)

 

 

 

啥也没有,为什么?检查 QueueConsumer:

 
1 Rails.application.queue_consumer
2 => #<Rails::Queueing::ThreadedConsumer @queue=#<MyQueue @queue=[]>, @thread=#<Thread dead>>

 

 

然后你会发现线程死了,我们可以强行要求队列处理:

 
1 Rails.application.queue_consumer.start
2 => "I am running!"

 

 

回过头来看看到底发生了什么。首先我们找到 ThreadedConsumer#start

 
01 def start
02 @thread = Thread.new do
03 while job = @queue.pop
04 begin
05 job.run
06 rescue Exception => e
07 handle_exception e
08 end
09 end
10 end
11 self
12 end

 

 

 

这个线程只有在 @queue.pop 返回一个 true 值的时候才会运行,这不太合理,我们需要不断的将对象推到队列中,让我们来看看 Queue#pop 发生了什么:

 
01 # Retrieves data from the queue. If the queue is empty, the calling thread is
02 # suspended until data is pushed onto the queue. If +non_block+ is true, the
03 # thread isn't suspended, and an exception is raised.
04 #
05 def pop(non_block=false)
06 while true
07 @mutex.synchronize do
08 @waiting.delete(Thread.current)
09 if @que.empty?
10 raise ThreadError, "queue empty" if non_block
11 @waiting.push Thread.current
12 @resource.wait(@mutex)
13 else
14 retval = @que.shift
15 @resource.signal
16 return retval
17 end
18 end
19 end
20 end

 

 

终于有点明白了,Queue#pop 是一个无限的循环在等待其需要的内容。而我们简单的 MyQueue 类在 ThreadConsumer#start 调用的时候会返回 nil,因此队列里没对象,线程就结束了。甚至当我们往队列里放对象时,再次 pop 操作后也会结束。

简单起见,只需要让 MyQueue 继承 Queue 即可:

 
1 class MyQueue < Queue
2 end

 

 

现在我们可以:

 
1 Rails.queue.push(TestJob.new)
2 => "I am running!"

 

Rails 4.0 中的队列系统是一个非常简单的解决方案,我们期待正式版的发布,能够支持更多更好的后台作业处理库。

需要注意的是,目前的队列还是 beta 版本,API 可能还有有所更改。


时间:2012-06-26 07:52 来源:开源中国社区 作者:红薯 原文链接

好文,顶一下
(0)
0%
文章真差,踩一下
(0)
0%
------分隔线----------------------------


把开源带在你的身边-精美linux小纪念品
无觅相关文章插件,快速提升流量