スケールアシンクロナス クライアント-サーバーリンクServer Links with Reactive
On 1月 25, 2022 by admin- 01/31/2019
- 17 minutes to read
June 2016
Volume 31 Number 6
By Peter Vogel | June 2016
Asynchronous processing has more common in application development.The only only only only asynchronous processing has been been connected, マイクロソフト.comMicrosoft .NET Frameworkは、特定の非同期デザインパターンをサポートするさまざまなツールを獲得しています。 多くの場合、優れた設計の非同期アプリケーションを作成するには、アプリケーションが実装しているデザイン パターンを認識し、適切な .NET コンポーネントのセットを選択することに尽きます。 Stephen Cleary の記事、「Patterns for Asynchronous MVVM Applications: Commands” (bit.ly/233Kocr) では、Model-View-ViewModel (MVVM) パターンを非同期で完全にサポートする方法を示しています。 他のケースでは、サポートには.NET Frameworkのコンポーネントが1つだけ必要です。 BlockingCollection を使用したプロバイダー/コンシューマー パターンの実装については、私の VisualStudioMagazine.com Practical .NET コラム「Create Simple, Reliable Asynchronous Apps with BlockingCollection」 (bit.ly/1TuOpE6) と「Create Sophisticated Asynchronous Applications with BlockingCollection」 (bit.ly/1SpYyD4) で説明しています。 このシナリオでは、クライアントが結果のストリームを頻繁に返すため、単一のタスク オブジェクトを返す非同期メソッドは機能しません。 このようなシナリオでは、.NET Frameworkの少なくとも2つのツール、ObservableCollectionとReactive Extensions(Rx)を活用することができます。 単純なソリューションであれば、ObservableCollection(asyncおよびawaitキーワードとともに)が必要なすべてである。 しかし、より「興味深い」問題、特にイベント駆動型の問題では、Rx はプロセスに対するより良い制御を提供します。
Defining the Pattern
UI デザインパターン (Model-View-Controller (MVC), Model-View-Presenter (MVP) および MVVM-UI など ) でオブザーバーパターンは頻繁に使用されていますが、オブザーバーパターンが適用できる一連のシナリオのうちの単なるシナリオとして検討される必要があります。 observerパターンの定義(Wikipediaより引用)は以下の通りです。 「サブジェクトと呼ばれるオブジェクトは、オブザーバーと呼ばれる依存関係のリストを維持し、通常はそれらのメソッドの 1 つを呼び出すことによって、状態の変化を自動的に通知する。 あるバージョンのオブザーバー パターンがなければ、クライアントは最後の結果が利用可能になるまで待機し、その後、すべての結果を一括して送信してもらわなければなりません。 ますます非同期化する世界では、オブザーバは結果が利用可能になったときに、クライアントと並行して結果を処理することが望まれます。 observer パターンを活用するときに、UI 以外のことを話していることを強調するために、この記事の残りの部分では、「observer」と「subject」の代わりに「client」と「server」を使用します。
Problems and Opportunities
observer パターンには少なくとも 3 つの問題点と 2 つの機会があります。 Observer パターンの多くの実装では、サーバーがそのクライアントのすべてへの参照を保持することを必要とします。 その結果、サーバが終了するまで、クライアントはサーバによってメモリ上に保持される可能性があります。 これは明らかに、クライアントが頻繁に接続および切断する動的なシステムで長時間実行されるプロセスにとって最適なソリューションではありません。 少なくとも、クライアントはサーバーが存在するかどうかを判断し、アタッチしないことを選択できる必要があります。さらに、サーバーは、結果を受け取るクライアントがない場合でも機能できる必要があります。 サーバーがすべてのクライアントに通知するまでにどれくらいの時間がかかるでしょうか。 observerパターンの性能は、通知するクライアントの数に直接影響されます。 したがって、サーバから戻ってきた結果をクライアントが先取りしてフィルタリングすることで、オブザーバ・パターンのパフォーマンスを向上させる機会があります。 これは、クライアントが興味を持つよりも多くの結果(あるいは多種多様な結果)をサーバーが生成するシナリオにも対応する。 クライアントは、特定の場合のみ通知されるように指定することができます。 2つ目の性能向上の機会は、サーバーに結果がない場合、または結果の生成が終了した場合を認識することにあります。 クライアントは、処理するものがあると保証されるまで、サーバー イベントを処理するために必要なリソースの取得をスキップでき、クライアントは最後の結果を処理したとわかるとすぐにそれらのリソースを解放することができます。 Publish/subscribe は、他方が現在利用できない場合でも、サーバーとクライアントを実行できるようにする疎結合の方法で observer パターンを実装します。 また、Publish/Subscribe は通常、クライアントに特定のトピック/チャネル (「Notify me about purchase orders」) またはさまざまな種類のコンテンツに関連する属性 (「Notify me about any urgent requests」) を購読させることにより、クライアント側のフィルタリングを実装しています。 Observer パターンのすべての実装は、クライアントとサーバーを特定のメッセージ形式に緊密に結合する傾向があります。 ほとんどの publish/subscribe 実装でメッセージのフォーマットを変更することは、すべてのクライアントが新しいフォーマットを使用するように更新されなければならないため、困難な場合があります。 伝送コストを最小化するために、データベース サーバーは各行が取得されるたびに結果を返しません。 しかし、大きな行セットでは、データベースは最後にすべての行を一括して返すこともありません。 その代わり、データベースサーバーは通常、サーバー上に保持されているカーソルから、サブセットが利用可能になった時点で頻繁にサブセットを返します。 データベースでは、クライアントとサーバーは同時に存在する必要はありません。 クライアントは、サーバーにアクセスできるかどうかを確認し、アクセスできない場合は、他に何ができるかを決定することができます。 フィルタリング処理(SQL)も非常に柔軟です。 しかし、データベース エンジンが行を返すために使用する形式を変更した場合、すべてのクライアントを少なくとも再コンパイルする必要があります。
Processing a Cache of Objects
Observer パターン実装を見るためのケーススタディとして、請求書のインメモリキャッシュを検索するクラスをサーバーとして使用します。 そのサーバーは、処理の最後に、すべての請求書のコレクションを返すことができます。 しかし、私は、クライアントが請求書を個別に処理し、サーバーの検索処理と並行して処理することを希望しています。 つまり、見つかった各請求書を返し、クライアントが次の請求書の検索と並行して各請求書を処理できるようなプロセスのバージョンを好みます。
サーバーのシンプルな実装は、次のようになります:
private List<Invoice> foundInvoices = new List<Invoice>();public List<Invoice> FindInvoices(decimal Amount){ foundInvoices.Clear(); Invoice inv; // ...search logic to add invoices to the collection foundInvoices.Add(inv); // ...repeat until all invoices found return foundInvoices;}
より高度なソリューションでは、リストを組み立てるのではなく、見つかった各請求書を返すために yield return を使用します。 いずれにせよ、FindInvoices メソッドを呼び出すクライアントは、処理の前後にいくつかの重要なアクティビティを実行する必要があります。 たとえば、最初の項目が見つかると、クライアントは MatchingInvoices リストを有効にしてクライアントに請求書を保持したり、請求書の処理に必要なリソースを取得/初期化したりしたいと思うかもしれません。 追加の請求書が追加されると、クライアントは各請求書を処理し、サーバーが最後の請求書が取得されたことを通知すると、処理する請求書が「もうない」ため、不要になったリソースをすべて解放する必要があるでしょう。 最初の行が返されると、クライアントは行を処理するために必要なリソースを初期化します。
Creating Simple Solutions with ObservableCollection
.NET Framework でオブザーバー パターンを実装するための最も明白な選択は、ObservableCollection です。 ObservableCollection は、それが変更されるたびに (イベントを通じて) クライアントに通知します。
ObservableCollection クラスを使用するために私のサンプル サーバーを書き直すには、2 つの変更のみが必要です。 まず、結果を保持するコレクションは ObservableCollection として定義され、公開される必要があります。 2つ目は、メソッドが結果を返す必要がないことです。 サーバーは、請求書をコレクションに追加するだけです。
サーバーの新しい実装は次のようになります。
public List<Invoice> FindInvoices(decimal Amount){ public ObservableCollection<Invoice> foundInvoices = new ObservableCollection<Invoice>(); public void FindInvoices(decimal Amount) { foundInvoices.Clear(); Invoice inv; // ...search logic to set inv foundInvoices.Add(inv); // ...repeat until all invoices are added to the collection }
サーバーのこのバージョンを使用するクライアントは、InvoiceManagement の foundInvoices コレクションの CollectionChanged イベントにイベント ハンドラを配線することだけが必要です。 次のコードでは、イベントからの切断をサポートするために、クラスに IDisposable インターフェイスを実装させています。
public class SearchInvoices: IDisposable{ InvoiceManagement invMgmt = new InvoiceManagement(); public void SearchInvoices() { invMgmt.foundInvoices.CollectionChanged += InvoicesFound; } public void Dispose() { invMgmt.foundInvoices.CollectionChanged -= InvoicesChanged; }
クライアントでは、CollectionChanged イベントはその 2 番目のパラメーターとして NotifyCollectionChangedEventArgs オブジェクトを渡されます。 そのオブジェクトの Action プロパティは、コレクションに対して実行された変更 (アクションは、コレクションがクリアされた、新しいアイテムがコレクションに追加された、既存のアイテムが移動/置換/削除された) と変更されたアイテムに関する情報 (追加されたすべてのアイテムのコレクション、新しいアイテムが追加される前にコレクションに存在するアイテムのコレクション、移動/削除/置換されたアイテムの位置) を両方指定します。
サーバーのコレクションに追加された各請求書を非同期に処理するクライアント内の簡単なコードは、図 1 のようになります。
Figure 1 Asynchronously Processing Invoices Using ObservableCollection
private async void InvoicesFound(object sender, NotifyCollectionChangedEventArgs e){ switch (e.Action) { case NotifyCollectionChangedAction.Reset: { // ...initial item processing return; } case NotifyCollectionChangedAction.Add: { foreach (Invoice inv in e.NewItems) { await HandleInvoiceAsync(inv); } return; } }}
シンプルではありますが、特に長時間実行するプロセスや動的環境で作業する場合は、このコードでは不十分な可能性があります。 非同期設計の観点から、たとえば、このコードでは、HandleInvoiceAsync が返す Task オブジェクトをキャプチャして、クライアントが非同期タスクを管理できるようにすることができます。 また、FindInvoices がバックグラウンド スレッドで実行されても、CollectionChanged イベントが UI スレッドで発生するようにします。
Clear メソッドがサーバー クラスで呼び出される場所 (最初の Invoice を検索する直前) から、最初のアイテムが取得される信号として Actionプロパティの Reset 値が使用されます。 しかし、もちろん、検索してもInvoiceが見つからない場合もあるので、Reset Actionを使うと、実際には使われないリソースをクライアントが割り当てることになるかもしれません。 実際に「最初のアイテム」処理を処理するには、最初のアイテムが見つかったときのみ実行するように Add Action 処理にフラグを追加する必要があります。 サーバーは、おそらく、最後のアイテムを見つけた後にコレクションをクリアすることができますが、それは、Reset Action 処理にさらに複雑な処理を強制するだけです (Invoices を処理していましたか? 5025>
ObservableCollection は単純な問題では問題ありませんが、ObservableCollection に基づく適度に洗練された実装(および効率を重視するアプリケーション)は、特にクライアントで、いくつかの複雑なコードを必要とすることになるでしょう。
Rx ソリューション
非同期処理が必要な場合、Rx (NuGet から入手可能) は、パブリッシュ/サブスクライブ モデルから借用してオブザーバー パターンを実装するための優れたソリューションを提供することができます。 このソリューションはまた、LINQ ベースのフィルタリング モデル、最初/最後のアイテム条件に対するより良いシグナル伝達、およびより良いエラー処理も提供します。 私のケーススタディでは、請求書の最初のリストを返した後、サーバーは、最初の検索が完了した後にキャッシュに追加された新しい請求書 (そしてもちろん、検索基準に一致する) をチェックし続けるかもしれません。 条件に合う請求書が現れたら、クライアントはそのイベントを通知して、新しい請求書をリストに追加できるようにしたいと思うでしょう。 Rx は、ObservableCollection よりも、このようなイベント ベースの拡張をサポートします。
Observer パターンをサポートするための Rx の主要なインターフェイスが 2 つあります。 1 つは IObservable<T> で、サーバーによって実装され、1 つのメソッドを指定します。 Subscribe です。 Subscribe メソッドを実装したサーバーは、クライアントからオブジェクトへの参照を渡されます。 リスナーがいなくなる問題を処理するために、SubscribeメソッドはIDisposableインターフェイスを実装したオブジェクトの参照をクライアントに返します。 クライアントは、そのオブジェクトを使用してサーバーから切断することができます。 クライアントが切断すると、サーバーはクライアントをその内部リストから削除することが期待されます。
2 番目は IObserver<T> インターフェースで、これはクライアントによって実装されなければなりません。 このインターフェイスでは、クライアントが3つのメソッドを実装し、サーバーに公開する必要があります。 OnNext、OnCompleted、OnErrorです。 ここで重要なメソッドはOnNextで、サーバーがクライアントにメッセージを渡すために使用します(この事例の場合、メッセージは新しいInvoiceオブジェクトで、それぞれが出現するたびに返されることになります)。 サーバーは、クライアントのOnCompletedメソッドを使用して、これ以上データがないことを知らせることができます。 3 番目のメソッド OnError は、サーバーがクライアントに例外が発生したことを通知する方法を提供します。
IObserver インターフェイスは、もちろん自分で実装しても構いません (これは .NET Framework の一部です)。 ObservableCollection とともに、同期的なソリューションを作成する場合、それが必要なすべてかもしれません (これについても、コラム「Writing Cleaner Code with Reactive Extensions」を執筆しています)。 Rx Subject クラスは、Observer パターンの非同期発行/購読バージョンの実装を単純化する IObservable の実装を提供します。
Creating an Asynchronous Solution
Subject オブジェクトで動作するサーバーを作成するには、元の同期サーバー側コードにほとんど変更を加える必要がありません。 古い ObservableCollection を、各 Invoice をリスニングしているクライアントに渡す Subject オブジェクトに置き換えます。
public class InvoiceManagement{ public IObservable<Invoice> foundInvoice = new Subject<Invoice>();
メソッドの本体では、Invoice をコレクションに追加する代わりに、Subject の OnNext メソッドを使用して、各 Invoice が見つかるとクライアントに渡します。
public class InvoiceManagementTests{ InvoiceManagement invMgmt = new InvoiceManagement(); public async void ProcessInvoices() { invMgmt.foundInvoice.Subscribe<Invoice>();
欲しい請求書だけに結果をフィルタリングするには、LINQ ステートメントを Subject オブジェクトに適用できます。 この例では、バックオーダーされた請求書にフィルターをかけています (Rx LINQ 拡張機能を使用するには、System.Reactive.Linq 名前空間の using 文を追加する必要があります):
invMgmt.foundInvoice.Where(i => i.BackOrder == "BackOrder").Subscribe();
一旦、件名を聞き始めたら、請求書を受け取ったときに実行したい処理を指定することができます。 例えば、FirstAsyncを使って、サービスから返された最初の請求書だけを処理することができます。 この例では、FirstAsyncの呼び出しとともにawait文を使って、請求書の処理中にアプリケーション本体に制御を戻すことができるようにしています。 このコードでは、最初の請求書を取得するのを待ち、次に請求書処理プロセスを初期化するコードに移動し、最後に請求書を処理します:
Invoice inv;inv = await invMgmt.foundInvoice.FirstAsync();// ...setup code invoices...HandleInvoiceAsync(inv);
1つの注意点: FirstAsyncは、サーバーがまだ何も結果を生成していない場合にブロックされます。 ブロックを回避したい場合は、FirstOrDefaultAsync を使用します。これは、サーバーが結果を生成していない場合は null を返します。
より一般的なケースは、クライアントが (フィルタリング後に) 返されたすべての請求書を処理し、それを非同期に行いたい場合です。 その場合、Subscribe と OnNext を組み合わせて使用するのではなく、ForEachAsync メソッドだけを使用することができます。 受信した結果を処理するメソッドまたはラムダ式を渡すことができます。 ここで私が行っているように、(非同期であることができない)メソッドを渡すと、そのメソッドは ForEachAsync をトリガーしたインボイスを渡されます:
invMgmt.foundInvoice.ForEachAsync(HandleInvoice);
ForEachAsyncメソッドは、クライアントに切断することを知らせるためにキャンセル トークンを渡すこともできます。 5025>
ForEachAsync は、First (または FirstOrDefaultAsync) メソッドによってすでに処理された結果を処理しないため、ForEachAsync で FirstOrDefaultAsync を使用して、サーバーが後続オブジェクトを処理する前に何か処理があるかどうかを確認することができます。 しかし、Subject の IsEmpty メソッドは、より簡単に同じチェックを実行します。 クライアントが結果の処理に必要なリソースを割り当てる必要がある場合、IsEmpty を使用すると、リソースを割り当てる前に処理することがあるかどうかを確認できます(ループ内で処理される最初のアイテムでリソースを割り当てるという方法もあります)。 IsEmpty を使用して、リソースを割り当てる前に (処理を開始する前に) 何か結果があるかどうかを確認するクライアントで、キャンセルもサポートすると、図 2 のようなコードが得られます。
Figure 2 Cancellation and Delfer Processing Until Results are Ready
CancellationTokenSource cancelSource = new CancellationTokenSource();CancellationToken cancel;cancel = cancelSource.Token;if (!await invMgmt.foundInvoice.IsEmpty()){ // ...setup code for processing invoices... try { invMgmt.foundInvoice.ForEachAsync(HandleInvoice, cancel); } catch (Exception ex) { if (ex.GetType() != typeof(CancellationToken)) { // ...report message } } // ...clean up code when all invoices are processed or client disconnects}
Wrapping Up
Observer パターンの単純な実装だけが必要なら、結果のストリームを処理するのに ObservableCollection ですべてが可能かもしれません。 より良い制御とイベントベースのアプリケーションのために、Subject クラスと Rx に付属する拡張機能により、発行/購読モデルの強力な実装をサポートして、アプリケーションが非同期モードで動作します(Rx に付属する演算子の豊富なライブラリには目を通していません)。 Rx を使用している場合、Rx デザイン ガイド (bit.ly/1VOPxGS) をダウンロードして、観測可能なストリームを消費および生成する際のベスト プラクティスを説明することは価値があります。 ISubject<TSource, TResult> インターフェースは、”in” データ型と “out” データ型の 2 つのデータ型を指定するもので、”in” データ型は、”out” データ型は、”out” データ型は、”out” のデータ型を指定する。 このインタフェースを実装する Subject クラス内では、サーバーから返された結果 (in データ型) をクライアントが必要とする結果 (out データ型) に変換するために必要なすべての操作を実行できます。 さらに、in パラメーターは共変数 (指定されたデータ型またはそのデータ型を継承するものを受け入れる)、out パラメーターは共変数 (指定されたデータ型またはそのデータ型から派生するものを受け入れる) で、さらなる柔軟性を提供します。 幸いなことに、.NET Framework には ObservableCollection や Rx など、observer パターンを実装するためのいくつかのオプションがあります。 PH&V は、UX デザインからオブジェクト モデリングおよびデータベース デザインまで、フルスタックのコンサルティングを提供しています。
この記事をレビューしてくださった以下の Microsoft の技術専門家に感謝します。 Stephen Cleary、James McCaffrey、Dave Sexton
Stephen Cleary は、16 年間マルチスレッドおよび非同期プログラミングに取り組み、Microsoft .NET Framework の非同期サポートを最初のコミュニティ テクノロジー プレビューから使用してきました。 著書に「Concurrency in C# Cookbook」(O’Reilly Media、2014年)があります。 ブログを含む彼のホームページは stephencleary.com.
Discuss this article in the MSDN Magazine forum
にあります。
コメントを残す