Jesienne wydarzenia

2015-09-05

W najbliższym czasie odbędzie się kilka ciekawych konferencji:

Ja wybieram się na wszystkie z nich. Do zobaczenia!

Synchronization Service – supported runtime

2015-04-08

Ostatnio uruchamiając synchronizację na jednym z agentów otrzymałem następujący błąd dotyczący własnego rozszerzenia:

The management agent failed on run profile. The run step stopped because a required rules extension „Demo.FIM.ActiveDirectory.dll” could not be loaded.

Po upewnieniu się, że wspomniana biblioteka znajduje się w katalogu Extensions zajrzałem do loga systemowego. Tam opis błędu był znacznie bardziej rozbudowany i wyglądał tak:

Verify that the rules extension is located in the Extensions directory. If the extension is present, confirm that the version of the .NET framework  that can run the extension is installed on the server and that a supportedRuntimes entry in the configuration files specifies that version. The synchronization engine will not be able to load an extension that is built with a newer version of the .NET framework than the version of the .NET runtime it is hosting.

Oraz tak:

Could not load file or assembly ‚file:///C:\Program Files\Microsoft Forefront Identity Manager\2010\Synchronization Service\Extensions\Demo.FIM.ActiveDirectory.dll’ or one of its dependencies. Operation is not supported. (Exception from HRESULT: 0x80131515)

An attempt was made to load an assembly from a network location which would have caused the assembly to be sandboxed in previous versions of the .NET Framework. This release of the .NET Framework does not enable CAS policy by default, so this load may be dangerous. If this load is not intended to sandbox the assembly, please enable the loadFromRemoteSources switch. See http://go.microsoft.com/fwlink/?LinkId=155569 for more information.

Oczywiście wszystkie wymagane wersje frameworków były zainstalowane. Po krótkich poszukiwaniach w internecie okazało się, że błąd ten jest ogólnie znany i można go rozwiązać w następujący sposób:

      1. Otwieramy plik konfiguracyjny procesu Synchronization Service (miiserver.exe): C:\Program Files\Microsoft Forefront Identity Manager\2010\Synchronization Service\Bin\miiserver.exe.config
      2. Odnajdujemy element startup:
        <startup useLegacyV2RuntimeActivationPolicy="true">
          <supportedRuntime version="v4.0"></supportedRuntime>
          <supportedRuntime version="v2.0.50727"></supportedRuntime>
        </startup>
        
      3. Zamieniamy kolejność wersji supportedRuntime:
        <startup useLegacyV2RuntimeActivationPolicy="true">
          <supportedRuntime version="v2.0.50727"></supportedRuntime>
          <supportedRuntime version="v4.0"></supportedRuntime>
        </startup>
        

Powyższa operacja rozwiązała problem z tym konkretnym agentem. Dodam tylko, że jego rozszerzenie było skompilowane w wersji .NET 3.5 i korzystało również z bibliotek w wersji .NET 2.0. Okazało się jednak, że w tym momencie taki sam błąd zaczął pojawiać się przy uruchamianiu innego agenta (typu ECMA), którego kod z powodu wykorzystywanych zależności był skompilowany w wersji .NET 4.5. Jak więc poradzić sobie w tej sytuacji skoro dwóch agentów wymaga dwóch różnych konfiguracji? Można jednego z nich uruchamiać w kontekście Synchronization Service, a drugiego w oddzielnym procesie z inną konfiguracją. Wystarczy dla drugiego agenta zaznaczyć opcję „Run this management agent in a separate process”, a następnie otworzyć plik konfiguracyjny dla agentów uruchamianych w oddzielnym procesie „C:\Program Files\Microsoft Forefront Identity Manager\2010\Synchronization Service\Bin\mmsscrpt.exe.config” i w nim ustawić supportedRuntime w następujący sposób:

<startup useLegacyV2RuntimeActivationPolicy="true">
  <supportedRuntime version="v4.0"></supportedRuntime>
  <supportedRuntime version="v2.0.50727"></supportedRuntime>
</startup>

Kwietniowe wydarzenia

2015-03-22

W drugiej połowie kwietnia odbędzie się kilka interesujących konferencji:

Do zobaczenia!

Azure Service Bus brokered messaging – dodatkowe funkcje

2015-03-12

W ostatnim temacie cyklu poświęconego usłudze Azure Service Bus chciałbym przedstawić bardziej rozbudowane scenariusze pracy z wiadomościami takie jak: obsługa pod-kolejek DeadLetter, opóźnione przetwarzanie, wykrywanie zdublowanych wiadomości, harmonogram dostarczania wiadomości, transakcje przy wysyłaniu i odbieraniu wielu wiadomości, sesje oraz praca w modelu request-response.

 

Pod-kolejka DeadLetter

Wiadomości umieszczane są w specjalnej pod-kolejce DeadLetter w następujących przypadkach:

  • Wiadomość wygaśnie i jednocześnie dla kolejki lub subskrypcji atrybut EnableDeadLetteringOnMessageExpiration ustawiony jest na true.
  • Przekroczona zostanie maksymalna liczba prób dostarczenia wiadomości, którą określa się dla kolejki lub subskrypcji poprzez atrybut MaxDeliveryCount (domyślnie 10). Każdorazowe wywołanie metody Abandon na wiadomości powoduje zwiększenie jej licznika prób dostarczenia.
  • Wystąpi wyjątek podczas sprawdzania filtra subskrypcji dla danej wiadomości i jednocześnie dla subskrypcji atrybut EnableDeadLetteringOnFilterEvaluationExceptions ustawiony jest na true.
  • Na wiadomości wywołana zostanie metoda DeadLetter.

Ustawienie domyślnego czasu wygasania wiadomości oraz przenoszenia ich do pod-kolejki DeadLetter (ustawień nie można zmienić po utworzeniu kolejki):

QueueDescription queue = new QueueDescription("queueName")
{
    DefaultMessageTimeToLive = TimeSpan.FromSeconds(60),
    EnableDeadLetteringOnMessageExpiration = true
};

Ustawienie czasu wygaśnięcia wiadomości przed jej wysłaniem:

BrokeredMessage message = new BrokeredMessage();
message.TimeToLive = TimeSpan.FromMinutes(30);

Pobieranie wiadomości z pod-kolejki DeadLetter:

//queueName/$DeadLetterQueue
string queueDeadLetterPath = QueueClient.FormatDeadLetterPath("queueName");
QueueClient deadLetterQueueClient =
  QueueClient.CreateFromConnectionString("connectionString", queueDeadLetterPath, ReceiveMode.ReceiveAndDelete);
BrokeredMessage message = deadLetterQueueClient.Receive();

 

Opóźnione przetwarzanie – Defer

