Author Archives: mn

Python – funkcje operujące na kolekcjach

2017-04-14

Dziś pokażę, w jaki sposób poprawić czytelność i prostotę kodu poprzez wykorzystanie odpowiednich funkcji Pythona. Naszym zadaniem będzie stworzenie klasy Vector, przyjmującej w konstruktorze listę liczb całkowitych oraz posiadającej kilka metod. Oto wymagania dla tej klasy:

a = Vector([1,2,3])
b = Vector([4,5,6])
a.add(b) #zwraca obiekt: Vector([5,7,9])
a.subtract(b) #zwraca obiekt: Vector([-3,-3,-3])
a.dot(b) #zwraca wynik: 1*4+2*5+3*6 = 32
a.norm() #zwraca wynik: sqrt(1^2+2^2+3^2) = 3.74
a.equals(b) #zwraca wynik: False
a.equals(Vector([1,2,3])) #zwraca wynik: True
str(a) #zwraca łańcuch: "(1,2,3)"
#w przypadku metod add, subtract i dot, gdy a i b mają inną liczbę elementów powinien zostać zgłoszony wyjątek

Skoro znamy wymagania, możemy przejść do implementacji klasy Vector:

from math import sqrt

class Vector():
    def __init__(self, items):
        self.items = items

    def add(self, other):
        if len(self.items) != len(other.items):
            raise Exception('Error')
        a = []
        for i in range(len(self.items)):
            a.append(self.items[i] + other.items[i])
        return Vector(a)

    def subtract(self, other):
        if len(self.items) != len(other.items):
            raise Exception('Error')
        a = []
        for i in range(len(self.items)):
            a.append(self.items[i] - other.items[i])
        return Vector(a)

    def dot(self, other):
        if len(self.items) != len(other.items):
            raise Exception('Error')
        a = 0
        for i in range(len(self.items)):
            a = a + (self.items[i] * other.items[i])
        return a

    def norm(self):
        a = 0
        for i in self.items:
            a = a + (i*i)
        return sqrt(a)

    def equals(self, other):
        if len(self.items) != len(other.items):
            return False
        for i in range(len(self.items)):
            if self.items[i] != other.items[i]:
                return False
        return True

    def __str__(self):
        s = "("
        for i in self.items:
            if len(s) > 1:
                s = s + ","
            s = s + str(i)
        s = s + ")"
        return s

Powyższy kod działa i realizuje wszystkie wymagania. Czy można go jednak uprościć? Jak widać we wszystkich metodach użyte zostały pętle for oraz warunki if. Czy możliwe jest pozbycie się ich wszystkich? Wykorzystując odpowiednie funkcje Pythona nie stanowi to problemu. Oto klasa Vector w nowej wersji:

from operator import add, sub, mul, eq
from functools import reduce
from math import sqrt

class Vector():
    def __init__(self, items):
        self.items = items

    def add(self, other):
        assert(len(self.items) == len(other.items))
        return Vector(map(add, self.items, other.items))

    def subtract(self, other):
        assert(len(self.items) == len(other.items))
        return Vector(map(sub, self.items, other.items))

    def dot(self, other):
        assert(len(self.items) == len(other.items))
        func = (lambda x, y: x + y)
        return reduce(func, map(mul, self.items, other.items))

    def norm(self):
        return sqrt(sum(i * i for i in self.items))

    def equals(self, other):
        eqLen = (len(self.items) == len(other.items))
        return eqLen and all(map(eq, self.items, other.items))

    def __str__(self):
        return "({0})".format(",".join(map(str, self.items)))

Zgłaszanie wyjątków zostało zastąpione funkcją assert, która robi to automatycznie gdy przekazany warunek nie zostanie spełniony. Z kolei wszystkie pętle stały się zbędne po wykorzystaniu funkcji map, reduce, sum oraz all. Oto opis tych i kilku podobnych funkcji Pythona:

  • add(a, b) – to samo co a + b
  • sub(a, b) – to samo co a – b
  • mul(a, b) – to samo co a * b
  • eq(a, b) – to samo co a == b
  • sum(iterable) – zwraca sumę przekazanych elementów
  • max(iterable) – zwraca największy z przekazanych elementów
  • min(iterable) – zwraca najmniejszy z przekazanych elementów
  • any(iterable) – zwraca True jeżeli dla któregokolwiek z przekazanych elementów bool(x) == True
  • all(iterable) – zwraca True jeżeli dla wszystkich przekazanych elementów bool(x) == True
  • map(function, *iterables) – zwraca iterator, którego elementy są wynikami przekazanej funkcji, która z kolei jako argumenty przyjmuje kolejne elementy przekazanych kolekcji. Liczba wynikowych elementów odpowiada liczbie elementów najkrótszej z przekazanych kolekcji. Np. list(map(add, [1,2,3], [4,5,6])) = [5,7,9] lub list(map(eq, [1,2,3], [1,5,3])) = [True, False, True]
  • reduce(function, sequence[, initial]) – redukuje przekazaną kolekcję do pojedynczej wartości poprzez wykonanie przekazanej dwuargumentowej funkcji na każdym elemencie kolekcji. W każdym kroku do funkcji przekazywany jest bieżący wynik operacji (poprzedni wynik funkcji) wraz z kolejnym elementem kolekcji. Jeżeli nie zostanie podana opcjonalna wartość początkowa, będzie nią pierwszy element kolekcji.
  • filter(function, iterable) – zwraca iterator zawierający te elementy przekazanej kolekcji, dla których przekazana funkcja zwróciła wartość True. Jeżeli nie zostanie przekazana funkcja (None), zwrócone zostaną elementy, dla których bool(x) == True

Wykorzystując powyższe funkcje – dodatkowo łącząc je z listami składanymi – możemy w prosty i przejrzysty sposób wykonać wiele operacji, które w innych językach programowania wymagałyby zastosowania skomplikowanych i mało czytelnych pętli. Właśnie dlatego tak lubię Pythona. 🙂

