MongoDB findAndModify: is it really atomic? Help creating a race-free solution for updating

I have Event documents consisting of embedded Snapshots.

I want to add Snapshot A to an Event if:

  • This event started within 5 minutes of snapshot A
  • The last snapshot in the event is less than a minute before snapshot A.

Otherwise, I want to create a new Event.

Here's my findAndModify query, which might make more sense:

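// Window lengths, in the same unit as the timestamps (milliseconds assumed here)
var FIVE_MIN = 5 * 60 * 1000;
var ONE_MIN = 1 * 60 * 1000;

Event.findAndModify({
  query: {
    start_timestamp: { $gte: newSnapshot.timestamp - FIVE_MIN },
    last_snapshot_timestamp: { $gte: newSnapshot.timestamp - ONE_MIN }
  },
  update: {
    // store the snapshot under snapshots.<timestamp>
    $set: { ["snapshots." + newSnapshot.timestamp]: newSnapshot },
    $max: { last_snapshot_timestamp: newSnapshot.timestamp },
    $min: { start_timestamp: newSnapshot.timestamp },
    $setOnInsert: newEventFields  // ALL OUR NEW EVENT FIELDS
  },
  upsert: true
})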

      

Edit: Unfortunately, I cannot create a unique index on start_timestamp. The snapshots arrive with different timestamps and I want to group them into events. For example, Snapshot A is taken at 12:00:00 and Snapshot B at 12:00:59. They should be in the same event, but they may be written to the database at different times, because the workers writing them run concurrently. Another snapshot, taken at 12:00:30, should be written to the same event as above. Finally, a snapshot at 12:02:00 should be recorded in a new event.
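To make the expected grouping concrete, here is a minimal in-memory sketch of the matching rule applied to the four timestamps above. It does not touch MongoDB: `addSnapshot` and `at` are hypothetical helpers, timestamps are assumed to be milliseconds, and the snapshots are processed one at a time in the order listed, so it says nothing about concurrent writers or other arrival orders.

```javascript
const FIVE_MIN = 5 * 60 * 1000;
const ONE_MIN = 60 * 1000;

// In-memory stand-in for the upsert: same two conditions as the query,
// same $min/$max bookkeeping, but no database and no concurrency.
function addSnapshot(events, ts) {
  const event = events.find(e =>
    e.start_timestamp >= ts - FIVE_MIN &&
    e.last_snapshot_timestamp >= ts - ONE_MIN);
  if (event) {
    event.snapshots.push(ts);
    event.start_timestamp = Math.min(event.start_timestamp, ts);
    event.last_snapshot_timestamp = Math.max(event.last_snapshot_timestamp, ts);
  } else {
    events.push({ start_timestamp: ts, last_snapshot_timestamp: ts, snapshots: [ts] });
  }
  return events;
}

// Hypothetical helper: a time of day on an arbitrary fixed date, in ms.
const at = (h, m, s) => Date.UTC(2020, 0, 1, h, m, s);

const events = [];
[at(12, 0, 0), at(12, 0, 59), at(12, 0, 30), at(12, 2, 0)]
  .forEach(ts => addSnapshot(events, ts));

console.log(events.length);               // 2
console.log(events[0].snapshots.length);  // 3
```

The first three snapshots land in one event and the 12:02:00 snapshot starts a second one, because it is more than a minute after 12:00:59.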

My question is: will this work correctly in a concurrent environment? Is findAndModify atomic? Is it possible that two events get created where I should have created one and added a snapshot to it?

Edit: So the above approach does not guarantee that no two events will be generated, as @chainh has kindly pointed out.

So, I tried a new lock based approach - do you think this would work?

var acquireLock = function() {
  var query = { "locked": false}
  var update = { $set: { "locked": true } }
  return Lock.findAndModify({
    query: query, 
    update: update,
    upsert: true
  })
};

var releaseLock = function() {
  var query = { "locked": true }
  var update = { $set: { "locked": false } }
  return Lock.findAndModify({
    query: query, 
    update: update
  })
};

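// FIVE_MIN and ONE_MIN: window lengths in the same unit as the timestamps
var FIVE_MIN = 5 * 60 * 1000;
var ONE_MIN = 1 * 60 * 1000;

var insertSnapshot = function(newSnapshot, upsert) {
  // return the result so callers can chain .then() on it
  return Event.findAndModify({
    query: {
      start_timestamp: { $gte: newSnapshot.timestamp - FIVE_MIN },
      last_snapshot_timestamp: { $gte: newSnapshot.timestamp - ONE_MIN }
    },
    update: {
      $set: { ["snapshots." + newSnapshot.timestamp]: newSnapshot },
      $max: { last_snapshot_timestamp: newSnapshot.timestamp },
      $min: { start_timestamp: newSnapshot.timestamp },
      $setOnInsert: newEventFields  // ALL OUR NEW EVENT FIELDS
    },
    upsert: upsert
  })
};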

var safelyInsertEvent = function(snapshot) {
  return insertSnapshot(snapshot, false)
  .then(function(modifyRes) {
    if (!modifyRes.succeeded) {
      return acquireLock()
    }
  })
  .then(function(lockRes) {
    if (lockRes.succeeded) {
      return insertSnapshot(snapshot, true)
    } else {
      throw new AcquiringLockError("Didn't acquire lock. Try again")
    }
  })
  .then(function() {
    return releaseLock()
  })
  .catch(AcquiringLockError, function(err) {
    return safelyInsertEvent(snapshot)
  })
};

      

The lock document simply contains one field (locked). Basically, the above code tries to find an existing event and update it. If that works, great, we are done. If nothing was updated, we know there is no existing event to insert the snapshot into. We then acquire the lock atomically, and if that succeeds, we can safely create the new event. If acquiring the lock fails, we just try the whole process again, and hopefully by then there is an existing event to insert the snapshot into.



2 answers


As per your code:

Event.findAndModify({
  query: {
    start_timestamp: { $gte: newSnapshot.timestamp - FIVE_MIN },
    last_snapshot_timestamp: { $gte: newSnapshot.timestamp - ONE_MIN }
  },
  update: {
    $set: { ["snapshots." + newSnapshot.timestamp]: newSnapshot },
    $max: { last_snapshot_timestamp: newSnapshot.timestamp },
    $min: { start_timestamp: newSnapshot.timestamp },
    $setOnInsert: newEventFields  // ALL OUR NEW EVENT FIELDS
  },
  upsert: true
})

      

When the first snapshot creates a new Event document, the fields of that document satisfy:
start_timestamp == last_snapshot_timestamp

After subsequent updates, the relationship becomes:
start_timestamp < last_snapshot_timestamp < last_snapshot_timestamp + 1min < start_timestamp + 5min
OR
start_timestamp < last_snapshot_timestamp < start_timestamp + 5min < last_snapshot_timestamp + 1min

So, for a new snapshot to be inserted into this Event document, it must satisfy:
  newSnapshot.timestamp <= Math.min(last_snapshot_timestamp + 1min, start_timestamp + 5min)

Suppose there are two Event documents in the database:
Event1 (start_timestamp1, last_snapshot_timestamp1),
Event2 (start_timestamp2, last_snapshot_timestamp2)
Typically start_timestamp2 > last_snapshot_timestamp1

Now, if a new snapshot appears whose timestamp is less than start_timestamp1 (assuming this is possible through latency or forgery), then this snapshot matches both Event documents and can be inserted into either of them, because the query only bounds the timestamps from below. So I wonder whether you need to add another condition to the query to make sure the distance between last_snapshot_timestamp and start_timestamp is always less than a certain value (e.g. 5 minutes). For example, I would change the query to:



  query: {
    start_timestamp: { $gte: newSnapshot.timestamp - 5min },
    last_snapshot_timestamp: { $gte: newSnapshot.timestamp - 1min, $lte: newSnapshot.timestamp + 5min }
  }
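As a rough illustration of this point, the sketch below evaluates both versions of the query condition in plain JavaScript against two made-up events and a late snapshot that is older than both. The event values, `lateTs`, and the two predicate names are assumptions chosen for the example, with timestamps in milliseconds.

```javascript
const MIN = 60 * 1000;

// Two made-up events with start_timestamp2 > last_snapshot_timestamp1.
const events = [
  { name: "Event1", start_timestamp: 0,          last_snapshot_timestamp: 50 * 1000 },
  { name: "Event2", start_timestamp: 200 * 1000, last_snapshot_timestamp: 230 * 1000 },
];

// Original query: only lower bounds on both fields.
const matchesOriginal = (e, ts) =>
  e.start_timestamp >= ts - 5 * MIN &&
  e.last_snapshot_timestamp >= ts - 1 * MIN;

// Amended query: additionally bounds last_snapshot_timestamp from above.
const matchesBounded = (e, ts) =>
  matchesOriginal(e, ts) &&
  e.last_snapshot_timestamp <= ts + 5 * MIN;

// A snapshot that arrives late, stamped 10 minutes before Event1 started.
const lateTs = -10 * MIN;

const originalMatches = events.filter(e => matchesOriginal(e, lateTs)).map(e => e.name);
const boundedMatches = events.filter(e => matchesBounded(e, lateTs)).map(e => e.name);

console.log(originalMatches); // [ 'Event1', 'Event2' ]
console.log(boundedMatches);  // []
```

With the extra upper bound the late snapshot matches neither event, so an upsert would create a separate event for it instead of attaching it to an arbitrary existing one.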

      

OK, moving on...
If I were to solve this problem, I would still try to create a unique index on the start_timestamp field. According to the MongoDB manual, findAndModify or update can then complete the work atomically. The headache is how to handle a duplicate value, because newSnapshot.timestamp is outside our control and it may change start_timestamp through the $min operator.

The approach:

  • multiple threads try to upsert a new Event document because no document satisfies the query condition;
  • one thread succeeds in creating a new Event document with a specific newSnapshot.timestamp value; the others fail because of the unique index constraint on the start_timestamp field;
  • the other threads retry (this time the operation is an update rather than an upsert) and succeed, using the existing Event document;
  • if an update (not an upsert) would change start_timestamp through the $min operator, and newSnapshot.timestamp equals the start_timestamp of another existing Event document, the update fails because of the unique index constraint. But we get an error message, so we know that an Event document exists whose start_timestamp equals newSnapshot.timestamp. Now we can simply insert newSnapshot into that Event document, because it definitely meets the condition.

Since there is no need to return the Event document, I use update instead of findAndModify; both are atomic, and update is easier to write in this case.
I am using plain JavaScript (runnable in the mongo shell) to express the steps (I am not familiar with the syntax you were using :D), and I think you can follow it easily.

var gap5 = 5 * 60 * 1000;   // 5 minutes in ms; change accordingly if your timestamps use another unit
var gap1 = 1 * 60 * 1000;   // 1 minute in ms
var initialFields = {};     // ALL OUR NEW EVENT FIELDS
// newSnapshot is assumed to be defined in the enclosing scope

function insertSnapshotIfStartTimeStampNotExisted() {
    var query = {
            start_timestamp: { $gte: newSnapshot.timestamp - gap5 },
            last_snapshot_timestamp: { $gte: newSnapshot.timestamp - gap1 }
    };
    var update = {
            $push : {snapshots: newSnapshot}, // suppose snapshots is an array
            $max: { last_snapshot_timestamp: newSnapshot.timestamp },
            $min: { start_timestamp: newSnapshot.timestamp },
            $setOnInsert : initialFields
    };

    // If the unique index rejects the write, nothing is upserted or modified.
    var result = db.Event.update(query, update, {upsert : true});
    if (result.nUpserted == 0 && result.nModified == 0) {
        insertSnapshotIfStartTimeStampExisted();            // an Event document already exists with that start_timestamp
    }
}

function insertSnapshotIfStartTimeStampExisted() {
    var query = {
            start_timestamp: newSnapshot.timestamp
    };
    var update = {
            $push : {snapshots: newSnapshot}
    };

    var result = db.Event.update(query, update, {upsert : false});
    if (result.nModified == 0) {
        insertSnapshotIfStartTimeStampNotExisted();         // start_timestamp may have just been modified; this is possible
    }
}

// entry
db.Event.createIndex({start_timestamp:1},{unique:true});    // ensureIndex in older shells
insertSnapshotIfStartTimeStampNotExisted();

      



It is possible that findAndModify will create multiple events in a concurrent environment, unless your Event document contains a field with a unique index. In that case only one findAndModify successfully inserts a new event; the other findAndModify calls fail and, on retry, add their snapshot to the new event. For more information see this Jira ticket: https://jira.mongodb.org/browse/DOCS-861
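The effect can be sketched with a toy model. The code below is not MongoDB: `ToyCollection`, `upsertSnapshot`, `runInterleaved`, and the `key` field are hypothetical names, and the generator `yield` merely stands in for the window between "no matching document found" and "insert" in which another writer can act. The only detail borrowed from MongoDB is the duplicate-key error code 11000.

```javascript
// Toy collection with an optional unique field (a stand-in for a unique index).
class ToyCollection {
  constructor(uniqueField) {
    this.docs = [];
    this.uniqueField = uniqueField;
  }
  insert(doc) {
    const f = this.uniqueField;
    if (f && this.docs.some(d => d[f] === doc[f])) {
      const err = new Error("duplicate key");
      err.code = 11000; // MongoDB's duplicate key error code
      throw err;
    }
    this.docs.push(doc);
  }
}

// One worker: look up, pause (another worker may run), then update or insert.
function* upsertSnapshot(coll, key, ts) {
  while (true) {
    const existing = coll.docs.find(d => d.key === key);
    yield; // interleaving point between the lookup and the write
    if (existing) { existing.snapshots.push(ts); return; }
    try {
      coll.insert({ key: key, snapshots: [ts] });
      return;
    } catch (err) {
      if (err.code !== 11000) throw err; // lost the race: loop and retry as an update
    }
  }
}

// Round-robin scheduler: advance every unfinished worker one step at a time.
function runInterleaved(workers) {
  let active = workers.slice();
  while (active.length) active = active.filter(w => !w.next().done);
}

function simulate(uniqueField) {
  const coll = new ToyCollection(uniqueField);
  runInterleaved([upsertSnapshot(coll, "e1", 1), upsertSnapshot(coll, "e1", 2)]);
  return coll.docs;
}

console.log(simulate(null).length);  // 2: both workers inserted an event
console.log(simulate("key").length); // 1: the loser retried and updated instead
```

Without the unique field both workers see "no event" and each inserts one; with it, the second insert is rejected and its retry appends to the event the first worker created.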


