Scale Asynchronous Client-ScaleServer Links com Reactive
On Janeiro 25, 2022 by admin- 01/31/2019
- 17 minutos para ler
Junho 2016
Volume 31 Número 6
Por Peter Vogel | Junho 2016
Como o processamento assíncrono se tornou mais comum no desenvolvimento de aplicações, a Microsoft .NET Framework adquiriu uma grande variedade de ferramentas que suportam padrões específicos de design assíncrono. Muitas vezes, a criação de um aplicativo assíncrono bem projetado se resume a reconhecer o padrão de design que seu aplicativo está implementando e, em seguida, escolher o conjunto certo de componentes .NET.
Em alguns casos, a combinação requer a integração de vários componentes .NET. Artigo de Stephen Cleary, “Patterns for Asynchronous MVVM Applications”: Commands” (bit.ly/233Kocr), mostra como suportar totalmente o padrão Model-View-ViewModel (MVVM) de uma forma assíncrona. Em outros casos, o suporte requer apenas um componente do framework .NET. Eu discuti a implementação do padrão provedor/consumidores usando o BlockingCollection nas minhas colunas do VisualStudioMagazine.com Practical .NET, “Create Simple, Reliable Asynchronous Apps with BlockingCollection” (bit.ly/1TuOpE6), e, “Create Sophisticated Asynchronous Applications with BlockingCollection” (bit.ly/1SpYyD4).
Outro exemplo está implementando o padrão de design do observador para monitorar uma operação de longa duração de forma assíncrona. Neste cenário, um método assíncrono que retorna um único objeto Task não funciona porque o cliente está frequentemente retornando um fluxo de resultados. Para estes cenários, você pode utilizar pelo menos duas ferramentas do .NET Framework: o ObservableCollection e o Reactive Extensions (Rx). Para soluções simples, a ObservableCollection (junto com a async e as palavras-chave await) é tudo o que você precisa. No entanto, para problemas mais “interessantes” e, principalmente, orientados por eventos, Rx fornece um melhor controle sobre o processo.
Definindo o Padrão
Embora o padrão observador seja freqüentemente usado em padrões de design de IU – incluindo Model-View-Controller (MVC), Model-View-Presenter (MVP) e MVVM-UIs devem ser considerados como apenas um cenário de um conjunto maior de cenários onde o padrão observador se aplica. A definição do padrão do observador (citação da Wikipedia) é: “Um objeto, chamado de sujeito, mantém uma lista de seus dependentes, chamados de observadores, e os notifica automaticamente de qualquer mudança de estado, geralmente chamando um de seus métodos”
Realmente, o padrão do observador é sobre obter resultados de processos de longa duração para o cliente, assim que esses resultados estiverem disponíveis. Sem alguma versão do padrão do observador, os clientes devem esperar até que o último resultado esteja disponível e então ter todos os resultados enviados para eles em um único bloco. Em um mundo cada vez mais assíncrono, você quer que os observadores processem os resultados em paralelo com o cliente, à medida que os resultados ficam disponíveis. Para enfatizar que você está falando de mais do que UIs ao alavancar o padrão do observador, vou usar “cliente” e “servidor” em vez de “observador” e “sujeito”, no resto deste artigo.
Problemas e Oportunidades
Existem pelo menos três questões e duas oportunidades com o padrão do observador. A primeira questão é o problema do ouvinte caducado: Muitas implementações do padrão observador requerem que o servidor tenha uma referência a todos os seus clientes. Como resultado, os clientes podem ser mantidos em memória pelo servidor até que o servidor saia. Esta obviamente não é uma solução ótima para um processo de longa duração em um sistema dinâmico onde os clientes se conectam e desconectam freqüentemente.
O problema do ouvinte caduco, entretanto, é apenas um sintoma do segundo, maior problema: Muitas implementações do padrão observador requerem que o servidor e o cliente estejam firmemente acoplados, exigindo que tanto o servidor quanto o cliente estejam presentes em todos os momentos. No mínimo, o cliente deve ser capaz de determinar se o servidor está presente e escolher não anexar; além disso, o servidor deve ser capaz de funcionar mesmo que não haja clientes aceitando resultados.
O terceiro problema é relacionado ao desempenho: Quanto tempo demorará para o servidor notificar todos os clientes? O desempenho no padrão do observador é diretamente afetado pelo número de clientes a serem notificados. Portanto, há uma oportunidade de melhorar a performance no padrão observador, deixando o cliente filtrar preemptivamente os resultados que retornam do servidor. Isto também aborda os cenários onde o servidor produz mais resultados (ou uma maior variedade de resultados) do que o cliente está interessado: O cliente pode indicar que é apenas para ser notificado em casos específicos. A segunda oportunidade de desempenho existe em torno do reconhecimento quando o servidor não tem resultados ou terminou de produzir resultados. Os clientes podem pular a aquisição de recursos necessários para processar eventos do servidor até que o cliente tenha garantia de que há algo para processar e os clientes podem liberar esses recursos assim que souberem que processaram o último resultado.
From Observer to Publish/Subscribe
Factoring in these considerations leads from simple implementations of the observer pattern to the related publish/subscribe model. Publish/subscribe implementa o padrão do observador de uma forma frouxamente acoplada que permite que servidores e clientes executem mesmo que o outro esteja indisponível no momento. Publish/subscribe também tipicamente implementa a filtragem do lado do cliente, permitindo que o cliente se inscreva em tópicos/canais específicos (“Notifique-me sobre pedidos de compra”) ou em atributos associados a diferentes tipos de conteúdo (“Notifique-me sobre qualquer pedido urgente”).
No entanto, uma questão permanece. Todas as implementações do padrão do observador tendem a unir firmemente clientes e servidores a um formato de mensagem específico. Mudar o formato de uma mensagem na maioria das implementações de publicação/assinatura pode ser difícil porque todos os clientes devem ser atualizados para usar o novo formato.
De muitas maneiras, isto é similar à descrição de um cursor do lado do servidor em uma base de dados. Para minimizar os custos de transmissão, o servidor de banco de dados não retorna resultados à medida que cada linha é recuperada. Entretanto, para grandes conjuntos de linhas, a base de dados também não retorna todas as linhas em um único lote no final. Em vez disso, o servidor de banco de dados normalmente retorna subconjuntos de um cursor pressionado no servidor com freqüência, à medida que esses subconjuntos ficam disponíveis. Com uma base de dados, o cliente e o servidor não têm de estar presentes simultaneamente: O servidor de banco de dados pode funcionar quando não há clientes presentes; um cliente pode verificar se o servidor está acessível e, se não estiver, decidir o que (se houver alguma coisa) ele pode fazer. O processo de filtragem (SQL) também é muito flexível. No entanto, se o motor de base de dados alterar o formato que utiliza para retornar linhas, então todos os clientes devem, no mínimo, ser recompilados.
Processar um Cache de Objectos
Como o meu estudo de caso para olhar para uma simples implementação de padrões de observação, estou a utilizar como meu servidor uma classe que procura um cache in-memory de facturas. Esse servidor poderia, no final do seu processamento, devolver uma coleção de todas as faturas. No entanto, prefiro que o cliente processe as facturas individualmente e em paralelo com o processo de pesquisa do servidor. Isso significa que prefiro uma versão do processo, que devolve cada factura tal como foi encontrada e permite ao cliente processar cada factura em paralelo com a procura da factura seguinte.
Uma simples implementação do servidor poderia ser assim:
private List<Invoice> foundInvoices = new List<Invoice>();public List<Invoice> FindInvoices(decimal Amount){ foundInvoices.Clear(); Invoice inv; // ...search logic to add invoices to the collection foundInvoices.Add(inv); // ...repeat until all invoices found return foundInvoices;}
Soluções mais sofisticadas poderiam usar o retorno de rendimento para devolver cada factura tal como foi encontrada, em vez de montar a lista. Independentemente disso, um cliente que chama o método FindInvoices vai querer executar algumas atividades críticas antes e depois do processamento. Por exemplo, uma vez encontrado o primeiro item, o cliente pode querer habilitar uma lista MatchingInvoices para manter as faturas no cliente ou adquirir/inicializar quaisquer recursos necessários para processar uma fatura. À medida que notas fiscais adicionais são adicionadas, o cliente precisaria processar cada nota fiscal e, quando o servidor sinaliza que a nota fiscal final é recuperada, liberar quaisquer recursos que não são mais necessários porque não há mais notas fiscais para processar.
Durante a recuperação de uma base de dados, por exemplo, uma leitura será bloqueada até que a primeira linha seja devolvida. Quando a primeira linha é retornada, o cliente inicializa os recursos necessários para processar uma linha. A leitura também retorna falsa quando a última linha é recuperada, deixando o cliente liberar esses recursos porque não há mais linhas para processar.
Criar Soluções Simples com ObservableCollection
A escolha mais óbvia para implementar o padrão observador no .NET Framework é o ObservableCollection. O ObservableCollection notificará o cliente (através de um evento) sempre que for alterado.
Reescrever o meu servidor de amostra para usar a classe ObservableCollection requer apenas duas alterações. Primeiro, a coleção que contém os resultados precisa ser definida como uma ObservableCollection e tornada pública. Segundo, não é mais necessário para o método retornar um resultado: O servidor só precisa adicionar faturas à cobrança.
A nova implementação do servidor pode parecer assim:
public List<Invoice> FindInvoices(decimal Amount){ public ObservableCollection<Invoice> foundInvoices = new ObservableCollection<Invoice>(); public void FindInvoices(decimal Amount) { foundInvoices.Clear(); Invoice inv; // ...search logic to set inv foundInvoices.Add(inv); // ...repeat until all invoices are added to the collection }
Um cliente que usa esta versão do servidor só precisa transferir um manipulador de eventos para o evento CollectionChanged do InvoiceManagement’s foundInvoiceManagement’s Collection. No código a seguir eu tive a classe implementando a interface IDisposable para suportar desconexão do evento:
public class SearchInvoices: IDisposable{ InvoiceManagement invMgmt = new InvoiceManagement(); public void SearchInvoices() { invMgmt.foundInvoices.CollectionChanged += InvoicesFound; } public void Dispose() { invMgmt.foundInvoices.CollectionChanged -= InvoicesChanged; }
No cliente, o evento CollectionChanged é passado um objeto NotifyCollectionChangedEventArgs como seu segundo parâmetro. A propriedade Action desse objeto especifica tanto o que foi alterado na coleção (as ações são: a coleção foi limpa, novos itens foram adicionados à coleção, itens existentes foram movidos/removidos/substituídos) e informações sobre os itens alterados (uma coleção de quaisquer itens adicionados, uma coleção de itens presentes na coleção antes dos novos itens serem adicionados, a posição do item que foi movido/removido/substituído).
Código simples no cliente que processaria de forma assíncrona cada fatura à medida que fosse adicionada à coleção no servidor seria parecido com o código da Figura 1.
Figure 1 Asynchronously Processing Invoices Using ObservableCollection
private async void InvoicesFound(object sender, NotifyCollectionChangedEventArgs e){ switch (e.Action) { case NotifyCollectionChangedAction.Reset: { // ...initial item processing return; } case NotifyCollectionChangedAction.Add: { foreach (Invoice inv in e.NewItems) { await HandleInvoiceAsync(inv); } return; } }}
Embora simples, este código pode ser inadequado para suas necessidades, especialmente se você estiver manipulando um processo de longa duração ou trabalhando em um ambiente dinâmico. De um ponto de vista de design assíncrono, por exemplo, o código poderia capturar o objeto Task retornado pelo HandleInvoiceAsync para que o cliente pudesse gerenciar as tarefas assíncronas. Você também vai querer ter certeza de que o evento CollectionChanged é levantado na thread UI mesmo que o FindInvoices seja executado em uma thread de fundo.
Por causa de onde o método Clear é chamado na classe server (pouco antes de procurar pela primeira Invoice) o valor da propriedade Action Reset pode ser usado como um sinal de que o primeiro item está prestes a ser recuperado. No entanto, é claro que nenhuma fatura pode ser encontrada na busca, portanto, usando a Ação de Reset pode resultar na alocação de recursos pelo cliente que nunca são realmente utilizados. Para realmente lidar com o processamento do “primeiro item”, você precisaria adicionar um sinalizador no processamento da Ação Adicionar para executar somente quando o primeiro item fosse encontrado.
Além disso, o servidor tem um número limitado de opções para indicar que a última Nota Fiscal é encontrada para que o cliente possa parar de esperar pela “próxima”. O servidor poderia, presumivelmente, limpar a coleção após encontrar o último item, mas isso apenas forçaria um processamento mais complexo no processamento da Ação de Reset (tenho processado Faturas? Se sim, então eu processei a última Nota Fiscal; se não, então estou prestes a processar a primeira Nota Fiscal).
Embora, para problemas simples, a ObservableCollection estará bem, qualquer implementação razoavelmente sofisticada baseada na ObservableCollection (e qualquer aplicação que valorize a eficiência) vai requerer algum código complicado, especialmente no cliente.
As Soluções Rx
Se você quiser processamento assíncrono, então o Rx (disponível através do NuGet) pode fornecer uma solução melhor para implementar o padrão do observador tomando emprestado do modelo de publicação/assinatura. Esta solução também fornece um modelo de filtragem baseado no LINQ, melhor sinalização para condições do primeiro/último item e melhor manipulação de erros.
Rx também pode manipular implementações de observadores mais interessantes do que são possíveis com uma Coleção Observável. No meu estudo de caso, após retornar a lista inicial de faturas, meu servidor pode continuar a verificar novas faturas que são adicionadas ao cache após a conclusão da pesquisa original (e que correspondem ao critério de pesquisa, é claro). Quando uma factura que corresponda aos critérios aparece, o cliente vai querer ser notificado sobre o evento para que a nova factura possa ser adicionada à lista. Rx suporta estes tipos de extensões baseadas em eventos para o padrão do observador melhor do que ObservableCollection.
Existem duas interfaces chave em Rx para suportar o padrão do observador. A primeira é IObservable<T>, implementada pelo servidor e especificando um único método: Subscrever. O servidor implementando o método Subscribe será passado uma referência a um objeto de um cliente. Para lidar com o problema do ouvinte caduco, o método Subscribe retorna uma referência ao cliente para um objeto que implementa a interface IDispossível. O cliente pode usar esse objeto para se desconectar do servidor. Quando o cliente se desconecta, espera-se que o servidor remova o cliente de qualquer uma de suas listas internas.
O segundo é a interface IObserver<T>, que deve ser implementada pelo cliente. Essa interface requer que o cliente implemente e exponha três métodos para o servidor: OnNext, OnCompleted e OnError. O método crítico aqui é o OnNext, que é usado pelo servidor para passar uma mensagem para o cliente (no meu estudo de caso essa mensagem seria novos objetos de Invoice que serão devolvidos conforme cada um aparece). O servidor pode usar o método OnCompleted do cliente para sinalizar que não há mais dados. O terceiro método, OnError, fornece uma maneira do servidor sinalizar ao cliente que ocorreu uma exceção.
Você pode implementar a interface IObserver você mesmo, é claro (ela faz parte do framework .NET). Junto com o ObservableCollection, isso pode ser tudo que você precisa se estiver criando uma solução síncrona (eu também escrevi uma coluna sobre isso, “Writing Cleaner Code with Reactive Extensions” ).
No entanto, o Rx inclui vários pacotes que fornecem implementações assíncronas dessas interfaces, incluindo implementações para serviços JavaScript e RESTful. A classe Rx Subject fornece uma implementação de IObservable que simplifica a implementação de uma versão assíncrona de publicação/subscrição do padrão observador.
Criar uma Solução Assíncrona
Criar um servidor para trabalhar com um objeto Subject requer muito poucas mudanças no código original síncrono do lado do servidor. Eu substituo a antiga ObservableCollection por um objeto Subject que irá passar cada Nota Fiscal como aparece para qualquer cliente que esteja ouvindo. Eu declaro o objeto Subject como público para que os clientes possam acessá-lo:
public class InvoiceManagement{ public IObservable<Invoice> foundInvoice = new Subject<Invoice>();
No corpo do método, ao invés de adicionar uma fatura a uma cobrança, eu uso o método Subject’s OnNext para passar cada fatura para o cliente como é encontrado:
public void FindInvoices(decimal Amount){ inv = GetInvoicesForAmount(Amount) // Poll for invoices foundInvoice.OnNext(inv); // ...repeat...}
No meu cliente, eu primeiro declaro uma instância da classe servidor. Depois, num método marcado como async, chamo o método Subject’s Subscribe para indicar que quero começar a recuperar as mensagens:
public class InvoiceManagementTests{ InvoiceManagement invMgmt = new InvoiceManagement(); public async void ProcessInvoices() { invMgmt.foundInvoice.Subscribe<Invoice>();
Para filtrar os resultados apenas para as facturas que quero, posso aplicar uma declaração LINQ ao objecto Subject. Este exemplo filtra as facturas para as que estão novamente encomendadas (para usar as extensões Rx LINQ terá de adicionar um extracto de utilização para o espaço de nomes System.Reactive.Linq):
invMgmt.foundInvoice.Where(i => i.BackOrder == "BackOrder").Subscribe();
Após ter começado a ouvir o assunto, posso especificar que processamento quero fazer quando receber uma factura. Posso, por exemplo, usar o FirstAsync para processar apenas a primeira fatura devolvida pelo serviço. Neste exemplo, eu uso o extrato de espera com a chamada para o FirstAsync para que eu possa retornar o controle ao corpo principal da minha aplicação enquanto estiver processando a fatura. Este código espera para recuperar a primeira fatura, depois passa para qualquer código que eu usei para inicializar o processo de processamento da fatura e, finalmente, processa a fatura:
Invoice inv;inv = await invMgmt.foundInvoice.FirstAsync();// ...setup code invoices...HandleInvoiceAsync(inv);
Uma advertência: FirstAsync irá bloquear se o servidor ainda não tiver produzido nenhum resultado. Se você quiser evitar o bloqueio, você pode usar FirstOrDefaultAsync, que retornará nulo se o servidor ainda não tiver produzido nenhum resultado. Se não houver resultados, o cliente pode decidir o que fazer.
O caso mais típico é que o cliente quer processar todas as notas fiscais devolvidas (após a filtragem) e fazê-lo de forma assíncrona. Nesse caso, em vez de usar uma combinação de Subscribe e OnNext, você pode simplesmente usar o método ForEachAsync. Você pode passar um método ou uma expressão lambda que processe os resultados recebidos. Se você passar um método (que não pode ser assíncrono), como eu faço aqui, esse método será passado a fatura que acionou ForEachAsync:
invMgmt.foundInvoice.ForEachAsync(HandleInvoice);
O método ForEachAsync também pode ser passado um sinal de cancelamento para deixar o cliente sinalizar que está se desconectando. Uma boa prática seria passar o token ao chamar qualquer um dos métodos Rx *Async para suportar deixar o cliente terminar o processamento sem ter que esperar que todos os objetos sejam processados.
O ForEachAsync não processará nenhum resultado já processado por um método First (ou FirstOrDefaultAsync) para que você possa usar FirstOrDefaultAsync com ForEachAsync para verificar se o servidor tem algo para processar antes de processar objetos subsequentes. No entanto, o método Subject’s IsEmpty irá executar a mesma verificação de forma mais simples. Se o cliente tem que alocar quaisquer recursos necessários para processar resultados, o IsEmpty permite que o cliente verifique se há algo a fazer antes de alocar esses recursos (uma alternativa seria alocar esses recursos no primeiro item processado no loop). Usando o IsEmpty com um cliente que verifica se há algum resultado antes de alocar os recursos (e iniciar o processamento) enquanto também suporta o cancelamento daria um código que se parece com a Figura 2.
Figure 2 Code to Support Cancellation and Defer Processing Until Results are Ready
CancellationTokenSource cancelSource = new CancellationTokenSource();CancellationToken cancel;cancel = cancelSource.Token;if (!await invMgmt.foundInvoice.IsEmpty()){ // ...setup code for processing invoices... try { invMgmt.foundInvoice.ForEachAsync(HandleInvoice, cancel); } catch (Exception ex) { if (ex.GetType() != typeof(CancellationToken)) { // ...report message } } // ...clean up code when all invoices are processed or client disconnects}
Wrapping Up
Se tudo que você precisa é uma simples implementação do padrão do observador, então o ObservableCollection pode fazer tudo que você precisa para processar um fluxo de resultados. Para um melhor controle e para uma aplicação baseada em eventos, a classe Subject e as extensões que vêm com Rx deixarão sua aplicação trabalhar em modo assíncrono, suportando uma implementação poderosa do modelo publish/subscribe (e eu não olhei para a rica biblioteca de operadores que vêm com Rx). Se você está trabalhando com Rx, vale a pena baixar o Rx Design Guide (bit.ly/1VOPxGS), que discute as melhores práticas de consumo e produção de streams observáveis.
Rx também fornece algum suporte para converter o tipo de mensagem passada entre o cliente e o servidor usando a interface ISubject<TSource, TResult>. A interface ISubject<TSource, TResult> especifica dois tipos de dados: um datatype “in” e um datatype “out”. Dentro da classe Subject que implementa esta interface você pode realizar quaisquer operações necessárias para converter o resultado retornado do servidor (o datatype “in”) no resultado requerido pelo cliente (o datatype “out”). Além disso, o parâmetro in é covariante (aceitará o datatype especificado ou qualquer coisa que o datatype herda) e o parâmetro out é contravariante (aceitará o datatype especificado ou qualquer coisa que dele deriva), dando-lhe flexibilidade adicional.
Vivemos em um mundo cada vez mais assíncrono e, nesse mundo, o padrão do observador vai se tornar mais importante – é uma ferramenta útil para qualquer interface entre processos onde o processo do servidor retorna mais do que um único resultado. Felizmente, você tem várias opções para implementar o padrão observador no framework .NET, incluindo o ObservableCollection e Rx.
Peter Vogel é um arquiteto de sistemas e diretor em PH&V Information Services. PH&V fornece consultoria completa de desenho UX através de modelagem de objetos e desenho de banco de dados.
Peter Vogel é um arquiteto de sistemas e diretor em PH&V Serviços de Informação: Stephen Cleary, James McCaffrey e Dave Sexton
Stephen Cleary trabalha com programação multithreading e assíncrona há 16 anos e tem usado suporte a async no Microsoft .NET Framework desde a primeira visualização da tecnologia comunitária. Ele é autor de “Concurrency in C# Cookbook” (O’Reilly Media, 2014). Sua página inicial, incluindo seu blog, está em stephencleary.com.
Discutam este artigo no fórum da Revista MSDN
.
Deixe uma resposta