Reklamy

Python – odczyt oraz modyfikacja dokumentów XML

2017-03-30

W dzisiejszym wpisie pokażę, w jaki sposób odczytywać oraz modyfikować dokumenty XML w języku Python.

Poniżej znajduje się przykładowy dokument XML zawierający informacje o książkach:

<?xml version='1.0' encoding='UTF-8'?>
<books>
  <book>
    <title>Krótka historia czasu</title>
    <author>Stephen Hawking</author>
    <publisher>Zysk i S-ka</publisher>
    <publication_date>2015</publication_date>
    <chapters>
      <chapter number="1">
        <title>Nasz obraz Wszechświata</title>
        <page>13</page>
      </chapter>
      <chapter number="2">
        <title>Czas i przestrzeń</title>
        <page>25</page>
      </chapter>
    </chapters>
  </book>
  <book>
    <title>Filozofia kosmologii</title>
    <author>Michał Heller</author>
    <publisher>Copernicus Center Press</publisher>
    <publication_date>2013</publication_date>
    <chapters>
      <chapter number="1">
        <title>Kosmologia przed Einsteinem</title>
        <page>13</page>
      </chapter>
      <chapter number="2">
        <title>Kosmologia 1917-1965</title>
        <page>39</page>
      </chapter>
    </chapters>
  </book>
</books>

A oto klasy, które reprezentują obiekty książki i rozdziału z powyższego pliku:

class Book():
    def __init__(self, title):
        self.title = title
        self.author = None
        self.publisher = None
        self.publicationDate = None
        self.chapters = list()

class Chapter():
    def __init__(self, book, title, page):
        self.book = book
        self.title = title
        self.page = page
        self.number = None

Mając dokument XML oraz klasy odpowiednich obiektów, możemy przejść do operacji odczytu oraz modyfikacji dokumentu.

Odczyt danych z pliku XML:

from xml.etree.ElementTree import parse, Element, SubElement

def ReadBooks():
    file = r"D:\App\!Python\Test\books.xml"
    books = list()
    doc = parse(file)
    root = doc.getroot()
    for bookElement in root.iterfind("book"):
        title = bookElement.findtext("title")
        book = Book(title)
        book.author = bookElement.findtext("author")
        book.publisher = bookElement.findtext("publisher")
        book.publicationDate = bookElement.findtext("publication_date")
        for chapterElement in bookElement.iterfind("chapters/chapter"):
            title = chapterElement.findtext("title")
            page = chapterElement.findtext("page")
            chapter = Chapter(book, title, page)
            chapter.number = chapterElement.get("number")
            book.chapters.append(chapter)
        book.chapters.sort(key = lambda c : int(c.page))
        books.append(book)
    return books

Dodanie elementu do pliku XML:

from xml.etree.ElementTree import parse, Element, SubElement

def AddBook(book):
    file = r"D:\App\!Python\Test\books.xml"
    doc = parse(file)
    root = doc.getroot()                    
    bookElement = Element("book")
    SubElement(bookElement, "title").text = book.title
    SubElement(bookElement, "author").text = book.author
    SubElement(bookElement, "publisher").text = book.publisher
    SubElement(bookElement, "publication_date").text = book.publicationDate
    chaptersElement = SubElement(bookElement, "chapters")
    for chapter in book.chapters:
        chapterElement = Element("chapter")
        chapterElement.set("number", chapter.number)
        SubElement(chapterElement, "title").text = chapter.title
        SubElement(chapterElement, "page").text = chapter.page
        chaptersElement.append(chapterElement)
    root.append(bookElement)
    doc.write(file, encoding = "UTF-8", xml_declaration = True)

Modyfikacja elementu w pliku XML:

from xml.etree.ElementTree import parse, Element, SubElement

def EditBook(oldBook, newBook):
    file = r"D:\App\!Python\Test\books.xml"
    doc = parse(file)
    root = doc.getroot()
    bookElement = [b for b in root.iterfind("book") if b.findtext("title") == oldBook.title][0]
    bookElement.find("title").text = newBook.title
    bookElement.find("author").text = newBook.author
    bookElement.find("publisher").text = newBook.publisher
    bookElement.find("publication_date").text = newBook.publicationDate
    doc.write(file, encoding = "UTF-8", xml_declaration = True)

def EditChapter(oldChapter, newChapter):
    file = r"D:\App\!Python\Test\books.xml"
    doc = parse(file)
    root = doc.getroot()
    bookElement = [b for b in root.iterfind("book") if b.findtext("title") == oldChapter.book.title][0]
    chapterElement = [c for c in bookElement.iterfind("chapters/chapter") if c.findtext("title") == oldChapter.title][0]
    chapterElement.set("number", newChapter.number)
    chapterElement.find("title").text = newChapter.title
    chapterElement.find("page").text = newChapter.page
    doc.write(file, encoding = "UTF-8", xml_declaration = True)

Usunięcie elementu z pliku XML:

from xml.etree.ElementTree import parse, Element, SubElement

def DeleteBook(book):
    file = r"D:\App\!Python\Test\books.xml"
    doc = parse(file)
    root = doc.getroot()
    bookElement = [b for b in root.iterfind("book") if b.findtext("title") == book.title][0]
    root.remove(bookElement)
    doc.write(file, encoding = "UTF-8", xml_declaration = True)

Wiosenne wydarzenia

2017-02-14

W najbliższym czasie odbędzie się kilka bardzo ciekawych konferencji, na które z niecierpliwością czekam. Co mnie najbardziej cieszy, to pojawienie się nowej ścieżki na 4Developers poświęconej językowi Python:

Do zobaczenia 🙂

Astronomy Picture of the Day jako tapeta

2017-02-10

