$shibayu36->blog;

クラスター株式会社のソフトウェアエンジニアです。エンジニアリングや読書などについて書いています。

Qudo, daemontools, capistranoを使ってWorker処理の仕組みを作る

 最近PrePANの開発を手伝っていて、Workerの仕組みをQudoで作りました。初めてWorkerの仕組みを一から作ったのでメモしておきます。
 Worker処理に必要な部品、それぞれのQudoでの実装、Workerプロセスを管理するためのdaemontoolscapistranoでのdeployという順番で書いていきます。

Workerとは

 ざっくり言うと非同期に色々実行するための仕組みです。perlだとTheSchwartz、Qudo、Jonkとかがあります。

Worker処理に必要な部品

 今回作ってみて、Worker処理は大きく分けて次の三つくらいのものが必要だと分かりました。

  • ApplicationからJobをinsertする部分(Qudo)
  • 実際のJobの処理(Qudo::Worker)
  • Jobの実行を管理して、Jobに処理を委譲する部分(Qudo, Qudo::Parallel::Manager)

それぞれのQudoでの実装

 Qudoはid:nekokakさんやid:masartzさんが作ったWorker処理を実装するためのモジュールです。TheSchwartzとかと比べるとかなり薄く、実装も簡単に出来ました。

Qudo用のDBの作成

 Qudoを使うためにはdatabaseが必要なので作っておきます。mysqlで作るのなら以下のコマンドで作ります。

qudo --db=qudo --user=root --rdbms=mysql --use_innodb
ApplicationからJobをinsertする部分

 Qudoの場合、insert部分はQudoが担当しています。僕は下のような感じで継承して使いました。
 ちなみにour $instanceにしてるのはsingletonにしたかったからです。あとdefault_hooksとかでargをJSONシリアライズするようにはしています。

package Sample::Qudo::Client;
use strict;
use warnings;

use base qw(Qudo);

sub new {
    our $instance ||= do {
        my ($class, $args) = @_;

        $class->SUPER::new(
            databases     => [+{
                dsn      => 'dbi:mysql:qudo',
                username => 'root',
                password => '',
            }],
            default_hooks => ['Qudo::Hook::Serialize::JSON'],
            %{$args || {}},
        );
    };
}

1;

あとは処理したいjobのクラスとか、parameterを指定してenqueueするだけです。

my $client = Sample::Qudo::Client->new;
$client->enqueue(
    'Sample::Worker::Echo',
    { arg => {message => 'echo message'} },
);

簡単ですね

実際のJobの処理

 QudoだとQudo::Workerが担当しています。job queueに入ってるmessageをprintするだけのWorkerだと下のように実装します。

package Sample::Worker::Echo;
use strict;
use warnings;

use parent qw(Qudo::Worker);

sub work {
    my ($self , $job ) = @_;

    my $arg = $job->arg;
    my $message = $arg->{message};

    if (defined $message) {
        print $message;
    }

    $job->completed(); # or $job->abort
}

sub max_retries { 2 }

sub retry_delay { 10 }

1;

 処理はworkメソッドに書いていきます。うまくいったらcompleted()を、失敗したらabort()を呼ぶのがポイントですね。
 あとmax_retriesとかretry_delayとかメソッドを作っておくと、エラーが起こったときに指定した分だけretryしてくれます。

簡単ですね!

Jobの実行を管理して、Jobに処理を委譲する部分

 Qudoでは、こちらもQudoが担当しています。ただし、今回はpreforkモデルで実行したかったのでQudo::Parallel::Managerを使いました。

 script/worker.plというスクリプトを作って、下のように実装しました。

#! /usr/bin/env perl
use strict;
use warnings;

use FindBin;
use lib "$FindBin::Bin/../lib";

use Qudo::Parallel::Manager;

