Escalar enlaces asíncronos cliente-servidor con Reactive.Server Links with Reactive
On enero 25, 2022 by admin- 01/31/2019
- 17 minutos para leer
Junio 2016
Volumen 31 Número 6
Por Peter Vogel | Junio 2016
A medida que el procesamiento asíncrono se ha vuelto más común en el desarrollo de aplicaciones, el Microsoft .NET Framework ha adquirido una amplia variedad de herramientas que soportan patrones de diseño asíncronos específicos. A menudo, la creación de una aplicación asíncrona bien diseñada se reduce a reconocer el patrón de diseño que su aplicación está implementando y, a continuación, elegir el conjunto adecuado de componentes .NET.
En algunos casos, la coincidencia requiere la integración de varios componentes .NET. El artículo de Stephen Cleary, «Patterns for Asynchronous MVVM Applications: Comandos» (bit.ly/233Kocr), muestra cómo soportar completamente el patrón Modelo-Vista-Vista-Modelo (MVVM) de forma asíncrona. En otros casos el soporte requiere sólo un componente del .NET Framework. He hablado de la implementación del patrón proveedor/consumidor utilizando BlockingCollection en mis columnas de VisualStudioMagazine.com Practical .NET, «Create Simple, Reliable Asynchronous Apps with BlockingCollection» (bit.ly/1TuOpE6), y, «Create Sophisticated Asynchronous Applications with BlockingCollection» (bit.ly/1SpYyD4).
Otro ejemplo es la implementación del patrón de diseño observador para supervisar una operación de larga duración de forma asíncrona. En este escenario, un método asíncrono que devuelve un único objeto Task no funciona porque el cliente devuelve frecuentemente un flujo de resultados. Para estos escenarios, se pueden aprovechar al menos dos herramientas de .NET Framework: ObservableCollection y Reactive Extensions (Rx). Para soluciones simples, la ObservableCollection (junto con las palabras clave async y await) es todo lo que necesitas. Sin embargo, para los problemas más «interesantes» y, sobre todo, impulsados por eventos, Rx le proporciona un mejor control sobre el proceso.
Definición del patrón
Aunque el patrón observador se utiliza con frecuencia en los patrones de diseño de la interfaz de usuario -incluyendo el Modelo-Vista-Controlador (MVC), el Modelo-Vista-Presentador (MVP) y MVVM- las interfaces de usuario deben ser consideradas como sólo un escenario de un conjunto más amplio de escenarios donde se aplica el patrón observador. La definición del patrón observador (citando a Wikipedia) es: «Un objeto, llamado sujeto, mantiene una lista de sus dependientes, llamados observadores, y les notifica automáticamente cualquier cambio de estado, normalmente llamando a uno de sus métodos»
En realidad, el patrón observador trata de obtener resultados de procesos de larga duración para el cliente tan pronto como esos resultados estén disponibles. Sin alguna versión del patrón observador, los clientes deben esperar hasta que el último resultado esté disponible y luego tener todos los resultados enviados a ellos en un solo bulto. En un mundo cada vez más asíncrono, quieres que los observadores procesen los resultados en paralelo con el cliente a medida que los resultados estén disponibles. Para enfatizar que se está hablando de algo más que de UIs cuando se aprovecha el patrón observador, usaré «cliente» y «servidor» en lugar de «observador» y «sujeto» en el resto de este artículo.
Problemas y oportunidades
Hay al menos tres problemas y dos oportunidades con el patrón observador. El primero es el problema de los oyentes perdidos: muchas implementaciones del patrón observador requieren que el servidor mantenga una referencia a todos sus clientes. Como resultado, los clientes pueden ser retenidos en memoria por el servidor hasta que éste salga. Obviamente, esta no es una solución óptima para un proceso de larga duración en un sistema dinámico en el que los clientes se conectan y desconectan con frecuencia.
El problema del oyente caducado, sin embargo, es sólo un síntoma del segundo problema más grande: muchas implementaciones del patrón observador requieren que el servidor y el cliente estén estrechamente acoplados, requiriendo que tanto el servidor como el cliente estén presentes en todo momento. Como mínimo, el cliente debería ser capaz de determinar si el servidor está presente y elegir no conectarse; además, el servidor debería ser capaz de funcionar incluso si no hay clientes aceptando resultados.
El tercer problema está relacionado con el rendimiento: ¿Cuánto tardará el servidor en notificar a todos los clientes? El rendimiento en el patrón observador se ve directamente afectado por el número de clientes a notificar. Por lo tanto, hay una oportunidad de mejorar el rendimiento en el patrón observador permitiendo que el cliente filtre de forma preventiva los resultados que llegan desde el servidor. Esto también aborda los escenarios en los que el servidor produce más resultados (o una mayor variedad de resultados) de los que el cliente está interesado: El cliente puede indicar que sólo debe ser notificado en casos específicos. La segunda oportunidad de rendimiento existe en torno a reconocer cuándo el servidor no tiene resultados o ha terminado de producirlos. Los clientes pueden omitir la adquisición de los recursos necesarios para procesar los eventos del servidor hasta que el cliente tenga la garantía de que hay algo que procesar y los clientes pueden liberar esos recursos tan pronto como sepan que han procesado el último resultado.
Del observador a la publicación/suscripción
El hecho de tener en cuenta estas consideraciones lleva a las implementaciones simples del patrón de observador al modelo relacionado de publicación/suscripción. Publish/subscribe implementa el patrón de observador de una forma poco acoplada que permite a los servidores y clientes ejecutarse incluso si el otro no está disponible en ese momento. Publish/subscribe también implementa típicamente el filtrado del lado del cliente permitiendo que el cliente se suscriba a temas/canales específicos («Notificarme sobre órdenes de compra») o a atributos asociados con diferentes tipos de contenido («Notificarme sobre cualquier solicitud urgente»).
Sin embargo, queda un problema. Todas las implementaciones del patrón observador tienden a acoplar estrechamente a clientes y servidores a un formato de mensaje específico. Cambiar el formato de un mensaje en la mayoría de las implementaciones de publicación/suscripción puede ser difícil porque todos los clientes deben ser actualizados para utilizar el nuevo formato.
En muchos sentidos, esto es similar a la descripción de un cursor del lado del servidor en una base de datos. Para minimizar los costes de transmisión, el servidor de la base de datos no devuelve los resultados a medida que se recupera cada fila. Sin embargo, para conjuntos de filas grandes, la base de datos tampoco devuelve todas las filas en un solo lote al final. En su lugar, el servidor de la base de datos suele devolver subconjuntos de un cursor mantenido en el servidor a menudo, a medida que esos subconjuntos están disponibles. Con una base de datos, el cliente y el servidor no tienen que estar presentes simultáneamente: El servidor de la base de datos puede ejecutarse cuando no hay clientes presentes; un cliente puede comprobar si el servidor es accesible y, en caso contrario, decidir qué puede hacer (si es que puede hacer algo). El proceso de filtrado (SQL) también es muy flexible. Sin embargo, si el motor de la base de datos cambia el formato que utiliza para devolver filas, entonces todos los clientes deben, como mínimo, ser recompilados.
Procesamiento de una caché de objetos
Como mi caso de estudio para ver la implementación de un patrón de observador simple, estoy utilizando como servidor una clase que busca una caché de facturas en memoria. Ese servidor podría, al final de su procesamiento, devolver una colección de todas las facturas. Sin embargo, prefiero que el cliente procese las facturas individualmente y en paralelo al proceso de búsqueda del servidor. Esto significa que prefiero una versión del proceso que devuelva cada factura a medida que se encuentra y que permita al cliente procesar cada factura en paralelo con la búsqueda de la siguiente.
Una implementación sencilla del servidor podría tener este aspecto:
private List<Invoice> foundInvoices = new List<Invoice>();public List<Invoice> FindInvoices(decimal Amount){ foundInvoices.Clear(); Invoice inv; // ...search logic to add invoices to the collection foundInvoices.Add(inv); // ...repeat until all invoices found return foundInvoices;}
Soluciones más sofisticadas podrían utilizar yield return para devolver cada factura a medida que se encuentra en lugar de ensamblar la lista. En cualquier caso, un cliente que llame al método FindInvoices querrá realizar algunas actividades críticas antes y después del procesamiento. Por ejemplo, una vez que se encuentra el primer elemento, el cliente podría querer habilitar una lista de MatchingInvoices para mantener las facturas en el cliente o adquirir/inicializar cualquier recurso necesario para procesar una factura. A medida que se añaden facturas adicionales, el cliente tendría que procesar cada factura y, cuando el servidor señale que se ha recuperado la última factura, liberar cualquier recurso que ya no sea necesario porque «no hay más» facturas que procesar.
Durante la recuperación de una base de datos, por ejemplo, una lectura se bloqueará hasta que se devuelva la primera fila. Una vez que se devuelve la primera fila, el cliente inicializa cualquier recurso que se necesite para procesar una fila. La lectura también devuelve false cuando se recupera la última fila, dejando que el cliente libere esos recursos porque no hay más filas que procesar.
Creación de soluciones simples con ObservableCollection
La opción más obvia para implementar el patrón observador en .NET Framework es la ObservableCollection. La ObservableCollection notificará al cliente (a través de un evento) cada vez que se modifique.
Redactar mi servidor de ejemplo para utilizar la clase ObservableCollection sólo requiere dos cambios. En primer lugar, hay que definir la colección que contiene los resultados como una ObservableCollection y hacerla pública. En segundo lugar, ya no es necesario que el método devuelva un resultado: El servidor sólo necesita añadir facturas a la colección.
La nueva implementación del servidor podría tener este aspecto:
public List<Invoice> FindInvoices(decimal Amount){ public ObservableCollection<Invoice> foundInvoices = new ObservableCollection<Invoice>(); public void FindInvoices(decimal Amount) { foundInvoices.Clear(); Invoice inv; // ...search logic to set inv foundInvoices.Add(inv); // ...repeat until all invoices are added to the collection }
Un cliente que utilice esta versión del servidor sólo necesita conectar un controlador de eventos al evento CollectionChanged de la colección foundInvoices de InvoiceManagement. En el siguiente código he hecho que la clase implemente la interfaz IDisposable para poder desconectarse del evento:
public class SearchInvoices: IDisposable{ InvoiceManagement invMgmt = new InvoiceManagement(); public void SearchInvoices() { invMgmt.foundInvoices.CollectionChanged += InvoicesFound; } public void Dispose() { invMgmt.foundInvoices.CollectionChanged -= InvoicesChanged; }
En el cliente, al evento CollectionChanged se le pasa un objeto NotifyCollectionChangedEventArgs como segundo parámetro. La propiedad Action de ese objeto especifica tanto el cambio que se ha realizado en la colección (las acciones son: se ha limpiado la colección, se han añadido nuevos elementos a la colección, se han movido/reemplazado/eliminado los elementos existentes) como la información sobre los elementos cambiados (una colección de cualquier elemento añadido, una colección de elementos presentes en la colección antes de que se añadieran los nuevos elementos, la posición del elemento que se ha movido/eliminado/reemplazado).
Un código sencillo en el cliente que procese asíncronamente cada factura a medida que se añade a la colección en el servidor tendría el aspecto del código de la Figura 1.
Figura 1 Procesamiento asíncrono de facturas mediante ObservableCollection
private async void InvoicesFound(object sender, NotifyCollectionChangedEventArgs e){ switch (e.Action) { case NotifyCollectionChangedAction.Reset: { // ...initial item processing return; } case NotifyCollectionChangedAction.Add: { foreach (Invoice inv in e.NewItems) { await HandleInvoiceAsync(inv); } return; } }}
Aunque es sencillo, este código podría ser inadecuado para sus necesidades, especialmente si maneja un proceso de larga duración o trabaja en un entorno dinámico. Desde el punto de vista del diseño asíncrono, por ejemplo, el código podría capturar el objeto Task devuelto por el HandleInvoiceAsync para que el cliente pudiera gestionar las tareas asíncronas. También querrás asegurarte de que el evento CollectionChanged se levante en el hilo de la interfaz de usuario, incluso si FindInvoices se ejecuta en un hilo de fondo.
Debido a dónde se llama al método Clear en la clase del servidor (justo antes de buscar la primera factura), el valor Reset de la propiedad Action puede utilizarse como señal de que el primer elemento está a punto de ser recuperado. Sin embargo, por supuesto, es posible que no se encuentre ninguna factura en la búsqueda, por lo que el uso de la Acción Reset podría dar lugar a que el cliente asigne recursos que nunca se utilizan realmente. Para manejar realmente el procesamiento del «primer elemento», habría que añadir una bandera al procesamiento de la Acción Añadir para que se ejecutara sólo cuando se encontrara el primer elemento.
Además, el servidor tiene un número limitado de opciones para indicar que se ha encontrado la última factura, de modo que el cliente pueda dejar de esperar «la siguiente». El servidor podría, presumiblemente, borrar la colección después de encontrar el último elemento, pero eso sólo forzaría un procesamiento más complejo en el procesamiento de la acción de reinicio (¿he estado procesando Facturas? Si es así, entonces he procesado la última factura; si no, entonces estoy a punto de procesar la primera factura).
Aunque, para problemas sencillos, ObservableCollection estará bien, cualquier implementación razonablemente sofisticada basada en ObservableCollection (y cualquier aplicación que valore la eficiencia) va a requerir algo de código complicado, especialmente en el cliente.
Las soluciones Rx
Si quieres un procesamiento asíncrono entonces Rx (disponible a través de NuGet) puede proporcionar una mejor solución para implementar el patrón observador tomando prestado el modelo publish/subscribe. Esta solución también proporciona un modelo de filtrado basado en LINQ, una mejor señalización para las condiciones de primer/último elemento y un mejor manejo de errores.
Rx también puede manejar implementaciones de observadores más interesantes que son posibles con un ObservableCollection. En mi caso de estudio, después de devolver la lista inicial de facturas, mi servidor podría seguir comprobando las nuevas facturas que se añaden a la caché después de completar la búsqueda original (y que coinciden con los criterios de búsqueda, por supuesto). Cuando aparezca una factura que cumpla los criterios, el cliente querrá ser notificado sobre el evento para que la nueva factura pueda ser añadida a la lista. Rx soporta este tipo de extensiones basadas en eventos del patrón observador mejor que ObservableCollection.
Hay dos interfaces clave en Rx para soportar el patrón observador. La primera es IObservable<T>, implementada por el servidor y que especifica un único método: Subscribe. Al servidor que implemente el método Subscribe se le pasará una referencia a un objeto de un cliente. Para manejar el problema del oyente caducado, el método Subscribe devuelve al cliente una referencia a un objeto que implementa la interfaz IDisposable. El cliente puede utilizar ese objeto para desconectarse del servidor. Cuando el cliente se desconecta, se espera que el servidor elimine al cliente de cualquiera de sus listas internas.
La segunda es la interfaz IObserver<T>, que debe ser implementada por el cliente. Esa interfaz requiere que el cliente implemente y exponga tres métodos al servidor: OnNext, OnCompleted y OnError. El método crítico aquí es OnNext, que es usado por el servidor para pasar un mensaje al cliente (en mi caso de estudio ese mensaje sería nuevos objetos de Factura que serán devueltos a medida que cada uno aparezca). El servidor puede utilizar el método OnCompleted del cliente para señalar que no hay más datos. El tercer método, OnError, proporciona una forma de que el servidor señale al cliente que se ha producido una excepción.
Puedes implementar la interfaz IObserver tú mismo, por supuesto (es parte de .NET Framework). Junto con la ObservableCollection, eso puede ser todo lo que necesitas si estás creando una solución sincrónica (he escrito una columna sobre eso, también, «Writing Cleaner Code with Reactive Extensions» ).
Sin embargo, la Rx incluye varios paquetes que proporcionan implementaciones asincrónicas de estas interfaces, incluyendo implementaciones para JavaScript y servicios RESTful. La clase Rx Subject proporciona una implementación de IObservable que simplifica la implementación de una versión asíncrona de publicación/suscripción del patrón observador.
Creando una solución asíncrona
Crear un servidor para trabajar con un objeto Subject requiere muy pocos cambios en el código original del lado del servidor síncrono. Sustituyo la antigua ObservableCollection por un objeto Subject que pasará cada factura tal y como aparezca a cualquier cliente que esté escuchando. Declaro el objeto Subject como público para que los clientes puedan acceder a él:
public class InvoiceManagement{ public IObservable<Invoice> foundInvoice = new Subject<Invoice>();
En el cuerpo del método, en lugar de añadir una factura a una colección, uso el método OnNext del Subject para pasar cada factura al cliente a medida que se encuentra:
public void FindInvoices(decimal Amount){ inv = GetInvoicesForAmount(Amount) // Poll for invoices foundInvoice.OnNext(inv); // ...repeat...}
En mi cliente, primero declaro una instancia de la clase del servidor. Luego, en un método marcado como async, llamo al método Subscribe del Subject para indicar que quiero empezar a recuperar mensajes:
public class InvoiceManagementTests{ InvoiceManagement invMgmt = new InvoiceManagement(); public async void ProcessInvoices() { invMgmt.foundInvoice.Subscribe<Invoice>();
Para filtrar los resultados sólo a las facturas que quiero, puedo aplicar una sentencia LINQ al objeto Subject. Este ejemplo filtra las facturas a las que están ordenadas por detrás (para usar las extensiones Rx LINQ tendrás que añadir una sentencia using para el espacio de nombres System.Reactive.Linq):
invMgmt.foundInvoice.Where(i => i.BackOrder == "BackOrder").Subscribe();
Una vez que he empezado a escuchar el asunto, puedo especificar qué procesamiento quiero hacer cuando reciba una factura. Puedo, por ejemplo, utilizar FirstAsync para procesar sólo la primera factura devuelta por el servicio. En este ejemplo, utilizo la sentencia await con la llamada a FirstAsync para poder devolver el control al cuerpo principal de mi aplicación mientras se procesa la factura. Este código espera a recuperar esa primera factura, luego pasa a cualquier código que utilice para inicializar el proceso de procesamiento de la factura y, finalmente, procesa la factura:
Invoice inv;inv = await invMgmt.foundInvoice.FirstAsync();// ...setup code invoices...HandleInvoiceAsync(inv);
Una advertencia: FirstAsync se bloqueará si el servidor aún no ha producido ningún resultado. Si quieres evitar el bloqueo, puedes utilizar FirstOrDefaultAsync, que devolverá null si el servidor no ha producido ningún resultado. Si no hay resultados, el cliente puede decidir qué hacer, si es que hace algo.
El caso más típico es que el cliente quiera procesar todas las facturas devueltas (después de filtrarlas) y hacerlo de forma asíncrona. En ese caso, en lugar de utilizar una combinación de Subscribe y OnNext, puede utilizar simplemente el método ForEachAsync. Puedes pasar un método o una expresión lambda que procese los resultados entrantes. Si pasas un método (que no puede ser asíncrono), como hago aquí, a ese método se le pasará la factura que activó ForEachAsync:
invMgmt.foundInvoice.ForEachAsync(HandleInvoice);
Al método ForEachAsync también se le puede pasar un token de cancelación para que el cliente señale que se está desconectando. Una buena práctica sería pasar el token cuando se llama a cualquiera de los métodos Rx *Async para permitir que el cliente termine el procesamiento sin tener que esperar a que se procesen todos los objetos.
El ForEachAsync no procesará ningún resultado ya procesado por un método First (o FirstOrDefaultAsync) por lo que se puede utilizar FirstOrDefaultAsync con ForEachAsync para comprobar si el servidor tiene algo que procesar antes de procesar los objetos posteriores. Sin embargo, el método IsEmpty del sujeto realizará la misma comprobación de forma más sencilla. Si el cliente tiene que asignar algún recurso necesario para procesar los resultados, IsEmpty permite al cliente comprobar si hay algo que hacer antes de asignar esos recursos (una alternativa sería asignar esos recursos en el primer elemento procesado en el bucle). El uso de IsEmpty con un cliente que comprueba si hay algún resultado antes de asignar los recursos (e iniciar el procesamiento) y que además soporta la cancelación daría un código que se parece a la Figura 2.
Figura 2 Código para soportar la cancelación y diferir el procesamiento hasta que los resultados estén listos
CancellationTokenSource cancelSource = new CancellationTokenSource();CancellationToken cancel;cancel = cancelSource.Token;if (!await invMgmt.foundInvoice.IsEmpty()){ // ...setup code for processing invoices... try { invMgmt.foundInvoice.ForEachAsync(HandleInvoice, cancel); } catch (Exception ex) { if (ex.GetType() != typeof(CancellationToken)) { // ...report message } } // ...clean up code when all invoices are processed or client disconnects}
Envolviendo
Si todo lo que necesitas es una implementación simple del patrón observador, entonces ObservableCollection podría hacer todo lo que necesitas para procesar un flujo de resultados. Para un mejor control y para una aplicación basada en eventos, la clase Subject y las extensiones que vienen con Rx permitirán que tu aplicación trabaje en modo asíncrono soportando una poderosa implementación del modelo publish/subscribe (y no he mirado la rica biblioteca de operadores que vienen con Rx). Si estás trabajando con Rx, merece la pena que te descargues la Guía de Diseño de Rx (bit.ly/1VOPxGS), en la que se discuten las mejores prácticas para consumir y producir flujos observables.
Rx también proporciona cierto soporte para convertir el tipo de mensaje que se pasa entre el cliente y el servidor utilizando la interfaz ISubject<TSource, TResult>. La interfaz ISubject<TSource, TResult> especifica dos tipos de datos: un tipo de datos «in» y un tipo de datos «out». Dentro de la clase Subject que implementa esta interfaz se pueden realizar las operaciones necesarias para convertir el resultado devuelto por el servidor (el tipo de dato «in») en el resultado requerido por el cliente (el tipo de dato «out»). Además, el parámetro «in» es covariante (aceptará el tipo de datos especificado o cualquier cosa de la que herede el tipo de datos) y el parámetro «out» es contravariante (aceptará el tipo de datos especificado o cualquier cosa que derive de él), lo que le proporciona una flexibilidad adicional.
Vivimos en un mundo cada vez más asíncrono y, en ese mundo, el patrón observador va a ser más importante: es una herramienta útil para cualquier interfaz entre procesos en la que el proceso servidor devuelva más de un único resultado. Afortunadamente, hay varias opciones para implementar el patrón observador en .NET Framework, incluyendo ObservableCollection y Rx.
Peter Vogel es arquitecto de sistemas y director de PH&V Information Services. PH&V proporciona consultoría full-stack desde el diseño UX hasta el modelado de objetos y el diseño de bases de datos.
Gracias a los siguientes expertos técnicos de Microsoft por revisar este artículo: Stephen Cleary, James McCaffrey y Dave Sexton
Stephen Cleary ha trabajado con multithreading y programación asíncrona durante 16 años y ha utilizado el soporte de async en Microsoft .NET Framework desde la primera vista previa de la tecnología de la comunidad. Es el autor de «Concurrency in C# Cookbook» (O’Reilly Media, 2014). Su página web, incluido su blog, está en stephencleary.com.
Discute este artículo en el foro de MSDN Magazine
Deja una respuesta