Azure Service Bus brokered messaging – Queues

2014-11-07

W poprzednim temacie opisałem czym jest Azure Service Bus oraz jak rozpocząć korzystanie z tej usługi. Dziś zajmę się tematem kolejek (Queues). Pokażę w jaki sposób zarządzać kolejkami z poziomu kodu oraz jak oprogramować wysyłanie i odbieranie wiadomości. W prezentowanych przykładach wiadomość przesyłana pomiędzy nadawcą i odbiorcą będzie zawierała obiekt zamówienia:

public class Order
{
  public string Id { get; set; }
  public DateTime Date { get; set; }
  public string Customer { get; set; }
  public List<string> Details { get; set; }
}

Na początek zdefiniuję dwa pola, do których będę się odwoływał w kolejnych przykładach. Pierwszym z nich jest connectionString zawierający pobrany z portalu Microsoft Azure connection string dla utworzonej przestrzeni nazw. Drugim polem będzie queueName z nazwą kolejki:

public readonly string connectionString = String.Format(
  "Endpoint={0};SharedAccessKeyName={1};SharedAccessKey={2}",
  "sb://tempnamespace.servicebus.windows.net/",
  "RootManageSharedAccessKey",
  "zFUqkwWoDrem9O8UkdG0pq0sheJze0U/P93f42Aykdc=");

public readonly string queueName = "ordersqueue";

 

Zarządzanie kolejką

Zarządzanie kolejkami odbywa się za pośrednictwem obiektu NamespaceManager, który odpowiada za obsługę przestrzeni nazw Azure Service Bus wskazanej w connection string. Poniższy kod przygotowuje instancję obiektu NamespaceManager, sprawdza czy kolejka o podanej nazwie nie istnieje i tworzy ją w przestrzeni nazw usługi:

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

if (!namespaceManager.QueueExists(queueName))
  namespaceManager.CreateQueue(queueName);

Stworzona w ten sposób kolejka we wszystkich atrybutach posiada domyślne wartości. Jeżeli kolejkę chcemy odpowiednio skonfigurować, do jej utworzenia możemy użyć obiektu QueueDescription:

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

if (!namespaceManager.QueueExists(queueName))
{
  QueueDescription queue = new QueueDescription(queueName)
  {
    MaxSizeInMegabytes = 1024,
    RequiresSession = false,
    RequiresDuplicateDetection = true,
    LockDuration = TimeSpan.FromMinutes(1)
  };
  namespaceManager.CreateQueue(queue);
}

Modyfikacja ustawień istniejącej kolejki również wykonywana jest za pomocą obiektu QueueDescription zwróconego przez metodę GetQueue:

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

QueueDescription queue = namespaceManager.GetQueue(queueName);
queue.MaxDeliveryCount = 5;
namespaceManager.UpdateQueue(queue);

W podobny sposób możemy pobrać wartości atrybutów tylko do odczytu prezentujących aktualny status kolejki:

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

QueueDescription queue = namespaceManager.GetQueue(queueName);
long messageCount = queue.MessageCount;
long sizeInBytes = queue.SizeInBytes;

Usunięcie kolejki sprowadza się do wywołania metody DeleteQueue:

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

namespaceManager.DeleteQueue(queueName);

 

Wysyłanie wiadomości

W celu wysłania lub odebrania wiadomości do/z kolejki musimy utworzyć obiekt klienta kolejki. Mamy trzy sposoby:

QueueClient queueClient =
  QueueClient.CreateFromConnectionString(connectionString, queueName);

lub:

MessagingFactory messagingFactory =
  MessagingFactory.CreateFromConnectionString(connectionString);

QueueClient queueClient =
  messagingFactory.CreateQueueClient(queueName);

lub:

MessagingFactory messagingFactory =
  MessagingFactory.CreateFromConnectionString(connectionString);

MessageSender messageSender =
  messagingFactory.CreateMessageSender(queueName);

Jak widać, w pierwszym i drugim przypadku tworzymy obiekt klasy QueueClient, natomiast w trzecim przypadku obiekt klasy MessageSender. Ja preferuję ostatnie podejście, ponieważ MessageSender jest bardziej uniwersalnym obiektem. W zależności od tego czy jako parametr metody CreateMessageSender przekażemy nazwę kolejki czy topiku, zwrócony obiekt MessageSender za pomocą tych samych metod będzie odwoływał się do właściwego składnika usługi. Pozwala to na dowolne przełączanie naszej aplikacji na pracę z kolejką lub topikiem bez modyfikacji kodu.

Wiadomość wysyłana do kolejki reprezentowana jest przez obiekt klasy BrokeredMessage. Tworząc wiadomość, w konstruktorze przekazujemy obiekt stanowiący jej treść (body) – w naszym przypadku będzie to obiekt klasy Order. W poszczególnych atrybutach obiektu BrokeredMessage możemy umieścić dodatkowe informacje opisujące wiadomość. Dzięki atrybutowi Properties (typu IDictionary<string, object>) do przesyłanej wiadomości możemy dołączyć listę dowolnych parametrów. W celu wysłania przygotowanej wiadomości wystarczy wywołać metodę Send na obiekcie klienta kolejki:

Order order = new Order()
{
  Id = "1",
  Date = DateTime.UtcNow,
  Customer = "Company ABC",
  Details = new List<string> { "Product1", "Product2", "Product3" }
};

BrokeredMessage message = new BrokeredMessage(order)
{
  MessageId = Guid.NewGuid().ToString(),
  Label = "Application X",
  CorrelationId = "Department A",
  TimeToLive = TimeSpan.FromHours(12)
};
message.Properties.Add("Sender", "Jan Kowalski");
message.Properties.Add("Region", "PL");

