Scale Asynchronous Client-Server Links with Reactive
On ianuarie 25, 2022 by admin- 01/31/2019
- 17 minute de citit
Iunie 2016
Volumul 31 Numărul 6
De Peter Vogel | Iunie 2016
Ca urmare a faptului că procesarea asincronă a devenit tot mai frecventă în dezvoltarea aplicațiilor, Microsoft .NET Framework a dobândit o mare varietate de instrumente care acceptă modele de proiectare asincrone specifice. Adesea, crearea unei aplicații asincrone bine concepute se reduce la recunoașterea modelului de proiectare pe care îl implementează aplicația dvs. și apoi la alegerea setului potrivit de componente .NET.
În unele cazuri, potrivirea necesită integrarea mai multor componente .NET. Articolul lui Stephen Cleary, „Patterns for Asynchronous MVVM Applications: Comenzi” (bit.ly/233Kocr), arată cum se poate susține pe deplin modelul Model-View-ViewModel (MVVM) într-un mod asincron. În alte cazuri, suportul necesită doar o singură componentă din .NET Framework. Am discutat despre implementarea modelului furnizor/consumator folosind BlockingCollection în rubricile mele din VisualStudioMagazine.com Practical .NET, „Create Simple, Reliable Asynchronous Apps with BlockingCollection” (bit.ly/1TuOpE6) și, „Create Sophisticated Asynchronous Applications with BlockingCollection” (bit.ly/1SpYyD4).
Un alt exemplu este implementarea modelului de proiectare observator pentru a monitoriza o operațiune de lungă durată în mod asincron. În acest scenariu, o metodă asincronă care returnează un singur obiect Task nu funcționează deoarece clientul returnează frecvent un flux de rezultate. Pentru aceste scenarii, puteți utiliza cel puțin două instrumente din .NET Framework: ObservableCollection și Reactive Extensions (Rx). Pentru soluții simple, ObservableCollection (împreună cu cuvintele cheie async și await) este tot ce vă trebuie. Cu toate acestea, pentru probleme mai „interesante” și, mai ales, bazate pe evenimente, Rx vă oferă un control mai bun asupra procesului.
Definirea modelului
În timp ce modelul observator este utilizat frecvent în modelele de proiectare UI – inclusiv Model-View-Controller (MVC), Model-View-Presenter (MVP) și MVVM – UI ar trebui să fie considerat doar un scenariu dintr-un set mai mare de scenarii în care se aplică modelul observator. Definiția modelului observator (citând din Wikipedia) este următoarea: „Observator pattern”: „Un obiect, numit subiect, menține o listă a dependenților săi, numiți observatori, și le notifică automat orice schimbare de stare, de obicei prin apelarea uneia dintre metodele lor.”
În realitate, modelul observator se referă la obținerea de rezultate din procesele care rulează de mult timp pentru client imediat ce aceste rezultate sunt disponibile. Fără o versiune a modelului observator, clienții trebuie să aștepte până când ultimul rezultat este disponibil și apoi să le fie trimise toate rezultatele într-o singură bucată. Într-o lume din ce în ce mai asincronă, doriți ca observatorii să proceseze rezultatele în paralel cu clientul, pe măsură ce acestea devin disponibile. Pentru a sublinia faptul că vorbiți despre mai mult decât despre UI atunci când folosiți modelul observator, voi folosi „client” și „server” în loc de „observator” și „subiect”, în restul acestui articol.
Probleme și oportunități
Există cel puțin trei probleme și două oportunități cu modelul observator. Prima problemă este problema „lapsed-listener”: Multe implementări ale modelului observator necesită ca serverul să dețină o referință la toți clienții săi. Ca urmare, clienții pot fi reținuți în memorie de către server până când acesta iese. În mod evident, aceasta nu este o soluție optimă pentru un proces de lungă durată într-un sistem dinamic în care clienții se conectează și se deconectează frecvent.
Problema ascultătorului expirat este însă doar un simptom al celei de-a doua probleme, mai mare: Multe implementări ale modelului observator necesită ca serverul și clientul să fie strâns cuplate, necesitând ca atât serverul cât și clientul să fie prezenți în permanență. Cel puțin, clientul ar trebui să fie capabil să determine dacă serverul este prezent și să aleagă să nu se atașeze; în plus, serverul ar trebui să fie capabil să funcționeze chiar dacă nu există clienți care să accepte rezultate.
Cea de-a treia problemă este legată de performanță: Cât timp îi va lua serverului să notifice toți clienții? Performanța în modelul observatorului este direct afectată de numărul de clienți care trebuie notificați. Prin urmare, există o oportunitate de a îmbunătăți performanța în modelul observatorului permițând clientului să filtreze preventiv rezultatele care revin de la server. Acest lucru se referă, de asemenea, la scenariile în care serverul produce mai multe rezultate (sau o varietate mai mare de rezultate) decât cele de care este interesat clientul: Clientul poate indica faptul că trebuie să fie notificat numai în anumite cazuri. A doua oportunitate de performanță se referă la recunoașterea momentului în care serverul nu are rezultate sau a terminat de produs rezultate. Clienții pot sări peste achiziționarea resurselor necesare pentru a procesa evenimentele serverului până când clientul are garanția că există ceva de procesat, iar clienții pot elibera aceste resurse de îndată ce știu că au procesat ultimul rezultat.
De la observator la publicare/subscriere
Includerea acestor considerații conduce de la implementări simple ale modelului observator la modelul aferent de publicare/subscriere. Publish/subscribe implementează modelul observatorului într-un mod slab cuplat care permite serverelor și clienților să execute chiar dacă celălalt este momentan indisponibil. Publish/subscribe implementează, de asemenea, de obicei, filtrarea pe partea clientului, permițând clientului să se aboneze fie la subiecte/canale specifice („Notifică-mă cu privire la comenzile de cumpărare”), fie la atributele asociate cu diferite tipuri de conținut („Notifică-mă cu privire la orice cerere urgentă”).
Rămâne totuși o problemă. Toate implementările modelului observator tind să cupleze strâns clienții și serverele la un format de mesaj specific. Schimbarea formatului unui mesaj în majoritatea implementărilor publish/subscribe poate fi dificilă, deoarece toți clienții trebuie să fie actualizați pentru a utiliza noul format.
În multe feluri, acest lucru este similar cu descrierea unui cursor pe server într-o bază de date. Pentru a minimiza costurile de transmisie, serverul bazei de date nu returnează rezultatele pe măsură ce fiecare rând este recuperat. Cu toate acestea, pentru seturi mari de rânduri, baza de date nu returnează, de asemenea, toate rândurile într-un singur lot la sfârșit. În schimb, serverul bazei de date returnează de obicei subseturi de la un cursor păstrat pe server, de multe ori pe măsură ce aceste subseturi devin disponibile. Cu o bază de date, clientul și serverul nu trebuie să fie prezenți simultan: Serverul bazei de date poate funcționa atunci când nu sunt prezenți clienți; un client poate verifica dacă serverul este accesibil și, în caz contrar, poate decide ce poate face (dacă mai poate face ceva). Procesul de filtrare (SQL) este, de asemenea, foarte flexibil. Cu toate acestea, dacă motorul bazei de date schimbă formatul pe care îl folosește pentru a returna rândurile, atunci toți clienții trebuie, cel puțin, să fie recompilați.
Procesarea unui cache de obiecte
Ca studiu de caz pentru a analiza o implementare simplă a modelului observator, folosesc ca server o clasă care caută un cache de facturi în memorie. Acest server ar putea, la sfârșitul procesării sale, să returneze o colecție a tuturor facturilor. Cu toate acestea, aș prefera ca clientul să proceseze facturile individual și în paralel cu procesul de căutare al serverului. Asta înseamnă că prefer o versiune a procesului care returnează fiecare factură pe măsură ce este găsită și permite clientului să proceseze fiecare factură în paralel cu căutarea următoarei facturi.
O implementare simplă a serverului ar putea arăta astfel:
private List<Invoice> foundInvoices = new List<Invoice>();public List<Invoice> FindInvoices(decimal Amount){ foundInvoices.Clear(); Invoice inv; // ...search logic to add invoices to the collection foundInvoices.Add(inv); // ...repeat until all invoices found return foundInvoices;}
Soluțiile mai sofisticate ar putea folosi returnarea randamentului pentru a returna fiecare factură pe măsură ce este găsită, mai degrabă decât să asambleze lista. Indiferent, un client care apelează metoda FindInvoices va dori să efectueze unele activități critice înainte și după procesare. De exemplu, odată găsit primul articol, clientul ar putea dori să activeze o listă MatchingInvoices pentru a păstra facturile la client sau să achiziționeze/initializeze orice resurse necesare pentru a procesa o factură. Pe măsură ce se adaugă facturi suplimentare, clientul ar trebui să proceseze fiecare factură și, atunci când serverul semnalează că ultima factură a fost recuperată, să elibereze toate resursele care nu mai sunt necesare deoarece „nu mai există” facturi de procesat.
În timpul unei recuperări a unei baze de date, de exemplu, o citire se va bloca până când primul rând este returnat. Odată ce primul rând este returnat, clientul inițializează orice resurse sunt necesare pentru a procesa un rând. De asemenea, citirea returnează false atunci când este recuperat ultimul rând, permițând clientului să elibereze acele resurse deoarece nu mai sunt rânduri de procesat.
Crearea de soluții simple cu ObservableCollection
Cea mai evidentă alegere pentru implementarea modelului observator în .NET Framework este ObservableCollection. ObservableCollection va notifica clientul (prin intermediul unui eveniment) ori de câte ori este modificată.
Scrierea serverului meu de exemplu pentru a utiliza clasa ObservableCollection necesită doar două modificări. În primul rând, colecția care deține rezultatele trebuie să fie definită ca ObservableCollection și să fie făcută publică. În al doilea rând, nu mai este necesar ca metoda să returneze un rezultat: Serverul trebuie doar să adauge facturi în colecție.
Noua implementare a serverului ar putea arăta astfel:
public List<Invoice> FindInvoices(decimal Amount){ public ObservableCollection<Invoice> foundInvoices = new ObservableCollection<Invoice>(); public void FindInvoices(decimal Amount) { foundInvoices.Clear(); Invoice inv; // ...search logic to set inv foundInvoices.Add(inv); // ...repeat until all invoices are added to the collection }
Un client care utilizează această versiune a serverului trebuie doar să conecteze un gestionar de evenimente la evenimentul CollectionChanged al colecției InvoiceManagement’s foundInvoices. În următorul cod, am pus clasa să implementeze interfața IDisposable pentru a suporta deconectarea de la eveniment:
public class SearchInvoices: IDisposable{ InvoiceManagement invMgmt = new InvoiceManagement(); public void SearchInvoices() { invMgmt.foundInvoices.CollectionChanged += InvoicesFound; } public void Dispose() { invMgmt.foundInvoices.CollectionChanged -= InvoicesChanged; }
În client, evenimentului CollectionChanged i se transmite ca al doilea parametru un obiect NotifyCollectionChangedEventArgs. Proprietatea Action a acelui obiect specifică atât ce modificare a fost efectuată asupra colecției (acțiunile sunt: colecția a fost golită, noi elemente au fost adăugate în colecție, elementele existente au fost mutate/înlocuite/eliminate), cât și informații despre elementele modificate (o colecție a oricăror elemente adăugate, o colecție a elementelor prezente în colecție înainte de adăugarea noilor elemente, poziția elementului care a fost mutat/eliminat/înlocuit).
Codul simplu din client care ar procesa în mod asincron fiecare factură pe măsură ce este adăugată la colecția din server ar semăna cu codul din figura 1.
Figura 1 Procesarea asincronă a facturilor utilizând ObservableCollection
private async void InvoicesFound(object sender, NotifyCollectionChangedEventArgs e){ switch (e.Action) { case NotifyCollectionChangedAction.Reset: { // ...initial item processing return; } case NotifyCollectionChangedAction.Add: { foreach (Invoice inv in e.NewItems) { await HandleInvoiceAsync(inv); } return; } }}
Deși simplu, acest cod ar putea fi inadecvat pentru nevoile dumneavoastră, în special dacă gestionați un proces de lungă durată sau lucrați într-un mediu dinamic. Din punctul de vedere al unei proiectări asincrone, de exemplu, codul ar putea captura obiectul Task returnat de HandleInvoiceAsync, astfel încât clientul să poată gestiona sarcinile asincrone. De asemenea, veți dori să vă asigurați că evenimentul CollectionChanged este ridicat pe firul UI chiar dacă FindInvoices se execută pe un fir de fundal.
Din cauza locului în care este apelată metoda Clear în clasa server (chiar înainte de căutarea primei facturi), valoarea Reset a proprietății Action poate fi utilizată ca un semnal că primul element este pe cale să fie recuperat. Cu toate acestea, desigur, este posibil ca nicio factură să nu fie găsită în timpul căutării, astfel încât utilizarea acțiunii Reset ar putea avea ca rezultat alocarea de către client a unor resurse care nu sunt niciodată utilizate efectiv. Pentru a gestiona efectiv procesarea „primului articol”, ar trebui să adăugați un indicator la procesarea Acțiunii de adăugare pentru a se executa numai atunci când a fost găsit primul articol.
În plus, serverul are un număr limitat de opțiuni pentru a indica faptul că ultima factură a fost găsită, astfel încât clientul să nu mai aștepte „următoarea”. Serverul ar putea, probabil, să golească colecția după ce a găsit ultimul articol, dar acest lucru ar forța doar o procesare mai complexă în procesarea acțiunii de resetare (am procesat Facturi? Dacă da, atunci am procesat ultima factură; dacă nu, atunci sunt pe cale să procesez prima factură).
În timp ce, pentru probleme simple, ObservableCollection va fi în regulă, orice implementare rezonabil de sofisticată bazată pe ObservableCollection (și orice aplicație care apreciază eficiența) va necesita un cod complicat, în special în client.
Soluțiile Rx
Dacă doriți o procesare asincronă, atunci Rx (disponibil prin NuGet) poate oferi o soluție mai bună pentru implementarea modelului observator, împrumutând din modelul publish/subscribe. Această soluție oferă, de asemenea, un model de filtrare bazat pe LINQ, o mai bună semnalizare pentru condițiile primului/ultimului element și o mai bună gestionare a erorilor.
Rx poate gestiona, de asemenea, implementări de observatori mai interesante decât sunt posibile cu un ObservableCollection. În studiul meu de caz, după returnarea listei inițiale de facturi, serverul meu ar putea continua să verifice dacă există facturi noi care sunt adăugate în memoria cache după finalizarea căutării inițiale (și care corespund criteriilor de căutare, desigur). Atunci când apare o factură care corespunde criteriilor, clientul va dori să fie notificat cu privire la acest eveniment, astfel încât noua factură să poată fi adăugată la listă. Rx suportă aceste tipuri de extensii bazate pe evenimente ale modelului observator mai bine decât ObservableCollection.
Există două interfețe cheie în Rx pentru susținerea modelului observator. Prima este IObservable<T>, implementată de server și care specifică o singură metodă: Subscribe. Serverul care implementează metoda Subscribe va primi de la un client o referință la un obiect. Pentru a rezolva problema ascultătorului expirat, metoda Subscribe returnează o referință către client pentru un obiect care implementează interfața IDisposable. Clientul poate folosi acest obiect pentru a se deconecta de la server. Atunci când clientul se deconectează, se așteaptă ca serverul să elimine clientul din oricare dintre listele sale interne.
Cel de-al doilea este interfața IObserver<T>, care trebuie să fie implementată de către client. Această interfață necesită ca clientul să implementeze și să expună serverului trei metode: OnNext, OnCompleted și OnError. Metoda critică aici este OnNext, care este utilizată de server pentru a transmite un mesaj către client (în studiul meu de caz, acest mesaj ar fi noile obiecte Invoice care vor fi returnate pe măsură ce apare fiecare dintre ele). Serverul poate folosi metoda OnCompleted a clientului pentru a semnala că nu mai există date. A treia metodă, OnError, oferă o modalitate prin care serverul poate semnala clientului că a apărut o excepție.
Este binevenit să implementați singur interfața IObserver, bineînțeles (face parte din .NET Framework). Împreună cu ObservableCollection, s-ar putea să fie tot ce aveți nevoie dacă creați o soluție sincronă (am scris o rubrică și despre asta, „Writing Cleaner Code with Reactive Extensions” ).
Cu toate acestea, Rx include mai multe pachete care oferă implementări asincrone ale acestor interfețe, inclusiv implementări pentru JavaScript și servicii RESTful. Clasa Rx Subject oferă o implementare a IObservable care simplifică implementarea unei versiuni asincrone de publicare/subscriere a modelului observator.
Crearea unei soluții asincrone
Crearea unui server pentru a lucra cu un obiect Subject necesită foarte puține modificări ale codului original sincron de pe partea serverului. Înlocuiesc vechea ObservableCollection cu un obiect Subject care va transmite fiecare factură așa cum apare la orice client care ascultă. Declar obiectul Subject ca fiind public, astfel încât clienții să îl poată accesa:
public class InvoiceManagement{ public IObservable<Invoice> foundInvoice = new Subject<Invoice>();
În corpul metodei, în loc să adaug o factură la o colecție, folosesc metoda OnNext a Subject-ului pentru a trece fiecare factură către client pe măsură ce este găsită:
public void FindInvoices(decimal Amount){ inv = GetInvoicesForAmount(Amount) // Poll for invoices foundInvoice.OnNext(inv); // ...repeat...}
În clientul meu, declar mai întâi o instanță a clasei server. Apoi, într-o metodă marcată ca fiind asincronă, apelez metoda Subscribe a subiectului pentru a indica faptul că vreau să încep să recuperez mesajele:
public class InvoiceManagementTests{ InvoiceManagement invMgmt = new InvoiceManagement(); public async void ProcessInvoices() { invMgmt.foundInvoice.Subscribe<Invoice>();
Pentru a filtra rezultatele doar la facturile pe care le doresc, pot aplica o instrucțiune LINQ la obiectul Subject. Acest exemplu filtrează facturile la cele care sunt comandate înapoi (pentru a utiliza extensiile Rx LINQ va trebui să adăugați o instrucțiune using pentru spațiul de nume System.Reactive.Linq):
invMgmt.foundInvoice.Where(i => i.BackOrder == "BackOrder").Subscribe();
După ce am început să ascult subiectul, pot specifica ce procesare doresc să fac atunci când primesc o factură. Pot, de exemplu, să folosesc FirstAsync pentru a procesa doar prima factură returnată de serviciu. În acest exemplu, folosesc instrucțiunea await cu apelul la FirstAsync, astfel încât să pot returna controlul la corpul principal al aplicației mele în timp ce procesez factura. Acest cod așteaptă să recupereze acea primă factură, apoi trece la orice cod pe care îl folosesc pentru a inițializa procesul de procesare a facturii și, în cele din urmă, procesează factura:
Invoice inv;inv = await invMgmt.foundInvoice.FirstAsync();// ...setup code invoices...HandleInvoiceAsync(inv);
O singură atenționare: FirstAsync se va bloca dacă serverul nu a produs încă niciun rezultat. Dacă doriți să evitați blocarea, puteți utiliza FirstOrDefaultAsync, care va returna null dacă serverul nu a produs niciun rezultat. Dacă nu există rezultate, clientul poate decide ce să facă, dacă este cazul.
Cazul cel mai tipic este acela că clientul dorește să proceseze toate facturile returnate (după filtrare) și să facă acest lucru în mod asincron. În acest caz, în loc să folosiți o combinație de Subscribe și OnNext, puteți folosi doar metoda ForEachAsync. Puteți trece o metodă sau o expresie lambda care procesează rezultatele primite. Dacă treceți o metodă (care nu poate fi asincronă), așa cum fac eu aici, acelei metode i se va trece factura care a declanșat ForEachAsync:
invMgmt.foundInvoice.ForEachAsync(HandleInvoice);
Modelii ForEachAsync i se poate trece, de asemenea, un token de anulare pentru a permite clientului să semnaleze că se deconectează. O bună practică ar fi să se treacă jetonul atunci când se apelează oricare dintre metodele Rx *Async pentru a permite clientului să încheie procesarea fără a fi nevoit să aștepte ca toate obiectele să fie procesate.
ForEachAsync nu va procesa niciun rezultat deja procesat de o metodă First (sau FirstOrDefaultAsync), astfel încât puteți utiliza FirstOrDefaultAsync cu ForEachAsync pentru a verifica dacă serverul mai are ceva de procesat înainte de a procesa obiectele următoare. Cu toate acestea, metoda IsEmpty a subiectului va efectua aceeași verificare mai simplu. În cazul în care clientul trebuie să aloce orice resurse necesare pentru procesarea rezultatelor, IsEmpty îi permite clientului să verifice dacă mai are ceva de făcut înainte de a aloca aceste resurse (o alternativă ar fi să aloce aceste resurse la primul obiect procesat în buclă). Utilizând IsEmpty cu un client care verifică dacă există rezultate înainte de a aloca resurse (și de a începe procesarea), suportând în același timp anularea, s-ar obține un cod care arată ceva de genul figurii 2.
Figura 2 Cod pentru a suporta anularea și a amâna procesarea până când rezultatele sunt gata
CancellationTokenSource cancelSource = new CancellationTokenSource();CancellationToken cancel;cancel = cancelSource.Token;if (!await invMgmt.foundInvoice.IsEmpty()){ // ...setup code for processing invoices... try { invMgmt.foundInvoice.ForEachAsync(HandleInvoice, cancel); } catch (Exception ex) { if (ex.GetType() != typeof(CancellationToken)) { // ...report message } } // ...clean up code when all invoices are processed or client disconnects}
Încheiere
Dacă tot ce aveți nevoie este o implementare simplă a modelului observator, atunci ObservableCollection ar putea fi tot ce vă trebuie pentru a procesa un flux de rezultate. Pentru un control mai bun și pentru o aplicație bazată pe evenimente, clasa Subject și extensiile care vin cu Rx vor permite aplicației dvs. să lucreze într-un mod asincron prin susținerea unei implementări puternice a modelului publish/subscribe (și nu m-am uitat la bogata bibliotecă de operatori care vine cu Rx). Dacă lucrați cu Rx, merită să descărcați Ghidul de proiectare Rx (bit.ly/1VOPxGS), care discută cele mai bune practici în consumarea și producerea de fluxuri observabile.
Rx oferă, de asemenea, un anumit suport pentru conversia tipului de mesaj transmis între client și server prin utilizarea interfeței ISubject<TSource, TResult>. Interfața ISubject<TSource, TResult> specifică două tipuri de date: un tip de date „in” și un tip de date „out”. În cadrul clasei Subject care implementează această interfață se pot efectua toate operațiile necesare pentru a converti rezultatul returnat de la server (tipul de date „in”) în rezultatul cerut de client (tipul de date „out”). Mai mult, parametrul „in” este covariant (va accepta tipul de date specificat sau orice alt tip de date din care moștenește tipul de date), iar parametrul „out” este contravariant (va accepta tipul de date specificat sau orice derivă din acesta), ceea ce vă oferă o flexibilitate suplimentară.
Vom trăi într-o lume din ce în ce mai asincronă și, în această lume, modelul observator va deveni din ce în ce mai important – este un instrument util pentru orice interfață între procese în care procesul server returnează mai mult decât un singur rezultat. Din fericire, aveți mai multe opțiuni pentru implementarea modelului observator în .NET Framework, inclusiv ObservableCollection și Rx.
Peter Vogel este arhitect de sisteme și director în cadrul PH&V Information Services. PH&V oferă consultanță full-stack de la proiectarea UX până la modelarea obiectelor și proiectarea bazelor de date.
Mulțumim următorilor experți tehnici Microsoft pentru revizuirea acestui articol: Stephen Cleary, James McCaffrey și Dave Sexton
Stephen Cleary a lucrat cu multithreading-ul și programarea asincronă timp de 16 ani și a folosit suportul async în Microsoft .NET Framework încă de la prima previzualizare tehnologică comunitară. El este autorul cărții „Concurrency in C# Cookbook” (O’Reilly Media, 2014). Pagina sa de internet, inclusiv blogul său, este la stephencleary.com.
Discută acest articol pe forumul revistei MSDN Magazine
.
Lasă un răspuns