Azure Service Bus brokered messaging – Topics, Subscriptions

2014-11-18

Po omówieniu kolejek, dziś opiszę pracę z topikami i subskrypcjami w usłudze Microsoft Azure Service Bus. Tak jak poprzednio, w prezentowanych przykładach wiadomość przesyłana pomiędzy nadawcą i odbiorcą będzie zawierała obiekt zamówienia:

public class Order
{
  public string Id { get; set; }
  public DateTime Date { get; set; }
  public string Customer { get; set; }
  public List<string> Details { get; set; }
}

W kodzie będę się odwoływał do dwóch zmiennych zawierających connection string dla przestrzeni nazw usługi oraz nazwę topiku:

public readonly string connectionString = String.Format(
  "Endpoint={0};SharedAccessKeyName={1};SharedAccessKey={2}",
  "sb://tempnamespace.servicebus.windows.net/",
  "RootManageSharedAccessKey",
  "zFUqkwWoDrem9O8UkdG0pq0sheJze0U/P93f42Aykdc=");

public readonly string topicName = "orderstopic";

 

Zarządzanie topikiem

Zarządzanie topikami odbywa się w analogiczny sposób jak w przypadku kolejek. Tu także korzystamy z obiektu NamespaceManager używając jedynie innych metod, jednak co do zasady są to dokładnie te same mechanizmy. Poniżej znajdują się przykłady tworzenia, modyfikacji oraz usunięcia topiku.

Proste tworzenie topiku:

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

if (!namespaceManager.TopicExists(topicName))
  namespaceManager.CreateTopic(topicName);

Tworzenie topiku za pomocą obiektu TopicDescription:

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

if (!namespaceManager.TopicExists(topicName))
{
  TopicDescription topic = new TopicDescription(topicName)
  {
    MaxSizeInMegabytes = 1024,
    RequiresDuplicateDetection = true
  };
  namespaceManager.CreateTopic(topic);
}

Odczyt właściwości oraz modyfikacja topiku:

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

TopicDescription topic = namespaceManager.GetTopic(topicName);

long subscriptionCount = topic.SubscriptionCount;
long sizeInBytes = topic.SizeInBytes;

topic.MaxSizeInMegabytes = 2048;
namespaceManager.UpdateTopic(topic);

Usunięcie topiku:

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

namespaceManager.DeleteTopic(topicName);

 

Zarządzanie subskrypcjami

Po wysłaniu wiadomości do topiku, w celu jej odczytu odbiorca nie odwołuje się bezpośrednio do topiku tylko korzysta z wybranej subskrypcji. To jakie wiadomości z topiku trafią do danej subskrypcji określane jest poprzez filtry operujące na atrybutach wiadomości. Podstawową różnicą w stosunku do kolejek jest to, iż w przypadku topiku tą samą wiadomość może otrzymać kilku odbiorców. Jeżeli dana wiadomość przesłana do topiku spełni warunki filtrów więcej niż jednej subskrypcji to jej kopia zostanie przekazana do każdej z nich. Należy jednak pamiętać, że filtry subskrypcji sprawdzane są tylko w momencie pojawienia się wiadomości w topiku. Jeżeli utworzymy nową subskrypcję to mogą do niej trafić jedynie wiadomości wysłane od tego momentu, wszystkie już istniejące w topiku mimo spełnionych warunków filtra nie zostaną do niej przekazane. Mamy trzy możliwości definiowania subskrypcji: bez filtra (będą do niej trafiać wszystkie wiadomości z topiku), z filtrem typu CorrelationFilter (filtr odnoszący się do wartości atrybutu CorrelationId wiadomości) lub z filtrem typu SqlFilter (filtr odnoszący się do poszczególnych elementów atrybutu Properties wiadomości).

Subskrypcję bez filtra tworzymy podając jedynie jej nazwę oraz nazwę topiku, z którym będzie powiązana:

string subscriptionName = "ordersall";

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

if (!namespaceManager.SubscriptionExists(topicName, subscriptionName))
  namespaceManager.CreateSubscription(topicName, subscriptionName);

W przypadku tworzenia subskrypcji z filtrem typu CorrelationFilter, oprócz nazwy topiku musimy przekazać definicję filtra. Tworząc filtr podajemy wartość atrybutu CorrelationId wiadomości, które mają trafiać do tej subskrypcji:

