Страницы

Поиск по вопросам

четверг, 13 февраля 2020 г.

Переполнение/утечка памяти программы - C#

#c_sharp #net #многопоточность #память #парсер


Пишу программу для парсинга одного сайта. Сам сайт парсится с помощью CsQuery. Нужно
за раз обработать нужный диапазон страниц сайта. Задаётся начальная и конечная ссылки
для парсинга и программа в несколько потоков перебирает все страницы в диапазоне и
извлекает нужную информацию в List, что бы после окончания сохранить всё в файл. Нужное
количество потоков запускается, и они по очереди берут из счётчика текущей страницы
свой номер и работают с ним. В потоках написан цикл While, что бы они не закрывались,
пока не спарсили последнюю страницу. После окончания парсинга отдельно сохраняется
вся информация в List. Но проблема в том, что парсится будут большие диапазоны страниц
больше миллиона, а при тестовом запуске на диапазоне в 10 000 страниц программа начинает
занимать в памяти больше 1,5 гигабайт. В отдельной программе пробовал заполнять List
случайными данными, по типу тех, что должны были быть извлечены. Добавил 100 000 строк,
и размер оперативной памяти, используемой программы не превышал 100 мегабайт. Парсинг
так же работает правильно, никаких избыточных данных он не добавляет. Я грешу на мою
неправильную работу с потоками, и то, что сборщик мусора не уничтожает данные с прошлых
проходов парсинга. Пробовал разные способы так и не решил проблему с утечкой памяти.
Помогите найти ошибку, или подсказать более правильный метод работы с потоками. Код
прикладываю.

class Program
{
    static int begin_of_post = 2950774;      //начальный индекс постов
    static int end_of_post = 2951774;        //конечный индекс
    static int current_post;                 //текущий пост для потоков

    static List list_posts = new List();   //список хранения данных
о постах

    static void Main(string[] args)
    {
        ServicePointManager.DefaultConnectionLimit = 1000000000;   // количество
одновременных соединений

        current_post = begin_of_post;

        Thread my_tr;                               
        for (int i = 0; i < 10; i++)            //запуск потоков
        {
            my_tr = new Thread(parse_site);
            my_tr.Start();
        }

        Console.ReadLine();
        save_to_file();
    }

    static void parse_site()
    {
        while (current_post <= end_of_post)
        {
            int link_to_post =current_post;                 //ссылка на пост
            Interlocked.Increment(ref current_post);        //инкремент счётчика

            CQ cq;
            try
            {
                cq = CQ.CreateFromUrl("http://site.ru/" + link_to_post);        //
загрузка кода страницы
            }
            catch
            {
                Console.WriteLine("Error " + link_to_post);
                continue;
            }

            string post_info;
            ...
            //сам парсинг сайта
            ...

            int current = int.Parse(link_to_post) - begin_of_post;          
            int end = end_of_post - begin_of_post;
            Console.WriteLine("Обработана ссылка " + current.ToString() + " ИЗ "
+ end.ToString());

            Thread my_tr_save=new Thread(save_post);
            my_tr_save.Start(post_info);
        }
    }

    static void save_post(object post_info)
    {
        ...
        // Парсинг информации о странице
        ...

        lock (list_posts)
        {
            list_posts.Add(post_info.ToString());
        }
    }      

    static void save_to_file()
    {
                    ...
        //сохранение строк list_posts в файл
                    ...
    }
}

    


Ответы

Ответ 1



