Monthly Archives: Marzec 2015

Kwietniowe wydarzenia

2015-03-22

W drugiej połowie kwietnia odbędzie się kilka interesujących konferencji:

Do zobaczenia!

Reklamy

Azure Service Bus brokered messaging – dodatkowe funkcje

2015-03-12

W ostatnim temacie cyklu poświęconego usłudze Azure Service Bus chciałbym przedstawić bardziej rozbudowane scenariusze pracy z wiadomościami takie jak: obsługa pod-kolejek DeadLetter, opóźnione przetwarzanie, wykrywanie zdublowanych wiadomości, harmonogram dostarczania wiadomości, transakcje przy wysyłaniu i odbieraniu wielu wiadomości, sesje oraz praca w modelu request-response.

 

Pod-kolejka DeadLetter

Wiadomości umieszczane są w specjalnej pod-kolejce DeadLetter w następujących przypadkach:

  • Wiadomość wygaśnie i jednocześnie dla kolejki lub subskrypcji atrybut EnableDeadLetteringOnMessageExpiration ustawiony jest na true.
  • Przekroczona zostanie maksymalna liczba prób dostarczenia wiadomości, którą określa się dla kolejki lub subskrypcji poprzez atrybut MaxDeliveryCount (domyślnie 10). Każdorazowe wywołanie metody Abandon na wiadomości powoduje zwiększenie jej licznika prób dostarczenia.
  • Wystąpi wyjątek podczas sprawdzania filtra subskrypcji dla danej wiadomości i jednocześnie dla subskrypcji atrybut EnableDeadLetteringOnFilterEvaluationExceptions ustawiony jest na true.
  • Na wiadomości wywołana zostanie metoda DeadLetter.

Ustawienie domyślnego czasu wygasania wiadomości oraz przenoszenia ich do pod-kolejki DeadLetter (ustawień nie można zmienić po utworzeniu kolejki):

QueueDescription queue = new QueueDescription("queueName")
{
    DefaultMessageTimeToLive = TimeSpan.FromSeconds(60),
    EnableDeadLetteringOnMessageExpiration = true
};

Ustawienie czasu wygaśnięcia wiadomości przed jej wysłaniem:

BrokeredMessage message = new BrokeredMessage();
message.TimeToLive = TimeSpan.FromMinutes(30);

Pobieranie wiadomości z pod-kolejki DeadLetter:

//queueName/$DeadLetterQueue
string queueDeadLetterPath = QueueClient.FormatDeadLetterPath("queueName");
QueueClient deadLetterQueueClient =
  QueueClient.CreateFromConnectionString("connectionString", queueDeadLetterPath, ReceiveMode.ReceiveAndDelete);
BrokeredMessage message = deadLetterQueueClient.Receive();

 

Opóźnione przetwarzanie – Defer

Klasa BrokeredMessage posiada metodę Defer, która pozwala na odłożenie przetwarzania wiadomości pobranej z kolejki lub subskrypcji. Po wywołaniu metody Defer wiadomość pozostaje w kolejce, ale można się do niej ponownie odwołać jedynie poprzez jej numer. Oznacza to konieczność zapamiętania numeru wiadomości przed wywołaniem metody Defer, w przeciwnym wypadku stracimy możliwość jej odczytu i wiadomość pozostanie w kolejce do czasu wygaśnięcia. Do odczytu wiadomości można wykorzystać jej numer jedynie w przypadku wcześniejszego użycia metody Defer.

QueueClient queueClient =
  QueueClient.CreateFromConnectionString("connectionString", "queueName", ReceiveMode.PeekLock);

//Defer
BrokeredMessage message = queueClient.Receive();
long messageNumber = message.SequenceNumber;
message.Defer();

//Receive
BrokeredMessage deferredMessage = queueClient.Receive(messageNumber);
deferredMessage.Complete();

 

Wykrywanie zdublowanych wiadomości

Wykrywanie duplikatów w wysłanych wiadomościach opiera się na atrybucie MessageId wiadomości oraz ustawionym oknie czasowym w kolejce lub topiku. Jeżeli kolejka lub topik jest skonfigurowany pod kątem wykrywania duplikatów, klient może wielokrotnie ponawiać wysłanie tej samej wiadomości, a usługa zignoruje wszystkie duplikaty. Przy włączonym wykrywaniu duplikatów (atrybut RequiresDuplicateDetection) atrybut DuplicateDetectionHistoryTimeWindow określa, przez jaki czas wartości atrybutu MessageId wiadomości będą przechowywane w celu wykrywania zdublowanych obiektów.

