Подтвердить что ты не робот

Mongodb перемещает документы из одной коллекции в другую коллекцию

Как документы перемещаются из одной коллекции в другую коллекцию в MongoDB? Например: у меня есть много документов в коллекции A, и я хочу переместить все 1 месяц старые документы в коллекцию B (эти 1 месяц старше документы не должны быть в коллекции A).

Используя агрегацию, мы можем сделать копию. Но я пытаюсь сделать перемещение документов. Какой метод можно использовать для перемещения документов?

4b9b3361

Ответ 1

Обновление

Этот ответ by @jasongarber является более безопасным подходом и должен использоваться вместо моего.


При условии, что я получил вас правильно, и вы хотите переместить все документы старше 1 месяца, и вы используете mongoDB 2.6, нет причин не использовать массовые операции, которые являются наиболее эффективным способом выполнения нескольких операций, о которых я знаю

> var bulkInsert = db.target.initializeUnorderedBulkOp()
> var bulkRemove = db.source.initializeUnorderedBulkOp()
> var date = new Date()
> date.setMonth(date.getMonth() -1)
> db.source.find({"yourDateField":{$lt: date}}).forEach(
    function(doc){
      bulkInsert.insert(doc);
      bulkRemove.find({_id:doc._id}).removeOne();
    }
  )
> bulkInsert.execute()
> bulkRemove.execute()

Это должно быть довольно быстро, и это имеет то преимущество, что если что-то пойдет не так во время объемной вставки, исходные данные все еще существуют.


Edit

Чтобы предотвратить чрезмерное использование памяти, вы можете выполнить массовую операцию на каждом обработанном документе x:

> var bulkInsert = db.target.initializeUnorderedBulkOp()
> var bulkRemove = db.source.initializeUnorderedBulkOp()
> var x = 10000
> var counter = 0
> var date = new Date()
> date.setMonth(date.getMonth() -1)
> db.source.find({"yourDateField":{$lt: date}}).forEach(
    function(doc){
      bulkInsert.insert(doc);
      bulkRemove.find({_id:doc._id}).removeOne();
      counter ++
      if( counter % x == 0){
        bulkInsert.execute()
        bulkRemove.execute()
        bulkInsert = db.target.initializeUnorderedBulkOp()
        bulkRemove = db.source.initializeUnorderedBulkOp()
      }
    }
  )
> bulkInsert.execute()
> bulkRemove.execute()

Ответ 2

Вставьте и удалите:

