wip(): prevent cron from running twice

This commit is contained in:
Matteo Pagliazzi
2016-05-23 16:42:45 +02:00
parent 3f1343fdfb
commit 1a26965542
3 changed files with 94 additions and 32 deletions

View File

@@ -6,6 +6,7 @@ import Bluebird from 'bluebird';
import { model as Group } from '../../models/group'; import { model as Group } from '../../models/group';
import { model as User } from '../../models/user'; import { model as User } from '../../models/user';
import { cron } from '../../libs/api-v3/cron'; import { cron } from '../../libs/api-v3/cron';
import { v4 as uuid } from 'uuid';
const daysSince = common.daysSince; const daysSince = common.daysSince;
@@ -98,20 +99,43 @@ module.exports = function cronMiddleware (req, res, next) {
if (daysMissed <= 0) return next(); if (daysMissed <= 0) return next();
// Fetch active tasks (no completed todos) let quest;
Tasks.Task.find({ let progress;
let tasks;
// To avoid double cron we set _cronSignature on the user to a random string
// and check that it has remained the same before saving
user._cronSignature = uuid();
User.update({
_id: user._id,
_cronSignature: 'not-running',
}, {
$set: {
_cronSignature: user._cronSignature,
},
}).exec()
.then((updateResult) => { // Fetch active tasks (no completed todos)
// if the cron signature is set, throw an error and recover later
if (updateResult.nMatched === 0 || updateResult.nUpdated === 0) {
throw new Error('cron-already-running');
}
return Tasks.Task.find({
userId: user._id, userId: user._id,
$or: [ // Exclude completed todos $or: [ // Exclude completed todos
{type: 'todo', completed: false}, {type: 'todo', completed: false},
{type: {$in: ['habit', 'daily', 'reward']}}, {type: {$in: ['habit', 'daily', 'reward']}},
], ],
}).exec() }).exec();
.then(tasks => { })
.then(tasksFetched => {
tasks = tasksFetched;
let tasksByType = {habits: [], dailys: [], todos: [], rewards: []}; let tasksByType = {habits: [], dailys: [], todos: [], rewards: []};
tasks.forEach(task => tasksByType[`${task.type}s`].push(task)); tasks.forEach(task => tasksByType[`${task.type}s`].push(task));
// Run cron // Run cron
let progress = cron({user, tasksByType, now, daysMissed, analytics, timezoneOffsetFromUserPrefs}); progress = cron({user, tasksByType, now, daysMissed, analytics, timezoneOffsetFromUserPrefs});
// Clear old completed todos - 30 days for free users, 90 for subscribers // Clear old completed todos - 30 days for free users, 90 for subscribers
// Do not delete challenges completed todos TODO unless the task is broken? // Do not delete challenges completed todos TODO unless the task is broken?
@@ -126,7 +150,7 @@ module.exports = function cronMiddleware (req, res, next) {
}).exec(); }).exec();
let ranCron = user.isModified(); let ranCron = user.isModified();
let quest = common.content.quests[user.party.quest.key]; quest = common.content.quests[user.party.quest.key];
if (ranCron) res.locals.wasModified = true; // TODO remove after v2 is retired if (ranCron) res.locals.wasModified = true; // TODO remove after v2 is retired
if (!ranCron) return next(); if (!ranCron) return next();
@@ -134,27 +158,65 @@ module.exports = function cronMiddleware (req, res, next) {
// Group.tavernBoss(user, progress); // Group.tavernBoss(user, progress);
// Save user and tasks // Save user and tasks
let toSave = [user.save()]; // Uses mongoose's internals to get update command
let mongooseDelta = user.$__delta();
if (mongooseDelta instanceof Error) {
throw mongooseDelta;
}
let mongooseWhere = user.$__where(mongooseDelta[0]);
if (mongooseWhere instanceof Error) {
throw mongooseWhere;
}
mongooseWhere._cronSignature = user._cronSignature; // Only update the user if cron signature matches
return User.update(mongooseWhere, mongooseDelta[1]);
})
.then(updateResult => {
// if the cron signature is set, throw an error and recover later
if (updateResult.nMatched === 0 || updateResult.nUpdated === 0) {
throw new Error('cron-already-running');
}
let toSave = [];
tasks.forEach(task => { tasks.forEach(task => {
if (task.isModified()) toSave.push(task.save()); if (task.isModified()) toSave.push(task.save());
}); });
return Bluebird.all(toSave) return Bluebird.all(toSave);
.then(saved => { })
user = res.locals.user = saved[0]; .then(() => {
if (!quest) return; if (!quest) return;
// If user is on a quest, roll for boss & player, or handle collections // If user is on a quest, roll for boss & player, or handle collections
let questType = quest.boss ? 'boss' : 'collect'; let questType = quest.boss ? 'boss' : 'collect';
// TODO this saves user, runs db updates, loads user. Is there a better way to handle this? // TODO this saves user, runs db updates, loads user. Is there a better way to handle this?
return Group[`${questType}Quest`](user, progress) return Group[`${questType}Quest`](user, progress);
.then(() => User.findById(user._id).exec()) // fetch the updated user... })
.then(() => {
User.findByIdAndUpdate(user._id, {
$set: {_cronSignature: 'not-running'},
}, {
new: true, // return the updated document
}).exec();
}) // fetch the updated user...
.then(updatedUser => { .then(updatedUser => {
res.locals.user = updatedUser; user = res.locals.user = updatedUser;
return null; return null;
});
}) })
.then(() => next()) .then(() => next())
.catch(next); .catch((err) => {
if (err.message === 'cron-already-running') {
// recovering after abort, wait 200ms and reload user
setTimeout(() => {
User.findById(user._id, (reloadErr, reloadedUser) => {
if (reloadErr) return next(reloadErr);
user = res.locals.user = reloadedUser;
return next();
});
}, 200);
} else {
return next(err);
}
}); });
}; };

