#c_sharp #net #многопоточность #память #парсер
Пишу программу для парсинга одного сайта. Сам сайт парсится с помощью CsQuery. Нужно
за раз обработать нужный диапазон страниц сайта. Задаётся начальная и конечная ссылки
для парсинга и программа в несколько потоков перебирает все страницы в диапазоне и
извлекает нужную информацию в List, что бы после окончания сохранить всё в файл. Нужное
количество потоков запускается, и они по очереди берут из счётчика текущей страницы
свой номер и работают с ним. В потоках написан цикл While, что бы они не закрывались,
пока не спарсили последнюю страницу. После окончания парсинга отдельно сохраняется
вся информация в List. Но проблема в том, что парсится будут большие диапазоны страниц
больше миллиона, а при тестовом запуске на диапазоне в 10 000 страниц программа начинает
занимать в памяти больше 1,5 гигабайт. В отдельной программе пробовал заполнять List
случайными данными, по типу тех, что должны были быть извлечены. Добавил 100 000 строк,
и размер оперативной памяти, используемой программы не превышал 100 мегабайт. Парсинг
так же работает правильно, никаких избыточных данных он не добавляет. Я грешу на мою
неправильную работу с потоками, и то, что сборщик мусора не уничтожает данные с прошлых
проходов парсинга. Пробовал разные способы так и не решил проблему с утечкой памяти.
Помогите найти ошибку, или подсказать более правильный метод работы с потоками. Код
прикладываю.
class Program
{
static int begin_of_post = 2950774; //начальный индекс постов
static int end_of_post = 2951774; //конечный индекс
static int current_post; //текущий пост для потоков
static List list_posts = new List(); //список хранения данных
о постах
static void Main(string[] args)
{
ServicePointManager.DefaultConnectionLimit = 1000000000; // количество
одновременных соединений
current_post = begin_of_post;
Thread my_tr;
for (int i = 0; i < 10; i++) //запуск потоков
{
my_tr = new Thread(parse_site);
my_tr.Start();
}
Console.ReadLine();
save_to_file();
}
static void parse_site()
{
while (current_post <= end_of_post)
{
int link_to_post =current_post; //ссылка на пост
Interlocked.Increment(ref current_post); //инкремент счётчика
CQ cq;
try
{
cq = CQ.CreateFromUrl("http://site.ru/" + link_to_post); //
загрузка кода страницы
}
catch
{
Console.WriteLine("Error " + link_to_post);
continue;
}
string post_info;
...
//сам парсинг сайта
...
int current = int.Parse(link_to_post) - begin_of_post;
int end = end_of_post - begin_of_post;
Console.WriteLine("Обработана ссылка " + current.ToString() + " ИЗ "
+ end.ToString());
Thread my_tr_save=new Thread(save_post);
my_tr_save.Start(post_info);
}
}
static void save_post(object post_info)
{
...
// Парсинг информации о странице
...
lock (list_posts)
{
list_posts.Add(post_info.ToString());
}
}
static void save_to_file()
{
...
//сохранение строк list_posts в файл
...
}
}
Ответы
Ответ 1
Вообще для того чтобы судить об утечке, нужно использовать профайлер, сделать два внэпшота (до и после) и посмотреть, что отнимает память. Какие навскидку есть проблемы в этом коде: Внутри каждого из потоков, читающих страницы, вы в цикле непрерывно создаете новые потоки: Thread my_tr_save=new Thread(save_post); my_tr_save.Start(post_info); Во-первых, вы плодите множество потоков, а они занимают память. Во-вторых, это избыточно, потому что вы и так уже внутри отдельного потока. И разносить в разные потоки парсинг и сохранение для начала нет смысла. К переменной current_post обращаются разные потоки, причем небезопасным способом. Теоретически может случиться так, что каждый пост обрабатывается несколько раз и вы получаете дубликаты страниц в вашем конечном списке, а значит, лишнюю память. Вам нужно атомарно выполнять условие current_post <= end_of_post с последующим инкрементом и возвращать актуальное значение, а в теле цикла пользоваться эти значением. static bool HasPostsToParse(out current) { lock (lockObject) { // к переменной current_post обращаетесь только в этом методе if (current_post <= end_of_post) { current = ++current_post; return true; } else { current = current_post; return false; } } } static void parse_site() { int current; while (HasPostsToParse(out current)) { // используете локальную переменную current } }Ответ 2
в вашем случае у вас рождаются новые потоки, каждому из которых выделяется стек по 4 мегабайта и ни один из потоков не может закончить работу и освободить память, потому что каждый из потоков закручивается while (current_post <= end_of_post) аж до конца выполнения всей работы вообще. Поэтому у вас будет постоянный прирост памяти за счет стеков аж до конца работы более правильный метод с потоками это TPL + правильное понимание IO-bound потоков и перелопатить весь этот миллион страниц можно одними потоками из пула. автор просил примеров как лучше делать в таких случаях (конечно парсер автора может не позволять этого) class SomeNetParser { private const int ThreadCount=20; private CountdownEvent _countdownEvent; private SemaphoreSlim _throttler; public void Check(IListurls) { _countdownEvent = new CountdownEvent(urls.Count); _throttler = new SemaphoreSlim(ThreadCount); foreach (var url in urls) { await _throttler.WaitAsync(ct); ProccessUrl(url); } _countdownEvent.Wait(); } private async void ProccessUrl(string url) { try { var page = await new WebClient().DownloadStringTaskAsync(new Uri(url)); ProccessResult(page); } finally { _semaphoreSlim.Release(); _countdownEvent.Signal(); } } private void ProccessResult(string page){/*....*/} } нужно не забыть метод Check вызвать не UI потоке. CountdownEvent нужен, чтобы после выхода из цикла дождался последней задачи. минусом данного решения является то, что он удерживает 1 поток. CountdownEvent можно выбросить и содержимое Check заменить на var allTasks = new List (); foreach (var url in urls) { await _throttler.WaitAsync(ct); allTasks.Add(ProccessUrl(url)); } await Task.WhenAll(allTasks); и ProccessUrl должен возвращать Task В этом случае allTasks будет накапливаться миллионом экземпляров Task и я даже не знаю, как быстро Task.WhenAll будет их проверять Есть еще вариант с LINQ, но он сложно понимаемый для новичков. зы: WebClient плохо подходит для этого. Он написан неправильно и выполняет часть своей работы с потоке который его вызвал и это так и не починили. HttpClient лучше подходит. Ответ 3
Для параллельной обработки данных в заданном диапазоне можно использовать Parallel.For. Если взять части из вашего кода, то будет примерно так: using System.Threading.Tasks; using System.Collections.Concurrent; // ... int begin_of_post = 2950774; //начальный индекс постов int end_of_post = 2951774; //конечный индекс var list_posts = new BlockingCollection(); Parallel.For(begin_of_post, end_of_post, (current_post) => { // ... var cq = CQ.CreateFromUrl("http://site.ru/" + link_to_post); list_posts.Add(link_to_post); // ... код для парсинга и т.д. save(file_name); }); save_to_file(list_posts); UPDATE: Если надо скачать много страниц, распарсить их и сохранить в файлы, а также получить лог, то можно сделать примерно так: public class App { BlockingCollection log; // для синхронизации вывода в log public Run(int start, int end) { log = new BlockingCollection (); Task.Run(() => { foreach(var s in log) { // тут пишем в log.txt } }); Task.Factory.StartNew(() => { // для каждого запроса создаем отдельный Task foreach (var page in Enumerable.Range(start, end)) Task.Factory.StartNew(() => Download(page), TaskCreationOptions.AttachedToParent); }).Wait(); // ждем завершение всех запущенных Task'ов log.CompleteAdding(); // завершим Task логирования } void Download(int page) { // выполняется в отдельном потоке var url = "http://...." + page; log.Add(url); try { var html = RequestPage(url); Task.Factory.StartNew(() => Parse(url, html), TaskCreationOptions.AttachedToParent); } catch(...) { log.Add("fail"); } } void Parse(string url, string html) { // выполняется в отдельном потоке // тут парсим html и сохраняем его в файл } }
Комментариев нет:
Отправить комментарий