Klasa BrokeredMessage posiada metodę Defer, która pozwala na odłożenie przetwarzania wiadomości pobranej z kolejki lub subskrypcji. Po wywołaniu metody Defer wiadomość pozostaje w kolejce, ale można się do niej ponownie odwołać jedynie poprzez jej numer. Oznacza to konieczność zapamiętania numeru wiadomości przed wywołaniem metody Defer, w przeciwnym wypadku stracimy możliwość jej odczytu i wiadomość pozostanie w kolejce do czasu wygaśnięcia. Do odczytu wiadomości można wykorzystać jej numer jedynie w przypadku wcześniejszego użycia metody Defer.

QueueClient queueClient =
  QueueClient.CreateFromConnectionString("connectionString", "queueName", ReceiveMode.PeekLock);

//Defer
BrokeredMessage message = queueClient.Receive();
long messageNumber = message.SequenceNumber;
message.Defer();

//Receive
BrokeredMessage deferredMessage = queueClient.Receive(messageNumber);
deferredMessage.Complete();

 

Wykrywanie zdublowanych wiadomości

Wykrywanie duplikatów w wysłanych wiadomościach opiera się na atrybucie MessageId wiadomości oraz ustawionym oknie czasowym w kolejce lub topiku. Jeżeli kolejka lub topik jest skonfigurowany pod kątem wykrywania duplikatów, klient może wielokrotnie ponawiać wysłanie tej samej wiadomości, a usługa zignoruje wszystkie duplikaty. Przy włączonym wykrywaniu duplikatów (atrybut RequiresDuplicateDetection) atrybut DuplicateDetectionHistoryTimeWindow określa, przez jaki czas wartości atrybutu MessageId wiadomości będą przechowywane w celu wykrywania zdublowanych obiektów.

QueueDescription queueDescription = new QueueDescription("queueName")
{
    RequiresDuplicateDetection = true,
    DuplicateDetectionHistoryTimeWindow = new TimeSpan(1, 0, 0)
};

 

Harmonogram dostarczania wiadomości

Poprzez atrybut ScheduledEnqueueTimeUtc klasy BrokeredMessage mamy możliwość określenia, kiedy dana wiadomość wysłana do kolejki lub topiku będzie dostępna do pobrania. Wartość wspomnianego atrybutu musi być przekazana jako czas UTC.

QueueClient queueClient =
  QueueClient.CreateFromConnectionString("connectionString", "queueName");

BrokeredMessage message = new BrokeredMessage();
message.ScheduledEnqueueTimeUtc = DateTime.UtcNow.AddDays(7);
queueClient.Send(message);

 

Transakcje

Podczas wysyłania wielu wiadomości do pojedynczej kolejki lub topiku cały proces możemy objąć transakcją, co gwarantuje nam dostarczenie wszystkich wiadomości lub żadnej z nich w przypadku pojawienia się błędu. Z kolei zastosowanie transakcji podczas pobierania wielu wiadomości z danej kolejki lub subskrypcji pozwala na anulowanie całego przetwarzania w przypadku wystąpienia błędu przy jednej z wiadomości. Transakcja może obejmować wiadomości pobierane z różnych subskrypcji o ile należą one do tego samego topiku.

Wysyłanie wiadomości w transakcji:

QueueClient queueClient =
  QueueClient.CreateFromConnectionString("connectionString", "queueName");

using (TransactionScope scope = new TransactionScope())
{
    BrokeredMessage message1 = new BrokeredMessage("Message 1");
    queueClient.Send(message1);

    BrokeredMessage message2 = new BrokeredMessage("Message 2");
    queueClient.Send(message2);

    scope.Complete();
}

Pobieranie wiadomości w transakcji:

QueueClient queueClient =
  QueueClient.CreateFromConnectionString("connectionString", "queueName", ReceiveMode.PeekLock);

using (TransactionScope scope = new TransactionScope())
{
    BrokeredMessage message1 = queueClient.Receive();
    string messageBody1 = message1.GetBody<string>();
    message1.Complete();

    BrokeredMessage message2 = queueClient.Receive();
    string messageBody2 = message2.GetBody<string>();
    message2.Complete();

    scope.Complete();
}

 

Sesje

Usługa Azure Service Bus udostępnia mechanizm sesji pozwalający na grupowanie wysyłanych wiadomości. Sesje są idealnym rozwiązaniem w przypadku konieczności przetworzenia wielu wzajemnie ze sobą powiązanych wiadomości. Przykładem takiego scenariusza jest podzielenie jednej dużej wiadomości na kilka mniejszych (ze względu na dopuszczalny rozmiar) i wysłanie ich w ramach pojedynczej sesji. Załóżmy, że chcemy wysłać obiekt zamówienia składający się z nagłówka oraz wielu pozycji. Możemy wówczas podzielić go na wiele wiadomości, gdzie pierwsza z nich będzie zawierała nagłówek, a kolejne poszczególne pozycje. Tak przygotowany zestaw wiadomości możemy wysłać w ramach pojedynczej sesji, podając jako jej identyfikator np. numer zamówienia. Dzięki temu podczas pobierania wiadomości będziemy w stanie powtórnie zgrupować je w ramach jednego zamówienia.

W celu włączenia obsługi sesji, kolejka lub subskrypcja musi być utworzona z atrybutem RequiresSession ustawionym na true. Wówczas wszystkie wiadomości wysyłane do kolejki lub topiku zawierającego takie subskrypcje muszą mieć ustawioną wartość we właściwości SessionId (dowolna wartość typu string będąca identyfikatorem sesji). Pobieranie wiadomości z tak skonfigurowanej kolejki lub subskrypcji odbywa się poprzez obiekt MessageSession zamiast standardowych QueueClient lub SubscriptionClient. Wywołanie metody AcceptMessageSession na obiekcie klienta kolejki lub subskrypcji spowoduje oczekiwanie przez podany w parametrze czas (lub domyślny minutę) na otrzymanie wiadomości. Jeżeli to nastąpi, zwracany jest obiekt MessageSession zawierający właściwość SessionId i umożliwiający pobranie wszystkich wiadomości w ramach tej sesji. Istnieje również możliwość przekazania do metody AcceptMessageSession identyfikatora sesji, co spowoduje oczekiwanie na wiadomość jedynie z tej konkretnej sesji. Jeżeli przez określony czas nie zostanie odebrana żadna wiadomość, zgłoszony zostaje wyjątek TimeoutException.

Utworzenie kolejki z obsługą sesji:

QueueDescription orderQueueDescription = new QueueDescription("queueName")
{
    RequiresSession = true
};

Wysłanie kilku wiadomości w ramach jednej sesji:

Order order = new Order();
order.Id = "1/2015";
order.Details = new List<string> { "Item1", "Item2", "Item3" };

QueueClient queueClient =
  QueueClient.CreateFromConnectionString("connectionString", "queueName");

foreach (string orderItem in order.Details)
{
    BrokeredMessage message = new BrokeredMessage(orderItem);
    message.SessionId = order.Id;
    queueClient.Send(message);
}

Pobranie wszystkich wiadomości z jednej sesji:

MessageSession orderSession = null;
try
{
    orderSession = queueClient.AcceptMessageSession();
}
catch (TimeoutException ex)
{
}            

