Schaal Asynchrone Client-Server Links with Reactive
On januari 25, 2022 by admin- 01/31/2019
- 17 minuten om te lezen
Juni 2016
Volume 31 Nummer 6
Door Peter Vogel | Juni 2016
Zoals asynchrone verwerking meer gemeengoed is geworden in applicatieontwikkeling, heeft het Microsoft .NET Framework een grote verscheidenheid aan tools gekregen die specifieke asynchrone ontwerppatronen ondersteunen. Vaak komt het maken van een goed ontworpen asynchrone toepassing neer op het herkennen van het ontwerppatroon dat uw toepassing implementeert en vervolgens het kiezen van de juiste set van .NET-componenten.
In sommige gevallen vereist de overeenkomst de integratie van verschillende .NET-componenten. Stephen Cleary’s artikel, “Patterns for Asynchronous MVVM Applications: Commands” (bit.ly/233Kocr), laat zien hoe je het Model-View-ViewModel (MVVM) patroon volledig kunt ondersteunen op een asynchrone manier. In andere gevallen is voor ondersteuning slechts één component uit het .NET Framework nodig. Ik heb het implementeren van het provider/consumers patroon met behulp van de BlockingCollection besproken in mijn VisualStudioMagazine.com Practical .NET columns, “Create Simple, Reliable Asynchronous Apps with BlockingCollection” (bit.ly/1TuOpE6), en, “Create Sophisticated Asynchronous Applications with BlockingCollection” (bit.ly/1SpYyD4).
Een ander voorbeeld is het implementeren van het observer ontwerp patroon om een langlopende operatie asynchroon te monitoren. In dit scenario werkt een asynchrone methode die een enkel taakobject retourneert niet, omdat de client vaak een stroom resultaten retourneert. Voor deze scenario’s kun je gebruik maken van ten minste twee hulpmiddelen uit het .NET Framework: de ObservableCollection en Reactive Extensions (Rx). Voor eenvoudige oplossingen is de ObservableCollection (samen met de async en await keywords) alles wat je nodig hebt. Echter, voor meer “interessante” en, vooral, event-gedreven problemen, biedt Rx je een betere controle over het proces.
Defining the Pattern
Hoewel het observer pattern vaak gebruikt wordt in UI design patterns-waaronder Model-View-Controller (MVC), Model-View-Presenter (MVP) en MVVM-UI’s moeten beschouwd worden als slechts een scenario uit een grotere set van scenario’s waar het observer pattern van toepassing is. De definitie van het observer pattern (geciteerd uit Wikipedia) is: “Een object, genaamd het subject, onderhoudt een lijst van zijn afhankelijke personen, genaamd observers, en brengt hen automatisch op de hoogte van eventuele toestandsveranderingen, meestal door een van hun methoden aan te roepen.”
Eigenlijk gaat het observer patroon over het krijgen van resultaten van langlopende processen naar de client zodra die resultaten beschikbaar zijn. Zonder een versie van het observer pattern moeten clients wachten tot het laatste resultaat beschikbaar is en dan alle resultaten in één keer naar ze toegestuurd krijgen. In een wereld die steeds asynchrooner wordt, wil je dat de observers de resultaten parallel met de client verwerken zodra de resultaten beschikbaar zijn. Om te benadrukken dat je het over meer dan UI’s hebt als je het observer patroon gebruikt, zal ik in de rest van dit artikel “client” en “server” gebruiken in plaats van “observer” en “subject”.
Problemen en kansen
Er zijn tenminste drie problemen en twee kansen met het observer patroon. Het eerste probleem is het “lapsed-listener” probleem: Veel implementaties van het observer patroon vereisen dat de server een verwijzing naar al zijn clients vasthoudt. Als gevolg daarvan kunnen clients door de server in het geheugen worden gehouden totdat de server afsluit. Dit is natuurlijk geen optimale oplossing voor een lang lopend proces in een dynamisch systeem waar clients regelmatig verbinding maken en weer afbreken.
Het vervallen-listener probleem is echter slechts een symptoom van het tweede, grotere probleem: Veel implementaties van het observer patroon vereisen dat de server en de client strak gekoppeld zijn, waardoor zowel de server als de client altijd aanwezig moeten zijn. De client zou op zijn minst in staat moeten zijn om te bepalen of de server aanwezig is en te kiezen om niet te koppelen; bovendien zou de server in staat moeten zijn om te functioneren, zelfs als er geen clients zijn die resultaten accepteren.
De derde kwestie is gerelateerd aan prestaties: Hoe lang duurt het voordat de server alle cliënten heeft geïnformeerd? De prestaties in het waarnemerspatroon worden rechtstreeks beïnvloed door het aantal cliënten dat op de hoogte moet worden gebracht. Daarom is er een mogelijkheid om de prestaties in het observer patroon te verbeteren door de client preemptief de resultaten te laten filteren die terugkomen van de server. Dit behandelt ook de scenario’s waar de server meer resultaten produceert (of een grotere variëteit aan resultaten) dan waarin de client geïnteresseerd is: De client kan aangeven dat hij alleen in specifieke gevallen op de hoogte wil worden gebracht. De tweede performance mogelijkheid bestaat uit het herkennen wanneer de server geen resultaten heeft of klaar is met het produceren van resultaten. Clients kunnen het verwerven van resources overslaan, die nodig zijn om server events te verwerken, totdat de client er zeker van is dat er iets te verwerken valt en clients kunnen die resources vrijgeven zodra ze weten dat ze het laatste resultaat hebben verwerkt.
Van Observer naar Publish/Subscribe
Het meenemen van deze overwegingen leidt van eenvoudige implementaties van het observer patroon naar het gerelateerde publish/subscribe model. Publish/subscribe implementeert het observer patroon op een losjes gekoppelde manier die servers en clients laat uitvoeren, zelfs als de ander op dat moment niet beschikbaar is. Publish/subscribe implementeert ook typisch client-side filtering door de client te laten abonneren op specifieke onderwerpen/kanalen (“Notify me about purchase orders”) of op attributen geassocieerd met verschillende soorten inhoud (“Notify me about any urgent requests”).
Eén probleem blijft echter bestaan. Alle implementaties van het observer pattern hebben de neiging om clients en servers strak te koppelen aan een specifiek berichtformaat. Het veranderen van het formaat van een bericht in de meeste publish/subscribe implementaties kan moeilijk zijn, omdat alle clients moeten worden bijgewerkt om het nieuwe formaat te gebruiken.
In veel opzichten is dit vergelijkbaar met de beschrijving van een server-side cursor in een database. Om de transmissiekosten te minimaliseren, geeft de databaseserver geen resultaten terug als elke rij wordt opgehaald. Echter, voor grote rijenreeksen retourneert de database ook niet alle rijen in een enkele batch aan het eind. In plaats daarvan retourneert de database server meestal subsets van een cursor op de server, vaak wanneer die subsets beschikbaar worden. Met een database hoeven de client en de server niet gelijktijdig aanwezig te zijn: De databaseserver kan draaien wanneer er geen cliënten aanwezig zijn; een cliënt kan controleren of de server toegankelijk is en, zo niet, beslissen wat (indien iets) hij verder kan doen. Het filterproces (SQL) is ook zeer flexibel. Echter, als de database engine het formaat verandert dat het gebruikt om rijen terug te geven, dan moeten alle clients, op zijn minst, opnieuw worden gecompileerd.
Processing a Cache of Objects
Als mijn case study voor het kijken naar een eenvoudige observer pattern implementatie, gebruik ik als mijn server een klasse die een in-memory cache van facturen doorzoekt. Die server zou, aan het eind van zijn verwerking, een verzameling van alle facturen kunnen teruggeven. Ik geef er echter de voorkeur aan dat de client de facturen afzonderlijk verwerkt, parallel aan het zoekproces van de server. Dat betekent dat ik de voorkeur geef aan een versie van het proces, waarbij elke factuur wordt teruggegeven als hij is gevonden en de client elke factuur parallel aan het zoeken naar de volgende factuur kan verwerken.
Een eenvoudige implementatie van de server zou er als volgt uit kunnen zien:
private List<Invoice> foundInvoices = new List<Invoice>();public List<Invoice> FindInvoices(decimal Amount){ foundInvoices.Clear(); Invoice inv; // ...search logic to add invoices to the collection foundInvoices.Add(inv); // ...repeat until all invoices found return foundInvoices;}
Meer geavanceerde oplossingen zouden yield return kunnen gebruiken om elke factuur terug te geven als hij is gevonden, in plaats van de lijst samen te stellen. Hoe dan ook, een client die de FindInvoices methode aanroept zal een aantal kritische activiteiten willen uitvoeren voor en na de verwerking. Bijvoorbeeld, zodra het eerste item is gevonden, wil de client misschien een MatchingInvoices-lijst inschakelen om de facturen bij de client te houden of middelen verwerven/initialiseren die nodig zijn om een factuur te verwerken. Als extra facturen worden toegevoegd, zou de client elke factuur moeten verwerken en, wanneer de server signaleert dat de laatste factuur is opgehaald, resources vrijgeven die niet langer nodig zijn omdat er “geen” facturen meer zijn om te verwerken.
Tijdens het ophalen van een database, bijvoorbeeld, zal een read blokkeren totdat de eerste rij is geretourneerd. Zodra de eerste rij is geretourneerd, initialiseert de client alle bronnen die nodig zijn om een rij te verwerken. De read retourneert ook false wanneer de laatste rij is opgehaald, zodat de client deze bronnen kan vrijgeven omdat er geen rijen meer te verwerken zijn.
Eenvoudige oplossingen maken met ObservableCollection
De meest voor de hand liggende keuze voor het implementeren van het observer pattern in het .NET Framework is de ObservableCollection. De ObservableCollection informeert de client (via een event) wanneer deze is gewijzigd.
Herschrijven van mijn voorbeeldserver om de ObservableCollection class te gebruiken vereist slechts twee veranderingen. Ten eerste moet de collectie met de resultaten worden gedefinieerd als een ObservableCollection en openbaar worden gemaakt. Ten tweede, het is niet langer nodig dat de methode een resultaat teruggeeft: De server hoeft alleen maar facturen aan de verzameling toe te voegen.
De nieuwe implementatie van de server zou er als volgt uit kunnen zien:
public List<Invoice> FindInvoices(decimal Amount){ public ObservableCollection<Invoice> foundInvoices = new ObservableCollection<Invoice>(); public void FindInvoices(decimal Amount) { foundInvoices.Clear(); Invoice inv; // ...search logic to set inv foundInvoices.Add(inv); // ...repeat until all invoices are added to the collection }
Een client die deze versie van de server gebruikt, hoeft alleen maar een event handler aan te sluiten op de CollectionChanged event van de InvoiceManagement’s foundInvoices verzameling. In de volgende code heb ik de klasse de interface IDisposable laten implementeren om de verbinding met het event te kunnen verbreken:
public class SearchInvoices: IDisposable{ InvoiceManagement invMgmt = new InvoiceManagement(); public void SearchInvoices() { invMgmt.foundInvoices.CollectionChanged += InvoicesFound; } public void Dispose() { invMgmt.foundInvoices.CollectionChanged -= InvoicesChanged; }
In de client wordt aan het CollectionChanged event een NotifyCollectionChangedEventArgs object als tweede parameter doorgegeven. De Action eigenschap van dat object specificeert zowel welke wijziging werd uitgevoerd op de collectie (de acties zijn: de collectie werd gewist, nieuwe items werden toegevoegd aan de collectie, bestaande items werden verplaatst / vervangen / verwijderd) en informatie over de gewijzigde items (een verzameling van alle toegevoegde items, een verzameling van items aanwezig in de collectie voorafgaand aan de nieuwe items werden toegevoegd, de positie van het item dat werd verplaatst / verwijderd / vervangen).
Eenvoudige code in de client die elke factuur asynchroon verwerkt als deze aan de verzameling in de server wordt toegevoegd, zou eruitzien als de code in figuur 1.
Figuur 1 Asynchrone verwerking van facturen met behulp van ObservableCollection
private async void InvoicesFound(object sender, NotifyCollectionChangedEventArgs e){ switch (e.Action) { case NotifyCollectionChangedAction.Reset: { // ...initial item processing return; } case NotifyCollectionChangedAction.Add: { foreach (Invoice inv in e.NewItems) { await HandleInvoiceAsync(inv); } return; } }}
Hoewel deze code eenvoudig is, kan deze ontoereikend zijn voor uw behoeften, vooral als u een lang lopend proces behandelt of in een dynamische omgeving werkt. Vanuit het oogpunt van een asynchroon ontwerp zou de code bijvoorbeeld het door HandleInvoiceAsync geretourneerde Task-object kunnen vastleggen, zodat de client de asynchrone taken zou kunnen beheren. Je zult er ook voor willen zorgen dat de CollectionChanged gebeurtenis wordt opgewekt op de UI thread, zelfs als FindInvoices op een achtergrond thread draait.
Omdat de Clear methode wordt aangeroepen in de server klasse (net voor het zoeken naar de eerste Factuur) kan de Reset waarde van de Action eigenschap worden gebruikt als een signaal dat het eerste item op het punt staat te worden opgewekt. Maar het kan natuurlijk zijn dat er geen facturen worden gevonden tijdens het zoeken, dus het gebruik van de Reset Action zou ertoe kunnen leiden dat de client resources toewijst die nooit echt worden gebruikt. Om daadwerkelijk “eerste item” verwerking af te handelen, zou je een vlag moeten toevoegen aan de Add Action verwerking om alleen uit te voeren wanneer het eerste item is gevonden.
Daarnaast heeft de server een beperkt aantal opties om aan te geven dat de laatste Factuur is gevonden, zodat de client kan stoppen met wachten op “de volgende.” De server zou, vermoedelijk, de collectie kunnen wissen na het vinden van het laatste item, maar dat zou alleen maar meer complexe verwerking in de Reset Action verwerking forceren (heb ik Facturen verwerkt? Zo ja, dan heb ik de laatste factuur verwerkt; zo nee, dan sta ik op het punt om de eerste factuur te verwerken).
Terwijl, voor eenvoudige problemen, ObservableCollection prima zal zijn, elke redelijk geavanceerde implementatie op basis van ObservableCollection (en elke toepassing die efficiëntie waardeert) gaat wat ingewikkelde code vereisen, vooral in de client.
De Rx Oplossingen
Als je asynchrone verwerking wilt, dan kan Rx (beschikbaar via NuGet) een betere oplossing bieden voor de implementatie van het observer pattern door te lenen van het publish/subscribe model. Deze oplossing biedt ook een LINQ-gebaseerd filtering model, betere signalering voor first/last item condities en betere foutafhandeling.
Rx kan ook interessantere observer implementaties aan dan mogelijk zijn met een ObservableCollection. In mijn casestudy zou mijn server, na het terugzenden van de oorspronkelijke lijst met facturen, kunnen blijven controleren op nieuwe facturen die aan de cache worden toegevoegd nadat de oorspronkelijke zoekopdracht is voltooid (en die aan de zoekcriteria voldoen, natuurlijk). Wanneer er een factuur verschijnt die aan de criteria voldoet, wil de klant daarvan op de hoogte worden gebracht zodat de nieuwe factuur aan de lijst kan worden toegevoegd. Rx ondersteunt dit soort event-gebaseerde uitbreidingen van het observer patroon beter dan ObservableCollection.
Er zijn twee belangrijke interfaces in Rx voor de ondersteuning van het observer patroon. De eerste is IObservable<T>, geïmplementeerd door de server en specificeert een enkele methode: Subscribe. De server die de Subscribe methode implementeert zal een verwijzing naar een object van een client doorgegeven krijgen. Om het probleem van de verlopen luisteraar op te lossen, retourneert de Subscribe methode een verwijzing naar de client voor een object dat de IDisposable interface implementeert. De client kan dat object gebruiken om de verbinding met de server te verbreken. Wanneer de client de verbinding verbreekt, wordt van de server verwacht dat hij de client uit zijn interne lijsten verwijdert.
De tweede is de IObserver<T> interface, die door de client moet worden geïmplementeerd. Die interface vereist dat de client drie methoden implementeert en aan de server blootstelt: OnNext, OnCompleted en OnError. De kritieke methode hier is OnNext, die door de server wordt gebruikt om een bericht aan de client door te geven (in mijn studie zou dat bericht nieuwe Factuur-objecten zijn die zullen worden teruggegeven zodra ze verschijnen). De server kan de OnCompleted methode van de client gebruiken om aan te geven dat er geen gegevens meer zijn. De derde methode, OnError, biedt een manier voor de server om de client te signaleren dat er een exception is opgetreden.
Je bent natuurlijk welkom om de IObserver interface zelf te implementeren (het is onderdeel van het .NET Framework). Samen met de ObservableCollection is dat misschien alles wat je nodig hebt als je een synchrone oplossing maakt (ik heb daar ook een column over geschreven, “Writing Cleaner Code with Reactive Extensions” ).
De Rx bevat echter diverse packages die asynchrone implementaties van deze interfaces bieden, waaronder implementaties voor JavaScript en RESTful services. De Rx Subject klasse biedt een implementatie van IObservable die het implementeren van een asynchrone publish/subscribe versie van het observer patroon vereenvoudigt.
Creëren van een Asynchrone Oplossing
Het maken van een server om met een Subject object te werken vereist zeer weinig veranderingen in de originele synchrone server-side code. Ik vervang de oude ObservableCollection door een Subject object dat elke Factuur doorgeeft zoals het verschijnt aan elke luisterende client. Ik verklaar het Subject object als publiek zodat clients er toegang toe hebben:
public class InvoiceManagement{ public IObservable<Invoice> foundInvoice = new Subject<Invoice>();
In de body van de methode, in plaats van een factuur aan een verzameling toe te voegen, gebruik ik de OnNext methode van het Subject om elke factuur aan de client door te geven als deze wordt gevonden:
public void FindInvoices(decimal Amount){ inv = GetInvoicesForAmount(Amount) // Poll for invoices foundInvoice.OnNext(inv); // ...repeat...}
In mijn client, verklaar ik eerst een instantie van de server klasse. Dan, in een methode gemarkeerd als async, roep ik de Subject’s Subscribe methode aan om aan te geven dat ik wil beginnen met het ophalen van berichten:
public class InvoiceManagementTests{ InvoiceManagement invMgmt = new InvoiceManagement(); public async void ProcessInvoices() { invMgmt.foundInvoice.Subscribe<Invoice>();
Om de resultaten te filteren op alleen de facturen die ik wil, kan ik een LINQ statement toepassen op het Subject object. Dit voorbeeld filtert de facturen op de facturen die zijn nabesteld (om Rx LINQ extensies te gebruiken moet je een using statement toevoegen voor de System.Reactive.Linq namespace):
invMgmt.foundInvoice.Where(i => i.BackOrder == "BackOrder").Subscribe();
Als ik eenmaal ben begonnen met luisteren naar het subject, kan ik specificeren welke verwerking ik wil doen als ik een factuur ontvang. Ik kan bijvoorbeeld FirstAsync gebruiken om alleen de eerste factuur te verwerken die door de service wordt geretourneerd. In dit voorbeeld gebruik ik het await statement met de oproep aan FirstAsync, zodat ik de controle kan teruggeven aan het hoofdgedeelte van mijn applicatie terwijl ik de factuur verwerk. Deze code wacht om die eerste factuur op te halen, gaat dan verder met de code die ik gebruik om het factuurverwerkingsproces te initialiseren en verwerkt ten slotte de factuur:
Invoice inv;inv = await invMgmt.foundInvoice.FirstAsync();// ...setup code invoices...HandleInvoiceAsync(inv);
Eén waarschuwing: FirstAsync blokkeert als de server nog geen resultaten heeft opgeleverd. Als je blokkeren wilt vermijden, kun je FirstOrDefaultAsync gebruiken, die null teruggeeft als de server nog geen resultaten heeft geproduceerd. Als er geen resultaten zijn, kan de client beslissen wat hij moet doen.
Het meer typische geval is dat de client alle teruggestuurde facturen wil verwerken (na filtering) en dat hij dat asynchroon wil doen. In dat geval kunt u in plaats van een combinatie van Subscribe en OnNext gewoon de ForEachAsync-methode gebruiken. Je kunt een methode of een lambda-expressie doorgeven die de binnenkomende resultaten verwerkt. Als je een methode doorgeeft (die niet asynchroon kan zijn), zoals ik hier doe, wordt aan die methode de factuur doorgegeven die ForEachAsync heeft getriggerd:
invMgmt.foundInvoice.ForEachAsync(HandleInvoice);
Aan de ForEachAsync methode kan ook een cancellation token worden doorgegeven om de client te laten weten dat hij de verbinding verbreekt. Een goede gewoonte zou zijn om het token door te geven bij het aanroepen van een van de Rx *Async methoden om de client de verwerking te laten beëindigen zonder te hoeven wachten tot alle objecten zijn verwerkt.
De ForEachAsync zal geen resultaat verwerken dat al is verwerkt door een First (of FirstOrDefaultAsync) methode, zodat u FirstOrDefaultAsync met ForEachAsync kunt gebruiken om te controleren of de server nog iets te verwerken heeft voordat hij volgende objecten verwerkt. Echter, de Subject’s IsEmpty methode zal dezelfde controle eenvoudiger uitvoeren. Indien de client middelen moet toewijzen die nodig zijn voor het verwerken van resultaten, kan de client met IsEmpty controleren of er iets te doen is alvorens deze middelen toe te wijzen (een alternatief zou zijn om deze middelen toe te wijzen op het eerste item dat in de lus wordt verwerkt). Het gebruik van IsEmpty met een client die controleert of er resultaten zijn voordat hij resources toewijst (en de verwerking start) en tegelijkertijd annulering ondersteunt, zou code opleveren die er ongeveer zo uitziet als Figuur 2.
Figuur 2 Code om annulering te ondersteunen en verwerking uit te stellen tot resultaten klaar zijn
CancellationTokenSource cancelSource = new CancellationTokenSource();CancellationToken cancel;cancel = cancelSource.Token;if (!await invMgmt.foundInvoice.IsEmpty()){ // ...setup code for processing invoices... try { invMgmt.foundInvoice.ForEachAsync(HandleInvoice, cancel); } catch (Exception ex) { if (ex.GetType() != typeof(CancellationToken)) { // ...report message } } // ...clean up code when all invoices are processed or client disconnects}
Wrapping Up
Als je alleen een eenvoudige implementatie van het observer-patroon nodig hebt, dan is ObservableCollection misschien alles wat je nodig hebt om een stroom resultaten te verwerken. Voor een betere controle en voor een event-gebaseerde toepassing, zullen de Subject klasse en de uitbreidingen die met Rx worden geleverd je toepassing in een asynchrone modus laten werken door een krachtige implementatie van het publish/subscribe model te ondersteunen (en dan heb ik nog niet gekeken naar de rijke bibliotheek van operatoren die met Rx worden geleverd). Als je met Rx werkt, is het de moeite waard om de Rx Design Guide (bit.ly/1VOPxGS) te downloaden, die de best practices bespreekt voor het consumeren en produceren van observeerbare streams.
Rx biedt ook enige ondersteuning voor het converteren van het berichttype dat tussen de client en de server wordt doorgegeven door gebruik te maken van de ISubject<TSource, TResult> interface. De ISubject<TSource, TResult> interface specificeert twee datatypes: een “in” datatype en een “out” datatype. Binnen de Subject klasse die deze interface implementeert kunnen alle nodige operaties uitgevoerd worden om het resultaat dat terugkomt van de server (het “in” datatype) om te zetten in het resultaat dat vereist wordt door de client (het “out” datatype). Bovendien is de in parameter covariant (het accepteert het gespecificeerde datatype of alles waarvan het datatype erft) en de out parameter is contravariant (het accepteert het gespecificeerde datatype of alles dat er van is afgeleid), wat je extra flexibiliteit geeft.
We leven in een steeds asynchroonere wereld en in die wereld zal het observer patroon steeds belangrijker worden-het is een nuttig gereedschap voor iedere interface tussen processen waar het server proces meer dan een enkel resultaat retourneert. Gelukkig heb je verschillende opties om het observer pattern te implementeren in het .NET Framework, waaronder de ObservableCollection en Rx.
Peter Vogel is een systeem architect en principal bij PH&V Information Services. PH&V biedt full-stack consulting van UX design tot object modeling en database design.
Dank aan de volgende technische experts van Microsoft voor het beoordelen van dit artikel: Stephen Cleary, James McCaffrey en Dave Sexton
Stephen Cleary werkt al 16 jaar met multithreading en asynchroon programmeren en heeft async ondersteuning in het Microsoft .NET Framework gebruikt sinds de eerste community technology preview. Hij is de auteur van “Concurrency in C# Cookbook” (O’Reilly Media, 2014). Zijn homepage, inclusief zijn blog, is te vinden op stephencleary.com.
Discussieer dit artikel in het MSDN Magazine forum
Geef een antwoord