最近PrePANの開発を手伝っていて、Workerの仕組みをQudoで作りました。初めてWorkerの仕組みを一から作ったのでメモしておきます。
Worker処理に必要な部品、それぞれのQudoでの実装、Workerプロセスを管理するためのdaemontools、capistranoでのdeployという順番で書いていきます。
Workerとは
ざっくり言うと非同期に色々実行するための仕組みです。perlだとTheSchwartz、Qudo、Jonkとかがあります。
Worker処理に必要な部品
今回作ってみて、Worker処理は大きく分けて次の三つくらいのものが必要だと分かりました。
- ApplicationからJobをinsertする部分(Qudo)
- 実際のJobの処理(Qudo::Worker)
- Jobの実行を管理して、Jobに処理を委譲する部分(Qudo, Qudo::Parallel::Manager)
それぞれのQudoでの実装
Qudoはid:nekokakさんやid:masartzさんが作ったWorker処理を実装するためのモジュールです。TheSchwartzとかと比べるとかなり薄く、実装も簡単に出来ました。
Qudo用のDBの作成
Qudoを使うためにはdatabaseが必要なので作っておきます。mysqlで作るのなら以下のコマンドで作ります。
qudo --db=qudo --user=root --rdbms=mysql --use_innodb
ApplicationからJobをinsertする部分
Qudoの場合、insert部分はQudoが担当しています。僕は下のような感じで継承して使いました。
ちなみにour $instanceにしてるのはsingletonにしたかったからです。あとdefault_hooksとかでargをJSONシリアライズするようにはしています。
package Sample::Qudo::Client; use strict; use warnings; use base qw(Qudo); sub new { our $instance ||= do { my ($class, $args) = @_; $class->SUPER::new( databases => [+{ dsn => 'dbi:mysql:qudo', username => 'root', password => '', }], default_hooks => ['Qudo::Hook::Serialize::JSON'], %{$args || {}}, ); }; } 1;
あとは処理したいjobのクラスとか、parameterを指定してenqueueするだけです。
my $client = Sample::Qudo::Client->new; $client->enqueue( 'Sample::Worker::Echo', { arg => {message => 'echo message'} }, );
簡単ですね
実際のJobの処理
QudoだとQudo::Workerが担当しています。job queueに入ってるmessageをprintするだけのWorkerだと下のように実装します。
package Sample::Worker::Echo; use strict; use warnings; use parent qw(Qudo::Worker); sub work { my ($self , $job ) = @_; my $arg = $job->arg; my $message = $arg->{message}; if (defined $message) { print $message; } $job->completed(); # or $job->abort } sub max_retries { 2 } sub retry_delay { 10 } 1;
処理はworkメソッドに書いていきます。うまくいったらcompleted()を、失敗したらabort()を呼ぶのがポイントですね。
あとmax_retriesとかretry_delayとかメソッドを作っておくと、エラーが起こったときに指定した分だけretryしてくれます。
簡単ですね!
Jobの実行を管理して、Jobに処理を委譲する部分
Qudoでは、こちらもQudoが担当しています。ただし、今回はpreforkモデルで実行したかったのでQudo::Parallel::Managerを使いました。
script/worker.plというスクリプトを作って、下のように実装しました。
#! /usr/bin/env perl use strict; use warnings; use FindBin; use lib "$FindBin::Bin/../lib"; use Qudo::Parallel::Manager; my $manager = Qudo::Parallel::Manager->new( databases => [+{ dsn => 'dbi:mysql:qudo', username => 'root', password => '', }], default_hooks => ['Qudo::Hook::Serialize::JSON'], manager_abilities => [ 'Sample::Worker::Echo', ], work_delay => 5, max_workers => 3, min_spare_workers => 1, auto_load_worker => 1, debug => 1, ); $manager->run; # start fork and work.
ポイントはmanager_abilitiesに実行したいworkerクラスを入れておくことくらいですかね。
簡単ですね!!!
実際に動かしてみる
ここまで来たらあとは実際にworkerを動かすだけです。
perl script/worker.pl
と実行して、ジョブキューにenqueueしてあげると、処理されてる様子がわかります。
これでWorkerのApplication側の実装は終わりました。
Qudo便利、超簡単ですね!!!!!
Workerプロセスを管理するためのdaemontools
さてここまででApplication側の実装が終わったわけですが、実際に本番環境でworkerプロセスを動かさなければいけません。
上に書いてあるみたいにサーバでperl script/worker.plしてもいいんですが、logが取りづらいし、死活管理もできないので、おすすめしません。今回はdaemontoolsを利用してworkerプロセスの管理を行いました。
runスクリプト
daemontools用にまずrunスクリプトを書きます。下のようにしてみました。
worker.run.sh
#!/bin/sh exec 2>&1 export PLACK_ENV=production export APPROOT=/home/app/www/Sample-Worker/current cd $APPROOT || exit 1 exec setuidgid app /usr/bin/perl script/worker.pl
実際はSampleというproductと仮定して今回の記事を書いているのですが、Sample-WorkerをAPPROOTにしてるのはworker用のコードとweb application用のコードを別の場所に配置したかったからです。
このあとに書くcapistranoとも関係してきます。
run logスクリプト
log用のスクリプトも必要です。今回はちゃんとlog用スクリプト書いてないんですが、ちゃんと書いたほうがいいですね。
worker.log.run.sh
#!/bin/sh exec 2>&1 exec setuidgid app /usr/bin/multilog t ./main
runスクリプトの配置
実際は配置に関して言えばcapistranoの仕事になるのですが、わかりやすさのためここにも書いておきます。
server側でdaemontoolsの配置通りにapplicationからのsymlinkを貼るだけです。capistranoの関係でディレクトリはこんな感じになりました。
sudo ln -s /home/app/www/Sample-Worker/current/bin/worker.run.sh /etc/service/worker/run sudo ln -s /home/app/www/Sample-Worker/current/bin/worker.log.run.sh /etc/service/worker/log/run
daemontoolsによる起動
あとはworkerを起動するだけです。以下のコマンドでstartとかstopとかrestartとか出来ます。
Qudo::Parallel::WorkerはSIGTERMを受けたら終了するようになってるので、下のようにしました。
# start sudo svc -u /etc/service/worker # stop sudo svc -d /etc/service/worker # restart sudo svc -t /etc/service/worker sudo svc -u /etc/service/worker
capistranoでのdeploy
さてdaemontoolsを使った時点で、実際にworker処理はうまく実行できるんですが、毎回server入ったり、deployしたりするのめんどいということで、capistranoとも連携させて見ました。
capistranoとは
このあたり読むといいと思います。
deployの前提
今回は以下のようなことを前提としてdeployします。
- Sampleというproductをdeployする
- 配置は/home/app/www/以下にする
- applicationとworkerは配置するdirectoryを別にする
- application用のdaemontools runスクリプトは別にあるとする
config/deploy.rb
config/deploy.rbは下のような感じになりました。
set :user, 'app' set :application, 'Sample' set :host, 'hoge.host.h' # 環境にあわせて set :deploy_to, "/home/app/www/#{application}" set :worker_deploy_to, "/home/app/www/#{application}-Worker" set :daemontools_dir, "/etc/service/#{application}" set :worker_daemontools_dir, "/etc/service/worker" set :scm, 'git' set :deploy_via, :copy set :repository, "git@hoge.host.h:/var/git/project/#{application}" # 環境に合わせて set :branch, 'master' default_run_options[:pty] = true set :ssh_options, { :forward_agent => true } role :app, host namespace :sample do task :setup do sudo "mkdir -p #{deploy_to}" sudo "mkdir -p #{daemontools_dir}/log/main" deploy.setup sudo "chown -R #{user}:#{user} #{deploy_to}" sudo "chown -R #{user}:#{user} /etc/service/#{application}" deploy.update sudo "ln -s #{deploy_to}/current/bin/run.sh #{daemontools_dir}/run" sudo "ln -s #{deploy_to}/current/bin/log.run.sh #{daemontools_dir}/log/run" end task :update do deploy.update end task :start, :roles => [:app] do sudo "svc -u #{daemontools_dir}" end task :stop, :roles => [:app] do sudo "svc -d #{daemontools_dir}" end task :restart, :roles => [:app] do sudo "svc -t #{daemontools_dir}" end task :reload, :roles => [:app] do sudo "svc -h #{daemontools_dir}" end end namespace :worker do task :setup do set :deploy_to, worker_deploy_to sudo "mkdir -p #{worker_deploy_to}" sudo "mkdir -p #{worker_daemontools_dir}/log/main" deploy.setup sudo "chown -R #{user}:#{user} #{worker_deploy_to}" sudo "chown -R #{user}:#{user} #{worker_daemontools_dir}" worker.update run "qudo --db=qudo --user=root --rdbms=mysql --use_innodb" sudo "ln -s #{worker_deploy_to}/current/bin/worker.run.sh #{worker_daemontools_dir}/run" sudo "ln -s #{worker_deploy_to}/current/bin/worker.log.run.sh #{worker_daemontools_dir}/log/run" end task :update do set :deploy_to, worker_deploy_to deploy.update end task :start, :roles => [:app] do sudo "svc -u #{worker_daemontools_dir}" end task :stop, :roles => [:app] do sudo "svc -d #{worker_daemontools_dir}" end task :restart, :roles => [:app] do sudo "svc -t #{worker_daemontools_dir}" sudo "svc -u #{worker_daemontools_dir}" end end
capitranoで配置する
さて配置ですが、初回のみsetupを実行します。
cap sample:setup cap worker:setup
あとは更新したいときにdeployを実行します。
cap sample:deploy cap worker:deploy
それぞれのstartやstopも出来ます。
cap sample:start cap sample:stop cap sample:restart cap worker:start cap worker:stop cap worker:restart
便利ですね。
まとめ
今回はQudo, daemontools, capistranoを使ってWorker処理の実装、起動、配置までをまとめてみました。
Qudoを使ったら、何個かはまりどころはあったにしろ、Worker処理の実装部分はすんなりいけたので超便利でした! id:nekokak++, id:masartz++
あとわからない所とかできたらいいなってことが何個かあって
- Qudo::Parallel::Managerでmin_spare_workersとかmax_spare_workersを指定するとどういう動きになるのか
- Qudoでjobをenqueueするときfuncに毎回入れようとしてduplicate起こってるけど、仕様?
- Qudo::Workerもいい感じにdeserializeしたい
- 教えてもらったので記事を修正しておきました(11/14)
ってことでした。分かる人教えてください!!
それではWorkerとか作りたくなったら参考にしてみてください。終わり。