JavaScriptを有効にしてください

インプットを動的に増減させるFan-Inパターン【Goによる並行処理】

 ·  ☕ 5 分で読めます
Photo by Miguel Á. Padriñán from Pexels

Photo by Miguel Á. Padriñán from Pexels

Go言語による並行処理

この本でゴルーチンを使った並行処理のパターンをいろいろ読んで、面白いなーと思いながら勉強していて、実際にFan-Inのパターンを使って実装したい場面があったのだが

一度Fan-Inを作った後に、n:1のn側となるインプットのチャネルを任意のタイミングで増やせるようにしたかった。


意外とこの実装方法がネットで探しても出てこず、ちょっとやり方を考えてみた。
(もしかしたらアンチパターンだから誰もやってないのかもしれないが、、、)

Fan-Inパターンとは

並行処理における実装パターンの一つ

複数のコルーチンあるいはスレッド(producer)が生成したインプットを集約して、インプットを処理する単一のコルーチン(consumer)へ渡される
つまりは並行で動いている処理を、1か所に集めるというコンセプト

例えば、リクエストを並行処理しているWebシステムで
DBや外部APIへのコネクションは資源が限られているため単一スレッドで行いたい
といった場合があるかもしれない

そんなときにリクエストを処理するコルーチンから、DBへクエリを投げる専用のコルーチンへfan-inすることができる

GoによるFan-In

シンプルなFan-In

fanIn関数は複数のproducerとなるチャネルを引数にとり、それらをまとめて受け取ることのできるチャネル(multiplexedCh)を返す。
内部でproducerそれぞれに対しgoroutineを起動し、チャネルからデータを受け取ったらmultiplexedChに送る。

producerのチャネルの数をsync.WaitGroupでカウントしておき、また別のgoroutineでそれらがすべてcloseされるまで監視する。
すべてcloseされたらmultiplexedChをcloseする。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package main

import (
	"fmt"
	"sync"
)

func main() {
	fanIn := func(done <-chan interface{}, channels ...chan interface{}) <-chan interface{} {
		var wg sync.WaitGroup
		multiplexedCh := make(chan interface{})

		multiplex := func(ch <-chan interface{}) {
			defer wg.Done()
			for {
				select {
				case <-done:
					return
				case data := <-ch:
    				multiplexedCh <- data
				}
			}
		}

		wg.Add(len(channels))
		for _, c := range channels {
			go multiplex(c)
		}

		go func() {
			wg.Wait()
			fmt.Println("all producers closed")
			close(multiplexedCh)
		}()

		return multiplexedCh
	}

	// producerとなるchannel
	ch1 := make(chan interface{})
	ch2 := make(chan interface{})

	// fan-in
	done := make(chan interface{})
	multiplexedCh := fanIn(done, ch1, ch2) 

	// consumerを作る
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for data := range multiplexedCh {
			fmt.Println(data)
		}
	}()

	ch1 <- "message 1"
	ch2 <- "message 2"

	close(done)
	wg.Wait()
}
// Output
message 1
message 2
all producers closed

インプットChannelを後から増やしたり減らしたりしたい

ここからが本題
冒頭で書いたような例だと
インプットとなるgoroutineがリクエストごとに生成されるため、すでにあるconsumerに対してproducerを追加したり削除したりしたくなる

上のシンプルな実装だと最初にproducerとconsumerが固定されてしまう

コンセプト

先のシンプルなFan-Inとの違いは、initFanIn関数で新しいproducerの追加を受け付けるためのregisterChを用意していることだ。
initFanIn関数には、はじめの1個となるproducerのチャネルとcontextだけをわたしてやることでベースとなるパイプラインを起動する。

新しいproducerを増やすには、チャネルとcontextを含んだProducerstructをregisterChに送ってやればいい。
またproducersのうち一つが不要になったのなら、該当するcontextをキャンセルすることでそのproducerのmultiplexを担当するgoroutineが停止できる。

※あと、実際に利用する場面を想定してdoneチャネルの代わりにcontextを使っているが、そこは深い意味はない。

producerの追加を待ち受ける

新しいmultiplexerを起動するためには、インプットのチャネルとキャンセル検知するためのcontextをregisterChで受け取る必要があるため、Producerという構造体を定義しておいた。

1
2
3
4
type Producer struct {
	ctx context.Context
	ch  <-chan interface{}
}

registerChは送り元となるmain goroutineで作っておきinitFanInに渡してあげる。
(チャネルは基本的にsendする側が所有権を持っておくべきなので)

1
2
3
4
5
6
7
// main()
// 2個目以降のproducerを登録するためのchを用意
registerCh := make(chan Producer)
defer close(registerCh)

// fanInをinitialize
multiplexedCh := initFanIn(ctx1, ch1, registerCh)

通常のfan-inに加えて、追加を待ち受けるgoroutineを起動する。
wg.Add(1)でproducerのカウントを上げ、go multiplex(r.ctx, r.ch)でアウトプットにつなげるgoroutineを起動してあげる。

1
2
3
4
5
6
7
// initFanIn()
go func() {
	for r := range registerCh {
		wg.Add(1)
		go multiplex(r.ctx, r.ch)
	}
}()

サンプルコード

サンプルコードの全体はこちら。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package main

import (
	"context"
	"fmt"
	"sync"
)

type Producer struct {
	ctx context.Context
	ch  <-chan interface{}
}

func initFanIn(ctx context.Context, ch <-chan interface{}, registerCh <-chan Producer) <-chan interface{} {
	multiplexedCh := make(chan interface{})

	var wg sync.WaitGroup
	multiplex := func(ctx context.Context, ch <-chan interface{}) {
		defer wg.Done()
		for {
			select {
			case <-ctx.Done():
				return
			case data := <-ch:
				multiplexedCh <- data
			}
		}
	}

	wg.Add(1)
	go multiplex(ctx, ch)

	go func() {
		for r := range registerCh {
			wg.Add(1)
			go multiplex(r.ctx, r.ch)
		}
	}()

	go func() {
		wg.Wait()

		fmt.Println("all producer is closed")
		close(multiplexedCh)
	}()

	return multiplexedCh
}

func main() {
	// 1個目のproducerを用意
	ctx1, cancel1 := context.WithCancel(context.Background())
	ch1 := make(chan interface{})
	defer close(ch1)

	// 2個目以降のproducerを登録するためのchを用意
	registerCh := make(chan Producer)
	defer close(registerCh)

	// fanInをinitialize
	multiplexedCh := initFanIn(ctx1, ch1, registerCh)

	// consumerを作っておく
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for data := range multiplexedCh {
			fmt.Println(data)
		}
	}()

	// 試しにメッセージを送ってみる
	ch1 <- "initializing fanin completed"

	// 2個目のproducerを用意
	ctx2, cancel2 := context.WithCancel(context.Background())
	ch2 := make(chan interface{})
	defer close(ch2)

	// fanInのproducerとして登録
	registerCh <- Producer{ctx2, ch2}

	// 2個目のproducerからメッセージを送ってみる
	ch2 <- "second ch is added to publisher"

	// 1個目のproducerを先に閉じる
	ch1 <- "ch1 is closing"
	cancel1()

	// 2個目も閉じる
	ch2 <- "ch2 is closing"
	cancel2()

	// すべてのproducerが閉じられたのでfanInが終了する
	wg.Wait()
}
共有