Skalieren asynchroner Client-.Server-Verknüpfungen mit Reactive
On Januar 25, 2022 by admin- 31.01.2019
- 17 Minuten zu lesen
Juni 2016
Band 31 Nummer 6
Von Peter Vogel | Juni 2016
Da die asynchrone Verarbeitung in der Anwendungsentwicklung immer verbreiteter wird, hat das Microsoft .NET Framework eine Vielzahl von Werkzeugen, die bestimmte asynchrone Entwurfsmuster unterstützen. Oftmals kommt es bei der Erstellung einer gut konzipierten asynchronen Anwendung darauf an, das Entwurfsmuster zu erkennen, das Ihre Anwendung implementiert, und dann den richtigen Satz von .NET-Komponenten auszuwählen.
In einigen Fällen erfordert die Übereinstimmung die Integration mehrerer .NET-Komponenten. Der Artikel von Stephen Cleary, „Patterns for Asynchronous MVVM Applications: Commands“ (bit.ly/233Kocr), zeigt, wie man das Model-View-ViewModel (MVVM)-Muster vollständig asynchron unterstützt. In anderen Fällen erfordert die Unterstützung nur eine Komponente aus dem .NET Framework. In meinen VisualStudioMagazine.com Practical .NET-Kolumnen „Create Simple, Reliable Asynchronous Apps with BlockingCollection“ (bit.ly/1TuOpE6) und „Create Sophisticated Asynchronous Applications with BlockingCollection“ (bit.ly/1SpYyD4) habe ich die Implementierung des Provider/Consumer-Patterns unter Verwendung von BlockingCollection erörtert.
Ein weiteres Beispiel ist die Implementierung des Observer-Design-Patterns zur asynchronen Überwachung einer lang laufenden Operation. In diesem Szenario funktioniert eine asynchrone Methode, die ein einzelnes Task-Objekt zurückgibt, nicht, da der Client häufig einen Strom von Ergebnissen zurückgibt. Für diese Szenarien können Sie mindestens zwei Tools aus dem .NET Framework nutzen: ObservableCollection und Reactive Extensions (Rx). Für einfache Lösungen reicht die ObservableCollection (zusammen mit den Schlüsselwörtern async und await) aus. Für „interessantere“ und vor allem ereignisgesteuerte Probleme bietet Rx jedoch eine bessere Kontrolle über den Prozess.
Definieren des Musters
Während das Beobachtermuster häufig in UI-Entwurfsmustern – einschließlich Model-View-Controller (MVC), Model-View-Presenter (MVP) und MVVM – verwendet wird, sollten UIs nur als ein Szenario aus einer größeren Menge von Szenarien betrachtet werden, in denen das Beobachtermuster Anwendung findet. Die Definition des Beobachtermusters (Zitat aus Wikipedia) lautet: „Ein Objekt, Subjekt genannt, führt eine Liste seiner Abhängigen, Beobachter genannt, und benachrichtigt diese automatisch über Zustandsänderungen, in der Regel durch den Aufruf einer ihrer Methoden.“
Im Grunde geht es beim Beobachter-Muster darum, Ergebnisse von lang laufenden Prozessen an den Client zu übermitteln, sobald diese Ergebnisse verfügbar sind. Ohne eine Version des Beobachtermusters müssen Clients warten, bis das letzte Ergebnis verfügbar ist, und sich dann alle Ergebnisse in einem Stück zusenden lassen. In einer zunehmend asynchronen Welt wollen Sie, dass die Beobachter die Ergebnisse parallel zum Client verarbeiten, sobald diese verfügbar sind. Um zu verdeutlichen, dass es bei der Nutzung des Beobachtermusters um mehr als nur um UIs geht, werde ich im weiteren Verlauf dieses Artikels „Client“ und „Server“ anstelle von „Beobachter“ und „Subjekt“ verwenden.
Probleme und Möglichkeiten
Es gibt mindestens drei Probleme und zwei Möglichkeiten mit dem Beobachtermuster. Das erste Problem ist das „Lapsed-Listener“-Problem: Viele Implementierungen des Beobachtermusters erfordern, dass der Server einen Verweis auf alle seine Clients hält. Infolgedessen können Clients vom Server im Speicher gehalten werden, bis der Server beendet wird. Dies ist natürlich keine optimale Lösung für einen lang laufenden Prozess in einem dynamischen System, in dem sich Clients häufig verbinden und trennen.
Das Lapsed-Listener-Problem ist jedoch nur ein Symptom des zweiten, größeren Problems: Viele Implementierungen des Observer-Patterns erfordern eine enge Kopplung von Server und Client, so dass sowohl der Server als auch der Client jederzeit präsent sein müssen. Zumindest sollte der Client in der Lage sein, festzustellen, ob der Server anwesend ist und sich dafür entscheiden, keine Verbindung herzustellen; außerdem sollte der Server in der Lage sein, auch dann zu funktionieren, wenn es keine Clients gibt, die Ergebnisse akzeptieren.
Das dritte Problem ist leistungsbezogen: Wie lange wird es dauern, bis der Server alle Clients benachrichtigt hat? Die Leistung im Beobachtermuster wird direkt von der Anzahl der zu benachrichtigenden Clients beeinflusst. Daher besteht die Möglichkeit, die Leistung im Beobachtermuster zu verbessern, indem der Client die vom Server zurückkommenden Ergebnisse im Voraus filtern kann. Dies gilt auch für den Fall, dass der Server mehr Ergebnisse (oder eine größere Vielfalt an Ergebnissen) liefert, als für den Client von Interesse sind: Der Client kann angeben, dass er nur in bestimmten Fällen benachrichtigt werden soll. Die zweite Leistungsmöglichkeit besteht darin, zu erkennen, wann der Server keine Ergebnisse hat oder die Produktion von Ergebnissen abgeschlossen ist. Clients können die Beschaffung von Ressourcen, die für die Verarbeitung von Serverereignissen erforderlich sind, überspringen, bis der Client sicher ist, dass es etwas zu verarbeiten gibt, und Clients können diese Ressourcen freigeben, sobald sie wissen, dass sie das letzte Ergebnis verarbeitet haben.
Vom Observer zum Publish/Subscribe
Die Berücksichtigung dieser Überlegungen führt von einfachen Implementierungen des Observer-Musters zum verwandten Publish/Subscribe-Modell. Publish/Subscribe implementiert das Observer-Muster in einer lose gekoppelten Art und Weise, die es Servern und Clients ermöglicht, auch dann auszuführen, wenn der jeweils andere gerade nicht verfügbar ist. Publish/Subscribe implementiert typischerweise auch eine clientseitige Filterung, indem der Client entweder bestimmte Themen/Kanäle („Benachrichtige mich über Bestellungen“) oder Attribute, die mit verschiedenen Arten von Inhalten verbunden sind („Benachrichtige mich über dringende Anfragen“), abonnieren kann.
Ein Problem bleibt jedoch bestehen. Alle Implementierungen des Beobachtermusters neigen dazu, Clients und Server eng an ein bestimmtes Nachrichtenformat zu koppeln. Die Änderung des Formats einer Nachricht in den meisten Publish/Subscribe-Implementierungen kann schwierig sein, da alle Clients aktualisiert werden müssen, um das neue Format zu verwenden.
In vielerlei Hinsicht ähnelt dies der Beschreibung eines serverseitigen Cursors in einer Datenbank. Um die Übertragungskosten zu minimieren, gibt der Datenbankserver die Ergebnisse nicht für jede abgerufene Zeile zurück. Bei großen Zeilensätzen gibt die Datenbank jedoch auch nicht alle Zeilen am Ende in einem einzigen Stapel zurück. Stattdessen gibt der Datenbankserver in der Regel Teilmengen aus einem auf dem Server gehaltenen Cursor zurück, sobald diese Teilmengen verfügbar werden. Bei einer Datenbank müssen der Client und der Server nicht gleichzeitig anwesend sein: Der Datenbankserver kann laufen, wenn keine Clients anwesend sind; ein Client kann prüfen, ob der Server erreichbar ist, und, falls nicht, entscheiden, was er sonst noch tun kann (wenn überhaupt). Der Filterprozess (SQL) ist ebenfalls sehr flexibel. Wenn die Datenbank-Engine jedoch das Format ändert, das sie für die Rückgabe von Zeilen verwendet, müssen alle Clients zumindest neu kompiliert werden.
Verarbeitung eines Objekt-Caches
Als Fallstudie für die Betrachtung einer einfachen Beobachtermuster-Implementierung verwende ich als Server eine Klasse, die einen In-Memory-Cache von Rechnungen durchsucht. Dieser Server könnte am Ende seiner Verarbeitung eine Sammlung aller Rechnungen zurückgeben. Ich würde es jedoch vorziehen, dass der Client die Rechnungen einzeln und parallel zum Suchprozess des Servers verarbeitet. Das bedeutet, dass ich eine Version des Prozesses bevorzuge, die jede Rechnung zurückgibt, wenn sie gefunden wird, und den Client jede Rechnung parallel zur Suche nach der nächsten Rechnung verarbeiten lässt.
Eine einfache Implementierung des Servers könnte wie folgt aussehen:
private List<Invoice> foundInvoices = new List<Invoice>();public List<Invoice> FindInvoices(decimal Amount){ foundInvoices.Clear(); Invoice inv; // ...search logic to add invoices to the collection foundInvoices.Add(inv); // ...repeat until all invoices found return foundInvoices;}
Aufwändigere Lösungen könnten yield return verwenden, um jede Rechnung zurückzugeben, wenn sie gefunden wird, anstatt die Liste zusammenzustellen. Unabhängig davon wird ein Client, der die FindInvoices-Methode aufruft, einige kritische Aktivitäten vor und nach der Verarbeitung durchführen wollen. Sobald das erste Element gefunden wurde, möchte der Client beispielsweise eine MatchingInvoices-Liste aktivieren, um die Rechnungen beim Client zu speichern, oder alle für die Verarbeitung einer Rechnung erforderlichen Ressourcen erwerben/initialisieren. Wenn weitere Rechnungen hinzugefügt werden, müsste der Client jede Rechnung verarbeiten und, wenn der Server signalisiert, dass die letzte Rechnung abgerufen wurde, alle Ressourcen freigeben, die nicht mehr benötigt werden, weil es „keine“ Rechnungen mehr zu verarbeiten gibt.
Bei einem Datenbankabruf beispielsweise wird ein Lesevorgang blockiert, bis die erste Zeile zurückgegeben wird. Sobald die erste Zeile zurückgegeben wird, initialisiert der Client alle Ressourcen, die zur Verarbeitung einer Zeile erforderlich sind. Das Lesen gibt auch false zurück, wenn die letzte Zeile abgerufen wird, so dass der Client diese Ressourcen freigeben kann, da keine weiteren Zeilen zu verarbeiten sind.
Erstellen einfacher Lösungen mit ObservableCollection
Die offensichtlichste Wahl für die Implementierung des Beobachtermusters im .NET Framework ist die ObservableCollection. Die ObservableCollection benachrichtigt den Client (über ein Ereignis), wenn sie geändert wird.
Das Umschreiben meines Beispielservers zur Verwendung der Klasse ObservableCollection erfordert nur zwei Änderungen. Erstens muss die Sammlung, die die Ergebnisse enthält, als ObservableCollection definiert und öffentlich gemacht werden. Zweitens ist es nicht mehr notwendig, dass die Methode ein Ergebnis zurückgibt: Der Server muss der Sammlung nur noch Rechnungen hinzufügen.
Die neue Implementierung des Servers könnte wie folgt aussehen:
public List<Invoice> FindInvoices(decimal Amount){ public ObservableCollection<Invoice> foundInvoices = new ObservableCollection<Invoice>(); public void FindInvoices(decimal Amount) { foundInvoices.Clear(); Invoice inv; // ...search logic to set inv foundInvoices.Add(inv); // ...repeat until all invoices are added to the collection }
Ein Client, der diese Version des Servers verwendet, muss lediglich einen Ereignishandler für das CollectionChanged-Ereignis der foundInvoices-Sammlung von InvoiceManagement einrichten. Im folgenden Code habe ich die Klasse die IDisposable-Schnittstelle implementieren lassen, um das Trennen der Verbindung vom Ereignis zu unterstützen:
public class SearchInvoices: IDisposable{ InvoiceManagement invMgmt = new InvoiceManagement(); public void SearchInvoices() { invMgmt.foundInvoices.CollectionChanged += InvoicesFound; } public void Dispose() { invMgmt.foundInvoices.CollectionChanged -= InvoicesChanged; }
Im Client wird dem Ereignis CollectionChanged ein NotifyCollectionChangedEventArgs-Objekt als zweiter Parameter übergeben. Die Action-Eigenschaft dieses Objekts gibt sowohl an, welche Änderung an der Sammlung vorgenommen wurde (die Aktionen sind: die Sammlung wurde geleert, neue Elemente wurden der Sammlung hinzugefügt, vorhandene Elemente wurden verschoben/ersetzt/entfernt) als auch Informationen über die geänderten Elemente (eine Sammlung aller hinzugefügten Elemente, eine Sammlung der Elemente, die vor dem Hinzufügen der neuen Elemente in der Sammlung vorhanden waren, die Position des Elements, das verschoben/entfernt/ersetzt wurde).
Ein einfacher Code im Client, der jede Rechnung asynchron verarbeiten würde, sobald sie der Sammlung im Server hinzugefügt wird, würde wie der Code in Abbildung 1 aussehen.
Abbildung 1 Asynchrone Verarbeitung von Rechnungen mit ObservableCollection
private async void InvoicesFound(object sender, NotifyCollectionChangedEventArgs e){ switch (e.Action) { case NotifyCollectionChangedAction.Reset: { // ...initial item processing return; } case NotifyCollectionChangedAction.Add: { foreach (Invoice inv in e.NewItems) { await HandleInvoiceAsync(inv); } return; } }}
Dieser Code ist zwar einfach, aber für Ihre Bedürfnisse möglicherweise unzureichend, vor allem, wenn Sie einen lang laufenden Prozess verarbeiten oder in einer dynamischen Umgebung arbeiten. Aus der Sicht eines asynchronen Designs könnte der Code beispielsweise das von HandleInvoiceAsync zurückgegebene Task-Objekt erfassen, damit der Client die asynchronen Tasks verwalten kann. Sie sollten auch sicherstellen, dass das CollectionChanged-Ereignis auf dem UI-Thread ausgelöst wird, selbst wenn FindInvoices auf einem Hintergrund-Thread ausgeführt wird.
Da die Clear-Methode in der Serverklasse aufgerufen wird (kurz vor der Suche nach der ersten Rechnung), kann der Reset-Wert der Action-Eigenschaft als Signal dafür verwendet werden, dass das erste Element in Kürze abgerufen wird. Natürlich kann es vorkommen, dass bei der Suche keine Rechnungen gefunden werden, so dass die Verwendung der Aktion „Reset“ dazu führen kann, dass der Client Ressourcen zuweist, die nie tatsächlich verwendet werden. Um die Verarbeitung des „ersten Postens“ tatsächlich zu handhaben, müßte man der Add Action-Verarbeitung ein Flag hinzufügen, das nur dann ausgeführt wird, wenn der erste Posten gefunden wurde.
Darüber hinaus hat der Server eine begrenzte Anzahl von Optionen, um anzuzeigen, daß die letzte Rechnung gefunden wurde, so daß der Client nicht mehr auf „die nächste“ warten muß. Der Server könnte vermutlich die Sammlung leeren, nachdem er das letzte Element gefunden hat, aber das würde nur eine komplexere Verarbeitung in die Verarbeitung der Reset Action erzwingen (habe ich Rechnungen verarbeitet? Wenn ja, dann habe ich die letzte Rechnung verarbeitet; wenn nein, dann bin ich dabei, die erste Rechnung zu verarbeiten).
Während für einfache Probleme ObservableCollection in Ordnung ist, wird jede einigermaßen anspruchsvolle Implementierung, die auf ObservableCollection basiert (und jede Anwendung, die Wert auf Effizienz legt), etwas komplizierten Code erfordern, insbesondere im Client.
Die Rx-Lösungen
Wenn Sie eine asynchrone Verarbeitung wünschen, kann Rx (verfügbar über NuGet) eine bessere Lösung für die Implementierung des Observer-Musters bieten, indem es sich das Publish/Subscribe-Modell zu eigen macht. Diese Lösung bietet auch ein LINQ-basiertes Filtermodell, eine bessere Signalisierung der Bedingungen für das erste/letzte Element und eine bessere Fehlerbehandlung.
Rx kann auch interessantere Beobachterimplementierungen verarbeiten, als dies mit einer ObservableCollection möglich ist. In meinem Fallbeispiel könnte mein Server, nachdem er die ursprüngliche Liste der Rechnungen zurückgegeben hat, weiterhin nach neuen Rechnungen suchen, die dem Cache hinzugefügt werden, nachdem die ursprüngliche Suche abgeschlossen ist (und die den Suchkriterien entsprechen, natürlich). Wenn eine Rechnung, die den Kriterien entspricht, auftaucht, möchte der Kunde über das Ereignis benachrichtigt werden, damit die neue Rechnung der Liste hinzugefügt werden kann. Rx unterstützt diese Art von ereignisbasierten Erweiterungen des Beobachtermusters besser als ObservableCollection.
Es gibt zwei wichtige Schnittstellen in Rx zur Unterstützung des Beobachtermusters. Die erste ist IObservable<T>, die vom Server implementiert wird und eine einzige Methode angibt: Subscribe. Dem Server, der die Subscribe-Methode implementiert, wird ein Verweis auf ein Objekt von einem Client übergeben. Um das Problem des abgelaufenen Hörers zu lösen, gibt die Subscribe-Methode dem Client einen Verweis auf ein Objekt zurück, das die Schnittstelle IDisposable implementiert. Der Client kann dieses Objekt verwenden, um die Verbindung mit dem Server zu trennen. Wenn der Client die Verbindung trennt, wird vom Server erwartet, dass er den Client aus allen seinen internen Listen entfernt.
Das zweite ist die Schnittstelle IObserver<T>, die vom Client implementiert werden muss. Diese Schnittstelle verlangt vom Client, dass er drei Methoden implementiert und dem Server zur Verfügung stellt: OnNext, OnCompleted und OnError. Die entscheidende Methode ist hier OnNext, mit der der Server eine Nachricht an den Client weitergibt (in meinem Beispiel wären das neue Rechnungsobjekte, die bei jedem Erscheinen zurückgegeben werden). Der Server kann die Methode OnCompleted des Clients verwenden, um zu signalisieren, dass keine weiteren Daten vorhanden sind. Die dritte Methode, OnError, bietet dem Server die Möglichkeit, dem Client zu signalisieren, dass eine Ausnahme aufgetreten ist.
Sie können die IObserver-Schnittstelle natürlich auch selbst implementieren (sie ist Teil des .NET Framework). Zusammen mit der ObservableCollection kann das alles sein, was Sie brauchen, wenn Sie eine synchrone Lösung erstellen (ich habe auch darüber eine Kolumne geschrieben, „Writing Cleaner Code with Reactive Extensions“).
Das Rx enthält jedoch mehrere Pakete, die asynchrone Implementierungen dieser Schnittstellen bereitstellen, einschließlich Implementierungen für JavaScript und RESTful Services. Die Rx-Subject-Klasse bietet eine Implementierung von IObservable, die die Implementierung einer asynchronen Publish/Subscribe-Version des Observer-Musters vereinfacht.
Erstellung einer asynchronen Lösung
Die Erstellung eines Servers für die Arbeit mit einem Subject-Objekt erfordert nur wenige Änderungen am ursprünglichen synchronen serverseitigen Code. Ich ersetze die alte ObservableCollection durch ein Subject-Objekt, das jede Rechnung so, wie sie erscheint, an alle zuhörenden Clients weitergibt. Ich deklariere das Subject-Objekt als öffentlich, so dass Clients darauf zugreifen können:
public class InvoiceManagement{ public IObservable<Invoice> foundInvoice = new Subject<Invoice>();
Im Hauptteil der Methode verwende ich, anstatt eine Rechnung zu einer Sammlung hinzuzufügen, die OnNext-Methode des Subjects, um jede gefundene Rechnung an den Client zu übergeben:
public void FindInvoices(decimal Amount){ inv = GetInvoicesForAmount(Amount) // Poll for invoices foundInvoice.OnNext(inv); // ...repeat...}
In meinem Client deklariere ich zunächst eine Instanz der Serverklasse. Dann rufe ich in einer als asynchron gekennzeichneten Methode die Subscribe-Methode des Subjekts auf, um anzugeben, dass ich mit dem Abrufen von Nachrichten beginnen möchte:
public class InvoiceManagementTests{ InvoiceManagement invMgmt = new InvoiceManagement(); public async void ProcessInvoices() { invMgmt.foundInvoice.Subscribe<Invoice>();
Um die Ergebnisse auf die gewünschten Rechnungen zu filtern, kann ich eine LINQ-Anweisung auf das Subjektobjekt anwenden. In diesem Beispiel werden die Rechnungen nach den Rechnungen gefiltert, die im Rückstand sind (für die Verwendung von Rx-LINQ-Erweiterungen müssen Sie eine using-Anweisung für den System.Reactive.Linq-Namensraum hinzufügen):
invMgmt.foundInvoice.Where(i => i.BackOrder == "BackOrder").Subscribe();
Nachdem ich begonnen habe, den Betreff abzuhören, kann ich angeben, welche Verarbeitung ich vornehmen möchte, wenn ich eine Rechnung erhalte. Ich kann z. B. FirstAsync verwenden, um nur die erste vom Dienst zurückgegebene Rechnung zu verarbeiten. In diesem Beispiel verwende ich die await-Anweisung mit dem Aufruf von FirstAsync, damit ich die Kontrolle an den Hauptteil meiner Anwendung zurückgeben kann, während ich die Rechnung verarbeite. Dieser Code wartet auf den Abruf der ersten Rechnung, geht dann zu dem Code über, den ich zur Initialisierung des Rechnungsbearbeitungsprozesses verwende, und verarbeitet schließlich die Rechnung:
Invoice inv;inv = await invMgmt.foundInvoice.FirstAsync();// ...setup code invoices...HandleInvoiceAsync(inv);
Eine Einschränkung: FirstAsync wird blockiert, wenn der Server noch keine Ergebnisse geliefert hat. Wenn Sie das Blockieren vermeiden wollen, können Sie FirstOrDefaultAsync verwenden, das null zurückgibt, wenn der Server keine Ergebnisse geliefert hat. Wenn es keine Ergebnisse gibt, kann der Client entscheiden, was zu tun ist.
Der typischere Fall ist, dass der Client alle zurückgegebenen Rechnungen (nach dem Filtern) asynchron verarbeiten möchte. In diesem Fall können Sie statt einer Kombination aus Subscribe und OnNext einfach die Methode ForEachAsync verwenden. Sie können eine Methode oder einen Lambda-Ausdruck übergeben, der die eingehenden Ergebnisse verarbeitet. Wenn Sie eine Methode übergeben (die nicht asynchron sein kann), wie ich es hier tue, wird dieser Methode die Rechnung übergeben, die ForEachAsync ausgelöst hat:
invMgmt.foundInvoice.ForEachAsync(HandleInvoice);
Der ForEachAsync-Methode kann auch ein Abbruch-Token übergeben werden, um dem Client zu signalisieren, dass er die Verbindung abbricht. Es empfiehlt sich, das Token beim Aufruf einer beliebigen Rx *Async-Methode zu übergeben, damit der Client die Verarbeitung beenden kann, ohne auf die Verarbeitung aller Objekte warten zu müssen.
Die ForEachAsync-Methode verarbeitet keine Ergebnisse, die bereits von einer First-Methode (oder FirstOrDefaultAsync) verarbeitet wurden, so dass Sie FirstOrDefaultAsync mit ForEachAsync verwenden können, um zu prüfen, ob der Server etwas zu verarbeiten hat, bevor er nachfolgende Objekte verarbeitet. Die IsEmpty-Methode des Subjects führt die gleiche Prüfung jedoch einfacher durch. Wenn der Client Ressourcen zuweisen muss, die für die Verarbeitung von Ergebnissen erforderlich sind, kann er mit IsEmpty prüfen, ob es etwas zu tun gibt, bevor er diese Ressourcen zuweist (eine Alternative wäre, diese Ressourcen beim ersten in der Schleife verarbeiteten Element zuzuweisen). Die Verwendung von IsEmpty mit einem Client, der vor der Ressourcenzuweisung (und dem Beginn der Verarbeitung) prüft, ob es Ergebnisse gibt, und gleichzeitig die Stornierung unterstützt, würde zu einem Code führen, der in etwa so aussieht wie in Abbildung 2.
Abbildung 2 Code zur Unterstützung der Stornierung und zum Aufschieben der Verarbeitung, bis die Ergebnisse bereit sind
CancellationTokenSource cancelSource = new CancellationTokenSource();CancellationToken cancel;cancel = cancelSource.Token;if (!await invMgmt.foundInvoice.IsEmpty()){ // ...setup code for processing invoices... try { invMgmt.foundInvoice.ForEachAsync(HandleInvoice, cancel); } catch (Exception ex) { if (ex.GetType() != typeof(CancellationToken)) { // ...report message } } // ...clean up code when all invoices are processed or client disconnects}
Wrapping Up
Wenn Sie nur eine einfache Implementierung des Beobachtermusters benötigen, reicht ObservableCollection möglicherweise aus, um einen Strom von Ergebnissen zu verarbeiten. Für eine bessere Kontrolle und für eine ereignisbasierte Anwendung können die Klasse Subject und die mit Rx gelieferten Erweiterungen Ihre Anwendung in einem asynchronen Modus arbeiten lassen, indem sie eine leistungsstarke Implementierung des Publish/Subscribe-Modells unterstützen (und ich habe mir die reichhaltige Bibliothek von Operatoren, die mit Rx geliefert werden, noch nicht angesehen). Wenn Sie mit Rx arbeiten, lohnt es sich, den Rx Design Guide (bit.ly/1VOPxGS) herunterzuladen, der die besten Praktiken beim Konsumieren und Produzieren von beobachtbaren Strömen diskutiert.
Rx bietet auch einige Unterstützung für die Konvertierung des Nachrichtentyps, der zwischen dem Client und dem Server übergeben wird, indem es das ISubject<TSource, TResult> Interface verwendet. Die Schnittstelle ISubject<TSource, TResult> spezifiziert zwei Datentypen: einen „in“-Datentyp und einen „out“-Datentyp. Innerhalb der Subjektklasse, die diese Schnittstelle implementiert, können Sie alle notwendigen Operationen durchführen, um das vom Server zurückgegebene Ergebnis (den „in“-Datentyp) in das vom Client benötigte Ergebnis (den „out“-Datentyp) umzuwandeln. Darüber hinaus ist der „in“-Parameter kovariant (er akzeptiert den angegebenen Datentyp oder alles, von dem der Datentyp erbt) und der „out“-Parameter kontravariant (er akzeptiert den angegebenen Datentyp oder alles, was davon abgeleitet ist), was Ihnen zusätzliche Flexibilität bietet.
Wir leben in einer zunehmend asynchronen Welt, und in dieser Welt wird das Beobachtermuster immer wichtiger – es ist ein nützliches Werkzeug für jede Schnittstelle zwischen Prozessen, bei denen der Serverprozess mehr als ein einzelnes Ergebnis zurückgibt. Glücklicherweise gibt es mehrere Optionen für die Implementierung des Beobachtermusters im .NET Framework, darunter ObservableCollection und Rx.
Peter Vogel ist Systemarchitekt und Leiter von PH&V Information Services. PH&V bietet Full-Stack-Consulting vom UX-Design über die Objektmodellierung bis hin zum Datenbankdesign.
Danke an die folgenden technischen Experten von Microsoft für die Durchsicht dieses Artikels: Stephen Cleary, James McCaffrey und Dave Sexton
Stephen Cleary arbeitet seit 16 Jahren mit Multithreading und asynchroner Programmierung und nutzt die asynchrone Unterstützung im Microsoft .NET Framework seit der ersten Community Technology Preview. Er ist der Autor von „Concurrency in C# Cookbook“ (O’Reilly Media, 2014). Seine Homepage, einschließlich seines Blogs, ist unter stephencleary.com.
Diskutieren Sie diesen Artikel im MSDN Magazine Forum
Schreibe einen Kommentar