refactor cron to avoid double cronning - work in progress

This commit is contained in:
Matteo Pagliazzi
2016-05-25 16:55:57 +02:00
parent 1a26965542
commit 31850830a0
3 changed files with 138 additions and 149 deletions

View File

@@ -119,7 +119,7 @@ api.getGroup = {
url: '/groups/:groupId', url: '/groups/:groupId',
// Disable cron when getting groups to avoid race conditions when the site is loaded // Disable cron when getting groups to avoid race conditions when the site is loaded
// and requests for party and user data are concurrent // and requests for party and user data are concurrent
runCron: false, runCron: true,
middlewares: [authWithHeaders()], middlewares: [authWithHeaders()],
async handler (req, res) { async handler (req, res) {
let user = res.locals.user; let user = res.locals.user;

View File

@@ -10,14 +10,16 @@ import { v4 as uuid } from 'uuid';
const daysSince = common.daysSince; const daysSince = common.daysSince;
module.exports = function cronMiddleware (req, res, next) { module.exports = async function cronMiddleware (req, res, next) {
let user = res.locals.user; let user = res.locals.user;
if (!user) return next(); // User might not be available when authentication is not mandatory if (!user) return next(); // User might not be available when authentication is not mandatory
let analytics = res.analytics; let analytics = res.analytics;
let now = new Date(); let now = new Date();
let _cronSignature = uuid();
try {
console.log('CHECKING RUN CRON', req.originalUrl, req.method, (new Date()).toISOString());
// If the user's timezone has changed (due to travel or daylight savings), // If the user's timezone has changed (due to travel or daylight savings),
// cron can be triggered twice in one day, so we check for that and use // cron can be triggered twice in one day, so we check for that and use
@@ -98,44 +100,37 @@ module.exports = function cronMiddleware (req, res, next) {
} }
if (daysMissed <= 0) return next(); if (daysMissed <= 0) return next();
console.log('RUNNING CRON FOR REAL', req.originalUrl, req.method, (new Date()).toISOString());
let quest; // To avoid double cron we first set _cronSignature to now and then check that it's not changed while processing
let progress; let userUpdateResult = await User.update({
let tasks;
// To avoid double cron we set _cronSignature on the user to a random string
// and check that it has remained the same before saving
user._cronSignature = uuid();
User.update({
_id: user._id, _id: user._id,
_cronSignature: 'not-running', _cronSignature: 'not-running', // Check that in the meantime another cron has not started
}, { }, {
$set: { $set: {
_cronSignature: user._cronSignature, _cronSignature,
}, },
}).exec() }).exec();
.then((updateResult) => { // Fetch active tasks (no completed todos) console.log('FIRST USER UPDATE?', userUpdateResult, req.originalUrl, req.method, (new Date()).toISOString());
// if the cron signature is set, throw an error and recover later // if the cron signature is set, throw an error and recover later
if (updateResult.nMatched === 0 || updateResult.nUpdated === 0) { if (userUpdateResult.nMatched === 0 || userUpdateResult.nModified === 0) {
throw new Error('cron-already-running'); throw new Error('cron-already-running');
} }
return Tasks.Task.find({ let tasks = await Tasks.Task.find({
userId: user._id, userId: user._id,
$or: [ // Exclude completed todos $or: [ // Exclude completed todos
{type: 'todo', completed: false}, {type: 'todo', completed: false},
{type: {$in: ['habit', 'daily', 'reward']}}, {type: {$in: ['habit', 'daily', 'reward']}},
], ],
}).exec(); }).exec();
})
.then(tasksFetched => {
tasks = tasksFetched;
let tasksByType = {habits: [], dailys: [], todos: [], rewards: []}; let tasksByType = {habits: [], dailys: [], todos: [], rewards: []};
tasks.forEach(task => tasksByType[`${task.type}s`].push(task)); tasks.forEach(task => tasksByType[`${task.type}s`].push(task));
// Run cron // Run cron
progress = cron({user, tasksByType, now, daysMissed, analytics, timezoneOffsetFromUserPrefs}); let progress = cron({user, tasksByType, now, daysMissed, analytics, timezoneOffsetFromUserPrefs});
// Clear old completed todos - 30 days for free users, 90 for subscribers // Clear old completed todos - 30 days for free users, 90 for subscribers
// Do not delete challenges completed todos TODO unless the task is broken? // Do not delete challenges completed todos TODO unless the task is broken?
@@ -149,74 +144,65 @@ module.exports = function cronMiddleware (req, res, next) {
'challenge.id': {$exists: false}, 'challenge.id': {$exists: false},
}).exec(); }).exec();
let ranCron = user.isModified(); let quest = common.content.quests[user.party.quest.key];
quest = common.content.quests[user.party.quest.key];
if (ranCron) res.locals.wasModified = true; // TODO remove after v2 is retired res.locals.wasModified = true; // TODO remove after v2 is retired
if (!ranCron) return next();
// Group.tavernBoss(user, progress); // Group.tavernBoss(user, progress);
let reallyModifiedPaths = {}; // Mongoose stores both path and path.nested in user.modifiedPaths()
// Save user and tasks user.modifiedPaths().forEach(path => {
// Uses mongoose's internals to get update command let rootPath = path.split('.')[0];
let mongooseDelta = user.$__delta(); if (Object.keys(reallyModifiedPaths).indexOf(rootPath) === -1) {
if (mongooseDelta instanceof Error) { let dataForPath = user[rootPath].toObject ? user[rootPath].toObject() : user[rootPath];
throw mongooseDelta; reallyModifiedPaths[rootPath] = dataForPath;
} }
});
reallyModifiedPaths._cronSignature = 'not-running'; // Finish running cron
let mongooseWhere = user.$__where(mongooseDelta[0]); // Save user (only if another cron has done it already!)
if (mongooseWhere instanceof Error) { let secondUserUpdateResult = await User.update({
throw mongooseWhere; _id: user._id,
} _cronSignature,
mongooseWhere._cronSignature = user._cronSignature; // Only update the user if cron signature matches }, {
$set: reallyModifiedPaths,
}).exec();
console.log('SECOND USER UPDATE?', secondUserUpdateResult, req.originalUrl, req.method, (new Date()).toISOString());
return User.update(mongooseWhere, mongooseDelta[1]); // if cron already run, throw and recover later
}) if (secondUserUpdateResult.nMatched === 0 || secondUserUpdateResult.nModified === 0) {
.then(updateResult => {
// if the cron signature is set, throw an error and recover later
if (updateResult.nMatched === 0 || updateResult.nUpdated === 0) {
throw new Error('cron-already-running'); throw new Error('cron-already-running');
} }
let toSave = []; let tasksToSave = [];
tasks.forEach(task => { tasks.forEach(task => {
if (task.isModified()) toSave.push(task.save()); if (task.isModified()) tasksToSave.push(task.save());
}); });
await Bluebird.all(tasksToSave);
return Bluebird.all(toSave);
})
.then(() => {
if (!quest) return; if (!quest) return;
// If user is on a quest, roll for boss & player, or handle collections // If user is on a quest, roll for boss & player, or handle collections
let questType = quest.boss ? 'boss' : 'collect'; let questType = quest.boss ? 'boss' : 'collect';
// TODO this saves user, runs db updates, loads user. Is there a better way to handle this? // TODO this saves user, runs db updates, loads user. Is there a better way to handle this?
return Group[`${questType}Quest`](user, progress); await Group[`${questType}Quest`](user, progress);
})
.then(() => { res.locals.user = await User.findById(user._id).exec();
User.findByIdAndUpdate(user._id, {
$set: {_cronSignature: 'not-running'},
}, {
new: true, // return the updated document
}).exec();
}) // fetch the updated user...
.then(updatedUser => {
user = res.locals.user = updatedUser;
return null;
})
.then(() => next())
.catch((err) => {
if (err.message === 'cron-already-running') {
// recovering after abort, wait 200ms and reload user
setTimeout(() => {
User.findById(user._id, (reloadErr, reloadedUser) => {
if (reloadErr) return next(reloadErr);
user = res.locals.user = reloadedUser;
return next(); return next();
}); } catch (err) {
}, 200); if (err.message === 'cron-already-running') {
console.log('RECOVERING FROM CRON', req.originalUrl, req.method, (new Date()).toISOString());
// recovering after abort, wait 200ms and reload user
Bluebird.delay(200).then(() => {
return User.findById(user._id).exec();
}).then((reloadedUser) => {
res.locals.user = reloadedUser;
console.log('RECOVERED FROM CRON', req.originalUrl, req.method, (new Date()).toISOString());
return next();
}).catch(secondError => next(secondError));
} else { } else {
return next(err); return next(err);
} }
}); }
}; };

