AWS IoTで受信したメッセージをKinesis Firehoseで扱う

前回の記事でRaspberry Piで作成したIoTデバイスからAWS IoTにセンサーデータを送信する仕組みを作りました。

今回は、そのセンサーデータをKinesis Firehoseを使ってS3に格納してみましょう。AWS IoTからS3へのメッセージの格納は別にKinesis Firehoseを使う必要はないのですが、敢えてと言うことで・・・。

AWS IoTでルールの作成

まず、AWS IoTのACTからルールを作成します。

ルール作成

ルールの名前は、Status_Kinesis_Firehoseにしました。

AWS IoTに送信されたメッセージから、どのメッセージをルールの対象にするかは、SQL風の定義で指定できます。

ここでは、属性(項目)はすべてを示す*、トピックフィルターには+/statusとします。

アクションの追加をクリックします。

アクションの指定

Kinesis Firehoseのアクションを指定します。

まだFirehoseのDelivery Streamがないので、作成を行います。

Firehose Delivery Streamの作成

Firehoseの画面が表示されるので、Create Delivery Streamボタンをクリックします。

Delivery Streamの作成はStep1〜5まで続くので、下記のとおりに進めます。

Step 1

Delivery Streamの名前を付けます。

Direct PUT or other sourcesを選択します。これを指定すると、AWS IoT以外にFirehose APIやKinesis Agentなどからも同じDelivery Streamを使用してデータを配信できます。

Step 2

Step 2はデフォルトのままで構いません。

Step 3

Step 3では、配信先を指定します。ここではS3を指定しますが、他にもRedshiftやElasticsearch、Splunkが指定できます。

S3の配信先として、バケットとPrefixを指定します。

Step 4

Step 4では、FirehoseからS3にアクセスするためのIAMロールを指定します。ここでは、新規のロールを作成しました。

Step 5

Step 5はレビュー画面です。設定内容を確認して、Create Delivery Streamボタンをクリックします。

アクションの設定

Firehose Delivery Streamの作成が終わったので、AWS IoTのルール作成画面に戻ってストリーム名を指定します。もちろん、いま作成したDelivery Streamの名前です。

Separatorは\n(改行)を指定します。複数のメッセージをキャッシュしてS3に保存を行うので、メッセージ間のセパレータが必要です。改行を指定すると、1メッセージ1行のファイルになります。

AWS IoTからFirehoseにアクセスするためのIAMロールを指定します。新規ロールを作成して、それを指定しました。

アクションの指定が終わったら、最初のルール作成画面に戻りますので、ルールの作成ボタンをクリックします。

S3の確認

それでは、少し時間が経ってからS3のバケットを確認してみましょう。

指定したPrefix(status)の下に、年月日時のフォルダができて、ファイルが保存されています。

ファイルの中身を見てみると、このようにメッセージが保存されています。

これでセンサーデータのS3への保存は完了です。次回はAthenaを使ってアドホックな分析をしてみようと思います。

この記事を書いた人

井上 研一

株式会社ビビンコ代表取締役、ITエンジニア/経済産業省推進資格ITコーディネータ。AI・IoTに強いITコーディネータとして活動。画像認識モデルを活用したアプリや、生成AIを業務に組み込むためのサービス「Gen2Go」の開発などを行っている。近著に「使ってわかった AWSのAI」、「ワトソンで体感する人工知能」。日本全国でセミナー・研修講師としての登壇も多数。