C#でFluentdにデータを送信する

ログ収集ツールとして知られるFluentd
Fluentd Documentation
ドキュメントがしっかりしているので、導入から設定まで簡単にできます。 また、PHPやPythonなどの一部の言語にはライブラリも公開されており、プログラムからログデータを流し込むのも手軽にできます。 たとえば、PHPアプリケーションであれば、fluent-logger-phpというライブラリを使って APIのログを収集することもできますね。

自分もちょうど.NET Core(C#)でサーバーアプリケーションを作っていて、APIログを集めることになったので Fluentdを使おうと思いましたが、残念なことにC#にはライブラリが用意されていません。
用意されていない以上、Fluentdに流し込むためには自分で作るしかない。

というわけで、自分で実装した時のメモを書いておきますよ、と

・・・間違ったことは書いてないはず

実装環境

・nginx
・.NET Core 2.2(3系でも動作可能な想定)

流し込みはunix domain socket

fluentdのinput pluginはいろいろあるけど、今回はunix domain socketを採用。
.NETプロセスとfluentdプロセスは同一サーバーで動くので、unix domain socketを使った プロセス間通信が可能です。td-agent.confでunix domain socketを使うための設定は 公式ドキュメントの通りであるため、割愛します。

C#実装例

var now = DateTime.Now;
var data = new SampleData()
{
    svc_id = 1234,
    svc_subkey = "subkey",
    svc_uid = "abcdefg",
    sample_id = 5678,
    created_at = now.ToString("yyyy-MM-dd HH:mm:ss")
};
string json = JsonUtility.Serialize(data);

var dto = new DateTimeOffset(now, TimeZoneInfo.Local.GetUtcOffset(now));
json = string.Format("[{0},{1},{2}]", "\"practiceserver\"", dto.ToUnixTimeSeconds().ToString(), json);

// socket書き込み
NetworkStream n_stream = null;
Socket socket = null;
try
{
    string socketPath = "/var/run/td-agent/td-agent.sock";

    var unixDomainSocketEp = new UnixDomainSocketEndPoint(socketPath);
    socket = new Socket(AddressFamily.Unix, SocketType.Stream, ProtocolType.IP);
    socket.Connect(unixDomainSocketEp);

    n_stream = new NetworkStream(socket);
    using (var writer = new StreamWriter(n_stream))
    {
        writer.WriteLineAsync(json);
    }
}
catch(Exception e)
{
    _logger.LogError("[API]ERROR Occurred!! message: " + e.Message);
}
finally
{
    if (n_stream != null)
    {
        n_stream.Dispose();
    }
    if (socket != null)
    {
        socket.Shutdown(SocketShutdown.Both);
        socket.Close();
    }
}

ポイントはjson文字列

SampleDataクラスは、fluentdに流し込むサンプリングデータ本体です。
しれっと出てきたJsonUtilityクラスは、 クラスをjson文字列に変換してくれるユーティリティクラスで、これはこれで別途実装が必要なやつになります(内容は割愛)。
さて、サンプリングデータをfluentdに流し込んでデータ収集したいわけですが、クラスからjson化した文字列をそのままソケットに流し込めばよいわけではありません。 ある決まった書式に則って文字列を整形し、整形文字列の方をソケットに送信するという手順が必要になります。
その書式というのが、下記のような書式になります。

["tag名",UTC経過時間(int数値),サンプリングデータjson]

この書式自体もjson形式です。[]で全体を囲んでいます(配列を表すjson形式みたいな)。[]で囲んだ中に、左から順に
・tag名
・UTC経過時間(int数値)
・サンプリングデータjson
を記述していきます。tag名はfluentdに送信するときに設定する文字列のことですが、詳しい説明は公式ドキュメントを読んでください。 UTC経過時間は概ね上記実装例のようにやればOKです。サンプリングデータjsonはクラスからjson化した文字列をそのまま突っ込めばOKです。

なんでこんな書式なのかの説明は一旦省きまして、この書式ができればあとはそこまで難しくないです。
UnixDomainSocketEndPointクラスなるものがあって、こいつを利用してSocketクラスのConnectionを確立します(C#からソケットへの接続)。 確立できたらNetworkStreamクラスを利用して整形したjsonを流し込めば、C#からソケットに向けてデータが送信されるという流れです。 NetworkStream(Streamクラス)からのStreamWriterは、いわゆるファイル書き込みの時にも見かけるお約束な構図ですね。
なお、SocketクラスやNetworkStreamクラスは送信完了後にCloseおよびDisposeしましょう

td-agentログで確認しよう

ただしくfluentdにデータが送信できれば、td-agentのログに送信ログが書き出されるはずなので、チェックしてみるのもよいでしょう。
/var/log/td-agent/td-agent.log がそれです。
正しく送信できていない場合は、td-agentログに何も出力されなかったり、エラーログを吐いたりします。ちなみにエラーログが出るとわかりますが、 どうやらfluentdはrubyスクリプトで動いているようです。

なぜjsonがあんな書式なのか

さて、fluentdに送信するjsonがなぜあのような書式なのか。
一応理由があって、「fluent-logger-phpのソースコードを解析した結果、phpがああいう書式で送信を行なっていたことがわかったから」という理由になります。
Github: fluent-logger-php

ライブラリとはいえ、どこかに送信する処理が必ずある(じゃなきゃ送信なんてできないし)
そう思ってGithubで公開されているソースコードを解析しました。そこまで物量があるわけでもなかったのであまり手間もかからず、一安心しました。
PHPでもファイル書き込み関数を使ってjsonを書き込んでいましたが、jsonを整形している処理を追っていったら、上記のような書式であることが分かった、 という次第であります。

ちなみに上記書式を守らず送信すると、td-agentログにエラーが吐き出されます。rubyがjsonを受け取ろうとしても受け取れないよーって怒ったりします。気をつけましょう