Monthly Archives: Listopad 2014

Azure Service Bus brokered messaging – Topics, Subscriptions

2014-11-18

Po omówieniu kolejek, dziś opiszę pracę z topikami i subskrypcjami w usłudze Microsoft Azure Service Bus. Tak jak poprzednio, w prezentowanych przykładach wiadomość przesyłana pomiędzy nadawcą i odbiorcą będzie zawierała obiekt zamówienia:

public class Order
{
  public string Id { get; set; }
  public DateTime Date { get; set; }
  public string Customer { get; set; }
  public List<string> Details { get; set; }
}

W kodzie będę się odwoływał do dwóch zmiennych zawierających connection string dla przestrzeni nazw usługi oraz nazwę topiku:

public readonly string connectionString = String.Format(
  "Endpoint={0};SharedAccessKeyName={1};SharedAccessKey={2}",
  "sb://tempnamespace.servicebus.windows.net/",
  "RootManageSharedAccessKey",
  "zFUqkwWoDrem9O8UkdG0pq0sheJze0U/P93f42Aykdc=");

public readonly string topicName = "orderstopic";

 

Zarządzanie topikiem

Zarządzanie topikami odbywa się w analogiczny sposób jak w przypadku kolejek. Tu także korzystamy z obiektu NamespaceManager używając jedynie innych metod, jednak co do zasady są to dokładnie te same mechanizmy. Poniżej znajdują się przykłady tworzenia, modyfikacji oraz usunięcia topiku.

Proste tworzenie topiku:

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

if (!namespaceManager.TopicExists(topicName))
  namespaceManager.CreateTopic(topicName);

Tworzenie topiku za pomocą obiektu TopicDescription:

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

if (!namespaceManager.TopicExists(topicName))
{
  TopicDescription topic = new TopicDescription(topicName)
  {
    MaxSizeInMegabytes = 1024,
    RequiresDuplicateDetection = true
  };
  namespaceManager.CreateTopic(topic);
}

Odczyt właściwości oraz modyfikacja topiku:

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

TopicDescription topic = namespaceManager.GetTopic(topicName);

long subscriptionCount = topic.SubscriptionCount;
long sizeInBytes = topic.SizeInBytes;

topic.MaxSizeInMegabytes = 2048;
namespaceManager.UpdateTopic(topic);

Usunięcie topiku:

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

namespaceManager.DeleteTopic(topicName);

 

Zarządzanie subskrypcjami

Po wysłaniu wiadomości do topiku, w celu jej odczytu odbiorca nie odwołuje się bezpośrednio do topiku tylko korzysta z wybranej subskrypcji. To jakie wiadomości z topiku trafią do danej subskrypcji określane jest poprzez filtry operujące na atrybutach wiadomości. Podstawową różnicą w stosunku do kolejek jest to, iż w przypadku topiku tą samą wiadomość może otrzymać kilku odbiorców. Jeżeli dana wiadomość przesłana do topiku spełni warunki filtrów więcej niż jednej subskrypcji to jej kopia zostanie przekazana do każdej z nich. Należy jednak pamiętać, że filtry subskrypcji sprawdzane są tylko w momencie pojawienia się wiadomości w topiku. Jeżeli utworzymy nową subskrypcję to mogą do niej trafić jedynie wiadomości wysłane od tego momentu, wszystkie już istniejące w topiku mimo spełnionych warunków filtra nie zostaną do niej przekazane. Mamy trzy możliwości definiowania subskrypcji: bez filtra (będą do niej trafiać wszystkie wiadomości z topiku), z filtrem typu CorrelationFilter (filtr odnoszący się do wartości atrybutu CorrelationId wiadomości) lub z filtrem typu SqlFilter (filtr odnoszący się do poszczególnych elementów atrybutu Properties wiadomości).

Subskrypcję bez filtra tworzymy podając jedynie jej nazwę oraz nazwę topiku, z którym będzie powiązana:

string subscriptionName = "ordersall";

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

if (!namespaceManager.SubscriptionExists(topicName, subscriptionName))
  namespaceManager.CreateSubscription(topicName, subscriptionName);

W przypadku tworzenia subskrypcji z filtrem typu CorrelationFilter, oprócz nazwy topiku musimy przekazać definicję filtra. Tworząc filtr podajemy wartość atrybutu CorrelationId wiadomości, które mają trafiać do tej subskrypcji:

string subscriptionName = "ordersdepartmenta";

CorrelationFilter filter = new CorrelationFilter("Department A");

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