Astronomy Picture of the Day (APOD) to strona udostępniona przez NASA, na której codziennie pojawia się nowe zdjęcie astronomiczne. Oprócz aktualnego zdjęcia, mamy również dostęp do fotografii archiwalnych z każdego dnia od 1995 roku. Adres strony to https://apod.nasa.gov. Bardzo często zdjęcia z tej strony ustawiałem jako tło pulpitu w systemie, wymagało to jednak wejścia na stronę, ręcznego pobrania obrazu i ustawienia go jako tapeta. Jakiś czas temu, na początku mojej nauki Pythona, postanowiłem napisać skrypt robiący to automatycznie, dodatkowo z możliwością pobierania zdjęć archiwalnych:

import datetime
import urllib.request
import ctypes

date = input("Fotografia z dnia (format RRMMDD, domyślnie dzisiejsza data): ")

if not date:
    date = datetime.datetime.now().strftime("%y%m%d")

print("Data: " + date)

apod = "http://apod.nasa.gov/apod/"
url = "{0}ap{1}.html".format(apod, date)

print("Url: " + url)

with urllib.request.urlopen(url) as response:
    for line in response:
        decodedLine = line.decode('utf-8')
        startIndex = decodedLine.find('<a href="image/')
        if startIndex != -1:
            urlLine = decodedLine.strip()
            break

endIndex = urlLine.find('">')
imgUrl = apod + urlLine[startIndex + 9 : endIndex]
imgExt = imgUrl[-3 :]

print("Link do obrazu: " + urlLine)
print("Url obrazu: " + imgUrl)

filePath = r"D:\Apod\apod." + imgExt
urllib.request.urlretrieve(imgUrl, filePath)

print("Plik zapisany w: " + filePath)

SPI_SETDESKWALLPAPER = 20
SPIF_UPDATEINIFILE = 1
SPIF_SENDWININICHANGE = 2

ctypes.windll.user32.SystemParametersInfoW(SPI_SETDESKWALLPAPER, 0, filePath, SPIF_UPDATEINIFILE | SPIF_SENDWININICHANGE)

input("Gotowe...")

Na początku skryptu wczytywana jest data dla pobieranego zdjęcia (domyślnie jest to dzisiejszy dzień). Następnie budowany jest adres odpowiedniej strony i pobierana jej zawartość. Po odszukaniu adresu obrazu, jest on pobierany i zapisywany w określonej lokalizacji na dysku. Na końcu pobrany obraz ustawiany jest jako tapeta.

Python – wstęp

2016-10-17

Tym wpisem chciałbym rozpocząć cykl tematów poświęconych językowi Python. Jednak zanim przejdę do konkretnych przykładów kodu, dziś pokażę jak rozpocząć pracę z tym językiem.

Python jest językiem interpretowanym, co oznacza konieczność instalacji interpretera. W tym celu wchodzimy na stronę www.python.org, przechodzimy do działu Download, pobieramy najnowszą wersję dla systemu Windows i instalujemy. Domyślnie Python instalowany jest w katalogu C:\Program Files (x86)\Python35-32 (w zależności od wersji). Warto dodać ten folder do zmiennej środowiskowej Path. Po zakończeniu instalacji możemy sprawdzić czy wszystko działa. Z poziomu wiersza poleceń uruchamiamy interpreter (python.exe) i naszym oczom powinna pokazać się konsola Python. Tutaj możemy już pisać kod:

pythonconsole

Jeżeli chcemy uruchomić kod umieszczony w pliku, to wprowadzamy polecenie „python.exe my_python_code.py” lub po prostu ustawiamy w systemie uruchamianie plików *.py przez aplikację python.exe.

Wiemy już w jaki sposób uruchamiać kod Pythona, teraz opiszę kilka darmowych środowisk ułatwiających jego pisanie. Co prawda do pisania kodu wystarczy notatnik (polecam Notepad2), jednak sposób ten sprawdzi się jedynie przy niewielkich skryptach. W przypadku większych projektów warto użyć bardziej zaawansowanego środowiska.

 

IDLE

idle

IDLE to środowisko programistyczne dołączone do standardowej instalacji Pythona. Co ciekawe, napisane jest całkowicie w Pythonie. Osobiście nie przepadam za nim, gdyż na tle innych narzędzi prezentuje się po prostu archaicznie (zarówno pod względem wyglądu jak i funkcjonalności).

 

Visual Studio Code

pythonvscode

Visual Studio Code to lekki, szybki i całkowicie darmowy edytor programistyczny o sporych możliwościach. Dzięki mechanizmowi rozszerzeń dostępne są wtyczki praktycznie dla wszystkich popularnych języków. Aby wykorzystać go do pisania w Pythonie wystarczy kilka kroków:

1) Instalujemy rozszerzenie dla Pythona: Python – Don Jayamanne

2) Przechodzimy do File -> Preferences -> User Settings i wpisujemy ścieżkę do interpretera Python:

// Place your settings in this file to overwrite the default settings
{
    "python.pythonPath": "C:/Program Files (x86)/Python35-32/python.exe"
}

3) W celu uruchomienia tworzonej aplikacji używamy skrótu Ctrl+Shift+B, następnie (tylko przy pierwszym uruchomieniu) klikamy Configure Task Runner, wybieramy Others i wprowadzamy ustawienia:

{
    // See https://go.microsoft.com/fwlink/?LinkId=733558
    // for the documentation about the tasks.json format
    "version": "0.1.0",
    "command": "python",
    "isShellCommand": true,
    "args": ["${file}"],
    "showOutput": "always"
}

Po tych krokach mamy w pełni skonfigurowane środowisko z takimi funkcjami jak kolorowanie składni, IntelliSense, refactoring czy debugowanie.

 

Visual Studio Community

pythonvs

Darmowa wersja środowiska Visual Studio w pełni wspiera tworzenie aplikacji w Pythonie. Musimy jedynie podczas instalacji zaznaczyć opcję Python Tools for Visual Studio. Dzięki temu otrzymujemy kolorowanie składni, IntelliSense, refactoring, debugowanie, interaktywne okno czy zarządzanie zainstalowanymi pakietami.

Jesienne wydarzenia

2015-09-05