var documentsToMove = db.collectionA.find({});
documentsToMove.forEach(function(doc) {
    db.collectionB.insert(doc);
    db.collectionA.remove(doc);
}

Ответ 3

Это повторение @Markus W Mahlberg

Возвращение благосклонности - как функция

function moveDocuments(sourceCollection,targetCollection,filter) {
    var bulkInsert = targetCollection.initializeUnorderedBulkOp();
    var bulkRemove = sourceCollection.initializeUnorderedBulkOp();
    sourceCollection.find(filter)
        .forEach(function(doc) {
        bulkInsert.insert(doc);
        bulkRemove.find({_id:doc._id}).removeOne();
        }
  )
  bulkInsert.execute();
  bulkRemove.execute();
}

В примере используется

var x = {dsid:{$exists: true}};
moveDocuments(db.pictures,db.artifacts,x)

чтобы переместить все документы, содержащие dsid верхнего уровня из изображений в коллекцию артефактов.

Ответ 4

Основные операции @markus-w-mahlberg показали (и @mark-mullin уточнены) эффективны, но небезопасны, как написано. Если bulkInsert завершится с ошибкой, функция bulkRemove будет продолжена. Чтобы убедиться, что вы не теряете записи при перемещении, используйте это вместо:

function insertBatch(collection, documents) {
  var bulkInsert = collection.initializeUnorderedBulkOp();
  var insertedIds = [];
  var id;
  documents.forEach(function(doc) {
    id = doc._id;
    // Insert without raising an error for duplicates
    bulkInsert.find({_id: id}).upsert().replaceOne(doc);
    insertedIds.push(id);
  });
  bulkInsert.execute();
  return insertedIds;
}

function deleteBatch(collection, documents) {
  var bulkRemove = collection.initializeUnorderedBulkOp();
  documents.forEach(function(doc) {
    bulkRemove.find({_id: doc._id}).removeOne();
  });
  bulkRemove.execute();
}

function moveDocuments(sourceCollection, targetCollection, filter, batchSize) {
  print("Moving " + sourceCollection.find(filter).count() + " documents from " + sourceCollection + " to " + targetCollection);
  var count;
  while ((count = sourceCollection.find(filter).count()) > 0) {
    print(count + " documents remaining");
    sourceDocs = sourceCollection.find(filter).limit(batchSize);
    idsOfCopiedDocs = insertBatch(targetCollection, sourceDocs);

    targetDocs = targetCollection.find({_id: {$in: idsOfCopiedDocs}});
    deleteBatch(sourceCollection, targetDocs);
  }
  print("Done!")
}

Ответ 5

Возможно, с точки зрения производительности лучше удалить много документов с помощью одной команды (особенно если у вас есть индексы для части запроса), а не удалять их по очереди.

Например:

db.source.find({$gte: start, $lt: end}).forEach(function(doc){
   db.target.insert(doc);
});
db.source.remove({$gte: start, $lt: end});

Ответ 6

Из MongoDB 3.0 вверх вы можете использовать команду copyTo со следующим синтаксисом:

db.source_collection.copyTo("target_collection")

Затем вы можете использовать команду drop, чтобы удалить старую коллекцию:

db.source_collection.drop()

Ответ 7

$out используется для создания новой коллекции с данными, поэтому используйте $out

db.oldCollection.aggregate([{$out : "newCollection"}])

затем используйте drop

db.oldCollection.drop()

Ответ 8

вы можете использовать запрос диапазона для получения данных из sourceCollection и держать данные курсора в переменной и петле на нем и вставлять в целевую коллекцию:

 var doc = db.sourceCollection.find({
        "Timestamp":{
              $gte:ISODate("2014-09-01T00:00:00Z"),
              $lt:ISODate("2014-10-01T00:00:00Z")
        }
 });

 doc.forEach(function(doc){
    db.targetCollection.insert(doc);
 })

Надеюсь, что это поможет!

Ответ 9

Мне нравится ответ от @markus-w-mahlberg, однако время от времени я видел необходимость держать его немного проще для людей. Таким образом, у меня есть несколько функций, которые ниже. Естественно, вы могли бы обернуть все это с помощью массовых операторов, но этот код работает с новыми и старыми системами Mongo одинаково.

function parseNS(ns){
    //Expects we are forcing people to not violate the rules and not doing "foodb.foocollection.month.day.year" if they do they need to use an array.
    if (ns instanceof Array){
        database =  ns[0];
        collection = ns[1];
    }
    else{
        tNS =  ns.split(".");
        if (tNS.length > 2){
            print('ERROR: NS had more than 1 period in it, please pass as an [ "dbname","coll.name.with.dots"] !');
            return false;
        }
        database = tNS[0];
        collection = tNS[1];
    }
    return {database: database,collection: collection};
}

function insertFromCollection( sourceNS,  destNS, query, batchSize, pauseMS){
    //Parse and check namespaces
    srcNS = parseNS(sourceNS);
    destNS = parseNS(destNS);
    if ( srcNS == false ||  destNS == false){return false;}

    batchBucket = new Array();
    totalToProcess = db.getDB(srcNS.database).getCollection(srcNS.collection).find(query,{_id:1}).count();
    currentCount = 0;
    print("Processed "+currentCount+"/"+totalToProcess+"...");
    db.getDB(srcNS.database).getCollection(srcNS.collection).find(query).addOption(DBQuery.Option.noTimeout).forEach(function(doc){
        batchBucket.push(doc);
        if ( batchBucket.length > batchSize){
            db.getDB(destNS.database).getCollection(destNS.collection)insert(batchBucket);
            currentCount += batchBucket.length;
            batchBucket = [];
            sleep (pauseMS);
            print("Processed "+currentCount+"/"+totalToProcess+"...");       
        }
    }
    print("Completed");
}

/** Example Usage:
        insertFromCollection("foo.bar","foo2.bar",{"type":"archive"},1000,20);    

Очевидно, вы можете добавить db.getSiblingDB(srcNS.database).getCollection(srcNS.collection).remove(query,true) Если вы захотите также удалить записи после их копирования в новое место. Код можно легко создать таким образом, чтобы сделать его перезагруженным.

Ответ 10

Я планировал архивировать 1000 записей за раз, используя методы bulkinsert и bulkdelete для pymongo.

Для источника и цели

  • создать объекты mongodb для подключения к базе данных.

  • создать экземпляр объемных объектов. Примечание. Я создал резервную копию больших объектов. Это поможет мне отложить вставку или удаление при возникновении ошибки. Пример:

    Для источника // replace this with mongodb object creation logic source_db_obj = db_help.create_db_obj(source_db, source_col) source_bulk = source_db_obj.initialize_ordered_bulk_op() source_bulk_bak = source_db_obj.initialize_ordered_bulk_op()
    Для целей // replace this with mogodb object creation logic target_db_obj = db_help.create_db_obj(target_db, target_col) target_bulk = target_db_obj.initialize_ordered_bulk_op() target_bulk_bak = target_db_obj.initialize_ordered_bulk_op()

  • Получить исходные записи, соответствующие критериям фильтра

    source_find_results = source_db_obj.find(фильтр)

  • Прокрутка записей источника

    создать целевые и исходные групповые операции

    Добавить поле archived_at с текущим datetime в целевую коллекцию

    //replace this with the logic to obtain the UTCtime. doc['archived_at'] = db_help.getUTCTime() target_bulk.insert(document) source_bulk.remove(document)

    для отката в случае каких-либо ошибок или исключений, создайте операции target_bulk_bak и source_bulk_bak.

    target_bulk_bak.find({'_id':doc['_id']}).remove_one() source_bulk_bak.insert(doc) //remove the extra column doc.pop('archieved_at', None)

  • Когда количество записей достигает 1000, выполните перемещение цели - объем и удаление источника. Примечание. Этот метод принимает объекты target_bulk и source_bulk для выполнения.

    execute_bulk_insert_remove (source_bulk, target_bulk)

  • Когда возникает исключение, выполните удаление target_bulk_bak и inesertions source_bulk_bak. Это отменит изменения. Поскольку mongodb не имеет отката, я придумал этот хак

    execute_bulk_insert_remove (source_bulk_bak, target_bulk_bak)

  • Наконец, повторно инициализируйте исходные и целевые объекты bulk и bulk_bak. Это необходимо, потому что вы можете использовать их только один раз.

  • Полный код

        def execute_bulk_insert_remove(source_bulk, target_bulk):
            try:
                target_bulk.execute()
                source_bulk.execute()
            except BulkWriteError as bwe:
                raise Exception(
                    "could not archive document, reason:    {}".format(bwe.details))
    
        def archive_bulk_immediate(filter, source_db, source_col, target_db, target_col):
            """
            filter: filter criteria for backup
            source_db: source database name
            source_col: source collection name
            target_db: target database name
            target_col: target collection name
            """
            count = 0
            bulk_count = 1000
    
            source_db_obj = db_help.create_db_obj(source_db, source_col)
            source_bulk = source_db_obj.initialize_ordered_bulk_op()
            source_bulk_bak = source_db_obj.initialize_ordered_bulk_op()
    
            target_db_obj = db_help.create_db_obj(target_db, target_col)
            target_bulk = target_db_obj.initialize_ordered_bulk_op()
            target_bulk_bak = target_db_obj.initialize_ordered_bulk_op()
    
            source_find_results = source_db_obj.find(filter)
    
            start = datetime.now()
    
            for doc in source_find_results:
                doc['archived_at'] = db_help.getUTCTime()
    
                target_bulk.insert(doc)
                source_bulk.find({'_id': doc['_id']}).remove_one()
                target_bulk_bak.find({'_id': doc['_id']}).remove_one()
                doc.pop('archieved_at', None)
                source_bulk_bak.insert(doc)
    
                count += 1
    
                if count % 1000 == 0:
                    logger.info("count: {}".format(count))
                    try:
                        execute_bulk_insert_remove(source_bulk, target_bulk)
                    except BulkWriteError as bwe:
                        execute_bulk_insert_remove(source_bulk_bak, target_bulk_bak)
                        logger.info("Bulk Write Error: {}".format(bwe.details))
                        raise
    
                    source_bulk = source_db_obj.initialize_ordered_bulk_op()
                    source_bulk_bak = source_db_obj.initialize_ordered_bulk_op()
    
                    target_bulk = target_db_obj.initialize_ordered_bulk_op()
                    target_bulk_bak = target_db_obj.initialize_ordered_bulk_op()
    
            end = datetime.now()
    
            logger.info("archived {} documents to {} in ms.".format(
                count, target_col, (end - start)))