C++11で始めるマルチスレッドプログラミング その2 std::mutex 編

はじめに

本記事は
C++11で始めるマルチスレッドプログラミングその1 ~std::thread事始め~ - すいバカ日誌
の続きとなる記事です(何年越しだよ).
今回は std:;mutex による基本的な排他制御について書きます.
本記事を書いた時点では C++17 がもう策定されていますが,タイトルはC++11のまま行きます.

排他制御について

異なるスレッドが同じリソースを共有するような場面は当然発生します.
しかし,異なるスレッドが共有リソースに対して同時にアクセス(すくなくとも1つは変更操作)をした場合,データ競合 (data races) が発生し,未定義動作となってしまうことがあります.ちなみに,data races は C++ の規格としてその定義が書かれているので参照してください.*1

データ競合が問題になる例として,双方向 linked-list (以下単に list と書く) を考えてみましょう.
list に対して erase 操作を行った場合,3つの要素(それ自身,直前,直後)のポインタをすげ替える必要があり,これらは命令として一度に実行することは出来ません.
したがって,それぞれの処理の間には,他のスレッドが入り込む余地があり,ここで問題が発生します.例えば直前の要素の next ポインタのみを挿げ替え終わっている時点で,他スレッドが直後の prev ポインタを参照する,ようなことは容易に想像できます.

このように,処理の途中段階では,データ構造の不変条件 (invariants) が崩れていることが多く,そのような状態でのデータ構造への操作は危険な操作となります.

したがって,共有リソースはデータ競合を防ぐための何らかの保護機能を用いなければなりません.
それには Lock-free なデータ構造や transactional なデータ構造を用いるという方法もありますが,今回扱うのは std::mutex (mutual exclusive) による排他制御です.
排他制御は,共有リソースに対しての同時(書き込み)アクセスが,ただ1つのスレッドしか許さないようにすることを言います.

std::mutex による排他制御

まずはサンプルコードから.

#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
#include <string>

std::mutex stdout_m; // std::cout の排他制御
template <typename T>
void threadsafe_print(T v) {
    std::lock_guard<std::mutex> lock(stdout_m);
    std::cout << v << std::endl;
}

class widget {
public:
    void heavy_process(int i) {
        std::lock_guard<std::mutex> lock(m);
        threadsafe_print("called heavy_process with: " + std::to_string(i));
        std::this_thread::sleep_for(std::chrono::seconds(1));
        v.push_back(i);
    }

    void print() {
        std::lock_guard<std::mutex> lock(m);
        threadsafe_print("called print");
        for(auto x : v) {
            threadsafe_print(x);
        }
    }

private:
    std::vector<int> v;
    std::mutex m;
};

int main() {
    widget w;
    std::thread t1([&] {
        for(int i = 1; i <= 10; ++i) {
            w.heavy_process(i);
        }
    });
    std::thread t2([&] {
        for(int i = 11; i <= 20; ++i) {
            w.heavy_process(i);
        }
    });

    t1.join();
    t2.join();

    w.print();
}

このコードをもとに説明していきます.

stdout_m, widget::m, std::lock_guard

これらが今回の主役の std::mutex です.mutex を用いた排他制御では,アクセスの試み -> mutex を lock -> データに対する処理 -> 終了 -> mutex の unlock という流れになります.
lock, unlock は mutex から直接呼ぶことも出来ますが,RAIIを利用して書くのが安全です.std::lock_guard がそれに該当します(コンストラクタで lock,デストラクタで unlockする).
stdout_m で std::cout を排他制御しないと,出力が混ざって見にくいのでそうしています.
widget::m については,各 widget オブジェクトごとに別となります.
mutex が lock されている間は,他のスレッドはその mutex を lock できません.
この場合,unlock されるまで待つことになります.これによって排他制御が実現します.

heavy_process, print

heavy 内部で std::vector にデータを追加します.これは他スレッドが同時に行うと問題なので,排他制御が必要です.
print 内部で v を参照していますが,ループの途中で heavy_process が呼ばれると v の状態が変わってしまうので排他制御が必要になります(途中でイテレータが無効になるかもしれない).

std::this_thread::sleep_for

指定した時間だけ今のスレッドを sleep します.テストに便利なので今後もよく使うと思います.


以上でサンプルコードの解説は終わります.
試しに lock_guard を消してみると,たまに異常終了するのが観察できるので,いろいろ試して遊んでください.

スレッドセーフ性*2とデータ構造のインターフェース

マルチスレッド下での共有されるデータ構造の設計をすることになったとしましょう.
このとき,そのデータ構造のインターフェースは,注意深く設計する必要があります.
その点について確認するため,簡単なスレッドセーフなキュー safe_queue を実装してみましょう.