if (orderSession != null)
{
    string orderId = orderSession.SessionId;

    while (true)
    {
        BrokeredMessage orderItemMessage = orderSession.Receive();
        if (orderItemMessage != null)
        {
            string orderItem = orderItemMessage.GetBody<string>();
            orderItemMessage.Complete();
        }
        else
            break;
    }

    orderSession.Close();
}

 

Model Request-Response

Tryb brokered messaging w Azure Service Bus polega na komunikacji asynchronicznej gdzie nadawca i odbiorca pracują niezależnie, a wiadomości są kolejkowane na serwerach Azure. Mimo to, w trybie tym możliwa jest realizacja modelu Request-Response. Do tego celu wykorzystywane są dwie kolejki (żądań i odpowiedzi) oraz sesje. Poniżej znajduje się przykładowy scenariusz, w którym wysyłane jest zapytanie o szczegóły produktu, a następnie pobierana jest odpowiedź dla tego konkretnego żądania.

Klient wysyła do kolejki żądań wiadomość z zapytaniem o szczegóły produktu, podając we właściwości Properties jego identyfikator oraz ustawiając właściwość ReplyToSessionId na własny identyfikator sesji, w ramach której będzie oczekiwał na odpowiedź:

QueueClient requestQueueClient =
  QueueClient.CreateFromConnectionString("connectionString", "ProductRequestQueue");

string sessionId = Guid.NewGuid().ToString();

BrokeredMessage requestMessage = new BrokeredMessage();
requestMessage.Properties.Add("productId", "1");
requestMessage.ReplyToSessionId = sessionId;
requestQueueClient.Send(requestMessage);

Serwer pobiera wiadomość z kolejki żądań i zapamiętuje nadany identyfikator sesji:

QueueClient requestQueueClient =
  QueueClient.CreateFromConnectionString("connectionString", "ProductRequestQueue", ReceiveMode.PeekLock);

BrokeredMessage requestMessage = requestQueueClient.Receive();
string productId = requestMessage.Properties["productId"] as string;
string sessionId = requestMessage.ReplyToSessionId;
requestMessage.Complete();

Serwer przygotowuje wiadomość z informacjami o produkcie (we właściwości Label zapisywane jest to czy produkt został znaleziony) i wysyła ją z ustawionym identyfikatorem sesji do kolejki odpowiedzi:

QueueClient responseQueueClient =
  QueueClient.CreateFromConnectionString("connectionString", "ProductResponseQueue");

string productInfo = "ProductInfo";
BrokeredMessage responseMessage = new BrokeredMessage(productInfo);
responseMessage.SessionId = sessionId;
responseMessage.Label = "Found";
responseQueueClient.Send(responseMessage);

Klient pobiera wiadomość z kolejki odpowiedzi odwołując się do wcześniej nadanego identyfikatora sesji, dzięki czemu mamy gwarancję otrzymania właściwej wiadomości:

QueueClient responseQueueClient =
  QueueClient.CreateFromConnectionString("connectionString", "ProductResponseQueue", ReceiveMode.PeekLock);

MessageSession session = responseQueueClient.AcceptMessageSession(sessionId);
BrokeredMessage responseMessage = session.Receive();
if (responseMessage.Label == "Found")
{
    string productInfo = responseMessage.GetBody<string>();
}
responseMessage.Complete();
session.Close();

 

Service Bus Explorer

Na zakończenie chciałbym wspomnieć o bardzo wygodnym narzędziu pozwalającym zarządzać usługą Azure Service Bus z poziomu aplikacji desktopowej: Service Bus Explorer

 

Linki

Azure Service Bus brokered messaging – Topics, Subscriptions

2014-11-18

Po omówieniu kolejek, dziś opiszę pracę z topikami i subskrypcjami w usłudze Microsoft Azure Service Bus. Tak jak poprzednio, w prezentowanych przykładach wiadomość przesyłana pomiędzy nadawcą i odbiorcą będzie zawierała obiekt zamówienia:

public class Order
{
  public string Id { get; set; }
  public DateTime Date { get; set; }
  public string Customer { get; set; }
  public List<string> Details { get; set; }
}

W kodzie będę się odwoływał do dwóch zmiennych zawierających connection string dla przestrzeni nazw usługi oraz nazwę topiku:

public readonly string connectionString = String.Format(
  "Endpoint={0};SharedAccessKeyName={1};SharedAccessKey={2}",
  "sb://tempnamespace.servicebus.windows.net/",
  "RootManageSharedAccessKey",
  "zFUqkwWoDrem9O8UkdG0pq0sheJze0U/P93f42Aykdc=");

public readonly string topicName = "orderstopic";

 

Zarządzanie topikiem

Zarządzanie topikami odbywa się w analogiczny sposób jak w przypadku kolejek. Tu także korzystamy z obiektu NamespaceManager używając jedynie innych metod, jednak co do zasady są to dokładnie te same mechanizmy. Poniżej znajdują się przykłady tworzenia, modyfikacji oraz usunięcia topiku.

Proste tworzenie topiku:

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

if (!namespaceManager.TopicExists(topicName))
  namespaceManager.CreateTopic(topicName);

Tworzenie topiku za pomocą obiektu TopicDescription:

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

if (!namespaceManager.TopicExists(topicName))
{
  TopicDescription topic = new TopicDescription(topicName)
  {
    MaxSizeInMegabytes = 1024,
    RequiresDuplicateDetection = true
  };
  namespaceManager.CreateTopic(topic);
}

Odczyt właściwości oraz modyfikacja topiku:

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

TopicDescription topic = namespaceManager.GetTopic(topicName);

long subscriptionCount = topic.SubscriptionCount;
long sizeInBytes = topic.SizeInBytes;

topic.MaxSizeInMegabytes = 2048;
namespaceManager.UpdateTopic(topic);

Usunięcie topiku:

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

namespaceManager.DeleteTopic(topicName);

 

Zarządzanie subskrypcjami

Po wysłaniu wiadomości do topiku, w celu jej odczytu odbiorca nie odwołuje się bezpośrednio do topiku tylko korzysta z wybranej subskrypcji. To jakie wiadomości z topiku trafią do danej subskrypcji określane jest poprzez filtry operujące na atrybutach wiadomości. Podstawową różnicą w stosunku do kolejek jest to, iż w przypadku topiku tą samą wiadomość może otrzymać kilku odbiorców. Jeżeli dana wiadomość przesłana do topiku spełni warunki filtrów więcej niż jednej subskrypcji to jej kopia zostanie przekazana do każdej z nich. Należy jednak pamiętać, że filtry subskrypcji sprawdzane są tylko w momencie pojawienia się wiadomości w topiku. Jeżeli utworzymy nową subskrypcję to mogą do niej trafić jedynie wiadomości wysłane od tego momentu, wszystkie już istniejące w topiku mimo spełnionych warunków filtra nie zostaną do niej przekazane. Mamy trzy możliwości definiowania subskrypcji: bez filtra (będą do niej trafiać wszystkie wiadomości z topiku), z filtrem typu CorrelationFilter (filtr odnoszący się do wartości atrybutu CorrelationId wiadomości) lub z filtrem typu SqlFilter (filtr odnoszący się do poszczególnych elementów atrybutu Properties wiadomości).

