Programming Room.
Google Cloud Tips.
GCS Tips.
「Cloud Pub/Sub Notifications for Cloud Storage」の使い方.2019/12/13
更新 2020/02/11
「Cloud Pub/Sub Notifications for Cloud Storage」の使い方の説明です。
おしながき
概要.
公式ドキュメントでは、「Cloud Pub/Sub Notifications for Cloud Storage」に概要が、実装方法は「オブジェクト変更の登録」に説明されています。
ユーザがオブジェクトに変更を加えたこと契機に任意の機能を実行できます。またオブジェクトのへの変更を通知として保持しておき、任意のタイミングでGAE/GCEからCloud
Pub/Subのキューに溜まった通知を読み出して、その内容に応じた処理を行うことも可能です。
このような機能を実現するために、GCSではオブジェクトの変更があった際に、Cloud Pub/Subに通知するように設定を行います。通知の配信やキューイングは、正常に配信できなかった場合の再配信も含めて、Cloud
Pub/Subの機能を使用します。Cloud Pub/SubからGAE/GCEに実装したエンドポイントのURIに通知をPushするように設定しておきます。あるいは任意のタイミングでキューから通知を読み出します。
通知の流れ.
Cloud Pub/Subの通知(メッセージ)の流れを図にしてみました。大体こんな感じだと思います。用途に応じて、Pull型(右図では右半分の上段)、Push型(同下段)のどちらかを選択します。両方同時に使用することも可能です。
- アプリケーションのユーザがオブジェクトに変更を加える。アップロード(生成)/削除/メタデータの変更など。
- GCSがバケットの設定に従って、Cloud Pub/Subのトピックにメッセージを配信する。
- トピックに関連付けられているサブスクリプションに、メッセージが配信される。この図では2つのサブスクリプションが関連付けられています。
- サブスクリプションがPull型の場合、GAE/GCEなどから任意のタイミングでメッセージを読み出す。キューになっているので、未処理のメッセージを取り出すことができます。
サブスクリプションがPush型の場合、設定されたエンドポイントにメッセージがPOSTされます。POSTに失敗した場合は、最大で7日間、リトライされます。
必要な実装.
GCSのバケット、Cloud Pub/Subの トピック/サブスクリプション、GAE/GCEのアプリケーションとも、同じプロジェクトに存在する簡単な構成なら、以上を実現するために必要な実装は、以下のとおりです。
- Cloud Pub/Subで、トピックの作成。
- Cloud Pub/Subで、サブスクリプションの作成。
- GAE/GCEなどで、Pull型のサブスクリプションの場合は、サブスクリプションからメッセージの取得と、その内容に応じた処理の実装。
- GAE/GCEなどで、Push型のサブスクリプションの場合は、メッセージの内容に応じた処理を行うPushエンドポイントの実装。
- GCSで、バケットにオブジェクト変更の登録。
トピックの作成.
最初に必要なのはトピックの作成です。サブスクリプションも、バケットへのオブジェクト変更の登録も、トピックに関連付けるので、トピックが最初に必要になります。
トピックの作成方法は、「トピックとサブスクリプションの管理」の「トピックの管理」で説明されています。公式ドキュメントの説明のとおりなのですが、目的ごとに1度作ればおしまいなので、GCPコンソールのメニュー「Pub/Sub」-「トピック」で遷移する、トピックの一覧から作るのが一番簡単です。トピックIDの入力だけで済みます。暗号鍵はデフォルトの「Googleが管理する鍵」で十分です。
アプリケーション実行中に作成したいなら、コードで作成することもできます。Javaの場合はクライアントライブラリの TopicAdminClient.createTopic()メソッドで作成できます。その例も「トピックの管理」に載っています。
サブスクリプションの作成.
GCPコンソールからの作成.
サブスクリプションもGCPコンソールから作るのが簡単です。下記の方法で、右のスクリーンショットのようなサブスクリプションの作成画面に遷移できます。このスクリーンショットは、「配信タイプ」に「push」を選択した場合です。
- トピックの一覧で、作成したトピックの一番右の3点リーダ「…」で表示されるポップアップメニューから「サブスクリプションを作成」を選択。
- トピックの詳細で、「サブスクリプションを作成」ボタンを押下し、「カスタムアプリ向け」を選択。
- GCPコンソールメニューの「Pub/Sub」-「サブスクリプション」で表示されるサブスクリプションの一覧で、「サブスクリプションを作成」ボタンを押下。
サブスクリプションID.
作成するサブスクリプションの識別子です。トピックとサブスクリプションは多対多の関連付けができるので、サブスクリプションを一意に識別するためのものです。
トピックの名前.
最後のサブスクリプション一覧から遷移した場合は、トピック名が決まっていませんので、自分で入力する必要があります。それ以外の方法では、作ろうとしているサブスクリプションに関連付けられるトピック名は、入力済みになっています。
配信タイプ.
PullまたはPushを選択します。
Pushを選択すると、通知を配信するエンドポイントURLの入力を求められます。「Pushエンドポイントの実装」で説明するエンドポイントのURLを入力します。ここにGAEアプリケーションのURIを設定しておけば、GAEアプリケーションを起動できます。
「認証を有効にする」は、残念ですが私も未確認です。公式ドキュメントで関連するのは恐らく「App Engine スタンダード環境と Cloud Functions の URL の認証」ではないかと予想しています。
サブスクリプションの有効期限.
右のスクリーンショットを見ればわかりますが、サブスクリプションには有効期限があります。GCPコンソールには説明がありませんが、公式ドキュメント「サブスクリプションの有効期限ポリシーを変更する」では以下の様に説明されています。
デフォルトでは、サブスクリプションは非アクティブ状態が 31 日間続くと期限切れになります。
Cloud Pub/Sub がサブスクライバーのアクティビティ(オープン接続、有効な pull、push の成功など)を検出すると、サブスクリプション削除クロックが再起動されます。
『非アクティブ状態』の明確な定義の説明がありません。『オープン接続』って何? ってなりますが、恐らく新たなメッセージの受信と思われます。
有効期限切れの後どうなるのかも説明がありません。実験してみたところ、メッセージの有無に関わらず、サブスクリプションが消滅しました。未受信のメッセージが残ったままでも、有効期限が切れると消滅します。有効/無効のフラグみたいなものがあるわけではないので、再有効化とかできません。有効期限が切れてしまったら、再作成する必要があります。
また「サブスクリプションの有効期限ポリシーを変更する」の注意書きにあるとおり、有効期限を変更する機能はβ版です。お試しで使うくらいならまだしも、実用にしたいアプリケーションではデフォルトのままのほうが安全と思います。
確認応答期限.
Cloud Pub/Subが配信失敗とみなしてリトライするまでの待ち時間です。メッセージが配信されたらACKを返さないと配信失敗とみなされ、メッセージは再送されます。メッセージの再送が増えるとパフォーマンスや課金への影響が懸念されますので、短すぎる設定は避けるべきと思います。
GAEでエンドポイントを実装しているなら、インスタンスの最大有効時間(デフォルトなら60秒)と同じでいいと思います。
コードによる作成.
サブスクリプションに有効期限があって消滅する可能性があるのなら、アプリケーションの動作中に必要に応じてサブスクリプションの有無を確認し、無ければ作成する実装が望ましいことになります。コードでサブスクリプションを作成する例は、「サブスクリプションの管理」に、Pull/Pushとも例が載っています。
Pull/Pushとも SubscriptionAdminClient.createSubscription()メソッドで生成します。PullとPushの作り分けは、第3引数で渡すPushConfig型の値で決まります。Pullならデフォルトを PushConfig.getDefaultInstance()で渡せばいいようです。残念ながらこのメソッドはAPIリファレンスに載ってはいるものの、詳細は何も説明がありません。Pushの場合は、PushConfig.Builder.setPushEndpoint()メソッドを使ってPushエンドポイントを指定します。
サブスクリプションの有無の確認.
サブスクリプション名がわかっているなら、その存在の確認には SubscriptionAdminClient.getSubscription()メソッドが使えそうです。
特定のトピックに関件づけられたサブスクリプションを取得する手段はなさそうです。SubscriptionAdminClient.listSubscriptions()メソッドでプロジェクトのサブスクリプションを全部取得して、その中から目的のトピックに関連付けられているものを抜き出すくらいでしょうか。以下のコードのような感じです。プロジェクトのサブスクリプション数が SubscriptionAdminClient.listSubscriptions()メソッドで1回で全て取得できる程度に少ない前提ですが。
import com.google.cloud.pubsub.v1.SubscriptionAdminClient; import com.google.pubsub.v1.ListSubscriptionsRequest; import com.google.pubsub.v1.PushConfig; import com.google.pubsub.v1.Subscription; class SampleSubscription { protected static SubscriptionAdminClient subscriptionAdminClient; public SampleSubscription() { subscriptionAdminClient = SubscriptionAdminClient.create(); } public List<Subscription> listSubscriptions(String project, String topic) { // プロジェクトのサブスクリプションを得る ListSubscriptionsRequest.Builder builder = ListSubscriptionsRequest.newBuilder().setProject("projects/" +project); // 1ページに収まらないケースは無視 SubscriptionAdminClient.ListSubscriptionsPagedResponse response = subscriptionAdminClient.listSubscriptions(builder.build()); // トピックに関するものだけに絞る List<Subscription> list = new ArrayList<>(); for (Subscription subscript: response.iterateAll()) { if (topic.contentEquals(subscript.getTopic())) { list.add(subscript); } } return list; } }
サブスクリプションからのメッセージの取得.
Pullの場合のメッセージの取得は、「pullを使用したメッセージの受信」に説明されています。サンプルのコードも掲載されています。私はPullのほうはまだ試したことがないので、ここでは紹介だけです。
常駐プロセスでメッセージが取得できるまで待つ実装なら、「非同期 pull」が参考になります。
GAEのcronとかで定期実行してポーリングするなら、「同期 pull」が参考になります。
Pushエンドポイントの実装.
メッセージの受信.
Pushの場合はメッセージを受け取るエンドポイントを自分で用意する必要があります。その実装に関しては、「pushサブスクリプションの使用」に説明されています。エンドポイントはGAEなどに実装することができます。メッセージはPOSTメソッドで送られるので、エンドポイントはPOSTに対応させる必要があります。
メッセージの解析.
メッセージの本体はHTTPリクエストのボディに、「push メッセージの受信」からの下記の引用のように、JSON形式で格納されています。特定のメッセージ限定のデータは、"attributes"や"data"プロパティに格納されます。
{ "message": { "attributes": { "key": "value" }, "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==", "messageId": "136969346945" }, "subscription": "projects/myproject/subscriptions/mysubscription" }
Cloud Pub/Sub Notifications for Cloud Storageの場合は、「Cloud Pub/Sub Notifications for Cloud Storage」に"attributes"や"data"の説明があります。以下のサンプルコードは、実際に受信したメッセージを解析するクラスです。このコードでオブジェクトの生成/削除には対応できました。上書きやメタデータの更新など、その他は未確認です。カスタムしたtoString()メソッドも含んでいるので、ちょっと長いです。
import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; public class NotificationMessagePubSub { public class Message { public class Attributes { public String notificationConfig; // 登録されたCloud Pub/Sub Notifications for Cloud Storageを一意に識別するための文字列 public String eventType; // イベントの種類 public String eventTime; // イベントの発生時刻 public String payloadFormat; // ペイロードのフォーマット。"JSON_API_V1" or "NONE" public String bucketId; // バケットID public String objectId; // オブジェクトID public String objectGeneration; // オブジェクトの世代 public String toString() { StringBuilder builder = new StringBuilder() .append("{ ") .append("notificationConfig:\"").append(notificationConfig).append("\", ") .append("eventType:\"").append(eventType).append("\", ") .append("eventTime:\"").append(eventTime).append("\", ") .append("payloadFormat:\"").append(payloadFormat).append("\", ") .append("bucketId:\"").append(bucketId).append("\", ") .append("objectId:\"").append(objectId).append("\", ") .append("objectGeneration:").append(objectGeneration).append(", ") .append("}"); return builder.toString(); } } public Attributes attributes; public String data; // ペイロードをBase64エンコードした文字列 public String messageId; // このメッセージを一意に識別するための識別子 public String publishTime; // 発行時刻 public String toString() { StringBuilder builder = new StringBuilder() .append("{ ") .append("attributes:").append(attributes.toString()).append(", ") .append("data:\"").append(data).append("\", ") .append("messageId:\"").append(messageId).append("\", ") .append("publishTime:\"").append(publishTime).append("\", ") .append("}"); return builder.toString(); } } public Message message; public String subscription; // 配信先のサブスクリプション名 public String toString() { StringBuilder builder = new StringBuilder() .append("{ ") .append("message:").append(message).append(", ") .append("subscription:\"").append(subscription).append("\", ") .append("}, "); return builder.toString(); } // メッセージ解析 public static NotificationMessagePubSub parseMessage(HttpServletRequest request) { NotificationMessagePubSub message = null; ObjectMapper mapper = new ObjectMapper(); mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); // message_id/publish_timeは無視 try { message = mapper.readValue(request.getInputStream(), NotificationMessagePubSub.class); } catch (IOException e) { // 解析エラー } return message; } }
ペイロードであるdataプロパティの値は、payloadFormatの値により変わります。
payloadFormatの値 | ペイロードの内容 |
---|---|
NONE | nullが渡される。 |
JSON_API_V1 | オブジェクトのメタデータです。JSON APIでオブジェクトをアップロードした際のレスポンスのボディと同等の内容です。 |
注意点.
- プロパティ名がキャメルケースとスネークケースで重複しているプロパティが存在する。具体的には以下のとおり。 上記のサンプルではObjectMapperで例外発生させないように、片方を無視しています。
- message.messageId / message.message_id
- message.publishTime / message.publish_time
- message.attributes.notificationConfigの値は、 「バケットにオブジェクト変更の登録」で説明する登録を識別するための文字列です。実際取得したものは下記のフォーマットでした。
projects/_/buckets/バケット名/notificationConfigs/登録ID
- message.attributes.eventTypeが、「イベントの種類」に説明の、発生したイベントの種類です。オブジェクトの削除なら"OBJECT_DELETE"とか。
- message.attributes.objectIdが、イベントの対象になったオブジェクトです。削除の場合は、削除されたオブジェクトのIDですので、すでにこのオブジェクトは無くなっています。
ACK処理.
エンドポイントは受信したメッセージに対してACKを返す必要があります。「pushメッセージの受信」に説明されているように、Pushエンドポイントの場合はHTTPリクエストに返すレスポンスのステータスコードを設定するだけです。基本的には、正常に受信できたなら、成功を表す200番台のステータスコードを返します。それ以外のステータスコードを返せば、リトライされます。
バケットにオブジェクト変更の登録.
概要.
GCSのバケットに、オブジェクト変更を登録する方法は、「オブジェクト変更の登録」に説明されています。残念ながらGCPコンソールでは行えません。またコードで行う際も、クライアントライブラリも用意されておらず、JSON APIでしか登録できないようです。JSON APIは「オブジェクトのアップロード」/「オブジェクトのダウンロード」でブラウザから実行する実装を紹介しました。同様に実行できるはずですが、処理内容から通常はブラウザから実行する実装ではなく、GAE/GCEで行うものと思います。
メッセージ発行権限の設定.
初期状態ではGCSのサービスアカウントは、Pub/Subのメッセージを発行する権限を持っていません。GCSのサービスアカウントにメッセージを発行する権限を与えないと、オブジェクト変更の登録APIを呼んでもエラーになります。メッセージ発行権限の設定が、プロジェクトに対して1回必要です。
GCSのサービスアカウントは、プロジェクトが作成された時点で存在しているようですが、初期状態ではGCPコンソールのサービスアカウントに表示されません。そのためREST APIを利用して知る必要があります。その手順は「前提条件」の5のリンク先「Cloud Storageサービスアカウントの取得」に書かれているとおりです。JSON APIを使って取得してみると、以下の様に得られます。アクセストークンはOAuth2.0 Playgroundから取得するのが簡単です。"email_address"プロパティの値が、GCSのサービスアカウントのメールアドレスになります。
> curl -X GET -H "Authorization: Bearer アクセストークン" "https://www.googleapis.com/storage/v1/projects/プロジェクト名/serviceAccount" { "kind": "storage#serviceAccount", "email_address": "service-596527082535@gs-project-accounts.iam.gserviceaccount.com" }
このサービスアカウントに対して権限を設定する手順が、「前提条件」の6のリンク先「Google Cloud Consoleによアクセス制御」に説明されています。GCPコンソールのIAMで「追加」を押すと右のようなパネルが出てくるので、「新しいメンバー」に取得したGCSサービスアカウントのメールアドレスを入力し、役割に「Pub/Subパブリッシャー」を選択します。
設定が完了すると、一覧に先ほど入力したGCSサービスアカウントのメールアドレスの行が追加されているはずです。
アクセストークンの取得.
オブジェクト変更の登録APIをコールするためのアクセストークンが必要です。このアクセストークンは、先ほど「メッセージ発行権限の設定」で使用したものとは必要なスコープが異なります。「通知構成の適用」ではOAuth2.0 Playgroundから取得する方法が記載されていますが、プロジェクト管理者による権限の選択や承認が必要になるので、GAE/GCEから実行するには不向きです。
アクセストークンの取得はいろいろなケースで出てくると思うので、別ページで「サービスアカウントによるアクセストークンの取得」にまとめました。
「オブジェクトのアップロード」/「オブジェクトのダウンロード」では、ブラウザからの実行なので「ユーザ中心のフローによるアクセストークンの取得」で取得しました。しかし今回はGAE/GCEから実行するので、この方法は使えません。また「割り当てと上限」-「ヒントと注意点」に記載の様にサービスアカウントを使用する場合に比べて、ユーザアカウントを使用する場合は、処理速度の上限が引き下げられます。この点は、忘れたころに影響しそうです。
必要なスコープ.
オブジェクト変更の登録に必要なスコープは、https://www.googleapis.com/auth/devstorage.read_write
のみです。オブジェクト変更の登録APIのコール.
JSON APIのエンドポイント.
登録のJSON APIのURIは以下のとおりです。オブジェクト変更の通知はバケット単位で設定するので、対象になるバケット名を含んでいます。
HTTPメソッドは、登録はPOSTを使用します。
https://www.googleapis.com/storage/v1/b/バケット名/notificationConfigs
リクエストのボディ.
リクエストのボディはJSON形式で、ここで通知を配信したいトピック名を指定します。
payload_formatプロパティの値は、「ペイロード」に説明があります。すでに「Pushエンドポイントの実装」で説明していますが、受信したメッセージのmessage.dataプロパティの内容を指定します。
{ topic: "projects/プロジェクトID/topics/トピックID", payload_format: "JSON_API_V1" }
Javaコードのサンプル.
以下は、GAEからオブジェクト変更の登録を行うJavaのサンプルです。
import com.google.appengine.api.appidentity.AppIdentityService; import com.google.appengine.api.appidentity.AppIdentityServiceFactory; import com.google.appengine.api.urlfetch.HTTPHeader; import com.google.appengine.api.urlfetch.HTTPMethod; import com.google.appengine.api.urlfetch.HTTPRequest; import com.google.appengine.api.urlfetch.HTTPResponse; import com.google.appengine.api.urlfetch.URLFetchService; import com.google.appengine.api.urlfetch.URLFetchServiceFactory; class SampleSetNotification { // アクセストークンの取得用 static AppIdentityService appIdentityService = AppIdentityServiceFactory.getAppIdentityService(); // スコープ static final List<String> OBJECT_NOTIFICATION_SCOPES = Arrays.asList( "https://www.googleapis.com/auth/pubsub", "https://www.googleapis.com/auth/devstorage.read_write", ); public void setObjectNotification(String bucket, String topic) throws IOException { // JSON APIのURI URLFetchService fetchsvc = URLFetchServiceFactory.getURLFetchService(); URL notifurl = new URL("https://www.googleapis.com/storage/v1/b/" +bucket +"/notificationConfigs"); // リクエスト HTTPRequest request = new HTTPRequest(notifurl, HTTPMethod.POST); // 登録はPOST request.addHeader(new HTTPHeader("Content-Type", "application/json")); // コンテンツタイプはJSON // トピックに関連付けるためのボディ String payload = "{" +"topic : \"" +topic +"\"," // トピック名 +"payload_format : \"NONE\"" // ペイロードなし +"}"; request.setPayload(payload.getBytes()); // アクセストークンをセット AppIdentityService.GetAccessTokenResult tokenres = appIdentityService.getAccessToken(OBJECT_NOTIFICATION_SCOPES); request.addHeader(new HTTPHeader("Authorization", "Bearer " +tokenres.getAccessToken())); // JSON APIをコール HTTPResponse response = fetchsvc.fetch(request); // 必要ならエラー処理 } }
注意事項.
- Pushの場合、エンドポイントを正しく作ってACKを返しておかないと、延々リトライされてしまいます。
- 全く同じ内容でエラーにもならず2重登録できてしまいます。それぞれで通知が行われるので、2重にメッセージが届くようになります。
- なぜか、ACKを返す前で、確認応答期限より前に、1つだけ重複してメッセージが届くケースがありました。
登録内容の確認.
登録内容の取得も、GSUTILまたはJSON APIしか用意されていません。HTTPメソッドをPOSTからGETに替えて、ボディを空にするだけで、バケットに登録された内容を取得できます。
登録内容の取得だけなら、アクセストークン取得時に必要なスコープは、https://www.googleapis.com/auth/devstorage.read_only
のみです。
デバッグのTips.
- GCPコンソールのストレージブラウザでオブジェクトをアップロード/削除するだけでも、メッセージの配信が行われます。動作確認に最適です。
- 未処理のメッセージの破棄は、GCPコンソールのサブスクリプションの詳細のメニューから「メッセージのパージ」で、まとめてできます。
Copyright 2005-2024, yosshie.