my $manager = Qudo::Parallel::Manager->new(
    databases         => [+{
        dsn      => 'dbi:mysql:qudo',
        username => 'root',
        password => '',
    }],
    default_hooks => ['Qudo::Hook::Serialize::JSON'],
    manager_abilities => [
        'Sample::Worker::Echo',
    ],
    work_delay        => 5,
    max_workers       => 3,
    min_spare_workers => 1,
    auto_load_worker  => 1,
    debug             => 1,
);
$manager->run; # start fork and work.

 ポイントはmanager_abilitiesに実行したいworkerクラスを入れておくことくらいですかね。

 簡単ですね!!!

実際に動かしてみる

 ここまで来たらあとは実際にworkerを動かすだけです。

perl script/worker.pl

と実行して、ジョブキューにenqueueしてあげると、処理されてる様子がわかります。

 これでWorkerのApplication側の実装は終わりました。
 Qudo便利、超簡単ですね!!!!!

Workerプロセスを管理するためのdaemontools

 さてここまででApplication側の実装が終わったわけですが、実際に本番環境でworkerプロセスを動かさなければいけません。
 上に書いてあるみたいにサーバでperl script/worker.plしてもいいんですが、logが取りづらいし、死活管理もできないので、おすすめしません。今回はdaemontoolsを利用してworkerプロセスの管理を行いました。

daemontoolsとは

 http://www.emaillab.org/djb/daemontools/daemontools-howto.htmlを読むとわかりやすいです。

runスクリプト

 daemontools用にまずrunスクリプトを書きます。下のようにしてみました。

worker.run.sh

#!/bin/sh
exec 2>&1
export PLACK_ENV=production
export APPROOT=/home/app/www/Sample-Worker/current
cd $APPROOT || exit 1

exec setuidgid app /usr/bin/perl script/worker.pl

 実際はSampleというproductと仮定して今回の記事を書いているのですが、Sample-WorkerをAPPROOTにしてるのはworker用のコードとweb application用のコードを別の場所に配置したかったからです。
 このあとに書くcapistranoとも関係してきます。

run logスクリプト

 log用のスクリプトも必要です。今回はちゃんとlog用スクリプト書いてないんですが、ちゃんと書いたほうがいいですね。

worker.log.run.sh

#!/bin/sh
exec 2>&1
exec setuidgid app /usr/bin/multilog t ./main
runスクリプトの配置

 実際は配置に関して言えばcapistranoの仕事になるのですが、わかりやすさのためここにも書いておきます。
 server側でdaemontoolsの配置通りにapplicationからのsymlinkを貼るだけです。capistranoの関係でディレクトリはこんな感じになりました。

sudo ln -s /home/app/www/Sample-Worker/current/bin/worker.run.sh /etc/service/worker/run
sudo ln -s /home/app/www/Sample-Worker/current/bin/worker.log.run.sh /etc/service/worker/log/run
daemontoolsによる起動

 あとはworkerを起動するだけです。以下のコマンドでstartとかstopとかrestartとか出来ます。
 Qudo::Parallel::WorkerはSIGTERMを受けたら終了するようになってるので、下のようにしました。

# start
sudo svc -u /etc/service/worker

# stop
sudo svc -d /etc/service/worker

# restart
sudo svc -t /etc/service/worker
sudo svc -u /etc/service/worker

capistranoでのdeploy

 さてdaemontoolsを使った時点で、実際にworker処理はうまく実行できるんですが、毎回server入ったり、deployしたりするのめんどいということで、capistranoとも連携させて見ました。

deployの前提

 今回は以下のようなことを前提としてdeployします。

  • Sampleというproductをdeployする
  • 配置は/home/app/www/以下にする
  • applicationとworkerは配置するdirectoryを別にする
  • application用のdaemontools runスクリプトは別にあるとする
config/deploy.rb

 config/deploy.rbは下のような感じになりました。

set :user,             'app'
set :application,      'Sample'
set :host,             'hoge.host.h' # 環境にあわせて
set :deploy_to,        "/home/app/www/#{application}"
set :worker_deploy_to, "/home/app/www/#{application}-Worker"
set :daemontools_dir,  "/etc/service/#{application}"
set :worker_daemontools_dir, "/etc/service/worker"