Subskrypcję bez filtra tworzymy podając jedynie jej nazwę oraz nazwę topiku, z którym będzie powiązana:

string subscriptionName = "ordersall";

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

if (!namespaceManager.SubscriptionExists(topicName, subscriptionName))
  namespaceManager.CreateSubscription(topicName, subscriptionName);

W przypadku tworzenia subskrypcji z filtrem typu CorrelationFilter, oprócz nazwy topiku musimy przekazać definicję filtra. Tworząc filtr podajemy wartość atrybutu CorrelationId wiadomości, które mają trafiać do tej subskrypcji:

string subscriptionName = "ordersdepartmenta";

CorrelationFilter filter = new CorrelationFilter("Department A");

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

if (!namespaceManager.SubscriptionExists(topicName, subscriptionName))
  namespaceManager.CreateSubscription(topicName, subscriptionName, filter);

Przy użyciu powyżej zdefiniowanej subskrypcji otrzymamy wszystkie wiadomości wysłane do topiku, które w atrybucie CorrelationId będą miały wartość „Department A”.

Jeżeli chcemy mieć większe możliwości filtrowania możemy utworzyć subskrypcję z filtrem typu SqlFilter. W tym przypadku tworząc filtr podajemy wyrażenie logiczne operujące na elementach atrybutu Properties wiadomości (jest to atrybut typu IDictionary<string, object>). Budując wyrażenie mamy do dyspozycji następujące operatory: =, >, <, !=, <>, and, or, (, ). Daje to bardzo duże możliwości tworzenia warunków filtrowania. Przykładowy filtr może wyglądać tak: „(Region != ‚PL’ or ItemsCount > 10) and (Sender = ‚Jan Kowalski’)”, gdzie Region, ItemsCount i Sender to klucze we właściwości Properties wiadomości.

Oto w jaki sposób utworzyć subskrypcję z filtrem SqlFilter:

string subscriptionName = "orderswholesalepl";

SqlFilter filter = new SqlFilter("Region = 'PL' and ItemsCount > 10");

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

if (!namespaceManager.SubscriptionExists(topicName, subscriptionName))
  namespaceManager.CreateSubscription(topicName, subscriptionName, filter);

Do powyższej subskrypcji trafią wszystkie wiadomości z topiku, które we właściwości Properties będą posiadały klucz Region z wartością „PL” oraz klucz ItemsCount z wartością większą od 10.

Do utworzenia subskrypcji możemy także użyć obiektu SubscriptionDescription, pozwalającego na dodatkową konfigurację:

string subscriptionName = "orderswholesalepl";

SqlFilter filter = new SqlFilter("Region = 'PL' and ItemsCount > 10");

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

if (!namespaceManager.SubscriptionExists(topicName, subscriptionName))
{
  SubscriptionDescription subscription =
    new SubscriptionDescription(topicName, subscriptionName)
  {
    RequiresSession = false,
    LockDuration = TimeSpan.FromMinutes(1)
  };
  namespaceManager.CreateSubscription(subscription, filter);
}

Z obiektu SubscriptionDescription możemy również skorzystać w celu odczytu informacji o subskrypcji oraz jej modyfikacji:

string subscriptionName = "ordersall";

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

SubscriptionDescription subscription =
  namespaceManager.GetSubscription(topicName, subscriptionName);

long messageCount = subscription.MessageCount;

subscription.MaxDeliveryCount = 5;
namespaceManager.UpdateSubscription(subscription);

Usunięcie subskrypcji sprowadza się do wywołania metody DeleteSubscription na obiekcie NamespaceManager:

string subscriptionName = "ordersall";

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

namespaceManager.DeleteSubscription(topicName, subscriptionName);

 

Wysyłanie wiadomości do topiku

Wysłanie wiadomości do topiku wygląda praktycznie tak samo jak w przypadku kolejki. Mamy dwie możliwości, albo użyjemy dedykowanego obiektu TopicClient albo uniwersalnego MessageSender znanego z kolejek (tym razem tworzonego poprzez podanie nazwy topiku):

TopicClient topicClient =
  TopicClient.CreateFromConnectionString(connectionString, topicName);

lub:

MessagingFactory messagingFactory =
  MessagingFactory.CreateFromConnectionString(connectionString);

TopicClient topicClient = messagingFactory.CreateTopicClient(topicName);

lub:

MessagingFactory messagingFactory =
  MessagingFactory.CreateFromConnectionString(connectionString);

MessageSender messageSender =
  messagingFactory.CreateMessageSender(topicName);

Samo przygotowanie wiadomości nie różni się niczym od tego, co opisywałem przy kolejkach. Tworzymy obiekt klasy BrokeredMessage i przekazujemy go do metody Send klienta topiku:

Order order = new Order()
{
  Id = "1",
  Date = DateTime.UtcNow,
  Customer = "Company ABC",
  Details = new List<string> { "Product1", "Product2", "Product3" }
};

BrokeredMessage message = new BrokeredMessage(order)
{
  MessageId = Guid.NewGuid().ToString(),
  Label = "Application X",
  CorrelationId = "Department A",
  TimeToLive = TimeSpan.FromHours(12)
};
message.Properties.Add("Sender", "Jan Kowalski");
message.Properties.Add("Region", "PL");
message.Properties.Add("ItemsCount", order.Details.Count);

messageSender.Send(message);

messageSender.Close();

 

Odbieranie wiadomości przy użyciu subskrypcji

W celu odczytu wiadomości wysłanych do topiku odbiorca nie odwołuje się bezpośrednio do topiku tylko korzysta z określonej subskrypcji, która z jego punktu widzenia posiada funkcjonalność kolejki. Przy użyciu wybranej subskrypcji będziemy w stanie odczytać jedynie te wiadomości, które spełniły warunki jej filtra. Podobnie jak w przypadku kolejek odczyt wiadomości może być realizowany w trybie ReceiveAndDelete lub PeekLock. Po więcej szczegółów odsyłam do poprzedniego tematu. Aby odebrać wiadomość musimy utworzyć obiekt klienta subskrypcji. Możemy skorzystać z klasy SubscriptionClient lub MessageReceiver. W przypadku tego drugiego w celu wskazania konkretnej subskrypcji podajemy ścieżkę w formacie „topicName/subscriptions/subscriptionName„:

string subscriptionName = "ordersall";

SubscriptionClient subscriptionClient =
  SubscriptionClient.CreateFromConnectionString(
    connectionString, topicName, subscriptionName, ReceiveMode.PeekLock);

lub:

string subscriptionName = "ordersall";