W najbliższym czasie odbędzie się kilka ciekawych konferencji:

Ja wybieram się na wszystkie z nich. Do zobaczenia!

Synchronization Service – supported runtime

2015-04-08

Ostatnio uruchamiając synchronizację na jednym z agentów otrzymałem następujący błąd dotyczący własnego rozszerzenia:

The management agent failed on run profile. The run step stopped because a required rules extension „Demo.FIM.ActiveDirectory.dll” could not be loaded.

Po upewnieniu się, że wspomniana biblioteka znajduje się w katalogu Extensions zajrzałem do loga systemowego. Tam opis błędu był znacznie bardziej rozbudowany i wyglądał tak:

Verify that the rules extension is located in the Extensions directory. If the extension is present, confirm that the version of the .NET framework  that can run the extension is installed on the server and that a supportedRuntimes entry in the configuration files specifies that version. The synchronization engine will not be able to load an extension that is built with a newer version of the .NET framework than the version of the .NET runtime it is hosting.

Oraz tak:

Could not load file or assembly ‚file:///C:\Program Files\Microsoft Forefront Identity Manager\2010\Synchronization Service\Extensions\Demo.FIM.ActiveDirectory.dll’ or one of its dependencies. Operation is not supported. (Exception from HRESULT: 0x80131515)

An attempt was made to load an assembly from a network location which would have caused the assembly to be sandboxed in previous versions of the .NET Framework. This release of the .NET Framework does not enable CAS policy by default, so this load may be dangerous. If this load is not intended to sandbox the assembly, please enable the loadFromRemoteSources switch. See http://go.microsoft.com/fwlink/?LinkId=155569 for more information.

Oczywiście wszystkie wymagane wersje frameworków były zainstalowane. Po krótkich poszukiwaniach w internecie okazało się, że błąd ten jest ogólnie znany i można go rozwiązać w następujący sposób:

      1. Otwieramy plik konfiguracyjny procesu Synchronization Service (miiserver.exe): C:\Program Files\Microsoft Forefront Identity Manager\2010\Synchronization Service\Bin\miiserver.exe.config
      2. Odnajdujemy element startup:
        <startup useLegacyV2RuntimeActivationPolicy="true">
          <supportedRuntime version="v4.0"></supportedRuntime>
          <supportedRuntime version="v2.0.50727"></supportedRuntime>
        </startup>
        
      3. Zamieniamy kolejność wersji supportedRuntime:
        <startup useLegacyV2RuntimeActivationPolicy="true">
          <supportedRuntime version="v2.0.50727"></supportedRuntime>
          <supportedRuntime version="v4.0"></supportedRuntime>
        </startup>
        

Powyższa operacja rozwiązała problem z tym konkretnym agentem. Dodam tylko, że jego rozszerzenie było skompilowane w wersji .NET 3.5 i korzystało również z bibliotek w wersji .NET 2.0. Okazało się jednak, że w tym momencie taki sam błąd zaczął pojawiać się przy uruchamianiu innego agenta (typu ECMA), którego kod z powodu wykorzystywanych zależności był skompilowany w wersji .NET 4.5. Jak więc poradzić sobie w tej sytuacji skoro dwóch agentów wymaga dwóch różnych konfiguracji? Można jednego z nich uruchamiać w kontekście Synchronization Service, a drugiego w oddzielnym procesie z inną konfiguracją. Wystarczy dla drugiego agenta zaznaczyć opcję „Run this management agent in a separate process”, a następnie otworzyć plik konfiguracyjny dla agentów uruchamianych w oddzielnym procesie „C:\Program Files\Microsoft Forefront Identity Manager\2010\Synchronization Service\Bin\mmsscrpt.exe.config” i w nim ustawić supportedRuntime w następujący sposób:

<startup useLegacyV2RuntimeActivationPolicy="true">
  <supportedRuntime version="v4.0"></supportedRuntime>
  <supportedRuntime version="v2.0.50727"></supportedRuntime>
</startup>

Kwietniowe wydarzenia

2015-03-22

W drugiej połowie kwietnia odbędzie się kilka interesujących konferencji:

Do zobaczenia!

Azure Service Bus brokered messaging – dodatkowe funkcje

2015-03-12

W ostatnim temacie cyklu poświęconego usłudze Azure Service Bus chciałbym przedstawić bardziej rozbudowane scenariusze pracy z wiadomościami takie jak: obsługa pod-kolejek DeadLetter, opóźnione przetwarzanie, wykrywanie zdublowanych wiadomości, harmonogram dostarczania wiadomości, transakcje przy wysyłaniu i odbieraniu wielu wiadomości, sesje oraz praca w modelu request-response.

 

Pod-kolejka DeadLetter

Wiadomości umieszczane są w specjalnej pod-kolejce DeadLetter w następujących przypadkach:

  • Wiadomość wygaśnie i jednocześnie dla kolejki lub subskrypcji atrybut EnableDeadLetteringOnMessageExpiration ustawiony jest na true.
  • Przekroczona zostanie maksymalna liczba prób dostarczenia wiadomości, którą określa się dla kolejki lub subskrypcji poprzez atrybut MaxDeliveryCount (domyślnie 10). Każdorazowe wywołanie metody Abandon na wiadomości powoduje zwiększenie jej licznika prób dostarczenia.
  • Wystąpi wyjątek podczas sprawdzania filtra subskrypcji dla danej wiadomości i jednocześnie dla subskrypcji atrybut EnableDeadLetteringOnFilterEvaluationExceptions ustawiony jest na true.
  • Na wiadomości wywołana zostanie metoda DeadLetter.

Ustawienie domyślnego czasu wygasania wiadomości oraz przenoszenia ich do pod-kolejki DeadLetter (ustawień nie można zmienić po utworzeniu kolejki):

QueueDescription queue = new QueueDescription("queueName")
{
    DefaultMessageTimeToLive = TimeSpan.FromSeconds(60),
    EnableDeadLetteringOnMessageExpiration = true
};

