Страницы

Поиск по вопросам

суббота, 30 ноября 2019 г.

Принудительная отмена задачи

#c_sharp #net #многопоточность #tpl


Везде написано, что работа с задачами- это кооперативный процесс, т.е задача должна
сама корректно завершится при первой просьбе из внешнего кода.

Но, что делать если кто-то подводит?

Например, я делаю Cancel на токене и даю на завершение некоторое время, но задача
не завершается, а код должен двигаться дальше. Оставлять висеть задачу?

Читал, что есть Thread.Abort, но его не рекомендуют использовать.



Пример с выносом стороннего кода в отдельный процесс был приведен.

Хотелось бы увидеть еще какие-нибудь способы, еще как минимум решение задачи через
appDomain.
    


Ответы

Ответ 1



Ну вот вам пример реализации. Сразу предупреждаю, кода будет много. Возьмём в качестве основы вот такую ненадёжную функцию: class EvilComputation { static Random random = new Random(); public static async Task Compute( int numberOfSeconds, double x, CancellationToken ct) { bool wellBehaved = random.Next(2) == 0; var y = x * x; var delay = TimeSpan.FromSeconds(numberOfSeconds); await Task.Delay(delay, wellBehaved ? ct : CancellationToken.None); return y; } } Вы видим, что функция плохая: она может в зависимости от случайных условий не реагировать на отмену. Что делать в этом случае? Вынесем функцию в отдельный процесс. Этот процесс можно будет убить без особого вреда для исходного процесса. Для того, чтобы вызвать функцию в другом процессе, нужно передать данные о вызове функции туда. Для связи используем, например, анонимные пайпы (можно использовать по сути что угодно). Я основываю код на этом примере: How to: Use Anonymous Pipes for Local Interprocess Communication. Для передачи данных будем использовать стандартное бинарное форматирование, раз уж мы не пошли через WCF. Нам нужны DTO-объекты, которые будут перебрасываться между процессами. Их нужно использовать в двух процессах — главном и вспомогательном (назовём его плагином), поэтому для DTO-типов понадобится отдельная сборка. Заводим сборку OutProcCommonData, кладём в неё следующие классы: namespace OutProcCommonData { [Serializable] public class Command // общий класс-предок для посылаемой команды { } [Serializable] public class Evaluate : Command // команда на вычисление { public int NumberOfSecondsToProcess; public double X; } [Serializable] public class Cancel : Command // команда на отмену { } } Далее, возвращаемый результат: namespace OutProcCommonData { [Serializable] public class Response // общий класс-предок для возвращаемого результата { } [Serializable] public class Result : Response // готовый результат вычислений { public double Y; } [Serializable] public class Error : Response // ошибка с текстом { public string Text; } [Serializable] public class Cancelled : Response // подтверждение отмены { } } Далее, наш плагин. Это отдельное консольное приложение (хотя, если мы не хотим видеть консоль и отладочный вывод, можно сделать его неконсольным). Протокол общения таков. Главная программа посылает Evaluate, а после него, возможно, Cancel. Плагин возвращает Result в случае успешного вычисления, Cancelled в случае полученного сигнала отмены и успешно отменённого вычисления, и Error в случае ошибки (например, нарушения протокола коммуникации). Вот обвязочный код: class Plugin { static int Main(string[] args) { // нам должны быть переданы два аргумента: хендл входящего и исходящего пайпов if (args.Length != 2) { Console.Error.WriteLine("Shouldn't be started directly"); return 1; } return new Plugin().Run(args[0], args[1]).Result; } BinaryFormatter serializer = new BinaryFormatter(); // для сериализации async Task Run(string hIn, string hOut) { Console.WriteLine("[Plugin] Running"); // открывем переданные пайпы using (var inStream = new AnonymousPipeClientStream(PipeDirection.In, hIn)) using (var outStream = new AnonymousPipeClientStream(PipeDirection.Out, hOut)) { try { var cts = new CancellationTokenSource(); // токен для отмены Console.WriteLine("[Plugin] Reading args"); // пытаемся десериализовать аргументы var args = SafeGet(inStream); if (args == null) { Console.WriteLine("[Plugin] Didn't get args"); // отправляем ошибку, если не удалось serializer.Serialize( outStream, new OutProcCommonData.Error() { Text = "Unrecognized input" }); // и выходим return 3; } Console.WriteLine("[Plugin] Got args, start compute and waiting cancel"); // запускаем вычисление var computeTask = EvilComputation.Compute( args.NumberOfSecondsToProcess, args.X, cts.Token); // параллельно запускаем чтение возможной отмены var waitForCancelTask = Task.Run(() => (OutProcCommonData.Cancel)serializer.Deserialize(inStream)); // дожидаемся одного из двух var winner = await Task.WhenAny(computeTask, waitForCancelTask); // если первой пришла отмена... if (winner == waitForCancelTask) { Console.WriteLine("[Plugin] Got cancel, cancelling computation"); // просим вычисление завершиться cts.Cancel(); } // окончания вычисления всё равно нужно дождаться Console.WriteLine("[Plugin] Awaiting computation"); // если вычисление отменится, здесь будет исключение var result = await computeTask; Console.WriteLine("[Plugin] Sending back result"); // отсылаем результат в пайп serializer.Serialize( outStream, new OutProcCommonData.Result() { Y = result }); // нормальный выход return 0; } catch (OperationCanceledException) { // мы успешно отменили задание, рапортуем Console.WriteLine("[Plugin] Sending cancellation"); serializer.Serialize( outStream, new OutProcCommonData.Cancelled()); return 2; } catch (Exception ex) { // возникла непредвиденная ошибка, рапортуем Console.WriteLine($"[Plugin] Sending error {ex.Message}"); serializer.Serialize( outStream, new OutProcCommonData.Error() { Text = ex.Message }); return 3; } } } // ну и вспомогательная функция, которая пытается читать данные из пайпа T SafeGet(Stream s) where T : class { try { return (T)serializer.Deserialize(s); } catch { return null; } } } Я не отлавливаю ошибки при записи в пайп, добавьте сами по вкусу. Теперь, главная программа. Она будет у нас отдельно от плагина (то есть, у нас получаются три сборки). class Program { static void Main(string[] args) => new Program().Run().Wait(); async Task Run() { var cts = new CancellationTokenSource(); try { var y = await ComputeOutProc(2, cts.Token); Console.WriteLine($"[Main] Result: {y}"); } catch (TimeoutException) { Console.WriteLine("[Main] Timed out"); } catch (OperationCanceledException) { Console.WriteLine("[Main] Cancelled"); } } const int SecondsToSend = 3; const int TimeoutSeconds = 5; const int CancelSeconds = 2; BinaryFormatter serializer = new BinaryFormatter(); async Task ComputeOutProc(double x, CancellationToken ct) { Process plugin = null; bool pluginStarted = false; try { // создаём исходящий и входящий пайпы using (var commandStream = new AnonymousPipeServerStream( PipeDirection.Out, HandleInheritability.Inheritable)) using (var responseStream = new AnonymousPipeServerStream( PipeDirection.In, HandleInheritability.Inheritable)) { Console.WriteLine("[Main] Starting plugin"); plugin = new Process() { StartInfo = { FileName = "OutProcPlugin.exe", Arguments = commandStream.GetClientHandleAsString() + " " + responseStream.GetClientHandleAsString(), UseShellExecute = false } }; // запускаем плагин с параметрами plugin.Start(); pluginStarted = true; Console.WriteLine("[Main] Started plugin"); commandStream.DisposeLocalCopyOfClientHandle(); responseStream.DisposeLocalCopyOfClientHandle(); void Send(Command c) { serializer.Serialize(commandStream, c); commandStream.Flush(); } try { // отсылаем плагину команду на вычисление Console.WriteLine("[Main] Sending evaluate request"); Send(new OutProcCommonData.Evaluate() { NumberOfSecondsToProcess = SecondsToSend, X = x }); Task responseTask; bool readyInTime; bool cancellationSent = false; // внутри этого блока при отмене будем отсылать команду плагину using (ct.Register(() => { Send(new OutProcCommonData.Cancel()); Console.WriteLine("[Main] Requested cancellation"); cancellationSent = true; })) { Console.WriteLine("[Main] Starting getting response"); // ожидаем получение ответа responseTask = Task.Run(() => (Response)serializer.Deserialize(responseStream)); // или таймаута var timeoutTask = Task.Delay(TimeSpan.FromSeconds(TimeoutSeconds)); var winner = await Task.WhenAny(responseTask, timeoutTask); readyInTime = winner == responseTask; } // если наступил таймаут, просим процесс вежливо завершить вычисления if (!readyInTime) { if (!cancellationSent) { Console.WriteLine("[Main] Not ready in time, sending cancel"); Send(new OutProcCommonData.Cancel()); } else { Console.WriteLine("[Main] Not ready in time, cancel sent"); } // и ждём ещё немного, ну или прихода ответа var timeoutTask = Task.Delay(TimeSpan.FromSeconds(CancelSeconds)); await Task.WhenAny(responseTask, timeoutTask); } // если до сих пор ничего не пришло, плагин завис, убиваем его if (!responseTask.IsCompleted) { Console.WriteLine("[Main] No response, killing plugin"); plugin.Kill(); // это завершит ожидание с исключением, по идее // в ранних версиях .NET нужно было бы поймать // это исключение // и уходим с исключением-таймаутом ct.ThrowIfCancellationRequested(); throw new TimeoutException(); } // здесь мы уверены, что ожидание завершилось Console.WriteLine("[Main] Obtaining response"); var response = await responseTask; // тут может быть брошено исключение // если была затребована отмена, выходим ct.ThrowIfCancellationRequested(); // проверяем тип результата: switch (response) { case Result r: // нормальный результат, возвращаем его Console.WriteLine("[Main] Got result, returning"); return r.Y; case Cancelled _: // отмена не по ct = таймаут Console.WriteLine("[Main] Got cancellation"); throw new TimeoutException(); case Error err: // пришла ошибка, бросаем исключение // лучше, конечно, определить собственный тип здесь Console.WriteLine("[Main] Got error"); throw new Exception(err.Text); default: // сюда мы вообще не должны попасть, если плагин работает нормально Console.WriteLine("[Main] Unexpected error"); throw new Exception("Unexpected response type"); } } catch (IOException e) { Console.WriteLine("[Main] IO error occured"); throw new Exception("IO Error", e); } } } finally { if (pluginStarted) { plugin.WaitForExit(); plugin.Close(); } } } } Результат пробега: [Main] Starting plugin [Main] Started plugin [Main] Sending evaluate request [Main] Starting getting response [Plugin] Running [Plugin] Reading args [Plugin] Got args, start compute and waiting cancel [Plugin] Awaiting computation [Plugin] Sending back result [Main] Obtaining response [Main] Got result, returning [Main] Result: 4 Если поменять константу SecondsToSend на 10, чтобы был таймаут, получаем такой результат двух пробегов: Для штатного завершения: [Main] Starting plugin [Main] Started plugin [Main] Sending evaluate request [Main] Starting getting response [Plugin] Running [Plugin] Reading args [Plugin] Got args, start compute and waiting cancel [Main] Not ready in time, sending cancel [Plugin] Got cancel, cancelling computation [Plugin] Awaiting computation [Plugin] Sending cancellation [Main] Obtaining response [Main] Got cancellation [Main] Timed out Для принудительного завершения: [Main] Starting plugin [Main] Started plugin [Main] Sending evaluate request [Main] Starting getting response [Plugin] Running [Plugin] Reading args [Plugin] Got args, start compute and waiting cancel [Main] Not ready in time, sending cancel [Plugin] Got cancel, cancelling computation [Plugin] Awaiting computation [Main] No response, killing plugin [Main] Timed out Если добавить перед var y = await ComputeOutProc(2, cts.Token); преждевременную отмену: cts.CancelAfter(TimeSpan.FromSeconds(1)); получим такой результат: для штатного завершения [Main] Starting plugin [Main] Started plugin [Main] Sending evaluate request [Main] Starting getting response [Plugin] Running [Plugin] Reading args [Plugin] Got args, start compute and waiting cancel [Main] Requested cancellation [Plugin] Got cancel, cancelling computation [Plugin] Awaiting computation [Plugin] Sending cancellation [Main] Obtaining response [Main] Cancelled и для принудительного завершения [Main] Starting plugin [Main] Started plugin [Main] Sending evaluate request [Main] Starting getting response [Plugin] Running [Plugin] Reading args [Plugin] Got args, start compute and waiting cancel [Main] Requested cancellation [Plugin] Got cancel, cancelling computation [Plugin] Awaiting computation [Main] Not ready in time, cancel sent [Main] No response, killing plugin [Main] Cancelled Наверняка кое-где недостаточно контролируются ошибки, так что проверяйте, не нужно ли ловить какие-то ещё исключения. На эту заготовку можно добавлять сверху свою логику. Например, можно аналогично пулу потоков завести пул плагинов, и доставлять задания свободному в данный момент плагину.

Ответ 2



Нашел один способ принудительного завершения задачи без нарушения работы приложения. Данный способ позволяет завершить задачу, запущенную с параметром TaskCreationOptions.LongRunning, зная ID рабочего потока. Основан на вызове функции ExitThread в контексте целевого потока с помощью недокументированной функции RtlRemoteCall. Способ не работает, если поток бесконечно находится в состоянии ожидания, можно завершить только работающий поток. Т.е это не на 100% надежно, но, я полагаю, получше чем Thread.Abort. Если задача исполняет ваш собственный код, ID рабочего потока легко получить, вызывая из нее функцию GetCurrentThreadId и сохраняя результат в переменной. Если задача исполняет чужой код (например, подгружаемый из внешней DLL), можно его узнать только выборкой из потоков по времени старта задачи. Основной класс: using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Diagnostics; using System.Runtime.InteropServices; namespace TaskTest { public class TaskKiller { const int THREAD_ACCESS_TERMINATE = (0x0001); const int SYNCHRONIZE = (0x00100000); const int STANDARD_RIGHTS_REQUIRED = (0x000F0000); const int THREAD_ALL_ACCESS = (STANDARD_RIGHTS_REQUIRED | SYNCHRONIZE | 0xFFFF); [DllImport("kernel32.dll")] public static extern IntPtr OpenThread(uint dwDesiredAccess, bool bInheritHandle, uint dwThreadId); [DllImport("kernel32.dll", SetLastError = true)] [return: MarshalAs(UnmanagedType.Bool)] public static extern bool CloseHandle(IntPtr hObject); [DllImport("kernel32.dll")] public static extern uint GetCurrentThreadId(); [DllImport("kernel32.dll")] public static extern IntPtr GetCurrentProcess(); [DllImport("kernel32.dll")] public static extern IntPtr GetCurrentThread(); [DllImport("kernel32.dll", SetLastError = true)] public static extern IntPtr GetModuleHandle(string lpModuleName); [DllImport("kernel32", CharSet = CharSet.Ansi, ExactSpelling = true, SetLastError = true)] public static extern IntPtr GetProcAddress(IntPtr hModule, string procName); [DllImport("ntdll.dll", ExactSpelling = true, EntryPoint = "RtlRemoteCall")] static extern int RtlRemoteCall( IntPtr Process, IntPtr Thread, IntPtr CallSite, uint ArgumentCount, IntPtr Arguments, uint PassContext, uint AlreadySuspended ); /// /// Завершение потока с указанным ID /// /// 0 при успешном завершении, код NTSTATUS при ошибке public static int KillThreadById(uint threadid) { IntPtr hModule = (IntPtr)0; IntPtr hThread = (IntPtr)0; IntPtr unmanagedPointer = (IntPtr)0; try { /*Получение адреса функции ExitThread*/ hModule = GetModuleHandle(@"kernel32.dll"); IntPtr pProc = (IntPtr)0; pProc = GetProcAddress(hModule, "ExitThread"); /*Получение дескриптора потока с полным доступом*/ hThread = TaskKiller.OpenThread( (uint)(THREAD_ALL_ACCESS), false, (uint)threadid); IntPtr hProcess = GetCurrentProcess(); int[] args = new int[] { (int)0 };//массив аргументов для RtlRemoteCall unmanagedPointer = Marshal.AllocHGlobal(args.Length * sizeof(int));//выделение блока неуправлемой памяти Marshal.Copy(args, 0, unmanagedPointer, args.Length);//копирование массива в неуправляемую память /*Вызов ExitThread в контексте завершаемого потока*/ int result = RtlRemoteCall(hProcess, hThread, pProc, 1, unmanagedPointer, 0, 0); return result; } finally { // Clean up resources if(unmanagedPointer !=(IntPtr)0)Marshal.FreeHGlobal(unmanagedPointer); if (hThread != (IntPtr)0) TaskKiller.CloseHandle(hThread); if (hModule != (IntPtr)0) TaskKiller.CloseHandle(hModule); } } /// /// Получение ID всех потоков, стартовавших в указанном интервале времени /// public static List GetThreadsByStartTime(DateTime t1, DateTime t2) { List threads = new List(); Process pr=Process.GetCurrentProcess(); using (pr) { ProcessThreadCollection ths = pr.Threads; foreach (ProcessThread th in ths) { using (th) { if (th.TotalProcessorTime.TotalMilliseconds > 0) { if (DateTime.Compare(th.StartTime, t1) >= 0 && DateTime.Compare(th.StartTime, t2) <= 0) threads.Add((uint)th.Id); } } } } return threads; } } } Пример использования: using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Windows.Forms; using System.Threading; using System.Threading.Tasks; using System.Diagnostics; using System.Runtime.InteropServices; namespace TaskTest { public partial class Form1 : Form { public Form1() { InitializeComponent(); PrintThreads(); } int i = 0; uint threadid;//ID рабочего потока DateTime t;//время старта задачи Task t1=null;//задача void PrintThreads() { Process pr=Process.GetCurrentProcess(); using (pr) { ProcessThreadCollection ths = pr.Threads; StringBuilder b = new StringBuilder(300); b.AppendLine("Threads: " + ths.Count); foreach (ProcessThread th in ths) { using (th) { b.AppendLine(th.Id + " - " + th.ThreadState.ToString()+" - "+th.StartTime); } } textBox1.Text += b.ToString(); } } private void button1_Click(object sender, EventArgs e) { /*Делегат для задачи*/ Action action = () => { threadid = TaskKiller.GetCurrentThreadId(); //сохранить ID потока для последующего доступа while (true)// Just loop. { i++; } }; // Construct an unstarted task t1 = new Task(action,TaskCreationOptions.LongRunning); // Launch task t1.Start(); t = DateTime.Now;//сохранить время старта для последующего использования textBox1.Text = "Task started"+Environment.NewLine; PrintThreads(); } private void button2_Click(object sender, EventArgs e) { //Завершение потока, если известен его ID textBox1.Text = "-- Before terminating --" + Environment.NewLine; PrintThreads(); textBox1.Text += Environment.NewLine; int res=TaskKiller.KillThreadById(threadid); if (res != 0) { textBox1.Text += ("Error NTSTATUS=" + res.ToString("X")); } else { textBox1.Text += threadid.ToString() + " is terminated!"; } textBox1.Text += Environment.NewLine; textBox1.Text += "-- After terminating --" + Environment.NewLine; PrintThreads(); } private void bTerminate_Click(object sender, EventArgs e) { //Завершение потоков по времени старта textBox1.Text = "-- Before terminating --" + Environment.NewLine; PrintThreads(); textBox1.Text += "-----------------------"; textBox1.Text += Environment.NewLine; List threads=TaskKiller.GetThreadsByStartTime( t.Subtract(TimeSpan.FromSeconds(1)), t.Add(TimeSpan.FromSeconds(1)) ); foreach (uint id in threads) { TaskKiller.KillThreadById(id); textBox1.Text += id.ToString() + " is terminated!"; textBox1.Text += Environment.NewLine; } textBox1.Text += "-- After terminating --" + Environment.NewLine; PrintThreads(); textBox1.Text += "-----------------------"; } } }

Комментариев нет:

Отправить комментарий