diff options
| author | Shulhan <m.shulhan@gmail.com> | 2021-05-16 17:10:53 +0700 |
|---|---|---|
| committer | Shulhan <m.shulhan@gmail.com> | 2021-05-16 17:10:53 +0700 |
| commit | 639fcc6259dc79966ee19bf6c07c46b2ab1d10c3 (patch) | |
| tree | 7258df6f74f1f5993e73982d2407558b3b1fd228 | |
| parent | d8bf49bdb04112788ca47cc9f79f061fcaefb838 (diff) | |
| download | golang-id-web-639fcc6259dc79966ee19bf6c07c46b2ab1d10c3.tar.xz | |
blog: terjemahkan "Go Concurrency Patterns: Pipelines and cancellation"
Artikel ini menampilkan contoh-contoh membuat _pipeline_, menyoroti
kesalahan yang mungkin muncul saat operasi gagal, dan memperkenalkan
teknik-teknik untuk berurusan dengan kegagalan secara bersih.
| -rw-r--r-- | _content/blog/index.adoc | 5 | ||||
| -rw-r--r-- | _content/blog/pipelines/bounded.go | 123 | ||||
| -rw-r--r-- | _content/blog/pipelines/index.adoc | 770 | ||||
| -rw-r--r-- | _content/blog/pipelines/parallel.go | 111 | ||||
| -rw-r--r-- | _content/blog/pipelines/serial.go | 55 |
5 files changed, 1064 insertions, 0 deletions
diff --git a/_content/blog/index.adoc b/_content/blog/index.adoc index a78b9b6..02ab166 100644 --- a/_content/blog/index.adoc +++ b/_content/blog/index.adoc @@ -67,6 +67,11 @@ * link:/blog/errors-are-values[Error adalah nilai^], 12 Januari 2015. Rob Pike. +=== 2014 + +* link:/blog/pipelines[Pola konkurensi Go: _pipeline_ dan pembatalan^], + 13 Maret 2014, Sameer Ajmani. + === 2013 * link:/blog/playground[Membedah Go Playground^], diff --git a/_content/blog/pipelines/bounded.go b/_content/blog/pipelines/bounded.go new file mode 100644 index 0000000..3977bef --- /dev/null +++ b/_content/blog/pipelines/bounded.go @@ -0,0 +1,123 @@ +// +build OMIT + +package main + +import ( + "crypto/md5" + "errors" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "sort" + "sync" +) + +// walkFiles starts a goroutine to walk the directory tree at root and send the +// path of each regular file on the string channel. It sends the result of the +// walk on the error channel. If done is closed, walkFiles abandons its work. +func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) { + paths := make(chan string) + errc := make(chan error, 1) + go func() { // HL + // Close the paths channel after Walk returns. + defer close(paths) // HL + // No select needed for this send, since errc is buffered. + errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error { // HL + if err != nil { + return err + } + if !info.Mode().IsRegular() { + return nil + } + select { + case paths <- path: // HL + case <-done: // HL + return errors.New("walk canceled") + } + return nil + }) + }() + return paths, errc +} + +// A result is the product of reading and summing a file using MD5. +type result struct { + path string + sum [md5.Size]byte + err error +} + +// digester reads path names from paths and sends digests of the corresponding +// files on c until either paths or done is closed. +func digester(done <-chan struct{}, paths <-chan string, c chan<- result) { + for path := range paths { // HLpaths + data, err := ioutil.ReadFile(path) + select { + case c <- result{path, md5.Sum(data), err}: + case <-done: + return + } + } +} + +// MD5All reads all the files in the file tree rooted at root and returns a map +// from file path to the MD5 sum of the file's contents. If the directory walk +// fails or any read operation fails, MD5All returns an error. In that case, +// MD5All does not wait for inflight read operations to complete. +func MD5All(root string) (map[string][md5.Size]byte, error) { + // MD5All closes the done channel when it returns; it may do so before + // receiving all the values from c and errc. + done := make(chan struct{}) + defer close(done) + + paths, errc := walkFiles(done, root) + + // Start a fixed number of goroutines to read and digest files. + c := make(chan result) // HLc + var wg sync.WaitGroup + const numDigesters = 20 + wg.Add(numDigesters) + for i := 0; i < numDigesters; i++ { + go func() { + digester(done, paths, c) // HLc + wg.Done() + }() + } + go func() { + wg.Wait() + close(c) // HLc + }() + // End of pipeline. OMIT + + m := make(map[string][md5.Size]byte) + for r := range c { + if r.err != nil { + return nil, r.err + } + m[r.path] = r.sum + } + // Check whether the Walk failed. + if err := <-errc; err != nil { // HLerrc + return nil, err + } + return m, nil +} + +func main() { + // Calculate the MD5 sum of all files under the specified directory, + // then print the results sorted by path name. + m, err := MD5All(os.Args[1]) + if err != nil { + fmt.Println(err) + return + } + var paths []string + for path := range m { + paths = append(paths, path) + } + sort.Strings(paths) + for _, path := range paths { + fmt.Printf("%x %s\n", m[path], path) + } +} diff --git a/_content/blog/pipelines/index.adoc b/_content/blog/pipelines/index.adoc new file mode 100644 index 0000000..4ef83d7 --- /dev/null +++ b/_content/blog/pipelines/index.adoc @@ -0,0 +1,770 @@ += Pola konkurensi Go: _pipeline_ dan pembatalan +Sameer Ajmani +13 Maret 2014 +:toc: + +== Pendahuluan + +Primitif konkurensi pada Go mempermudah kita membangun aliran data +_pipeline_ yang menggunakan I/O dan CPU dengan efisien. +Artikel ini menampilkan contoh-contoh dari _pipeline_ tersebut, menyoroti +kesalahan yang mungkin muncul saat operasi gagal, dan memperkenalkan +teknik-teknik untuk berurusan dengan kegagalan secara bersih. + + +== Apa itu _pipeline_? + +Tidak ada definisi formal dari sebuah _pipeline_ dalam Go; +ia adalah salah satu dari banyak jenis program yang konkuren. +Secara informal, sebuah _pipeline_ adalah suatu urutan tahap-tahap yang +dihubungkan oleh kanal, yang mana setiap tahap adalah sebuah grup dari +goroutine yang menjalankan fungsi yang sama. +Dalam setiap tahap, setiap goroutine + +* menerima nilai dari hulu lewat kanal _masuk_ +* memroses data lewat fungsi, biasanya menghasilkan nilai baru +* mengirim nilai ke hilir lewat kanal _keluar_ + +Setiap tahap memiliki sejumlah kanal masuk dan keluar, kecuali tahap yang +pertama dan terakhir, yang mana hanya memiliki kanal keluar atau masuk, secara +berurutan. +Tahap yang pertama biasanya disebut dengan _sumber_ atau _produser_; +tahap yang terakhir biasanya disebut dengan _sink_ atau _konsumer_. + +Kita akan memulai dengan sebuah contoh _pipeline_ sederhana untuk menjelaskan +ide-ide dan teknik-teknik tersebut. +Nanti, kita akan memperlihatkan contoh yang lebih nyata. + + +== Memangkatkan bilangan + +Bayangkan sebuah _pipeline_ dengan tiga tahap. + +Tahap pertama, `gen`, yaitu sebuah fungsi yang mengonversi deretan menjadi +menjadi sebuah kanal. +Fungsi `gen` menjalankan sebuah goroutine yang mengirim setiap parameter +integer yang ia terima ke dalam sebuah kanal dan menutup kanal tersebut saat +semua nilai telah dikirim. + +---- +func gen(nums ...int) <-chan int { + out := make(chan int) + go func() { + for _, n := range nums { + out <- n + } + close(out) + }() + return out +} +---- + +Tahap kedua, `sq`, menerima sejumlah nilai integer dari sebuah kanal dan +mengembalikan sebuah kanal yang berisi pangkat dari setiap integer yang ia +terima. +Setelah kanal _masuk_ ditutup (yang berarti semua nilai integer telah +diterima) dan tahap ini telah mengirim semua nilai pangkat ke hilir, ia akan +menutup kanal _keluar_. + +---- +func sq(in <-chan int) <-chan int { + out := make(chan int) + go func() { + for n := range in { + out <- n * n + } + close(out) + }() + return out +} +---- + +Fungsi `main` menyiapkan _pipeline_ dan menjalankan tahap yang terakhir: +menerima nilai dari tahap kedua dan mencetaknya satu per satu, sampai kanal +ditutup. + +---- +func main() { + // Siapkan pipeline. + c := gen(2, 3) + out := sq(c) + + // Konsumsi kanal keluar. + fmt.Println(<-out) // 4 + fmt.Println(<-out) // 9 +} +---- + +Secara `sq` memiliki tipe kanal yang sama untuk yang masuk dan keluar, kita +dapat menulisnya beberapa kali. +Kita juga dapat menulis fungsi `main` sebagai pengulangan dengan `range`, +seperti pada tahap-tahap lainnya: + +---- +func main() { + // Siapkan pipeline dan konsumsi kanal keluar. + for n := range sq(sq(gen(2, 3))) { + fmt.Println(n) // 16 kemudian 81 + } +} +---- + + +== _Fan-out_, _fan-in_ + +Beberapa fungsi dapat membaca dari kanal yang sama sampai kanal tersebut +ditutup; +hal ini disebut dengan _fan-out_. +Cara ini membolehkan distribusi kerja antara sekelompok goroutine supaya +penggunaan CPU dan I/O paralel. + +Sebuah fungsi dapat membaca dari beberapa input sampai semua input ditutup +dengan cara menggabungkan semua kanal input menjadi kanal tunggal yang ditutup +saat semua input telah ditutup. +Cara ini disebut dengan _fan-in_. + +Kita dapat mengubah _pipeline_ sebelumnya untuk menjalankan dua fungsi `sq`, +yang membaca dari kanal input yang sama. +Untuk itu, kita perlu membuat sebuah fungsi baru, `merge`, yang menggabungkan +(_fan-in_) semua hasil dari `sq`: + +---- +func main() { + in := gen(2, 3) + + // Distribusi pekerjaan lewat dua goroutine `sq` yang membaca `in` yang + // sama. + c1 := sq(in) + c2 := sq(in) + + // Konsumsi gabungan keluaran dari c1 dan c2. + for n := range merge(c1, c2) { + fmt.Println(n) // 4 lalu 9, atau 9 lalu 4 + } +} +---- + +Fungsi `merge` mengonversi sejumlah kanal menjadi sebuah kanal tunggal dengan +menjalankan sebuah goroutine untuk setiap kanal input dan menyalin nilainya ke +sebuah kanal _keluar_ tunggal. +Saat semua goroutine telah dimulai, fungsi `merge` menjalankan lagi sebuah +goroutine untuk menutup kanal _keluar_ saat semua pengiriman ke kanal _keluar_ +tersebut selesai. + +Pengiriman ke kanal yang telah ditutup akan menyebabkan _panic_, jadi sangat +penting untuk memastikan semua pengiriman telah selesai sebelum menutup kanal +_keluar_ tersebut. +Tipe +https://golang.org/pkg/sync/#WaitGroup[sync.WaitGroup] +menyediakan cara sederhana untuk mengatur sinkronisasi seperti ini: + +---- +func merge(cs ...<-chan int) <-chan int { + var wg sync.WaitGroup + out := make(chan int) + + // Jalankan goroutine untuk setiap kanal input lewat `cs`. + // Fungsi `output` menyalin nilai dari `c` ke `out` sampai `c` ditutup, + // terakhir memanggil `wg.Done`. + output := func(c <-chan int) { + for n := range c { + out <- n + } + wg.Done() + } + wg.Add(len(cs)) + for _, c := range cs { + go output(c) + } + + // Jalankan sebuah goroutine untuk menutup kanal keluar saat semua + // goroutine `output` telah selesai. + // Goroutine ini harus dimulai setelah pemanggilan `wg.Add`. + go func() { + wg.Wait() + close(out) + }() + return out +} +---- + + +== Berhenti dengan segera + +Ada semacam pola dari fungsi-fungsi _pipeline_ kita: + +* Setiap tahap menutup kanal _keluar_ saat semua operasi pengiriman selesai. +* Setiap tahap terus menerima nilai dari kanal _masuk_ sampai kanal tersebut + ditutup. + +Pola ini membolehkan setiap tahap yang menerima nilai untuk dibuat sebagai +pengulangan `range` dan memastikan bahwa semua goroutine selesai saat +semua nilai telah sukses dikirim ke hilir. + +Namun, pada _pipeline_ di dunia nyata, setiap tahap tidak selalu menerima +semua nilai yang masuk. +Terkadang memang dirancang seperti itu: si penerima hanya memerlukan sebagian +dari nilai untuk melanjutkan pemrosesan. +Sering kali, sebuah tahap selesai lebih awal karena nilai yang masuk +merepresentasikan sebuah eror. +Untuk setiap kasus tersebut, si penerima seharusnya tidak menunggu sampai +semua nilai diterima, dan kita ingin supaya tahap sebelumnya berhenti mengirim +nilai yang tahap berikutnya tidak butuhkan. + +Pada contoh _pipeline_ kita sebelumnya, jika tahap terakhir gagal mengonsumsi +semua nilai yang masuk, maka goroutine yang mengirim nilai ke tahap terakhir +akan pampat, misalnya pada contoh kode berikut: + +---- + // Konsumsi hanya nilai pertama dari `output`. + out := merge(c1, c2) + fmt.Println(<-out) // 4 or 9 + return + // Secara kita tidak mengambil nilai kedua dari `out`, salah satu + // goroutine `output` akan pampat saat mencoba mengirim ke kanal. +} +---- + +Hal ini menyebabkan adanya kebocoran sumber daya: goroutine mengonsumsi sumber +daya memori dan _runtime_, dan referensi _heap_ pada _stack_ goroutine +menyebabkan data tidak di-_garbage collected_. +Goroutine tidak di _garbage collected_; mereka harus selesai dengan +sendirinya. + +Untuk itu kita perlu mengatur supaya setiap tahap dari hulu _pipeline_ keluar +dengan bersih walaupun tahap-tahap di hilir gagal menerima semua nilai yang +masuk. +Salah satu cara untuk menyelesaikan masalah ini yaitu dengan mengubah kanal +_keluar_ supaya memiliki _buffer_. +Sebuah _buffer_ dapat menyimpan sejumlah nilai; +operasi pengiriman akan langsung selesai jika ada ruang yang tersedia dalam +_buffer_: + +---- +c := make(chan int, 2) // buffer berukuran 2 +c <- 1 // langsung sukses. +c <- 2 // langsung sukses. +c <- 3 // pampat sampai goroutine yang lain melakukan <-c dan menerima 1. +---- + +Saat jumlah nilai yang akan dikirim diketahui saat kanal dibuat, maka sebuah +_buffer_ dapat menyederhanakan kode kita. +Contohnya, kita dapat menulis ulang fungsi `gen` untuk menyalin semua nilai +integer ke dalam sebuah kanal dengan _buffer_ dan menghindari pembuatan +goroutine yang baru: + +---- +func gen(nums ...int) <-chan int { + out := make(chan int, len(nums)) + for _, n := range nums { + out <- n + } + close(out) + return out +} +---- + +Balik lagi ke goroutine yang pampat dalam _pipeline_ kita, pertimbangkan +juga untuk menambahkan sebuah _buffer_ ke kanal _keluar_ yang dikembalikan +oleh fungsi `merge`: + +---- +func merge(cs ...<-chan int) <-chan int { + var wg sync.WaitGroup + out := make(chan int, 1) // ruang yang cukup untuk input yang belum dibaca. + // ... sisa kode lainnya tidak berubah ... +---- + +Walaupun hal ini memperbaiki goroutine yang pampat dalam program kita, kode +ini buruk. +Pilihan untuk ukuran _buffer_ `out` yaitu 1, karena kita mengetahui +jumlah nilai yang diterima oleh fungsi `merge` dan jumlah nilai yang setiap +tahap hilir akan konsumsi. +Hal ini menyebabkannya rentan dengan kesalahan: jika kita mengirim nilai +tambahan ke fungsi `gen`, atau jika tahap hilir membaca nilai yang lebih +sedikit, kita kembali mendapatkan goroutine yang pampat. + +Untuk itu, kita membutuhkan suatu cara supaya setiap tahap di hilir +mengindikasikan ke pengirim bahwa mereka akan berhenti menerima input. + + +== Pembatalan eksplisit + +Saat fungsi `main` memutuskan untuk berhenti menerima nilai dari `out`, +ia harus memberitahu goroutine pada tahap hulu untuk berhenti mengirim nilai. +Hal ini dapat dilakukan dengan mengirim sebuah nilai pada kanal bernama +`done`. +Fungsi `main` mengirim dua nilai ke kanal `done`, secara ada potensi dua +pengirim yang akan pampat: + +---- +func main() { + in := gen(2, 3) + + // Distribusi pekerjaan lewat dua goroutine sq yang membaca `in` yang + // sama. + c1 := sq(in) + c2 := sq(in) + + // Konsumsi nilai pertama dari keluaran. + done := make(chan struct{}, 2) + out := merge(done, c1, c2) + fmt.Println(<-out) // 4 atau 9 + + // Beritahu pengirim kita telah selesai menerima. + done <- struct{}{} + done <- struct{}{} +} +---- + +Goroutine yang bertugas melakukan pengiriman mengganti operasi pengiriman +mereka dengan sebuah perintah `select` yang mengirim sebuah nilai ke `out` +atau menerima sebuah nilai dari `done`. +Tipe nilai dari kanal `done` yaitu struct kosong karena nilainya tidak +diperlukan dalam kasus ini: yang diperlukan adalah kejadian menerima yang +mengindikasikan pengiriman ke `out` sebaiknya ditinggalkan. +Goroutine `output` terus membaca pada kanal _masuk_-nya, supaya tahap-tahap +di hulu tidak pampat. +(Kita akan bahas nanti bagaimana membuat pengulangan ini segera selesai.) + +---- +func merge(done <-chan struct{}, cs ...<-chan int) <-chan int { + var wg sync.WaitGroup + out := make(chan int) + + // Buat goroutine `output` untuk setiap kanal input dalam `cs`. + // Fungsi `output` menyalin nilai dari `c` ke `out` sampai `c` ditutup + // atau menerima nilai dari `done`, lalu fungsi ini akan memanggil + // `wg.Done`. + output := func(c <-chan int) { + for n := range c { + select { + case out <- n: + case <-done: + } + } + wg.Done() + } + // ... sisa kode selanjutnya tidak berubah ... +---- + +Pendekatan ini memiliki sebuah masalah: setiap penerima di hilir perlu +mengetahui jumlah pengirim yang kemungkinan pampat dan mengatur supaya +mengirim sinyal kepada pengirim tersebut. +Mencatat semua perhitungan tersebut membutuhkan waktu dan bisa saja salah. + +Kita memerlukan sebuah cara untuk memberitahu sejumlah goroutine, yang tidak +diketahui jumlahnya, untuk berhenti mengirim nilai ke tahap di hilir. +Pada Go, kita dapat melakukan hal ini dengan menutup kanal, karena +link:/ref/spec#Receive_operator[operasi menerima pada kanal yang telah ditutup +akan diproses langsung, menghasilkan nilai kosong dari tipe elemen dari +kanal]. + +Hal ini berarti fungsi `main` dapat membersihkan semua pengirim yang pampat +cukup dengan menutup kanal `done`. +Penutupan kanal ini secara efektif menyiarkan sinyal ke semua pengirim. +Kita mengubah setiap fungsi _pipeline_ untuk menerima `done` sebagai +parameter dan mengatur supaya penutupan kanal terjadi lewat perintah `defer`, +supaya semua nilai kembalian dari `main` akan mengirim sinyal ke tahap-tahap +pada _pipeline_ supaya berhenti. + +---- +func main() { + // Buat kanal `done` yang digunakan oleh semua pipeline, dan tutup kanal + // tersebut saat pipeline selesai, sebagai sinyal untuk semua goroutine + // yang kita jalankan. + done := make(chan struct{}) + defer close(done) + + in := gen(done, 2, 3) + + // Distribusi pekerjaan lewat dua goroutine sq yang membaca `in` yang + // sama. + c1 := sq(done, in) + c2 := sq(done, in) + + // Konsumsi nilai pertama dari hasil `merge`. + out := merge(done, c1, c2) + fmt.Println(<-out) // 4 atau 9 + + // Kanal `done` akan ditutup oleh pemanggilan `defer`. +} +---- + +Setiap tahap pada _pipeline_ sekarang bebas berhenti saat `done` ditutup. +Fungsi `merge` dapat selesai tanpa menghabiskan kanal _masuk_, secara ia +mengetahui bahwa pengirim dari hulu, `sq`, akan berhenti mengirim saat +`done` ditutup. +Fungsi `output` memastikan `wg.Done` dipanggil saat selesai lewat perintah +`defer`: + +---- +func merge(done <-chan struct{}, cs ...<-chan int) <-chan int { + var wg sync.WaitGroup + out := make(chan int) + + // Buat sebuah goroutine `output` untuk setiap kanal input dalam `cs`. + // Fungsi `output` menyalin nilai dari `c` ke `out` sampai `c` atau `done` + // ditutup, kemudian memanggil `wg.Done`. + output := func(c <-chan int) { + defer wg.Done() + for n := range c { + select { + case out <- n: + case <-done: + return + } + } + } + // ... sisa kode selanjutnya tidak berubah ... +---- + +Dengan cara yang sama, fungsi `sq` dapat berhenti saat kanal `done` ditutup. +Fungsi `sq` memastikan kanal `out` ditutup saat keluar lewat perintah `defer`: + +---- +func sq(done <-chan struct{}, in <-chan int) <-chan int { + out := make(chan int) + go func() { + defer close(out) + for n := range in { + select { + case out <- n * n: + case <-done: + return + } + } + }() + return out +} +---- + +Berikut panduan untuk membuat _pipeline_: + +* Setiap tahap menutup kanal _keluar_ saat semua operasi pengirim selesai. +* Setiap tahap menerima nilai dari kanal _masuk_ sampai kanal tersebut ditutup + atau pengirim bebas dari pampat. + +_Pipeline_ membuka pengiriman yang terhenti baik lewat _buffer_ atau secara +eksplisit dengan mengirim sinyal ke pengirim saat penerima bisa meninggalkan +kanal. + + +== Mengurai isi direktori + +Mari kita lihat _pipeline_ yang lebih realistis. + +MD5 adalah algoritma _message-digest_ yang bisa digunakan untuk _checksum_ +berkas. +Utilitas perintah `md5sum` mencetak nilai _digest_ dari daftar berkas. + +---- +% md5sum *.go +d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go +ee869afd31f83cbb2d10ee81b2b831dc parallel.go +b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go +---- + +Contoh program yang akan kita buat seperti `md5sum` yang menerima sebuah +direktori sebagai argumen dan mencetak nilai _digest_ untuk setiap berkas di +dalam direktori tersebut, diurut berdasarkan nama. + +---- +% go run serial.go . +d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go +ee869afd31f83cbb2d10ee81b2b831dc parallel.go +b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go +---- + +Fungsi `main` dari program kita memanggil fungsi `MD5All`, yang mengembalikan +sebuah `map` yang berisi path dan nilai _digest_, kemudian mengurut dan +mencetak hasilnya: + +---- +func main() { + // Hitung MD5 dari semua berkas di dalam direktori, kemudian cetak + // hasilnya diurut berdasarkan nama. + m, err := MD5All(os.Args[1]) + if err != nil { + fmt.Println(err) + return + } + var paths []string + for path := range m { + paths = append(paths, path) + } + sort.Strings(paths) + for _, path := range paths { + fmt.Printf("%x %s\n", m[path], path) + } +} +---- + +Fungsi `MD5All` adalah fokus dari diskusi kita sekarang. +Dalam berkas +link:/blog/pipelines/serial.go[`serial.go`], +implementasinya tidak menggunakan konkurensi dan hanya membaca dan melakukan +_sum_ dari setiap berkas saat membaca isi direktori. + +---- +// MD5All baca semua berkas dalam direktori dan kembalikan sebuah map yang +// berisi path dan hasil MD5 sum dari isi berkas. +// Jika pembacaan isi direktori gagal atau ada operasi pembacaan isi berkas +// yang gagal, MD5All akan mengembalikan error. +func MD5All(root string) (map[string][md5.Size]byte, error) { + m := make(map[string][md5.Size]byte) + err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if !info.Mode().IsRegular() { + return nil + } + data, err := ioutil.ReadFile(path) + if err != nil { + return err + } + m[path] = md5.Sum(data) + return nil + }) + if err != nil { + return nil, err + } + return m, nil +} +---- + + +=== Pembacaan secara paralel + +Dalam +link:/blog/pipelines/parallel.go[`parallel.go`], +kita memecah `MD5All` menjadi _pipeline_ dengan dua tahap. +Tahap pertama, `sumFiles`, membaca isi direktori, membaca isi berkas dalam +sebuah goroutine, dan mengirim hasilnya ke sebuah kanal dengan nilai dari tipe +`result`: + +---- +type result struct { + path string + sum [md5.Size]byte + err error +} +---- + +Fungsi `sumFiles` mengembalikan dua buah kanal: satu untuk kembalian dan satu +lagi untuk eror dari membaca isi direktori dengan `filepath.Walk`. +Fungsi yang membaca isi direktori memulai sebuah goroutine baru untuk memroses +setiap berkas, kemudian mencek `done`. +Jika `done` ditutup, maka pembacaan isi direktori selesai segara: + +---- +func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) { + // Untuk setiap berkas, jalankan sebuah goroutine yang menghitung _sum_ + // dari berkas dan mengirim hasilnya ke `c`. + // Mengirim hasil dari pembacaan direktori ke `errc`. + c := make(chan result) + errc := make(chan error, 1) + go func() { + var wg sync.WaitGroup + err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if !info.Mode().IsRegular() { + return nil + } + wg.Add(1) + go func() { + data, err := ioutil.ReadFile(path) + select { + case c <- result{path, md5.Sum(data), err}: + case <-done: + } + wg.Done() + }() + // Batalkan pembacaan direktori jika `done` ditutup. + select { + case <-done: + return errors.New("walk canceled") + default: + return nil + } + }) + // Pembacaan direktori telah selesai, sehingga semua pemanggilan + // wg.Add telah dilakukan. + // Jalankan sebuah goroutine untuk menutup `c` saat semua pengiriman + // telah selesai. + go func() { + wg.Wait() + close(c) + }() + // Perintah `select` tidak perlu di sini, secara `errc` menggunakan + // _buffer_. + errc <- err + }() + return c, errc +} +---- + +Fungsi `MD5All` menerima nilai _digest_ dari `c`. +`MD5All` segera selesai bila ada eror, menutup `done` lewat `defer`: + +---- +func MD5All(root string) (map[string][md5.Size]byte, error) { + // MD5All menutup kanal `done` saat selesai; ia bisa menutupnya sebelum + // menerima semua nilai dari `c` dan `errc`. + done := make(chan struct{}) + defer close(done) + + c, errc := sumFiles(done, root) + + m := make(map[string][md5.Size]byte) + for r := range c { + if r.err != nil { + return nil, r.err + } + m[r.path] = r.sum + } + if err := <-errc; err != nil { + return nil, err + } + return m, nil +} +---- + + +=== Membatasi paralelisme + +Implementasi `MD5All` dalam +link:/blog/pipelines/parallel.go[parallel.go] +menjalankan sebuah goroutine baru untuk setiap berkas. +Dalam sebuah direktori dengan banyak berkas, hal ini bisa mengakibatkan +alokasi memori yang lebih banyak daripada memori pada sistem. + +Kita dapat mengurangi alokasi ini dengan membatasi jumlah berkas yang dibaca +secara paralel. +Dalam +link:/blog/pipelines/bounded.go[`bounded.go`], +hal ini dilakukan dengan membuat sejumlah `n` goroutine untuk membaca berkas. +Sekarang _pipeline_ kita memiliki tiga tahap: baca isi direktori, baca dan +_digest_ berkas, dan kumpulkan hasil _digest_. + +Tahap pertama, `walkFiles`, menghasilkan path dari berkas di dalam direktori: + +---- +func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) { + paths := make(chan string) + errc := make(chan error, 1) + go func() { + // Tutup kanal `paths` setelah semua isi direktori dibaca. + defer close(paths) + // Tidak perlus `select`, secara `errc` memiliki buffer. + errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if !info.Mode().IsRegular() { + return nil + } + select { + case paths <- path: + case <-done: + return errors.New("walk canceled") + } + return nil + }) + }() + return paths, errc +} +---- + +Tahap kedua, `digester`, menjalankan sejumlah goroutine _digest_ yang menerima +nama berkas dari kanal `paths` dan mengirim hasilnya ke kanal `c`: + +---- +func digester(done <-chan struct{}, paths <-chan string, c chan<- result) { + for path := range paths { + data, err := ioutil.ReadFile(path) + select { + case c <- result{path, md5.Sum(data), err}: + case <-done: + return + } + } +} +---- + +Tidak seperti contoh sebelumnya, fungsi `digester` tidak menutup kanal +_keluar_, karena beberapa goroutine mengirim ke sebuah kanal yang sama. +Namun, kode dalam `MD5All` akan menutup kanal tersebut saat semua `digester` +selesai: + +---- + // Jalankan sejumlah goroutine untuk membaca dan men-_digest_ berkas. + c := make(chan result) + var wg sync.WaitGroup + const numDigesters = 20 + wg.Add(numDigesters) + for i := 0; i < numDigesters; i++ { + go func() { + digester(done, paths, c) + wg.Done() + }() + } + go func() { + wg.Wait() + close(c) + }() +---- + +Kita bisa saja membuat setiap fungsi `digester` membuat dan mengembalikan +kanal _keluar_ mereka sendiri, namun hal ini membutuhkan goroutine tambahan +untuk _fan-in_ (menggabungkan) semua hasilnya. + +Tahap terakhir menerima semua hasil dari `c` kemudian memeriksa eror dari +`errc`. +Pemeriksaan ini tidak bisa dilakukan lebih awal, secara `walkFiles` bisa saja +menahan pengiriman nilai ke hilir: + +---- + m := make(map[string][md5.Size]byte) + for r := range c { + if r.err != nil { + return nil, r.err + } + m[r.path] = r.sum + } + // Periksa apakah `Walk` gagal. + if err := <-errc; err != nil { + return nil, err + } + return m, nil +} +---- + + +== Kesimpulan + +Artikel ini telah menjelaskan beberapa teknik untuk membangun aliran data +_pipeline_ dengan Go. +Berurusan dengan kegagalan pada _pipeline_ sedikit kompleks, secara setiap +tahap dalam _pipeline_ bisa menahan mengirim nilai ke hilir, dan tahap +selanjutnya bisa saja tidak memerlukan lagi data yang masuk. +Kita telah memperlihatkan bagaimana menutup sebuah kanal dapat menyiarkan +sebuah sinyal yang menandakan selesai ("done") ke semua goroutine yang +dijalankan oleh _pipeline_ dan mendefinisikan aturan-aturan untuk membangun +_pipeline_ secara benar. + +Bacaan lebih lanjut: + +* https://talks.golang.org/2012/concurrency.slide#1[Pola konkurensi Go^] + (https://www.youtube.com/watch?v=f6kdp27TYZs[video^]) mempresentasikan dasar + dari konkurensi Go dan cara pakainya. +* https://blog.golang.org/advanced-go-concurrency-patterns[Pola konkurensi Go lanjut^] + (http://www.youtube.com/watch?v=QDDwwePbDtw[video^]) menelaah penggunaan + yang lebih kompleks dari konkurensi pada Go, terutama `select`. +* Makalah Douglas McIlroy + https://swtch.com/~rsc/thread/squint.pdf["Squinting at Power Series"] + memperlihatkan bagaimana konkurensi seperti Go menyediakan dukungan yang + elegan untuk perhitungan yang kompleks. diff --git a/_content/blog/pipelines/parallel.go b/_content/blog/pipelines/parallel.go new file mode 100644 index 0000000..5ca3e76 --- /dev/null +++ b/_content/blog/pipelines/parallel.go @@ -0,0 +1,111 @@ +// +build OMIT + +package main + +import ( + "crypto/md5" + "errors" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "sort" + "sync" +) + +// A result is the product of reading and summing a file using MD5. +type result struct { + path string + sum [md5.Size]byte + err error +} + +// sumFiles starts goroutines to walk the directory tree at root and digest each +// regular file. These goroutines send the results of the digests on the result +// channel and send the result of the walk on the error channel. If done is +// closed, sumFiles abandons its work. +func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) { + // For each regular file, start a goroutine that sums the file and sends + // the result on c. Send the result of the walk on errc. + c := make(chan result) + errc := make(chan error, 1) + go func() { // HL + var wg sync.WaitGroup + err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if !info.Mode().IsRegular() { + return nil + } + wg.Add(1) + go func() { // HL + data, err := ioutil.ReadFile(path) + select { + case c <- result{path, md5.Sum(data), err}: // HL + case <-done: // HL + } + wg.Done() + }() + // Abort the walk if done is closed. + select { + case <-done: // HL + return errors.New("walk canceled") + default: + return nil + } + }) + // Walk has returned, so all calls to wg.Add are done. Start a + // goroutine to close c once all the sends are done. + go func() { // HL + wg.Wait() + close(c) // HL + }() + // No select needed here, since errc is buffered. + errc <- err // HL + }() + return c, errc +} + +// MD5All reads all the files in the file tree rooted at root and returns a map +// from file path to the MD5 sum of the file's contents. If the directory walk +// fails or any read operation fails, MD5All returns an error. In that case, +// MD5All does not wait for inflight read operations to complete. +func MD5All(root string) (map[string][md5.Size]byte, error) { + // MD5All closes the done channel when it returns; it may do so before + // receiving all the values from c and errc. + done := make(chan struct{}) // HLdone + defer close(done) // HLdone + + c, errc := sumFiles(done, root) // HLdone + + m := make(map[string][md5.Size]byte) + for r := range c { // HLrange + if r.err != nil { + return nil, r.err + } + m[r.path] = r.sum + } + if err := <-errc; err != nil { + return nil, err + } + return m, nil +} + +func main() { + // Calculate the MD5 sum of all files under the specified directory, + // then print the results sorted by path name. + m, err := MD5All(os.Args[1]) + if err != nil { + fmt.Println(err) + return + } + var paths []string + for path := range m { + paths = append(paths, path) + } + sort.Strings(paths) + for _, path := range paths { + fmt.Printf("%x %s\n", m[path], path) + } +} diff --git a/_content/blog/pipelines/serial.go b/_content/blog/pipelines/serial.go new file mode 100644 index 0000000..024ef47 --- /dev/null +++ b/_content/blog/pipelines/serial.go @@ -0,0 +1,55 @@ +// +build OMIT + +package main + +import ( + "crypto/md5" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "sort" +) + +// MD5All reads all the files in the file tree rooted at root and returns a map +// from file path to the MD5 sum of the file's contents. If the directory walk +// fails or any read operation fails, MD5All returns an error. +func MD5All(root string) (map[string][md5.Size]byte, error) { + m := make(map[string][md5.Size]byte) + err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { // HL + if err != nil { + return err + } + if !info.Mode().IsRegular() { + return nil + } + data, err := ioutil.ReadFile(path) // HL + if err != nil { + return err + } + m[path] = md5.Sum(data) // HL + return nil + }) + if err != nil { + return nil, err + } + return m, nil +} + +func main() { + // Calculate the MD5 sum of all files under the specified directory, + // then print the results sorted by path name. + m, err := MD5All(os.Args[1]) // HL + if err != nil { + fmt.Println(err) + return + } + var paths []string + for path := range m { + paths = append(paths, path) + } + sort.Strings(paths) // HL + for _, path := range paths { + fmt.Printf("%x %s\n", m[path], path) + } +} |
