Webエンジニアのブログ

「Go言語による並行処理」でGoの並行処理を学ぶ

最近Golangで楽しく開発していて、並行処理にも手を出してみたかったので「 O'Reilly Japan - Go言語による並行処理」を読んでみた。並行処理についての概念と、Goにおける並行処理の書き方について一通りここにまとめておく。

並行性と並列性の違い

  • Concurrent(並行)は「複数の動作が、論理的に、順不同もしくは同時に起こりうる」こと
  • Parallel(並列)は、「複数の動作が、物理的に、同時に起こること」

並列は並行の部分集合である。(並列⊂並行)

http://freak-da.hatenablog.com/entry/20100915/p1

マシンのCPUが1コアの場合、並列に動作するコードを実行したとしても、実際はCPUがコンテキストスイッチを行って並列であるかのように見せているだけである。

このことから、並列性はプログラムのラインタイムの性質であって、コードの性質ではない。私達はあくまで並列に走って欲しいと思う並行なコードを書いているだけである。

Goにおける並行処理の構成要素

ゴルーチン(goroutine)

ゴルーチンは、他のコードに対し並行に実行している関数のことである。すべてのGoのプログラムには最低1つのゴルーチン(メインゴルーチン)があり、これはプロセスが開始する際に自動的に生成され、起動される。

ゴルーチンは、goキーワードを関数呼び出しの前に置くことで簡単に起動できる。

func main() {
  go myFunction()
}

ゴルーチンはGo特有のものであり、コルーチン(coroutine)として知られる高水準の抽象化である。コルーチンは、ノンプリエンティブな並行処理のサブルーチンで、Goの関数やクロージャーに相当する。コルーチンには一時停止や再エントリーを許す複数のポイントがある。

ゴルーチンはGoのランタイムと密結合しており、一時停止や再エントリーのポイントを定義していない。Goのランタイムはゴルーチンの実行時の振る舞いを観察して、ゴルーチンがブロックしたら自動的に一時停止し、ブロックが開放されたら再開する。これによってある意味ゴルーチンをプリエンプティブにしているが、ゴルーチンがブロックしたときにしか割り込まない。

コルーチン、また結果としてゴルーチンは暗黙的には並行処理の構成要素であるが、並行性はゴルーチンの性質ではない。並行である場合には、何かが複数のコルーチンを同時管理して、それぞれに実行の機会を与える。GoはM:Nスケジューラーと呼ばれる実装でゴルーチンを管理する。これはM個のグリーンスレッドをN個のスレッドに対応させるものである。

Goはfork-joinモデルと呼ばれる並行処理のモデルに従っている。プログラムは子の処理を分岐(fork)して親と並行に実行させ、そして合流ポイントで並行処理が再び合流(join)するということである。

syncパッケージ

syncパッケージには低水準のメモリアクセス同期に便利な並行処理のプリミティブが入っている。

WaitGroup

WaitGroupはひとまとまりの並行処理があったとき、その結果を気にしない、もしくは他に結果を収集する手段がある場合に、並行処理の完了を待つ手段として非常に有効である。

この例では実行してから2秒後にAll goroutines complete.が出力される。

var wg sync.WaitGroup

wg.Add(1)
go func() {
  defer wg.Done()
  fmt.Println("1st goroutine sleeping...")
  time.Sleep(1 * time.Second)
}()

wg.Add(1)
go func() {
  defer wg.Done()
  fmt.Println("2nd goroutine sleeping...")
  time.Sleep(2 * time.Second)
}()

wg.Wait()
fmt.Println("All goroutines complete.")

MutexとRWMutex

Mutexはmutual exclusion(相互排他)の略で、共有のメモリなどのリソースに対する排他的アクセスを提供するものである。

この例では1つの変数に対していくつかの並行処理を行っている。incrementとdecrementの実行は順不同になる。

var count int
var lock sync.Mutex

increment := func() {
  lock.Lock()
  defer lock.Unlock()
  count++
  fmt.Printf("Incrementing: %d\n", count)
}

decrement := func() {
  lock.Lock()
  defer lock.Unlock()
  count--
  fmt.Printf("Decrementing: %d\n", count)
}

var arithmetic sync.WaitGroup
for index := 0; index < 5; index++ {
  arithmetic.Add(1)
  go func() {
    defer arithmetic.Done()
    increment()
  }()
}

for index := 0; index < 5; index++ {
  arithmetic.Add(1)
  go func() {
    defer arithmetic.Done()
    decrement()
  }()
}

arithmetic.Wait()

fmt.Println("Arithmetic complete.")

sync.RWMutexは概念的にはMutexと同じものであるが、書き込みのロックがなければ読み取りが可能になる、といった機能を提供する。論理的に意味がある場合はなるべくRWmutexを使用する。

Cond

Condは、ゴルーチンが待機したりイベントの発生を知らせるための合流ポイントである。Waitでブロックされたゴルーチンに、Signalメソッドで通知を送ることができる。Condの内部では、シグナルを待機しているゴルーチンをFIFO(先入れ先出し)のリストで管理しており、Signalはシグナルを一番長く待っているゴルーチンに通知する。

c := sync.NewCond(&sync.Mutex{})
queue := make([]interface{}, 0, 10)

removeFromQueue := func(delay time.Duration) {
  time.Sleep(delay)
  c.L.Lock()
  queue = queue[1:]
  fmt.Println("Removed from queue")
  c.L.Unlock()
  c.Signal()
}

