并发控制器
1. 问题场景
假设我们要上传一个 20GB 的大文件,将其分成 100 个 10MB 的分片。如果不使用任务调度器,同时发起 100 个上传请求会导致以下问题:
- 浏览器的并发请求数量被占满 - 浏览器对同一域名的并发请求有限制(通常为 6-8 个)
- 其他正常业务请求可能无法及时发送 - 所有连接被文件上传占用
- 服务器压力过大 - 同时处理大量请求可能导致服务器响应缓慢或崩溃
- 网络带宽被占满 - 大量并发上传会消耗所有可用带宽
2. 解决方案
我们可以尝试着将所有的任务,都放到一个队列里面去,这样我们每次最多发送两个请求,如果有一个请求发送完成,那么我们再从这个队列里面取出一个发送请求,我们只需要保证同时发送的请求不超过两个,这样就可以保证并发数量不被占满,也不会占用过多的带宽。
3. 实现原理
并发控制器的核心思想是:
- 维护一个任务队列
- 控制同时执行的任务数量
- 当有任务完成时,自动从队列中取出新任务执行
4. 代码实现
javascript
/**
* 任务调度器类
* 用于控制并发任务的执行数量
*/
class TaskScheduler {
tasks = [] // 待执行任务队列
runningCount = 0 // 当前正在运行的任务数量
maxRunning = Infinity // 最大并发任务数量
/**
* 构造函数
* @param {number} maxRunning 最大并发数,默认为无限制
*/
constructor(maxRunning = Infinity) {
// 如果传入0,则设为无限制
this.maxRunning = maxRunning === 0 ? Infinity : maxRunning
}
/**
* 执行任务的核心方法
* 检查是否可以执行新任务,如果可以则从队列中取出并执行
*/
run() {
// 如果当前运行的任务数已达到最大值,则不执行新任务
if (this.runningCount >= this.maxRunning) {
return
}
// 如果任务队列为空,则无任务可执行
if (this.tasks.length === 0) {
return
}
// 增加运行中任务计数
this.runningCount++
// 从队列头部取出一个任务
const task = this.tasks.shift()
// 执行任务,无论成功或失败都会在完成后递归调用run方法
task().finally(() => {
this.runningCount-- // 任务完成,减少运行中任务计数
this.run() // 尝试执行下一个任务
})
}
/**
* 添加任务到调度器
* @param {Function} task 要执行的任务函数,应返回Promise
* @returns {Promise} 返回Promise,在任务完成时resolve
*/
addTask(task) {
return new Promise((resolve, reject) => {
// 将任务包装后添加到队列中
this.tasks.push(() => task().then(resolve).catch(reject))
// this.tasks.push(() => task().then(resolve, reject)); // 另一种写法
// 尝试立即执行任务
this.run()
})
}
}
5. 使用示例
基础使用
javascript
// 创建一个最大并发数为2的调度器实例
const scheduler = new TaskScheduler(2)
// 辅助函数:创建延时Promise
const sleep = time => new Promise(resolve => setTimeout(resolve, time))
// 开始计时
console.time('task1')
console.time('task2')
console.time('task3')
console.time('task4')
console.time('task5')
// 添加5个任务到调度器,每个任务耗时1秒
// 由于最大并发数为2,所以会分批执行:前2个并行,后3个排队
scheduler.addTask(() => sleep(1000).then(() => console.timeEnd('task1')))
scheduler.addTask(() => sleep(1000).then(() => console.timeEnd('task2')))
scheduler.addTask(() => sleep(1000).then(() => console.timeEnd('task3')))
scheduler.addTask(() => sleep(1000).then(() => console.timeEnd('task4')))
scheduler.addTask(() => sleep(1000).then(() => console.timeEnd('task5')))
文件上传场景
javascript
// 创建文件上传调度器,最大并发数为3
const uploadScheduler = new TaskScheduler(3)
// 模拟文件分片上传
function uploadChunk(chunkData, chunkIndex) {
return fetch('/upload', {
method: 'POST',
body: chunkData,
headers: {
'Content-Type': 'application/octet-stream',
'X-Chunk-Index': chunkIndex,
},
})
}
// 上传100个分片,但同时只有3个在上传
const chunks = Array.from({ length: 100 }, (_, i) => i)
const uploadPromises = chunks.map((chunkIndex) => {
return uploadScheduler.addTask(() => {
console.log(`开始上传分片 ${chunkIndex}`)
return uploadChunk(getChunkData(chunkIndex), chunkIndex).then(() => console.log(`分片 ${chunkIndex} 上传完成`))
})
})
// 等待所有分片上传完成
Promise.all(uploadPromises)
.then(() => console.log('所有分片上传完成'))
.catch(err => console.error('上传失败:', err))
6. 执行效果
使用并发控制器后:
- 控制并发数量:同时最多只有指定数量的请求在执行
- 自动队列管理:任务完成后自动执行下一个排队的任务
- 资源合理利用:避免浏览器连接数限制和服务器压力过大
- 带宽优化:合理分配网络带宽,不影响其他业务请求
通过这种方式,我们可以有效地控制大文件上传的并发数量,既保证了上传效率,又避免了资源过度占用的问题。
7. 拓展版本
typescript
/**
* 任务调度器 - 支持并发控制的异步任务队列
*/
class TaskScheduler {
private tasks: Array<() => Promise<any>> = [];
private runningCount: number = 0;
private maxRunning: number;
private isDestroyed: boolean = false;
/**
* 创建任务调度器实例
* @param maxRunning 最大并发数,默认为无限制
*/
constructor(maxRunning: number = Infinity) {
if (typeof maxRunning !== "number" || maxRunning < 0) {
throw new Error("maxRunning must be a non-negative number");
}
this.maxRunning = maxRunning === 0 ? Infinity : maxRunning;
}
/**
* 执行任务队列中的任务
*/
private run(): void {
// 边界判断:调度器已销毁
if (this.isDestroyed) {
return;
}
// 边界判断:已达到最大并发数
if (this.runningCount >= this.maxRunning) {
return;
}
// 边界判断:任务队列为空
if (this.tasks.length === 0) {
return;
}
this.runningCount++;
const task = this.tasks.shift();
// 安全检查:确保任务存在
if (!task) {
this.runningCount--;
return;
}
// 执行任务并处理完成后的清理
task()
.finally(() => {
this.runningCount--;
// 递归执行下一个任务
this.run();
})
.catch(error => {
// 记录未处理的错误(可选:添加日志系统)
console.warn("Unhandled task error:", error);
});
}
/**
* 添加任务到队列
* @param task 返回Promise的异步任务函数
* @returns Promise,解析为任务执行结果
*/
addTask<T>(task: () => Promise<T>): Promise<T> {
// 边界判断:调度器已销毁
if (this.isDestroyed) {
return Promise.reject(new Error("TaskScheduler has been destroyed"));
}
// 边界判断:任务参数验证
if (typeof task !== "function") {
return Promise.reject(new Error("Task must be a function"));
}
return new Promise<T>((resolve, reject) => {
// 包装任务以处理结果传递
const wrappedTask = async (): Promise<void> => {
try {
const result = await task();
resolve(result);
} catch (error) {
reject(error);
}
};
this.tasks.push(wrappedTask);
this.run();
});
}
/**
* 获取当前状态信息
*/
getStatus(): {
pendingTasks: number;
runningTasks: number;
maxRunning: number;
isDestroyed: boolean;
} {
return {
pendingTasks: this.tasks.length,
runningTasks: this.runningCount,
maxRunning: this.maxRunning,
isDestroyed: this.isDestroyed,
};
}
/**
* 更新最大并发数
* @param maxRunning 新的最大并发数
*/
setMaxRunning(maxRunning: number): void {
if (typeof maxRunning !== "number" || maxRunning < 0) {
throw new Error("maxRunning must be a non-negative number");
}
this.maxRunning = maxRunning === 0 ? Infinity : maxRunning;
// 如果新的并发数更大,尝试执行更多任务
if (maxRunning > this.runningCount) {
this.run();
}
}
/**
* 清空待执行的任务队列
* @param rejectPending 是否拒绝待执行的任务,默认为true
*/
clearPendingTasks(rejectPending: boolean = true): void {
if (rejectPending) {
// 这里需要更复杂的实现来拒绝待执行的Promise
// 简化版本:直接清空队列
this.tasks.length = 0;
} else {
this.tasks.length = 0;
}
}
/**
* 等待所有任务完成
* @param timeout 超时时间(毫秒),可选
*/
async waitForCompletion(timeout?: number): Promise<void> {
return new Promise((resolve, reject) => {
let timeoutId: ReturnType<typeof setTimeout> | undefined;
if (timeout) {
timeoutId = setTimeout(() => {
reject(new Error(`Timeout after ${timeout}ms`));
}, timeout);
}
const checkCompletion = () => {
if (this.tasks.length === 0 && this.runningCount === 0) {
if (timeoutId) clearTimeout(timeoutId);
resolve();
} else {
setTimeout(checkCompletion, 10);
}
};
checkCompletion();
});
}
/**
* 销毁调度器,清理资源
*/
destroy(): void {
this.isDestroyed = true;
this.clearPendingTasks(true);
}
}
8. 拓展版本使用示例
typescript
// 辅助函数:创建延迟Promise
function delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
// 辅助函数:创建可能失败的任务
function createTask(value: any, shouldFail: boolean = false, delayMs: number = 100): () => Promise<any> {
return async () => {
await delay(delayMs);
if (shouldFail) {
throw new Error(`Task failed with value: ${value}`);
}
return value;
};
}
// 辅助函数:创建计数器任务
let taskCounter = 0;
function createCounterTask(delayMs: number = 50): () => Promise<number> {
return async () => {
await delay(delayMs);
return ++taskCounter;
};
}
// 重置计数器
function resetCounter() {
taskCounter = 0;
}
/**
* 基础功能测试
*/
async function testBasicFunctionality() {
console.log("\n=== 基础功能测试 ===");
// 测试1:基本任务执行
console.log("测试1: 基本任务执行");
const scheduler = new TaskScheduler();
const result = await scheduler.addTask(createTask("hello"));
console.log("结果:", result); // 应该输出: hello
// 测试2:多个任务顺序执行
console.log("\n测试2: 多个任务执行");
resetCounter();
const promises = [
scheduler.addTask(createCounterTask()),
scheduler.addTask(createCounterTask()),
scheduler.addTask(createCounterTask()),
];
const results = await Promise.all(promises);
console.log("执行顺序:", results); // 应该是 [1, 2, 3] 或其他顺序
scheduler.destroy();
}
/**
* 并发控制测试
*/
async function testConcurrencyControl() {
console.log("\n=== 并发控制测试 ===");
// 测试3:限制并发数
console.log("测试3: 限制并发数为2");
const scheduler = new TaskScheduler(2);
resetCounter();
const startTime = Date.now();
const promises = [
scheduler.addTask(createCounterTask(200)), // 200ms
scheduler.addTask(createCounterTask(200)), // 200ms
scheduler.addTask(createCounterTask(200)), // 200ms
scheduler.addTask(createCounterTask(200)), // 200ms
];
const results = await Promise.all(promises);
const endTime = Date.now();
const duration = endTime - startTime;
console.log("执行结果:", results);
console.log("执行时间:", duration, "ms");
console.log("预期时间: ~400ms (2批次并发)");
// 测试4:动态调整并发数
console.log("\n测试4: 动态调整并发数");
scheduler.setMaxRunning(1);
console.log("调整后状态:", scheduler.getStatus());
scheduler.destroy();
}
/**
* 主测试函数
*/
async function runAllTests() {
console.log("开始 TaskSchedulerClass 全面测试...");
try {
await testBasicFunctionality();
await testConcurrencyControl();
// 添加更多测试用例 太长写不下了,估计你们也不看。。。
console.log("\n🎉 所有测试完成!");
} catch (error) {
console.error("❌ 测试过程中出现错误:", error);
}
}
runAllTests()