summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--_content/blog/index.adoc5
-rw-r--r--_content/blog/pipelines/bounded.go123
-rw-r--r--_content/blog/pipelines/index.adoc770
-rw-r--r--_content/blog/pipelines/parallel.go111
-rw-r--r--_content/blog/pipelines/serial.go55
5 files changed, 1064 insertions, 0 deletions
diff --git a/_content/blog/index.adoc b/_content/blog/index.adoc
index a78b9b6..02ab166 100644
--- a/_content/blog/index.adoc
+++ b/_content/blog/index.adoc
@@ -67,6 +67,11 @@
* link:/blog/errors-are-values[Error adalah nilai^],
12 Januari 2015. Rob Pike.
+=== 2014
+
+* link:/blog/pipelines[Pola konkurensi Go: _pipeline_ dan pembatalan^],
+ 13 Maret 2014, Sameer Ajmani.
+
=== 2013
* link:/blog/playground[Membedah Go Playground^],
diff --git a/_content/blog/pipelines/bounded.go b/_content/blog/pipelines/bounded.go
new file mode 100644
index 0000000..3977bef
--- /dev/null
+++ b/_content/blog/pipelines/bounded.go
@@ -0,0 +1,123 @@
+// +build OMIT
+
+package main
+
+import (
+ "crypto/md5"
+ "errors"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "sort"
+ "sync"
+)
+
+// walkFiles starts a goroutine to walk the directory tree at root and send the
+// path of each regular file on the string channel. It sends the result of the
+// walk on the error channel. If done is closed, walkFiles abandons its work.
+func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
+ paths := make(chan string)
+ errc := make(chan error, 1)
+ go func() { // HL
+ // Close the paths channel after Walk returns.
+ defer close(paths) // HL
+ // No select needed for this send, since errc is buffered.
+ errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error { // HL
+ if err != nil {
+ return err
+ }
+ if !info.Mode().IsRegular() {
+ return nil
+ }
+ select {
+ case paths <- path: // HL
+ case <-done: // HL
+ return errors.New("walk canceled")
+ }
+ return nil
+ })
+ }()
+ return paths, errc
+}
+
+// A result is the product of reading and summing a file using MD5.
+type result struct {
+ path string
+ sum [md5.Size]byte
+ err error
+}
+
+// digester reads path names from paths and sends digests of the corresponding
+// files on c until either paths or done is closed.
+func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
+ for path := range paths { // HLpaths
+ data, err := ioutil.ReadFile(path)
+ select {
+ case c <- result{path, md5.Sum(data), err}:
+ case <-done:
+ return
+ }
+ }
+}
+
+// MD5All reads all the files in the file tree rooted at root and returns a map
+// from file path to the MD5 sum of the file's contents. If the directory walk
+// fails or any read operation fails, MD5All returns an error. In that case,
+// MD5All does not wait for inflight read operations to complete.
+func MD5All(root string) (map[string][md5.Size]byte, error) {
+ // MD5All closes the done channel when it returns; it may do so before
+ // receiving all the values from c and errc.
+ done := make(chan struct{})
+ defer close(done)
+
+ paths, errc := walkFiles(done, root)
+
+ // Start a fixed number of goroutines to read and digest files.
+ c := make(chan result) // HLc
+ var wg sync.WaitGroup
+ const numDigesters = 20
+ wg.Add(numDigesters)
+ for i := 0; i < numDigesters; i++ {
+ go func() {
+ digester(done, paths, c) // HLc
+ wg.Done()
+ }()
+ }
+ go func() {
+ wg.Wait()
+ close(c) // HLc
+ }()
+ // End of pipeline. OMIT
+
+ m := make(map[string][md5.Size]byte)
+ for r := range c {
+ if r.err != nil {
+ return nil, r.err
+ }
+ m[r.path] = r.sum
+ }
+ // Check whether the Walk failed.
+ if err := <-errc; err != nil { // HLerrc
+ return nil, err
+ }
+ return m, nil
+}
+
+func main() {
+ // Calculate the MD5 sum of all files under the specified directory,
+ // then print the results sorted by path name.
+ m, err := MD5All(os.Args[1])
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+ var paths []string
+ for path := range m {
+ paths = append(paths, path)
+ }
+ sort.Strings(paths)
+ for _, path := range paths {
+ fmt.Printf("%x %s\n", m[path], path)
+ }
+}
diff --git a/_content/blog/pipelines/index.adoc b/_content/blog/pipelines/index.adoc
new file mode 100644
index 0000000..4ef83d7
--- /dev/null
+++ b/_content/blog/pipelines/index.adoc
@@ -0,0 +1,770 @@
+= Pola konkurensi Go: _pipeline_ dan pembatalan
+Sameer Ajmani
+13 Maret 2014
+:toc:
+
+== Pendahuluan
+
+Primitif konkurensi pada Go mempermudah kita membangun aliran data
+_pipeline_ yang menggunakan I/O dan CPU dengan efisien.
+Artikel ini menampilkan contoh-contoh dari _pipeline_ tersebut, menyoroti
+kesalahan yang mungkin muncul saat operasi gagal, dan memperkenalkan
+teknik-teknik untuk berurusan dengan kegagalan secara bersih.
+
+
+== Apa itu _pipeline_?
+
+Tidak ada definisi formal dari sebuah _pipeline_ dalam Go;
+ia adalah salah satu dari banyak jenis program yang konkuren.
+Secara informal, sebuah _pipeline_ adalah suatu urutan tahap-tahap yang
+dihubungkan oleh kanal, yang mana setiap tahap adalah sebuah grup dari
+goroutine yang menjalankan fungsi yang sama.
+Dalam setiap tahap, setiap goroutine
+
+* menerima nilai dari hulu lewat kanal _masuk_
+* memroses data lewat fungsi, biasanya menghasilkan nilai baru
+* mengirim nilai ke hilir lewat kanal _keluar_
+
+Setiap tahap memiliki sejumlah kanal masuk dan keluar, kecuali tahap yang
+pertama dan terakhir, yang mana hanya memiliki kanal keluar atau masuk, secara
+berurutan.
+Tahap yang pertama biasanya disebut dengan _sumber_ atau _produser_;
+tahap yang terakhir biasanya disebut dengan _sink_ atau _konsumer_.
+
+Kita akan memulai dengan sebuah contoh _pipeline_ sederhana untuk menjelaskan
+ide-ide dan teknik-teknik tersebut.
+Nanti, kita akan memperlihatkan contoh yang lebih nyata.
+
+
+== Memangkatkan bilangan
+
+Bayangkan sebuah _pipeline_ dengan tiga tahap.
+
+Tahap pertama, `gen`, yaitu sebuah fungsi yang mengonversi deretan menjadi
+menjadi sebuah kanal.
+Fungsi `gen` menjalankan sebuah goroutine yang mengirim setiap parameter
+integer yang ia terima ke dalam sebuah kanal dan menutup kanal tersebut saat
+semua nilai telah dikirim.
+
+----
+func gen(nums ...int) <-chan int {
+ out := make(chan int)
+ go func() {
+ for _, n := range nums {
+ out <- n
+ }
+ close(out)
+ }()
+ return out
+}
+----
+
+Tahap kedua, `sq`, menerima sejumlah nilai integer dari sebuah kanal dan
+mengembalikan sebuah kanal yang berisi pangkat dari setiap integer yang ia
+terima.
+Setelah kanal _masuk_ ditutup (yang berarti semua nilai integer telah
+diterima) dan tahap ini telah mengirim semua nilai pangkat ke hilir, ia akan
+menutup kanal _keluar_.
+
+----
+func sq(in <-chan int) <-chan int {
+ out := make(chan int)
+ go func() {
+ for n := range in {
+ out <- n * n
+ }
+ close(out)
+ }()
+ return out
+}
+----
+
+Fungsi `main` menyiapkan _pipeline_ dan menjalankan tahap yang terakhir:
+menerima nilai dari tahap kedua dan mencetaknya satu per satu, sampai kanal
+ditutup.
+
+----
+func main() {
+ // Siapkan pipeline.
+ c := gen(2, 3)
+ out := sq(c)
+
+ // Konsumsi kanal keluar.
+ fmt.Println(<-out) // 4
+ fmt.Println(<-out) // 9
+}
+----
+
+Secara `sq` memiliki tipe kanal yang sama untuk yang masuk dan keluar, kita
+dapat menulisnya beberapa kali.
+Kita juga dapat menulis fungsi `main` sebagai pengulangan dengan `range`,
+seperti pada tahap-tahap lainnya:
+
+----
+func main() {
+ // Siapkan pipeline dan konsumsi kanal keluar.
+ for n := range sq(sq(gen(2, 3))) {
+ fmt.Println(n) // 16 kemudian 81
+ }
+}
+----
+
+
+== _Fan-out_, _fan-in_
+
+Beberapa fungsi dapat membaca dari kanal yang sama sampai kanal tersebut
+ditutup;
+hal ini disebut dengan _fan-out_.
+Cara ini membolehkan distribusi kerja antara sekelompok goroutine supaya
+penggunaan CPU dan I/O paralel.
+
+Sebuah fungsi dapat membaca dari beberapa input sampai semua input ditutup
+dengan cara menggabungkan semua kanal input menjadi kanal tunggal yang ditutup
+saat semua input telah ditutup.
+Cara ini disebut dengan _fan-in_.
+
+Kita dapat mengubah _pipeline_ sebelumnya untuk menjalankan dua fungsi `sq`,
+yang membaca dari kanal input yang sama.
+Untuk itu, kita perlu membuat sebuah fungsi baru, `merge`, yang menggabungkan
+(_fan-in_) semua hasil dari `sq`:
+
+----
+func main() {
+ in := gen(2, 3)
+
+ // Distribusi pekerjaan lewat dua goroutine `sq` yang membaca `in` yang
+ // sama.
+ c1 := sq(in)
+ c2 := sq(in)
+
+ // Konsumsi gabungan keluaran dari c1 dan c2.
+ for n := range merge(c1, c2) {
+ fmt.Println(n) // 4 lalu 9, atau 9 lalu 4
+ }
+}
+----
+
+Fungsi `merge` mengonversi sejumlah kanal menjadi sebuah kanal tunggal dengan
+menjalankan sebuah goroutine untuk setiap kanal input dan menyalin nilainya ke
+sebuah kanal _keluar_ tunggal.
+Saat semua goroutine telah dimulai, fungsi `merge` menjalankan lagi sebuah
+goroutine untuk menutup kanal _keluar_ saat semua pengiriman ke kanal _keluar_
+tersebut selesai.
+
+Pengiriman ke kanal yang telah ditutup akan menyebabkan _panic_, jadi sangat
+penting untuk memastikan semua pengiriman telah selesai sebelum menutup kanal
+_keluar_ tersebut.
+Tipe
+https://golang.org/pkg/sync/#WaitGroup[sync.WaitGroup]
+menyediakan cara sederhana untuk mengatur sinkronisasi seperti ini:
+
+----
+func merge(cs ...<-chan int) <-chan int {
+ var wg sync.WaitGroup
+ out := make(chan int)
+
+ // Jalankan goroutine untuk setiap kanal input lewat `cs`.
+ // Fungsi `output` menyalin nilai dari `c` ke `out` sampai `c` ditutup,
+ // terakhir memanggil `wg.Done`.
+ output := func(c <-chan int) {
+ for n := range c {
+ out <- n
+ }
+ wg.Done()
+ }
+ wg.Add(len(cs))
+ for _, c := range cs {
+ go output(c)
+ }
+
+ // Jalankan sebuah goroutine untuk menutup kanal keluar saat semua
+ // goroutine `output` telah selesai.
+ // Goroutine ini harus dimulai setelah pemanggilan `wg.Add`.
+ go func() {
+ wg.Wait()
+ close(out)
+ }()
+ return out
+}
+----
+
+
+== Berhenti dengan segera
+
+Ada semacam pola dari fungsi-fungsi _pipeline_ kita:
+
+* Setiap tahap menutup kanal _keluar_ saat semua operasi pengiriman selesai.
+* Setiap tahap terus menerima nilai dari kanal _masuk_ sampai kanal tersebut
+ ditutup.
+
+Pola ini membolehkan setiap tahap yang menerima nilai untuk dibuat sebagai
+pengulangan `range` dan memastikan bahwa semua goroutine selesai saat
+semua nilai telah sukses dikirim ke hilir.
+
+Namun, pada _pipeline_ di dunia nyata, setiap tahap tidak selalu menerima
+semua nilai yang masuk.
+Terkadang memang dirancang seperti itu: si penerima hanya memerlukan sebagian
+dari nilai untuk melanjutkan pemrosesan.
+Sering kali, sebuah tahap selesai lebih awal karena nilai yang masuk
+merepresentasikan sebuah eror.
+Untuk setiap kasus tersebut, si penerima seharusnya tidak menunggu sampai
+semua nilai diterima, dan kita ingin supaya tahap sebelumnya berhenti mengirim
+nilai yang tahap berikutnya tidak butuhkan.
+
+Pada contoh _pipeline_ kita sebelumnya, jika tahap terakhir gagal mengonsumsi
+semua nilai yang masuk, maka goroutine yang mengirim nilai ke tahap terakhir
+akan pampat, misalnya pada contoh kode berikut:
+
+----
+ // Konsumsi hanya nilai pertama dari `output`.
+ out := merge(c1, c2)
+ fmt.Println(<-out) // 4 or 9
+ return
+ // Secara kita tidak mengambil nilai kedua dari `out`, salah satu
+ // goroutine `output` akan pampat saat mencoba mengirim ke kanal.
+}
+----
+
+Hal ini menyebabkan adanya kebocoran sumber daya: goroutine mengonsumsi sumber
+daya memori dan _runtime_, dan referensi _heap_ pada _stack_ goroutine
+menyebabkan data tidak di-_garbage collected_.
+Goroutine tidak di _garbage collected_; mereka harus selesai dengan
+sendirinya.
+
+Untuk itu kita perlu mengatur supaya setiap tahap dari hulu _pipeline_ keluar
+dengan bersih walaupun tahap-tahap di hilir gagal menerima semua nilai yang
+masuk.
+Salah satu cara untuk menyelesaikan masalah ini yaitu dengan mengubah kanal
+_keluar_ supaya memiliki _buffer_.
+Sebuah _buffer_ dapat menyimpan sejumlah nilai;
+operasi pengiriman akan langsung selesai jika ada ruang yang tersedia dalam
+_buffer_:
+
+----
+c := make(chan int, 2) // buffer berukuran 2
+c <- 1 // langsung sukses.
+c <- 2 // langsung sukses.
+c <- 3 // pampat sampai goroutine yang lain melakukan <-c dan menerima 1.
+----
+
+Saat jumlah nilai yang akan dikirim diketahui saat kanal dibuat, maka sebuah
+_buffer_ dapat menyederhanakan kode kita.
+Contohnya, kita dapat menulis ulang fungsi `gen` untuk menyalin semua nilai
+integer ke dalam sebuah kanal dengan _buffer_ dan menghindari pembuatan
+goroutine yang baru:
+
+----
+func gen(nums ...int) <-chan int {
+ out := make(chan int, len(nums))
+ for _, n := range nums {
+ out <- n
+ }
+ close(out)
+ return out
+}
+----
+
+Balik lagi ke goroutine yang pampat dalam _pipeline_ kita, pertimbangkan
+juga untuk menambahkan sebuah _buffer_ ke kanal _keluar_ yang dikembalikan
+oleh fungsi `merge`:
+
+----
+func merge(cs ...<-chan int) <-chan int {
+ var wg sync.WaitGroup
+ out := make(chan int, 1) // ruang yang cukup untuk input yang belum dibaca.
+ // ... sisa kode lainnya tidak berubah ...
+----
+
+Walaupun hal ini memperbaiki goroutine yang pampat dalam program kita, kode
+ini buruk.
+Pilihan untuk ukuran _buffer_ `out` yaitu 1, karena kita mengetahui
+jumlah nilai yang diterima oleh fungsi `merge` dan jumlah nilai yang setiap
+tahap hilir akan konsumsi.
+Hal ini menyebabkannya rentan dengan kesalahan: jika kita mengirim nilai
+tambahan ke fungsi `gen`, atau jika tahap hilir membaca nilai yang lebih
+sedikit, kita kembali mendapatkan goroutine yang pampat.
+
+Untuk itu, kita membutuhkan suatu cara supaya setiap tahap di hilir
+mengindikasikan ke pengirim bahwa mereka akan berhenti menerima input.
+
+
+== Pembatalan eksplisit
+
+Saat fungsi `main` memutuskan untuk berhenti menerima nilai dari `out`,
+ia harus memberitahu goroutine pada tahap hulu untuk berhenti mengirim nilai.
+Hal ini dapat dilakukan dengan mengirim sebuah nilai pada kanal bernama
+`done`.
+Fungsi `main` mengirim dua nilai ke kanal `done`, secara ada potensi dua
+pengirim yang akan pampat:
+
+----
+func main() {
+ in := gen(2, 3)
+
+ // Distribusi pekerjaan lewat dua goroutine sq yang membaca `in` yang
+ // sama.
+ c1 := sq(in)
+ c2 := sq(in)
+
+ // Konsumsi nilai pertama dari keluaran.
+ done := make(chan struct{}, 2)
+ out := merge(done, c1, c2)
+ fmt.Println(<-out) // 4 atau 9
+
+ // Beritahu pengirim kita telah selesai menerima.
+ done <- struct{}{}
+ done <- struct{}{}
+}
+----
+
+Goroutine yang bertugas melakukan pengiriman mengganti operasi pengiriman
+mereka dengan sebuah perintah `select` yang mengirim sebuah nilai ke `out`
+atau menerima sebuah nilai dari `done`.
+Tipe nilai dari kanal `done` yaitu struct kosong karena nilainya tidak
+diperlukan dalam kasus ini: yang diperlukan adalah kejadian menerima yang
+mengindikasikan pengiriman ke `out` sebaiknya ditinggalkan.
+Goroutine `output` terus membaca pada kanal _masuk_-nya, supaya tahap-tahap
+di hulu tidak pampat.
+(Kita akan bahas nanti bagaimana membuat pengulangan ini segera selesai.)
+
+----
+func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
+ var wg sync.WaitGroup
+ out := make(chan int)
+
+ // Buat goroutine `output` untuk setiap kanal input dalam `cs`.
+ // Fungsi `output` menyalin nilai dari `c` ke `out` sampai `c` ditutup
+ // atau menerima nilai dari `done`, lalu fungsi ini akan memanggil
+ // `wg.Done`.
+ output := func(c <-chan int) {
+ for n := range c {
+ select {
+ case out <- n:
+ case <-done:
+ }
+ }
+ wg.Done()
+ }
+ // ... sisa kode selanjutnya tidak berubah ...
+----
+
+Pendekatan ini memiliki sebuah masalah: setiap penerima di hilir perlu
+mengetahui jumlah pengirim yang kemungkinan pampat dan mengatur supaya
+mengirim sinyal kepada pengirim tersebut.
+Mencatat semua perhitungan tersebut membutuhkan waktu dan bisa saja salah.
+
+Kita memerlukan sebuah cara untuk memberitahu sejumlah goroutine, yang tidak
+diketahui jumlahnya, untuk berhenti mengirim nilai ke tahap di hilir.
+Pada Go, kita dapat melakukan hal ini dengan menutup kanal, karena
+link:/ref/spec#Receive_operator[operasi menerima pada kanal yang telah ditutup
+akan diproses langsung, menghasilkan nilai kosong dari tipe elemen dari
+kanal].
+
+Hal ini berarti fungsi `main` dapat membersihkan semua pengirim yang pampat
+cukup dengan menutup kanal `done`.
+Penutupan kanal ini secara efektif menyiarkan sinyal ke semua pengirim.
+Kita mengubah setiap fungsi _pipeline_ untuk menerima `done` sebagai
+parameter dan mengatur supaya penutupan kanal terjadi lewat perintah `defer`,
+supaya semua nilai kembalian dari `main` akan mengirim sinyal ke tahap-tahap
+pada _pipeline_ supaya berhenti.
+
+----
+func main() {
+ // Buat kanal `done` yang digunakan oleh semua pipeline, dan tutup kanal
+ // tersebut saat pipeline selesai, sebagai sinyal untuk semua goroutine
+ // yang kita jalankan.
+ done := make(chan struct{})
+ defer close(done)
+
+ in := gen(done, 2, 3)
+
+ // Distribusi pekerjaan lewat dua goroutine sq yang membaca `in` yang
+ // sama.
+ c1 := sq(done, in)
+ c2 := sq(done, in)
+
+ // Konsumsi nilai pertama dari hasil `merge`.
+ out := merge(done, c1, c2)
+ fmt.Println(<-out) // 4 atau 9
+
+ // Kanal `done` akan ditutup oleh pemanggilan `defer`.
+}
+----
+
+Setiap tahap pada _pipeline_ sekarang bebas berhenti saat `done` ditutup.
+Fungsi `merge` dapat selesai tanpa menghabiskan kanal _masuk_, secara ia
+mengetahui bahwa pengirim dari hulu, `sq`, akan berhenti mengirim saat
+`done` ditutup.
+Fungsi `output` memastikan `wg.Done` dipanggil saat selesai lewat perintah
+`defer`:
+
+----
+func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
+ var wg sync.WaitGroup
+ out := make(chan int)
+
+ // Buat sebuah goroutine `output` untuk setiap kanal input dalam `cs`.
+ // Fungsi `output` menyalin nilai dari `c` ke `out` sampai `c` atau `done`
+ // ditutup, kemudian memanggil `wg.Done`.
+ output := func(c <-chan int) {
+ defer wg.Done()
+ for n := range c {
+ select {
+ case out <- n:
+ case <-done:
+ return
+ }
+ }
+ }
+ // ... sisa kode selanjutnya tidak berubah ...
+----
+
+Dengan cara yang sama, fungsi `sq` dapat berhenti saat kanal `done` ditutup.
+Fungsi `sq` memastikan kanal `out` ditutup saat keluar lewat perintah `defer`:
+
+----
+func sq(done <-chan struct{}, in <-chan int) <-chan int {
+ out := make(chan int)
+ go func() {
+ defer close(out)
+ for n := range in {
+ select {
+ case out <- n * n:
+ case <-done:
+ return
+ }
+ }
+ }()
+ return out
+}
+----
+
+Berikut panduan untuk membuat _pipeline_:
+
+* Setiap tahap menutup kanal _keluar_ saat semua operasi pengirim selesai.
+* Setiap tahap menerima nilai dari kanal _masuk_ sampai kanal tersebut ditutup
+ atau pengirim bebas dari pampat.
+
+_Pipeline_ membuka pengiriman yang terhenti baik lewat _buffer_ atau secara
+eksplisit dengan mengirim sinyal ke pengirim saat penerima bisa meninggalkan
+kanal.
+
+
+== Mengurai isi direktori
+
+Mari kita lihat _pipeline_ yang lebih realistis.
+
+MD5 adalah algoritma _message-digest_ yang bisa digunakan untuk _checksum_
+berkas.
+Utilitas perintah `md5sum` mencetak nilai _digest_ dari daftar berkas.
+
+----
+% md5sum *.go
+d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
+ee869afd31f83cbb2d10ee81b2b831dc parallel.go
+b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
+----
+
+Contoh program yang akan kita buat seperti `md5sum` yang menerima sebuah
+direktori sebagai argumen dan mencetak nilai _digest_ untuk setiap berkas di
+dalam direktori tersebut, diurut berdasarkan nama.
+
+----
+% go run serial.go .
+d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
+ee869afd31f83cbb2d10ee81b2b831dc parallel.go
+b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
+----
+
+Fungsi `main` dari program kita memanggil fungsi `MD5All`, yang mengembalikan
+sebuah `map` yang berisi path dan nilai _digest_, kemudian mengurut dan
+mencetak hasilnya:
+
+----
+func main() {
+ // Hitung MD5 dari semua berkas di dalam direktori, kemudian cetak
+ // hasilnya diurut berdasarkan nama.
+ m, err := MD5All(os.Args[1])
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+ var paths []string
+ for path := range m {
+ paths = append(paths, path)
+ }
+ sort.Strings(paths)
+ for _, path := range paths {
+ fmt.Printf("%x %s\n", m[path], path)
+ }
+}
+----
+
+Fungsi `MD5All` adalah fokus dari diskusi kita sekarang.
+Dalam berkas
+link:/blog/pipelines/serial.go[`serial.go`],
+implementasinya tidak menggunakan konkurensi dan hanya membaca dan melakukan
+_sum_ dari setiap berkas saat membaca isi direktori.
+
+----
+// MD5All baca semua berkas dalam direktori dan kembalikan sebuah map yang
+// berisi path dan hasil MD5 sum dari isi berkas.
+// Jika pembacaan isi direktori gagal atau ada operasi pembacaan isi berkas
+// yang gagal, MD5All akan mengembalikan error.
+func MD5All(root string) (map[string][md5.Size]byte, error) {
+ m := make(map[string][md5.Size]byte)
+ err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
+ if err != nil {
+ return err
+ }
+ if !info.Mode().IsRegular() {
+ return nil
+ }
+ data, err := ioutil.ReadFile(path)
+ if err != nil {
+ return err
+ }
+ m[path] = md5.Sum(data)
+ return nil
+ })
+ if err != nil {
+ return nil, err
+ }
+ return m, nil
+}
+----
+
+
+=== Pembacaan secara paralel
+
+Dalam
+link:/blog/pipelines/parallel.go[`parallel.go`],
+kita memecah `MD5All` menjadi _pipeline_ dengan dua tahap.
+Tahap pertama, `sumFiles`, membaca isi direktori, membaca isi berkas dalam
+sebuah goroutine, dan mengirim hasilnya ke sebuah kanal dengan nilai dari tipe
+`result`:
+
+----
+type result struct {
+ path string
+ sum [md5.Size]byte
+ err error
+}
+----
+
+Fungsi `sumFiles` mengembalikan dua buah kanal: satu untuk kembalian dan satu
+lagi untuk eror dari membaca isi direktori dengan `filepath.Walk`.
+Fungsi yang membaca isi direktori memulai sebuah goroutine baru untuk memroses
+setiap berkas, kemudian mencek `done`.
+Jika `done` ditutup, maka pembacaan isi direktori selesai segara:
+
+----
+func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
+ // Untuk setiap berkas, jalankan sebuah goroutine yang menghitung _sum_
+ // dari berkas dan mengirim hasilnya ke `c`.
+ // Mengirim hasil dari pembacaan direktori ke `errc`.
+ c := make(chan result)
+ errc := make(chan error, 1)
+ go func() {
+ var wg sync.WaitGroup
+ err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
+ if err != nil {
+ return err
+ }
+ if !info.Mode().IsRegular() {
+ return nil
+ }
+ wg.Add(1)
+ go func() {
+ data, err := ioutil.ReadFile(path)
+ select {
+ case c <- result{path, md5.Sum(data), err}:
+ case <-done:
+ }
+ wg.Done()
+ }()
+ // Batalkan pembacaan direktori jika `done` ditutup.
+ select {
+ case <-done:
+ return errors.New("walk canceled")
+ default:
+ return nil
+ }
+ })
+ // Pembacaan direktori telah selesai, sehingga semua pemanggilan
+ // wg.Add telah dilakukan.
+ // Jalankan sebuah goroutine untuk menutup `c` saat semua pengiriman
+ // telah selesai.
+ go func() {
+ wg.Wait()
+ close(c)
+ }()
+ // Perintah `select` tidak perlu di sini, secara `errc` menggunakan
+ // _buffer_.
+ errc <- err
+ }()
+ return c, errc
+}
+----
+
+Fungsi `MD5All` menerima nilai _digest_ dari `c`.
+`MD5All` segera selesai bila ada eror, menutup `done` lewat `defer`:
+
+----
+func MD5All(root string) (map[string][md5.Size]byte, error) {
+ // MD5All menutup kanal `done` saat selesai; ia bisa menutupnya sebelum
+ // menerima semua nilai dari `c` dan `errc`.
+ done := make(chan struct{})
+ defer close(done)
+
+ c, errc := sumFiles(done, root)
+
+ m := make(map[string][md5.Size]byte)
+ for r := range c {
+ if r.err != nil {
+ return nil, r.err
+ }
+ m[r.path] = r.sum
+ }
+ if err := <-errc; err != nil {
+ return nil, err
+ }
+ return m, nil
+}
+----
+
+
+=== Membatasi paralelisme
+
+Implementasi `MD5All` dalam
+link:/blog/pipelines/parallel.go[parallel.go]
+menjalankan sebuah goroutine baru untuk setiap berkas.
+Dalam sebuah direktori dengan banyak berkas, hal ini bisa mengakibatkan
+alokasi memori yang lebih banyak daripada memori pada sistem.
+
+Kita dapat mengurangi alokasi ini dengan membatasi jumlah berkas yang dibaca
+secara paralel.
+Dalam
+link:/blog/pipelines/bounded.go[`bounded.go`],
+hal ini dilakukan dengan membuat sejumlah `n` goroutine untuk membaca berkas.
+Sekarang _pipeline_ kita memiliki tiga tahap: baca isi direktori, baca dan
+_digest_ berkas, dan kumpulkan hasil _digest_.
+
+Tahap pertama, `walkFiles`, menghasilkan path dari berkas di dalam direktori:
+
+----
+func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
+ paths := make(chan string)
+ errc := make(chan error, 1)
+ go func() {
+ // Tutup kanal `paths` setelah semua isi direktori dibaca.
+ defer close(paths)
+ // Tidak perlus `select`, secara `errc` memiliki buffer.
+ errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
+ if err != nil {
+ return err
+ }
+ if !info.Mode().IsRegular() {
+ return nil
+ }
+ select {
+ case paths <- path:
+ case <-done:
+ return errors.New("walk canceled")
+ }
+ return nil
+ })
+ }()
+ return paths, errc
+}
+----
+
+Tahap kedua, `digester`, menjalankan sejumlah goroutine _digest_ yang menerima
+nama berkas dari kanal `paths` dan mengirim hasilnya ke kanal `c`:
+
+----
+func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
+ for path := range paths {
+ data, err := ioutil.ReadFile(path)
+ select {
+ case c <- result{path, md5.Sum(data), err}:
+ case <-done:
+ return
+ }
+ }
+}
+----
+
+Tidak seperti contoh sebelumnya, fungsi `digester` tidak menutup kanal
+_keluar_, karena beberapa goroutine mengirim ke sebuah kanal yang sama.
+Namun, kode dalam `MD5All` akan menutup kanal tersebut saat semua `digester`
+selesai:
+
+----
+ // Jalankan sejumlah goroutine untuk membaca dan men-_digest_ berkas.
+ c := make(chan result)
+ var wg sync.WaitGroup
+ const numDigesters = 20
+ wg.Add(numDigesters)
+ for i := 0; i < numDigesters; i++ {
+ go func() {
+ digester(done, paths, c)
+ wg.Done()
+ }()
+ }
+ go func() {
+ wg.Wait()
+ close(c)
+ }()
+----
+
+Kita bisa saja membuat setiap fungsi `digester` membuat dan mengembalikan
+kanal _keluar_ mereka sendiri, namun hal ini membutuhkan goroutine tambahan
+untuk _fan-in_ (menggabungkan) semua hasilnya.
+
+Tahap terakhir menerima semua hasil dari `c` kemudian memeriksa eror dari
+`errc`.
+Pemeriksaan ini tidak bisa dilakukan lebih awal, secara `walkFiles` bisa saja
+menahan pengiriman nilai ke hilir:
+
+----
+ m := make(map[string][md5.Size]byte)
+ for r := range c {
+ if r.err != nil {
+ return nil, r.err
+ }
+ m[r.path] = r.sum
+ }
+ // Periksa apakah `Walk` gagal.
+ if err := <-errc; err != nil {
+ return nil, err
+ }
+ return m, nil
+}
+----
+
+
+== Kesimpulan
+
+Artikel ini telah menjelaskan beberapa teknik untuk membangun aliran data
+_pipeline_ dengan Go.
+Berurusan dengan kegagalan pada _pipeline_ sedikit kompleks, secara setiap
+tahap dalam _pipeline_ bisa menahan mengirim nilai ke hilir, dan tahap
+selanjutnya bisa saja tidak memerlukan lagi data yang masuk.
+Kita telah memperlihatkan bagaimana menutup sebuah kanal dapat menyiarkan
+sebuah sinyal yang menandakan selesai ("done") ke semua goroutine yang
+dijalankan oleh _pipeline_ dan mendefinisikan aturan-aturan untuk membangun
+_pipeline_ secara benar.
+
+Bacaan lebih lanjut:
+
+* https://talks.golang.org/2012/concurrency.slide#1[Pola konkurensi Go^]
+ (https://www.youtube.com/watch?v=f6kdp27TYZs[video^]) mempresentasikan dasar
+ dari konkurensi Go dan cara pakainya.
+* https://blog.golang.org/advanced-go-concurrency-patterns[Pola konkurensi Go lanjut^]
+ (http://www.youtube.com/watch?v=QDDwwePbDtw[video^]) menelaah penggunaan
+ yang lebih kompleks dari konkurensi pada Go, terutama `select`.
+* Makalah Douglas McIlroy
+ https://swtch.com/~rsc/thread/squint.pdf["Squinting at Power Series"]
+ memperlihatkan bagaimana konkurensi seperti Go menyediakan dukungan yang
+ elegan untuk perhitungan yang kompleks.
diff --git a/_content/blog/pipelines/parallel.go b/_content/blog/pipelines/parallel.go
new file mode 100644
index 0000000..5ca3e76
--- /dev/null
+++ b/_content/blog/pipelines/parallel.go
@@ -0,0 +1,111 @@
+// +build OMIT
+
+package main
+
+import (
+ "crypto/md5"
+ "errors"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "sort"
+ "sync"
+)
+
+// A result is the product of reading and summing a file using MD5.
+type result struct {
+ path string
+ sum [md5.Size]byte
+ err error
+}
+
+// sumFiles starts goroutines to walk the directory tree at root and digest each
+// regular file. These goroutines send the results of the digests on the result
+// channel and send the result of the walk on the error channel. If done is
+// closed, sumFiles abandons its work.
+func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
+ // For each regular file, start a goroutine that sums the file and sends
+ // the result on c. Send the result of the walk on errc.
+ c := make(chan result)
+ errc := make(chan error, 1)
+ go func() { // HL
+ var wg sync.WaitGroup
+ err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
+ if err != nil {
+ return err
+ }
+ if !info.Mode().IsRegular() {
+ return nil
+ }
+ wg.Add(1)
+ go func() { // HL
+ data, err := ioutil.ReadFile(path)
+ select {
+ case c <- result{path, md5.Sum(data), err}: // HL
+ case <-done: // HL
+ }
+ wg.Done()
+ }()
+ // Abort the walk if done is closed.
+ select {
+ case <-done: // HL
+ return errors.New("walk canceled")
+ default:
+ return nil
+ }
+ })
+ // Walk has returned, so all calls to wg.Add are done. Start a
+ // goroutine to close c once all the sends are done.
+ go func() { // HL
+ wg.Wait()
+ close(c) // HL
+ }()
+ // No select needed here, since errc is buffered.
+ errc <- err // HL
+ }()
+ return c, errc
+}
+
+// MD5All reads all the files in the file tree rooted at root and returns a map
+// from file path to the MD5 sum of the file's contents. If the directory walk
+// fails or any read operation fails, MD5All returns an error. In that case,
+// MD5All does not wait for inflight read operations to complete.
+func MD5All(root string) (map[string][md5.Size]byte, error) {
+ // MD5All closes the done channel when it returns; it may do so before
+ // receiving all the values from c and errc.
+ done := make(chan struct{}) // HLdone
+ defer close(done) // HLdone
+
+ c, errc := sumFiles(done, root) // HLdone
+
+ m := make(map[string][md5.Size]byte)
+ for r := range c { // HLrange
+ if r.err != nil {
+ return nil, r.err
+ }
+ m[r.path] = r.sum
+ }
+ if err := <-errc; err != nil {
+ return nil, err
+ }
+ return m, nil
+}
+
+func main() {
+ // Calculate the MD5 sum of all files under the specified directory,
+ // then print the results sorted by path name.
+ m, err := MD5All(os.Args[1])
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+ var paths []string
+ for path := range m {
+ paths = append(paths, path)
+ }
+ sort.Strings(paths)
+ for _, path := range paths {
+ fmt.Printf("%x %s\n", m[path], path)
+ }
+}
diff --git a/_content/blog/pipelines/serial.go b/_content/blog/pipelines/serial.go
new file mode 100644
index 0000000..024ef47
--- /dev/null
+++ b/_content/blog/pipelines/serial.go
@@ -0,0 +1,55 @@
+// +build OMIT
+
+package main
+
+import (
+ "crypto/md5"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "sort"
+)
+
+// MD5All reads all the files in the file tree rooted at root and returns a map
+// from file path to the MD5 sum of the file's contents. If the directory walk
+// fails or any read operation fails, MD5All returns an error.
+func MD5All(root string) (map[string][md5.Size]byte, error) {
+ m := make(map[string][md5.Size]byte)
+ err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { // HL
+ if err != nil {
+ return err
+ }
+ if !info.Mode().IsRegular() {
+ return nil
+ }
+ data, err := ioutil.ReadFile(path) // HL
+ if err != nil {
+ return err
+ }
+ m[path] = md5.Sum(data) // HL
+ return nil
+ })
+ if err != nil {
+ return nil, err
+ }
+ return m, nil
+}
+
+func main() {
+ // Calculate the MD5 sum of all files under the specified directory,
+ // then print the results sorted by path name.
+ m, err := MD5All(os.Args[1]) // HL
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+ var paths []string
+ for path := range m {
+ paths = append(paths, path)
+ }
+ sort.Strings(paths) // HL
+ for _, path := range paths {
+ fmt.Printf("%x %s\n", m[path], path)
+ }
+}