如何实现一个 Node 上的并发调度类

You,3 min read

如何实现一个 Node 上的并发调度类

/**
 * 表示一个并发调度器,用于并发执行多个任务并自带重试功能 
 * 核心:批量请求中保留尽可能地 fulfilled 接收一定 rejected 状态
 * @class
 * @constructor
 * @param {Object} options - 并发调度器的选项。
 * @param {number} options.maxRetries - 失败任务的最大重试次数。默认为3。
 * @example
 * const scheduler = new ConcurrentScheduler({ maxRetries: 5 });
 * const tasks = [task1, task2, task3];
 * const initialState = {};
 * const resultProcessor = (state, result) => { // process result };
 * const finalState = await scheduler.execute(tasks, { initialState, resultProcessor });
 */
class ConcurrentScheduler {
  /**
   * 创建一个新的 ConcurrentScheduler 实例。
   * @param {Object} options - ConcurrentScheduler 的选项。
   * @param {number} [options.maxRetries=3] - 每个任务的最大重试次数。
   * @constructor
   */
  constructor(options) {
    const { maxRetries = 3 } = options ?? {};
    this.maxRetries = maxRetries
  }
 
  /**
   * 执行并发任务
   * @param {Array<Function>} tasks - 任务列表,每个任务是一个函数
   * @param {Object} options - 选项参数
   * @param {*} options.initialState - 初始状态
   * @param {Function} options.resultProcessor - 结果处理函数
   * @returns {*} 执行完任务后的最终状态
   * @example
   * const tasks = [task1, task2, task3];
   * const initialState = {};
   * const resultProcessor = (state, result) => { // process result };
   * const finalState = await scheduler.execute(tasks, { initialState, resultProcessor });
   */
  async execute(tasks, {
    initialState,
    resultProcessor
  }) {
    // 逆向思维,先假设所有任务都失败了
    let failedTasks = tasks.slice();
    
    let state = initialState;
 
    for (let attempt = 0; attempt < this.maxRetries; attempt++) {
      // 没有失败的任务了,直接退出
      if (failedTasks.length === 0) break; 
 
      try {
        const results = await Promise.allSettled(failedTasks.map(task => task()));
        failedTasks = [];
 
        results.forEach((result, index) => {
          if (result.status === 'fulfilled') {
            resultProcessor(state, result.value);
          } else {
            failedTasks.push(tasks[index]);
          }
        });
      } catch (executionError) {
        console.error('An error occurred during task execution:', executionError);
      }
    }
 
    return state;
  }
}
 
// 使用示例
const asyncTask = (taskName) => {
  return () => new Promise((resolve, reject) => {
    setTimeout(() => {
      Math.random() > 0.5 ? resolve(`Success ${taskName}`) : reject(`Fail ${taskName}`)
    }, 300)
  })
}
 
const taskQueue = [
  asyncTask('task1'),
  asyncTask('task2'),
  asyncTask('task3'),
];
 
const cs = new ConcurrentScheduler();
 
(async () => {
  console.time('execute');
  const result = await cs.execute(taskQueue, {
    initialState: [],
    resultProcessor: (state, value) => state.push(value)
  });
  console.timeEnd('execute');
  console.log(result); // 输出累积的成功结果
})();
 
module.exports = ConcurrentScheduler;
 
2026 © Lizhenyui.