Archiwa blogu
Azure Service Bus brokered messaging – Topics, Subscriptions
2014-11-18
Po omówieniu kolejek, dziś opiszę pracę z topikami i subskrypcjami w usłudze Microsoft Azure Service Bus. Tak jak poprzednio, w prezentowanych przykładach wiadomość przesyłana pomiędzy nadawcą i odbiorcą będzie zawierała obiekt zamówienia:
public class Order { public string Id { get; set; } public DateTime Date { get; set; } public string Customer { get; set; } public List<string> Details { get; set; } }
W kodzie będę się odwoływał do dwóch zmiennych zawierających connection string dla przestrzeni nazw usługi oraz nazwę topiku:
public readonly string connectionString = String.Format( "Endpoint={0};SharedAccessKeyName={1};SharedAccessKey={2}", "sb://tempnamespace.servicebus.windows.net/", "RootManageSharedAccessKey", "zFUqkwWoDrem9O8UkdG0pq0sheJze0U/P93f42Aykdc="); public readonly string topicName = "orderstopic";
Zarządzanie topikiem
Zarządzanie topikami odbywa się w analogiczny sposób jak w przypadku kolejek. Tu także korzystamy z obiektu NamespaceManager używając jedynie innych metod, jednak co do zasady są to dokładnie te same mechanizmy. Poniżej znajdują się przykłady tworzenia, modyfikacji oraz usunięcia topiku.
Proste tworzenie topiku:
NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString); if (!namespaceManager.TopicExists(topicName)) namespaceManager.CreateTopic(topicName);
Tworzenie topiku za pomocą obiektu TopicDescription:
NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString); if (!namespaceManager.TopicExists(topicName)) { TopicDescription topic = new TopicDescription(topicName) { MaxSizeInMegabytes = 1024, RequiresDuplicateDetection = true }; namespaceManager.CreateTopic(topic); }
Odczyt właściwości oraz modyfikacja topiku:
NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString); TopicDescription topic = namespaceManager.GetTopic(topicName); long subscriptionCount = topic.SubscriptionCount; long sizeInBytes = topic.SizeInBytes; topic.MaxSizeInMegabytes = 2048; namespaceManager.UpdateTopic(topic);
Usunięcie topiku:
NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString); namespaceManager.DeleteTopic(topicName);
Zarządzanie subskrypcjami
Po wysłaniu wiadomości do topiku, w celu jej odczytu odbiorca nie odwołuje się bezpośrednio do topiku tylko korzysta z wybranej subskrypcji. To jakie wiadomości z topiku trafią do danej subskrypcji określane jest poprzez filtry operujące na atrybutach wiadomości. Podstawową różnicą w stosunku do kolejek jest to, iż w przypadku topiku tą samą wiadomość może otrzymać kilku odbiorców. Jeżeli dana wiadomość przesłana do topiku spełni warunki filtrów więcej niż jednej subskrypcji to jej kopia zostanie przekazana do każdej z nich. Należy jednak pamiętać, że filtry subskrypcji sprawdzane są tylko w momencie pojawienia się wiadomości w topiku. Jeżeli utworzymy nową subskrypcję to mogą do niej trafić jedynie wiadomości wysłane od tego momentu, wszystkie już istniejące w topiku mimo spełnionych warunków filtra nie zostaną do niej przekazane. Mamy trzy możliwości definiowania subskrypcji: bez filtra (będą do niej trafiać wszystkie wiadomości z topiku), z filtrem typu CorrelationFilter (filtr odnoszący się do wartości atrybutu CorrelationId wiadomości) lub z filtrem typu SqlFilter (filtr odnoszący się do poszczególnych elementów atrybutu Properties wiadomości).
Subskrypcję bez filtra tworzymy podając jedynie jej nazwę oraz nazwę topiku, z którym będzie powiązana:
string subscriptionName = "ordersall"; NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString); if (!namespaceManager.SubscriptionExists(topicName, subscriptionName)) namespaceManager.CreateSubscription(topicName, subscriptionName);
W przypadku tworzenia subskrypcji z filtrem typu CorrelationFilter, oprócz nazwy topiku musimy przekazać definicję filtra. Tworząc filtr podajemy wartość atrybutu CorrelationId wiadomości, które mają trafiać do tej subskrypcji:
string subscriptionName = "ordersdepartmenta"; CorrelationFilter filter = new CorrelationFilter("Department A"); NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString); if (!namespaceManager.SubscriptionExists(topicName, subscriptionName)) namespaceManager.CreateSubscription(topicName, subscriptionName, filter);
Przy użyciu powyżej zdefiniowanej subskrypcji otrzymamy wszystkie wiadomości wysłane do topiku, które w atrybucie CorrelationId będą miały wartość „Department A”.
Jeżeli chcemy mieć większe możliwości filtrowania możemy utworzyć subskrypcję z filtrem typu SqlFilter. W tym przypadku tworząc filtr podajemy wyrażenie logiczne operujące na elementach atrybutu Properties wiadomości (jest to atrybut typu IDictionary<string, object>). Budując wyrażenie mamy do dyspozycji następujące operatory: =, >, <, !=, <>, and, or, (, ). Daje to bardzo duże możliwości tworzenia warunków filtrowania. Przykładowy filtr może wyglądać tak: „(Region != ‚PL’ or ItemsCount > 10) and (Sender = ‚Jan Kowalski’)”, gdzie Region, ItemsCount i Sender to klucze we właściwości Properties wiadomości.
Oto w jaki sposób utworzyć subskrypcję z filtrem SqlFilter:
string subscriptionName = "orderswholesalepl"; SqlFilter filter = new SqlFilter("Region = 'PL' and ItemsCount > 10"); NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString); if (!namespaceManager.SubscriptionExists(topicName, subscriptionName)) namespaceManager.CreateSubscription(topicName, subscriptionName, filter);
Do powyższej subskrypcji trafią wszystkie wiadomości z topiku, które we właściwości Properties będą posiadały klucz Region z wartością „PL” oraz klucz ItemsCount z wartością większą od 10.
Do utworzenia subskrypcji możemy także użyć obiektu SubscriptionDescription, pozwalającego na dodatkową konfigurację:
string subscriptionName = "orderswholesalepl"; SqlFilter filter = new SqlFilter("Region = 'PL' and ItemsCount > 10"); NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString); if (!namespaceManager.SubscriptionExists(topicName, subscriptionName)) { SubscriptionDescription subscription = new SubscriptionDescription(topicName, subscriptionName) { RequiresSession = false, LockDuration = TimeSpan.FromMinutes(1) }; namespaceManager.CreateSubscription(subscription, filter); }
Z obiektu SubscriptionDescription możemy również skorzystać w celu odczytu informacji o subskrypcji oraz jej modyfikacji:
string subscriptionName = "ordersall"; NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString); SubscriptionDescription subscription = namespaceManager.GetSubscription(topicName, subscriptionName); long messageCount = subscription.MessageCount; subscription.MaxDeliveryCount = 5; namespaceManager.UpdateSubscription(subscription);
Usunięcie subskrypcji sprowadza się do wywołania metody DeleteSubscription na obiekcie NamespaceManager:
string subscriptionName = "ordersall"; NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString); namespaceManager.DeleteSubscription(topicName, subscriptionName);
Wysyłanie wiadomości do topiku
Wysłanie wiadomości do topiku wygląda praktycznie tak samo jak w przypadku kolejki. Mamy dwie możliwości, albo użyjemy dedykowanego obiektu TopicClient albo uniwersalnego MessageSender znanego z kolejek (tym razem tworzonego poprzez podanie nazwy topiku):
TopicClient topicClient = TopicClient.CreateFromConnectionString(connectionString, topicName);
lub:
MessagingFactory messagingFactory = MessagingFactory.CreateFromConnectionString(connectionString); TopicClient topicClient = messagingFactory.CreateTopicClient(topicName);
lub:
MessagingFactory messagingFactory = MessagingFactory.CreateFromConnectionString(connectionString); MessageSender messageSender = messagingFactory.CreateMessageSender(topicName);
Samo przygotowanie wiadomości nie różni się niczym od tego, co opisywałem przy kolejkach. Tworzymy obiekt klasy BrokeredMessage i przekazujemy go do metody Send klienta topiku:
Order order = new Order() { Id = "1", Date = DateTime.UtcNow, Customer = "Company ABC", Details = new List<string> { "Product1", "Product2", "Product3" } }; BrokeredMessage message = new BrokeredMessage(order) { MessageId = Guid.NewGuid().ToString(), Label = "Application X", CorrelationId = "Department A", TimeToLive = TimeSpan.FromHours(12) }; message.Properties.Add("Sender", "Jan Kowalski"); message.Properties.Add("Region", "PL"); message.Properties.Add("ItemsCount", order.Details.Count); messageSender.Send(message); messageSender.Close();
Odbieranie wiadomości przy użyciu subskrypcji
W celu odczytu wiadomości wysłanych do topiku odbiorca nie odwołuje się bezpośrednio do topiku tylko korzysta z określonej subskrypcji, która z jego punktu widzenia posiada funkcjonalność kolejki. Przy użyciu wybranej subskrypcji będziemy w stanie odczytać jedynie te wiadomości, które spełniły warunki jej filtra. Podobnie jak w przypadku kolejek odczyt wiadomości może być realizowany w trybie ReceiveAndDelete lub PeekLock. Po więcej szczegółów odsyłam do poprzedniego tematu. Aby odebrać wiadomość musimy utworzyć obiekt klienta subskrypcji. Możemy skorzystać z klasy SubscriptionClient lub MessageReceiver. W przypadku tego drugiego w celu wskazania konkretnej subskrypcji podajemy ścieżkę w formacie „topicName/subscriptions/subscriptionName„:
string subscriptionName = "ordersall"; SubscriptionClient subscriptionClient = SubscriptionClient.CreateFromConnectionString( connectionString, topicName, subscriptionName, ReceiveMode.PeekLock);
lub:
string subscriptionName = "ordersall"; MessagingFactory messagingFactory = MessagingFactory.CreateFromConnectionString(connectionString); SubscriptionClient subscriptionClient = messagingFactory.CreateSubscriptionClient( topicName, subscriptionName, ReceiveMode.PeekLock);
lub:
string subscriptionName = "ordersall"; MessagingFactory messagingFactory = MessagingFactory.CreateFromConnectionString(connectionString); MessageReceiver messageReceiver = messagingFactory.CreateMessageReceiver( String.Format("{0}/subscriptions/{1}", topicName, subscriptionName), ReceiveMode.PeekLock);
Pobieranie wiadomości nie różni się niczym w stosunku do tego, co opisywałem przy kolejkach. Tu także mamy do dyspozycji metody Peek, Receive oraz ReceiveBatch, a odebrana wiadomość to znany już obiekt klasy BrokeredMessage:
BrokeredMessage message = messageReceiver.Receive(TimeSpan.FromSeconds(5)); if (message != null) { try { long messageNumber = message.SequenceNumber; string messageId = message.MessageId; string label = message.Label; string correlationId = message.CorrelationId; string orderSender = message.Properties["Sender"].ToString(); string orderRegion = message.Properties["Region"].ToString(); Order order = message.GetBody<Order>(); if (orderRegion == "PL") { //Processing Order... message.Complete(); } else message.DeadLetter(); } catch (Exception ex) { message.Abandon(); } } messageReceiver.Close();
W tym i poprzednim wpisie pokazałem podstawowe operacje związane z wysyłaniem i odbieraniem wiadomości przy wykorzystaniu kolejek, topików i subskrypcji. W następnym temacie przedstawię bardziej zaawansowane scenariusze pracy z wiadomościami.