MessagingFactory messagingFactory =
  MessagingFactory.CreateFromConnectionString(connectionString);

SubscriptionClient subscriptionClient =
  messagingFactory.CreateSubscriptionClient(
    topicName, subscriptionName, ReceiveMode.PeekLock);

lub:

string subscriptionName = "ordersall";

MessagingFactory messagingFactory =
  MessagingFactory.CreateFromConnectionString(connectionString);

MessageReceiver messageReceiver =
  messagingFactory.CreateMessageReceiver(
    String.Format("{0}/subscriptions/{1}", topicName, subscriptionName),
    ReceiveMode.PeekLock);

Pobieranie wiadomości nie różni się niczym w stosunku do tego, co opisywałem przy kolejkach. Tu także mamy do dyspozycji metody Peek, Receive oraz ReceiveBatch, a odebrana wiadomość to znany już obiekt klasy BrokeredMessage:

BrokeredMessage message =
  messageReceiver.Receive(TimeSpan.FromSeconds(5));

if (message != null)
{
  try
  {
    long messageNumber = message.SequenceNumber;
    string messageId = message.MessageId;
    string label = message.Label;
    string correlationId = message.CorrelationId;
    string orderSender = message.Properties["Sender"].ToString();
    string orderRegion = message.Properties["Region"].ToString();

    Order order = message.GetBody<Order>();

    if (orderRegion == "PL")
    {
      //Processing Order...

      message.Complete();
    }
    else
      message.DeadLetter();
  }
  catch (Exception ex)
  {
    message.Abandon();
  }
}
messageReceiver.Close();

W tym i poprzednim wpisie pokazałem podstawowe operacje związane z wysyłaniem i odbieraniem wiadomości przy wykorzystaniu kolejek, topików i subskrypcji. W następnym temacie przedstawię bardziej zaawansowane scenariusze pracy z wiadomościami.

Azure Service Bus brokered messaging – Queues

2014-11-07

W poprzednim temacie opisałem czym jest Azure Service Bus oraz jak rozpocząć korzystanie z tej usługi. Dziś zajmę się tematem kolejek (Queues). Pokażę w jaki sposób zarządzać kolejkami z poziomu kodu oraz jak oprogramować wysyłanie i odbieranie wiadomości. W prezentowanych przykładach wiadomość przesyłana pomiędzy nadawcą i odbiorcą będzie zawierała obiekt zamówienia:

public class Order
{
  public string Id { get; set; }
  public DateTime Date { get; set; }
  public string Customer { get; set; }
  public List<string> Details { get; set; }
}

Na początek zdefiniuję dwa pola, do których będę się odwoływał w kolejnych przykładach. Pierwszym z nich jest connectionString zawierający pobrany z portalu Microsoft Azure connection string dla utworzonej przestrzeni nazw. Drugim polem będzie queueName z nazwą kolejki:

public readonly string connectionString = String.Format(
  "Endpoint={0};SharedAccessKeyName={1};SharedAccessKey={2}",
  "sb://tempnamespace.servicebus.windows.net/",
  "RootManageSharedAccessKey",
  "zFUqkwWoDrem9O8UkdG0pq0sheJze0U/P93f42Aykdc=");

public readonly string queueName = "ordersqueue";

 

Zarządzanie kolejką

Zarządzanie kolejkami odbywa się za pośrednictwem obiektu NamespaceManager, który odpowiada za obsługę przestrzeni nazw Azure Service Bus wskazanej w connection string. Poniższy kod przygotowuje instancję obiektu NamespaceManager, sprawdza czy kolejka o podanej nazwie nie istnieje i tworzy ją w przestrzeni nazw usługi:

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

if (!namespaceManager.QueueExists(queueName))
  namespaceManager.CreateQueue(queueName);

Stworzona w ten sposób kolejka we wszystkich atrybutach posiada domyślne wartości. Jeżeli kolejkę chcemy odpowiednio skonfigurować, do jej utworzenia możemy użyć obiektu QueueDescription:

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

if (!namespaceManager.QueueExists(queueName))
{
  QueueDescription queue = new QueueDescription(queueName)
  {
    MaxSizeInMegabytes = 1024,
    RequiresSession = false,
    RequiresDuplicateDetection = true,
    LockDuration = TimeSpan.FromMinutes(1)
  };
  namespaceManager.CreateQueue(queue);
}

Modyfikacja ustawień istniejącej kolejki również wykonywana jest za pomocą obiektu QueueDescription zwróconego przez metodę GetQueue:

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

QueueDescription queue = namespaceManager.GetQueue(queueName);
queue.MaxDeliveryCount = 5;
namespaceManager.UpdateQueue(queue);

W podobny sposób możemy pobrać wartości atrybutów tylko do odczytu prezentujących aktualny status kolejki:

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

QueueDescription queue = namespaceManager.GetQueue(queueName);
long messageCount = queue.MessageCount;
long sizeInBytes = queue.SizeInBytes;

Usunięcie kolejki sprowadza się do wywołania metody DeleteQueue:

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

namespaceManager.DeleteQueue(queueName);

 

Wysyłanie wiadomości

W celu wysłania lub odebrania wiadomości do/z kolejki musimy utworzyć obiekt klienta kolejki. Mamy trzy sposoby:

QueueClient queueClient =
  QueueClient.CreateFromConnectionString(connectionString, queueName);

lub:

MessagingFactory messagingFactory =
  MessagingFactory.CreateFromConnectionString(connectionString);

QueueClient queueClient =
  messagingFactory.CreateQueueClient(queueName);

lub:

MessagingFactory messagingFactory =
  MessagingFactory.CreateFromConnectionString(connectionString);

MessageSender messageSender =
  messagingFactory.CreateMessageSender(queueName);

Jak widać, w pierwszym i drugim przypadku tworzymy obiekt klasy QueueClient, natomiast w trzecim przypadku obiekt klasy MessageSender. Ja preferuję ostatnie podejście, ponieważ MessageSender jest bardziej uniwersalnym obiektem. W zależności od tego czy jako parametr metody CreateMessageSender przekażemy nazwę kolejki czy topiku, zwrócony obiekt MessageSender za pomocą tych samych metod będzie odwoływał się do właściwego składnika usługi. Pozwala to na dowolne przełączanie naszej aplikacji na pracę z kolejką lub topikiem bez modyfikacji kodu.

Wiadomość wysyłana do kolejki reprezentowana jest przez obiekt klasy BrokeredMessage. Tworząc wiadomość, w konstruktorze przekazujemy obiekt stanowiący jej treść (body) – w naszym przypadku będzie to obiekt klasy Order. W poszczególnych atrybutach obiektu BrokeredMessage możemy umieścić dodatkowe informacje opisujące wiadomość. Dzięki atrybutowi Properties (typu IDictionary<string, object>) do przesyłanej wiadomości możemy dołączyć listę dowolnych parametrów. W celu wysłania przygotowanej wiadomości wystarczy wywołać metodę Send na obiekcie klienta kolejki:

