// exs relays values from accept to recipient, adding 2 to each one.
//
// It ranges over accept until that channel is closed, printing every value it
// receives before sending value+2 on recipient. Each send blocks until
// recipient can take the value. exs returns once accept has been closed and
// drained; it does not close recipient.
func exs(accept <-chan int, recipient chan<- int) {
	for result := range accept {
		fmt.Println("Received only sent channel a:", result)
		recipient <- result + 2
	}
	// fmt.Println("Send Only", recipient)
}
// NOTE(review): this line is a truncated, whitespace-mangled fragment. The
// function's opening brace is never closed here, and several keywords are
// fused to the following token (funcmain, chanint, gofunc) — presumably
// "func main", "chan int" and "go func"; confirm against the original source.
//
// What the visible part does: it stores time.Now() in startTime (not used in
// the visible part), makes a channel with a capacity argument of 10, and loops
// i from 0 to 99. Each iteration launches a function literal that sleeps for
// five seconds and then prints one value received from ch, after which the
// loop sends i on ch.
funcmain() { startTime := time.Now() ch := make(chanint, 10) for i := 0; i < 100; i++ { gofunc(ch <-chanint) { time.Sleep(time.Second * 5) fmt.Println(<-ch) }(ch) ch <- i }
func main() { combust := wash(10) rice := combustion(combust) packs := open(rice) //输出测试,看看效果 for p := range packs { fmt.Println(p) } }
// wash is the first pipeline stage. It starts a goroutine that sends n strings
// on the returned channel, each built from the label "洗米" followed by the
// 1-based index, in increasing order, and then closes the channel.
//
// The channel is unbuffered, so each send waits for a receiver. For n <= 0 the
// channel is closed without sending anything.
func wash(n int) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for i := 1; i <= n; i++ {
			out <- fmt.Sprint("洗米", i)
		}
	}()
	return out
}

// combustion is a pipeline stage that wraps every string received from in as
// "烧饭(" + value + ")" and sends it on the returned channel.
//
// The work happens in a goroutine that runs until in is closed and drained,
// at which point the returned channel is closed as well.
func combustion(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for c := range in {
			out <- "烧饭(" + c + ")"
		}
	}()
	return out
}
// open is a pipeline stage that wraps every string received from in as
// "开锅(" + value + ")" and sends it on the returned channel.
//
// The work happens in a goroutine that runs until in is closed and drained,
// at which point the returned channel is closed as well. The returned channel
// is unbuffered, so each send waits for a receiver.
func open(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for c := range in {
			out <- "开锅(" + c + ")"
		}
	}()
	return out
}
// wash is the first pipeline stage. It starts a goroutine that sends n strings
// on the returned channel, each built from the label "洗米" followed by the
// 1-based index, in increasing order, and then closes the channel.
//
// The channel is unbuffered, so each send waits for a receiver. For n <= 0 the
// channel is closed without sending anything.
func wash(n int) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for i := 1; i <= n; i++ {
			out <- fmt.Sprint("洗米", i)
		}
	}()
	return out
}

// combustion is a pipeline stage that wraps every string received from in as
// "烧饭(" + value + ")" and sends it on the returned channel.
//
// The work happens in a goroutine that pauses briefly once, before it starts
// reading, and then runs until in is closed and drained, at which point the
// returned channel is closed as well.
func combustion(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		// NOTE(review): this pauses for two nanoseconds, the same duration as
		// an untyped 2 passed to time.Sleep. If a two-second delay was
		// intended, change this to 2 * time.Second.
		time.Sleep(2 * time.Nanosecond)
		for c := range in {
			out <- "烧饭(" + c + ")"
		}
	}()
	return out
}
// open is a pipeline stage that wraps every string received from in as
// "开锅(" + value + ")" and sends it on the returned channel.
//
// The work happens in a goroutine that runs until in is closed and drained,
// at which point the returned channel is closed as well. The returned channel
// is unbuffered, so each send waits for a receiver.
func open(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for c := range in {
			out <- "开锅(" + c + ")"
		}
	}()
	return out
}
// merge fans several input channels into one output channel.
//
// It starts one goroutine per channel in ins; each forwards every value it
// receives to the returned channel. The order in which values from different
// inputs appear is not fixed. Once every input has been closed and drained,
// the returned channel is closed. With no inputs the returned channel is
// closed without any value being sent.
func merge(ins ...<-chan string) <-chan string {
	var wg sync.WaitGroup
	out := make(chan string)

	// p forwards every value from a single input channel to out.
	p := func(in <-chan string) {
		defer wg.Done()
		for c := range in {
			out <- c
		}
	}

	wg.Add(len(ins))
	// Fan-in: start one goroutine per input so all inputs are drained
	// concurrently.
	for _, cs := range ins {
		go p(cs)
	}

	// Close out only after every forwarding goroutine has finished, so no
	// goroutine can send on a closed channel.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}