View File

@@ -25,6 +25,9 @@ const Schema = mongoose.Schema;
export const INVITES_LIMIT = 100; export const INVITES_LIMIT = 100;
export const TAVERN_ID = shared.TAVERN_ID; export const TAVERN_ID = shared.TAVERN_ID;
const CRON_SAFE_MODE = nconf.get('CRON_SAFE_MODE') === 'true';
const CRON_SEMI_SAFE_MODE = nconf.get('CRON_SEMI_SAFE_MODE') === 'true';
// NOTE once Firebase is enabled any change to groups' members in MongoDB will have to be run through the API // NOTE once Firebase is enabled any change to groups' members in MongoDB will have to be run through the API
// changes made directly to the db will cause Firebase to get out of sync // changes made directly to the db will cause Firebase to get out of sync
export let schema = new Schema({ export let schema = new Schema({
@@ -516,7 +519,7 @@ schema.statics.bossQuest = async function bossQuest (user, progress) {
group.quest.progress.hp -= progress.up; group.quest.progress.hp -= progress.up;
// TODO Create a party preferred language option so emits like this can be localized. Suggestion: Always display the English version too. Or, if English is not displayed to the players, at least include it in a new field in the chat object that's visible in the database - essential for admins when troubleshooting quests! // TODO Create a party preferred language option so emits like this can be localized. Suggestion: Always display the English version too. Or, if English is not displayed to the players, at least include it in a new field in the chat object that's visible in the database - essential for admins when troubleshooting quests!
let playerAttack = `${user.profile.name} attacks ${quest.boss.name('en')} for ${progress.up.toFixed(1)} damage.`; let playerAttack = `${user.profile.name} attacks ${quest.boss.name('en')} for ${progress.up.toFixed(1)} damage.`;
let bossAttack = nconf.get('CRON_SAFE_MODE') === 'true' || nconf.get('CRON_SEMI_SAFE_MODE') === 'true' ? `${quest.boss.name('en')} does not attack, because it respects the fact that there are some bugs\` \`post-maintenance and it doesn't want to hurt anyone unfairly. It will continue its rampage soon!` : `${quest.boss.name('en')} attacks party for ${Math.abs(down).toFixed(1)} damage.`; let bossAttack = CRON_SAFE_MODE || CRON_SEMI_SAFE_MODE ? `${quest.boss.name('en')} does not attack, because it respects the fact that there are some bugs\` \`post-maintenance and it doesn't want to hurt anyone unfairly. It will continue its rampage soon!` : `${quest.boss.name('en')} attacks party for ${Math.abs(down).toFixed(1)} damage.`;
// TODO Consider putting the safe mode boss attack message in an ENV var // TODO Consider putting the safe mode boss attack message in an ENV var
group.sendChat(`\`${playerAttack}\` \`${bossAttack}\``); group.sendChat(`\`${playerAttack}\` \`${bossAttack}\``);