Skip to content

并发控制器

视频讲解

1. 问题场景

假设我们要上传一个 20GB 的大文件,将其分成 100 个 10MB 的分片。如果不使用任务调度器,同时发起 100 个上传请求会导致以下问题:

  1. 浏览器的并发请求数量被占满 - 浏览器对同一域名的并发请求有限制(通常为 6-8 个)
  2. 其他正常业务请求可能无法及时发送 - 所有连接被文件上传占用
  3. 服务器压力过大 - 同时处理大量请求可能导致服务器响应缓慢或崩溃
  4. 网络带宽被占满 - 大量并发上传会消耗所有可用带宽

2. 解决方案

我们可以尝试着将所有的任务,都放到一个队列里面去,这样我们每次最多发送两个请求,如果有一个请求发送完成,那么我们再从这个队列里面取出一个发送请求,我们只需要保证同时发送的请求不超过两个,这样就可以保证并发数量不被占满,也不会占用过多的带宽。

3. 实现原理

并发控制器的核心思想是:

  • 维护一个任务队列
  • 控制同时执行的任务数量
  • 当有任务完成时,自动从队列中取出新任务执行

4. 代码实现

javascript
/**
 * 任务调度器类
 * 用于控制并发任务的执行数量
 */
class TaskScheduler {
    tasks = [] // 待执行任务队列
    runningCount = 0 // 当前正在运行的任务数量
    maxRunning = Infinity // 最大并发任务数量

    /**
     * 构造函数
     * @param {number} maxRunning 最大并发数,默认为无限制
     */
    constructor(maxRunning = Infinity) {
    // 如果传入0,则设为无限制
        this.maxRunning = maxRunning === 0 ? Infinity : maxRunning
    }

    /**
     * 执行任务的核心方法
     * 检查是否可以执行新任务,如果可以则从队列中取出并执行
     */
    run() {
    // 如果当前运行的任务数已达到最大值,则不执行新任务
        if (this.runningCount >= this.maxRunning) {
            return
        }
        // 如果任务队列为空,则无任务可执行
        if (this.tasks.length === 0) {
            return
        }

        // 增加运行中任务计数
        this.runningCount++
        // 从队列头部取出一个任务
        const task = this.tasks.shift()

        // 执行任务,无论成功或失败都会在完成后递归调用run方法
        task().finally(() => {
            this.runningCount-- // 任务完成,减少运行中任务计数
            this.run() // 尝试执行下一个任务
        })
    }

    /**
     * 添加任务到调度器
     * @param {Function} task 要执行的任务函数,应返回Promise
     * @returns {Promise} 返回Promise,在任务完成时resolve
     */
    addTask(task) {
        return new Promise((resolve, reject) => {
            // 将任务包装后添加到队列中
            this.tasks.push(() => task().then(resolve).catch(reject))
            // this.tasks.push(() => task().then(resolve, reject)); // 另一种写法

            // 尝试立即执行任务
            this.run()
        })
    }
}

5. 使用示例

基础使用

javascript
// 创建一个最大并发数为2的调度器实例
const scheduler = new TaskScheduler(2)

// 辅助函数:创建延时Promise
const sleep = time => new Promise(resolve => setTimeout(resolve, time))

// 开始计时
console.time('task1')
console.time('task2')
console.time('task3')
console.time('task4')
console.time('task5')

// 添加5个任务到调度器,每个任务耗时1秒
// 由于最大并发数为2,所以会分批执行:前2个并行,后3个排队
scheduler.addTask(() => sleep(1000).then(() => console.timeEnd('task1')))
scheduler.addTask(() => sleep(1000).then(() => console.timeEnd('task2')))
scheduler.addTask(() => sleep(1000).then(() => console.timeEnd('task3')))
scheduler.addTask(() => sleep(1000).then(() => console.timeEnd('task4')))
scheduler.addTask(() => sleep(1000).then(() => console.timeEnd('task5')))

文件上传场景

javascript
// 创建文件上传调度器,最大并发数为3
const uploadScheduler = new TaskScheduler(3)

// 模拟文件分片上传
function uploadChunk(chunkData, chunkIndex) {
    return fetch('/upload', {
        method: 'POST',
        body: chunkData,
        headers: {
            'Content-Type': 'application/octet-stream',
            'X-Chunk-Index': chunkIndex,
        },
    })
}

