Rubyのマルチスレッド
プロセスについて学んだのでついでにスレッドも触ってみる。
環境
% ruby -v ruby 2.3.7p456 (2018-03-28 revision 63024) [x86_64-darwin18]
シングルスレッド
mainスレッド内で順番に実行する。
p Time.now # => 2019-06-30 12:35:46 +0900 sleep(3) p Time.now # => 2019-06-30 12:35:49 +0900
出力結果には3秒のズレが生じる。
Thread#new
スレッドが実行されるのは、Thread#new/fork/startで生成されたり、Thread#run, Thread#wakeupなどが呼ばれたとき。
th = Thread.new do p Time.now # => 2019-06-30 12:36:41 +0900 sleep(3) end p Time.now # => 2019-06-30 12:36:41 +0900
1つ目のTime.nowとsleepが子スレッドで、2つ目のTime.nowがmainスレッドで、並行的に実行されるので出力結果は同時刻となる。
Thread#join
スレッドは基本的に互いに干渉せず実行されるが、Thread#joinをつかうとレシーバのスレッドが終了するまで待つ。
th = Thread.new do p Time.now # => 2019-06-30 12:50:05 +0900 sleep(3) end th.join p Time.now # => 2019-06-30 12:50:08 +0900
子スレッドの終了を待ってから親スレッドのp Time.now
を実行するので、3秒のズレが生じる。
メモリの共有
マルチスレッドではメモリを共有する(<=>マルチプロセスでは原則としてメモリを共有しない)。
hoge = 3 th = Thread.new do hoge = 10 end th.join p hoge # => 10
Thread::Queue
積み上がったタスクを各スレッドがワーカーとして処理するために、Thread::Queueというクラスをつかえる。
q = Thread::Queue.new [*1..100].each {|n| q.push(n)} threads = [*1..5].map do |i| Thread.new do until q.empty? n = q.pop puts "square of #{n} is #{n**2} (thread #{i})\n" sleep(rand(0.01..0.1)) end end end threads.each(&:join)
結果
square of 1 is 1 (thread 3) square of 2 is 4 (thread 4) square of 3 is 9 (thread 2) square of 4 is 16 (thread 1) square of 5 is 25 (thread 5) square of 6 is 36 (thread 5) square of 7 is 49 (thread 3) square of 8 is 64 (thread 1) ...(以下略)
- 各スレッドが手が空き次第、値をキューから順次1つずつ取り出して処理している様子が確認できる。
- (どうせメモリを共有しているならArrayをつかってもいいのでは?と思ったけど、Arrayはスレッドセーフではないらしい。)
スレッド固有の変数
基本的にはメモリを共有するけど、「このスレッドだけでつかえる変数」がほしいときもある。
スレッド生成時に宣言する
hoge = 3 th = Thread.new do |hoge| hoge = 10 end th.join p hoge # => 3
Thread#newにブロック引数を渡すと、そのスレッド固有の変数になる。
Thread#[] で操作する
th = Thread.new do sleep(1) p Thread.current[:hoge] # => 10 end th[:hoge] = 10 th.join
スレッドセーフティ
- 先述のとおり、複数スレッド間ではメモリを共有するため、互いに望ましくない影響を及ぼし合う可能性がある。
- マルチスレッド処理において、各スレッドが互いに影響し合わないような性質をスレッドセーフという。
GVL
現在の実装では Ruby VM は Giant VM lock (GVL) を有しており、同時に実行されるネイティブスレッドは常にひとつです。 ただし、IO 関連のブロックする可能性があるシステムコールを行う場合には GVL を解放します。その場合にはスレッドは同時に実行され得ます。
https://docs.ruby-lang.org/ja/latest/doc/spec=2fthread.html
つまり、IO 関連のブロックする可能性があるシステムコールを...
- 呼ばない場合
- 呼ぶ場合
- GVLが解放される
- スレッドは複数同時に実行されるので、コードによりスレッドセーフティを担保する必要がある
- 並行性の恩恵を受けうる
- (上述のサンプルコードで並行性がみられたのも、
puts
とかsleep
とかのせいでGVLが解放されていたから)
- (上述のサンプルコードで並行性がみられたのも、
確認していく。
IO 関連のブロックする可能性があるシステムコールを呼ばない場合、コードによりスレッドセーフティを担保する必要はない
以下のコードは明示的にスレッドセーフティを担保されてはいないが、GVLによって結果的にスレッドセーフティが実現している。
n = nil threads = (1..5).map do |i| Thread.new do n = i 100_000.times {} # a time consuming line safe = n == i end end p threads.map(&:join).map(&:value) # => [true, true, true, true, true]
IO 関連のブロックする可能性があるシステムコールを呼ぶ場合、コードによりスレッドセーフティを担保する必要がある
以下はスレッドセーフではない。
n = nil threads = (1..5).map do |i| Thread.new do n = i puts '' safe = n == i end end p threads.map(&:join).map(&:value) # => [false, false, false, true, false]
ちなみにn = i
とn == i
の間に記述された処理にかかる時間は以下のとおり。
データレースが起こる時間的な間隙はtimes
の方がずっと長いことがわかる。
にも関わらずデータレースが発生しているのがputs
の方だけなのは「前者ではGVLがかかり/後者ではGVLが解放されたから」と考えられる。1
require 'benchmark' Benchmark.bm do |x| x.report(:times) { 100_000.times {} } x.report(:puts) { puts '' } end
user system total real times 0.010000 0.000000 0.010000 ( 0.002932) puts 0.000000 0.000000 0.000000 ( 0.000006)
Thread::Mutex
ではどうやってコード上でスレッドセーフティを担保すればよいのか。 そのためのツールとして、Thread::Mutexというクラスが提供されている。
- Mutexとはmutal exclusion(相互排他)
- あるスレッドXにおいてMutexオブジェクトがlockされている間は、他のスレッドYではMutexオブジェクトをlockすることができず、MutexオブジェクトがスレッドXでunlockされるまで、スレッドYの実行は停止される。
- これにより、Mutex#lockとMutex#unlockで囲まれた部分の処理は複数スレッドで並行して行われることがなくなり、スレッドセーフであることが担保される。
mutex = Thread::Mutex.new n = nil threads = (1..5).map do |i| Thread.new do mutex.lock n = i puts '' safe = n == i mutex.unlock safe end end p threads.map(&:join).map(&:value) # => [true, true, true, true, true]
-
ちなみに
10_000_000.times
とかにするとGVLが解放されないはずなのに、データレースが発生した。これはどうやらGVLとは別の、単一のスレッドに余りに長い時間がかかっているとタイマーでスレッドを切り替えるという仕組みによるものみたい。↩