Подтвердить что ты не робот

Очень простая многопоточная параллельная выборка URL (без очереди)

Я потратил целый день на поиск простейшего многопоточного стирателя URL в Python, но большинство скриптов, которые я нашел, используют очереди или многопроцессорные или сложные библиотеки.

Наконец, я написал один сам, о котором я сообщаю как ответ. Пожалуйста, не стесняйтесь предлагать какие-либо улучшения.

Я думаю, что другие люди, возможно, искали нечто подобное.

4b9b3361

Ответ 1

Упростите вашу оригинальную версию как можно дальше:

import threading
import urllib2
import time

start = time.time()
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]

def fetch_url(url):
    urlHandler = urllib2.urlopen(url)
    html = urlHandler.read()
    print "'%s\' fetched in %ss" % (url, (time.time() - start))

threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print "Elapsed Time: %s" % (time.time() - start)

Единственные новые трюки здесь:

  • Следите за создаваемыми вами темами.
  • Не беспокойтесь о счетчике потоков, если вы просто хотите знать, когда все будет готово; join уже говорит вам об этом.
  • Если вам не нужен какой-либо государственный или внешний API, вам не нужен подкласс Thread, просто функция target.

Ответ 2

multiprocessing имеет пул потоков, который не запускает другие процессы:

#!/usr/bin/env python
from multiprocessing.pool import ThreadPool
from time import time as timer
from urllib2 import urlopen

urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]

def fetch_url(url):
    try:
        response = urlopen(url)
        return url, response.read(), None
    except Exception as e:
        return url, None, e

start = timer()
results = ThreadPool(20).imap_unordered(fetch_url, urls)
for url, html, error in results:
    if error is None:
        print("%r fetched in %ss" % (url, timer() - start))
    else:
        print("error fetching %r: %s" % (url, error))
print("Elapsed Time: %s" % (timer() - start,))

Преимущества по сравнению с решением на основе Thread:

  • ThreadPool позволяет ограничить максимальное количество одновременных подключений (20 в примере кода)
  • вывод не искажен, потому что весь вывод находится в основном потоке
  • регистрируются ошибки
  • код работает как на Python 2, так и на 3 без изменений (предполагая from urllib.request import urlopen на Python 3).

Ответ 3

Основной пример в concurrent.futures делает все, что вам нужно, намного проще. Кроме того, он может обрабатывать огромное количество URL-адресов, делая только 5 за раз, и он обрабатывает ошибки гораздо приятнее.

Конечно, этот модуль построен только с Python 3.2 или новее... но если вы используете 2.5-3.1, вы можете просто установить backport, futures, выключен PyPI. Все, что вам нужно изменить из кода примера, это поиск и замена concurrent.futures с помощью futures, а для 2.x, urllib.request с urllib2.

Здесь образец backported до 2.x, измененный для использования вашего списка URL-адресов и добавления времени:

import concurrent.futures
import urllib2
import time

start = time.time()
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]

# Retrieve a single page and report the url and contents
def load_url(url, timeout):
    conn = urllib2.urlopen(url, timeout=timeout)
    return conn.readall()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in urls}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print '%r generated an exception: %s' % (url, exc)
        else:
            print '"%s" fetched in %ss' % (url,(time.time() - start))
print "Elapsed Time: %ss" % (time.time() - start)

Но вы можете сделать это еще проще. На самом деле все, что вам нужно, это:

def load_url(url):
    conn = urllib2.urlopen(url, timeout)
    data = conn.readall()
    print '"%s" fetched in %ss' % (url,(time.time() - start))
    return data

with futures.ThreadPoolExecutor(max_workers=5) as executor:
    pages = executor.map(load_url, urls)

print "Elapsed Time: %ss" % (time.time() - start)

Ответ 4

Теперь я публикую другое решение, , которое имеет рабочие потоки не-deamon и соединяет их с основным потоком (что означает блокирование основного потока до тех пор, пока все рабочие потоки не закончатся) вместо уведомления конец выполнения каждого рабочего потока с обратным вызовом глобальной функции (как и в предыдущем ответе), так как в некоторых комментариях было отмечено, что такой способ не является потокобезопасным.

import threading
import urllib2
import time

start = time.time()
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]

class FetchUrl(threading.Thread):
    def __init__(self, url):
        threading.Thread.__init__(self)
        self.url = url

    def run(self):
        urlHandler = urllib2.urlopen(self.url)
        html = urlHandler.read()
        print "'%s\' fetched in %ss" % (self.url,(time.time() - start))

for url in urls:
    FetchUrl(url).start()

#Join all existing threads to main thread.
for thread in threading.enumerate():
    if thread is not threading.currentThread():
        thread.join()

print "Elapsed Time: %s" % (time.time() - start)

Ответ 5

Этот script выбирает контент из набора URL-адресов, определенных в массиве. Он генерирует поток для каждого URL-адреса для извлечения, поэтому он предназначен для ограниченного набора URL-адресов.

Вместо использования объекта очереди каждый поток уведомляет свой конец обратным вызовом глобальной функции, которая подсчитывает количество запущенных потоков.

import threading
import urllib2
import time

start = time.time()
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]
left_to_fetch = len(urls)

class FetchUrl(threading.Thread):
    def __init__(self, url):
        threading.Thread.__init__(self)
        self.setDaemon = True
        self.url = url

    def run(self):
        urlHandler = urllib2.urlopen(self.url)
        html = urlHandler.read()
        finished_fetch_url(self.url)


def finished_fetch_url(url):
    "callback function called when a FetchUrl thread ends"
    print "\"%s\" fetched in %ss" % (url,(time.time() - start))
    global left_to_fetch
    left_to_fetch-=1
    if left_to_fetch==0:
        "all urls have been fetched"
        print "Elapsed Time: %ss" % (time.time() - start)


for url in urls:
    "spawning a FetchUrl thread for each url to fetch"
    FetchUrl(url).start()