messageSender.Send(message);

messageSender.Close();

 

Odbieranie wiadomości

Odczyt wiadomości z kolejki może się odbywać w dwóch trybach: ReceiveAndDelete lub PeekLock. W trybie ReceiveAndDelete wiadomość w momencie pobrania jest usuwana z kolejki. Jeżeli przetwarzanie nie powiedzie się to my musimy zadbać o to, żeby wiadomość nie została utracona. W trybie PeekLock po odebraniu wiadomości nie jest ona usuwana z kolejki, a jedynie ustawiana jest na niej odpowiednia blokada zapobiegająca ponownemu pobraniu. Domyślnie blokada zakładana jest na minutę, ale jej czas możemy określić podczas tworzenia kolejki za pomocą atrybutu LockDuration (maksymalnie 5 minut). Po zakończeniu przetwarzania wiadomości musimy poinformować usługę o jego statusie. Mamy do wyboru następujące metody na obiekcie wiadomości:

  • Complete – wiadomość przetworzona poprawnie, zostaje usunięta z kolejki
  • Defer – odłożenie przetwarzania, wiadomość pozostaje w kolejce, ale można się do niej ponownie odwołać jedynie poprzez jej numer (ten scenariusz opiszę w kolejnych tematach)
  • DeadLetter – przeniesienie wiadomości do specjalnej pod-kolejki zawierającej „wymarłe” wiadomości (ten scenariusz także opiszę w kolejnych tematach)
  • Abandon – anulowanie przetwarzania, z wiadomości zdejmowana jest blokada i staje się ona ponownie dostępna do pobrania

Aby odebrać wiadomość musimy utworzyć obiekt klienta kolejki. Domyślnie odbieranie wiadomości realizowane jest w trybie PeekLock, ale możemy to określić podczas tworzenia obiektu. Podobnie jak przy wysyłaniu wiadomości tu także możemy korzystać bezpośrednio z obiektu QueueClient lub bardziej uniwersalnej wersji MessageReceiver:

QueueClient queueClient =
  QueueClient.CreateFromConnectionString(connectionString, queueName, ReceiveMode.PeekLock);

lub:

MessagingFactory messagingFactory =
  MessagingFactory.CreateFromConnectionString(connectionString);

MessageReceiver messageReceiver =
  messagingFactory.CreateMessageReceiver(queueName, ReceiveMode.PeekLock);

W celu pobrania wiadomości z kolejki wywołujemy metodę Receive, z możliwością podania timeout-u dla operacji. Wiadomości zwracane są w kolejności FIFO i jeżeli  kolejka nie jest pusta w wyniku otrzymamy obiekt BrokeredMessage (w przeciwnym wypadku będzie to null). Odczyt właściwej treści wiadomości odbywa się poprzez metodę GetBody<T>:

BrokeredMessage message =
  messageReceiver.Receive(TimeSpan.FromSeconds(5));

if (message != null)
{
  try
  {
    long messageNumber = message.SequenceNumber;
    string messageId = message.MessageId;
    string label = message.Label;
    string correlationId = message.CorrelationId;
    string orderSender = message.Properties["Sender"].ToString();
    string orderRegion = message.Properties["Region"].ToString();

    Order order = message.GetBody<Order>();

    if (orderRegion == "PL")
    {
      //Processing Order...

      message.Complete();
    }
    else
      message.DeadLetter();
  }
  catch (Exception ex)
  {
    message.Abandon();
  }
}
messageReceiver.Close();

Oprócz pokazanego standardowego pobierania wiadomości mamy jeszcze dwie opcje. Pierwszą z nich jest podgląd, czyli możliwość odczytu wiadomości bez usuwania z kolejki i zakładania blokady. Służy do tego metoda Peek:

BrokeredMessage message = messageReceiver.Peek();

Drugą opcją jest pobranie za jednym razem większej liczby wiadomości. Wystarczy wywołać metodę ReceiveBatch z parametrem określającym liczbę wiadomości do pobrania:

IEnumerable<BrokeredMessage> messages = messageReceiver.ReceiveBatch(10);

To wszystko jeżeli chodzi o podstawowe operacje na kolejkach Azure Service Bus. Oczywiście jest to tylko niezbędne minimum pozwalające zbudować pierwsze działające rozwiązanie. W kolejnych tematach przedstawię znacznie bardziej rozbudowane scenariusze pracy z wiadomościami takie jak: obsługa pod-kolejek DeadLetter, opóźnione przetwarzanie, wykrywanie zdublowanych wiadomości, harmonogram dostarczania wiadomości, sesje, transakcje przy wysyłaniu i odbieraniu wielu wiadomości oraz praca w modelu request-response. Ale zanim przejdę do tych tematów, w następnym wpisie zajmę się topikami i subskrypcjami.

Reklamy

Posted on 2014-11-07, in .NET/C# and tagged , , , , . Bookmark the permalink. Dodaj komentarz.

Skomentuj

Wprowadź swoje dane lub kliknij jedną z tych ikon, aby się zalogować:

Logo WordPress.com

Komentujesz korzystając z konta WordPress.com. Wyloguj / Zmień )

Zdjęcie z Twittera

Komentujesz korzystając z konta Twitter. Wyloguj / Zmień )

Zdjęcie na Facebooku

Komentujesz korzystając z konta Facebook. Wyloguj / Zmień )

Zdjęcie na Google+

Komentujesz korzystając z konta Google+. Wyloguj / Zmień )

Connecting to %s

%d blogerów lubi to: