Страницы

Поиск по вопросам

суббота, 14 декабря 2019 г.

TaskScheduler и балансировка по ядрам для процессов

#c_sharp #tpl


Как известно, TaskScheduler в TPL раскидывает таски по ядрам (хоть и не гарантирует это).

Возьмем другой случай - порождаются много копий процессов, где внутри поток с многочисленными
Thread.Sleep. В таком варианте поток намертво прилипнет к какому то ядру.

Вопрос в том, если этот поток переделать на task-модель, то будут ли эти таски перемалываться
на разных ядрах или же будут тяготеть к одному и тому же ядру? Хоть это и таски, но
по факту один поток разбивается на таски, чтобы избежать sleep и TaskScheduler может
тяготеть переиспользовать этот же поток (других то в пуле нет)
    


Ответы

Ответ 1



Окей, вам нужно перебрасывать выполнение между ядрами. Это можно сделать вот как. Подсчитываем количество ядер. Это легко: Environment.ProcessorCount. Запускаем столько UI-потоков, сколько у нас ядер. Для этого берём код отсюда, и заимствуем из него класс DispatcherThread. Каждый из них представляет собой поток, в который можно переключиться при помощи await AsyncHelper.RedirectTo(t.Dispatcher); (оттуда же). Нам нужно разбросать эти потоки по ядрам. Это можно сделать как описано здесь. Теперь в нашей async-функции, если мы хотим поменять ядро, просто пишем currentCore = (currentCore + 1) % Environment.ProcessorCount; await AsyncHelper.RedirectTo(threads[currentCore].Dispatcher); Полный код: using System; using System.Collections; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Runtime.CompilerServices; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Windows.Threading; namespace SO5 { class Program { static List threads; static int currentCoreNo = 0; static void Main(string[] args) { threads = Enumerable.Range(0, Environment.ProcessorCount) .Select(coreNo => new CoreAffineDispatcherThread(coreNo)) .ToList(); Run().Wait(); foreach (var t in threads) t.Dispose(); } static async Task Run() { for (int i = 0; i < 10; i++) { await Task.Delay(100); currentCoreNo = (currentCoreNo + 1) % Environment.ProcessorCount; await AsyncHelper.RedirectTo(threads[currentCoreNo].Dispatcher); var t = Thread.CurrentThread; Console.WriteLine($"Task reporting from thread {t.ManagedThreadId}," + $" thread pool: {t.IsThreadPoolThread}"); } } public class CoreAffineDispatcherThread : IDisposable { public Dispatcher Dispatcher { get; private set; } Thread thread; public CoreAffineDispatcherThread(int coreNumber) { using (var barrier = new AutoResetEvent(false)) { thread = new Thread(() => { Dispatcher = Dispatcher.CurrentDispatcher; barrier.Set(); Thread.BeginThreadAffinity(); #pragma warning disable 618 // The call to BeginThreadAffinity guarantees stable results // for GetCurrentThreadId, so we ignore the obsolete warning int osThreadId = AppDomain.GetCurrentThreadId(); #pragma warning restore 618 // Find the ProcessThread for this thread. ProcessThread thread = Process.GetCurrentProcess() .Threads.Cast() .Where(t => t.Id == osThreadId) .Single(); // Set the thread's processor affinity var cpuMask = 1 << coreNumber; thread.ProcessorAffinity = new IntPtr(cpuMask); Dispatcher.Run(); Thread.EndThreadAffinity(); }); thread.SetApartmentState(ApartmentState.STA); thread.Start(); barrier.WaitOne(); } } public void Dispose() { Dispatcher.InvokeShutdown(); if (thread != Thread.CurrentThread) thread.Join(); } } } static class AsyncHelper { public static DispatcherRedirector RedirectTo(Dispatcher d) { return new DispatcherRedirector(d); } } public struct DispatcherRedirector : INotifyCompletion { public DispatcherRedirector(Dispatcher dispatcher) { this.dispatcher = dispatcher; } #region awaiter public DispatcherRedirector GetAwaiter() { // combined awaiter and awaitable return this; } #endregion #region awaitable public bool IsCompleted { get { // true means execute continuation inline return dispatcher.CheckAccess(); } } public void OnCompleted(Action continuation) { dispatcher.BeginInvoke(continuation); } public void GetResult() { } #endregion Dispatcher dispatcher; } } При тестовом пробеге выдаёт: Task reporting from thread 10, thread pool: False Task reporting from thread 11, thread pool: False Task reporting from thread 12, thread pool: False Task reporting from thread 13, thread pool: False Task reporting from thread 14, thread pool: False Task reporting from thread 15, thread pool: False Task reporting from thread 16, thread pool: False Task reporting from thread 9, thread pool: False Task reporting from thread 10, thread pool: False Task reporting from thread 11, thread pool: False

Комментариев нет:

Отправить комментарий