Подтвердить что ты не робот

Быстрое или массовое увеличение в пимонго

Как я могу увеличить объем в pymongo? Я хочу обновить кучу записей и делать их по одному очень медленно.

Ответ на почти идентичный вопрос находится здесь: Массовое обновление/восстановление в MongoDB?

Принятый ответ фактически не отвечает на вопрос. Он просто дает ссылку на CLI монго для выполнения импорта/экспорта.

Я также был бы открыт для кого-то, объясняющего, почему выполнение массового upsert невозможно/нет лучшей практики, но, пожалуйста, объясните, какое предпочтительное решение для этой проблемы.

Спасибо!

4b9b3361

Ответ 1

MongoDB 2.6+ поддерживает массовые операции. Это включает в себя массовые вставки, взлеты, обновления и т.д. Суть этого заключается в уменьшении/устранении задержек с латентностью "туда и обратно" при выполнении операций записи по записи ( "документ по документу" является правильным).

Итак, как это работает? Пример в Python, потому что я работаю.

>>> import pymongo
>>> pymongo.version
'2.7rc0'

Чтобы использовать эту функцию, мы создаем объект "bulk", добавляем к нему документы, а затем вызываем выполнение на нем, и он будет отправлять все обновления сразу. Предостережения: BSON-размер собранных операций (сумма битсов) не может превышать ограничение размера документа 16 МБ. Конечно, количество операций может, таким образом, значительно варьироваться, ваш пробег мая может.

Пример в Pymongo of Bulk upsert operation:

import pymongo
conn = pymongo.MongoClient('myserver', 8839)
db = conn['mydbname']
coll = db.myCollection
bulkop = coll.initialize_ordered_bulk_op()
retval = bulkop.find({'field1':1}).upsert().update({'$push':{'vals':1})
retval = bulkop.find({'field1':1}).upsert().update({'$push':{'vals':2})
retval = bulkop.find({'field1':1}).upsert().update({'$push':{'vals':3})
retval = bulkop.execute()

Это важный метод. Дополнительная информация доступна по адресу:

http://api.mongodb.org/python/2.7rc1/examples/bulk.html

Ответ 2

Современные выпуски pymongo (больше 3.x) завершают массовые операции в согласованном интерфейсе, который понижает, когда выпуск сервера не поддерживает массовые операции. Это теперь согласуется с официально поддерживаемыми драйверами MongoDB.

Таким образом, предпочтительным методом кодирования является использование bulk_write(), где вы используете UpdateOne другое другое соответствующее действие. И теперь, конечно, предпочтительно использовать списки естественного языка, а не конкретный строитель.

Прямой перевод старого документа:

from pymongo import UpdateOne

operations = [
    UpdateOne({ "field1": 1},{ "$push": { "vals": 1 } },upsert=True),
    UpdateOne({ "field1": 1},{ "$push": { "vals": 2 } },upsert=True),
    UpdateOne({ "field1": 1},{ "$push": { "vals": 3 } },upsert=True)
]

result = collection.bulk_write(operations)

Или классический цикл преобразования документа:

import random
from pymongo import UpdateOne

random.seed()

operations = []

for doc in collection.find():
    # Set a random number on every document update
    operations.append(
        UpdateOne({ "_id": doc["_id"] },{ "$set": { "random": random.randint(0,10) } })
    )

    # Send once every 1000 in batch
    if ( len(operations) == 1000 ):
        collection.bulk_write(operations,ordered=False)
        operations = []

if ( len(operations) > 0 ):
    collection.bulk_write(operations,ordered=False)

Возвращенный результат BulkWriteResult, который будет содержать счетчики согласованных и обновленных документов, а также возвращаемые значения _id для любого "upserts", которые происходят.

Существует немного неправильного представления о размере массива массивных операций. Фактический запрос, отправленный на сервер, не может превышать ограничение на BSON 16 МБ, поскольку этот предел также применяется к "запросу", отправленному на сервер, который также использует формат BSON.

Однако это не определяет размер массива запросов, который вы можете создать, так как фактические операции будут отправляться и обрабатываться только партиями 1000. Единственное реальное ограничение заключается в том, что сами эти 1000 инструкций по эксплуатации фактически не создают документ BSON размером более 16 МБ. Это действительно довольно высокий порядок.

Общая концепция массовых методов - это "меньше трафика" в результате отправки сразу нескольких вещей и обработки только одного ответа сервера. Сокращение этих накладных расходов, связанных с каждым запросом на обновление, экономит много времени.

Ответ 3

Ответ остается неизменным: поддержка массовых сообщений не поддерживается.

Ответ 4

Вы можете обновить все документы, соответствующие вашей спецификации запроса, используя multi = True.

Существует ошибка здесь о том, как сделать команду команд так, как вы хотите.

Ответ 6

Самое быстрое обновление с Python 3.5+, мотором и асинхронным:

import asyncio
import datetime
import logging
import random
import time

import motor.motor_asyncio
import pymongo.errors


async def execute_bulk(bulk):
    try:
        await bulk.execute()
    except pymongo.errors.BulkWriteError as err:
        logging.error(err.details)


async def main():
    cnt = 0
    bulk = db.initialize_unordered_bulk_op()
    tasks = []
    async for document in db.find({}, {}, no_cursor_timeout=True):
        cnt += 1
        bulk.find({'_id': document['_id']}).update({'$set': {"random": random.randint(0,10)}})
        if not cnt % 1000:
            task = asyncio.ensure_future(execute_bulk(bulk))
            tasks.append(task)
            bulk = db.initialize_unordered_bulk_op()
    if cnt % 1000:
        task = asyncio.ensure_future(bulk.execute(bulk))
        tasks.append(task)
    logging.info('%s processed', cnt)
    await asyncio.gather(*tasks)


logging.basicConfig(level='INFO')    
db = motor.motor_asyncio.AsyncIOMotorClient()['database']['collection']
start_time = time.time()
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    execution_time = time.time() - start_time
    logging.info('Execution time: %s', datetime.timedelta(seconds=execution_time))