JavaScript で複数のスレッドを実行して全ての処理が終わるまで待つ

最終更新日:





ループ内でスレッドを実行して全ての非同期処理が終わるまで待つクラスを書きたい。





Threadsクラス

class Threads {
  // Promise.then とか Promise.catch に
  // undefined を渡してもエラーにならないようだけど、
  // success と failure は一応、関数として定義しておく。
  #func = {
    success() {},
    failure() {},
  };
  // プロミスのインスタンスを貯める配列
  #promises;
  // 別の関数のコールバックで実行すると
  // this の指し示す先が変更されるので
  // 静的変数にインスタンスを持っておく。
  // bind 使うよりスマート
  static #instance;
  constructor(success, failure) {
    if (typeof success == 'function') {
      this.#func.success = success;
    }
    if (typeof failure == 'function') {
      this.#func.failure = failure;
    }
    this.#promises = [];
    Threads.#instance = this;
  }

  async do(value) {
    const self = Threads.#instance;
    const promise = new Promise((resolve, reject) => {
      setTimeout(() => {
        try {
          resolve(value);
        } catch (error) {
          reject(error);
        }
      }, Math.random() * 1000); // ランダムで数秒待機させる。
    })
      .then(self.#func.success)
      .catch(self.#func.failure);

    self.#promises.push(promise);
  }

  async wait() {
    await Promise.all(this.#promises);
  }
}

async function main() {
  const a = [1, 2, 3, 4, 5];

  // スレッド内で処理を行う関数
  const success = (v) => {
    console.log(v);
  };

  // 何かエラーが発生したときの関数
  const failure = (e) => {
    console.log(e.message);
  };

  // Promise.then と Promise.catch に渡す関数を
  // それぞれ、success と failure として渡して new する
  const thread = new Threads(success, failure);

  // 配列をループしてスレッドを実行
  a.forEach(thread.do);

  console.log('Before thread.wait');

  // 全ての処理が終わるまで待つ
  await thread.wait();

  console.log('After thread.wait');
}
main();


実行結果

$ node index.js 
Before thread.wait
4
3
2
1
5
After thread.wait

Before thread.wait が先に出力されているので、非同期になっていることがわかる。 それから、非同期処理(数値の出力)の後に After thread.wait が出力されているので、 await thread.wait() で処理が終わるのを待っているのがわかる。



エラーハンドリング

コードを書き換えて success 関数内でエラーをスローする。

async function main() {
  const a = [1, 2, 3, 4, 5];

  // エラーをスローする
  const success = (v) => {
    throw new Error('error!');
  };

  // 何かエラーが発生したときの関数
  const failure = (e) => {
    console.log(e.message);
  };

  const thread = new Threads(success, failure);

  a.forEach(thread.do);

  console.log('Before thread.wait');

  await thread.wait();

  console.log('After thread.wait');
}
main();


実行結果

$ node index.js 
Before thread.wait
error!
error!
error!
error!
error!
After thread.wait

success 関数内でのエラーを failure 関数でキャッチできてる。



実行した非同期処理をキャンセルしたい

class Threads {
  #func = {
    success() {},
    failure() {},
  };
  #promises;
  #cancel;
  static #instance;
  static #ERROR_CANCEL = new Error('Promise was cancelled');
  constructor(success, failure) {
    this.#func.success = success;
    this.#func.failure = failure;
    this.#promises = [];
    Threads.#instance = this;
  }

  async do(value) {
    const self = Threads.#instance;
    // 他にこの関数を使って非同期処理を行ってる場合は停止させる
    self.abort();
    const promise = new Promise((resolve, reject) => {
      // キャンセルするための関数 reject に独自のエラーを渡す。
      self.#cancel = () => reject(Threads.#ERROR_CANCEL);
      setTimeout(() => {
        try {
          resolve(value);
        } catch (error) {
          reject(error);
        }
      }, 3000);
    })
      .then(self.#func.success)
      .catch((error) => {
        // ERROR_CANCEL だった場合は failure 関数を実行せずに終わる
        if (error == Threads.#ERROR_CANCEL) {
          console.error('Cancel error:', error.message);
          return;
        }
        self.#func.failure(error);
      });

    this.#promises.push(promise);
  }

  abort() {
    if (typeof this.#cancel == 'function') {
      this.#cancel();
      this.#cancel = undefined;
    }
  }

  async wait() {
    await Promise.all(this.#promises);
  }
}

async function main() {
  const success = (v) => {
    console.log(v);
  };

  const failure = (e) => {
    console.log(e.message);
  };

  const thread = new Threads(success, failure);

  thread.do('Hello World');

  // thread.do の実行は3秒後なので、その前に abort が呼ばれるようにする。
  setTimeout(() => thread.abort(), 2000);

  console.log('Before thread.wait');

  await thread.wait();

  console.log('After thread.wait');
}
main();


実行結果

$ node index.js 
Before thread.wait
Cancel error: Promise was cancelled
After thread.wait



コメント