Страницы

Поиск по вопросам

среда, 24 октября 2018 г.

TaskScheduler и балансировка по ядрам для процессов

Как известно, TaskScheduler в TPL раскидывает таски по ядрам (хоть и не гарантирует это).
Возьмем другой случай - порождаются много копий процессов, где внутри поток с многочисленными Thread.Sleep. В таком варианте поток намертво прилипнет к какому то ядру.
Вопрос в том, если этот поток переделать на task-модель, то будут ли эти таски перемалываться на разных ядрах или же будут тяготеть к одному и тому же ядру? Хоть это и таски, но по факту один поток разбивается на таски, чтобы избежать sleep и TaskScheduler может тяготеть переиспользовать этот же поток (других то в пуле нет)


Ответ

Окей, вам нужно перебрасывать выполнение между ядрами. Это можно сделать вот как.
Подсчитываем количество ядер. Это легко: Environment.ProcessorCount Запускаем столько UI-потоков, сколько у нас ядер. Для этого берём код отсюда, и заимствуем из него класс DispatcherThread. Каждый из них представляет собой поток, в который можно переключиться при помощи await AsyncHelper.RedirectTo(t.Dispatcher); (оттуда же). Нам нужно разбросать эти потоки по ядрам. Это можно сделать как описано здесь Теперь в нашей async-функции, если мы хотим поменять ядро, просто пишем
currentCore = (currentCore + 1) % Environment.ProcessorCount; await AsyncHelper.RedirectTo(threads[currentCore].Dispatcher);

Полный код:
using System; using System.Collections; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Runtime.CompilerServices; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Windows.Threading;
namespace SO5 { class Program { static List threads; static int currentCoreNo = 0;
static void Main(string[] args) { threads = Enumerable.Range(0, Environment.ProcessorCount) .Select(coreNo => new CoreAffineDispatcherThread(coreNo)) .ToList(); Run().Wait();
foreach (var t in threads) t.Dispose(); }
static async Task Run() { for (int i = 0; i < 10; i++) { await Task.Delay(100);
currentCoreNo = (currentCoreNo + 1) % Environment.ProcessorCount; await AsyncHelper.RedirectTo(threads[currentCoreNo].Dispatcher);
var t = Thread.CurrentThread; Console.WriteLine($"Task reporting from thread {t.ManagedThreadId}," + $" thread pool: {t.IsThreadPoolThread}"); } }
public class CoreAffineDispatcherThread : IDisposable { public Dispatcher Dispatcher { get; private set; }
Thread thread;
public CoreAffineDispatcherThread(int coreNumber) { using (var barrier = new AutoResetEvent(false)) { thread = new Thread(() => { Dispatcher = Dispatcher.CurrentDispatcher; barrier.Set(); Thread.BeginThreadAffinity();
#pragma warning disable 618 // The call to BeginThreadAffinity guarantees stable results // for GetCurrentThreadId, so we ignore the obsolete warning int osThreadId = AppDomain.GetCurrentThreadId(); #pragma warning restore 618
// Find the ProcessThread for this thread. ProcessThread thread = Process.GetCurrentProcess() .Threads.Cast() .Where(t => t.Id == osThreadId) .Single(); // Set the thread's processor affinity var cpuMask = 1 << coreNumber; thread.ProcessorAffinity = new IntPtr(cpuMask);
Dispatcher.Run();
Thread.EndThreadAffinity(); });
thread.SetApartmentState(ApartmentState.STA); thread.Start(); barrier.WaitOne(); } }
public void Dispose() { Dispatcher.InvokeShutdown(); if (thread != Thread.CurrentThread) thread.Join(); } } }
static class AsyncHelper { public static DispatcherRedirector RedirectTo(Dispatcher d) { return new DispatcherRedirector(d); } }
public struct DispatcherRedirector : INotifyCompletion { public DispatcherRedirector(Dispatcher dispatcher) { this.dispatcher = dispatcher; }
#region awaiter public DispatcherRedirector GetAwaiter() { // combined awaiter and awaitable return this; } #endregion
#region awaitable public bool IsCompleted { get { // true means execute continuation inline return dispatcher.CheckAccess(); } }
public void OnCompleted(Action continuation) { dispatcher.BeginInvoke(continuation); }
public void GetResult() { } #endregion
Dispatcher dispatcher; } }
При тестовом пробеге выдаёт:
Task reporting from thread 10, thread pool: False Task reporting from thread 11, thread pool: False Task reporting from thread 12, thread pool: False Task reporting from thread 13, thread pool: False Task reporting from thread 14, thread pool: False Task reporting from thread 15, thread pool: False Task reporting from thread 16, thread pool: False Task reporting from thread 9, thread pool: False Task reporting from thread 10, thread pool: False Task reporting from thread 11, thread pool: False

Комментариев нет:

Отправить комментарий