std::queue

よくある queue のインターフェースの一部を抜粋・簡略化して示します.

template <typename T, typename Sequence = std::deque<T>>
class queue {
public:
    explicit queue();
    explicit queue(Sequence const& s);
    
    bool empty() const;
    size_t size() const;
    T& front();
    T const& front() const;
    void push(T const&);
    void push(T&&);
    void pop();
};

front と pop が完全に分割されています(pop で先頭要素を return しない)が,これは Exceptional C++ などでも触れられているとおり,例外安全性を保つために必要な分割です.

この設計を何も考えずそのまま safe_queue に流用して良いでしょうか?
答えは No となります.

なぜだめなのか?

例えば以下のプログラムを,スレッドAとBが safe_queue に対して処理を行うことを考えます.

safe_queue<int> que;
if(!que.empty()) {
    auto value = que.front();
    que.pop();
    // ...
}

このとき,たとえは以下のようなフローが考えられます(empty, front, pop はそれぞれ std::mutex により排他制御されていると仮定します).

(thread A) que.empty() の評価
(thread B) que.empty() の評価
(thread A) auto value = que.front();
(thread B) auto value = que.front();
(thread A) que.pop();
(thread B) que.pop(); // ???

thread B からの que.pop() は,予期した動作ではなさそうです.
というのも,thread A, B ともに見ている値は同じ que の先頭要素なのに対し,B が pop する値はまだ見ていない2番めの要素だからです.

このように,それぞれの関数内で std::mutex を lock したからといって,スレッドセーフなデータ構造が作れるという単純な話ではないのです.

解決策

std::mutex での排他制御を目指す場合,front と pop が分離されていると先に述べた問題が発生してしまうので,分離しないという選択を取ります.
ただし,例外安全性は保証したいので,多少コストを書けてどちらも実現する方法を考えます.例えば以下のようなものがあります.

  • 引数の参照に対して pop が先頭要素を返す.デメリットは,呼び出し側で格納用の変数を宣言する必要があること.格納用の変数が作れないこともある(初期化に何らかの有効かつ意味あるデータが必要・初期化コストが無視できない).
  • 戻り値に pop されたデータを指すポインタを返す.ポインタのコピーには例外が発生しないことを利用.デメリットは,内部で何らかの形でポインタの管理が挟まるので,ゼロコストとは言えないこと.

今回はこの案を採用し,また front() は提供しないことにします.

safe_queue の実装例

class empty_queue : std::exception {
public:
    const char* what() const throw() {
        return "Empty queue";
    }
};

template <typename T>
class safe_queue {
public:
    safe_queue() {}
    safe_queue(safe_queue const& other) {
        std::lock_guard<std::mutex> lock(other.m);
        que = other.que;
    }
    safe_queue& operator=(safe_queue const&) = delete;

    void push(T value) {
        std::lock_guard<std::mutex> lock(m);
        que.push(value);
    }

    void pop(T& res) {
        std::lock_guard<std::mutex> lock(m);
        if(que.empty()) throw empty_queue();
        res = que.front();
        que.pop();
    }
    std::shared_ptr<T> pop() {
        std::lock_guard<std::mutex> lock(m);
        if(que.empty()) throw empty_queue();
        auto const res = std::make_shared<T>(que.front());
        que.pop();
        return res;
    }

    bool empty() const {
        std::lock_guard<std::mutex> lock(m);
        return que.empty();
    }

    size_t size() const {
        std::lock_guard<std::mutex> lock(m);
        return que.size();
    }

private:
    std::queue<T> que;
    mutable std::mutex m;
};

int main() {
    safe_queue<int> sq;
    for(int i = 0; i < 1000; ++i) {
        sq.push(i);
    }

    std::thread t1([&] {
        while(!sq.empty()) {
            auto value = sq.pop();
            threadsafe_print("[thread A] poped: " + std::to_string(*value));
        }
    });
    std::thread t2([&] {
        while(!sq.empty()) {
            auto value = sq.pop();
            threadsafe_print("[thread B] poped: " + std::to_string(*value));
        }
    });

    t1.join();
    t2.join();
}

まとめ

今回は std::mutex による排他制御の基本だけを扱いました.
ミューテックス自体の実装の話は
C++ミューテックス・コレクション -みゅーこれ- 実装編 - yohhoyの日記(別館)
がわかりやすかったです.

もしかすると次回に続くかもしれません(本当か?).

*1:N4727 §6.8.2.1 Data races

*2:C++とスレッドセーフ性については yohhoy さんの記事スレッドセーフという幻想と現実 - yohhoyの日記(別館)が参考になります