golangでpipeするときのpattern

元ネタ

Go Concurrency Patterns: Pipelines and cancellation

読んでいて、このパターン使えばよかったのかという気づきがあったので内容を抜粋してメモする。

channelの基礎の基礎

channelを使って送信、受信したい時は、それぞれch <-, <- chと書けば良い

ch <- x //xをchに送信
<- ch //chから受信
x := <- ch //chから受信した内容をxに代入

channelからいくつの値が送信されるのかが分かっているのならば、この受信演算子を用いることもできなくもない

for i := 1; i < 4; i++ {
    fmt.Println(i, "th value is ", <-ch) // channelから3回受信する
}

rangeとclose

channelをrangeで回すことによってchannelからの受信を簡単にすることができる。

for i := range ch {
    process(i)
}

また、channelをcloseすることでrangeのイテレーションを止めることができる。送信するべきデータを送り終えたり受信側のchannelがcloseした時に、送信側のchannelをcloseすることで処理の終了を伝搬させることができる。

というか、rangeで回すときはcloseしなければ恐らくdeadlockして怒られる。みんな経験あるよね?(震え声

途中終了

rangeとcloseを用いることで処理の終了を伝搬させることができるが、これでは正規に送信が終わるまで、処理を途中で止めることができない。

途中で処理を終了したいときは、途中で処理を終了するためのchannelを新たに作る。このchannelを引数として渡すことで、関数の外から処理を終了させることができるようになる。また、この新たに作ったchannelとそれまで使っていたchannelのどちらからも受信できるように、select構文を用いる。

go func() {
    defer close(out)
    for n := range in {
        select {
        case out <- n * n:
        case <-done: // doneから受信したら即座に処理を終了する
            return
        }
    }
}()

goroutineの生やし方

func processX(done <-chan struct{}, in <-chan <T>) <-chan <T> {
    out := make(chan <T>)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- processX_(n):
            case <-done:
                return
            }
        }
    }()
    return out
}

こんなものが載ってた。<T>は何か知らんが型名

言葉にすると、関数の最初で出力channelを作って、goroutineな無名関数の中で実際の処理を行い、返り値は出力channelってパターン。

むっちゃpipeって感じがして、とりあえずこれを知っておけばgolangでできることがかなり増えそうだと思った。

最後に

golangのgoroutine周りあまり上手く使えていないので、誰かいいチュートリアル教えてください。なんでもしますから

触れなかったこと

  • channelのbuffer
  • runtime.GOMAXPROCS