Ustawienie czasu wygaśnięcia wiadomości przed jej wysłaniem:

BrokeredMessage message = new BrokeredMessage();
message.TimeToLive = TimeSpan.FromMinutes(30);

Pobieranie wiadomości z pod-kolejki DeadLetter:

//queueName/$DeadLetterQueue
string queueDeadLetterPath = QueueClient.FormatDeadLetterPath("queueName");
QueueClient deadLetterQueueClient =
  QueueClient.CreateFromConnectionString("connectionString", queueDeadLetterPath, ReceiveMode.ReceiveAndDelete);
BrokeredMessage message = deadLetterQueueClient.Receive();

 

Opóźnione przetwarzanie – Defer

Klasa BrokeredMessage posiada metodę Defer, która pozwala na odłożenie przetwarzania wiadomości pobranej z kolejki lub subskrypcji. Po wywołaniu metody Defer wiadomość pozostaje w kolejce, ale można się do niej ponownie odwołać jedynie poprzez jej numer. Oznacza to konieczność zapamiętania numeru wiadomości przed wywołaniem metody Defer, w przeciwnym wypadku stracimy możliwość jej odczytu i wiadomość pozostanie w kolejce do czasu wygaśnięcia. Do odczytu wiadomości można wykorzystać jej numer jedynie w przypadku wcześniejszego użycia metody Defer.

QueueClient queueClient =
  QueueClient.CreateFromConnectionString("connectionString", "queueName", ReceiveMode.PeekLock);

//Defer
BrokeredMessage message = queueClient.Receive();
long messageNumber = message.SequenceNumber;
message.Defer();

//Receive
BrokeredMessage deferredMessage = queueClient.Receive(messageNumber);
deferredMessage.Complete();

 

Wykrywanie zdublowanych wiadomości

Wykrywanie duplikatów w wysłanych wiadomościach opiera się na atrybucie MessageId wiadomości oraz ustawionym oknie czasowym w kolejce lub topiku. Jeżeli kolejka lub topik jest skonfigurowany pod kątem wykrywania duplikatów, klient może wielokrotnie ponawiać wysłanie tej samej wiadomości, a usługa zignoruje wszystkie duplikaty. Przy włączonym wykrywaniu duplikatów (atrybut RequiresDuplicateDetection) atrybut DuplicateDetectionHistoryTimeWindow określa, przez jaki czas wartości atrybutu MessageId wiadomości będą przechowywane w celu wykrywania zdublowanych obiektów.

QueueDescription queueDescription = new QueueDescription("queueName")
{
    RequiresDuplicateDetection = true,
    DuplicateDetectionHistoryTimeWindow = new TimeSpan(1, 0, 0)
};

 

Harmonogram dostarczania wiadomości

Poprzez atrybut ScheduledEnqueueTimeUtc klasy BrokeredMessage mamy możliwość określenia, kiedy dana wiadomość wysłana do kolejki lub topiku będzie dostępna do pobrania. Wartość wspomnianego atrybutu musi być przekazana jako czas UTC.

QueueClient queueClient =
  QueueClient.CreateFromConnectionString("connectionString", "queueName");

BrokeredMessage message = new BrokeredMessage();
message.ScheduledEnqueueTimeUtc = DateTime.UtcNow.AddDays(7);
queueClient.Send(message);

 

Transakcje

Podczas wysyłania wielu wiadomości do pojedynczej kolejki lub topiku cały proces możemy objąć transakcją, co gwarantuje nam dostarczenie wszystkich wiadomości lub żadnej z nich w przypadku pojawienia się błędu. Z kolei zastosowanie transakcji podczas pobierania wielu wiadomości z danej kolejki lub subskrypcji pozwala na anulowanie całego przetwarzania w przypadku wystąpienia błędu przy jednej z wiadomości. Transakcja może obejmować wiadomości pobierane z różnych subskrypcji o ile należą one do tego samego topiku.

Wysyłanie wiadomości w transakcji:

QueueClient queueClient =
  QueueClient.CreateFromConnectionString("connectionString", "queueName");

using (TransactionScope scope = new TransactionScope())
{
    BrokeredMessage message1 = new BrokeredMessage("Message 1");
    queueClient.Send(message1);

    BrokeredMessage message2 = new BrokeredMessage("Message 2");
    queueClient.Send(message2);

    scope.Complete();
}

Pobieranie wiadomości w transakcji:

QueueClient queueClient =
  QueueClient.CreateFromConnectionString("connectionString", "queueName", ReceiveMode.PeekLock);

using (TransactionScope scope = new TransactionScope())
{
    BrokeredMessage message1 = queueClient.Receive();
    string messageBody1 = message1.GetBody<string>();
    message1.Complete();

    BrokeredMessage message2 = queueClient.Receive();
    string messageBody2 = message2.GetBody<string>();
    message2.Complete();

    scope.Complete();
}

 

Sesje

Usługa Azure Service Bus udostępnia mechanizm sesji pozwalający na grupowanie wysyłanych wiadomości. Sesje są idealnym rozwiązaniem w przypadku konieczności przetworzenia wielu wzajemnie ze sobą powiązanych wiadomości. Przykładem takiego scenariusza jest podzielenie jednej dużej wiadomości na kilka mniejszych (ze względu na dopuszczalny rozmiar) i wysłanie ich w ramach pojedynczej sesji. Załóżmy, że chcemy wysłać obiekt zamówienia składający się z nagłówka oraz wielu pozycji. Możemy wówczas podzielić go na wiele wiadomości, gdzie pierwsza z nich będzie zawierała nagłówek, a kolejne poszczególne pozycje. Tak przygotowany zestaw wiadomości możemy wysłać w ramach pojedynczej sesji, podając jako jej identyfikator np. numer zamówienia. Dzięki temu podczas pobierania wiadomości będziemy w stanie powtórnie zgrupować je w ramach jednego zamówienia.

