Skalér asynkrone klient-serverforbindelser med ReactiveServer Links with Reactive
On januar 25, 2022 by admin- 01/31/2019
- 17 minutter at læse
juni 2016
Volume 31 Number 6
Af Peter Vogel | juni 2016
Da asynkron behandling er blevet mere almindelig i applikationsudvikling, Microsoft .NET Framework har fået en lang række værktøjer, der understøtter specifikke asynkrone designmønstre. Ofte handler det at skabe en veldesignet asynkron applikation ofte om at genkende det designmønster, som din applikation implementerer, og derefter vælge det rigtige sæt .NET-komponenter.
I nogle tilfælde kræver matchet integration af flere .NET-komponenter. Stephen Cleary’s artikel “Patterns for Asynchronous MVVM Applications” (Mønstre til asynkrone MVVM-applikationer): Commands” (bit.ly/233Kocr/233Kocr) viser, hvordan man fuldt ud understøtter Model-View-ViewModel-mønstret (MVVM) på en asynkron måde. I andre tilfælde kræver understøttelse kun én komponent fra .NET Framework. Jeg har diskuteret implementering af provider/consumers-mønstret ved hjælp af BlockingCollection i mine VisualStudioMagazine.com Practical .NET-kronikker “Create Simple, Reliable Asynchronous Apps with BlockingCollection” (bit.ly/1TuOpE6) og “Create Sophisticated Asynchronous Applications with BlockingCollection” (bit.ly/1SpYYD4).
Et andet eksempel er implementering af observatørdesignmønstret til overvågning af en langvarig operation på asynkron vis. I dette scenarie fungerer en asynkron metode, der returnerer et enkelt Task-objekt, ikke, fordi klienten ofte returnerer en strøm af resultater. I disse scenarier kan du udnytte mindst to værktøjer fra .NET Framework: ObservableCollection og Reactive Extensions (Rx). Til enkle løsninger er ObservableCollection (sammen med nøgleordene async og await) alt, hvad du har brug for. Men til mere “interessante” og især begivenhedsdrevne problemer giver Rx dig bedre kontrol over processen.
Definition af mønsteret
Mens observatørmønsteret ofte anvendes i UI-designmønstre – herunder Model-View-Controller (MVC), Model-View-Presenter (MVP) og MVVM-UI’er bør betragtes som blot ét scenarie fra et større sæt af scenarier, hvor observatørmønsteret finder anvendelse. Definitionen af observatørmønsteret (citat fra Wikipedia) er: “Et objekt, kaldet subjektet, vedligeholder en liste over sine afhængige, kaldet observatører, og underretter dem automatisk om enhver tilstandsændring, normalt ved at kalde en af deres metoder.”
Reelt set handler observatørmønstret om at få resultater fra langvarige processer til klienten, så snart disse resultater er tilgængelige. Uden en eller anden version af observatørmønsteret skal klienterne vente, indtil det sidste resultat er tilgængeligt, og derefter få alle resultaterne sendt til dem i en enkelt klump. I en stadig mere asynkron verden ønsker man, at observatørerne skal behandle resultaterne parallelt med klienten, efterhånden som resultaterne bliver tilgængelige. For at understrege, at du taler om mere end brugergrænseflader, når du udnytter observatørmønstret, vil jeg bruge “klient” og “server” i stedet for “observatør” og “emne” i resten af denne artikel.
Problemer og muligheder
Der er mindst tre problemer og to muligheder med observatørmønstret. Det første problem er problemet med den bortfaldne lytter: Mange implementeringer af observatørmønstret kræver, at serveren skal have en reference til alle sine klienter. Som følge heraf kan klienterne blive holdt i hukommelsen af serveren, indtil serveren forlader den. Dette er naturligvis ikke en optimal løsning for en langvarig proces i et dynamisk system, hvor klienterne ofte tilslutter sig og afbryder forbindelsen.
Det er dog kun et symptom på det andet, større problem: Mange implementeringer af observatørmønsteret kræver, at serveren og klienten er tæt koblet, hvilket kræver, at både serveren og klienten er til stede hele tiden. I det mindste bør klienten være i stand til at afgøre, om serveren er til stede og vælge ikke at knytte sig til den; desuden bør serveren kunne fungere, selv om der ikke er nogen klienter, der accepterer resultater.
Det tredje problem er ydelsesrelateret: Hvor lang tid vil det tage for serveren at underrette alle klienter? Ydelsen i observatørmønsteret er direkte påvirket af antallet af klienter, der skal underrettes. Der er derfor mulighed for at forbedre ydeevnen i observatørmønsteret ved at lade klienten filtrere de resultater, der kommer tilbage fra serveren, på forhånd. Dette tager også højde for de scenarier, hvor serveren producerer flere resultater (eller et bredere udvalg af resultater), end klienten er interesseret i: Klienten kan angive, at den kun vil blive underrettet i bestemte tilfælde. Den anden præstationsmulighed ligger i at genkende, hvornår serveren ikke har nogen resultater eller er færdig med at producere resultater. Klienterne kan springe over at erhverve de ressourcer, der er nødvendige for at behandle serverbegivenheder, indtil klienten er garanteret, at der er noget at behandle, og klienterne kan frigive disse ressourcer, så snart de ved, at de har behandlet det sidste resultat.
Fra observatør til offentliggørelse/afmelding
Inddragelse af disse overvejelser fører fra simple implementeringer af observatørmønsteret til den relaterede offentliggør/afmeldingsmodel. Publish/subscribe implementerer observatørmønsteret på en løst koblet måde, der lader servere og klienter udføre, selv om den anden ikke er tilgængelig i øjeblikket. Publish/subscribe implementerer også typisk filtrering på klientsiden ved at lade klienten abonnere enten på specifikke emner/kanaler (“Underret mig om købsordrer”) eller på attributter, der er knyttet til forskellige typer indhold (“Underret mig om alle hastende anmodninger”).
Et problem er dog stadigvæk tilbage. Alle implementeringer af observatørmønstret har en tendens til at koble klienter og servere tæt sammen til et bestemt beskedformat. Det kan være vanskeligt at ændre formatet for en meddelelse i de fleste implementeringer af publish/subscribe, fordi alle klienterne skal opdateres til at bruge det nye format.
På mange måder svarer dette til beskrivelsen af en server-side cursor i en database. For at minimere transmissionsudgifterne returnerer databaseserveren ikke resultater, når hver række hentes. Men for store sæt af rækker returnerer databasen heller ikke alle rækker i en enkelt batch til sidst. I stedet returnerer databaseserveren typisk delmængder fra en markør på serveren ofte efterhånden som disse delmængder bliver tilgængelige. Med en database behøver klienten og serveren ikke at være til stede samtidig: Databaseserveren kan køre, når der ikke er nogen klienter til stede; en klient kan kontrollere, om serveren er tilgængelig, og hvis ikke, beslutte, hvad den ellers kan gøre (hvis noget). Filtreringsprocessen (SQL) er også meget fleksibel. Men hvis databasemotoren ændrer det format, den bruger til at returnere rækker, skal alle klienter i det mindste omkompileres.
Behandling af en cache af objekter
Som mit casestudie for at se på en simpel observatørmønsterimplementering bruger jeg som min server en klasse, der søger i en in-memory cache af fakturaer. Denne server kunne i slutningen af sin behandling returnere en samling af alle fakturaer. Jeg vil dog foretrække, at klienten behandler fakturaerne individuelt og parallelt med serverens søgeproces. Det betyder, at jeg foretrækker en version af processen, som returnerer hver enkelt faktura, efterhånden som den findes, og lader klienten behandle hver enkelt faktura parallelt med søgningen efter den næste faktura.
En simpel implementering af serveren kunne se således ud:
private List<Invoice> foundInvoices = new List<Invoice>();public List<Invoice> FindInvoices(decimal Amount){ foundInvoices.Clear(); Invoice inv; // ...search logic to add invoices to the collection foundInvoices.Add(inv); // ...repeat until all invoices found return foundInvoices;}
Mere avancerede løsninger kunne bruge yield return til at returnere hver enkelt faktura, efterhånden som den findes, i stedet for at samle listen. Uanset hvad, vil en klient, der kalder FindInvoices-metoden, ønske at udføre nogle kritiske aktiviteter før og efter behandlingen. Når det første element er fundet, vil klienten f.eks. måske aktivere en MatchingInvoices-liste for at holde fakturaerne hos klienten eller erhverve/initialisere eventuelle ressourcer, der er nødvendige for at behandle en faktura. Efterhånden som der tilføjes yderligere fakturaer, skal klienten behandle hver enkelt faktura og, når serveren signalerer, at den sidste faktura er hentet, frigive eventuelle ressourcer, der ikke længere er nødvendige, fordi der ikke er “flere” fakturaer at behandle.
Ved hentning af en database vil en læsning f.eks. blokere, indtil den første række er returneret. Når den første række er returneret, initialiserer klienten de ressourcer, der er nødvendige for at behandle en række. Læsningen returnerer også false, når den sidste række hentes, så klienten kan frigive disse ressourcer, fordi der ikke er flere rækker at behandle.
Skabelse af enkle løsninger med ObservableCollection
Det mest oplagte valg til implementering af observatørmønstret i .NET Framework er ObservableCollection. ObservableCollection giver klienten besked (via en hændelse), når den ændres.
Den kræver kun to ændringer, når jeg skriver min eksempelserver om, så den bruger ObservableCollection-klassen. For det første skal den samling, der indeholder resultaterne, defineres som en ObservableCollection og gøres offentlig. For det andet er det ikke længere nødvendigt, at metoden returnerer et resultat: Serveren behøver kun at tilføje fakturaer til samlingen.
Den nye implementering af serveren kan se således ud:
public List<Invoice> FindInvoices(decimal Amount){ public ObservableCollection<Invoice> foundInvoices = new ObservableCollection<Invoice>(); public void FindInvoices(decimal Amount) { foundInvoices.Clear(); Invoice inv; // ...search logic to set inv foundInvoices.Add(inv); // ...repeat until all invoices are added to the collection }
En klient, der anvender denne version af serveren, behøver kun at tilslutte en hændelseshåndtering til hændelsen CollectionChanged i fakturaadministrationens samling foundInvoices. I følgende kode har jeg fået klassen til at implementere grænsefladen IDisposable for at understøtte afbrydelse af forbindelsen til hændelsen:
public class SearchInvoices: IDisposable{ InvoiceManagement invMgmt = new InvoiceManagement(); public void SearchInvoices() { invMgmt.foundInvoices.CollectionChanged += InvoicesFound; } public void Dispose() { invMgmt.foundInvoices.CollectionChanged -= InvoicesChanged; }
I klienten får CollectionChanged-hændelsen overdraget et NotifyCollectionChangedEventArgs-objekt som sin anden parameter. Dette objekts Action-egenskab angiver både hvilken ændring der blev udført på samlingen (handlingerne er: samlingen blev ryddet, nye elementer blev tilføjet til samlingen, eksisterende elementer blev flyttet/udskiftet/fjernet) og oplysninger om de ændrede elementer (en samling af alle tilføjede elementer, en samling af elementer, der var til stede i samlingen før de nye elementer blev tilføjet, positionen for det element, der blev flyttet/fjernet/udskiftet).
En simpel kode i klienten, der asynkront ville behandle hver enkelt faktura, efterhånden som den tilføjes til samlingen i serveren, ville se ud som koden i figur 1.
Figur 1 Asynkron behandling af fakturaer ved hjælp af ObservableCollection
private async void InvoicesFound(object sender, NotifyCollectionChangedEventArgs e){ switch (e.Action) { case NotifyCollectionChangedAction.Reset: { // ...initial item processing return; } case NotifyCollectionChangedAction.Add: { foreach (Invoice inv in e.NewItems) { await HandleInvoiceAsync(inv); } return; } }}
Selv om denne kode er simpel, er den måske utilstrækkelig til dine behov, især hvis du håndterer en langvarig proces eller arbejder i et dynamisk miljø. Ud fra et asynkront designmæssigt synspunkt kunne koden f.eks. fange det Task-objekt, der returneres af HandleInvoiceAsync, så klienten kunne administrere de asynkrone opgaver. Du skal også sørge for, at CollectionChanged-hændelsen udløses på UI-tråden, selv om FindInvoices kører på en baggrundstråd.
På grund af det sted, hvor Clear-metoden kaldes i serverklassen (lige før der søges efter den første faktura), kan Action-egenskabens Reset-værdi bruges som et signal om, at det første element er ved at blive hentet. Men det kan naturligvis være, at der ikke findes nogen fakturaer i søgningen, så brugen af Reset Action kan resultere i, at klienten allokerer ressourcer, som faktisk aldrig bliver brugt. For rent faktisk at håndtere behandling af “første element” skal du tilføje et flag til behandlingen af Tilføj handling, så den kun udføres, når det første element er fundet.
Dertil kommer, at serveren har et begrænset antal muligheder for at angive, at den sidste faktura er fundet, så klienten kan holde op med at vente på “den næste”. Serveren kunne formentlig rydde samlingen efter at have fundet det sidste element, men det ville blot tvinge mere kompleks behandling ind i behandlingen af Nulstil handling (har jeg behandlet fakturaer? Hvis ja, så har jeg behandlet den sidste faktura; hvis nej, så er jeg ved at behandle den første faktura).
Mens ObservableCollection til simple problemer vil være fint, vil enhver rimelig sofistikeret implementering baseret på ObservableCollection (og enhver applikation, der sætter pris på effektivitet) kræve noget kompliceret kode, især i klienten.
Rx-løsningerne
Hvis du ønsker asynkron behandling, kan Rx (tilgængelig via NuGet) give en bedre løsning til implementering af observatørmønsteret ved at låne fra publish/subscribe-modellen. Denne løsning giver også en LINQ-baseret filtreringsmodel, bedre signalering af betingelser for første/sidste element og bedre fejlhåndtering.
Rx kan også håndtere mere interessante observatørimplementeringer end dem, der er mulige med en ObservableCollection. I min case study kan min server, efter at have returneret den oprindelige liste over fakturaer, fortsætte med at kontrollere for nye fakturaer, der tilføjes til cachen, efter at den oprindelige søgning er afsluttet (og som matcher søgekriterierne, naturligvis). Når der dukker en faktura op, der opfylder kriterierne, skal kunden have besked om begivenheden, så den nye faktura kan føjes til listen. Rx understøtter disse former for begivenhedsbaserede udvidelser af observatørmønstret bedre end ObservableCollection.
Der er to vigtige grænseflader i Rx til understøttelse af observatørmønstret. Det første er IObservable<T>, der implementeres af serveren og angiver en enkelt metode: Subscribe. Den server, der implementerer Subscribe-metoden, får en reference til et objekt fra en klient. For at håndtere problemet med en udgået lytter returnerer Subscribe-metoden en reference til klienten for et objekt, der implementerer grænsefladen IDisposable. Klienten kan bruge dette objekt til at afbryde forbindelsen med serveren. Når klienten afbryder forbindelsen, forventes det, at serveren fjerner klienten fra en af sine interne lister.
Den anden er IObserver<T>-grænsefladen, som skal implementeres af klienten. Denne grænseflade kræver, at klienten implementerer og eksponerer tre metoder over for serveren: OnNext, OnCompleted og OnError. Den kritiske metode her er OnNext, som serveren bruger til at sende en meddelelse til klienten (i min undersøgelse ville denne meddelelse være nye fakturaobjekter, der returneres, efterhånden som de enkelte objekter vises). Serveren kan bruge klientens OnCompleted-metode til at signalere, at der ikke er flere data. Den tredje metode, OnError, giver serveren mulighed for at signalere til klienten, at der er opstået en undtagelse.
Du er naturligvis velkommen til selv at implementere IObserver-grænsefladen (den er en del af .NET Framework). Sammen med ObservableCollection kan det være alt, hvad du har brug for, hvis du opretter en synkron løsning (jeg har også skrevet en klumme om det, “Writing Cleaner Code with Reactive Extensions” ).
Derimod indeholder Rx flere pakker, der leverer asynkrone implementeringer af disse grænseflader, herunder implementeringer til JavaScript og RESTful-tjenester. Rx Subject-klassen indeholder en implementering af IObservable, der gør det nemmere at implementere en asynkron udgivelses-/subscribe-version af observatørmønstret.
Skabelse af en asynkron løsning
Skabelse af en server til at arbejde med et Subject-objekt kræver meget få ændringer i den oprindelige synkrone kode på serversiden. Jeg erstatter den gamle ObservableCollection med et Subject-objekt, der vil videregive hver enkelt faktura, som den vises til alle lyttende klienter. Jeg erklærer Subject-objektet for offentligt, så klienterne kan få adgang til det:
public class InvoiceManagement{ public IObservable<Invoice> foundInvoice = new Subject<Invoice>();
I stedet for at tilføje en faktura til en samling bruger jeg Subject’s OnNext-metode til at videregive hver faktura til klienten, efterhånden som den bliver fundet:
public void FindInvoices(decimal Amount){ inv = GetInvoicesForAmount(Amount) // Poll for invoices foundInvoice.OnNext(inv); // ...repeat...}
I min klient erklærer jeg først en instans af serverklassen. Derefter kalder jeg i en metode, der er markeret som asynkron, Subject’s Subscribe-metode for at angive, at jeg vil begynde at hente meddelelser:
public class InvoiceManagementTests{ InvoiceManagement invMgmt = new InvoiceManagement(); public async void ProcessInvoices() { invMgmt.foundInvoice.Subscribe<Invoice>();
For at filtrere resultaterne til kun at omfatte de fakturaer, jeg ønsker, kan jeg anvende en LINQ-anvisning på Subject-objektet. I dette eksempel filtreres fakturaerne til de fakturaer, der er tilbageført (for at bruge Rx LINQ-udvidelser skal du tilføje en using-erklæring for System.Reactive.Linq-navneområdet):
invMgmt.foundInvoice.Where(i => i.BackOrder == "BackOrder").Subscribe();
Når jeg er begyndt at lytte til emnet, kan jeg angive, hvilken behandling jeg ønsker at foretage, når jeg modtager en faktura. Jeg kan f.eks. bruge FirstAsync til kun at behandle den første faktura, der returneres af tjenesten. I dette eksempel bruger jeg await-erklæringen med kald til FirstAsync, så jeg kan returnere kontrollen til hoveddelen af min applikation, mens jeg behandler fakturaen. Denne kode venter på at hente den første faktura, går derefter videre til den kode, jeg bruger til at initialisere fakturabehandlingsprocessen, og behandler til sidst fakturaen:
Invoice inv;inv = await invMgmt.foundInvoice.FirstAsync();// ...setup code invoices...HandleInvoiceAsync(inv);
Et forbehold: FirstAsync blokerer, hvis serveren endnu ikke har produceret nogen resultater. Hvis du ønsker at undgå blokering, kan du bruge FirstOrDefaultAsync, som returnerer nul, hvis serveren ikke har produceret nogen resultater. Hvis der ikke er nogen resultater, kan klienten selv bestemme, hvad der eventuelt skal gøres.
Det mere typiske tilfælde er, at klienten ønsker at behandle alle de returnerede fakturaer (efter filtrering) og at gøre det asynkront. I det tilfælde kan du i stedet for at bruge en kombination af Subscribe og OnNext bare bruge ForEachAsync-metoden i stedet for at bruge en kombination af Subscribe og OnNext. Du kan videregive en metode eller et lambdaudtryk, der behandler indgående resultater. Hvis du videregiver en metode (som ikke kan være asynkron), som jeg gør her, vil den metode få overdraget den faktura, der udløste ForEachAsync:
invMgmt.foundInvoice.ForEachAsync(HandleInvoice);
ForEachAsync-metoden kan også få overdraget et annulleringstoken for at lade klienten signalere, at den afbryder forbindelsen. En god praksis vil være at videregive tokenet, når du kalder en af Rx *Async-metoderne for at understøtte, at klienten kan afslutte behandlingen uden at skulle vente på, at alle objekter er behandlet.
ForEachAsync vil ikke behandle et resultat, der allerede er behandlet af en First (eller FirstOrDefaultAsync) metode, så du kan bruge FirstOrDefaultAsync med ForEachAsync til at kontrollere, om serveren har noget at behandle, før du behandler efterfølgende objekter. Emnets IsEmpty-metode vil dog udføre den samme kontrol på en mere enkel måde. Hvis klienten skal allokere de ressourcer, der er nødvendige for at behandle resultaterne, giver IsEmpty klienten mulighed for at kontrollere, om der er noget at gøre, før den allokerer disse ressourcer (et alternativ ville være at allokere disse ressourcer på det første objekt, der behandles i sløjfen). Brug af IsEmpty med en klient, der kontrollerer, om der er nogen resultater, før der allokeres ressourcer (og behandlingen påbegyndes), og som samtidig understøtter annullering, ville give kode, der ligner Figur 2.
Figur 2 Kode til understøttelse af annullering og udsættelse af behandling, indtil resultaterne er klar
CancellationTokenSource cancelSource = new CancellationTokenSource();CancellationToken cancel;cancel = cancelSource.Token;if (!await invMgmt.foundInvoice.IsEmpty()){ // ...setup code for processing invoices... try { invMgmt.foundInvoice.ForEachAsync(HandleInvoice, cancel); } catch (Exception ex) { if (ex.GetType() != typeof(CancellationToken)) { // ...report message } } // ...clean up code when all invoices are processed or client disconnects}
Opfyldning
Hvis alt, hvad du har brug for, er en simpel implementering af observatørmønstret, kan ObservableCollection måske gøre alt, hvad du har brug for til at behandle en strøm af resultater. For bedre kontrol og til en begivenhedsbaseret applikation vil Subject-klassen og de udvidelser, der følger med Rx, lade din applikation arbejde i en asynkron tilstand ved at understøtte en kraftig implementering af publish/subscribe-modellen (og jeg har ikke kigget på det rige bibliotek af operatører, der følger med Rx). Hvis du arbejder med Rx, er det værd at downloade Rx Design Guide (bit.ly/1VOPxGS), som diskuterer den bedste praksis for forbrug og produktion af observerbare strømme.
Rx giver også en vis støtte til konvertering af den meddelelsestype, der overføres mellem klienten og serveren ved hjælp af ISubject<TSource, TResult>-grænsefladen. ISubject<TSource, TResult>-grænsefladen angiver to datatyper: en “in”-datatype og en “out”-datatype. I den klasse Subject, der implementerer denne grænseflade, kan du udføre alle de operationer, der er nødvendige for at konvertere det resultat, der returneres fra serveren (in-datatypen), til det resultat, som klienten har brug for (out-datatypen). Desuden er in-parameteren kovariant (den accepterer den angivne datatype eller noget, som datatypen arver fra), og out-parameteren er kontravariant (den accepterer den angivne datatype eller noget, som afledes af den), hvilket giver dig yderligere fleksibilitet.
Vi lever i en stadig mere asynkron verden, og i denne verden vil observatørmønstret blive vigtigere – det er et nyttigt værktøj til enhver grænseflade mellem processer, hvor serverprocessen returnerer mere end et enkelt resultat. Heldigvis har du flere muligheder for at implementere observatørmønsteret i .NET Framework, herunder ObservableCollection og Rx.
Peter Vogel er systemarkitekt og chef i PH&V Information Services. PH&V tilbyder full-stack-konsulentbistand fra UX-design til objektmodellering og databasedesign.
Tak til følgende tekniske eksperter fra Microsoft for at have gennemgået denne artikel: Stephen Cleary, James McCaffrey og Dave Sexton
Stephen Cleary har arbejdet med multithreading og asynkron programmering i 16 år og har brugt async-understøttelse i Microsoft .NET Framework siden den første community technology preview. Han er forfatter til “Concurrency in C# Cookbook” (O’Reilly Media, 2014). Hans hjemmeside, herunder hans blog, er på stephencleary.com.
Diskuter denne artikel i MSDN Magazine-forummet
Skriv et svar