if (!namespaceManager.SubscriptionExists(topicName, subscriptionName))
  namespaceManager.CreateSubscription(topicName, subscriptionName, filter);

Przy użyciu powyżej zdefiniowanej subskrypcji otrzymamy wszystkie wiadomości wysłane do topiku, które w atrybucie CorrelationId będą miały wartość „Department A”.

Jeżeli chcemy mieć większe możliwości filtrowania możemy utworzyć subskrypcję z filtrem typu SqlFilter. W tym przypadku tworząc filtr podajemy wyrażenie logiczne operujące na elementach atrybutu Properties wiadomości (jest to atrybut typu IDictionary<string, object>). Budując wyrażenie mamy do dyspozycji następujące operatory: =, >, <, !=, <>, and, or, (, ). Daje to bardzo duże możliwości tworzenia warunków filtrowania. Przykładowy filtr może wyglądać tak: „(Region != ‚PL’ or ItemsCount > 10) and (Sender = ‚Jan Kowalski’)”, gdzie Region, ItemsCount i Sender to klucze we właściwości Properties wiadomości.

Oto w jaki sposób utworzyć subskrypcję z filtrem SqlFilter:

string subscriptionName = "orderswholesalepl";

SqlFilter filter = new SqlFilter("Region = 'PL' and ItemsCount > 10");

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

if (!namespaceManager.SubscriptionExists(topicName, subscriptionName))
  namespaceManager.CreateSubscription(topicName, subscriptionName, filter);

Do powyższej subskrypcji trafią wszystkie wiadomości z topiku, które we właściwości Properties będą posiadały klucz Region z wartością „PL” oraz klucz ItemsCount z wartością większą od 10.

Do utworzenia subskrypcji możemy także użyć obiektu SubscriptionDescription, pozwalającego na dodatkową konfigurację:

string subscriptionName = "orderswholesalepl";

SqlFilter filter = new SqlFilter("Region = 'PL' and ItemsCount > 10");

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

if (!namespaceManager.SubscriptionExists(topicName, subscriptionName))
{
  SubscriptionDescription subscription =
    new SubscriptionDescription(topicName, subscriptionName)
  {
    RequiresSession = false,
    LockDuration = TimeSpan.FromMinutes(1)
  };
  namespaceManager.CreateSubscription(subscription, filter);
}

Z obiektu SubscriptionDescription możemy również skorzystać w celu odczytu informacji o subskrypcji oraz jej modyfikacji:

string subscriptionName = "ordersall";

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

SubscriptionDescription subscription =
  namespaceManager.GetSubscription(topicName, subscriptionName);

long messageCount = subscription.MessageCount;

subscription.MaxDeliveryCount = 5;
namespaceManager.UpdateSubscription(subscription);

Usunięcie subskrypcji sprowadza się do wywołania metody DeleteSubscription na obiekcie NamespaceManager:

string subscriptionName = "ordersall";

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

namespaceManager.DeleteSubscription(topicName, subscriptionName);

 

Wysyłanie wiadomości do topiku

Wysłanie wiadomości do topiku wygląda praktycznie tak samo jak w przypadku kolejki. Mamy dwie możliwości, albo użyjemy dedykowanego obiektu TopicClient albo uniwersalnego MessageSender znanego z kolejek (tym razem tworzonego poprzez podanie nazwy topiku):

TopicClient topicClient =
  TopicClient.CreateFromConnectionString(connectionString, topicName);

lub:

MessagingFactory messagingFactory =
  MessagingFactory.CreateFromConnectionString(connectionString);

TopicClient topicClient = messagingFactory.CreateTopicClient(topicName);

lub:

MessagingFactory messagingFactory =
  MessagingFactory.CreateFromConnectionString(connectionString);

MessageSender messageSender =
  messagingFactory.CreateMessageSender(topicName);

Samo przygotowanie wiadomości nie różni się niczym od tego, co opisywałem przy kolejkach. Tworzymy obiekt klasy BrokeredMessage i przekazujemy go do metody Send klienta topiku:

Order order = new Order()
{
  Id = "1",
  Date = DateTime.UtcNow,
  Customer = "Company ABC",
  Details = new List<string> { "Product1", "Product2", "Product3" }
};

BrokeredMessage message = new BrokeredMessage(order)
{
  MessageId = Guid.NewGuid().ToString(),
  Label = "Application X",
  CorrelationId = "Department A",
  TimeToLive = TimeSpan.FromHours(12)
};
message.Properties.Add("Sender", "Jan Kowalski");
message.Properties.Add("Region", "PL");
message.Properties.Add("ItemsCount", order.Details.Count);