QueueDescription queueDescription = new QueueDescription("queueName")
{
    RequiresDuplicateDetection = true,
    DuplicateDetectionHistoryTimeWindow = new TimeSpan(1, 0, 0)
};

 

Harmonogram dostarczania wiadomości

Poprzez atrybut ScheduledEnqueueTimeUtc klasy BrokeredMessage mamy możliwość określenia, kiedy dana wiadomość wysłana do kolejki lub topiku będzie dostępna do pobrania. Wartość wspomnianego atrybutu musi być przekazana jako czas UTC.

QueueClient queueClient =
  QueueClient.CreateFromConnectionString("connectionString", "queueName");

BrokeredMessage message = new BrokeredMessage();
message.ScheduledEnqueueTimeUtc = DateTime.UtcNow.AddDays(7);
queueClient.Send(message);

 

Transakcje

Podczas wysyłania wielu wiadomości do pojedynczej kolejki lub topiku cały proces możemy objąć transakcją, co gwarantuje nam dostarczenie wszystkich wiadomości lub żadnej z nich w przypadku pojawienia się błędu. Z kolei zastosowanie transakcji podczas pobierania wielu wiadomości z danej kolejki lub subskrypcji pozwala na anulowanie całego przetwarzania w przypadku wystąpienia błędu przy jednej z wiadomości. Transakcja może obejmować wiadomości pobierane z różnych subskrypcji o ile należą one do tego samego topiku.

Wysyłanie wiadomości w transakcji:

QueueClient queueClient =
  QueueClient.CreateFromConnectionString("connectionString", "queueName");

using (TransactionScope scope = new TransactionScope())
{
    BrokeredMessage message1 = new BrokeredMessage("Message 1");
    queueClient.Send(message1);

    BrokeredMessage message2 = new BrokeredMessage("Message 2");
    queueClient.Send(message2);

    scope.Complete();
}

Pobieranie wiadomości w transakcji:

QueueClient queueClient =
  QueueClient.CreateFromConnectionString("connectionString", "queueName", ReceiveMode.PeekLock);

using (TransactionScope scope = new TransactionScope())
{
    BrokeredMessage message1 = queueClient.Receive();
    string messageBody1 = message1.GetBody<string>();
    message1.Complete();

    BrokeredMessage message2 = queueClient.Receive();
    string messageBody2 = message2.GetBody<string>();
    message2.Complete();

    scope.Complete();
}

 

Sesje

Usługa Azure Service Bus udostępnia mechanizm sesji pozwalający na grupowanie wysyłanych wiadomości. Sesje są idealnym rozwiązaniem w przypadku konieczności przetworzenia wielu wzajemnie ze sobą powiązanych wiadomości. Przykładem takiego scenariusza jest podzielenie jednej dużej wiadomości na kilka mniejszych (ze względu na dopuszczalny rozmiar) i wysłanie ich w ramach pojedynczej sesji. Załóżmy, że chcemy wysłać obiekt zamówienia składający się z nagłówka oraz wielu pozycji. Możemy wówczas podzielić go na wiele wiadomości, gdzie pierwsza z nich będzie zawierała nagłówek, a kolejne poszczególne pozycje. Tak przygotowany zestaw wiadomości możemy wysłać w ramach pojedynczej sesji, podając jako jej identyfikator np. numer zamówienia. Dzięki temu podczas pobierania wiadomości będziemy w stanie powtórnie zgrupować je w ramach jednego zamówienia.

W celu włączenia obsługi sesji, kolejka lub subskrypcja musi być utworzona z atrybutem RequiresSession ustawionym na true. Wówczas wszystkie wiadomości wysyłane do kolejki lub topiku zawierającego takie subskrypcje muszą mieć ustawioną wartość we właściwości SessionId (dowolna wartość typu string będąca identyfikatorem sesji). Pobieranie wiadomości z tak skonfigurowanej kolejki lub subskrypcji odbywa się poprzez obiekt MessageSession zamiast standardowych QueueClient lub SubscriptionClient. Wywołanie metody AcceptMessageSession na obiekcie klienta kolejki lub subskrypcji spowoduje oczekiwanie przez podany w parametrze czas (lub domyślny minutę) na otrzymanie wiadomości. Jeżeli to nastąpi, zwracany jest obiekt MessageSession zawierający właściwość SessionId i umożliwiający pobranie wszystkich wiadomości w ramach tej sesji. Istnieje również możliwość przekazania do metody AcceptMessageSession identyfikatora sesji, co spowoduje oczekiwanie na wiadomość jedynie z tej konkretnej sesji. Jeżeli przez określony czas nie zostanie odebrana żadna wiadomość, zgłoszony zostaje wyjątek TimeoutException.

