2013年1月15日火曜日

FluentdからCloudWatchのカスタムメトリクスにデータを登録してみた(UPDATE!)

スズキです。
こちらが最新です!(本記事は少し古い...)
FluentdからCloudWatchのカスタムメトリクスにデータを登録してみた(UPDATE2!)
以前、下記のような仕組みと、それを実現するFluentdのプラグイン
(out_cloudwatch.rb)を紹介しました。
FluentdからCloudWatchのカスタムメトリクスにデータを登録してみた
今回は、それを更に次のようなことができるようにアップデートしてみました。


※"history"は"bash"のHistoryログが対象で"audit"は"auditd"のAuditログが対象です。

上記の紹介記事からの改良点
  • 複数のDataCounterプラグインで集計をした結果を、一つのCloudWatchプラグインで
    データ登録できるように。
  • 一つのCloudWatchプラグインで、複数のカスタムメトリクスを対象にできるように。
  • Dimensionにタグ(Tag)とホスト名(Hostname)を登録。
  • 同じデータをDimensionがタグ(Tag)のみのものと、タグ(Tag)とホスト名(HostName)のもので登録。
    • AWSコンソールでタグ(Tag)で各ホストを集約したグラフが確認できるように。
  • CloudWatchプラグインの親クラスをTimeSlicedOutputにして、任意のインターバルで
    データ登録できるように。
  • "IAM Role"の有効期限が切れても対応できるように。

CloudWatchの表示


結果からいきます。まずは、ホスト名とタグの両方をDimensionに指定したものです。

次に、タグで複数のホストを集約したものです。

(MetricNameはBashCountじゃなくSyslogCountの方がよかったなー...)

設定ファイル(/etc/td-agent/td-agent.conf)


CloudWatchプラグインのポイントは、こんな感じでしょうか?
  • "flush_interval"で任意のインターバルでCloudWatchにデータ登録。
  • "metric"で"tag"ごとにカスタムメトリクスが指定可能。
<source>
    type     tail
    format   syslog
    path     /opt/suz-lab/var/log/syslog/all.log
    pos_file /opt/suz-lab/var/lib/td-agent/pos/tail.syslog.pos
    tag      tail.syslog
</source>
<match tail.syslog>
    type copy
    <store>
        type file
        path /tmp/tail.syslog
    </store>
    <store>
        type      datacounter
        unit      minute
        aggregate all
        count_key ident
        pattern1  history ^-bash$
        tag       history.datacounter.syslog
    </store>
    <store>
        type      datacounter
        unit      minute
        aggregate all
        count_key ident
        pattern1  audit ^audispd$
        tag       audit.datacounter.syslog
    </store>
</match>
<match *.datacounter.syslog>
    type copy
    <store>
        type file
        path /tmp/datacounter.syslog
    </store>
    <store>
        type                 cloudwatch
        buffer_type          file
        buffer_path          /opt/suz-lab/var/lib/td-agent/buf/cloudwatch.datacounter.syslog
        flush_interval       5m
        cloud_watch_endpoint monitoring.ap-northeast-1.amazonaws.com
        namespace            SUZ-LAB/TEST
        <metric>
            tag  history.datacounter.syslog
            name BashCount
            key  history_count
            unit Count
        </metric>
        <metric>
            tag  audit.datacounter.syslog
            name BashCount
            key  audit_count
            unit Count
        </metric>
    </store>
</match>

CloudWatchプラグイン(/etc/td-agent/plugin/out_cloudwatch.rb)


下記のコードで上記の仕様を実装(しているつもり...)。
module Fluent
    require 'aws-sdk'
    class CloudWatchOutput < TimeSlicedOutput

        MAX_METRIC_DATA_SIZE = 20 
        Fluent::Plugin.register_output('cloudwatch', self)
        config_param :aws_key_id,           :string, :default => nil
        config_param :aws_sec_key,          :string, :default => nil
        config_param :cloud_watch_endpoint, :string, :default => 'monitoring.ap-northeast-1.amazonaws.com'
        config_param :namespace,            :string

        def initialize
            super
            @metrics = {}
        end

        def configure(conf)
            super
            conf.elements.select {|e|
                e.name == 'metric'
            }.each do |e|
                @metrics[e['tag']] = {
                    'name' => e['name'],
                    'key'  => e['key'],
                    'unit' => e['unit']
                }
            end
        end

        def start
            super
            AWS.config(
                :access_key_id        => @aws_key_id,
                :secret_access_key    => @aws_sec_key,
                :cloud_watch_endpoint => @cloud_watch_endpoint
            )
        end

        def shutdown
            super
        end

        def format(tag, time, record)
            record["tag"]       =  tag
            record["timestamp"] =  Time.at(time).iso8601
            record.to_msgpack
        end
        
        def write(chunk)
            metric_data = []
            chunk.msgpack_each do |record|
                $log.debug(record.inspect)
                tmp_data_by_tag = {
                    :metric_name => @metrics[record['tag']]['name'],
                    :timestamp   => record['timestamp'],
                    :value       => record[@metrics[record['tag']]['key']],
                    :unit        => @metrics[record['tag']]['unit'],
                    :dimensions  => [ { :name  => 'Tag', :value => record['tag'] } ]
                }
                metric_data << tmp_data_by_tag
                tmp_data_by_tag_and_hostname = Marshal.load(Marshal.dump(tmp_data_by_tag))
                tmp_data_by_tag_and_hostname[:dimensions] << {
                    :name  => 'HostName', :value => Socket.gethostname
                }
                metric_data << tmp_data_by_tag_and_hostname
            end
            $log.debug(metric_data.inspect)
            until metric_data.length <= 0 do
                tmp_data = metric_data.slice!(0, MAX_METRIC_DATA_SIZE)
                $log.debug(tmp_data.inspect)
                AWS::CloudWatch.new.put_metric_data(
                    :namespace   => @namespace,
                    :metric_data => tmp_data
                )
            end
        end

    end
end

そして「Monitoring_Integrationパターン」の再考へ...
--------
http://www.suz-lab.com

2 コメント:

Kazuki Ohta さんのコメント...

ぜひ、gemとしてpublishしてください :)

鈴木宏康 さんのコメント...

ぜひ!
(まずはgemとしてpublishする方法の勉強からですが...)