messageSender.Send(message);

messageSender.Close();

 

Odbieranie wiadomości przy użyciu subskrypcji

W celu odczytu wiadomości wysłanych do topiku odbiorca nie odwołuje się bezpośrednio do topiku tylko korzysta z określonej subskrypcji, która z jego punktu widzenia posiada funkcjonalność kolejki. Przy użyciu wybranej subskrypcji będziemy w stanie odczytać jedynie te wiadomości, które spełniły warunki jej filtra. Podobnie jak w przypadku kolejek odczyt wiadomości może być realizowany w trybie ReceiveAndDelete lub PeekLock. Po więcej szczegółów odsyłam do poprzedniego tematu. Aby odebrać wiadomość musimy utworzyć obiekt klienta subskrypcji. Możemy skorzystać z klasy SubscriptionClient lub MessageReceiver. W przypadku tego drugiego w celu wskazania konkretnej subskrypcji podajemy ścieżkę w formacie „topicName/subscriptions/subscriptionName„:

string subscriptionName = "ordersall";

SubscriptionClient subscriptionClient =
  SubscriptionClient.CreateFromConnectionString(
    connectionString, topicName, subscriptionName, ReceiveMode.PeekLock);

lub:

string subscriptionName = "ordersall";

MessagingFactory messagingFactory =
  MessagingFactory.CreateFromConnectionString(connectionString);

SubscriptionClient subscriptionClient =
  messagingFactory.CreateSubscriptionClient(
    topicName, subscriptionName, ReceiveMode.PeekLock);

lub:

string subscriptionName = "ordersall";

MessagingFactory messagingFactory =
  MessagingFactory.CreateFromConnectionString(connectionString);

MessageReceiver messageReceiver =
  messagingFactory.CreateMessageReceiver(
    String.Format("{0}/subscriptions/{1}", topicName, subscriptionName),
    ReceiveMode.PeekLock);

Pobieranie wiadomości nie różni się niczym w stosunku do tego, co opisywałem przy kolejkach. Tu także mamy do dyspozycji metody Peek, Receive oraz ReceiveBatch, a odebrana wiadomość to znany już obiekt klasy BrokeredMessage:

BrokeredMessage message =
  messageReceiver.Receive(TimeSpan.FromSeconds(5));

if (message != null)
{
  try
  {
    long messageNumber = message.SequenceNumber;
    string messageId = message.MessageId;
    string label = message.Label;
    string correlationId = message.CorrelationId;
    string orderSender = message.Properties["Sender"].ToString();
    string orderRegion = message.Properties["Region"].ToString();

    Order order = message.GetBody<Order>();

    if (orderRegion == "PL")
    {
      //Processing Order...

      message.Complete();
    }
    else
      message.DeadLetter();
  }
  catch (Exception ex)
  {
    message.Abandon();
  }
}
messageReceiver.Close();

W tym i poprzednim wpisie pokazałem podstawowe operacje związane z wysyłaniem i odbieraniem wiadomości przy wykorzystaniu kolejek, topików i subskrypcji. W następnym temacie przedstawię bardziej zaawansowane scenariusze pracy z wiadomościami.

Azure Service Bus brokered messaging – Queues

2014-11-07

W poprzednim temacie opisałem czym jest Azure Service Bus oraz jak rozpocząć korzystanie z tej usługi. Dziś zajmę się tematem kolejek (Queues). Pokażę w jaki sposób zarządzać kolejkami z poziomu kodu oraz jak oprogramować wysyłanie i odbieranie wiadomości. W prezentowanych przykładach wiadomość przesyłana pomiędzy nadawcą i odbiorcą będzie zawierała obiekt zamówienia:

public class Order
{
  public string Id { get; set; }
  public DateTime Date { get; set; }
  public string Customer { get; set; }
  public List<string> Details { get; set; }
}

Na początek zdefiniuję dwa pola, do których będę się odwoływał w kolejnych przykładach. Pierwszym z nich jest connectionString zawierający pobrany z portalu Microsoft Azure connection string dla utworzonej przestrzeni nazw. Drugim polem będzie queueName z nazwą kolejki:

public readonly string connectionString = String.Format(
  "Endpoint={0};SharedAccessKeyName={1};SharedAccessKey={2}",
  "sb://tempnamespace.servicebus.windows.net/",
  "RootManageSharedAccessKey",
  "zFUqkwWoDrem9O8UkdG0pq0sheJze0U/P93f42Aykdc=");

public readonly string queueName = "ordersqueue";

 

Zarządzanie kolejką

