Подтвердить что ты не робот

Запросы Python - потоки/процессы по сравнению с IO

Я подключаюсь к локальному серверу (OSRM) через HTTP, чтобы отправлять маршруты и возвращать диски. Я замечаю, что ввод-вывод медленнее потоковой передачи, поскольку кажется, что период ожидания для вычисления меньше времени, необходимого для отправки запроса и обработки вывода JSON (я думаю, что I/O лучше, когда серверу требуется некоторое время для обработайте свой запрос → вы не хотите, чтобы он блокировался, потому что вам нужно подождать, это не мое дело). Threading страдает от Global Interpreter Lock и поэтому появляется (и доказательство ниже), что мой самый быстрый вариант - использовать многопроцессорность.

Проблема с многопроцессорностью заключается в том, что она настолько быстра, что она исчерпывает мои сокеты, и я получаю сообщение об ошибке (каждый раз каждый раз запрашивает новое соединение). Я могу (в последовательном порядке) использовать объект request.Sessions(), чтобы поддерживать соединение, однако я не могу заставить это работать параллельно (каждый процесс имеет собственный сеанс).

Ближайшим кодом, который я должен работать в данный момент, является этот многопроцессорный код:

conn_pool = HTTPConnectionPool(host='127.0.0.1', port=5005, maxsize=cpu_count())

def ReqOsrm(url_input):
    ul, qid = url_input      
    try:
        response = conn_pool.request('GET', ul)
        json_geocode = json.loads(response.data.decode('utf-8'))
        status = int(json_geocode['status'])
        if status == 200:
            tot_time_s = json_geocode['route_summary']['total_time']
            tot_dist_m = json_geocode['route_summary']['total_distance']
            used_from, used_to = json_geocode['via_points']
            out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
            return out
        else:
            print("Done but no route: %d %s" % (qid, req_url))
            return [qid, 999, 0, 0, 0, 0, 0, 0]
    except Exception as err:
        print("%s: %d %s" % (err, qid, req_url))
        return [qid, 999, 0, 0, 0, 0, 0, 0]

# run:
pool = Pool(cpu_count())
calc_routes = pool.map(ReqOsrm, url_routes)
pool.close()
pool.join()

Однако я не могу заставить HTTPConnectionPool работать правильно, и он создает новые сокеты каждый раз (я думаю), а затем дает мне ошибку:

HTTPConnectionPool (host = '127.0.0.1', port = 5005): превышено максимальное количество попыток с URL: /viaroute?loc=44.779708,4.2609877&loc=44.648439,4.2811959&alt=false&geometry=false (Вызвано NewConnectionError (': Не удалось установить новое соединение: [WinError 10048] Только одно использование каждого адреса сокета (протокол/сетевой адрес/порт) обычно разрешается ',))


Моя цель - получить расчет расстояний от сервер OSRM-маршрутизации Я запускаю локально (как можно быстрее).

У меня вопрос в двух частях - в основном я пытаюсь преобразовать некоторый код с использованием multiprocessing.Pool() для лучшего кода (правильные асинхронные функции - так, что выполнение никогда не прерывается и работает как можно быстрее).

Проблема, с которой я сталкиваюсь, заключается в том, что все, что я пытаюсь, выглядит медленнее, чем многопроцессорность (я приводил несколько примеров ниже того, что я пробовал).

Некоторые потенциальные методы: gevents, grequests, торнадо, запросы-фьючерсы, asyncio и т.д.

A - Multiprocessing.Pool()

Сначала я начал с чего-то вроде этого:

def ReqOsrm(url_input):
    req_url, query_id = url_input
    try_c = 0
    #print(req_url)
    while try_c < 5:
        try:
            response = requests.get(req_url)
            json_geocode = response.json()
            status = int(json_geocode['status'])
            # Found route between points
            if status == 200:
            ....

pool = Pool(cpu_count()-1) 
calc_routes = pool.map(ReqOsrm, url_routes)

Где я подключался к локальному серверу (localhost, port: 5005), который был запущен на 8 потоках, а поддерживает параллельное выполнение.

После небольшого поиска я понял, что ошибка, которую я получал, заключалась в том, что запросы открывали новое соединение/сокет для каждого запроса. Так что через некоторое время это было слишком быстро и утомительно. Кажется, что это можно использовать для запросов request.Session() - , однако мне не удалось получить эту работу с многопроцессорной обработкой (где каждый процесс имеет собственный сеанс).

Вопрос 1.

На некоторых компьютерах это выполняется нормально, например:

введите описание изображения здесь

Для сравнения с более поздними версиями: 45% использования сервера и 1700 запросов в секунду

Однако на некоторых это не так, и я не совсем понимаю, почему:

HTTPConnectionPool (host = '127.0.0.1', port = 5000): превышено максимальное количество попыток с URL: /viaroute?loc=49.34343,3.30199&loc=49.56655,3.25837&alt=false&geometry=false (Вызванный NewConnectionError (': Не удалось установить новое соединение: [WinError 10048] Только одно использование каждого адреса сокета (протокол/сетевой адрес/порт) обычно разрешается ',))

Я предполагаю, что, поскольку запросы блокируют сокет, когда он используется, иногда сервер слишком медленно реагирует на старый запрос и генерируется новый. Сервер поддерживает очередь, однако запросы не так, а не для добавления в очередь Я получаю ошибку?

Вопрос 2.

Я нашел:

Блокировка или неблокирование?

При использовании адаптера транспорта по умолчанию, запросы не предоставляют любой неблокирующий IO. Свойство Response.content заблокирует пока не будет загружен весь ответ. Если вам требуется больше гранулярность, потоковые функции библиотеки (см. Запросы) позволяют получить меньшее количество ответов в время. Однако эти вызовы будут по-прежнему блокироваться.

Если вас беспокоит использование блокировки IO, есть много проекты, которые объединяют запросы с одним из Pythons асинхронности.

Два превосходных примера - это графики и запросы-фьючерсы.

B - запросы-фьючерсы

Чтобы решить эту проблему, мне нужно было переписать мой код для использования асинхронных запросов, поэтому я попробовал следующее:

from requests_futures.sessions import FuturesSession
from concurrent.futures import ThreadPoolExecutor, as_completed

(Кстати, я запускаю свой сервер с возможностью использовать все потоки)

И основной код:

calc_routes = []
futures = {}
with FuturesSession(executor=ThreadPoolExecutor(max_workers=1000)) as session:
    # Submit requests and process in background
    for i in range(len(url_routes)):
        url_in, qid = url_routes[i]  # url |query-id
        future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp))
        futures[future] = qid
    # Process the futures as they become complete
    for future in as_completed(futures):
        r = future.result()
        try:
            row = [futures[future]] + r.data
        except Exception as err:
            print('No route')
            row = [futures[future], 999, 0, 0, 0, 0, 0, 0]
        calc_routes.append(row)

Где моя функция (ReqOsrm) теперь переписана как:

def ReqOsrm(sess, resp):
    json_geocode = resp.json()
    status = int(json_geocode['status'])
    # Found route between points
    if status == 200:
        tot_time_s = json_geocode['route_summary']['total_time']
        tot_dist_m = json_geocode['route_summary']['total_distance']
        used_from = json_geocode['via_points'][0]
        used_to = json_geocode['via_points'][1]
        out = [status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
    # Cannot find route between points (code errors as 999)
    else:
        out = [999, 0, 0, 0, 0, 0, 0]
    resp.data = out

Однако этот код медленнее, чем многопроцессорный! Прежде чем я получил около 1700 запросов в секунду, теперь я получаю 600 секунд. Я думаю, это потому, что у меня нет полного использования процессора, но я не уверен, как его увеличить?

введите описание изображения здесь

C - Thread

Я попробовал другой метод (создание потоков) - однако снова не был уверен, как это сделать, чтобы максимизировать использование ЦП (в идеале я хочу, чтобы мой сервер использовал 50 %, нет?):

def doWork():
    while True:
        url,qid = q.get()
        status, resp = getReq(url)
        processReq(status, resp, qid)
        q.task_done()

def getReq(url):
    try:
        resp = requests.get(url)
        return resp.status_code, resp
    except:
        return 999, None

def processReq(status, resp, qid):
    try:
        json_geocode = resp.json()
        # Found route between points
        if status == 200:
            tot_time_s = json_geocode['route_summary']['total_time']
            tot_dist_m = json_geocode['route_summary']['total_distance']
            used_from = json_geocode['via_points'][0]
            used_to = json_geocode['via_points'][1]
            out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
        else:
            print("Done but no route")
            out = [qid, 999, 0, 0, 0, 0, 0, 0]
    except Exception as err:
        print("Error: %s" % err)
        out = [qid, 999, 0, 0, 0, 0, 0, 0]
    qres.put(out)
    return

#Run:
concurrent = 1000
qres = Queue()
q = Queue(concurrent)

for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
try:
    for url in url_routes:
        q.put(url)
        q.join()
    except Exception:
        pass

# Get results
calc_routes = [qres.get() for _ in range(len(url_routes))]

Этот метод быстрее, чем request_futures, я думаю, но я не знаю, сколько потоков задано для максимизации этого -

введите описание изображения здесь

D - торнадо (не работает)

Теперь я пытаюсь использовать торнадо, но не могу заставить его работать, он ломается с существующим кодом -1073741819, если я использую curl - если я использую simple_httpclient, он работает, но затем я получаю ошибки таймаута:

ОШИБКА: tornado.application: Несколько исключений из списка выходных данных Traceback (последний последний вызов): Файл "C:\Anaconda3\lib\site-packages\tornado\gen.py", строка 789, в обратном вызове     result_list.append(f.result()) Файл "C:\Anaconda3\lib\site-packages\tornado\concurrent.py", строка 232, в результат     raise_exc_info (self._exc_info) Файл "", строка 3, в raise_exc_info tornado.httpclient.HTTPError: HTTP 599: Тайм-аут

def handle_req(r):
    try:
        json_geocode = json_decode(r)
        status = int(json_geocode['status'])
        tot_time_s = json_geocode['route_summary']['total_time']
        tot_dist_m = json_geocode['route_summary']['total_distance']
        used_from = json_geocode['via_points'][0]
        used_to = json_geocode['via_points'][1]
        out = [status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
        print(out)
    except Exception as err:
        print(err)
        out = [999, 0, 0, 0, 0, 0, 0]
    return out

# Configure
# For some reason curl_httpclient crashes my computer
AsyncHTTPClient.configure("tornado.simple_httpclient.SimpleAsyncHTTPClient", max_clients=10)

@gen.coroutine
def run_experiment(urls):
    http_client = AsyncHTTPClient()
    responses = yield [http_client.fetch(url) for url, qid in urls]
    responses_out = [handle_req(r.body) for r in responses]
    raise gen.Return(value=responses_out)

# Initialise
_ioloop = ioloop.IOLoop.instance()
run_func = partial(run_experiment, url_routes)
calc_routes = _ioloop.run_sync(run_func)

E - asyncio/aiohttp

Решили попробовать другой подход (хотя было бы здорово работать с торнадо), используя asyncio и aiohttp.

import asyncio
import aiohttp

def handle_req(data, qid):
    json_geocode = json.loads(data.decode('utf-8'))
    status = int(json_geocode['status'])
    if status == 200:
        tot_time_s = json_geocode['route_summary']['total_time']
        tot_dist_m = json_geocode['route_summary']['total_distance']
        used_from = json_geocode['via_points'][0]
        used_to = json_geocode['via_points'][1]
        out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
    else:
        print("Done, but not route for {0} - status: {1}".format(qid, status))
        out = [qid, 999, 0, 0, 0, 0, 0, 0]
    return out

def chunked_http_client(num_chunks):
    # Use semaphore to limit number of requests
    semaphore = asyncio.Semaphore(num_chunks)
    @asyncio.coroutine
    # Return co-routine that will download files asynchronously and respect
    # locking fo semaphore
    def http_get(url, qid):
        nonlocal semaphore
        with (yield from semaphore):
            response = yield from aiohttp.request('GET', url)
            body = yield from response.content.read()
            yield from response.wait_for_close()
        return body, qid
    return http_get

def run_experiment(urls):
    http_client = chunked_http_client(500)
    # http_client returns futures
    # save all the futures to a list
    tasks = [http_client(url, qid) for url, qid in urls]
    response = []
    # wait for futures to be ready then iterate over them
    for future in asyncio.as_completed(tasks):
        data, qid = yield from future
        try:
            out = handle_req(data, qid)
        except Exception as err:
            print("Error for {0} - {1}".format(qid,err))
            out = [qid, 999, 0, 0, 0, 0, 0, 0]
        response.append(out)
    return response

# Run:
loop = asyncio.get_event_loop()
calc_routes = loop.run_until_complete(run_experiment(url_routes))

Это работает нормально, но все же медленнее, чем многопроцессорное обслуживание!

введите описание изображения здесь

4b9b3361

Ответ 1

Посмотрите на свой многопроцессорный код в верхней части вопроса. Кажется, что HttpConnectionPool() вызывается каждый раз, когда вызывается ReqOsrm. Таким образом, для каждого URL-адреса создается новый пул. Вместо этого используйте параметры initializer и args для создания единого пула для каждого процесса.

conn_pool = None

def makePool(host, port):
    global conn_pool
    pool = conn_pool = HTTPConnectionPool(host=host, port=port, maxsize=1)

def ReqOsrm(url_input):
    ul, qid = url_input

    try:
        response = conn_pool.request('GET', ul)
        json_geocode = json.loads(response.data.decode('utf-8'))
        status = int(json_geocode['status'])
        if status == 200:
            tot_time_s = json_geocode['route_summary']['total_time']
            tot_dist_m = json_geocode['route_summary']['total_distance']
            used_from, used_to = json_geocode['via_points']
            out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
            return out

        else:
            print("Done but no route: %d %s" % (qid, req_url))
            return [qid, 999, 0, 0, 0, 0, 0, 0]

    except Exception as err:
        print("%s: %d %s" % (err, qid, req_url))
        return [qid, 999, 0, 0, 0, 0, 0, 0]

if __name__ == "__main__":
    # run:
    pool = Pool(initializer=makePool, initargs=('127.0.0.1', 5005))
    calc_routes = pool.map(ReqOsrm, url_routes)
    pool.close()
    pool.join()

В версии запроса-фьючерса имеется ошибка с отступом. Петля for future in as_completed(futures): имеет отступ под внешним контуром for i in range(len(url_routes)):. Таким образом, запрос выполняется во внешнем цикле, а затем внутренний цикл ожидает, что это будущее вернется до следующей итерации внешнего цикла. Это заставляет запросы запускаться последовательно, а не параллельно.

Я думаю, что код должен быть следующим:

calc_routes = []
futures = {}
with FuturesSession(executor=ThreadPoolExecutor(max_workers=1000)) as session:
    # Submit all the requests and process in background
    for i in range(len(url_routes)):
        url_in, qid = url_routes[i]  # url |query-id
        future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp))
        futures[future] = qid

    # this was indented under the code in section B of the question
    # process the futures as they become copmlete
    for future in as_completed(futures):
        r = future.result()
        try:
            row = [futures[future]] + r.data

        except Exception as err:
            print('No route')
            row = [futures[future], 999, 0, 0, 0, 0, 0, 0]
        print(row)
        calc_routes.append(row)

Ответ 2

Спасибо всем за помощь. Я хотел высказать свои выводы:

Поскольку мои HTTP-запросы относятся к локальному серверу, который мгновенно обрабатывает запрос, для меня не имеет смысла использовать асинхронные подходы (по сравнению с большинством случаев, когда запросы отправляются через Интернет). Для меня дорогостоящий фактор - это отправка запроса и обработка обратной связи, что означает, что я получаю намного лучшие скорости, используя несколько процессов (потоки страдают от GIL). Я также должен использовать сеансы для увеличения скорости (нет необходимости закрывать и повторно открывать соединение с сервером SAME) и помогать предотвратить истощение порта.

Вот все методы, которые пытались (работают) с примером RPS:

Последовательный

S1. Последовательный запрос GET (без сеанса) → 215 RPS

def ReqOsrm(data):
    url, qid = data
    try:
        response = requests.get(url)
        json_geocode = json.loads(response.content.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        return [qid, 999, 0, 0]
# Run:      
calc_routes = [ReqOsrm(x) for x in url_routes]

S2. Последовательный запрос GET (запросы .Session()) → 335 RPS

session = requests.Session()
def ReqOsrm(data):
    url, qid = data
    try:
        response = session.get(url)
        json_geocode = json.loads(response.content.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        return [qid, 999, 0, 0]
# Run:      
calc_routes = [ReqOsrm(x) for x in url_routes]

S3. Последовательный запрос GET (urllib3.HTTPConnectionPool) → 545 RPS

conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=1)
def ReqOsrm(data):
    url, qid = data
    try:
        response = conn_pool.request('GET', url)
        json_geocode = json.loads(response.data.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        return [qid, 999, 0, 0]
# Run:      
calc_routes = [ReqOsrm(x) for x in url_routes]

Async IO

A4. AsyncIO с aiohttp → 450 RPS

import asyncio
import aiohttp
concurrent = 100
def handle_req(data, qid):
    json_geocode = json.loads(data.decode('utf-8'))
    tot_time_s = json_geocode['paths'][0]['time']
    tot_dist_m = json_geocode['paths'][0]['distance']
    return [qid, 200, tot_time_s, tot_dist_m]
def chunked_http_client(num_chunks):
    # Use semaphore to limit number of requests
    semaphore = asyncio.Semaphore(num_chunks)
    @asyncio.coroutine
    # Return co-routine that will download files asynchronously and respect
    # locking fo semaphore
    def http_get(url, qid):
        nonlocal semaphore
        with (yield from semaphore):
            with aiohttp.ClientSession() as session:
                response = yield from session.get(url)
                body = yield from response.content.read()
                yield from response.wait_for_close()
        return body, qid
    return http_get
def run_experiment(urls):
    http_client = chunked_http_client(num_chunks=concurrent)
    # http_client returns futures, save all the futures to a list
    tasks = [http_client(url, qid) for url, qid in urls]
    response = []
    # wait for futures to be ready then iterate over them
    for future in asyncio.as_completed(tasks):
        data, qid = yield from future
        try:
            out = handle_req(data, qid)
        except Exception as err:
            print("Error for {0} - {1}".format(qid,err))
            out = [qid, 999, 0, 0]
        response.append(out)
    return response
# Run:
loop = asyncio.get_event_loop()
calc_routes = loop.run_until_complete(run_experiment(url_routes))

А5. Threading без сеансов → 330 RPS

from threading import Thread
from queue import Queue
concurrent = 100
def doWork():
    while True:
        url,qid = q.get()
        status, resp = getReq(url)
        processReq(status, resp, qid)
        q.task_done()
def getReq(url):
    try:
        resp = requests.get(url)
        return resp.status_code, resp
    except:
        return 999, None
def processReq(status, resp, qid):
    try:
        json_geocode = json.loads(resp.content.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        out = [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err, qid, url)
        out = [qid, 999, 0, 0]
    qres.put(out)
    return
#Run:
qres = Queue()
q = Queue(concurrent)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
for url in url_routes:
    q.put(url)
q.join()
# Get results
calc_routes = [qres.get() for _ in range(len(url_routes))]

A6. Threading с HTTPConnectionPool → 1550 RPS

from threading import Thread
from queue import Queue
from urllib3 import HTTPConnectionPool
concurrent = 100
conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=concurrent)
def doWork():
    while True:
        url,qid = q.get()
        status, resp = getReq(url)
        processReq(status, resp, qid)
        q.task_done()
def getReq(url):
    try:
        resp = conn_pool.request('GET', url)
        return resp.status, resp
    except:
        return 999, None
def processReq(status, resp, qid):
    try:
        json_geocode = json.loads(resp.data.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        out = [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err, qid, url)
        out = [qid, 999, 0, 0]
    qres.put(out)
    return
#Run:
qres = Queue()
q = Queue(concurrent)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
for url in url_routes:
    q.put(url)
q.join()
# Get results
calc_routes = [qres.get() for _ in range(len(url_routes))]

A7. запросы-фьючерсы → 520 RPS

from requests_futures.sessions import FuturesSession
from concurrent.futures import ThreadPoolExecutor, as_completed
concurrent = 100
def ReqOsrm(sess, resp):
    try:
        json_geocode = resp.json()
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        out = [200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err)
        out = [999, 0, 0]
    resp.data = out
#Run:
calc_routes = []
futures = {}
with FuturesSession(executor=ThreadPoolExecutor(max_workers=concurrent)) as session:
    # Submit requests and process in background
    for i in range(len(url_routes)):
        url_in, qid = url_routes[i]  # url |query-id
        future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp))
        futures[future] = qid
    # Process the futures as they become complete
    for future in as_completed(futures):
        r = future.result()
        try:
            row = [futures[future]] + r.data
        except Exception as err:
            print('No route')
            row = [futures[future], 999, 0, 0]
        calc_routes.append(row)

Несколько процессов

P8. multiprocessing.worker + queue + requests.session() → 1058 RPS

from multiprocessing import *
class Worker(Process):
    def __init__(self, qin, qout, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        self.qin = qin
        self.qout = qout
    def run(self):
        s = requests.session()
        while not self.qin.empty():
            url, qid = self.qin.get()
            data = s.get(url)
            self.qout.put(ReqOsrm(data, qid))
            self.qin.task_done()
def ReqOsrm(resp, qid):
    try:
        json_geocode = json.loads(resp.content.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err, qid)
        return [qid, 999, 0, 0]
# Run:
qout = Queue()
qin = JoinableQueue()
[qin.put(url_q) for url_q in url_routes]
[Worker(qin, qout).start() for _ in range(cpu_count())]
qin.join()
calc_routes = []
while not qout.empty():
    calc_routes.append(qout.get())

P9. multiprocessing.worker + queue + HTTPConnectionPool() → 1230 RPS

P10. многопроцессорность v2 (не совсем уверен, как это отличается) → 1350 RPS

conn_pool = None
def makePool(host, port):
    global conn_pool
    pool = conn_pool = HTTPConnectionPool(host=host, port=port, maxsize=1)
def ReqOsrm(data):
    url, qid = data
    try:
        response = conn_pool.request('GET', url)
        json_geocode = json.loads(response.data.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err, qid, url)
        return [qid, 999, 0, 0]
# Run:
pool = Pool(initializer=makePool, initargs=(ghost, gport))
calc_routes = pool.map(ReqOsrm, url_routes)

Итак, в заключение кажется, что для меня лучшие методы - это # ​​10 (и удивительно # 6)

Ответ 3

Вопрос 1

Вы получаете ошибку, потому что этот подход:

def ReqOsrm(url_input):
    req_url, query_id = url_input
    try_c = 0
    #print(req_url)
    while try_c < 5:
        try:
            response = requests.get(req_url)
            json_geocode = response.json()
            status = int(json_geocode['status'])
            # Found route between points
            if status == 200:
            ....

pool = Pool(cpu_count()-1) 
calc_routes = pool.map(ReqOsrm, url_routes)

создает новое TCP-соединение для каждого запрошенного URL-адреса, и в какой-то момент он терпит неудачу только из-за отсутствия в системе локальных портов. Чтобы подтвердить, что вы можете запустить netstat во время выполнения вашего кода:

netstat -a -n | find /c "localhost:5005"

Это даст вам несколько подключений к серверу.

Кроме того, достижение 1700 RPS выглядит совершенно нереалистичным для этого подхода, поскольку requests.get - довольно дорогостоящая операция, и маловероятно, что вы можете получить даже 50 RPS таким образом. Таким образом, вам, вероятно, нужно дважды проверить свои расчеты RPS.

Чтобы избежать ошибки, вам нужно использовать сеансы вместо создания соединений с нуля:

import multiprocessing
import requests
import time


class Worker(multiprocessing.Process):
    def __init__(self, qin, qout, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        self.qin = qin
        self.qout = qout

    def run(self):
        s = requests.session()
        while not self.qin.empty():
            result = s.get(self.qin.get())
            self.qout.put(result)
            self.qin.task_done()

if __name__ == '__main__':
    start = time.time()

    qin = multiprocessing.JoinableQueue()
    [qin.put('http://localhost:8080/') for _ in range(10000)]

    qout = multiprocessing.Queue()

    [Worker(qin, qout).start() for _ in range(multiprocessing.cpu_count())]

    qin.join()

    result = []
    while not qout.empty():
        result.append(qout.get())

    print time.time() - start
    print result

Вопрос 2

Вы не получите более высокий RPS с потоками или асинхронными подходами, если I/O не займет больше времени, чем вычисления (например, высокая сетевая латентность, большие ответы и т.д.), потому что потоки затронуты GIL с момента запуска в том же процессе Python и асинхронные библиотеки могут блокироваться длительными вычислениями.

Хотя потоки или async libs могут повысить производительность, запуск одного и того же потока или асинхронного кода в нескольких процессах даст вам еще большую производительность.

Ответ 4

Вот образец, который я использовал с gevent, который основан на сопрограмме coroutine и не может страдать от GIL. Это может быть быстрее, чем использование потоков, и, возможно, самый быстрый при использовании в сочетании с многопроцессорной обработкой (в настоящее время он использует только 1 ядро):

from gevent import monkey
monkey.patch_all()

import logging
import random
import time
from threading import Thread

from gevent.queue import JoinableQueue
from logger import initialize_logger

initialize_logger()
log = logging.getLogger(__name__)


class Worker(Thread):

    def __init__(self, worker_idx, queue):
        # initialize the base class
        super(Worker, self).__init__()
        self.worker_idx = worker_idx
        self.queue = queue

    def log(self, msg):
        log.info("WORKER %s - %s" % (self.worker_idx, msg))

    def do_work(self, line):
        #self.log(line)
        time.sleep(random.random() / 10)

    def run(self):
        while True:
            line = self.queue.get()
            self.do_work(line)
            self.queue.task_done()


def main(number_of_workers=20):
    start_time = time.time()

    queue = JoinableQueue()
    for idx in range(number_of_workers):
        worker = Worker(idx, queue)
        # "daemonize" a thread to ensure that the threads will
        # close when the main program finishes
        worker.daemon = True
        worker.start()

    for idx in xrange(100):
        queue.put("%s" % idx)

    queue.join()
    time_taken = time.time() - start_time
    log.info("Parallel work took %s seconds." % time_taken)

    start_time = time.time()
    for idx in xrange(100):
        #log.info(idx)
        time.sleep(random.random() / 10)
    time_taken = time.time() - start_time
    log.info("Sync work took %s seconds." % time_taken)


if __name__ == "__main__":
    main()