Utworzenie kolejki z obsługą sesji:

QueueDescription orderQueueDescription = new QueueDescription("queueName")
{
    RequiresSession = true
};

Wysłanie kilku wiadomości w ramach jednej sesji:

Order order = new Order();
order.Id = "1/2015";
order.Details = new List<string> { "Item1", "Item2", "Item3" };

QueueClient queueClient =
  QueueClient.CreateFromConnectionString("connectionString", "queueName");

foreach (string orderItem in order.Details)
{
    BrokeredMessage message = new BrokeredMessage(orderItem);
    message.SessionId = order.Id;
    queueClient.Send(message);
}

Pobranie wszystkich wiadomości z jednej sesji:

MessageSession orderSession = null;
try
{
    orderSession = queueClient.AcceptMessageSession();
}
catch (TimeoutException ex)
{
}            

if (orderSession != null)
{
    string orderId = orderSession.SessionId;

    while (true)
    {
        BrokeredMessage orderItemMessage = orderSession.Receive();
        if (orderItemMessage != null)
        {
            string orderItem = orderItemMessage.GetBody<string>();
            orderItemMessage.Complete();
        }
        else
            break;
    }

    orderSession.Close();
}

 

Model Request-Response

Tryb brokered messaging w Azure Service Bus polega na komunikacji asynchronicznej gdzie nadawca i odbiorca pracują niezależnie, a wiadomości są kolejkowane na serwerach Azure. Mimo to, w trybie tym możliwa jest realizacja modelu Request-Response. Do tego celu wykorzystywane są dwie kolejki (żądań i odpowiedzi) oraz sesje. Poniżej znajduje się przykładowy scenariusz, w którym wysyłane jest zapytanie o szczegóły produktu, a następnie pobierana jest odpowiedź dla tego konkretnego żądania.

Klient wysyła do kolejki żądań wiadomość z zapytaniem o szczegóły produktu, podając we właściwości Properties jego identyfikator oraz ustawiając właściwość ReplyToSessionId na własny identyfikator sesji, w ramach której będzie oczekiwał na odpowiedź:

QueueClient requestQueueClient =
  QueueClient.CreateFromConnectionString("connectionString", "ProductRequestQueue");

string sessionId = Guid.NewGuid().ToString();

BrokeredMessage requestMessage = new BrokeredMessage();
requestMessage.Properties.Add("productId", "1");
requestMessage.ReplyToSessionId = sessionId;
requestQueueClient.Send(requestMessage);

Serwer pobiera wiadomość z kolejki żądań i zapamiętuje nadany identyfikator sesji:

QueueClient requestQueueClient =
  QueueClient.CreateFromConnectionString("connectionString", "ProductRequestQueue", ReceiveMode.PeekLock);

BrokeredMessage requestMessage = requestQueueClient.Receive();
string productId = requestMessage.Properties["productId"] as string;
string sessionId = requestMessage.ReplyToSessionId;
requestMessage.Complete();

Serwer przygotowuje wiadomość z informacjami o produkcie (we właściwości Label zapisywane jest to czy produkt został znaleziony) i wysyła ją z ustawionym identyfikatorem sesji do kolejki odpowiedzi:

QueueClient responseQueueClient =
  QueueClient.CreateFromConnectionString("connectionString", "ProductResponseQueue");

string productInfo = "ProductInfo";
BrokeredMessage responseMessage = new BrokeredMessage(productInfo);
responseMessage.SessionId = sessionId;
responseMessage.Label = "Found";
responseQueueClient.Send(responseMessage);

Klient pobiera wiadomość z kolejki odpowiedzi odwołując się do wcześniej nadanego identyfikatora sesji, dzięki czemu mamy gwarancję otrzymania właściwej wiadomości:

QueueClient responseQueueClient =
  QueueClient.CreateFromConnectionString("connectionString", "ProductResponseQueue", ReceiveMode.PeekLock);

MessageSession session = responseQueueClient.AcceptMessageSession(sessionId);
BrokeredMessage responseMessage = session.Receive();
if (responseMessage.Label == "Found")
{
    string productInfo = responseMessage.GetBody<string>();
}
responseMessage.Complete();
session.Close();

 

Service Bus Explorer

Na zakończenie chciałbym wspomnieć o bardzo wygodnym narzędziu pozwalającym zarządzać usługą Azure Service Bus z poziomu aplikacji desktopowej: Service Bus Explorer

 

Linki