Featured image of post Go Async Channel

Go Async Channel

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
package main

import (
	"fmt"
	"time"
)

// AsyncWait executes the given function f in a separate goroutine and returns
// a channel that will be closed when the function completes. This allows
// callers to wait for the completion of an asynchronous operation.
//
// The returned channel is receive-only and will be closed (not sent to) when
// the function f finishes executing, regardless of whether f panics or
// completes normally.
//
// Example usage:
//
//	done := AsyncWait(func() {
//	    // some long-running operation
//	    time.Sleep(time.Second)
//	})
//	<-done // wait for completion
func AsyncWait(f func()) <-chan struct{} {
	ch := make(chan struct{})
	go func() {
		f()
		close(ch)
	}()
	return ch
}

// AsyncProduce runs the producer function f in a separate goroutine and sends
// the produced values to the provided channel ch. The function f should return
// a value and a boolean indicating whether to continue producing.
//
// The producer function f will be called repeatedly until it returns false as
// the second return value. Each value returned (when ok is true) will be sent
// to the channel ch. When f returns false, the loop terminates and ch is closed.
//
// Parameters:
//
//	ch - the channel to send produced values to (must be created by caller)
//	f  - producer function that returns (value, shouldContinue)
//
// The returned channel is the same as the input channel ch, provided for
// convenience and method chaining.
//
// Example usage:
//
//	output := AsyncProduce(make(chan int, 10), func() (int, bool) {
//	    // produce some value
//	    val := generateValue()
//	    return val, val != -1 // continue until -1 is generated
//	})
//
//	for value := range output {
//	    fmt.Println(value)
//	}
func AsyncProduce[T any](ch chan T, f func() (T, bool)) <-chan T {
	go func() {
		for {
			v, ok := f()
			if !ok {
				break
			}
			ch <- v
		}
		close(ch)
	}()
	return ch
}

func main() {
	// 示例 1: 使用 AsyncWait
	done := AsyncWait(func() {
		fmt.Println("开始任务...")
		time.Sleep(3 * time.Second)
		fmt.Println("任务完成!")
	})

	// 示例 2: 使用 AsyncProduce
	counter := 0
	producer := func() (int, bool) {
		if counter >= 5 {
			return 0, false
		}
		val := counter
		counter++
		time.Sleep(500 * time.Millisecond)
		return val, true
	}

	for val := range AsyncProduce(make(chan int), producer) {
		fmt.Printf("收到数据: %d\n", val)
	}
	fmt.Println("所有数据接收完成")

	<-done // 等待示例 1 任务完成
	fmt.Println("收到通知:任务已经完成")
}
Licensed under CC BY-NC-SA 4.0