ページの先頭です

ヘッダーの始まりです


本文の始まりです

エンジニアブログ

エンジニアブログ

2015.05.14

MAGELLANでMQTTを使ってデータをBigQueryに収集

【2016年10月27日追記】MAGELLAN BLOCKSとのサービス統合により、MQTTは使用できなくなりました

Groovenauts では MAGELLAN というIoT向けのサービスを提供しています。今回は MAGELLAN が持つ機能のひとつである MQTT?対応を活用してデータ収集を簡単に行うサンプルの紹介をします。

MQTTとは?

MQTT(MQ Telemetry Transport) とは「MQTT is a machine-to-machine (M2M)/”Internet of Things” connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport」(http://mqtt.org/より引用)と定義されているように IoT や M2M(Machine-to-Machine)向けの軽量プロトコルであり、デバイスからのデータ送受信やブロードキャストに向いています。最近では Facebook の Messenger に採用されるなど Web でもリアルタイム性を重要視するアプリやサービスでの事例もあり注目が集まってきています。

コネクション指向で Publish/Subscribe 型のプロトコルで、クライアントはブローカー(Broker)と呼ばれるサーバーに接続し、トピック(Topic)と呼ばれる”/”で区切られた文字列をキーとしてメッセージの送信(Publish)や、Topicへのメッセージの受信(Subscribe)を行うことができます。

MAGELLANとMQTT

MAGELLAN ではプロジェクトを作成し Worker をデプロイすると、自動的に MQTT のブローカーとしてのインタフェースが利用可能になります。これにより、MQTT を用いたクライアント間のメッセージのやりとりの他、Worker へのメッセージ送信を MQTT の Publish によって行うことが可能になります。

MQTT?の接続時に送信先の Worker によって割り当てられるトークンを用いた認証が必要になりますが、その実装はクライアントSDKによって簡易に行うことができます。

BigQuery

BigQuery とは Google Cloud Platform のサービスのひとつで、大規模なデータをクラウド上に収集し、SQLに似たクエリ言語でデータの検索/集計を非常に高速に行うことができるサービスです。データストレージとしても非常にコストパフォーマンスに優れているため、様々なデータの集積先として活用する事例が増えています。

今回の記事では Google Cloud Platform および BigQuery の利用を開始する方法は割愛します。MAGELLAN から BigQuery を利用するためには、サービスアカウントの登録と証明書ファイルの取得が必要になりますので、以下のサンプルを試してみる際には Google Developers Console の “APIと認証”>”認証情報”から、「新しいクライアント ID を作成」というボタンからサービスアカウントの追加を行って表示されるメールアドレスとダウンロードされる証明書(.p12ファイル)を控えておいてください。
クライアント ID の作成時は、アプリケーションの種類には”サービスアカウント”、キーのタイプには “P12 Key” を選択してください。

Create Google Cloud Platform Service Account

サンプル Worker: bigquery-worker

MAGELLAN の Worker としてデプロイ可能なサンプル Worker に magellan/bigquery-worker を追加しました。これは MQTT のメッセージを受け取りそのペイロードを JSON として解釈して BigQuery に投入するサンプル Worker です。

MAGELLAN は現在ベータテスト中ですが公式サイトからアカウントの申請をすることでどなたでもご利用できます。まだアカウントを作成していない方はここからメールアドレスを登録してアカウントを取得してください。

MAGELLAN の Worker のデプロイ方法は Getting Startedデプロイ のページを参照してください。Worker のイメージ名には “magellan/bigquery-worker:latest” を指定してください。またこのサンプル Worker は DB は利用しませんので CloudSQL?の準備は不要です。

投入先の BigQuery の情報を設定するために以下のような YAML ファイルを用意します。インデント(行の先頭の空白)も必要なのでコピーして貼り付ける際はご注意ください。

environment_vars_yaml: |
  ---
  GCP_PROJECT_ID: "your-project-id"
  GCP_SERVICE_ACCOUNT_EMAIL: "xxxxxx@developer.gserviceaccount.com"
  GCP_PRIVATE_KEY: "base64 encoded string of .p12 file content"
  BIGQUERY_DATASET: "sample_dataset"
  BIGQUERY_TABLE: "sample_table"

GCP_PROJECT_ID には Google Cloud Platform のプロジェクトID(プロジェクト名とは別のものですので注意してください)、GCP_SERVICE_ACCOUNT_EMAIL には事前に登録しておいた Google Cloud Platform のサービスアカウントのメールアドレスを設定してください。

GCP_PRIVATE_KEY にはサービスアカウント生成時にダウンロードされた .p12 ファイルの内容を Base64 エンコードしたものを渡します。この文字列を得るためには以下のようなコマンドを実行してください(base64 コマンドがインストールされているものとします)。

$ base64 -w 0 -i service-account.p12

BIGQUERY_DATASET と BIGQUERY_TABLE は BigQuery に投入する時のデータセット/テーブル名を指定します。データセットは事前に作成しておく必要があるので、コンソールから”ビッグデータ”>”BigQuery”を選択してデータセットを作成しておいてください。テーブルは自動的に作成されるので事前の作成は不要です。

Worker のデプロイはコマンドラインから以下のように実施することができます。管理コンソールからの Worker のデプロイは現在開発中で、近日リリース予定です。

# メールアドレスと管理コンソールから取得できる認証トークン、またはパスワードを入力してログインしてください
$ magellan-cli login
$ magellan-cli worker create BigQuerySample magellan/bigquery-worker:latest -A worker.yml
$ magellan-cli stage release_now

magellan-cli worker create のオプションの “BigQuerySample” は Worker に識別用の名前をつけるためのものなので別の名称に変更してかまいません。

MQTT メッセージ送信

次はサンプルWorkerに対して MQTT で JSON でメッセージを送信すれば BigQuery にデータを格納できます。今回は動作確認のために magellan-cli の機能を利用して MQTT メッセージを送信します。magellan-cli では stage を選択した状態でそのステージへの HTTPS および MQTT のリクエストを送信する動作確認用のサブコマンドが用意されているので、これを利用します。

送信するトピックは、現在のところ “worker/” ではじまるトピックは Worker へ渡されます。またサンプルWorkerは任意のトピックに Publish されたメッセージを全て同じBigQueryのテーブルに格納します(将来のバージョンアップで変更する可能性があります)ので、トピック名は “worker/” ではじまるものであればなんでもかまいません。

magellan-cli mqtt pub worker/sample '{"message":"Hello BigQuery Sample Worker!", "value": 72}'

“OK” と表示されれば送信は成功しています。MQTT はリプライがないため、Worker が正しく動作しているかどうかは BigQuery へのクエリで確認しましょう。

BigQuery で JSON のデータを検索

magellan/bigquery-worker は MQTT? のペイロードのJSONをそのまま文字列型のフィールドとして BigQuery のテーブルに格納します。また受信時刻を time というフィールドに格納します。格納先の BigQuery のテーブルのスキーマは以下のようになります。

フィールド名 説明
time INTEGER データ投入時刻(UNIX Time)
json STRING JSONエンコードされたデータ

BigQuery のクエリ言語には JSON 形式のフィールドから値を取り出すための関数が提供されているので、このようなスキーマで格納されていても必要なフィールドの値を取り出すことが可能です。以下のようなクエリを BigQuery のコンソールや bq コマンドで実行してみましょう。データセット名やテーブル名は実際に指定した名称にあわせて実行してください。

SELECT FORMAT_UTC_USEC((time+9*60*60)*1000000) as time,
       JSON_EXTRACT_SCALAR(json, "$.message") as message,
       JSON_EXTRACT_SCALAR(json, "$.value") as value
FROM [sample_dataset.sample_table]
ORDER BY time DESC LIMIT 1000

MQTT で投入したメッセージが BigQuery に格納されていることを確認できます。

次回は実際にデバイスから MQTT? のメッセージを MAGELLAN に送信する方法を紹介してみたいと思います。

トップへ戻る

本文の終わりです