Order order = new Order()
{
  Id = "1",
  Date = DateTime.UtcNow,
  Customer = "Company ABC",
  Details = new List<string> { "Product1", "Product2", "Product3" }
};

BrokeredMessage message = new BrokeredMessage(order)
{
  MessageId = Guid.NewGuid().ToString(),
  Label = "Application X",
  CorrelationId = "Department A",
  TimeToLive = TimeSpan.FromHours(12)
};
message.Properties.Add("Sender", "Jan Kowalski");
message.Properties.Add("Region", "PL");

messageSender.Send(message);

messageSender.Close();

 

Odbieranie wiadomości

Odczyt wiadomości z kolejki może się odbywać w dwóch trybach: ReceiveAndDelete lub PeekLock. W trybie ReceiveAndDelete wiadomość w momencie pobrania jest usuwana z kolejki. Jeżeli przetwarzanie nie powiedzie się to my musimy zadbać o to, żeby wiadomość nie została utracona. W trybie PeekLock po odebraniu wiadomości nie jest ona usuwana z kolejki, a jedynie ustawiana jest na niej odpowiednia blokada zapobiegająca ponownemu pobraniu. Domyślnie blokada zakładana jest na minutę, ale jej czas możemy określić podczas tworzenia kolejki za pomocą atrybutu LockDuration (maksymalnie 5 minut). Po zakończeniu przetwarzania wiadomości musimy poinformować usługę o jego statusie. Mamy do wyboru następujące metody na obiekcie wiadomości:

  • Complete – wiadomość przetworzona poprawnie, zostaje usunięta z kolejki
  • Defer – odłożenie przetwarzania, wiadomość pozostaje w kolejce, ale można się do niej ponownie odwołać jedynie poprzez jej numer (ten scenariusz opiszę w kolejnych tematach)
  • DeadLetter – przeniesienie wiadomości do specjalnej pod-kolejki zawierającej „wymarłe” wiadomości (ten scenariusz także opiszę w kolejnych tematach)
  • Abandon – anulowanie przetwarzania, z wiadomości zdejmowana jest blokada i staje się ona ponownie dostępna do pobrania

Aby odebrać wiadomość musimy utworzyć obiekt klienta kolejki. Domyślnie odbieranie wiadomości realizowane jest w trybie PeekLock, ale możemy to określić podczas tworzenia obiektu. Podobnie jak przy wysyłaniu wiadomości tu także możemy korzystać bezpośrednio z obiektu QueueClient lub bardziej uniwersalnej wersji MessageReceiver:

QueueClient queueClient =
  QueueClient.CreateFromConnectionString(connectionString, queueName, ReceiveMode.PeekLock);

lub:

MessagingFactory messagingFactory =
  MessagingFactory.CreateFromConnectionString(connectionString);

MessageReceiver messageReceiver =
  messagingFactory.CreateMessageReceiver(queueName, ReceiveMode.PeekLock);

W celu pobrania wiadomości z kolejki wywołujemy metodę Receive, z możliwością podania timeout-u dla operacji. Wiadomości zwracane są w kolejności FIFO i jeżeli  kolejka nie jest pusta w wyniku otrzymamy obiekt BrokeredMessage (w przeciwnym wypadku będzie to null). Odczyt właściwej treści wiadomości odbywa się poprzez metodę GetBody<T>:

BrokeredMessage message =
  messageReceiver.Receive(TimeSpan.FromSeconds(5));

if (message != null)
{
  try
  {
    long messageNumber = message.SequenceNumber;
    string messageId = message.MessageId;
    string label = message.Label;
    string correlationId = message.CorrelationId;
    string orderSender = message.Properties["Sender"].ToString();
    string orderRegion = message.Properties["Region"].ToString();

    Order order = message.GetBody<Order>();

    if (orderRegion == "PL")
    {
      //Processing Order...

      message.Complete();
    }
    else
      message.DeadLetter();
  }
  catch (Exception ex)
  {
    message.Abandon();
  }
}
messageReceiver.Close();

Oprócz pokazanego standardowego pobierania wiadomości mamy jeszcze dwie opcje. Pierwszą z nich jest podgląd, czyli możliwość odczytu wiadomości bez usuwania z kolejki i zakładania blokady. Służy do tego metoda Peek:

BrokeredMessage message = messageReceiver.Peek();

Drugą opcją jest pobranie za jednym razem większej liczby wiadomości. Wystarczy wywołać metodę ReceiveBatch z parametrem określającym liczbę wiadomości do pobrania:

IEnumerable<BrokeredMessage> messages = messageReceiver.ReceiveBatch(10);

To wszystko jeżeli chodzi o podstawowe operacje na kolejkach Azure Service Bus. Oczywiście jest to tylko niezbędne minimum pozwalające zbudować pierwsze działające rozwiązanie. W kolejnych tematach przedstawię znacznie bardziej rozbudowane scenariusze pracy z wiadomościami takie jak: obsługa pod-kolejek DeadLetter, opóźnione przetwarzanie, wykrywanie zdublowanych wiadomości, harmonogram dostarczania wiadomości, sesje, transakcje przy wysyłaniu i odbieraniu wielu wiadomości oraz praca w modelu request-response. Ale zanim przejdę do tych tematów, w następnym wpisie zajmę się topikami i subskrypcjami.

Azure Service Bus brokered messaging – komunikacja w systemach rozproszonych

2014-10-25

Dzisiejszym wpisem chciałbym rozpocząć cykl kilku tematów poświęconych omówieniu Azure Service Bus – jednego ze składników Microsoft Azure umożliwiającego wymianę informacji w systemach rozproszonych. Azure Service Bus pozwala na komunikację w dwóch trybach: relayed messaging oraz brokered messaging. Tryb relayed polega na synchronicznej wymianie wiadomości, podczas której nadawca i odbiorca muszą być dostępni on-line (analogicznie do WebService). Z kolei tryb brokered pozwala na komunikację asynchroniczną gdzie nadawca i odbiorca pracują niezależnie, a wiadomości są kolejkowane na serwerach Azure. Znacznie ciekawszy jest drugi ze wspomnianych trybów i właśnie jemu poświęcę ten cykl. Dziś przedstawię ogólną koncepcję komunikacji w trybie brokered messaging, omówię poszczególne elementy Azure Service Bus takie jak Queue, Topic, Subscription, Brokered Message oraz pokażę kroki niezbędne do rozpoczęcia budowy konkretnego rozwiązania. W następnych wpisach znajdzie się już znacznie więcej kodu prezentującego praktyczne wykorzystanie omawianych elementów.