string subscriptionName = "ordersdepartmenta";

CorrelationFilter filter = new CorrelationFilter("Department A");

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

if (!namespaceManager.SubscriptionExists(topicName, subscriptionName))
  namespaceManager.CreateSubscription(topicName, subscriptionName, filter);

Przy użyciu powyżej zdefiniowanej subskrypcji otrzymamy wszystkie wiadomości wysłane do topiku, które w atrybucie CorrelationId będą miały wartość „Department A”.

Jeżeli chcemy mieć większe możliwości filtrowania możemy utworzyć subskrypcję z filtrem typu SqlFilter. W tym przypadku tworząc filtr podajemy wyrażenie logiczne operujące na elementach atrybutu Properties wiadomości (jest to atrybut typu IDictionary<string, object>). Budując wyrażenie mamy do dyspozycji następujące operatory: =, >, <, !=, <>, and, or, (, ). Daje to bardzo duże możliwości tworzenia warunków filtrowania. Przykładowy filtr może wyglądać tak: „(Region != ‚PL’ or ItemsCount > 10) and (Sender = ‚Jan Kowalski’)”, gdzie Region, ItemsCount i Sender to klucze we właściwości Properties wiadomości.

Oto w jaki sposób utworzyć subskrypcję z filtrem SqlFilter:

string subscriptionName = "orderswholesalepl";

SqlFilter filter = new SqlFilter("Region = 'PL' and ItemsCount > 10");

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

if (!namespaceManager.SubscriptionExists(topicName, subscriptionName))
  namespaceManager.CreateSubscription(topicName, subscriptionName, filter);

Do powyższej subskrypcji trafią wszystkie wiadomości z topiku, które we właściwości Properties będą posiadały klucz Region z wartością „PL” oraz klucz ItemsCount z wartością większą od 10.

Do utworzenia subskrypcji możemy także użyć obiektu SubscriptionDescription, pozwalającego na dodatkową konfigurację:

string subscriptionName = "orderswholesalepl";

SqlFilter filter = new SqlFilter("Region = 'PL' and ItemsCount > 10");

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

if (!namespaceManager.SubscriptionExists(topicName, subscriptionName))
{
  SubscriptionDescription subscription =
    new SubscriptionDescription(topicName, subscriptionName)
  {
    RequiresSession = false,
    LockDuration = TimeSpan.FromMinutes(1)
  };
  namespaceManager.CreateSubscription(subscription, filter);
}

Z obiektu SubscriptionDescription możemy również skorzystać w celu odczytu informacji o subskrypcji oraz jej modyfikacji:

string subscriptionName = "ordersall";

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

SubscriptionDescription subscription =
  namespaceManager.GetSubscription(topicName, subscriptionName);

long messageCount = subscription.MessageCount;

subscription.MaxDeliveryCount = 5;
namespaceManager.UpdateSubscription(subscription);

Usunięcie subskrypcji sprowadza się do wywołania metody DeleteSubscription na obiekcie NamespaceManager:

string subscriptionName = "ordersall";

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

namespaceManager.DeleteSubscription(topicName, subscriptionName);

 

Wysyłanie wiadomości do topiku

Wysłanie wiadomości do topiku wygląda praktycznie tak samo jak w przypadku kolejki. Mamy dwie możliwości, albo użyjemy dedykowanego obiektu TopicClient albo uniwersalnego MessageSender znanego z kolejek (tym razem tworzonego poprzez podanie nazwy topiku):

TopicClient topicClient =
  TopicClient.CreateFromConnectionString(connectionString, topicName);

lub:

MessagingFactory messagingFactory =
  MessagingFactory.CreateFromConnectionString(connectionString);

TopicClient topicClient = messagingFactory.CreateTopicClient(topicName);

lub:

MessagingFactory messagingFactory =
  MessagingFactory.CreateFromConnectionString(connectionString);

MessageSender messageSender =
  messagingFactory.CreateMessageSender(topicName);

Samo przygotowanie wiadomości nie różni się niczym od tego, co opisywałem przy kolejkach. Tworzymy obiekt klasy BrokeredMessage i przekazujemy go do metody Send klienta topiku:

Order order = new Order()
{
  Id = "1",
  Date = DateTime.UtcNow,
  Customer = "Company ABC",
  Details = new List<string> { "Product1", "Product2", "Product3" }
};

