Windows Azure AppFabricサービス バスでのTopic利用

2011年11月7日 | By TatsuakiSakai | Filed in: Micosoft Azure.

前回のポストで、Windows Azure AppFabricサービス バスにおけるQueueの利用についてご紹介しました。

前回も少しだけ触れましたが、パブリッシャ・サブスクライバ モデルを実現するためのTopicの利用も、Queue利用とほぼ同じ手順で利用できるという話をしました。

唯一異なる点は、1つのパブリッシャ(発行者)に対し複数のサブスクライバ(購読者)が存在するため、各Topic(発行される記事)に対して複数のSubscriptionを作成する必要がある点です。

各Subscriptionは必ず1つのTopicに属し、所属するTopicにポストされたメッセージを各サブスクライバに配信します。

一方、Topicにメッセージを発信する発信者はどのSubscriptionにメッセージを配信するかを意識する必要はありません。

Topicに対しメッセージを送信するだけで、自動的にTopicに属するSubscriptionに対してメッセージが配信されます。

それでは、実際にTopicにメッセージを層んするためのコードを見ていきましょう。

 

private Uri m_BaseAddress;
private string m_Issuer = “owner”;
private string m_SharedKey = “各Namespaceに対応するSharedKeyを指定する。”;
private TokenProvider m_TokenProvider;
private NamespaceManager m_NamespaceManager;
private MessagingFactory m_MessagingFactory;
private TopicClient m_TopicClient;

private void CreateAndSend(string messagebody)
{
//サービスバスのBaseAddressを生成
 m_BaseAddress = ServiceBusEnvironment.CreateServiceUri(“sb”, “yournamespace”, string.Empty);
 //資格情報の作成
 m_TokenProvider
= TokenProvider.CreateSharedSecretTokenProvider(m_Issuer, m_SharedKey);
 //トピックの新規作成(同一名トピックが存在する場合、一旦削除する)
 m_ NamespaceManager = new NamespaceManager(m_BaseAddress, m_TokenProvider);
 if(m_ NamespaceManager.TopicExists(“NotifyTopic”))
m_ NamespaceManager.DeleteTopic(“NotifyTopic”);
 m_ NamespaceManager.CreateTopic(“NotifyTopic”);
 //メッセージファクトリの作成
 m_MessageingFactory = MessagingFactory.Create(m_BaseAddress, m_TokenProvider);
 //TopicClient作成
 m_TopicClient = m_MessageingFactory.CreateTopicClient(“NotifyTopic”);
 //Subscriptionを新規作成し、Topicと関連付ける
 m_NsManager.CreateSubscription(m_TopicClient.Path, “Category1”);
 m_NsManager.CreateSubscription(m_TopicClient.Path, “Category2”);
 m_NsManager.CreateSubscription(m_TopicClient.Path, “Category3”);
 //メッセージの送信
 m_TopicClient.Send(new BrokeredMessage(messagebody));
}

この例では、Topicの作成~メッセージの送信を一度に実行していますが、多くの場合には初期化時にTopicおよびSubscriptionの作成を実行しておき、適宜メッセージを送信することとなります。

したがって、初期化処理はサンプルコード中の最下部のm_TopicClient.Send()メソッドを除いた部分を実行しておけば大丈夫です。

それでは、サブスクリプション側のインプリメントを見ていきましょう。

 

private Uri m_BaseAddress;
private string m_Issuer = “owner”;
private string m_SharedKey = “各Namespaceに対応するSharedKeyを指定する。”;
private TokenProvider m_TokenProvider;
private SubscriptionClient m_SubClient;
private MessagingFactory m_MessageingFactory;
private void CreateSubscription ()
{
 //サービスバスのBaseAddressを生成
 m_BaseAddress = ServiceBusEnvironment.CreateServiceUri(“sb” ,  “yournamespace” ,  string.Empty);
 //資格情報の作成
 m_TokenProvider
  = TokenProvider.CreateSharedSecretTokenProvider(m_Issuer, m_SharedKey);
 //メッセージファクトリの作成
 m_MessageingFactory = MessagingFactory.Create(m_BaseAddress, m_TokenProvider);
 //サブスクリプション・クライアントの作成
 m_SubClient = m_MessageingFactory.CreateSubscriptionClient(NotifyTopic”,“Category1”);
}
private void TryReceive ()
{
 //メッセージオブジェクトの定義
 BrokeredMessage message;
 //メッセージの受信(1秒待ってメッセージが存在しない場合はパス)
 message = m_SubClient.Receive(TimeSpan.FromSeconds(1));
 if (message != null)
 {
  //メッセージが受信できた場合にメッセージ本文を表示
  MessageBox.Show(message.GetBody<string>(), m_SubClient.Name);
  //メッセージの処理の完了を通知
  m_SubClient.Complete(message.LockToken);
 }
}

 

この例では、CreateSubscriptionメソッドでSubscriptionを作成し、TryReceiveメソッドでメッセージを受信しています。

メッセージ形式はQueueと同様BrokeredMessageであることがわかります。

つまり、シリアライズ可能なオブジェクトをメッセージ本体に含めて送ることが可能です。

また、メッセージの処理が完了した後にSubscriptionClientのCompleteメソッドを忘れずに実行してください。

これを忘れてしまうと、再購読時に同一のメッセージを受信してしまいます。

言い換えれば、処理が完了するまではメッセージが保存されていますので、途中で失敗した場合には再度メッセージを受信することでリカバリが可能であるともいえます。

 


Comments are closed here.