Skalera asynkrona klient-serverlänkar med ReactiveServer Links with Reactive
On januari 25, 2022 by admin- 01/31/2019
- 17 minuter att läsa
juni 2016
Volym 31 nummer 6
Av Peter Vogel | juni 2016
Att asynkron behandling har blivit vanligare i applikationsutveckling, Microsoft .NET Framework har fått ett stort antal verktyg som stöder specifika asynkrona designmönster. Ofta handlar det för att skapa en väldesignad asynkron applikation om att känna igen designmönstret som din applikation implementerar och sedan välja rätt uppsättning .NET-komponenter.
I vissa fall kräver matchningen att flera .NET-komponenter integreras. Stephen Clearys artikel ”Patterns for Asynchronous MVVM Applications” (Mönster för asynkrona MVVM-applikationer): Commands” (bit.ly/233Kocr), visar hur man fullt ut stöder MVVM-mönstret (Model-View-ViewModel) på ett asynkront sätt. I andra fall kräver stödet endast en komponent från .NET Framework. Jag har diskuterat implementering av provider/consumers-mönstret med hjälp av BlockingCollection i mina VisualStudioMagazine.com Practical .NET-krönikor ”Create Simple, Reliable Asynchronous Apps with BlockingCollection” (bit.ly/1TuOpE6) och ”Create Sophisticated Asynchronous Applications with BlockingCollection” (bit.ly/1SpYyD4).
Ett annat exempel är att implementera designmönstret observatör för att övervaka en långvarig operation asynkront. I det här scenariot fungerar inte en asynkron metod som returnerar ett enda Task-objekt eftersom klienten ofta returnerar en ström av resultat. För dessa scenarier kan du utnyttja minst två verktyg från .NET Framework: ObservableCollection och Reactive Extensions (Rx). För enkla lösningar är ObservableCollection (tillsammans med nyckelorden async och await) allt du behöver. Men för mer ”intressanta” och framför allt händelsestyrda problem ger Rx dig bättre kontroll över processen.
Definiera mönstret
Men även om observatörsmönstret ofta används i UI-designmönster – inklusive Model-View-Controller (MVC), Model-View-Presenter (MVP) och MVVM – bör UI:erna betraktas som bara ett scenario från en större uppsättning scenarier där observatörsmönstret är tillämpligt. Definitionen av observatörsmönstret (enligt Wikipedia) är följande: ”Ett objekt, som kallas subjektet, upprätthåller en lista över sina beroende personer, som kallas observatörer, och meddelar dem automatiskt om eventuella tillståndsförändringar, vanligtvis genom att anropa en av deras metoder.”
I själva verket handlar observatörsmönstret om att få resultat från långvariga processer till klienten så snart dessa resultat är tillgängliga. Utan någon version av observatörsmönstret måste klienterna vänta tills det sista resultatet är tillgängligt och sedan få alla resultat skickade till sig i en enda klump. I en alltmer asynkron värld vill du att observatörerna ska bearbeta resultaten parallellt med klienten när resultaten blir tillgängliga. För att betona att du talar om mer än användargränssnitt när du utnyttjar observatörsmönstret kommer jag att använda ”klient” och ”server” i stället för ”observatör” och ”subjekt” i resten av den här artikeln.
Problem och möjligheter
Det finns minst tre problem och två möjligheter med observatörsmönstret. Det första problemet är problemet med den bortfallna lyssnaren: Många implementeringar av observatörsmönstret kräver att servern har en referens till alla sina klienter. Detta leder till att klienterna kan hållas i minnet av servern tills servern avslutas. Detta är naturligtvis inte en optimal lösning för en långvarig process i ett dynamiskt system där klienter ofta ansluter och avbryter anslutningen.
Problemet med den bortfallande lyssnaren är dock bara ett symptom på det andra, större problemet: Många implementationer av observatörsmönstret kräver att servern och klienten är tätt sammankopplade, vilket innebär att både servern och klienten måste vara närvarande hela tiden. Klienten bör åtminstone kunna avgöra om servern är närvarande och välja att inte ansluta sig. Dessutom bör servern kunna fungera även om det inte finns några klienter som tar emot resultat.
Det tredje problemet är prestandarelaterat: Hur lång tid kommer det att ta för servern att meddela alla klienter? Prestandan i observatörsmönstret påverkas direkt av antalet klienter som ska meddelas. Det finns därför en möjlighet att förbättra prestanda i observatörsmönstret genom att låta klienten i förväg filtrera de resultat som kommer tillbaka från servern. Detta gäller även scenarier där servern producerar fler resultat (eller en större variation av resultat) än vad klienten är intresserad av: Klienten kan ange att den endast vill bli underrättad i vissa fall. Den andra prestandamöjligheten finns när det gäller att känna igen när servern inte har några resultat eller har slutat producera resultat. Klienterna kan hoppa över att förvärva resurser som krävs för att bearbeta serverhändelser tills klienten är garanterad att det finns något att bearbeta, och klienterna kan släppa dessa resurser så snart de vet att de har bearbetat det sista resultatet.
Från observatör till publicera/prenumerera
Att ta hänsyn till dessa överväganden leder från enkla implementationer av observatörsmönstret till den relaterade publicera/prenumerera-modellen. Publish/subscribe implementerar observatörsmönstret på ett löst kopplat sätt som gör att servrar och klienter kan utföras även om den andra inte är tillgänglig för tillfället. Publish/subscribe implementerar också vanligtvis filtrering på klientsidan genom att låta klienten prenumerera antingen på specifika ämnen/kanaler (”Meddela mig om inköpsorder”) eller på attribut som är förknippade med olika typer av innehåll (”Meddela mig om brådskande förfrågningar”).
En fråga kvarstår dock. Alla implementeringar av observatörsmönstret tenderar att koppla klienter och servrar till ett specifikt meddelandeformat. Att ändra formatet för ett meddelande i de flesta implementationer av publish/subscribe kan vara svårt eftersom alla klienter måste uppdateras för att använda det nya formatet.
På många sätt liknar detta beskrivningen av en server-side cursor i en databas. För att minimera överföringskostnaderna returnerar databasservern inte resultaten när varje rad hämtas. För stora raduppsättningar returnerar databasen dock inte heller alla rader i ett enda parti i slutet. I stället returnerar databasservern vanligtvis delmängder från en markör som hålls på servern ofta när dessa delmängder blir tillgängliga. Med en databas behöver klienten och servern inte vara närvarande samtidigt: En klient kan kontrollera om servern är tillgänglig och, om så inte är fallet, bestämma vad (om något) den kan göra. Filtreringsprocessen (SQL) är också mycket flexibel. Om databasmotorn ändrar det format den använder för att returnera rader måste dock alla klienter åtminstone kompileras om.
Bearbetning av en cache av objekt
Som min fallstudie för att titta på en enkel implementering av ett observatörsmönster använder jag som min server en klass som söker i en cache av fakturor i minnet. Den servern skulle i slutet av bearbetningen kunna returnera en samling av alla fakturor. Jag skulle dock föredra att låta klienten behandla fakturorna individuellt och parallellt med serverns sökprocess. Det innebär att jag föredrar en version av processen som returnerar varje faktura allteftersom den hittas och låter klienten bearbeta varje faktura parallellt med sökningen efter nästa faktura.
En enkel implementering av servern kan se ut så här:
private List<Invoice> foundInvoices = new List<Invoice>();public List<Invoice> FindInvoices(decimal Amount){ foundInvoices.Clear(); Invoice inv; // ...search logic to add invoices to the collection foundInvoices.Add(inv); // ...repeat until all invoices found return foundInvoices;}
Mer sofistikerade lösningar skulle kunna använda yield return för att returnera varje faktura allteftersom den hittas i stället för att sätta ihop listan. Oavsett detta kommer en klient som anropar metoden FindInvoices att vilja utföra några kritiska aktiviteter före och efter bearbetningen. När det första objektet har hittats kanske klienten till exempel vill aktivera en MatchingInvoices-lista för att hålla fakturorna hos klienten eller förvärva/initialisera eventuella resurser som krävs för att behandla en faktura. När ytterligare fakturor läggs till måste klienten behandla varje faktura och när servern signalerar att den sista fakturan är hämtad, frigöra alla resurser som inte längre behövs eftersom det inte finns ”fler” fakturor att behandla.
Under en databashämtning blockeras t.ex. en läsning tills den första raden returneras. När den första raden returneras initialiserar klienten de resurser som behövs för att behandla en rad. Läsningen returnerar också false när den sista raden hämtas, vilket låter klienten släppa dessa resurser eftersom det inte finns fler rader att bearbeta.
Skapa enkla lösningar med ObservableCollection
Det mest uppenbara valet för att implementera observatörsmönstret i .NET Framework är ObservableCollection. ObservableCollection meddelar klienten (via en händelse) när den ändras.
Att skriva om min exempelserver för att använda klassen ObservableCollection kräver endast två ändringar. För det första måste den samling som innehåller resultaten definieras som en ObservableCollection och göras offentlig. För det andra är det inte längre nödvändigt att metoden returnerar ett resultat: Servern behöver bara lägga till fakturor i samlingen.
Den nya implementeringen av servern kan se ut så här:
public List<Invoice> FindInvoices(decimal Amount){ public ObservableCollection<Invoice> foundInvoices = new ObservableCollection<Invoice>(); public void FindInvoices(decimal Amount) { foundInvoices.Clear(); Invoice inv; // ...search logic to set inv foundInvoices.Add(inv); // ...repeat until all invoices are added to the collection }
En klient som använder den här versionen av servern behöver bara koppla upp en händelsehanterare till händelsen CollectionChanged i samlingen InvoiceManagement’s foundInvoices. I följande kod har jag låtit klassen implementera gränssnittet IDisposable för att stödja bortkoppling från händelsen:
public class SearchInvoices: IDisposable{ InvoiceManagement invMgmt = new InvoiceManagement(); public void SearchInvoices() { invMgmt.foundInvoices.CollectionChanged += InvoicesFound; } public void Dispose() { invMgmt.foundInvoices.CollectionChanged -= InvoicesChanged; }
I klienten skickas CollectionChanged-händelsen till ett NotifyCollectionChangedEventArgs-objekt som andra parameter. Objektets Action-egenskap anger både vilken ändring som utfördes på samlingen (åtgärderna är: samlingen rensades, nya objekt lades till samlingen, befintliga objekt flyttades/ersattes/flyttades) och information om de ändrade objekten (en samling av alla objekt som lades till, en samling av objekt som fanns i samlingen innan de nya objekten lades till, positionen för det objekt som flyttades/flyttades/ersattes).
En enkel kod i klienten som asynkront skulle behandla varje faktura när den läggs till i samlingen i servern skulle se ut som koden i figur 1.
Figur 1 Asynkron behandling av fakturor med hjälp av ObservableCollection
private async void InvoicesFound(object sender, NotifyCollectionChangedEventArgs e){ switch (e.Action) { case NotifyCollectionChangedAction.Reset: { // ...initial item processing return; } case NotifyCollectionChangedAction.Add: { foreach (Invoice inv in e.NewItems) { await HandleInvoiceAsync(inv); } return; } }}
Den här koden är visserligen enkel, men den kanske inte räcker till för dina behov, särskilt inte om du hanterar en långdragen process eller arbetar i en dynamisk miljö. Ur en asynkron designsynvinkel skulle koden till exempel kunna fånga upp Task-objektet som returneras av HandleInvoiceAsync så att klienten kan hantera de asynkrona uppgifterna. Du vill också se till att händelsen CollectionChanged utlöses på UI-tråden även om FindInvoices körs på en bakgrundstråd.
På grund av var Clear-metoden anropas i serverklassen (precis innan den första fakturan söks) kan Action-egendomens Reset-värde användas som en signal om att det första objektet är på väg att hämtas. Men det kan naturligtvis hända att inga fakturor hittas i sökningen, så användningen av Reset Action kan leda till att klienten allokerar resurser som egentligen aldrig används. För att faktiskt hantera behandling av ”första objektet” måste du lägga till en flagga i behandlingen av Add Action så att den endast utförs när det första objektet har hittats.
Servern har dessutom ett begränsat antal alternativ för att indikera att den sista fakturan har hittats, så att klienten kan sluta vänta på ”nästa faktura”. Servern skulle förmodligen kunna rensa samlingen efter att ha hittat den sista posten, men det skulle bara tvinga in mer komplex behandling i behandlingen av återställningsåtgärden (har jag behandlat fakturor? Om ja, har jag behandlat den sista fakturan; om nej, är jag på väg att behandla den första fakturan).
ObservableCollection är visserligen bra för enkla problem, men varje någorlunda sofistikerad implementering som bygger på ObservableCollection (och varje tillämpning som värdesätter effektivitet) kommer att kräva en del komplicerad kod, särskilt i klienten.
The Rx Solutions
Om du vill ha asynkron behandling kan Rx (tillgänglig via NuGet) ge en bättre lösning för att implementera observatörsmönstret genom att låna från publish/subscribe-modellen. Den här lösningen ger också en LINQ-baserad filtreringsmodell, bättre signalering för villkor för första/sista objektet och bättre felhantering.
Rx kan också hantera mer intressanta observatörsimplementationer än vad som är möjligt med en ObservableCollection. I min fallstudie kan min server, efter att ha returnerat den första listan med fakturor, fortsätta att kontrollera om det finns nya fakturor som läggs till i cacheminnet efter att den ursprungliga sökningen avslutats (och som matchar sökkriterierna, förstås). När en faktura som uppfyller kriterierna dyker upp vill klienten bli informerad om händelsen så att den nya fakturan kan läggas till i listan. Rx stöder den här typen av händelsebaserade utvidgningar av observatörsmönstret bättre än ObservableCollection.
Det finns två viktiga gränssnitt i Rx för att stödja observatörsmönstret. Det första är IObservable<T>, som implementeras av servern och anger en enda metod: Subscribe. Servern som implementerar Subscribe-metoden får en referens till ett objekt från en klient. För att hantera problemet med en lyssnare som inte längre finns kvar returnerar Subscribe-metoden en referens till klienten för ett objekt som implementerar gränssnittet IDisposable. Klienten kan använda det objektet för att koppla från servern. När klienten kopplar bort klienten förväntas servern ta bort klienten från någon av sina interna listor.
Det andra är gränssnittet IObserver<T>, som måste implementeras av klienten. Detta gränssnitt kräver att klienten implementerar och exponerar tre metoder för servern: OnNext, OnCompleted och OnError. Den kritiska metoden här är OnNext, som används av servern för att skicka ett meddelande till klienten (i min fallstudie skulle meddelandet vara nya fakturaobjekt som returneras när vart och ett av dem dyker upp). Servern kan använda klientens OnCompleted-metod för att signalera att det inte finns mer data. Den tredje metoden, OnError, är ett sätt för servern att signalera till klienten att ett undantag har inträffat.
Du är naturligtvis välkommen att implementera IObserver-gränssnittet själv (det är en del av .NET Framework). Tillsammans med ObservableCollection kan det vara allt du behöver om du skapar en synkron lösning (jag har skrivit en kolumn om det också, ”Writing Cleaner Code with Reactive Extensions” ).
Rx innehåller dock flera paket som tillhandahåller asynkrona implementeringar av dessa gränssnitt, inklusive implementeringar för JavaScript och RESTful-tjänster. Rx Subject-klassen tillhandahåller en implementering av IObservable som förenklar implementeringen av en asynkron publicerings- och prenumerationsversion av observatörsmönstret.
Skapa en asynkron lösning
Skapandet av en server för att arbeta med ett Subject-objekt kräver mycket få ändringar av den ursprungliga synkrona koden på serversidan. Jag ersätter den gamla ObservableCollection med ett Subject-objekt som kommer att skicka varje faktura som den visas till alla lyssnande klienter. Jag deklarerar Subject-objektet som offentligt så att klienterna kan få tillgång till det:
public class InvoiceManagement{ public IObservable<Invoice> foundInvoice = new Subject<Invoice>();
I metodkroppen använder jag i stället för att lägga till en faktura till en samling, Subject:s OnNext-metod för att skicka varje faktura till klienten när den hittas:
public void FindInvoices(decimal Amount){ inv = GetInvoicesForAmount(Amount) // Poll for invoices foundInvoice.OnNext(inv); // ...repeat...}
I min klient deklarerar jag först en instans av serverklassen. I en metod som är markerad som asynkron anropar jag sedan ämnets Subscribe-metod för att ange att jag vill börja hämta meddelanden:
public class InvoiceManagementTests{ InvoiceManagement invMgmt = new InvoiceManagement(); public async void ProcessInvoices() { invMgmt.foundInvoice.Subscribe<Invoice>();
För att filtrera resultaten till endast de fakturor som jag vill ha, kan jag tillämpa ett LINQ-uttalande på Subject-objektet. Det här exemplet filtrerar fakturorna till de fakturor som är beställda i efterhand (för att använda Rx LINQ-tillägg måste du lägga till ett using-statement för namnområdet System.Reactive.Linq):
invMgmt.foundInvoice.Where(i => i.BackOrder == "BackOrder").Subscribe();
När jag väl har börjat lyssna på subjektet kan jag ange vilken bearbetning jag vill göra när jag tar emot en faktura. Jag kan till exempel använda FirstAsync för att bara behandla den första fakturan som returneras av tjänsten. I det här exemplet använder jag await-anvisningen med anropet till FirstAsync så att jag kan återlämna kontrollen till huvuddelen av mitt program medan jag behandlar fakturan. Den här koden väntar på att hämta den första fakturan, går sedan vidare till den kod jag använder för att initiera fakturahanteringsprocessen och behandlar slutligen fakturan:
Invoice inv;inv = await invMgmt.foundInvoice.FirstAsync();// ...setup code invoices...HandleInvoiceAsync(inv);
En varning: FirstAsync blockerar om servern ännu inte har producerat några resultat. Om du vill undvika blockering kan du använda FirstOrDefaultAsync, som returnerar noll om servern inte har producerat några resultat. Om det inte finns några resultat kan klienten bestämma vad som eventuellt ska göras.
Det mer typiska fallet är att klienten vill behandla alla fakturor som returneras (efter filtrering) och göra det asynkront. I det fallet kan du i stället för att använda en kombination av Subscribe och OnNext bara använda metoden ForEachAsync. Du kan överlämna en metod eller ett lambdauttryck som behandlar inkommande resultat. Om du överlämnar en metod (som inte kan vara asynkron), som jag gör här, kommer den metoden att överlämnas fakturan som utlöste ForEachAsync:
invMgmt.foundInvoice.ForEachAsync(HandleInvoice);
ForEachAsync-metoden kan också överlämnas en annulleringstoken för att låta klienten signalera att den kopplar bort. En bra metod är att överlämna tokenet när man anropar någon av Rx *Async-metoderna för att stödja att låta klienten avsluta bearbetningen utan att behöva vänta på att alla objekt ska bearbetas.
ForEachAsync bearbetar inte resultat som redan har bearbetats av en First-metod (eller FirstOrDefaultAsync-metod), så du kan använda FirstOrDefaultAsync tillsammans med ForEachAsync för att kontrollera om servern har något att bearbeta innan du bearbetar efterföljande objekt. Samma kontroll kan dock utföras enklare med ämnets IsEmpty-metod. Om klienten måste allokera de resurser som krävs för att bearbeta resultaten kan klienten med IsEmpty kontrollera om det finns något att göra innan resurserna allokeras (ett alternativ skulle vara att allokera resurserna på det första objektet som bearbetas i slingan). Att använda IsEmpty med en klient som kontrollerar om det finns några resultat innan resurserna allokeras (och bearbetningen påbörjas) samtidigt som man stöder avbokning skulle ge kod som ser ut ungefär som i figur 2.
Figur 2 Kod för att stödja avbokning och skjuta upp bearbetningen tills resultaten är färdiga
CancellationTokenSource cancelSource = new CancellationTokenSource();CancellationToken cancel;cancel = cancelSource.Token;if (!await invMgmt.foundInvoice.IsEmpty()){ // ...setup code for processing invoices... try { invMgmt.foundInvoice.ForEachAsync(HandleInvoice, cancel); } catch (Exception ex) { if (ex.GetType() != typeof(CancellationToken)) { // ...report message } } // ...clean up code when all invoices are processed or client disconnects}
Avsluta
Om det enda du behöver är en enkel implementering av observatörsmönstret, så kanske ObservableCollection räcker till för att bearbeta en ström av resultat. För bättre kontroll och för en händelsebaserad applikation kan du med Subject-klassen och de tillägg som följer med Rx låta din applikation arbeta i ett asynkront läge genom att stödja en kraftfull implementering av publish/subscribe-modellen (och jag har inte tittat på det rika biblioteket av operatörer som följer med Rx). Om du arbetar med Rx är det värt att ladda ner Rx Design Guide (bit.ly/1VOPxGS), som diskuterar de bästa metoderna för att konsumera och producera observerbara strömmar.
Rx ger också ett visst stöd för att konvertera meddelandetypen som överförs mellan klienten och servern genom att använda gränssnittet ISubject<TSource, TResult>. Gränssnittet ISubject<TSource, TResult> specificerar två datatyper: en ”in”-datatyp och en ”out”-datatyp. Inom klassen Subject som implementerar detta gränssnitt kan du utföra alla nödvändiga operationer för att konvertera det resultat som returneras från servern (in-datatypen) till det resultat som klienten behöver (out-datatypen). Dessutom är in-parametern kovariant (den accepterar den angivna datatypen eller något som datatypen ärver från) och out-parametern är kontravariant (den accepterar den angivna datatypen eller något som härstammar från den), vilket ger dig ytterligare flexibilitet.
Vi lever i en alltmer asynkron värld och i den världen kommer observatörsmönstret att bli allt viktigare – det är ett användbart verktyg för alla gränssnitt mellan processer där serverprocessen returnerar mer än ett enda resultat. Lyckligtvis har du flera alternativ för att implementera observatörsmönstret i .NET Framework, inklusive ObservableCollection och Rx.
Peter Vogel är systemarkitekt och chef för PH&V Information Services. PH&V erbjuder konsulttjänster för hela stacken, från UX-design till objektmodellering och databasdesign.
Tack till följande tekniska experter från Microsoft för att de har granskat den här artikeln: Stephen Cleary, James McCaffrey och Dave Sexton
Stephen Cleary har arbetat med multitrådning och asynkron programmering i 16 år och har använt async-stödet i Microsoft .NET Framework sedan den första förhandsgranskningen. Han är författare till ”Concurrency in C# Cookbook” (O’Reilly Media, 2014). Hans hemsida, inklusive hans blogg, finns på stephencleary.com.
Diskutera den här artikeln i MSDN Magazine-forumet
.
Lämna ett svar