Scale Asynchronous Client-Server Links with Reactive
Il Gennaio 25, 2022 da admin- 01/31/2019
- 17 minuti per leggere
Giugno 2016
Volume 31 Numero 6
Di Peter Vogel | Giugno 2016
Come l’elaborazione asincrona è diventata più comune nello sviluppo di applicazioni, il Microsoft .NET Framework ha acquisito una grande varietà di strumenti che supportano specifici design pattern asincroni. Spesso la creazione di un’applicazione asincrona ben progettata si riduce a riconoscere il design pattern che la vostra applicazione sta implementando e poi scegliere il giusto set di componenti .NET.
In alcuni casi, la partita richiede l’integrazione di diversi componenti .NET. L’articolo di Stephen Cleary, “Patterns for Asynchronous MVVM Applications: Commands” (bit.ly/233Kocr), mostra come supportare pienamente il pattern Model-View-ViewModel (MVVM) in modo asincrono. In altri casi il supporto richiede solo un componente del .NET Framework. Ho discusso l’implementazione del pattern provider/consumatori usando la BlockingCollection nelle mie colonne di VisualStudioMagazine.com Practical .NET, “Create Simple, Reliable Asynchronous Apps with BlockingCollection” (bit.ly/1TuOpE6), e, “Create Sophisticated Asynchronous Applications with BlockingCollection” (bit.ly/1SpYyD4).
Un altro esempio è l’implementazione del design pattern observer per monitorare asincronicamente un’operazione di lunga durata. In questo scenario, un metodo asincrono che restituisce un singolo oggetto Task non funziona perché il client restituisce spesso un flusso di risultati. Per questi scenari, potete sfruttare almeno due strumenti del .NET Framework: ObservableCollection e Reactive Extensions (Rx). Per soluzioni semplici, ObservableCollection (insieme alle parole chiave async e await) è tutto ciò di cui avete bisogno. Tuttavia, per problemi più “interessanti” e, specialmente, guidati dagli eventi, Rx fornisce un miglior controllo sul processo.
Definizione del pattern
Mentre il pattern dell’osservatore è frequentemente usato nei pattern di progettazione UI-inclusi Model-View-Controller (MVC), Model-View-Presenter (MVP) e MVVM-UI dovrebbe essere considerato solo uno scenario di un più ampio insieme di scenari in cui si applica il pattern dell’osservatore. La definizione del pattern osservatore (citando da Wikipedia) è: “Un oggetto, chiamato soggetto, mantiene una lista dei suoi dipendenti, chiamati osservatori, e li notifica automaticamente di qualsiasi cambiamento di stato, di solito chiamando uno dei loro metodi.”
In realtà, il pattern osservatore è quello di ottenere risultati da processi di lunga durata al client non appena questi risultati sono disponibili. Senza una qualche versione del pattern observer, i client devono aspettare che l’ultimo risultato sia disponibile e poi avere tutti i risultati inviati in un unico blocco. In un mondo sempre più asincrono, si desidera che gli osservatori elaborino i risultati in parallelo con il client man mano che i risultati diventano disponibili. Per enfatizzare che si sta parlando di qualcosa di più delle UI quando si sfrutta il pattern dell’osservatore, userò “client” e “server” invece di “osservatore” e “soggetto”, nel resto di questo articolo.
Problemi e opportunità
Ci sono almeno tre problemi e due opportunità con il pattern dell’osservatore. Il primo problema è il problema del lapsed-listener: molte implementazioni del pattern observer richiedono che il server mantenga un riferimento a tutti i suoi client. Come risultato, i client possono essere tenuti in memoria dal server fino a quando il server esce. Questa ovviamente non è una soluzione ottimale per un processo di lunga durata in un sistema dinamico dove i client si connettono e disconnettono frequentemente.
Il problema dell’ascoltatore decaduto, comunque, è solo un sintomo del secondo, più grande problema: molte implementazioni dello schema dell’osservatore richiedono che il server e il client siano strettamente accoppiati, richiedendo che sia il server che il client siano presenti in ogni momento. Come minimo, il client dovrebbe essere in grado di determinare se il server è presente e scegliere di non attaccarsi; inoltre, il server dovrebbe essere in grado di funzionare anche se non ci sono client che accettano risultati.
Il terzo problema è legato alle prestazioni: Quanto tempo impiegherà il server a notificare tutti i client? Le prestazioni nello schema dell’osservatore sono direttamente influenzate dal numero di client da notificare. Pertanto, c’è l’opportunità di migliorare le prestazioni nello schema dell’osservatore lasciando che il client filtri preventivamente i risultati che ritornano dal server. Questo affronta anche gli scenari in cui il server produce più risultati (o una più ampia varietà di risultati) di quanto il client sia interessato: Il client può indicare che deve essere notificato solo in casi specifici. La seconda opportunità di performance esiste intorno al riconoscere quando il server non ha risultati o ha finito di produrre risultati. I client possono saltare l’acquisizione delle risorse richieste per elaborare gli eventi del server fino a quando il client ha la garanzia che ci sia qualcosa da elaborare e i client possono rilasciare quelle risorse non appena sanno di aver elaborato l’ultimo risultato.
Dall’osservatore a Publish/Subscribe
Facendo leva su queste considerazioni si passa da semplici implementazioni del pattern osservatore al relativo modello publish/subscribe. Publish/subscribe implementa il modello dell’osservatore in un modo liberamente accoppiato che permette ai server e ai client di eseguire anche se l’altro non è attualmente disponibile. Publish/subscribe implementa anche tipicamente il filtraggio lato client permettendo al client di sottoscrivere o argomenti/canali specifici (“Notificami gli ordini di acquisto”) o gli attributi associati a diversi tipi di contenuto (“Notificami le richieste urgenti”).
Un problema rimane, tuttavia. Tutte le implementazioni del modello dell’osservatore tendono ad accoppiare strettamente client e server ad uno specifico formato di messaggio. Cambiare il formato di un messaggio nella maggior parte delle implementazioni publish/subscribe può essere difficile perché tutti i client devono essere aggiornati per usare il nuovo formato.
In molti modi, questo è simile alla descrizione di un cursore lato server in un database. Per minimizzare i costi di trasmissione, il server del database non restituisce i risultati man mano che ogni riga viene recuperata. Tuttavia, per grandi insiemi di righe, il database non restituisce anche tutte le righe in un singolo batch alla fine. Invece, il server di database tipicamente restituisce sottoinsiemi da un cursore tenuto sul server spesso come quei sottoinsiemi diventano disponibili. Con un database, il client e il server non devono essere presenti contemporaneamente: Il server di database può funzionare quando non ci sono client presenti; un client può controllare se il server è accessibile e, se non lo è, decidere cosa (eventualmente) può fare. Anche il processo di filtraggio (SQL) è molto flessibile. Tuttavia, se il motore del database cambia il formato che usa per restituire le righe, allora tutti i client devono, come minimo, essere ricompilati.
Elaborazione di una cache di oggetti
Come mio caso di studio per guardare una semplice implementazione del pattern osservatore, sto usando come server una classe che cerca una cache in-memoria di fatture. Questo server potrebbe, alla fine della sua elaborazione, restituire una collezione di tutte le fatture. Tuttavia, preferirei che il client elaborasse le fatture individualmente e in parallelo al processo di ricerca del server. Ciò significa che preferisco una versione del processo che restituisce ogni fattura man mano che viene trovata e lascia che il client elabori ogni fattura in parallelo con la ricerca della fattura successiva.
Una semplice implementazione del server potrebbe assomigliare a questa:
private List<Invoice> foundInvoices = new List<Invoice>();public List<Invoice> FindInvoices(decimal Amount){ foundInvoices.Clear(); Invoice inv; // ...search logic to add invoices to the collection foundInvoices.Add(inv); // ...repeat until all invoices found return foundInvoices;}
Soluzioni più sofisticate potrebbero usare yield return per restituire ogni fattura quando viene trovata piuttosto che assemblare la lista. Indipendentemente da ciò, un client che chiama il metodo FindInvoices vorrà eseguire alcune attività critiche prima e dopo l’elaborazione. Per esempio, una volta trovato il primo elemento, il cliente potrebbe voler abilitare una lista MatchingInvoices per tenere le fatture al cliente o acquisire/inizializzare qualsiasi risorsa richiesta per elaborare una fattura. Man mano che vengono aggiunte altre fatture, il client dovrebbe elaborare ogni fattura e, quando il server segnala che l’ultima fattura è stata recuperata, rilasciare tutte le risorse che non sono più necessarie perché non ci sono “più” fatture da elaborare.
Durante il recupero di un database, per esempio, una lettura si blocca finché non viene restituita la prima riga. Una volta che la prima riga viene restituita, il client inizializza qualsiasi risorsa sia necessaria per elaborare una riga. La lettura restituisce anche false quando l’ultima riga viene recuperata, permettendo al client di rilasciare quelle risorse perché non ci sono più righe da elaborare.
Creare soluzioni semplici con ObservableCollection
La scelta più ovvia per implementare il pattern osservatore nel .NET Framework è ObservableCollection. La ObservableCollection notificherà al client (attraverso un evento) ogni volta che viene modificata.
Riscrivere il mio server di esempio per usare la classe ObservableCollection richiede solo due modifiche. Primo, la collezione che contiene i risultati deve essere definita come una ObservableCollection e resa pubblica. Secondo, non è più necessario che il metodo restituisca un risultato: Il server ha solo bisogno di aggiungere fatture alla collezione.
La nuova implementazione del server potrebbe assomigliare a questa:
public List<Invoice> FindInvoices(decimal Amount){ public ObservableCollection<Invoice> foundInvoices = new ObservableCollection<Invoice>(); public void FindInvoices(decimal Amount) { foundInvoices.Clear(); Invoice inv; // ...search logic to set inv foundInvoices.Add(inv); // ...repeat until all invoices are added to the collection }
Un client che usa questa versione del server ha solo bisogno di collegare un gestore di eventi all’evento CollectionChanged della collezione foundInvoices di InvoiceManagement. Nel seguente codice ho fatto implementare alla classe l’interfaccia IDisposable per supportare la disconnessione dall’evento:
public class SearchInvoices: IDisposable{ InvoiceManagement invMgmt = new InvoiceManagement(); public void SearchInvoices() { invMgmt.foundInvoices.CollectionChanged += InvoicesFound; } public void Dispose() { invMgmt.foundInvoices.CollectionChanged -= InvoicesChanged; }
Nel client, all’evento CollectionChanged viene passato un oggetto NotifyCollectionChangedEventArgs come secondo parametro. La proprietà Action di questo oggetto specifica sia quale cambiamento è stato eseguito sulla collezione (le azioni sono: la collezione è stata cancellata, sono stati aggiunti nuovi elementi alla collezione, gli elementi esistenti sono stati spostati/sostituiti/rimossi) sia le informazioni sugli elementi modificati (un insieme di qualsiasi elemento aggiunto, un insieme di elementi presenti nella collezione prima dell’aggiunta dei nuovi elementi, la posizione dell’elemento che è stato spostato/rimosso/sostituito).
Codice semplice nel client che processerebbe in modo asincrono ogni fattura man mano che viene aggiunta all’insieme nel server sarebbe come il codice in Figura 1.
Figura 1 Elaborazione asincrona delle fatture usando ObservableCollection
private async void InvoicesFound(object sender, NotifyCollectionChangedEventArgs e){ switch (e.Action) { case NotifyCollectionChangedAction.Reset: { // ...initial item processing return; } case NotifyCollectionChangedAction.Add: { foreach (Invoice inv in e.NewItems) { await HandleInvoiceAsync(inv); } return; } }}
Anche se semplice, questo codice potrebbe essere inadeguato per le tue necessità, specialmente se stai gestendo un processo di lunga durata o se lavori in un ambiente dinamico. Da un punto di vista di design asincrono, per esempio, il codice potrebbe catturare l’oggetto Task restituito da HandleInvoiceAsync in modo che il client possa gestire i compiti asincroni. Vorrai anche assicurarti che l’evento CollectionChanged venga sollevato sul thread dell’UI anche se FindInvoices viene eseguito su un thread in background.
A causa del punto in cui il metodo Clear viene chiamato nella classe server (appena prima della ricerca della prima fattura) il valore Reset della proprietà Action può essere usato come segnale che il primo elemento sta per essere recuperato. Tuttavia, naturalmente, nessuna fattura può essere trovata nella ricerca, quindi l’uso dell’azione Reset potrebbe portare il client ad allocare risorse che non sono mai state effettivamente utilizzate. Per gestire effettivamente l’elaborazione del “primo elemento”, bisognerebbe aggiungere un flag all’elaborazione dell’Add Action per eseguire solo quando viene trovato il primo elemento.
Inoltre, il server ha un numero limitato di opzioni per indicare che l’ultima fattura è stata trovata in modo che il client possa smettere di aspettare “la prossima”. Il server potrebbe, presumibilmente, cancellare la collezione dopo aver trovato l’ultimo elemento, ma ciò forzerebbe solo un’elaborazione più complessa nell’elaborazione di Reset Action (ho elaborato Fatture? Se sì, allora ho elaborato l’ultima fattura; se no, allora sto per elaborare la prima fattura).
Mentre, per problemi semplici, ObservableCollection andrà bene, qualsiasi implementazione ragionevolmente sofisticata basata su ObservableCollection (e qualsiasi applicazione che apprezzi l’efficienza) richiederà del codice complicato, specialmente nel client.
Le soluzioni Rx
Se volete un’elaborazione asincrona allora Rx (disponibile attraverso NuGet) può fornire una soluzione migliore per implementare il pattern osservatore prendendo in prestito il modello publish/subscribe. Questa soluzione fornisce anche un modello di filtraggio basato su LINQ, una migliore segnalazione per le condizioni di primo/ultimo elemento e una migliore gestione degli errori.
Rx può anche gestire implementazioni di osservatori più interessanti di quelle possibili con una ObservableCollection. Nel mio caso di studio, dopo aver restituito la lista iniziale di fatture, il mio server potrebbe continuare a controllare le nuove fatture che vengono aggiunte alla cache dopo il completamento della ricerca originale (e che corrispondono ai criteri di ricerca, ovviamente). Quando appare una fattura che soddisfa i criteri, il cliente vorrà essere avvisato dell’evento in modo che la nuova fattura possa essere aggiunta alla lista. Rx supporta questo tipo di estensioni basate sugli eventi del pattern osservatore meglio di ObservableCollection.
Ci sono due interfacce chiave in Rx per supportare il pattern osservatore. La prima è IObservable<T>, implementata dal server e che specifica un singolo metodo: Subscribe. Al server che implementa il metodo Subscribe verrà passato un riferimento ad un oggetto da un client. Per gestire il problema dell’ascoltatore decaduto, il metodo Subscribe restituisce un riferimento al client per un oggetto che implementa l’interfaccia IDisposable. Il client può usare quell’oggetto per disconnettersi dal server. Quando il client si disconnette, ci si aspetta che il server rimuova il client da qualsiasi sua lista interna.
La seconda è l’interfaccia IObserver<T>, che deve essere implementata dal client. Questa interfaccia richiede che il client implementi ed esponga tre metodi al server: OnNext, OnCompleted e OnError. Il metodo critico qui è OnNext, che è usato dal server per passare un messaggio al client (nel mio caso di studio quel messaggio sarebbe costituito da nuovi oggetti Invoice che saranno restituiti man mano che ciascuno appare). Il server può usare il metodo OnCompleted del client per segnalare che non ci sono più dati. Il terzo metodo, OnError, fornisce un modo per il server di segnalare al client che si è verificata un’eccezione.
Si può implementare l’interfaccia IObserver da soli, naturalmente (è parte del .NET Framework). Insieme a ObservableCollection, potrebbe essere tutto ciò di cui avete bisogno se state creando una soluzione sincrona (ho scritto una rubrica anche su questo, “Scrivere codice più pulito con le estensioni reattive” ).
Tuttavia, Rx include diversi pacchetti che forniscono implementazioni asincrone di queste interfacce, incluse implementazioni per JavaScript e servizi RESTful. La classe Rx Subject fornisce un’implementazione di IObservable che semplifica l’implementazione di una versione asincrona publish/subscribe del pattern observer.
Creazione di una soluzione asincrona
Creare un server per lavorare con un oggetto Subject richiede poche modifiche al codice originale sincrono lato server. Sostituisco la vecchia ObservableCollection con un oggetto Subject che passerà ogni fattura come appare a qualsiasi client in ascolto. Dichiaro l’oggetto Subject come pubblico in modo che i client possano accedervi:
public class InvoiceManagement{ public IObservable<Invoice> foundInvoice = new Subject<Invoice>();
Nel corpo del metodo, invece di aggiungere una fattura a una collezione, uso il metodo OnNext del Subject per passare ogni fattura al client quando viene trovata:
public void FindInvoices(decimal Amount){ inv = GetInvoicesForAmount(Amount) // Poll for invoices foundInvoice.OnNext(inv); // ...repeat...}
Nel mio client, dichiaro prima un’istanza della classe server. Poi, in un metodo contrassegnato come async, chiamo il metodo Subscribe del soggetto per indicare che voglio iniziare a recuperare i messaggi:
public class InvoiceManagementTests{ InvoiceManagement invMgmt = new InvoiceManagement(); public async void ProcessInvoices() { invMgmt.foundInvoice.Subscribe<Invoice>();
Per filtrare i risultati solo sulle fatture che voglio, posso applicare una dichiarazione LINQ all’oggetto soggetto. Questo esempio filtra le fatture a quelle che sono arretrate (per usare le estensioni LINQ di Rx è necessario aggiungere una dichiarazione using per il namespace System.Reactive.Linq):
invMgmt.foundInvoice.Where(i => i.BackOrder == "BackOrder").Subscribe();
Una volta che ho iniziato ad ascoltare l’oggetto, posso specificare quale elaborazione voglio fare quando ricevo una fattura. Posso, per esempio, usare FirstAsync per elaborare solo la prima fattura restituita dal servizio. In questo esempio, uso l’istruzione await con la chiamata a FirstAsync in modo da poter restituire il controllo al corpo principale della mia applicazione mentre elaboro la fattura. Questo codice aspetta di recuperare la prima fattura, poi passa a qualsiasi codice io usi per inizializzare il processo di elaborazione della fattura e, infine, elabora la fattura:
Invoice inv;inv = await invMgmt.foundInvoice.FirstAsync();// ...setup code invoices...HandleInvoiceAsync(inv);
Un avvertimento: FirstAsync si blocca se il server non ha ancora prodotto alcun risultato. Se volete evitare il blocco, potete usare FirstOrDefaultAsync, che restituirà null se il server non ha prodotto alcun risultato. Se non ci sono risultati, il client può decidere cosa fare, se non altro.
Il caso più tipico è che il client voglia elaborare tutte le fatture restituite (dopo il filtraggio) e farlo in modo asincrono. In questo caso, piuttosto che usare una combinazione di Subscribe e OnNext, si può semplicemente usare il metodo ForEachAsync. Potete passare un metodo o un’espressione lambda che elabora i risultati in arrivo. Se passate un metodo (che non può essere asincrono), come faccio qui, a quel metodo verrà passata la fattura che ha innescato ForEachAsync:
invMgmt.foundInvoice.ForEachAsync(HandleInvoice);
Al metodo ForEachAsync può essere passato anche un token di cancellazione per far segnalare al client che si sta disconnettendo. Una buona pratica sarebbe quella di passare il token quando si chiama uno qualsiasi dei metodi Rx *Async per supportare il fatto che il client termina l’elaborazione senza dover aspettare che tutti gli oggetti vengano elaborati.
Il ForEachAsync non elaborerà alcun risultato già elaborato da un metodo First (o FirstOrDefaultAsync), quindi è possibile utilizzare FirstOrDefaultAsync con ForEachAsync per controllare se il server ha qualcosa da elaborare prima di elaborare oggetti successivi. Tuttavia, il metodo IsEmpty del soggetto eseguirà lo stesso controllo più semplicemente. Se il client deve allocare qualsiasi risorsa necessaria per l’elaborazione dei risultati, IsEmpty permette al client di controllare se c’è qualcosa da fare prima di allocare tali risorse (un’alternativa sarebbe quella di allocare tali risorse sul primo oggetto elaborato nel ciclo). Usando IsEmpty con un client che controlla se ci sono risultati prima di allocare le risorse (e iniziare l’elaborazione) e supportando allo stesso tempo la cancellazione, si otterrebbe un codice che assomiglia alla Figura 2.
Figura 2 Codice per supportare la cancellazione e rimandare l’elaborazione finché i risultati non sono pronti
CancellationTokenSource cancelSource = new CancellationTokenSource();CancellationToken cancel;cancel = cancelSource.Token;if (!await invMgmt.foundInvoice.IsEmpty()){ // ...setup code for processing invoices... try { invMgmt.foundInvoice.ForEachAsync(HandleInvoice, cancel); } catch (Exception ex) { if (ex.GetType() != typeof(CancellationToken)) { // ...report message } } // ...clean up code when all invoices are processed or client disconnects}
Raccogliendo
Se tutto ciò di cui avete bisogno è una semplice implementazione dello schema dell’osservatore, allora ObservableCollection potrebbe fare tutto ciò che vi serve per elaborare un flusso di risultati. Per un migliore controllo e per un’applicazione basata su eventi, la classe Subject e le estensioni che vengono fornite con Rx permetteranno alla vostra applicazione di lavorare in modo asincrono supportando una potente implementazione del modello publish/subscribe (e non ho guardato la ricca libreria di operatori che viene fornita con Rx). Se state lavorando con Rx, vale la pena scaricare la Rx Design Guide (bit.ly/1VOPxGS), che discute le migliori pratiche nel consumare e produrre flussi osservabili.
Rx fornisce anche un certo supporto per convertire il tipo di messaggio passato tra il client e il server utilizzando l’interfaccia ISubject<TSource, TResult>. L’interfaccia ISubject<TSource, TResult> specifica due tipi di dati: un tipo di dati “in” e un tipo di dati “out”. All’interno della classe Subject che implementa questa interfaccia si possono eseguire tutte le operazioni necessarie per convertire il risultato restituito dal server (il datatype “in”) nel risultato richiesto dal client (il datatype “out”). Inoltre, il parametro in è covariante (accetterà il tipo di dato specificato o qualsiasi cosa da cui il tipo di dato erediti) e il parametro out è contravariante (accetterà il tipo di dato specificato o qualsiasi cosa che derivi da esso), dandovi ulteriore flessibilità.
Viviamo in un mondo sempre più asincrono e, in questo mondo, il pattern observer sta diventando più importante-è uno strumento utile per qualsiasi interfaccia tra processi dove il processo server restituisce più di un singolo risultato. Fortunatamente, avete diverse opzioni per implementare il pattern osservatore nel .NET Framework, tra cui ObservableCollection e Rx.
Peter Vogel è un architetto di sistemi e principale in PH&V Information Services. PH&V fornisce consulenza full-stack dalla progettazione UX alla modellazione di oggetti e alla progettazione di database.
Grazie ai seguenti esperti tecnici di Microsoft per la revisione di questo articolo: Stephen Cleary, James McCaffrey e Dave Sexton
Stephen Cleary ha lavorato con il multithreading e la programmazione asincrona per 16 anni e ha usato il supporto asincrono nel Microsoft .NET Framework fin dalla prima anteprima tecnologica comunitaria. È l’autore di “Concurrency in C# Cookbook” (O’Reilly Media, 2014). La sua homepage, compreso il suo blog, è all’indirizzo stephencleary.com.
Discuti questo articolo nel forum di MSDN Magazine
Lascia un commento