Есть ли лучший способ подождать очереди в очереди?

Есть ли лучший способ подождать очереди в очереди, прежде чем выполнять другой процесс?

В настоящее время я делаю:

this.workerLocker = new object(); // Global variable this.RunningWorkers = arrayStrings.Length; // Global variable // Initiate process foreach (string someString in arrayStrings) { ThreadPool.QueueUserWorkItem(this.DoSomething, someString); Thread.Sleep(100); } // Waiting execution for all queued threads lock (this.workerLocker) // Global variable (object) { while (this.RunningWorkers > 0) { Monitor.Wait(this.workerLocker); } } // Do anything else Console.WriteLine("END"); 

 // Method DoSomething() definition public void DoSomething(object data) { // Do a slow process... . . . lock (this.workerLocker) { this.RunningWorkers--; Monitor.Pulse(this.workerLocker); } } 

Вероятно, вы хотите взглянуть на AutoResetEvent и ManualResetEvent.

Они предназначены именно для этой ситуации (ожидая окончания streamа ThreadPool, прежде чем делать что-то).

Вы бы сделали что-то вроде этого:

 static void Main(string[] args) { List resetEvents = new List(); foreach (var x in Enumerable.Range(1, WORKER_COUNT)) { ManualResetEvent resetEvent = new ManualResetEvent(); ThreadPool.QueueUserWorkItem(DoSomething, resetEvent); resetEvents.Add(resetEvent); } // wait for all ManualResetEvents WaitHandle.WaitAll(resetEvents.ToArray()); // You probably want to use an array instead of a List, a list was just easier for the example :-) } public static void DoSomething(object data) { ManualResetEvent resetEvent = data as ManualResetEvent; // Do something resetEvent.Set(); } 

Изменить: забыл упомянуть, что вы можете подождать один stream, любой stream и так далее. Кроме того, в зависимости от вашей ситуации, AutoResetEvent может немного упростить ситуацию, поскольку он (как следует из названия) может сигнализировать события автоматически 🙂

Как насчет Fork и соединения, которые используют только Monitor ;-p

 Forker p = new Forker(); foreach (var obj in collection) { var tmp = obj; p.Fork(delegate { DoSomeWork(tmp); }); } p.Join(); 

Полный код показан на этом более раннем ответе .

Или для очереди производителей / потребителей с ограниченным размером (поточно-безопасным и т. Д.).

В дополнение к Barrier, указанному Хенком Холтерманом (BTW его очень плохое использование Barrier, см. Мой комментарий к его ответу), .NET 4.0 предоставляет целый набор других параметров (чтобы использовать их в .NET 3.5, вам необходимо скачать дополнительную DLL от Microsoft ). Я написал сообщение, в котором перечислены их все , но мой любимый, безусловно, Parallel.ForEach:

 Parallel.ForEach(arrayStrings, someString => { DoSomething(someString); }); 

За кулисами Parallel.ForEach очереди в новый и улучшенный пул streamов и ждет, пока все streamи не будут выполнены.

Мне очень нравится шаблон Begin End-Async, когда мне нужно дождаться завершения задач.

Я бы посоветовал вам обернуть BeginEnd в рабочий class:

 public class StringWorker { private string m_someString; private IAsyncResult m_result; private Action DoSomethingDelegate; public StringWorker(string someString) { DoSomethingDelegate = DoSomething; } private void DoSomething() { throw new NotImplementedException(); } public IAsyncResult BeginDoSomething() { if (m_result != null) { throw new InvalidOperationException(); } m_result = DoSomethingDelegate.BeginInvoke(null, null); return m_result; } public void EndDoSomething() { DoSomethingDelegate.EndInvoke(m_result); } } 

Для начала и работы используйте этот fragment кода:

 List workers = new List(); foreach (var someString in arrayStrings) { StringWorker worker = new StringWorker(someString); worker.BeginDoSomething(); workers.Add(worker); } foreach (var worker in workers) { worker.EndDoSomething(); } Console.WriteLine("END"); 

И это все.

Sidenote: Если вы хотите получить результат с BeginEnd, то измените «Action» на Func и измените EndDoSomething, чтобы вернуть тип.

 public class StringWorker { private string m_someString; private IAsyncResult m_result; private Func DoSomethingDelegate; public StringWorker(string someString) { DoSomethingDelegate = DoSomething; } private string DoSomething() { throw new NotImplementedException(); } public IAsyncResult BeginDoSomething() { if (m_result != null) { throw new InvalidOperationException(); } m_result = DoSomethingDelegate.BeginInvoke(null, null); return m_result; } public string EndDoSomething() { return DoSomethingDelegate.EndInvoke(m_result); } } 

Да, есть.

Предлагаемый подход

1) счетчик и ручка ожидания

 int ActiveCount = 1; // 1 (!) is important EventWaitHandle ewhAllDone = new EventWaitHandle(false, ResetMode.Manual); 

