前回のポストで、Windows Azure AppFabricサービス バスにおけるQueueの利用についてご紹介しました。
前回も少しだけ触れましたが、パブリッシャ・サブスクライバ モデルを実現するためのTopicの利用も、Queue利用とほぼ同じ手順で利用できるという話をしました。
唯一異なる点は、1つのパブリッシャ(発行者)に対し複数のサブスクライバ(購読者)が存在するため、各Topic(発行される記事)に対して複数のSubscriptionを作成する必要がある点です。
各Subscriptionは必ず1つのTopicに属し、所属するTopicにポストされたメッセージを各サブスクライバに配信します。
一方、Topicにメッセージを発信する発信者はどのSubscriptionにメッセージを配信するかを意識する必要はありません。
Topicに対しメッセージを送信するだけで、自動的にTopicに属するSubscriptionに対してメッセージが配信されます。
それでは、実際にTopicにメッセージを層んするためのコードを見ていきましょう。
private Uri m_BaseAddress;
private string m_Issuer = “owner”;
private string m_SharedKey = “各Namespaceに対応するSharedKeyを指定する。”;
private TokenProvider m_TokenProvider;
private NamespaceManager m_NamespaceManager;
private MessagingFactory m_MessagingFactory;
private TopicClient m_TopicClient;private void CreateAndSend(string messagebody)
{
//サービスバスのBaseAddressを生成
m_BaseAddress = ServiceBusEnvironment.CreateServiceUri(“sb”, “yournamespace”, string.Empty);
//資格情報の作成
m_TokenProvider
= TokenProvider.CreateSharedSecretTokenProvider(m_Issuer, m_SharedKey);
//トピックの新規作成(同一名トピックが存在する場合、一旦削除する)
m_ NamespaceManager = new NamespaceManager(m_BaseAddress, m_TokenProvider);
if(m_ NamespaceManager.TopicExists(“NotifyTopic”))
m_ NamespaceManager.DeleteTopic(“NotifyTopic”);
m_ NamespaceManager.CreateTopic(“NotifyTopic”);
//メッセージファクトリの作成
m_MessageingFactory = MessagingFactory.Create(m_BaseAddress, m_TokenProvider);
//TopicClient作成
m_TopicClient = m_MessageingFactory.CreateTopicClient(“NotifyTopic”);
//Subscriptionを新規作成し、Topicと関連付ける
m_NsManager.CreateSubscription(m_TopicClient.Path, “Category1”);
m_NsManager.CreateSubscription(m_TopicClient.Path, “Category2”);
m_NsManager.CreateSubscription(m_TopicClient.Path, “Category3”);
//メッセージの送信
m_TopicClient.Send(new BrokeredMessage(messagebody));
}
この例では、Topicの作成~メッセージの送信を一度に実行していますが、多くの場合には初期化時にTopicおよびSubscriptionの作成を実行しておき、適宜メッセージを送信することとなります。
したがって、初期化処理はサンプルコード中の最下部のm_TopicClient.Send()メソッドを除いた部分を実行しておけば大丈夫です。
それでは、サブスクリプション側のインプリメントを見ていきましょう。
private Uri m_BaseAddress;
private string m_Issuer = “owner”;
private string m_SharedKey = “各Namespaceに対応するSharedKeyを指定する。”;
private TokenProvider m_TokenProvider;
private SubscriptionClient m_SubClient;
private MessagingFactory m_MessageingFactory;
private void CreateSubscription ()
{
//サービスバスのBaseAddressを生成
m_BaseAddress = ServiceBusEnvironment.CreateServiceUri(“sb” , “yournamespace” , string.Empty);
//資格情報の作成
m_TokenProvider
= TokenProvider.CreateSharedSecretTokenProvider(m_Issuer, m_SharedKey);
//メッセージファクトリの作成
m_MessageingFactory = MessagingFactory.Create(m_BaseAddress, m_TokenProvider);
//サブスクリプション・クライアントの作成
m_SubClient = m_MessageingFactory.CreateSubscriptionClient(“NotifyTopic”,“Category1”);
}
private void TryReceive ()
{
//メッセージオブジェクトの定義
BrokeredMessage message;
//メッセージの受信(1秒待ってメッセージが存在しない場合はパス)
message = m_SubClient.Receive(TimeSpan.FromSeconds(1));
if (message != null)
{
//メッセージが受信できた場合にメッセージ本文を表示
MessageBox.Show(message.GetBody<string>(), m_SubClient.Name);
//メッセージの処理の完了を通知
m_SubClient.Complete(message.LockToken);
}
}
この例では、CreateSubscriptionメソッドでSubscriptionを作成し、TryReceiveメソッドでメッセージを受信しています。
メッセージ形式はQueueと同様BrokeredMessageであることがわかります。
つまり、シリアライズ可能なオブジェクトをメッセージ本体に含めて送ることが可能です。
また、メッセージの処理が完了した後にSubscriptionClientのCompleteメソッドを忘れずに実行してください。
これを忘れてしまうと、再購読時に同一のメッセージを受信してしまいます。
言い換えれば、処理が完了するまではメッセージが保存されていますので、途中で失敗した場合には再度メッセージを受信することでリカバリが可能であるともいえます。
Comments are closed here.