「マルチスレッドプログラミングは難しい」のまとめ

http://tabesugi.net/video/multithread.html」の動画を見るのが面倒くさい人の為にコードと問題点をまとめてみました。コードは最終系のソースコードのProducer()とConsumer()を置き換えることで実行できるようになっています。Producer()のコードが省略されている場合は前ステップのコードを使用してください。コードの詳しい説明は動画本体の説明を見てください。

キューのみ (4分頃〜)

private static void Producer()
{
    Console.WriteLine("producer");
    for (int i = 0; i < N; i++) {
        int x = i;
        queue.Enqueue(x);
        Console.WriteLine("sent:" + x);
    }
}

private static void Consumer()
{
    for (int i =0; i < N; i++) {
        int x = queue.Dequeue();
        Console.WriteLine("received:"+x)
    }
}

Queue<T>がキューが空の場合にDequeue()すると例外を投げる仕様のため、Producerがキューに入れる前にConsumerがキューから取り出そうとする状況が発生すると異常終了します。では、もし例外を投げる仕様ではなくnullを返す仕様だったりしたら大丈夫かというと、Queueは変更に対してはスレッドセーフではないので、Enqueue()とDequeue()が同時に動いた場合にうまく動かない可能性があります。

ということでこの方式で改善すべき点は以下の2点です。

  • Enqueue()とDequeue()が同時に動かないようにする
  • キューが空の場合にComsumerの処理を待ってもらうようにする

前者を解決する方法を排他制御といい、複数のスレッドがひとつのデータを操作する際に、同時にひとつのスレッドしか操作していないことを保証する為の制御です。後者はスレッドの待ち合わせで、ある状態になるまで待機したり、スレッドがある状態を変更したときに、それに対して待機しているスレッドを起こしたりする処理です。

話は後者の方を中心に進んで行きます。

キュー + spin lock (10分頃〜)

private static void Producer()
{
    Console.WriteLine("producer");
    for (int i = 0; i < N; i++) {
        int x = i;
        queue.Enqueue(x);
        Console.WriteLine("sent:" + x);
        Thread.Sleep(100);        // 処理を遅くする為のウェイト
    }
}

private static void Consumer()
{
    for (int i =0; i < N; i++) {
        while (queue.Count == 0) {
            Console.WriteLine("empty queue.");    // Producerが遅いとこれがモリモリ出る
        }
        int x = queue.Dequeue();
        Console.WriteLine("received:"+x)
    }
}

スピンロックというかビジーウェイトの欠点は待つ為にプロセッサが働いているという点ですが、動画中ではConsumer()の待ちループで何もしていないので無駄具合が分かりません。上記コードのようにキューが空の場合にメッセージを出力するようにすると無駄具合が良く分かります。

無駄を無くす為にはどうすれば良いかというと、キューが空の場合はConsumerの実行を止めてもらって、キューが空でなくなった場合に再び動き出すようにします。

ちなみにここでキューの排他制御の話が出てきますが、ややこしくなるということで以降の説明からは省かれます。

キュー + ウェイト×1 (17分頃〜)

private static void Producer()
{
    Console.WriteLine("producer");
    for (int i = 0; i < N; i++) {
        int x = i;
        queue.Enqueue(x);
        trigger.Set();
        Console.WriteLine("sent:" + x);
    }
}

private static void Consumer()
{
    for (int i =0; i < N; i++) {
        Console.WriteLine("empty queue.");
        triiger.WaitOne();
        int x = queue.Dequeue();
        Console.WriteLine("received:"+x)
    }
}

.NET Frameworkの待ち合わせ処理用のクラスであるAutoResetEventを使って、Produserがキューに入れるまでConsumerが待つようにしています。

AutoResetEventは、WaitOne()で待機しいるスレッドがある時にSet()を呼ぶと、待機しているスレッドのうちひとつが動き出したら自動的にReset()されるというものです。スレッドが動き出す前にもう一度Set()したり、待機しているスレッドが無い場合はSet()を呼んでも何も起りません。ConsumerがWaitOne()で待機している間または解除されて動いている隙にProducerがSet()を呼んで沈黙すると、キューに何かが入っているにも関わらずConsumerは止まったままになってしまうという訳です。この現象はProducerが最後の1個をEnqueue()した後に起ります。