Zarządzanie kolejkami odbywa się za pośrednictwem obiektu NamespaceManager, który odpowiada za obsługę przestrzeni nazw Azure Service Bus wskazanej w connection string. Poniższy kod przygotowuje instancję obiektu NamespaceManager, sprawdza czy kolejka o podanej nazwie nie istnieje i tworzy ją w przestrzeni nazw usługi:

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

if (!namespaceManager.QueueExists(queueName))
  namespaceManager.CreateQueue(queueName);

Stworzona w ten sposób kolejka we wszystkich atrybutach posiada domyślne wartości. Jeżeli kolejkę chcemy odpowiednio skonfigurować, do jej utworzenia możemy użyć obiektu QueueDescription:

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

if (!namespaceManager.QueueExists(queueName))
{
  QueueDescription queue = new QueueDescription(queueName)
  {
    MaxSizeInMegabytes = 1024,
    RequiresSession = false,
    RequiresDuplicateDetection = true,
    LockDuration = TimeSpan.FromMinutes(1)
  };
  namespaceManager.CreateQueue(queue);
}

Modyfikacja ustawień istniejącej kolejki również wykonywana jest za pomocą obiektu QueueDescription zwróconego przez metodę GetQueue:

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

QueueDescription queue = namespaceManager.GetQueue(queueName);
queue.MaxDeliveryCount = 5;
namespaceManager.UpdateQueue(queue);

W podobny sposób możemy pobrać wartości atrybutów tylko do odczytu prezentujących aktualny status kolejki:

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

QueueDescription queue = namespaceManager.GetQueue(queueName);
long messageCount = queue.MessageCount;
long sizeInBytes = queue.SizeInBytes;

Usunięcie kolejki sprowadza się do wywołania metody DeleteQueue:

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

namespaceManager.DeleteQueue(queueName);

 

Wysyłanie wiadomości

W celu wysłania lub odebrania wiadomości do/z kolejki musimy utworzyć obiekt klienta kolejki. Mamy trzy sposoby:

QueueClient queueClient =
  QueueClient.CreateFromConnectionString(connectionString, queueName);

lub:

MessagingFactory messagingFactory =
  MessagingFactory.CreateFromConnectionString(connectionString);

QueueClient queueClient =
  messagingFactory.CreateQueueClient(queueName);

lub:

MessagingFactory messagingFactory =
  MessagingFactory.CreateFromConnectionString(connectionString);

MessageSender messageSender =
  messagingFactory.CreateMessageSender(queueName);

Jak widać, w pierwszym i drugim przypadku tworzymy obiekt klasy QueueClient, natomiast w trzecim przypadku obiekt klasy MessageSender. Ja preferuję ostatnie podejście, ponieważ MessageSender jest bardziej uniwersalnym obiektem. W zależności od tego czy jako parametr metody CreateMessageSender przekażemy nazwę kolejki czy topiku, zwrócony obiekt MessageSender za pomocą tych samych metod będzie odwoływał się do właściwego składnika usługi. Pozwala to na dowolne przełączanie naszej aplikacji na pracę z kolejką lub topikiem bez modyfikacji kodu.

Wiadomość wysyłana do kolejki reprezentowana jest przez obiekt klasy BrokeredMessage. Tworząc wiadomość, w konstruktorze przekazujemy obiekt stanowiący jej treść (body) – w naszym przypadku będzie to obiekt klasy Order. W poszczególnych atrybutach obiektu BrokeredMessage możemy umieścić dodatkowe informacje opisujące wiadomość. Dzięki atrybutowi Properties (typu IDictionary<string, object>) do przesyłanej wiadomości możemy dołączyć listę dowolnych parametrów. W celu wysłania przygotowanej wiadomości wystarczy wywołać metodę Send na obiekcie klienta kolejki:

Order order = new Order()
{
  Id = "1",
  Date = DateTime.UtcNow,
  Customer = "Company ABC",
  Details = new List<string> { "Product1", "Product2", "Product3" }
};

BrokeredMessage message = new BrokeredMessage(order)
{
  MessageId = Guid.NewGuid().ToString(),
  Label = "Application X",
  CorrelationId = "Department A",
  TimeToLive = TimeSpan.FromHours(12)
};
message.Properties.Add("Sender", "Jan Kowalski");
message.Properties.Add("Region", "PL");

messageSender.Send(message);

messageSender.Close();

 

Odbieranie wiadomości