set :scm,         'git'
set :deploy_via,  :copy
set :repository,  "git@hoge.host.h:/var/git/project/#{application}" # 環境に合わせて
set :branch,      'master'

default_run_options[:pty] = true
set :ssh_options, { :forward_agent => true }

role :app, host

namespace :sample do
  task :setup do
    sudo "mkdir -p #{deploy_to}"
    sudo "mkdir -p #{daemontools_dir}/log/main"

    deploy.setup

    sudo "chown -R #{user}:#{user} #{deploy_to}"
    sudo "chown -R #{user}:#{user} /etc/service/#{application}"

    deploy.update

    sudo "ln -s #{deploy_to}/current/bin/run.sh #{daemontools_dir}/run"
    sudo "ln -s #{deploy_to}/current/bin/log.run.sh #{daemontools_dir}/log/run"
  end

  task :update do
    deploy.update
  end

  task :start, :roles => [:app] do
    sudo "svc -u #{daemontools_dir}"
  end

  task :stop, :roles => [:app] do
    sudo "svc -d #{daemontools_dir}"
  end

  task :restart, :roles => [:app] do
    sudo "svc -t #{daemontools_dir}"
  end

  task :reload, :roles => [:app] do
    sudo "svc -h #{daemontools_dir}"
  end
end

namespace :worker do
  task :setup do
    set :deploy_to, worker_deploy_to

    sudo "mkdir -p #{worker_deploy_to}"
    sudo "mkdir -p #{worker_daemontools_dir}/log/main"

    deploy.setup

    sudo "chown -R #{user}:#{user} #{worker_deploy_to}"
    sudo "chown -R #{user}:#{user} #{worker_daemontools_dir}"

    worker.update
    run "qudo --db=qudo --user=root --rdbms=mysql --use_innodb"

    sudo "ln -s #{worker_deploy_to}/current/bin/worker.run.sh #{worker_daemontools_dir}/run"
    sudo "ln -s #{worker_deploy_to}/current/bin/worker.log.run.sh #{worker_daemontools_dir}/log/run"
  end

  task :update do
    set :deploy_to, worker_deploy_to
    deploy.update
  end

  task :start, :roles => [:app] do
    sudo "svc -u #{worker_daemontools_dir}"
  end

  task :stop, :roles => [:app] do
    sudo "svc -d #{worker_daemontools_dir}"
  end

  task :restart, :roles => [:app] do
    sudo "svc -t #{worker_daemontools_dir}"
    sudo "svc -u #{worker_daemontools_dir}"
  end

end
capitranoで配置する

 さて配置ですが、初回のみsetupを実行します。

cap sample:setup
cap worker:setup

 あとは更新したいときにdeployを実行します。

cap sample:deploy
cap worker:deploy

 それぞれのstartやstopも出来ます。

cap sample:start
cap sample:stop
cap sample:restart

cap worker:start
cap worker:stop
cap worker:restart

 便利ですね。

まとめ

 今回はQudo, daemontools, capistranoを使ってWorker処理の実装、起動、配置までをまとめてみました。
 Qudoを使ったら、何個かはまりどころはあったにしろ、Worker処理の実装部分はすんなりいけたので超便利でした! id:nekokak++, id:masartz++

 あとわからない所とかできたらいいなってことが何個かあって

  • Qudo::Parallel::Managerでmin_spare_workersとかmax_spare_workersを指定するとどういう動きになるのか
  • Qudoでjobをenqueueするときfuncに毎回入れようとしてduplicate起こってるけど、仕様?
  • Qudo::Workerもいい感じにdeserializeしたい
    • 教えてもらったので記事を修正しておきました(11/14)

ってことでした。分かる人教えてください!!

 それではWorkerとか作りたくなったら参考にしてみてください。終わり。