W celu włączenia obsługi sesji, kolejka lub subskrypcja musi być utworzona z atrybutem RequiresSession ustawionym na true. Wówczas wszystkie wiadomości wysyłane do kolejki lub topiku zawierającego takie subskrypcje muszą mieć ustawioną wartość we właściwości SessionId (dowolna wartość typu string będąca identyfikatorem sesji). Pobieranie wiadomości z tak skonfigurowanej kolejki lub subskrypcji odbywa się poprzez obiekt MessageSession zamiast standardowych QueueClient lub SubscriptionClient. Wywołanie metody AcceptMessageSession na obiekcie klienta kolejki lub subskrypcji spowoduje oczekiwanie przez podany w parametrze czas (lub domyślny minutę) na otrzymanie wiadomości. Jeżeli to nastąpi, zwracany jest obiekt MessageSession zawierający właściwość SessionId i umożliwiający pobranie wszystkich wiadomości w ramach tej sesji. Istnieje również możliwość przekazania do metody AcceptMessageSession identyfikatora sesji, co spowoduje oczekiwanie na wiadomość jedynie z tej konkretnej sesji. Jeżeli przez określony czas nie zostanie odebrana żadna wiadomość, zgłoszony zostaje wyjątek TimeoutException.

Utworzenie kolejki z obsługą sesji:

QueueDescription orderQueueDescription = new QueueDescription("queueName")
{
    RequiresSession = true
};

Wysłanie kilku wiadomości w ramach jednej sesji:

Order order = new Order();
order.Id = "1/2015";
order.Details = new List<string> { "Item1", "Item2", "Item3" };

QueueClient queueClient =
  QueueClient.CreateFromConnectionString("connectionString", "queueName");

foreach (string orderItem in order.Details)
{
    BrokeredMessage message = new BrokeredMessage(orderItem);
    message.SessionId = order.Id;
    queueClient.Send(message);
}

Pobranie wszystkich wiadomości z jednej sesji:

MessageSession orderSession = null;
try
{
    orderSession = queueClient.AcceptMessageSession();
}
catch (TimeoutException ex)
{
}            

if (orderSession != null)
{
    string orderId = orderSession.SessionId;

    while (true)
    {
        BrokeredMessage orderItemMessage = orderSession.Receive();
        if (orderItemMessage != null)
        {
            string orderItem = orderItemMessage.GetBody<string>();
            orderItemMessage.Complete();
        }
        else
            break;
    }

    orderSession.Close();
}

 

Model Request-Response

Tryb brokered messaging w Azure Service Bus polega na komunikacji asynchronicznej gdzie nadawca i odbiorca pracują niezależnie, a wiadomości są kolejkowane na serwerach Azure. Mimo to, w trybie tym możliwa jest realizacja modelu Request-Response. Do tego celu wykorzystywane są dwie kolejki (żądań i odpowiedzi) oraz sesje. Poniżej znajduje się przykładowy scenariusz, w którym wysyłane jest zapytanie o szczegóły produktu, a następnie pobierana jest odpowiedź dla tego konkretnego żądania.

Klient wysyła do kolejki żądań wiadomość z zapytaniem o szczegóły produktu, podając we właściwości Properties jego identyfikator oraz ustawiając właściwość ReplyToSessionId na własny identyfikator sesji, w ramach której będzie oczekiwał na odpowiedź:

QueueClient requestQueueClient =
  QueueClient.CreateFromConnectionString("connectionString", "ProductRequestQueue");

string sessionId = Guid.NewGuid().ToString();

BrokeredMessage requestMessage = new BrokeredMessage();
requestMessage.Properties.Add("productId", "1");
requestMessage.ReplyToSessionId = sessionId;
requestQueueClient.Send(requestMessage);

Serwer pobiera wiadomość z kolejki żądań i zapamiętuje nadany identyfikator sesji:

QueueClient requestQueueClient =
  QueueClient.CreateFromConnectionString("connectionString", "ProductRequestQueue", ReceiveMode.PeekLock);

BrokeredMessage requestMessage = requestQueueClient.Receive();
string productId = requestMessage.Properties["productId"] as string;
string sessionId = requestMessage.ReplyToSessionId;
requestMessage.Complete();

Serwer przygotowuje wiadomość z informacjami o produkcie (we właściwości Label zapisywane jest to czy produkt został znaleziony) i wysyła ją z ustawionym identyfikatorem sesji do kolejki odpowiedzi:

QueueClient responseQueueClient =
  QueueClient.CreateFromConnectionString("connectionString", "ProductResponseQueue");

string productInfo = "ProductInfo";
BrokeredMessage responseMessage = new BrokeredMessage(productInfo);
responseMessage.SessionId = sessionId;
responseMessage.Label = "Found";
responseQueueClient.Send(responseMessage);

Klient pobiera wiadomość z kolejki odpowiedzi odwołując się do wcześniej nadanego identyfikatora sesji, dzięki czemu mamy gwarancję otrzymania właściwej wiadomości:

QueueClient responseQueueClient =
  QueueClient.CreateFromConnectionString("connectionString", "ProductResponseQueue", ReceiveMode.PeekLock);

MessageSession session = responseQueueClient.AcceptMessageSession(sessionId);
BrokeredMessage responseMessage = session.Receive();
if (responseMessage.Label == "Found")
{
    string productInfo = responseMessage.GetBody<string>();
}
responseMessage.Complete();
session.Close();

 

Service Bus Explorer

Na zakończenie chciałbym wspomnieć o bardzo wygodnym narzędziu pozwalającym zarządzać usługą Azure Service Bus z poziomu aplikacji desktopowej: Service Bus Explorer

 

Linki

Azure Service Bus brokered messaging – Topics, Subscriptions

2014-11-18

Po omówieniu kolejek, dziś opiszę pracę z topikami i subskrypcjami w usłudze Microsoft Azure Service Bus. Tak jak poprzednio, w prezentowanych przykładach wiadomość przesyłana pomiędzy nadawcą i odbiorcą będzie zawierała obiekt zamówienia:

public class Order
{
  public string Id { get; set; }
  public DateTime Date { get; set; }
  public string Customer { get; set; }
  public List<string> Details { get; set; }
}

W kodzie będę się odwoływał do dwóch zmiennych zawierających connection string dla przestrzeni nazw usługi oraz nazwę topiku:

public readonly string connectionString = String.Format(
  "Endpoint={0};SharedAccessKeyName={1};SharedAccessKey={2}",
  "sb://tempnamespace.servicebus.windows.net/",
  "RootManageSharedAccessKey",
  "zFUqkwWoDrem9O8UkdG0pq0sheJze0U/P93f42Aykdc=");

public readonly string topicName = "orderstopic";

 

Zarządzanie topikiem

Zarządzanie topikami odbywa się w analogiczny sposób jak w przypadku kolejek. Tu także korzystamy z obiektu NamespaceManager używając jedynie innych metod, jednak co do zasady są to dokładnie te same mechanizmy. Poniżej znajdują się przykłady tworzenia, modyfikacji oraz usunięcia topiku.

Proste tworzenie topiku:

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

if (!namespaceManager.TopicExists(topicName))
  namespaceManager.CreateTopic(topicName);

Tworzenie topiku za pomocą obiektu TopicDescription:

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

if (!namespaceManager.TopicExists(topicName))
{
  TopicDescription topic = new TopicDescription(topicName)
  {
    MaxSizeInMegabytes = 1024,
    RequiresDuplicateDetection = true
  };
  namespaceManager.CreateTopic(topic);
}

Odczyt właściwości oraz modyfikacja topiku:

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

TopicDescription topic = namespaceManager.GetTopic(topicName);

long subscriptionCount = topic.SubscriptionCount;
long sizeInBytes = topic.SizeInBytes;

topic.MaxSizeInMegabytes = 2048;
namespaceManager.UpdateTopic(topic);

Usunięcie topiku:

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

namespaceManager.DeleteTopic(topicName);

 

Zarządzanie subskrypcjami

Po wysłaniu wiadomości do topiku, w celu jej odczytu odbiorca nie odwołuje się bezpośrednio do topiku tylko korzysta z wybranej subskrypcji. To jakie wiadomości z topiku trafią do danej subskrypcji określane jest poprzez filtry operujące na atrybutach wiadomości. Podstawową różnicą w stosunku do kolejek jest to, iż w przypadku topiku tą samą wiadomość może otrzymać kilku odbiorców. Jeżeli dana wiadomość przesłana do topiku spełni warunki filtrów więcej niż jednej subskrypcji to jej kopia zostanie przekazana do każdej z nich. Należy jednak pamiętać, że filtry subskrypcji sprawdzane są tylko w momencie pojawienia się wiadomości w topiku. Jeżeli utworzymy nową subskrypcję to mogą do niej trafić jedynie wiadomości wysłane od tego momentu, wszystkie już istniejące w topiku mimo spełnionych warunków filtra nie zostaną do niej przekazane. Mamy trzy możliwości definiowania subskrypcji: bez filtra (będą do niej trafiać wszystkie wiadomości z topiku), z filtrem typu CorrelationFilter (filtr odnoszący się do wartości atrybutu CorrelationId wiadomości) lub z filtrem typu SqlFilter (filtr odnoszący się do poszczególnych elementów atrybutu Properties wiadomości).

Subskrypcję bez filtra tworzymy podając jedynie jej nazwę oraz nazwę topiku, z którym będzie powiązana:

string subscriptionName = "ordersall";

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

if (!namespaceManager.SubscriptionExists(topicName, subscriptionName))
  namespaceManager.CreateSubscription(topicName, subscriptionName);

W przypadku tworzenia subskrypcji z filtrem typu CorrelationFilter, oprócz nazwy topiku musimy przekazać definicję filtra. Tworząc filtr podajemy wartość atrybutu CorrelationId wiadomości, które mają trafiać do tej subskrypcji:

string subscriptionName = "ordersdepartmenta";

CorrelationFilter filter = new CorrelationFilter("Department A");

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

if (!namespaceManager.SubscriptionExists(topicName, subscriptionName))
  namespaceManager.CreateSubscription(topicName, subscriptionName, filter);

Przy użyciu powyżej zdefiniowanej subskrypcji otrzymamy wszystkie wiadomości wysłane do topiku, które w atrybucie CorrelationId będą miały wartość „Department A”.

Jeżeli chcemy mieć większe możliwości filtrowania możemy utworzyć subskrypcję z filtrem typu SqlFilter. W tym przypadku tworząc filtr podajemy wyrażenie logiczne operujące na elementach atrybutu Properties wiadomości (jest to atrybut typu IDictionary<string, object>). Budując wyrażenie mamy do dyspozycji następujące operatory: =, >, <, !=, <>, and, or, (, ). Daje to bardzo duże możliwości tworzenia warunków filtrowania. Przykładowy filtr może wyglądać tak: „(Region != ‚PL’ or ItemsCount > 10) and (Sender = ‚Jan Kowalski’)”, gdzie Region, ItemsCount i Sender to klucze we właściwości Properties wiadomości.

Oto w jaki sposób utworzyć subskrypcję z filtrem SqlFilter:

string subscriptionName = "orderswholesalepl";

SqlFilter filter = new SqlFilter("Region = 'PL' and ItemsCount > 10");

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

if (!namespaceManager.SubscriptionExists(topicName, subscriptionName))
  namespaceManager.CreateSubscription(topicName, subscriptionName, filter);

Do powyższej subskrypcji trafią wszystkie wiadomości z topiku, które we właściwości Properties będą posiadały klucz Region z wartością „PL” oraz klucz ItemsCount z wartością większą od 10.

Do utworzenia subskrypcji możemy także użyć obiektu SubscriptionDescription, pozwalającego na dodatkową konfigurację:

string subscriptionName = "orderswholesalepl";