Вообще для того чтобы судить об утечке, нужно использовать профайлер, сделать два внэпшота (до и после) и посмотреть, что отнимает память. Какие навскидку есть проблемы в этом коде: Внутри каждого из потоков, читающих страницы, вы в цикле непрерывно создаете новые потоки: Thread my_tr_save=new Thread(save_post); my_tr_save.Start(post_info); Во-первых, вы плодите множество потоков, а они занимают память. Во-вторых, это избыточно, потому что вы и так уже внутри отдельного потока. И разносить в разные потоки парсинг и сохранение для начала нет смысла. К переменной current_post обращаются разные потоки, причем небезопасным способом. Теоретически может случиться так, что каждый пост обрабатывается несколько раз и вы получаете дубликаты страниц в вашем конечном списке, а значит, лишнюю память. Вам нужно атомарно выполнять условие current_post <= end_of_post с последующим инкрементом и возвращать актуальное значение, а в теле цикла пользоваться эти значением. static bool HasPostsToParse(out current) { lock (lockObject) { // к переменной current_post обращаетесь только в этом методе if (current_post <= end_of_post) { current = ++current_post; return true; } else { current = current_post; return false; } } } static void parse_site() { int current; while (HasPostsToParse(out current)) { // используете локальную переменную current } }

Ответ 2



в вашем случае у вас рождаются новые потоки, каждому из которых выделяется стек по 4 мегабайта и ни один из потоков не может закончить работу и освободить память, потому что каждый из потоков закручивается while (current_post <= end_of_post) аж до конца выполнения всей работы вообще. Поэтому у вас будет постоянный прирост памяти за счет стеков аж до конца работы более правильный метод с потоками это TPL + правильное понимание IO-bound потоков и перелопатить весь этот миллион страниц можно одними потоками из пула. автор просил примеров как лучше делать в таких случаях (конечно парсер автора может не позволять этого) class SomeNetParser { private const int ThreadCount=20; private CountdownEvent _countdownEvent; private SemaphoreSlim _throttler; public void Check(IList urls) { _countdownEvent = new CountdownEvent(urls.Count); _throttler = new SemaphoreSlim(ThreadCount); foreach (var url in urls) { await _throttler.WaitAsync(ct); ProccessUrl(url); } _countdownEvent.Wait(); } private async void ProccessUrl(string url) { try { var page = await new WebClient().DownloadStringTaskAsync(new Uri(url)); ProccessResult(page); } finally { _semaphoreSlim.Release(); _countdownEvent.Signal(); } } private void ProccessResult(string page){/*....*/} } нужно не забыть метод Check вызвать не UI потоке. CountdownEvent нужен, чтобы после выхода из цикла дождался последней задачи. минусом данного решения является то, что он удерживает 1 поток. CountdownEvent можно выбросить и содержимое Check заменить на var allTasks = new List(); foreach (var url in urls) { await _throttler.WaitAsync(ct); allTasks.Add(ProccessUrl(url)); } await Task.WhenAll(allTasks); и ProccessUrl должен возвращать Task В этом случае allTasks будет накапливаться миллионом экземпляров Task и я даже не знаю, как быстро Task.WhenAll будет их проверять Есть еще вариант с LINQ, но он сложно понимаемый для новичков. зы: WebClient плохо подходит для этого. Он написан неправильно и выполняет часть своей работы с потоке который его вызвал и это так и не починили. HttpClient лучше подходит.

Ответ 3



Для параллельной обработки данных в заданном диапазоне можно использовать Parallel.For. Если взять части из вашего кода, то будет примерно так: using System.Threading.Tasks; using System.Collections.Concurrent; // ... int begin_of_post = 2950774; //начальный индекс постов int end_of_post = 2951774; //конечный индекс var list_posts = new BlockingCollection(); Parallel.For(begin_of_post, end_of_post, (current_post) => { // ... var cq = CQ.CreateFromUrl("http://site.ru/" + link_to_post); list_posts.Add(link_to_post); // ... код для парсинга и т.д. save(file_name); }); save_to_file(list_posts); UPDATE: Если надо скачать много страниц, распарсить их и сохранить в файлы, а также получить лог, то можно сделать примерно так: public class App { BlockingCollection log; // для синхронизации вывода в log public Run(int start, int end) { log = new BlockingCollection(); Task.Run(() => { foreach(var s in log) { // тут пишем в log.txt } }); Task.Factory.StartNew(() => { // для каждого запроса создаем отдельный Task foreach (var page in Enumerable.Range(start, end)) Task.Factory.StartNew(() => Download(page), TaskCreationOptions.AttachedToParent); }).Wait(); // ждем завершение всех запущенных Task'ов log.CompleteAdding(); // завершим Task логирования } void Download(int page) { // выполняется в отдельном потоке var url = "http://...." + page; log.Add(url); try { var html = RequestPage(url); Task.Factory.StartNew(() => Parse(url, html), TaskCreationOptions.AttachedToParent); } catch(...) { log.Add("fail"); } } void Parse(string url, string html) { // выполняется в отдельном потоке // тут парсим html и сохраняем его в файл } }

Комментариев нет:

Отправить комментарий