Škálování asynchronních vazeb klient-Server Links with Reactive
On 25 ledna, 2022 by admin- 31.01.2019
- 17 minut čtení
Červen 2016
Volume 31 Number 6
By Peter Vogel | June 2016
Jak se asynchronní zpracování stává ve vývoji aplikací běžnější, Microsoft .NET Framework získal širokou škálu nástrojů, které podporují specifické asynchronní návrhové vzory. Vytvoření dobře navržené asynchronní aplikace často spočívá v rozpoznání návrhového vzoru, který vaše aplikace implementuje, a následném výběru správné sady komponent .NET.
V některých případech shoda vyžaduje integraci několika komponent .NET. Článek Stephena Clearyho „Vzory pro asynchronní aplikace MVVM: Příkazy“ (bit.ly/233Kocr) ukazuje, jak plně podporovat vzor Model-View-ViewModel (MVVM) asynchronním způsobem. V ostatních případech podpora vyžaduje pouze jednu komponentu z prostředí .NET Framework. Implementací vzoru provider/consumers pomocí BlockingCollection jsem se zabýval ve svých sloupcích VisualStudioMagazine.com Practical .NET, „Create Simple, Reliable Asynchronous Apps with BlockingCollection“ (bit.ly/1TuOpE6) a „Create Sophisticated Asynchronous Applications with BlockingCollection“ (bit.ly/1SpYyD4).
Dalším příkladem je implementace návrhového vzoru observer pro asynchronní sledování dlouhotrvající operace. V tomto scénáři nefunguje asynchronní metoda, která vrací jediný objekt Task, protože klient často vrací proud výsledků. Pro tyto scénáře můžete využít přinejmenším dva nástroje z prostředí .NET Framework: ObservableCollection a Reactive Extensions (Rx). Pro jednoduchá řešení stačí ObservableCollection (spolu s klíčovými slovy async a await). Pro „zajímavější“ a zejména událostmi řízené problémy vám však Rx poskytne lepší kontrolu nad procesem.
Definování vzoru
Přestože se vzor pozorovatele často používá ve vzorech návrhu uživatelského rozhraní – včetně Model-View-Controller (MVC), Model-View-Presenter (MVP) a MVVM-UI – je třeba považovat pouze za jeden ze scénářů z větší množiny scénářů, kde se vzor pozorovatele uplatní. Definice vzoru pozorovatele (citace z Wikipedie) zní: „Objekt, nazývaný subjekt, udržuje seznam svých závislých objektů, nazývaných pozorovatelé, a automaticky je informuje o všech změnách stavu, obvykle voláním jedné z jejich metod.“
Vzor pozorovatele je vlastně o tom, jak dostat výsledky z dlouhodobě probíhajících procesů ke klientovi, jakmile jsou tyto výsledky k dispozici. Bez některé verze vzoru pozorovatele musí klienti čekat, až bude k dispozici poslední výsledek, a pak si nechat poslat všechny výsledky v jednom kuse. Ve stále více asynchronním světě chcete, aby pozorovatelé zpracovávali výsledky paralelně s klientem, jakmile budou výsledky k dispozici. Abych zdůraznil, že při využití vzoru pozorovatele nemluvíte jen o uživatelském rozhraní, budu ve zbytku článku používat „klient“ a „server“ místo „pozorovatel“ a „subjekt“.
Problémy a příležitosti
Ve vzoru pozorovatele existují nejméně tři problémy a dvě příležitosti. Prvním problémem je problém lapsed-listener: Mnoho implementací vzoru observer vyžaduje, aby server držel odkaz na všechny své klienty. V důsledku toho mohou být klienti drženi v paměti serveru až do jeho ukončení. To samozřejmě není optimální řešení pro dlouhodobě běžící proces v dynamickém systému, kde se klienti často připojují a odpojují.
Problém lapsed-listener je však pouze příznakem druhého, většího problému: Mnoho implementací vzoru pozorovatele vyžaduje, aby server a klient byli úzce propojeni, což vyžaduje, aby server i klient byli neustále přítomni. Klient by měl být přinejmenším schopen zjistit, zda je server přítomen, a rozhodnout se, že se nepřipojí; kromě toho by měl být server schopen fungovat, i když nejsou žádní klienti přijímající výsledky.
Třetí problém souvisí s výkonem: Jak dlouho bude serveru trvat, než upozorní všechny klienty? Výkonnost ve vzoru pozorovatele je přímo ovlivněna počtem klientů, kteří mají být notifikováni. Proto existuje možnost zlepšit výkon ve vzoru pozorovatele tím, že klientovi umožníme preventivně filtrovat výsledky, které se vrátí ze serveru. To také řeší scénáře, kdy server produkuje více výsledků (nebo širší škálu výsledků), než o jaké má klient zájem: Klient může určit, že má být informován pouze v určitých případech. Druhá výkonnostní příležitost existuje v souvislosti s rozpoznáním, kdy server nemá žádné výsledky nebo kdy již skončil s produkcí výsledků. Klienti mohou přeskočit získávání prostředků potřebných ke zpracování událostí serveru, dokud klient nemá jistotu, že je co zpracovávat, a klienti mohou tyto prostředky uvolnit, jakmile vědí, že zpracovali poslední výsledek.
Od pozorovatele k modelu publish/subscribe
Zohlednění těchto úvah vede od jednoduchých implementací vzoru pozorovatel k souvisejícímu modelu publish/subscribe. Publish/subscribe implementuje vzor observer volně spřaženým způsobem, který umožňuje serverům a klientům vykonávat činnosti, i když je druhý z nich právě nedostupný. Publish/subscribe také obvykle implementuje filtrování na straně klienta tím, že umožňuje klientovi přihlásit se k odběru buď konkrétních témat/kanálů („Upozornit mě na objednávky“), nebo atributů spojených s různými druhy obsahu („Upozornit mě na všechny naléhavé požadavky“).
Jeden problém však zůstává. Všechny implementace vzoru pozorovatele mají tendenci pevně svázat klienty a servery s určitým formátem zpráv. Změna formátu zprávy ve většině implementací publish/subscribe může být obtížná, protože všichni klienti musí být aktualizováni, aby používali nový formát.
V mnoha ohledech se to podobá popisu kurzoru na straně serveru v databázi. Aby se minimalizovaly náklady na přenos, nevrací databázový server výsledky při načítání každého řádku. U velkých sad řádků však databáze také nevrací na konci všechny řádky v jedné dávce. Místo toho databázový server obvykle vrací podmnožiny z kurzoru drženého na serveru často, jakmile jsou tyto podmnožiny k dispozici. U databáze nemusí být klient a server přítomni současně: Klient může zkontrolovat, zda je server dostupný, a pokud ne, rozhodnout se, co (pokud vůbec něco) může dělat. Proces filtrování (SQL) je také velmi flexibilní. Pokud však databázový stroj změní formát, který používá pro vracení řádků, musí být všichni klienti přinejmenším překompilováni.
Zpracování mezipaměti objektů
Jako případovou studii při pohledu na implementaci jednoduchého vzoru pozorovatele používám jako server třídu, která prohledává mezipaměť faktur v paměti. Tento server by mohl na konci svého zpracování vrátit kolekci všech faktur. Raději bych však, aby klient zpracovával faktury jednotlivě a paralelně s procesem vyhledávání na serveru. To znamená, že dávám přednost verzi procesu, která vrací každou fakturu tak, jak je nalezena, a nechává klienta zpracovávat každou fakturu paralelně s vyhledáváním další faktury.
Jednoduchá implementace serveru by mohla vypadat takto:
private List<Invoice> foundInvoices = new List<Invoice>();public List<Invoice> FindInvoices(decimal Amount){ foundInvoices.Clear(); Invoice inv; // ...search logic to add invoices to the collection foundInvoices.Add(inv); // ...repeat until all invoices found return foundInvoices;}
Složitější řešení by mohla používat výnos return pro vrácení každé faktury tak, jak je nalezena, místo sestavování seznamu. Bez ohledu na to bude klient, který volá metodu FindInvoices, chtít provést některé kritické činnosti před a po zpracování. Například jakmile je nalezena první položka, klient může chtít zapnout seznam MatchingInvoices, který bude u klienta uchovávat faktury, nebo získat/inicializovat všechny prostředky potřebné ke zpracování faktury. Při přidávání dalších faktur by klient musel zpracovat každou fakturu a po signálu serveru, že je načtena poslední faktura, uvolnit všechny prostředky, které již nejsou potřeba, protože „už nejsou žádné“ faktury ke zpracování.
Při načítání databáze se například čtení zablokuje, dokud se nevrátí první řádek. Jakmile je vrácen první řádek, klient inicializuje všechny prostředky potřebné ke zpracování řádku. Čtení také vrátí false, když je načten poslední řádek, čímž umožní klientovi uvolnit tyto prostředky, protože již nejsou žádné další řádky ke zpracování.
Vytváření jednoduchých řešení pomocí ObservableCollection
Nejzřejmější volbou pro implementaci vzoru pozorovatele v prostředí .NET Framework je ObservableCollection. ObservableCollection upozorní klienta (prostřednictvím události), kdykoli dojde ke změně.
Přepsání mého ukázkového serveru tak, aby používal třídu ObservableCollection, vyžaduje pouze dvě změny. Za prvé, kolekce uchovávající výsledky musí být definována jako ObservableCollection a musí být veřejná. Za druhé již není nutné, aby metoda vracela výsledek:
Nová implementace serveru může vypadat takto:
public List<Invoice> FindInvoices(decimal Amount){ public ObservableCollection<Invoice> foundInvoices = new ObservableCollection<Invoice>(); public void FindInvoices(decimal Amount) { foundInvoices.Clear(); Invoice inv; // ...search logic to set inv foundInvoices.Add(inv); // ...repeat until all invoices are added to the collection }
Klient, který používá tuto verzi serveru, musí pouze připojit obsluhu události CollectionChanged kolekce InvoiceManagement’s foundInvoices. V následujícím kódu jsem nechal třídu implementovat rozhraní IDisposable, aby podporovala odpojení od události:
public class SearchInvoices: IDisposable{ InvoiceManagement invMgmt = new InvoiceManagement(); public void SearchInvoices() { invMgmt.foundInvoices.CollectionChanged += InvoicesFound; } public void Dispose() { invMgmt.foundInvoices.CollectionChanged -= InvoicesChanged; }
V klientovi je události CollectionChanged předán jako druhý parametr objekt NotifyCollectionChangedEventArgs. Vlastnost Action tohoto objektu určuje jak to, jaká změna byla na kolekci provedena (akce jsou: kolekce byla vymazána, do kolekce byly přidány nové položky, existující položky byly přesunuty/nahrazeny/odstraněny), tak informace o změněných položkách (kolekce všech přidaných položek, kolekce položek přítomných v kolekci před přidáním nových položek, pozice položky, která byla přesunuta/odstraněna/nahrazena).
Jednoduchý kód v klientovi, který by asynchronně zpracovával každou fakturu při jejím přidání do kolekce na serveru, by vypadal jako kód na obrázku 1.
Obrázek 1 Asynchronní zpracování faktur pomocí ObservableCollection
private async void InvoicesFound(object sender, NotifyCollectionChangedEventArgs e){ switch (e.Action) { case NotifyCollectionChangedAction.Reset: { // ...initial item processing return; } case NotifyCollectionChangedAction.Add: { foreach (Invoice inv in e.NewItems) { await HandleInvoiceAsync(inv); } return; } }}
Ačkoli je tento kód jednoduchý, může být pro vaše potřeby nedostatečný, zejména pokud zpracováváte dlouhodobý proces nebo pracujete v dynamickém prostředí. Z hlediska asynchronního návrhu by kód mohl například zachytit objekt Task vrácený funkcí HandleInvoiceAsync, aby klient mohl spravovat asynchronní úlohy. Budete také chtít zajistit, aby událost CollectionChanged byla vyvolána ve vlákně uživatelského rozhraní, i když FindInvoices běží ve vlákně na pozadí.
Vzhledem k tomu, kde je ve třídě serveru volána metoda Clear (těsně před vyhledáním první faktury), lze hodnotu vlastnosti Action Reset použít jako signál, že se chystá načtení první položky. Při hledání však samozřejmě nemusí být nalezeny žádné faktury, takže použití Akce Reset může vést k tomu, že klient alokuje prostředky, které ve skutečnosti nikdy nepoužije. Chcete-li skutečně zpracovat „první položku“, museli byste do zpracování akce Přidat přidat příznak, aby se provedla pouze tehdy, když byla nalezena první položka.
Server má navíc omezený počet možností, jak dát najevo, že byla nalezena poslední faktura, aby klient mohl přestat čekat na „další“. Server by pravděpodobně mohl po nalezení poslední položky kolekci vymazat, ale to by si jen vynutilo složitější zpracování do zpracování akce Reset (zpracovával jsem Faktury? Pokud ano, pak jsem zpracoval poslední fakturu; pokud ne, pak se chystám zpracovat první fakturu).
Zatímco pro jednoduché problémy bude ObservableCollection v pořádku, každá rozumně sofistikovaná implementace založená na ObservableCollection (a každá aplikace, která si cení efektivity) bude vyžadovat komplikovaný kód, zejména v klientovi.
Řešení Rx
Pokud chcete asynchronní zpracování, pak Rx (dostupné prostřednictvím NuGet) může poskytnout lepší řešení pro implementaci vzoru observer tím, že si vypůjčí model publish/subscribe. Toto řešení také poskytuje model filtrování založený na LINQ, lepší signalizaci podmínek pro první/poslední položku a lepší zpracování chyb.
Rx také dokáže zpracovat zajímavější implementace pozorovatelů, než je možné s ObservableCollection. V mé případové studii může můj server po vrácení počátečního seznamu faktur pokračovat v kontrole nových faktur, které jsou přidány do mezipaměti po dokončení původního vyhledávání (a které samozřejmě odpovídají kritériím vyhledávání). Když se objeví faktura splňující kritéria, klient bude chtít být o této události informován, aby mohl novou fakturu přidat do seznamu. Rx podporuje tyto druhy rozšíření vzoru pozorovatele na základě událostí lépe než ObservableCollection.
V Rx existují dvě klíčová rozhraní pro podporu vzoru pozorovatele. Prvním je IObservable<T>, které je implementováno serverem a specifikuje jedinou metodu: Subscribe. Serveru implementujícímu metodu Subscribe bude předána reference na objekt od klienta. Aby se vyřešil problém s prošlým posluchačem, vrací metoda Subscribe klientovi odkaz na objekt, který implementuje rozhraní IDisposable. Klient může tento objekt použít k odpojení od serveru. Když se klient odpojí, očekává se, že server odstraní klienta ze všech svých interních seznamů.
Druhým rozhraním je IObserver<T>, které musí být implementováno klientem. Toto rozhraní vyžaduje, aby klient implementoval a vystavil serveru tři metody: OnNext, OnCompleted a OnError. Kritickou metodou je zde OnNext, kterou server používá k předání zprávy klientovi (v mé případové studii by touto zprávou byly nové objekty Invoice, které budou vráceny, jakmile se každý z nich objeví). Server může použít klientovu metodu OnCompleted, aby signalizoval, že už nejsou žádná další data. Třetí metoda, OnError, poskytuje serveru způsob, jak signalizovat klientovi, že došlo k výjimce.
Rozhraní IObserver si samozřejmě můžete implementovat sami (je součástí rozhraní .NET Framework). Spolu s ObservableCollection to může být vše, co potřebujete, pokud vytváříte synchronní řešení (i o tom jsem napsal sloupek „Writing Cleaner Code with Reactive Extensions“ ).
Rx však obsahuje několik balíčků, které poskytují asynchronní implementace těchto rozhraní, včetně implementací pro JavaScript a RESTful služby. Třída Rx Subject poskytuje implementaci IObservable, která zjednodušuje implementaci asynchronní publish/subscribe verze vzoru observer.
Vytvoření asynchronního řešení
Vytvoření serveru pro práci s objektem Subject vyžaduje jen velmi málo změn v původním synchronním kódu na straně serveru. Starý objekt ObservableCollection nahradím objektem Subject, který bude předávat každou fakturu tak, jak se objeví všem naslouchajícím klientům. Objekt Subject deklaruji jako veřejný, aby k němu klienti měli přístup:
public class InvoiceManagement{ public IObservable<Invoice> foundInvoice = new Subject<Invoice>();
V těle metody místo přidávání faktury do kolekce použiji metodu OnNext objektu Subject k předání každé faktury klientovi, jakmile ji najde:
public void FindInvoices(decimal Amount){ inv = GetInvoicesForAmount(Amount) // Poll for invoices foundInvoice.OnNext(inv); // ...repeat...}
V klientovi nejprve deklaruji instanci třídy serveru. Poté v metodě označené jako asynchronní zavolám metodu Subscribe subjektu, čímž dám najevo, že chci začít načítat zprávy:
public class InvoiceManagementTests{ InvoiceManagement invMgmt = new InvoiceManagement(); public async void ProcessInvoices() { invMgmt.foundInvoice.Subscribe<Invoice>();
Chci-li filtrovat výsledky pouze na požadované faktury, mohu na objekt Subject použít příkaz LINQ. Tento příklad filtruje faktury na ty, které jsou zpětně objednané (pro použití rozšíření Rx LINQ je třeba přidat příkaz using pro jmenný prostor System.Reactive.Linq):
invMgmt.foundInvoice.Where(i => i.BackOrder == "BackOrder").Subscribe();
Poté, co jsem začal naslouchat subjektu, mohu určit, jaké zpracování chci provést při přijetí faktury. Mohu například použít FirstAsync pro zpracování pouze první faktury vrácené službou. V tomto příkladu používám příkaz await s voláním FirstAsync, abych mohl během zpracování faktury vrátit řízení do hlavní části své aplikace. Tento kód čeká na načtení této první faktury, pak přejde na jakýkoli kód, který použiji k inicializaci procesu zpracování faktury, a nakonec zpracuje fakturu:
Invoice inv;inv = await invMgmt.foundInvoice.FirstAsync();// ...setup code invoices...HandleInvoiceAsync(inv);
Jediné upozornění: FirstAsync se zablokuje, pokud server ještě nevydal žádné výsledky. Pokud se chcete blokování vyhnout, můžete použít FirstOrDefaultAsync, který vrátí nulu, pokud server nevytvořil žádné výsledky. Pokud nejsou žádné výsledky, klient se může rozhodnout, co případně udělá.
Typičtějším případem je, že klient chce zpracovat všechny vrácené faktury (po filtrování) a to asynchronně. V takovém případě můžete místo kombinace metod Subscribe a OnNext použít pouze metodu ForEachAsync. Můžete předat metodu nebo lambda výraz, který zpracovává příchozí výsledky. Pokud předáte metodu (která nemůže být asynchronní), jako to dělám zde, bude této metodě předána faktura, která spustila ForEachAsync:
invMgmt.foundInvoice.ForEachAsync(HandleInvoice);
Metodě ForEachAsync lze také předat token zrušení, aby klient signalizoval, že se odpojuje. Dobrou praxí by bylo předat token při volání některé z metod Rx *Async, aby se podpořilo nechat klienta ukončit zpracování, aniž by musel čekat na zpracování všech objektů.
ForeEachAsync nezpracuje žádný výsledek, který již byl zpracován metodou First (nebo FirstOrDefaultAsync), takže můžete použít FirstOrDefaultAsync s ForEachAsync, abyste před zpracováním dalších objektů zkontrolovali, zda má server co zpracovávat. Metoda IsEmpty subjektu však provede stejnou kontrolu jednodušeji. Pokud má klient alokovat nějaké prostředky potřebné pro zpracování výsledků, IsEmpty mu umožní před přidělením těchto prostředků zkontrolovat, zda má co dělat (alternativou by bylo přidělit tyto prostředky při prvním zpracovávaném prvku v cyklu). Použití IsEmpty s klientem, který před přidělením prostředků (a zahájením zpracování) kontroluje, zda jsou nějaké výsledky, a zároveň podporuje zrušení, by dalo kód, který by vypadal nějak jako na obrázku 2.
Obrázek 2 Kód podporující zrušení a odložení zpracování do doby, než budou výsledky připraveny
CancellationTokenSource cancelSource = new CancellationTokenSource();CancellationToken cancel;cancel = cancelSource.Token;if (!await invMgmt.foundInvoice.IsEmpty()){ // ...setup code for processing invoices... try { invMgmt.foundInvoice.ForEachAsync(HandleInvoice, cancel); } catch (Exception ex) { if (ex.GetType() != typeof(CancellationToken)) { // ...report message } } // ...clean up code when all invoices are processed or client disconnects}
Zabalení
Pokud potřebujete pouze jednoduchou implementaci vzoru pozorovatele, pak vám ke zpracování proudu výsledků může stačit ObservableCollection. Pro lepší kontrolu a pro aplikaci založenou na událostech umožní třída Subject a rozšíření dodávaná s Rx vaší aplikaci pracovat v asynchronním režimu díky podpoře výkonné implementace modelu publish/subscribe (a to jsem se ještě nepodíval na bohatou knihovnu operátorů dodávaných s Rx). Pokud pracujete s Rx, vyplatí se stáhnout si Průvodce návrhem Rx (bit.ly/1VOPxGS), který pojednává o osvědčených postupech při konzumaci a produkci pozorovatelných proudů.
Rx také poskytuje určitou podporu pro převod typu zprávy předávané mezi klientem a serverem pomocí rozhraní ISubject<TSource, TResult>. Rozhraní ISubject<TSource, TResult> specifikuje dva datové typy: datový typ „in“ a datový typ „out“. V rámci třídy Subject, která implementuje toto rozhraní, lze provádět všechny operace potřebné k převodu výsledku vráceného ze serveru (datový typ „in“) na výsledek požadovaný klientem (datový typ „out“). Parametr „in“ je navíc kovariantní (přijme zadaný datový typ nebo cokoli, co tento datový typ dědí) a parametr „out“ je kontravariantní (přijme zadaný datový typ nebo cokoli, co je od něj odvozeno), což vám dává další flexibilitu.
Žijeme ve stále více asynchronním světě a v tomto světě bude vzor pozorovatele stále důležitější – je to užitečný nástroj pro jakékoli rozhraní mezi procesy, kde proces serveru vrací více než jeden výsledek. Naštěstí máte několik možností implementace vzoru observer v prostředí .NET Framework, včetně ObservableCollection a Rx.
Peter Vogel je systémový architekt a ředitel společnosti PH&V Information Services. PH&V poskytuje komplexní poradenství od návrhu UX přes objektové modelování až po návrh databáze.
Děkujeme následujícím technickým expertům společnosti Microsoft za posouzení tohoto článku: Stephen Cleary, James McCaffrey a Dave Sexton
Stephen Cleary pracuje s vícevláknovým a asynchronním programováním již 16 let a podporu asynchronizace v prostředí Microsoft .NET Framework používá již od prvního komunitního technologického náhledu. Je autorem knihy „Concurrency in C# Cookbook“ (O’Reilly Media, 2014). Jeho domovská stránka včetně blogu je na adrese stephencleary.com.
Diskutujte o tomto článku ve fóru časopisu MSDN
.
Napsat komentář