2) добавление цикла

 foreach (string someString in arrayStrings) { Interlocked.Increment(ref ActiveCount); ThreadPool.QueueUserWorkItem(this.DoSomething, someString); // Thread.Sleep(100); // you really need this sleep ? } PostActionCheck(); ewhAllDone.Wait(); 

3) DoSomething должно выглядеть

 { try { // some long executing code } finally { // .... PostActionCheck(); } } 

4), где PostActionCheck

 void PostActionCheck() { if (Interlocked.Decrement(ref ActiveCount) == 0) ewhAllDone.Set(); } 

идея

ActiveCount инициализируется с 1 , а затем увеличивается на n раз.

PostActionCheck называется n + 1 раз. Последний инициирует событие.

Преимущество этого решения заключается в том, что используется один объект ядра (который является событием) и 2 * n + 1 вызовами облегченного API. (Может ли быть меньше?)

PS

Я написал код здесь, у меня, возможно, были написаны некоторые имена classов.

У .NET 4.0 есть новый class для этого, Барьер .

Кроме того, ваш метод не так уж плох, и вы можете немного оптимизировать только с помощью Pulsing, если RunningWorkers равно 0 после декремента. Это будет выглядеть так:

 this.workerLocker = new object(); // Global variable this.RunningWorkers = arrayStrings.Length; // Global variable foreach (string someString in arrayStrings) { ThreadPool.QueueUserWorkItem(this.DoSomething, someString); //Thread.Sleep(100); } // Waiting execution for all queued threads Monitor.Wait(this.workerLocker); 

 // Method DoSomething() definition public void DoSomething(object data) { // Do a slow process... // ... lock (this.workerLocker) { this.RunningWorkers--; if (this.RunningWorkers == 0) Monitor.Pulse(this.workerLocker); } } 

Вы можете использовать EventWaithandle или AutoResetEvent, но все они обернуты API Win32. Поскольку class Monitor является чистым управляемым кодом, я бы предпочел использовать Monitor для этой ситуации.

Я не уверен, что на самом деле я недавно сделал что-то похожее на сканирование каждого IP-адреса подсети для приема определенного порта.

Несколько вещей, которые я могу предложить, могут улучшить производительность:

  • Используйте метод SetMaxThreads ThreadPool для настройки производительности (например, балансировка с одновременным запуском множества streamов, против времени блокировки на таком большом количестве streamов).

  • Не спать при настройке всех рабочих элементов, нет никакой реальной потребности (что я сразу же знаю. Однако не спите внутри метода DoSomething, возможно только на миллисекунду, чтобы другие streamи могли туда прыгать, если нужно ,

Я уверен, что вы могли бы реализовать более индивидуальный метод самостоятельно, но я сомневаюсь, что это было бы более эффективно, чем использование ThreadPool.

PS Я не понимаю на 100% причины использования монитора, так как вы все равно блокируете? Обратите внимание, что вопрос задается только потому, что я раньше не использовал class Monitor, а не потому, что я действительно сомневаюсь в его использовании.

Используйте Spring Threading. Он имеет встроенные барьерные реализации.

http://www.springsource.org/extensions/se-threading-net