2023-09-20 19:01:22 +08:00

311 lines
6.8 KiB
JavaScript

'use strict';
var AsyncLock = function (opts) {
opts = opts || {};
this.Promise = opts.Promise || Promise;
// format: {key : [fn, fn]}
// queues[key] = null indicates no job running for key
this.queues = Object.create(null);
// lock is reentrant for same domain
this.domainReentrant = opts.domainReentrant || false;
if (this.domainReentrant) {
if (typeof process === 'undefined' || typeof process.domain === 'undefined') {
throw new Error(
'Domain-reentrant locks require `process.domain` to exist. Please flip `opts.domainReentrant = false`, ' +
'use a NodeJS version that still implements Domain, or install a browser polyfill.');
}
// domain of current running func {key : fn}
this.domains = Object.create(null);
}
this.timeout = opts.timeout || AsyncLock.DEFAULT_TIMEOUT;
this.maxOccupationTime = opts.maxOccupationTime || AsyncLock.DEFAULT_MAX_OCCUPATION_TIME;
this.maxExecutionTime = opts.maxExecutionTime || AsyncLock.DEFAULT_MAX_EXECUTION_TIME;
if (opts.maxPending === Infinity || (Number.isInteger(opts.maxPending) && opts.maxPending >= 0)) {
this.maxPending = opts.maxPending;
} else {
this.maxPending = AsyncLock.DEFAULT_MAX_PENDING;
}
};
AsyncLock.DEFAULT_TIMEOUT = 0; //Never
AsyncLock.DEFAULT_MAX_OCCUPATION_TIME = 0; //Never
AsyncLock.DEFAULT_MAX_EXECUTION_TIME = 0; //Never
AsyncLock.DEFAULT_MAX_PENDING = 1000;
/**
* Acquire Locks
*
* @param {String|Array} key resource key or keys to lock
* @param {function} fn async function
* @param {function} cb callback function, otherwise will return a promise
* @param {Object} opts options
*/
AsyncLock.prototype.acquire = function (key, fn, cb, opts) {
if (Array.isArray(key)) {
return this._acquireBatch(key, fn, cb, opts);
}
if (typeof (fn) !== 'function') {
throw new Error('You must pass a function to execute');
}
// faux-deferred promise using new Promise() (as Promise.defer is deprecated)
var deferredResolve = null;
var deferredReject = null;
var deferred = null;
if (typeof (cb) !== 'function') {
opts = cb;
cb = null;
// will return a promise
deferred = new this.Promise(function(resolve, reject) {
deferredResolve = resolve;
deferredReject = reject;
});
}
opts = opts || {};
var resolved = false;
var timer = null;
var occupationTimer = null;
var executionTimer = null;
var self = this;
var done = function (locked, err, ret) {
if (occupationTimer) {
clearTimeout(occupationTimer);
occupationTimer = null;
}
if (executionTimer) {
clearTimeout(executionTimer);
executionTimer = null;
}
if (locked) {
if (!!self.queues[key] && self.queues[key].length === 0) {
delete self.queues[key];
}
if (self.domainReentrant) {
delete self.domains[key];
}
}
if (!resolved) {
if (!deferred) {
if (typeof (cb) === 'function') {
cb(err, ret);
}
}
else {
//promise mode
if (err) {
deferredReject(err);
}
else {
deferredResolve(ret);
}
}
resolved = true;
}
if (locked) {
//run next func
if (!!self.queues[key] && self.queues[key].length > 0) {
self.queues[key].shift()();
}
}
};
var exec = function (locked) {
if (resolved) { // may due to timed out
return done(locked);
}
if (timer) {
clearTimeout(timer);
timer = null;
}
if (self.domainReentrant && locked) {
self.domains[key] = process.domain;
}
var maxExecutionTime = opts.maxExecutionTime || self.maxExecutionTime;
if (maxExecutionTime) {
executionTimer = setTimeout(function () {
if (!!self.queues[key]) {
done(locked, new Error('Maximum execution time is exceeded ' + key));
}
}, maxExecutionTime);
}
// Callback mode
if (fn.length === 1) {
var called = false;
try {
fn(function (err, ret) {
if (!called) {
called = true;
done(locked, err, ret);
}
});
} catch (err) {
// catching error thrown in user function fn
if (!called) {
called = true;
done(locked, err);
}
}
}
else {
// Promise mode
self._promiseTry(function () {
return fn();
})
.then(function(ret){
done(locked, undefined, ret);
}, function(error){
done(locked, error);
});
}
};
if (self.domainReentrant && !!process.domain) {
exec = process.domain.bind(exec);
}
if (!self.queues[key]) {
self.queues[key] = [];
exec(true);
}
else if (self.domainReentrant && !!process.domain && process.domain === self.domains[key]) {
// If code is in the same domain of current running task, run it directly
// Since lock is re-enterable
exec(false);
}
else if (self.queues[key].length >= self.maxPending) {
done(false, new Error('Too many pending tasks in queue ' + key));
}
else {
var taskFn = function () {
exec(true);
};
if (opts.skipQueue) {
self.queues[key].unshift(taskFn);
} else {
self.queues[key].push(taskFn);
}
var timeout = opts.timeout || self.timeout;
if (timeout) {
timer = setTimeout(function () {
timer = null;
done(false, new Error('async-lock timed out in queue ' + key));
}, timeout);
}
}
var maxOccupationTime = opts.maxOccupationTime || self.maxOccupationTime;
if (maxOccupationTime) {
occupationTimer = setTimeout(function () {
if (!!self.queues[key]) {
done(false, new Error('Maximum occupation time is exceeded in queue ' + key));
}
}, maxOccupationTime);
}
if (deferred) {
return deferred;
}
};
/*
* Below is how this function works:
*
* Equivalent code:
* self.acquire(key1, function(cb){
* self.acquire(key2, function(cb){
* self.acquire(key3, fn, cb);
* }, cb);
* }, cb);
*
* Equivalent code:
* var fn3 = getFn(key3, fn);
* var fn2 = getFn(key2, fn3);
* var fn1 = getFn(key1, fn2);
* fn1(cb);
*/
AsyncLock.prototype._acquireBatch = function (keys, fn, cb, opts) {
if (typeof (cb) !== 'function') {
opts = cb;
cb = null;
}
var self = this;
var getFn = function (key, fn) {
return function (cb) {
self.acquire(key, fn, cb, opts);
};
};
var fnx = keys.reduceRight(function (prev, key) {
return getFn(key, prev);
}, fn);
if (typeof (cb) === 'function') {
fnx(cb);
}
else {
return new this.Promise(function (resolve, reject) {
// check for promise mode in case keys is empty array
if (fnx.length === 1) {
fnx(function (err, ret) {
if (err) {
reject(err);
}
else {
resolve(ret);
}
});
} else {
resolve(fnx());
}
});
}
};
/*
* Whether there is any running or pending asyncFunc
*
* @param {String} key
*/
AsyncLock.prototype.isBusy = function (key) {
if (!key) {
return Object.keys(this.queues).length > 0;
}
else {
return !!this.queues[key];
}
};
/**
* Promise.try() implementation to become independent of Q-specific methods
*/
AsyncLock.prototype._promiseTry = function(fn) {
try {
return this.Promise.resolve(fn());
} catch (e) {
return this.Promise.reject(e);
}
};
module.exports = AsyncLock;