Jak wspomniałem wcześniej, tryb brokered messaging w Azure Service Bus pozwala na asynchroniczną wymianę informacji pomiędzy nadawcą i odbiorcą. Nadawca wysyłając wiadomość nie wie kiedy oraz przez kogo zostanie ona odczytana, jego jedynym zadaniem jest jej utworzenie i wysłanie do usługi Service Bus. Z kolei odbiorca w dowolnym momencie łączy się z usługą w celu pobrania (w kolejności FIFO) i przetworzenia oczekujących wiadomości. Największą zaletą tego rozwiązania jest fakt, że za cały mechanizm kolejkowania i zwracania wiadomości odpowiada Azure, naszą rolą jest jedynie oprogramowanie wysyłania i odbierania wiadomości. Budując rozwiązanie komunikacyjne oparte o Service Bus w trybie brokered messaging mamy do wyboru dwa podejścia: kolejki (Queues) oraz topiki z subskrypcjami (Topics, Subscriptions).

 

Queue

Poniższy schemat prezentuje zasadę komunikacji przy użyciu kolejek:

sb_queues

Źródło: MSDN

Najważniejszym elementem jest tutaj kolejka (Queue), która pełni rolę pojemnika na wiadomości. Nadrzędnym elementem jest Namespace, który może zawierać wiele kolejek (ich nazwy muszą być unikalne). Nadawcą może być dowolna aplikacja lub usługa, której rolą jest nawiązanie połączenia z usługą Azure Service Bus, utworzenie wiadomości oraz wysłanie jej do konkretnej kolejki. Odbiorcą również może być dowolny rodzaj aplikacji, a co najważniejsze z jednej kolejki może korzystać wielu odbiorców równocześnie. Jak to działa? Wiadomości zwracane są z kolejki w kolejności ich dodawania (FIFO). Po każdorazowym zgłoszeniu się odbiorcy po wiadomość usługa Azure Service Bus gwarantuje zwrócenie danej wiadomości tylko raz. W wyniku każdego kolejnego zgłoszenia otrzymamy następną wiadomość niezależnie czy będzie to ten sam odbiorca czy inny. Jak łatwo zauważyć, daje to bardzo duże możliwości skalowania naszego rozwiązania bez żadnych nakładów pracy. Jeżeli widzimy, że nasza aplikacja odbierająca nie nadąża z przetwarzaniem wiadomości, wystarczy uruchomić jej kopię na innej maszynie i automatycznie otrzymujemy wzrost wydajności dzięki zrównolegleniu operacji.

 

Topic, Subscription

Drugim podejściem jest wykorzystanie topików i subskrypcji. Oto jak wygląda schemat komunikacji w tym przypadku:

sb_topics

Źródło: MSDN

Od strony nadawcy praktycznie nic się nie zmienia w stosunku do kolejek. Tworzenie wiadomości odbywa się w identyczny sposób, a jedyną różnicą jest wysyłanie ich do topiku zamiast do kolejki. Z punktu widzenia nadawcy topik funkcjonalnie odpowiada kolejce. Większe różnice są po stronie odbiorcy. W celu odczytu wiadomości odbiorca nie odwołuje się bezpośrednio do topiku tylko korzysta z wybranej subskrypcji, która z jego punktu widzenia posiada funkcjonalność kolejki. To jakie wiadomości z topiku trafią do danej subskrypcji określane jest poprzez filtry operujące na atrybutach wiadomości. Podstawową różnicą w stosunku do kolejek jest to, iż w przypadku topiku tą samą wiadomość może otrzymać kilku odbiorców. Jeżeli dana wiadomość przesłana do topiku spełni warunki filtrów więcej niż jednej subskrypcji to jej kopia zostanie przekazana do każdej z nich. Warto dodać, że filtry subskrypcji sprawdzane są tylko w momencie pojawienia się wiadomości w topiku. Jeżeli utworzymy nową subskrypcję to mogą do niej trafić jedynie wiadomości wysłane od tego momentu, wszystkie wcześniejsze mimo spełnionych warunków filtra nie zostaną do niej przekazane. Możemy również utworzyć subskrypcję bez filtra, wówczas trafią do niej wszystkie wiadomości przesłane do topiku. Sam mechanizm odczytu wiadomości z konkretnej subskrypcji nie różni się niczym od kolejki. Z jednej subskrypcji może równocześnie korzystać wielu odbiorców, gdzie podobnie jak w przypadku kolejki gwarantowane jest dostarczenie danej wiadomości tylko do jednego z nich.

 

Brokered Message

Niezależnie od wybranego schematu komunikacji (kolejki lub topiki z subskrypcjami) wiadomość przesyłana pomiędzy nadawcą i odbiorcą ma następującą strukturę:

sb_message

Składa się ona z nagłówka (header) oraz treści (body). Nagłówek posiada szereg zdefiniowanych atrybutów, w których możemy umieścić dodatkowe informacje opisujące naszą wiadomość (m.in. MessageId, CorrelationId, SessionId, Label, Properties). Dzięki atrybutowi Properties (typu IDictionary<string, object>) do przesyłanej wiadomości możemy dołączyć listę dowolnych parametrów. Wspomniane wcześniej filtry subskrypcji decydujące o tym, które wiadomości z topiku zostaną przekazane do danej subskrypcji mogą korzystać z właściwości CorrelationId lub poszczególnych elementów właściwości Properties. Główną częścią wiadomości jest jej treść – czyli sekcja body, w której możemy umieścić konkretny obiekt. W tym miejscu trzeba zaznaczyć, iż maksymalny rozmiar przesyłanej wiadomości to 256kB, z czego 64kB przypada na nagłówek a 192kB na treść. Może się wydawać, że dla wielu scenariuszy nie jest to rozmiar wystarczający, istnieje jednak sposób na poradzenie sobie z tym ograniczeniem, co pokażę w kolejnych wpisach.

 

Konfiguracja Azure Service Bus

Przed rozpoczęciem tworzenia konkretnego rozwiązania musimy skonfigurować usługę Azure Service Bus. Logujemy się do portalu Microsoft Azure, wybieramy sekcję Service Bus i klikamy Create w celu utworzenia przestrzeni nazw (Namespace), w której będą znajdować się nasze kolejki, topiki i subskrypcje.

azure_sb

Podajemy nazwę dla tworzonego obiektu Namespace, wybieramy region i wskazujemy typ MESSAGING. Po zakończeniu operacji musimy pobrać Connection String dla utworzonej przestrzeni nazw, za pomocą którego nasze aplikacje będą łączyły się z usługą. Klikamy Connection Information i kopiujemy widoczny Connection String:

sb_namespace_cs

Składa się on z trzech elementów: adresu Namespace (Endpoint), nazwy polisy zawierającej klucz dostępu (SharedAccessKeyName) oraz samego klucza (SharedAccessKey):

Endpoint=sb://tempnamespace.servicebus.windows.net/;
SharedAccessKeyName=RootManageSharedAccessKey;
SharedAccessKey=zFUqkwWoDrem9O8UkdG0pq0sheJze0U/P93f42Aykdc=

Polisami oraz kluczami możemy zarządzać w opcji Configure naszej przestrzeni nazw:

sb_ns_configure