このコードの問題は、Consumerのループの先頭では必ずキューが空になっているという前提のコードなのに、実際は空では無い場合があるというところにあります。したがって、必ず空になるようにするか、空でなくても大丈夫なようにする必要があります。

キュー + ウェイト + 条件判定 (26分頃〜)

for (int i =0; i < N; i++) {
    if (queue.Count == 0) {
        Console.WriteLine("empty queue.");    
        triiger.WaitOne();
    }
    int x = queue.Dequeue();
    Console.WriteLine("received:"+x)
    Thread.Sleep(1000);
}

後者の解決方法を取って、キューが空では無い場合はWaitOne()で待たないようにしたのがこのコードです。

このコードではif文を通った後は必ずキューが空では無いという前提がありますが、実はその前提は成り立ちません。マルチスレッドプログラミングでは、排他制御していない部分には必ず他のスレッドに割り込まれるという性質があります。もしConsumerが複数いた場合、WaitOne()からDequeue()のまで間に他のスレッドのConsumerが先にDequeue()してしまうかもしれません。されてしまったらDequeue()で例外が出て死ぬというわけです。

この問題を解決するにはキューの中味チェックからDequeue()までの間に他のConsumerに割り込まれないように排他制御する必要があります。

ちなみにこのコードにはもうひとつ問題があって、if文で分岐してWaitOne()するまでの間にProducerがSet()した場合に、キューが空では無いにも関わらずConsumerが止まったままになります。

キュー + ウェイト×2 (ぎっこんばったん方式) (34分頃〜)

private const int N = 10;
private static Queue<int> queue;    // shared
private static AutoResetEvent trigger1;    // shared
private static AutoResetEvent trigger2;    // shared

private static void Main(string[] args)
{
    queue = new Queue<int>();
    trigger1 = new AutoResetEvent(false);
    trigger2 = new AutoResetEvent(true);
    new Thread(new ThreadStart(Producer)).Start();
    new Thread(new ThreadStart(Producer)).Start();
    new Thread(new ThreadStart(Consumer)).Start();
    new Thread(new ThreadStart(Consumer)).Start();
}

for (int i =0; i < N; i++) {
    int x = i;
    trigger2.WaitOne();
    queue.Enqueue(x);
    trigger1.Set();
    Console.WriteLine("sent:"+x)
}

for (int i =0; i < N; i++) {
    trigger1.WaitOne();
    int x = queue.Dequeue();
    trigger2.Set();
    Console.WriteLine("received:"+x)
}

動画内ではキューでは無いと説明していますが、もはやマルチスレッドでも無くなっています。ProducerはConsumerを起こしてから止まり、ConsumerはProducerを起こしてから止まるので同時には動かなくなっています。そして前述の通りAutoResetEventはSet()でひとつのスレッドしか動き出さない為、結果的にConsumerが同時に1つしか動きません。同時にひとつしか動かないのであれば排他制御は要らないというわけです。

キュー + Semaphore

※コードは最終形と同じなので省略


動画中で概ねうまく行っていると評価されている「キュー + ウェイト + 条件判定」には以下の2つの問題があります。

  • Consumerのキューの空チェックからWaitOne()までの隙にProducerがSet()するとConsumerが止まったままになる
  • ConsumerのWaitOne()〜Dequeue()までの間に他のスレッドでDequeue()されると例外が投げられる

セマフォを使えばこの問題を解決できるとして最終系のコードが実装されますが、それはセマフォと言うよりスレッドのスケジューラに近いものになっています。1回のEnqueue()でひとつのConsumerが動くようにスケジューリングすれば、止まったままのConsumerも無くなり、Consumer同士の排他制御も要らなくなるという訳です。ちなみに、肝心のスケジューリングが排他制御されていないため、複数のProducerが同時にMySemaphore#Release()すると誤動作します。