for index := 0; index < 10; index++ {
  c.L.Lock()
  for len(queue) == 2 {
    c.Wait()
  }
  fmt.Println("Adding to queue")
  queue = append(queue, struct{}{})
  go removeFromQueue(1 * time.Second)
  c.L.Unlock()
}

Signalの他にBroadcastという通知方法がある。BroadcastはSignalとは違って、シグナルを待っている全てのゴルーチンにシグナルを伝える。Signalは後述するチャネルを使えば簡単に再現できるが、Broadcastの再現は難しい。加えてCondの方がチャネルより性能が高い。

type Button struct {
  Clicked *sync.Cond
}
button := Button{Clicked: sync.NewCond(&sync.Mutex{})}

subscribe := func(c *sync.Cond, fn func()) {
  var goroutineRunning sync.WaitGroup
  goroutineRunning.Add(1)
  go func() {
    goroutineRunning.Done()
    c.L.Lock()
    defer c.L.Unlock()
    c.Wait()
    fn()
  }()
  goroutineRunning.Wait()
}

var clickRegistered sync.WaitGroup
clickRegistered.Add(3)
subscribe(button.Clicked, func() {
  fmt.Println("Maximizing window.")
  clickRegistered.Done()
})
subscribe(button.Clicked, func() {
  fmt.Println("Displaying annoying dialog box!")
  clickRegistered.Done()
})
subscribe(button.Clicked, func() {
  fmt.Println("Mouse clicked.")
  clickRegistered.Done()
})

button.Clicked.Broadcast()

clickRegistered.Wait()

Once

Onceは一度しか実行されないようにする型である。以下の例では、Count: 1が出力される。

var count int
increment := func() { count++ }
decrement := func() { count-- }

var once sync.Once
once.Do(increment)
once.Do(decrement)

fmt.Printf("Count: %d\n", count)

Pool

Poolはオブジェクトプールパターンを並行処理で安全な形で実装したもの。オブジェクトプールパターンは、使うものを決まった数だけ、つまりプールを作る方法である。

以下の例では、3回目のGet()ではPool.Newが実行されない。(Putが1回実行されているため)

myPool := &sync.Pool{
  New: func() interface{} {
    fmt.Println("Creating new instance.")
    return struct{}{}
  },
}

myPool.Get()
instance := myPool.Get()
myPool.Put(instance)
myPool.Get()

Poolでは、プールするインスタンスの生成コストが高い場合に有効となる。

チャネル(channel)

チャネルは、Goにおける同期処理のプリミティブの一つであり、メモリに対するアクセスを同期することができ、ゴルーチン間の通信に使うのが最適である。

チャネルの生成は以下のように行う。一方向チャネルは関数の引数や戻り値としてよく使われる。

var dataStream chan interface{}
dataStream = make(chan interface{})

// 読み込み専用チャネル
var dataStream <-chan interface{}
// 送信専用チャネル
var dataStream chan<- interface{}

以下の例では、"Hello channels!"が出力される。

stringStream := make(chan string)
go func() {
  stringStream <- "Hello channels!"
}()
fmt.Println(<-stringStream)

このように、Goのチャネルは明示的に並行処理を待つ処理を書く必要は無い。チャネルに書き込もうとするとチャネルに空きができるまで待機し、チャネルから読み込もうとするとチャネルに要素が入るまで待機する。

チャネルはclose(ch)によって閉じることができる。これによってチャネルをループで処理することができる。

intStream := make(chan int)
go func() {
  defer close(intStream)

  for index := 1; index <= 5; index++ {
    intStream <- index
  }
}()

for integer := range intStream {
  fmt.Printf("%v ", integer)
}

以下のようにチャネルの生成時にバッファを指定することができる。予めバッファ、つまりキャパシティを指定しておくことでチャネルに対する書き込みを高速化できる。

dataStream := make(chan interface{}, 4) // キャパシティが4のバッファ付きチャネル

バッファが満杯になった状態でチャネルに書き込もうとすると、ゴルーチンはブロックされる。バッファを指定するのは早すぎる最適化になりやすいため注意する必要がある。

チャネルの状態に対するチャネルへの操作結果

操作 チャネルの状態 結果
読み込み nil ブロック
Openで空でない 値を取得
Openで空 ブロック
Close <デフォルト値>, false
書き込み専用 コンパイルエラー
書き込み nil ブロック
Openで満杯 ブロック
Openで満杯でない 値を書き込み
Close panic
読み込み専用 コンパイルエラー
close nil panic
Openで空でない チャネルを閉じる
Openで空 チャネルを閉じる
Closed panic
読み込み専用 コンパイルエラー

select文

select文は複数のチャネルを待機する場合に使用する。複数のcaseに指定したチャネルが全てブロックしていない読み取り状態である場合は、ランダムでいずれかが実行される。

var c1, c2 <-chan interface{}
var c3 chan<- interface{}
select {
case <-c1:
  //
case <-c2:
  //
case c3 <- struct{}{}:
  // 実行される
}

どのチャネルも読み取り可能でない場合、select文によりブロック状態になる。これを避けるためにdefaulttimeパッケージを使用できる。

select {
case <-time.After(1 * time.Second):
  //
default:
  //
}

GOMAXPROCSレバー

runtime.GOMAXPROCS関数により、ホストマシンの論理CPUの数(実際にはワークキューと呼ばれるOSスレッドの数)を制御できる。これで性能を絞ることができるが、マシンやGoバージョンによって調整する必要がある。


{ "name": "hareku", "job": "Web Engineer" }