Jak widać domyślnie tworzona jest polisa RootManageSharedAccessKey z uprawnieniami zarządzania przestrzenią (Manage) oraz wysyłania i odbierania wiadomości (Send, Listen). Jeżeli dla danej aplikacji chcemy ograniczyć te prawa (np. tylko wysyłanie wiadomości) możemy utworzyć nową polisę i pobrać odpowiedni Connection String. Każda polisa posiada dwa klucze dostępowe (primary i secondary) działające równocześnie. Pozwala to na okresową zmianę kluczy bez konieczności natychmiastowej aktualizacji wszystkich klientów.

Przestrzeń nazw posiada także opcje Queues oraz Topics pozwalające na tworzenie kolejek, topików i subskrypcji, ale ja w kolejnych wpisach pokażę jak robić to z poziomu kodu.

 

Konfiguracja projektu Visual Studio

Ostatnim elementem jaki pozostał jest podłączenie odpowiednich referencji do naszego projektu w celu skorzystania z API usługi Azure Service Bus. W tym celu klikamy prawym klawiszem na References, wybieramy Manage NuGet Packages, wyszukujemy paczkę Microsoft Azure Service Bus i instalujemy. W efekcie do naszego projektu dodane zostaną referencje do bibliotek Microsoft.ServiceBus oraz Microsoft.WindowsAzure.Configuration.

W kolejnych wpisach znajdzie się zdecydowanie więcej kodu, a rozpocznę od pokazania jak oprogramować wysyłanie i odbieranie wiadomości do/z kolejki.

Zmiana Capabilities dla istniejącego agenta

2014-10-03

Podczas budowy agenta typu Extensible Connection 2.0 w naszej klasie implementujemy między innymi interfejs IMAExtensible2GetCapabilities. Wymaga to dodania do niej publicznej właściwości Capabilities zwracającej obiekt typu MACapabilities. Obiekt ten posiada szereg ustawień konfigurujących pracę naszego agenta. Oto przykładowa implementacja tej właściwości:

public MACapabilities Capabilities
{
  get
  {
    return new MACapabilities()
      {
        ConcurrentOperation = false,
        DeltaImport = false,
        DistinguishedNameStyle = MADistinguishedNameStyle.Generic,
        DeleteAddAsReplace = false,
        ExportType = MAExportType.AttributeReplace,
        NoReferenceValuesInFirstExport = false,
        SupportExport = true
      };
  }
}

Podczas dodawania agenta do Synchronization Service jednym z kroków jest Capabilities:

MaCapabilities

Właśnie w tym momencie następuje odwołanie do właściwości Capabilities w klasie naszego agenta i odczyt znajdujących się tam ustawień. Niestety tu pojawia się pewien problem. Odczyt ten następuje tylko podczas dodawania nowego agenta i pobrane ustawienia są dla niego zapamiętywane w aktualnej postaci. Jeżeli później wejdziemy w edycję istniejącego agenta to kroku Capabilities już nie zobaczymy. Co więcej, nawet jak wyeksportujemy agenta do pliku i następnie utworzymy nowego poprzez import, to ustawienia Capabilities nie zostaną wczytane z biblioteki a jedynie odtworzone z pliku eksportu. Co to oznacza? Jeżeli zmodyfikujemy ustawienia MACapabilities w naszej klasie to pomimo podmiany biblioteki agent cały czas będzie działał zgodnie z ustawieniami zapamiętanymi podczas jego tworzenia. Nie ma możliwości odświeżenia Capabilities dla istniejącego agenta więc nie jesteśmy w stanie zmienić dla niego tych ustawień (Refresh interfaces nie zadziała). Możemy oczywiście usunąć aktualnego agenta i utworzyć nowego, ale takie rozwiązanie przy rozbudowanym agencie (posiadającym kilka typów obiektów, do tego filtry oraz kilkanaście przepływów) oznacza żmudną i podatną na błędy pracę. Jaki jest więc sposób na wprowadzenie zmian w Capabilities dla istniejącego agenta? Ja stosuję następującą metodę:

  1. Modyfikujemy właściwość Capabilities w klasie agenta, kompilujemy i podmieniamy bibliotekę
  2. Dodajemy do Synchronization Service nowego agenta korzystającego z naszej biblioteki (pod dowolną nazwą, bez konfigurowania)
  3. Eksportujemy oryginalnego i nowego agenta do plików xml
  4. Usuwamy obydwu agentów z Synchronization Service
  5. Podmieniamy wartości elementów <capabilities-mask> i <capability-bits> w pliku xml oryginalnego agenta na wartości z pliku nowego agenta
  6. Importujemy oryginalnego agenta

W elementach <capabilities-mask> i <capability-bits> znajdują się ustawienia Capabilities zapisane podczas tworzenia agenta. Po zmianie ich na nowe wartości i imporcie nasz dotychczasowy agent będzie działał zgodnie z oczekiwaniami.

Polecenie TOP z argumentem WITH TIES

2014-09-15

W dzisiejszym wpisie pokażę dosyć mało znaną konstrukcję TOP…WITH TIES. Muszę przyznać, że sam trafiłem na nią całkiem niedawno mimo, iż była ona dostępna już w SQL Server 2000. Załóżmy, że mamy pewien zbiór danych:

ties1

Teraz chcemy wybrać trzy firmy o największej sprzedaży. Company C i Company D mają taką samą wartość więc oczekujemy, że obie firmy znajdą się w wynikach zapytania. Niestety standardowe polecenie TOP 3 zwróci nam dokładnie trzy rekordy, pomijając wyniki ex-aequo.

Do tej pory w celu rozwiązania powyższego problemu stosowałem funkcję RANK(). Najpierw tworzyłem zapytanie z dodatkową kolumną rankingową:

select
  Company,
  TotalSales,
  rank() over(order by TotalSales desc) r
from Sales

Dzięki czemu otrzymywałem taki oto zbiór:

ties2

Następnie tworzyłem ostateczne zapytanie:

select
  Company,
  TotalSales
from
(
  select
    Company,
    TotalSales,
    rank() over(order by TotalSales desc) r
  from Sales
) Sales
where r <= 3

Otrzymując oczekiwane dane:

ties3

Okazuje się jednak, że dzięki poleceniu TOP z argumentem WITH TIES otrzymamy dokładnie ten sam wynik:

select top 3 with ties
  Company,
  TotalSales
from Sales
order by TotalSales desc

Dodanie do polecenia TOP klauzuli WITH TIES powoduje dołączenie do wynikowego zbioru danych wszystkich rekordów posiadających takie same wartości w kolumnach ORDER BY jak ostatni zwrócony rekord.

Najbliższe wydarzenia

2014-09-03

Nadchodząca jesień zapowiada się niezwykle ciekawie pod względem wydarzeń w świecie dev. Oto lista zbliżających się konferencji poświęconych technologiom .NET/SQL Server:

Ja zamierzam wybrać się na .NET DeveloperDays i SQLDay. Do zobaczenia:)

Obserwuj

Otrzymuj każdy nowy wpis na swoją skrzynkę e-mail.