SqlFilter filter = new SqlFilter("Region = 'PL' and ItemsCount > 10");

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

if (!namespaceManager.SubscriptionExists(topicName, subscriptionName))
{
  SubscriptionDescription subscription =
    new SubscriptionDescription(topicName, subscriptionName)
  {
    RequiresSession = false,
    LockDuration = TimeSpan.FromMinutes(1)
  };
  namespaceManager.CreateSubscription(subscription, filter);
}

Z obiektu SubscriptionDescription możemy również skorzystać w celu odczytu informacji o subskrypcji oraz jej modyfikacji:

string subscriptionName = "ordersall";

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

SubscriptionDescription subscription =
  namespaceManager.GetSubscription(topicName, subscriptionName);

long messageCount = subscription.MessageCount;

subscription.MaxDeliveryCount = 5;
namespaceManager.UpdateSubscription(subscription);

Usunięcie subskrypcji sprowadza się do wywołania metody DeleteSubscription na obiekcie NamespaceManager:

string subscriptionName = "ordersall";

NamespaceManager namespaceManager =
  NamespaceManager.CreateFromConnectionString(connectionString);

namespaceManager.DeleteSubscription(topicName, subscriptionName);

 

Wysyłanie wiadomości do topiku

Wysłanie wiadomości do topiku wygląda praktycznie tak samo jak w przypadku kolejki. Mamy dwie możliwości, albo użyjemy dedykowanego obiektu TopicClient albo uniwersalnego MessageSender znanego z kolejek (tym razem tworzonego poprzez podanie nazwy topiku):

TopicClient topicClient =
  TopicClient.CreateFromConnectionString(connectionString, topicName);

lub:

MessagingFactory messagingFactory =
  MessagingFactory.CreateFromConnectionString(connectionString);

TopicClient topicClient = messagingFactory.CreateTopicClient(topicName);

lub:

MessagingFactory messagingFactory =
  MessagingFactory.CreateFromConnectionString(connectionString);

MessageSender messageSender =
  messagingFactory.CreateMessageSender(topicName);

Samo przygotowanie wiadomości nie różni się niczym od tego, co opisywałem przy kolejkach. Tworzymy obiekt klasy BrokeredMessage i przekazujemy go do metody Send klienta topiku:

Order order = new Order()
{
  Id = "1",
  Date = DateTime.UtcNow,
  Customer = "Company ABC",
  Details = new List<string> { "Product1", "Product2", "Product3" }
};

BrokeredMessage message = new BrokeredMessage(order)
{
  MessageId = Guid.NewGuid().ToString(),
  Label = "Application X",
  CorrelationId = "Department A",
  TimeToLive = TimeSpan.FromHours(12)
};
message.Properties.Add("Sender", "Jan Kowalski");
message.Properties.Add("Region", "PL");
message.Properties.Add("ItemsCount", order.Details.Count);

messageSender.Send(message);

messageSender.Close();

 

Odbieranie wiadomości przy użyciu subskrypcji

W celu odczytu wiadomości wysłanych do topiku odbiorca nie odwołuje się bezpośrednio do topiku tylko korzysta z określonej subskrypcji, która z jego punktu widzenia posiada funkcjonalność kolejki. Przy użyciu wybranej subskrypcji będziemy w stanie odczytać jedynie te wiadomości, które spełniły warunki jej filtra. Podobnie jak w przypadku kolejek odczyt wiadomości może być realizowany w trybie ReceiveAndDelete lub PeekLock. Po więcej szczegółów odsyłam do poprzedniego tematu. Aby odebrać wiadomość musimy utworzyć obiekt klienta subskrypcji. Możemy skorzystać z klasy SubscriptionClient lub MessageReceiver. W przypadku tego drugiego w celu wskazania konkretnej subskrypcji podajemy ścieżkę w formacie „topicName/subscriptions/subscriptionName„:

string subscriptionName = "ordersall";

SubscriptionClient subscriptionClient =
  SubscriptionClient.CreateFromConnectionString(
    connectionString, topicName, subscriptionName, ReceiveMode.PeekLock);

lub:

string subscriptionName = "ordersall";

MessagingFactory messagingFactory =
  MessagingFactory.CreateFromConnectionString(connectionString);

SubscriptionClient subscriptionClient =
  messagingFactory.CreateSubscriptionClient(
    topicName, subscriptionName, ReceiveMode.PeekLock);

lub:

string subscriptionName = "ordersall";

MessagingFactory messagingFactory =
  MessagingFactory.CreateFromConnectionString(connectionString);

MessageReceiver messageReceiver =
  messagingFactory.CreateMessageReceiver(
    String.Format("{0}/subscriptions/{1}", topicName, subscriptionName),
    ReceiveMode.PeekLock);

Pobieranie wiadomości nie różni się niczym w stosunku do tego, co opisywałem przy kolejkach. Tu także mamy do dyspozycji metody Peek, Receive oraz ReceiveBatch, a odebrana wiadomość to znany już obiekt klasy BrokeredMessage:

BrokeredMessage message =
  messageReceiver.Receive(TimeSpan.FromSeconds(5));

if (message != null)
{
  try
  {
    long messageNumber = message.SequenceNumber;
    string messageId = message.MessageId;
    string label = message.Label;
    string correlationId = message.CorrelationId;
    string orderSender = message.Properties["Sender"].ToString();
    string orderRegion = message.Properties["Region"].ToString();

    Order order = message.GetBody<Order>();

    if (orderRegion == "PL")
    {
      //Processing Order...

      message.Complete();
    }
    else
      message.DeadLetter();
  }
  catch (Exception ex)
  {
    message.Abandon();
  }
}
messageReceiver.Close();

W tym i poprzednim wpisie pokazałem podstawowe operacje związane z wysyłaniem i odbieraniem wiadomości przy wykorzystaniu kolejek, topików i subskrypcji. W następnym temacie przedstawię bardziej zaawansowane scenariusze pracy z wiadomościami.