BrokeredMessage message = new BrokeredMessage(order)
{
  MessageId = Guid.NewGuid().ToString(),
  Label = "Application X",
  CorrelationId = "Department A",
  TimeToLive = TimeSpan.FromHours(12)
};
message.Properties.Add("Sender", "Jan Kowalski");
message.Properties.Add("Region", "PL");
message.Properties.Add("ItemsCount", order.Details.Count);

messageSender.Send(message);

messageSender.Close();

 

Odbieranie wiadomości przy użyciu subskrypcji

W celu odczytu wiadomości wysłanych do topiku odbiorca nie odwołuje się bezpośrednio do topiku tylko korzysta z określonej subskrypcji, która z jego punktu widzenia posiada funkcjonalność kolejki. Przy użyciu wybranej subskrypcji będziemy w stanie odczytać jedynie te wiadomości, które spełniły warunki jej filtra. Podobnie jak w przypadku kolejek odczyt wiadomości może być realizowany w trybie ReceiveAndDelete lub PeekLock. Po więcej szczegółów odsyłam do poprzedniego tematu. Aby odebrać wiadomość musimy utworzyć obiekt klienta subskrypcji. Możemy skorzystać z klasy SubscriptionClient lub MessageReceiver. W przypadku tego drugiego w celu wskazania konkretnej subskrypcji podajemy ścieżkę w formacie „topicName/subscriptions/subscriptionName„:

string subscriptionName = "ordersall";

SubscriptionClient subscriptionClient =
  SubscriptionClient.CreateFromConnectionString(
    connectionString, topicName, subscriptionName, ReceiveMode.PeekLock);

lub:

string subscriptionName = "ordersall";

MessagingFactory messagingFactory =
  MessagingFactory.CreateFromConnectionString(connectionString);

SubscriptionClient subscriptionClient =
  messagingFactory.CreateSubscriptionClient(
    topicName, subscriptionName, ReceiveMode.PeekLock);

lub:

string subscriptionName = "ordersall";

MessagingFactory messagingFactory =
  MessagingFactory.CreateFromConnectionString(connectionString);

MessageReceiver messageReceiver =
  messagingFactory.CreateMessageReceiver(
    String.Format("{0}/subscriptions/{1}", topicName, subscriptionName),
    ReceiveMode.PeekLock);

Pobieranie wiadomości nie różni się niczym w stosunku do tego, co opisywałem przy kolejkach. Tu także mamy do dyspozycji metody Peek, Receive oraz ReceiveBatch, a odebrana wiadomość to znany już obiekt klasy BrokeredMessage:

BrokeredMessage message =
  messageReceiver.Receive(TimeSpan.FromSeconds(5));

if (message != null)
{
  try
  {
    long messageNumber = message.SequenceNumber;
    string messageId = message.MessageId;
    string label = message.Label;
    string correlationId = message.CorrelationId;
    string orderSender = message.Properties["Sender"].ToString();
    string orderRegion = message.Properties["Region"].ToString();

    Order order = message.GetBody<Order>();

    if (orderRegion == "PL")
    {
      //Processing Order...

      message.Complete();
    }
    else
      message.DeadLetter();
  }
  catch (Exception ex)
  {
    message.Abandon();
  }
}
messageReceiver.Close();

W tym i poprzednim wpisie pokazałem podstawowe operacje związane z wysyłaniem i odbieraniem wiadomości przy wykorzystaniu kolejek, topików i subskrypcji. W następnym temacie przedstawię bardziej zaawansowane scenariusze pracy z wiadomościami.

Reklamy

Posted on 2014-11-18, in .NET/C# and tagged , , , , , . Bookmark the permalink. Dodaj komentarz.

Skomentuj

Wprowadź swoje dane lub kliknij jedną z tych ikon, aby się zalogować:

Logo WordPress.com

Komentujesz korzystając z konta WordPress.com. Log Out / Zmień )

Zdjęcie z Twittera

Komentujesz korzystając z konta Twitter. Log Out / Zmień )

Facebook photo

Komentujesz korzystając z konta Facebook. Log Out / Zmień )

Google+ photo

Komentujesz korzystając z konta Google+. Log Out / Zmień )

Connecting to %s

%d blogerów lubi to: