2013年1月18日金曜日

"dstat(fluent-plugin-dstat)"の情報をCloudWatchに登録してみた

スズキです。
本記事に出てくるプラグインはRubyGems.orgに公開しています。(多少の変更あり)
FluentdのCloudWatchプラグイン(fluent-plugin-cloudwatch_ya)を"RubyGems.org"に公開してみた
近頃、下記のようにFluentdを使ってCloudWatchのカスタムメトリクスにデータを
登録することをやっていたのですが、
今回は下記のプラグインで取得したデータを、CloudWatchに登録してみました。
(CloudWatchプラグインを汎用化するための目的もあります)
fluent-plugin-dstatの紹介
最終的にはCloudWatchは下記のような状態になります。
("Load Avarage"の1分平均をCloudWatchに登録 )


"dstat"と"fluent-plugin-dstat"のインストール

# yum -y install dstat
...
# /usr/lib64/fluent/ruby/bin/gem install fluent-plugin-dstat
...

"fluent-plugin-dstat"を利用した設定ファイル

# cat /etc/td-agent/td-agent.conf
<source>
    type   dstat
    tag    dstat
    option -l
    delay  60 
</source>
<match dstat>
    type copy
    <store>
        type file
        path /tmp/dstat
    </store>
</match>
※LoadAvarage(-l)を60秒毎に取得し、"/tmp/dstat"にログ出力しています。

"fluent-plugin-dstat"の出力ログ

# tail /tmp/dstat.20130117.b4d37926e2a840ea3
...
2013-01-17T23:12:37+09:00       dstat   {"hostname":"ip-10-200-32-123","dstat":{"load avg":{"1m":"0.220","5m":"1.030","15m":"1.930"}}}
2013-01-17T23:13:37+09:00       dstat   {"hostname":"ip-10-200-32-123","dstat":{"load avg":{"1m":"0.080","5m":"0.840","15m":"1.800"}}}
2013-01-17T23:14:37+09:00       dstat   {"hostname":"ip-10-200-32-123","dstat":{"load avg":{"1m":"0.030","5m":"0.680","15m":"1.690"}}}

CloudWatchプラグインの準備("jsonpath"のインストール)

# /usr/lib64/fluent/ruby/bin/gem install jsonpath
...
※"fluent-plugin-dstat"のログ(JSON)がネストしてるので"jsonpath"で指定できるように

CloudWatchプラグインの設定

# cat /etc/td-agent/td-agent.conf
<source>
    type   dstat
    tag    dstat
    option -l
    delay  60 
</source>
<match dstat>
    type copy
    <store>
        type file
        path /tmp/dstat
    </store>
    <store>
        type                 cloudwatch
        buffer_type          file
        buffer_path          /opt/suz-lab/var/lib/td-agent/buf/cloudwatch.dstat
        flush_interval       1m
        cloud_watch_endpoint monitoring.ap-northeast-1.amazonaws.com
        namespace            SUZ-LAB/TEST
        <metric>
            metric_name LoadAvg1m
            value_key                   $['dstat']['load avg']['1m']
            unit                        None
            outcast_no_dimension_metric yes
            <dimensions>
                instance_id yes
            </dimensions>
        </metric>
    </store>
</match>
※CloudWatchプラグインの設定に関しては別途まとめます...

CloudWatchプラグインのソース

# cat /etc/td-agent/plugin/out_cloudwatch.rb 
module Fluent
    require 'net/http'
    require 'jsonpath'
    require 'aws-sdk'
    class CloudWatchOutput < TimeSlicedOutput

        METRIC_DATA_MAX_NUM = 20 
        Fluent::Plugin.register_output('cloudwatch', self)
        config_param :aws_key_id,           :string, :default => nil
        config_param :aws_sec_key,          :string, :default => nil
        config_param :cloud_watch_endpoint, :string, :default => 'monitoring.ap-northeast-1.amazonaws.com'
        config_param :namespace,            :string

        def configure(conf)
            super
            instance_id = Net::HTTP.get(
             '169.254.169.254', '/1.0/meta-data/instance-id'
            )
            @metric_list = []
            conf.elements.select {|element|
                element.name == 'metric'
            }.each do |metric|
                dimensions_list = []
                if not metric['outcast_no_dimension_metric'] == 'yes' then
                    dimensions_list << []
                end
                metric.elements.select {|element|
                    element.name == 'dimensions'
                }.each do |dimensions|
                    dimension_list = []
                    dimensions.each do |dimension, value|
                        if    dimension.start_with?("dimension") then
                            name_and_value = value.split("=")
                            dimension_list << {
                                'name' => name_and_value[0],
                                'value' => name_and_value[1]
                            }
                        elsif dimension == 'instance_id' and value == 'yes' then
                            dimension_list << {
                                'name' => 'InstanceId',
                                'value' => instance_id
                            }
                        elsif dimension == 'fluent_tag' and value == 'yes' then
                            dimension_list << {
                                'name' => 'FluentTag',
                                'value' => nil
                            }
                        end
                    end
                    dimensions_list << dimension_list
                end
                @metric_list << {
                    'metric_name'     => metric['metric_name'],
                    'value_key'       => metric['value_key'],
                    'unit'            => metric['unit'],
                    'dimensions_list' => dimensions_list
                }
            end
            $log.warn(@metric_list.inspect)
        end

        def format(tag, time, record)
            record["tag"]       =  tag
            record["timestamp"] =  Time.at(time).iso8601
            record.to_msgpack
        end
        
        def write(chunk)
            metric_data = []
            chunk.msgpack_each do |record|
                @metric_list.each do |metric|
                  value = JsonPath.new(
                      metric['value_key']
                  ).first(record)
                    if not value.nil? then
                        metric['dimensions_list'].each do |dimensions|
                            dimensions.each do |dimension_list|
                                if dimension_list['name'] == 'FluentTag' then
                                    dimension_list['value'] = record['tag'];
                                end
                            end
                            metric_data << {
                                :metric_name => metric['metric_name'],
                                :timestamp   => record['timestamp'],
                                :value       => value,
                                :unit        => metric['unit'],
                                :dimensions  => dimensions
                            }
                        end
                    end
                end
            end
            AWS.config(
                :access_key_id        => @aws_key_id,
                :secret_access_key    => @aws_sec_key,
                :cloud_watch_endpoint => @cloud_watch_endpoint
            )
            cloud_watch = AWS::CloudWatch.new
            until metric_data.length <= 0 do
                $log.warn(metric_data.slice(
                    0, METRIC_DATA_MAX_NUM).inspect
                )
                cloud_watch.put_metric_data(
                    :namespace   => @namespace,
                    :metric_data => metric_data.slice!(
                        0, METRIC_DATA_MAX_NUM
                    )
                )
            end
        end

    end
end
※"statistic_values"以外は仕様Fixのつもり... なので、そろそろ"gem"に...

ということで次はgemにしてCloudWatchプラグインの仕様を紹介する予定...
--------
http://www.suz-lab.com

0 コメント: