Я подключаюсь к локальному серверу (OSRM) через HTTP, чтобы отправлять маршруты и возвращать диски. Я замечаю, что ввод-вывод медленнее потоковой передачи, поскольку кажется, что период ожидания для вычисления меньше времени, необходимого для отправки запроса и обработки вывода JSON (я думаю, что I/O лучше, когда серверу требуется некоторое время для обработайте свой запрос → вы не хотите, чтобы он блокировался, потому что вам нужно подождать, это не мое дело). Threading страдает от Global Interpreter Lock и поэтому появляется (и доказательство ниже), что мой самый быстрый вариант - использовать многопроцессорность.
Проблема с многопроцессорностью заключается в том, что она настолько быстра, что она исчерпывает мои сокеты, и я получаю сообщение об ошибке (каждый раз каждый раз запрашивает новое соединение). Я могу (в последовательном порядке) использовать объект request.Sessions(), чтобы поддерживать соединение, однако я не могу заставить это работать параллельно (каждый процесс имеет собственный сеанс).
Ближайшим кодом, который я должен работать в данный момент, является этот многопроцессорный код:
conn_pool = HTTPConnectionPool(host='127.0.0.1', port=5005, maxsize=cpu_count())
def ReqOsrm(url_input):
ul, qid = url_input
try:
response = conn_pool.request('GET', ul)
json_geocode = json.loads(response.data.decode('utf-8'))
status = int(json_geocode['status'])
if status == 200:
tot_time_s = json_geocode['route_summary']['total_time']
tot_dist_m = json_geocode['route_summary']['total_distance']
used_from, used_to = json_geocode['via_points']
out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
return out
else:
print("Done but no route: %d %s" % (qid, req_url))
return [qid, 999, 0, 0, 0, 0, 0, 0]
except Exception as err:
print("%s: %d %s" % (err, qid, req_url))
return [qid, 999, 0, 0, 0, 0, 0, 0]
# run:
pool = Pool(cpu_count())
calc_routes = pool.map(ReqOsrm, url_routes)
pool.close()
pool.join()
Однако я не могу заставить HTTPConnectionPool работать правильно, и он создает новые сокеты каждый раз (я думаю), а затем дает мне ошибку:
HTTPConnectionPool (host = '127.0.0.1', port = 5005): превышено максимальное количество попыток с URL: /viaroute?loc=44.779708,4.2609877&loc=44.648439,4.2811959&alt=false&geometry=false (Вызвано NewConnectionError (': Не удалось установить новое соединение: [WinError 10048] Только одно использование каждого адреса сокета (протокол/сетевой адрес/порт) обычно разрешается ',))
Моя цель - получить расчет расстояний от сервер OSRM-маршрутизации Я запускаю локально (как можно быстрее).
У меня вопрос в двух частях - в основном я пытаюсь преобразовать некоторый код с использованием multiprocessing.Pool() для лучшего кода (правильные асинхронные функции - так, что выполнение никогда не прерывается и работает как можно быстрее).
Проблема, с которой я сталкиваюсь, заключается в том, что все, что я пытаюсь, выглядит медленнее, чем многопроцессорность (я приводил несколько примеров ниже того, что я пробовал).
Некоторые потенциальные методы: gevents, grequests, торнадо, запросы-фьючерсы, asyncio и т.д.
A - Multiprocessing.Pool()
Сначала я начал с чего-то вроде этого:
def ReqOsrm(url_input):
req_url, query_id = url_input
try_c = 0
#print(req_url)
while try_c < 5:
try:
response = requests.get(req_url)
json_geocode = response.json()
status = int(json_geocode['status'])
# Found route between points
if status == 200:
....
pool = Pool(cpu_count()-1)
calc_routes = pool.map(ReqOsrm, url_routes)
Где я подключался к локальному серверу (localhost, port: 5005), который был запущен на 8 потоках, а поддерживает параллельное выполнение.
После небольшого поиска я понял, что ошибка, которую я получал, заключалась в том, что запросы открывали новое соединение/сокет для каждого запроса. Так что через некоторое время это было слишком быстро и утомительно. Кажется, что это можно использовать для запросов request.Session() - , однако мне не удалось получить эту работу с многопроцессорной обработкой (где каждый процесс имеет собственный сеанс).
Вопрос 1.
На некоторых компьютерах это выполняется нормально, например:
Для сравнения с более поздними версиями: 45% использования сервера и 1700 запросов в секунду
Однако на некоторых это не так, и я не совсем понимаю, почему:
HTTPConnectionPool (host = '127.0.0.1', port = 5000): превышено максимальное количество попыток с URL: /viaroute?loc=49.34343,3.30199&loc=49.56655,3.25837&alt=false&geometry=false (Вызванный NewConnectionError (': Не удалось установить новое соединение: [WinError 10048] Только одно использование каждого адреса сокета (протокол/сетевой адрес/порт) обычно разрешается ',))
Я предполагаю, что, поскольку запросы блокируют сокет, когда он используется, иногда сервер слишком медленно реагирует на старый запрос и генерируется новый. Сервер поддерживает очередь, однако запросы не так, а не для добавления в очередь Я получаю ошибку?
Вопрос 2.
Я нашел:
Блокировка или неблокирование?
При использовании адаптера транспорта по умолчанию, запросы не предоставляют любой неблокирующий IO. Свойство Response.content заблокирует пока не будет загружен весь ответ. Если вам требуется больше гранулярность, потоковые функции библиотеки (см. Запросы) позволяют получить меньшее количество ответов в время. Однако эти вызовы будут по-прежнему блокироваться.
Если вас беспокоит использование блокировки IO, есть много проекты, которые объединяют запросы с одним из Pythons асинхронности.
Два превосходных примера - это графики и запросы-фьючерсы.
B - запросы-фьючерсы
Чтобы решить эту проблему, мне нужно было переписать мой код для использования асинхронных запросов, поэтому я попробовал следующее:
from requests_futures.sessions import FuturesSession
from concurrent.futures import ThreadPoolExecutor, as_completed
(Кстати, я запускаю свой сервер с возможностью использовать все потоки)
И основной код:
calc_routes = []
futures = {}
with FuturesSession(executor=ThreadPoolExecutor(max_workers=1000)) as session:
# Submit requests and process in background
for i in range(len(url_routes)):
url_in, qid = url_routes[i] # url |query-id
future = session.get(url_in, background_callback=lambda sess, resp: ReqOsrm(sess, resp))
futures[future] = qid
# Process the futures as they become complete
for future in as_completed(futures):
r = future.result()
try:
row = [futures[future]] + r.data
except Exception as err:
print('No route')
row = [futures[future], 999, 0, 0, 0, 0, 0, 0]
calc_routes.append(row)
Где моя функция (ReqOsrm) теперь переписана как:
def ReqOsrm(sess, resp):
json_geocode = resp.json()
status = int(json_geocode['status'])
# Found route between points
if status == 200:
tot_time_s = json_geocode['route_summary']['total_time']
tot_dist_m = json_geocode['route_summary']['total_distance']
used_from = json_geocode['via_points'][0]
used_to = json_geocode['via_points'][1]
out = [status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
# Cannot find route between points (code errors as 999)
else:
out = [999, 0, 0, 0, 0, 0, 0]
resp.data = out
Однако этот код медленнее, чем многопроцессорный! Прежде чем я получил около 1700 запросов в секунду, теперь я получаю 600 секунд. Я думаю, это потому, что у меня нет полного использования процессора, но я не уверен, как его увеличить?
C - Thread
Я попробовал другой метод (создание потоков) - однако снова не был уверен, как это сделать, чтобы максимизировать использование ЦП (в идеале я хочу, чтобы мой сервер использовал 50 %, нет?):
def doWork():
while True:
url,qid = q.get()
status, resp = getReq(url)
processReq(status, resp, qid)
q.task_done()
def getReq(url):
try:
resp = requests.get(url)
return resp.status_code, resp
except:
return 999, None
def processReq(status, resp, qid):
try:
json_geocode = resp.json()
# Found route between points
if status == 200:
tot_time_s = json_geocode['route_summary']['total_time']
tot_dist_m = json_geocode['route_summary']['total_distance']
used_from = json_geocode['via_points'][0]
used_to = json_geocode['via_points'][1]
out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
else:
print("Done but no route")
out = [qid, 999, 0, 0, 0, 0, 0, 0]
except Exception as err:
print("Error: %s" % err)
out = [qid, 999, 0, 0, 0, 0, 0, 0]
qres.put(out)
return
#Run:
concurrent = 1000
qres = Queue()
q = Queue(concurrent)
for i in range(concurrent):
t = Thread(target=doWork)
t.daemon = True
t.start()
try:
for url in url_routes:
q.put(url)
q.join()
except Exception:
pass
# Get results
calc_routes = [qres.get() for _ in range(len(url_routes))]
Этот метод быстрее, чем request_futures, я думаю, но я не знаю, сколько потоков задано для максимизации этого -
D - торнадо (не работает)
Теперь я пытаюсь использовать торнадо, но не могу заставить его работать, он ломается с существующим кодом -1073741819, если я использую curl - если я использую simple_httpclient, он работает, но затем я получаю ошибки таймаута:
ОШИБКА: tornado.application: Несколько исключений из списка выходных данных Traceback (последний последний вызов): Файл "C:\Anaconda3\lib\site-packages\tornado\gen.py", строка 789, в обратном вызове result_list.append(f.result()) Файл "C:\Anaconda3\lib\site-packages\tornado\concurrent.py", строка 232, в результат raise_exc_info (self._exc_info) Файл "", строка 3, в raise_exc_info tornado.httpclient.HTTPError: HTTP 599: Тайм-аут
def handle_req(r):
try:
json_geocode = json_decode(r)
status = int(json_geocode['status'])
tot_time_s = json_geocode['route_summary']['total_time']
tot_dist_m = json_geocode['route_summary']['total_distance']
used_from = json_geocode['via_points'][0]
used_to = json_geocode['via_points'][1]
out = [status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
print(out)
except Exception as err:
print(err)
out = [999, 0, 0, 0, 0, 0, 0]
return out
# Configure
# For some reason curl_httpclient crashes my computer
AsyncHTTPClient.configure("tornado.simple_httpclient.SimpleAsyncHTTPClient", max_clients=10)
@gen.coroutine
def run_experiment(urls):
http_client = AsyncHTTPClient()
responses = yield [http_client.fetch(url) for url, qid in urls]
responses_out = [handle_req(r.body) for r in responses]
raise gen.Return(value=responses_out)
# Initialise
_ioloop = ioloop.IOLoop.instance()
run_func = partial(run_experiment, url_routes)
calc_routes = _ioloop.run_sync(run_func)
E - asyncio/aiohttp
Решили попробовать другой подход (хотя было бы здорово работать с торнадо), используя asyncio и aiohttp.
import asyncio
import aiohttp
def handle_req(data, qid):
json_geocode = json.loads(data.decode('utf-8'))
status = int(json_geocode['status'])
if status == 200:
tot_time_s = json_geocode['route_summary']['total_time']
tot_dist_m = json_geocode['route_summary']['total_distance']
used_from = json_geocode['via_points'][0]
used_to = json_geocode['via_points'][1]
out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
else:
print("Done, but not route for {0} - status: {1}".format(qid, status))
out = [qid, 999, 0, 0, 0, 0, 0, 0]
return out
def chunked_http_client(num_chunks):
# Use semaphore to limit number of requests
semaphore = asyncio.Semaphore(num_chunks)
@asyncio.coroutine
# Return co-routine that will download files asynchronously and respect
# locking fo semaphore
def http_get(url, qid):
nonlocal semaphore
with (yield from semaphore):
response = yield from aiohttp.request('GET', url)
body = yield from response.content.read()
yield from response.wait_for_close()
return body, qid
return http_get
def run_experiment(urls):
http_client = chunked_http_client(500)
# http_client returns futures
# save all the futures to a list
tasks = [http_client(url, qid) for url, qid in urls]
response = []
# wait for futures to be ready then iterate over them
for future in asyncio.as_completed(tasks):
data, qid = yield from future
try:
out = handle_req(data, qid)
except Exception as err:
print("Error for {0} - {1}".format(qid,err))
out = [qid, 999, 0, 0, 0, 0, 0, 0]
response.append(out)
return response
# Run:
loop = asyncio.get_event_loop()
calc_routes = loop.run_until_complete(run_experiment(url_routes))
Это работает нормально, но все же медленнее, чем многопроцессорное обслуживание!