View File

@@ -281,7 +281,7 @@ schema.methods.sendChat = function sendChat (message, user) {
this.chat.splice(200); this.chat.splice(200);
// Kick off chat notifications in the background. // Kick off chat notifications in the background.
let lastSeenUpdate = {$set: {}, $inc: {_v: 1}}; let lastSeenUpdate = {$set: {}};
lastSeenUpdate.$set[`newMessages.${this._id}`] = {name: this.name, value: true}; lastSeenUpdate.$set[`newMessages.${this._id}`] = {name: this.name, value: true};
// do not send notifications for guilds with more than 5000 users and for the tavern // do not send notifications for guilds with more than 5000 users and for the tavern
@@ -431,7 +431,6 @@ schema.methods.finishQuest = function finishQuest (quest) {
updates.$inc[`achievements.quests.${questK}`] = 1; updates.$inc[`achievements.quests.${questK}`] = 1;
updates.$inc['stats.gp'] = Number(quest.drop.gp); updates.$inc['stats.gp'] = Number(quest.drop.gp);
updates.$inc['stats.exp'] = Number(quest.drop.exp); updates.$inc['stats.exp'] = Number(quest.drop.exp);
updates.$inc._v = 1;
if (this._id === TAVERN_ID) { if (this._id === TAVERN_ID) {
updates.$set['party.quest.completed'] = questK; // Just show the notif updates.$set['party.quest.completed'] = questK; // Just show the notif
@@ -536,9 +535,9 @@ schema.statics.bossQuest = async function bossQuest (user, progress) {
// Everyone takes damage // Everyone takes damage
await User.update({ await User.update({
_id: {$in: _.keys(group.quest.members)}, _id: {$in: _.keys(group.quest.members.toObject ? group.quest.members.toObject() : group.quest.members)},
}, { }, {
$inc: {'stats.hp': down, _v: 1}, $inc: {'stats.hp': down},
}, {multi: true}).exec(); }, {multi: true}).exec();
// Apply changes the currently cronning user locally so we don't have to reload it to get the updated state // Apply changes the currently cronning user locally so we don't have to reload it to get the updated state
// TODO how to mark not modified? https://github.com/Automattic/mongoose/pull/1167 // TODO how to mark not modified? https://github.com/Automattic/mongoose/pull/1167

View File

@@ -345,6 +345,7 @@ export let schema = new Schema({
}, },
lastCron: {type: Date, default: Date.now}, lastCron: {type: Date, default: Date.now},
_cronSignature: String, // Private property to avoid double cron
// {GROUP_ID: Boolean}, represents whether they have unseen chat messages // {GROUP_ID: Boolean}, represents whether they have unseen chat messages
newMessages: {type: Schema.Types.Mixed, default: () => { newMessages: {type: Schema.Types.Mixed, default: () => {
@@ -528,7 +529,7 @@ export let schema = new Schema({
schema.plugin(baseModel, { schema.plugin(baseModel, {
// noSet is not used as updating uses a whitelist and creating only accepts specific params (password, email, username, ...) // noSet is not used as updating uses a whitelist and creating only accepts specific params (password, email, username, ...)
noSet: [], noSet: [],
private: ['auth.local.hashed_password', 'auth.local.salt'], private: ['auth.local.hashed_password', 'auth.local.salt', '_cronSignature'],
toJSONTransform: function userToJSON (plainObj, originalDoc) { toJSONTransform: function userToJSON (plainObj, originalDoc) {
// plainObj.filters = {}; // TODO Not saved, remove? // plainObj.filters = {}; // TODO Not saved, remove?
plainObj._tmp = originalDoc._tmp; // be sure to send down drop notifs plainObj._tmp = originalDoc._tmp; // be sure to send down drop notifs