Limit asynchronous queries in javascript – job queue with configurable thread limit

I had a scenario where multiple queries were required to fetch data, first in nodejs fetching from a web service, and later querying SharePoint lists exceeding 100,000 items for display in a data table.

I created the QueueRunner javascript class to be able to queue up a large amount of work  (such as 20 large queries against SharePoint) – with the ability to specify how many threads can happen in parallell (how many simultenous queries at a time are allowed). See below for the QueueRunner class code:

  • QueueRunner prototype 1 using native promise object
  • QueueRunner prototype 2 using jQuery promise object

QueueRunner prototype (using native promise)

/*
	Job Queue Runner: Add functions that return a promise, set the number of allowed simultaneous threads, and then run
*/
var QueueRunner = (function () {
    function QueueRunner(id) {
        this.maxThreads = 1; // Number of allowed simultaneous threads
        this.jobQueue = [];
        this.threadCount = 0;
        this.jobQueueConsumer = null;
        this.jobsStarted = 0;
        if (typeof (id) !== 'undefined') {
            this.id = id;
        }
        else {
            this.id = 'QueueRunner';
        }
    }
    QueueRunner.prototype.run = function () {
        var instance = this;
        return new Promise(function (resolve, reject) {
            instance.jobQueueConsumer = setInterval(function () {
                if (instance.threadCount < instance.maxThreads && instance.jobQueue.length > 0) {
                    instance.threadCount++;
                    instance.jobsStarted++;
                    // Remove the next job from the queue (index zero) and run it
                    var job = instance.jobQueue.splice(0, 1)[0];
                    //console.log(instance.id + ': Start job ' + instance.jobsStarted + ' of ' + (instance.jobQueue.length + instance.jobsStarted));
                    job().then(function () {
                        instance.threadCount--;
                    }, function () {
                        instance.threadCount--;
                    });
                }
                if (instance.threadCount < 1 && instance.jobQueue.length < 1) {
                    clearInterval(instance.jobQueueConsumer);
                    logMessage(instance.id + ': All jobs done.');
                    resolve();
                }
            }, 20);
        });
    };
    QueueRunner.prototype.addJob = function (func) {
        this.jobQueue.push(func);
    };
    return QueueRunner;
}());

QueueRunner prototype (using jQuery promise)

/*
	Job Queue Runner: Add functions that return a promise, set the number of allowed simultaneous threads, and then run
*/
var QueueRunner = (function () {
    function QueueRunner(id) {
        this.maxThreads = 1;
        this.jobQueue = [];
        this.threadCount = 0;
        this.jobQueueConsumer = null;
        this.jobsStarted = 0;
        if (typeof (id) !== 'undefined') {
            this.id = id;
        }
        else {
            this.id = 'QueueRunner';
        }
    }
    QueueRunner.prototype.run = function () {
        var instance = this;
        var deferred = $.Deferred();
        instance.jobQueueConsumer = setInterval(function () {
            if (instance.threadCount < instance.maxThreads && instance.jobQueue.length > 0) {
                instance.threadCount++;
                instance.jobsStarted++;
                // Remove the next job from the queue (index zero) and run it
                var job = instance.jobQueue.splice(0, 1)[0];
                console.log(instance.id + ': Start job ' + instance.jobsStarted + ' of ' + (instance.jobQueue.length + instance.jobsStarted));
                job().then(function () {
                    instance.threadCount--;
                }, function () {
                    instance.threadCount--;
                });
            }
            if (instance.threadCount < 1 && instance.jobQueue.length < 1) {
                clearInterval(instance.jobQueueConsumer);
                console.log(instance.id + ': All jobs done.');
                deferred.resolve();
            }
        }, 20);
        return deferred.promise();
    };
    QueueRunner.prototype.addJob = function (func) {
        this.jobQueue.push(func);
    };
    return QueueRunner;
}());

Usage example:

var queue = new QueueRunner('LoadExtraProps'); // Create queue with id / name
queue.maxThreads = 6; // Number of simultaneous threads / queries allowed at a time
// Add work to the queue e.g. for each SPO list item, fetch version history.
items.forEach(function (item) {
	queue.addJob(function () {
		// Job function - must return a promise that resolves when the job is done
		return new Promise(function (resolve, reject) {
			// Perform async work / query e.g. fetch version history ... resolve when complete.
			// ...
			resolve();
		});
	});
});
queue.run().then(function () {	
	// Continue after the queue is completed...
});

Leave a comment