// 上传100个分片,但同时只有3个在上传
const chunks = Array.from({ length: 100 }, (_, i) => i)
const uploadPromises = chunks.map((chunkIndex) => {
    return uploadScheduler.addTask(() => {
        console.log(`开始上传分片 ${chunkIndex}`)
        return uploadChunk(getChunkData(chunkIndex), chunkIndex).then(() => console.log(`分片 ${chunkIndex} 上传完成`))
    })
})

// 等待所有分片上传完成
Promise.all(uploadPromises)
    .then(() => console.log('所有分片上传完成'))
    .catch(err => console.error('上传失败:', err))

6. 执行效果

使用并发控制器后:

  • 控制并发数量:同时最多只有指定数量的请求在执行
  • 自动队列管理:任务完成后自动执行下一个排队的任务
  • 资源合理利用:避免浏览器连接数限制和服务器压力过大
  • 带宽优化:合理分配网络带宽,不影响其他业务请求

通过这种方式,我们可以有效地控制大文件上传的并发数量,既保证了上传效率,又避免了资源过度占用的问题。

7. 拓展版本

typescript
/**
 * 任务调度器 - 支持并发控制的异步任务队列
 */
class TaskScheduler {
  private tasks: Array<() => Promise<any>> = [];
  private runningCount: number = 0;
  private maxRunning: number;
  private isDestroyed: boolean = false;

  /**
   * 创建任务调度器实例
   * @param maxRunning 最大并发数,默认为无限制
   */
  constructor(maxRunning: number = Infinity) {
    if (typeof maxRunning !== "number" || maxRunning < 0) {
      throw new Error("maxRunning must be a non-negative number");
    }
    this.maxRunning = maxRunning === 0 ? Infinity : maxRunning;
  }

  /**
   * 执行任务队列中的任务
   */
  private run(): void {
    // 边界判断:调度器已销毁
    if (this.isDestroyed) {
      return;
    }

    // 边界判断:已达到最大并发数
    if (this.runningCount >= this.maxRunning) {
      return;
    }

    // 边界判断:任务队列为空
    if (this.tasks.length === 0) {
      return;
    }

    this.runningCount++;
    const task = this.tasks.shift();

    // 安全检查:确保任务存在
    if (!task) {
      this.runningCount--;
      return;
    }

    // 执行任务并处理完成后的清理
    task()
      .finally(() => {
        this.runningCount--;
        // 递归执行下一个任务
        this.run();
      })
      .catch(error => {
        // 记录未处理的错误(可选:添加日志系统)
        console.warn("Unhandled task error:", error);
      });
  }

  /**
   * 添加任务到队列
   * @param task 返回Promise的异步任务函数
   * @returns Promise,解析为任务执行结果
   */
  addTask<T>(task: () => Promise<T>): Promise<T> {
    // 边界判断:调度器已销毁
    if (this.isDestroyed) {
      return Promise.reject(new Error("TaskScheduler has been destroyed"));
    }

    // 边界判断:任务参数验证
    if (typeof task !== "function") {
      return Promise.reject(new Error("Task must be a function"));
    }

    return new Promise<T>((resolve, reject) => {
      // 包装任务以处理结果传递
      const wrappedTask = async (): Promise<void> => {
        try {
          const result = await task();
          resolve(result);
        } catch (error) {
          reject(error);
        }
      };

      this.tasks.push(wrappedTask);
      this.run();
    });
  }

  /**
   * 获取当前状态信息
   */
  getStatus(): {
    pendingTasks: number;
    runningTasks: number;
    maxRunning: number;
    isDestroyed: boolean;
  } {
    return {
      pendingTasks: this.tasks.length,
      runningTasks: this.runningCount,
      maxRunning: this.maxRunning,
      isDestroyed: this.isDestroyed,
    };
  }

  /**
   * 更新最大并发数
   * @param maxRunning 新的最大并发数
   */
  setMaxRunning(maxRunning: number): void {
    if (typeof maxRunning !== "number" || maxRunning < 0) {
      throw new Error("maxRunning must be a non-negative number");
    }

    this.maxRunning = maxRunning === 0 ? Infinity : maxRunning;

    // 如果新的并发数更大,尝试执行更多任务
    if (maxRunning > this.runningCount) {
      this.run();
    }
  }

  /**
   * 清空待执行的任务队列
   * @param rejectPending 是否拒绝待执行的任务,默认为true
   */
  clearPendingTasks(rejectPending: boolean = true): void {
    if (rejectPending) {
      // 这里需要更复杂的实现来拒绝待执行的Promise
      // 简化版本:直接清空队列
      this.tasks.length = 0;
    } else {
      this.tasks.length = 0;
    }
  }

  /**
   * 等待所有任务完成
   * @param timeout 超时时间(毫秒),可选
   */
  async waitForCompletion(timeout?: number): Promise<void> {
    return new Promise((resolve, reject) => {
      let timeoutId: ReturnType<typeof setTimeout> | undefined;

      if (timeout) {
        timeoutId = setTimeout(() => {
          reject(new Error(`Timeout after ${timeout}ms`));
        }, timeout);
      }

      const checkCompletion = () => {
        if (this.tasks.length === 0 && this.runningCount === 0) {
          if (timeoutId) clearTimeout(timeoutId);
          resolve();
        } else {
          setTimeout(checkCompletion, 10);
        }
      };

      checkCompletion();
    });
  }

  /**
   * 销毁调度器,清理资源
   */
  destroy(): void {
    this.isDestroyed = true;
    this.clearPendingTasks(true);
  }
}

8. 拓展版本使用示例

typescript
// 辅助函数:创建延迟Promise
function delay(ms: number): Promise<void> {
  return new Promise(resolve => setTimeout(resolve, ms));
}

// 辅助函数:创建可能失败的任务
function createTask(value: any, shouldFail: boolean = false, delayMs: number = 100): () => Promise<any> {
  return async () => {
    await delay(delayMs);
    if (shouldFail) {
      throw new Error(`Task failed with value: ${value}`);
    }
    return value;
  };
}

// 辅助函数:创建计数器任务
let taskCounter = 0;
function createCounterTask(delayMs: number = 50): () => Promise<number> {
  return async () => {
    await delay(delayMs);
    return ++taskCounter;
  };
}

// 重置计数器
function resetCounter() {
  taskCounter = 0;
}

/**
 * 基础功能测试
 */
async function testBasicFunctionality() {
  console.log("\n=== 基础功能测试 ===");

  // 测试1:基本任务执行
  console.log("测试1: 基本任务执行");
  const scheduler = new TaskScheduler();
  const result = await scheduler.addTask(createTask("hello"));
  console.log("结果:", result); // 应该输出: hello

  // 测试2:多个任务顺序执行
  console.log("\n测试2: 多个任务执行");
  resetCounter();
  const promises = [
    scheduler.addTask(createCounterTask()),
    scheduler.addTask(createCounterTask()),
    scheduler.addTask(createCounterTask()),
  ];
  const results = await Promise.all(promises);
  console.log("执行顺序:", results); // 应该是 [1, 2, 3] 或其他顺序

  scheduler.destroy();
}

/**
 * 并发控制测试
 */
async function testConcurrencyControl() {
  console.log("\n=== 并发控制测试 ===");

  // 测试3:限制并发数
  console.log("测试3: 限制并发数为2");
  const scheduler = new TaskScheduler(2);
  resetCounter();

  const startTime = Date.now();
  const promises = [
    scheduler.addTask(createCounterTask(200)), // 200ms
    scheduler.addTask(createCounterTask(200)), // 200ms
    scheduler.addTask(createCounterTask(200)), // 200ms
    scheduler.addTask(createCounterTask(200)), // 200ms
  ];

  const results = await Promise.all(promises);
  const endTime = Date.now();
  const duration = endTime - startTime;

  console.log("执行结果:", results);
  console.log("执行时间:", duration, "ms");
  console.log("预期时间: ~400ms (2批次并发)");

  // 测试4:动态调整并发数
  console.log("\n测试4: 动态调整并发数");
  scheduler.setMaxRunning(1);
  console.log("调整后状态:", scheduler.getStatus());

  scheduler.destroy();
}

/**
 * 主测试函数
 */
async function runAllTests() {
  console.log("开始 TaskSchedulerClass 全面测试...");

  try {
    await testBasicFunctionality();
    await testConcurrencyControl();
    // 添加更多测试用例 太长写不下了,估计你们也不看。。。
    console.log("\n🎉 所有测试完成!");
  } catch (error) {
    console.error("❌ 测试过程中出现错误:", error);
  }
}
runAllTests()

Released under the MIT License.