Odczyt wiadomości z kolejki może się odbywać w dwóch trybach: ReceiveAndDelete lub PeekLock. W trybie ReceiveAndDelete wiadomość w momencie pobrania jest usuwana z kolejki. Jeżeli przetwarzanie nie powiedzie się to my musimy zadbać o to, żeby wiadomość nie została utracona. W trybie PeekLock po odebraniu wiadomości nie jest ona usuwana z kolejki, a jedynie ustawiana jest na niej odpowiednia blokada zapobiegająca ponownemu pobraniu. Domyślnie blokada zakładana jest na minutę, ale jej czas możemy określić podczas tworzenia kolejki za pomocą atrybutu LockDuration (maksymalnie 5 minut). Po zakończeniu przetwarzania wiadomości musimy poinformować usługę o jego statusie. Mamy do wyboru następujące metody na obiekcie wiadomości:

  • Complete – wiadomość przetworzona poprawnie, zostaje usunięta z kolejki
  • Defer – odłożenie przetwarzania, wiadomość pozostaje w kolejce, ale można się do niej ponownie odwołać jedynie poprzez jej numer (ten scenariusz opiszę w kolejnych tematach)
  • DeadLetter – przeniesienie wiadomości do specjalnej pod-kolejki zawierającej „wymarłe” wiadomości (ten scenariusz także opiszę w kolejnych tematach)
  • Abandon – anulowanie przetwarzania, z wiadomości zdejmowana jest blokada i staje się ona ponownie dostępna do pobrania

Aby odebrać wiadomość musimy utworzyć obiekt klienta kolejki. Domyślnie odbieranie wiadomości realizowane jest w trybie PeekLock, ale możemy to określić podczas tworzenia obiektu. Podobnie jak przy wysyłaniu wiadomości tu także możemy korzystać bezpośrednio z obiektu QueueClient lub bardziej uniwersalnej wersji MessageReceiver:

QueueClient queueClient =
  QueueClient.CreateFromConnectionString(connectionString, queueName, ReceiveMode.PeekLock);

lub:

MessagingFactory messagingFactory =
  MessagingFactory.CreateFromConnectionString(connectionString);

MessageReceiver messageReceiver =
  messagingFactory.CreateMessageReceiver(queueName, ReceiveMode.PeekLock);

W celu pobrania wiadomości z kolejki wywołujemy metodę Receive, z możliwością podania timeout-u dla operacji. Wiadomości zwracane są w kolejności FIFO i jeżeli  kolejka nie jest pusta w wyniku otrzymamy obiekt BrokeredMessage (w przeciwnym wypadku będzie to null). Odczyt właściwej treści wiadomości odbywa się poprzez metodę GetBody<T>:

BrokeredMessage message =
  messageReceiver.Receive(TimeSpan.FromSeconds(5));

if (message != null)
{
  try
  {
    long messageNumber = message.SequenceNumber;
    string messageId = message.MessageId;
    string label = message.Label;
    string correlationId = message.CorrelationId;
    string orderSender = message.Properties["Sender"].ToString();
    string orderRegion = message.Properties["Region"].ToString();

    Order order = message.GetBody<Order>();

    if (orderRegion == "PL")
    {
      //Processing Order...

      message.Complete();
    }
    else
      message.DeadLetter();
  }
  catch (Exception ex)
  {
    message.Abandon();
  }
}
messageReceiver.Close();

Oprócz pokazanego standardowego pobierania wiadomości mamy jeszcze dwie opcje. Pierwszą z nich jest podgląd, czyli możliwość odczytu wiadomości bez usuwania z kolejki i zakładania blokady. Służy do tego metoda Peek:

BrokeredMessage message = messageReceiver.Peek();

Drugą opcją jest pobranie za jednym razem większej liczby wiadomości. Wystarczy wywołać metodę ReceiveBatch z parametrem określającym liczbę wiadomości do pobrania:

IEnumerable<BrokeredMessage> messages = messageReceiver.ReceiveBatch(10);

To wszystko jeżeli chodzi o podstawowe operacje na kolejkach Azure Service Bus. Oczywiście jest to tylko niezbędne minimum pozwalające zbudować pierwsze działające rozwiązanie. W kolejnych tematach przedstawię znacznie bardziej rozbudowane scenariusze pracy z wiadomościami takie jak: obsługa pod-kolejek DeadLetter, opóźnione przetwarzanie, wykrywanie zdublowanych wiadomości, harmonogram dostarczania wiadomości, sesje, transakcje przy wysyłaniu i odbieraniu wielu wiadomości oraz praca w modelu request-response. Ale zanim przejdę do tych tematów, w następnym wpisie zajmę się topikami i subskrypcjami.