Featured image of post Go1.22 ServeMux SSE Cond

Go1.22 ServeMux SSE Cond

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
package main

import (
	"log"
	"net/http"
	"strconv"
	"sync"
	"time"
)

// event is the process-wide "counting" broadcaster. main starts its
// StartEvent loop in a goroutine and syncSSEHandler reads from it.
var event = newBroadcastEvent("counting")

// main registers the demo routes on a ServeMux, starts the background
// counter that feeds /event/, and serves HTTP on :8080.
//
// Routes (Go 1.22 pattern syntax):
//   - GET /sse/                    streams the default range via sseHandler
//   - GET /sse/{startAt}/{endAt}   streams a caller-chosen range via sseHandler
//   - GET /event/                  streams broadcast ticks via syncSSEHandler
func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("GET /sse/{$}", sseHandler)
	mux.HandleFunc("GET /sse/{startAt}/{endAt}", sseHandler)
	mux.HandleFunc("GET /event/{$}", syncSSEHandler)

	// The producer runs for the life of the process; there is no shutdown path.
	go event.StartEvent()

	// ListenAndServe only returns on failure (e.g. the port is already in
	// use). Report it and exit non-zero instead of returning silently.
	if err := http.ListenAndServe(":8080", mux); err != nil {
		log.Fatalf("http server on :8080: %v", err)
	}
}

func sseHandler(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")

	startAt := r.PathValue("startAt")
	endAt := r.PathValue("endAt")
	startNum := 0
	endNum := 10
	if startAt != "" {
		startNum, _ = strconv.Atoi(startAt)
	}
	if endAt != "" {
		endNum, _ = strconv.Atoi(endAt)
	}

	// 写入数据
	for i := startNum; i < endNum; i++ {
		_, err := w.Write([]byte(strconv.Itoa(i) + " "))
		if err != nil {
			return
		}
		w.(http.Flusher).Flush()
		time.Sleep(100 * time.Millisecond)
	}
}

// syncSSEHandler streams the shared broadcast event to the client. Each loop
// iteration blocks in event.String() until the next broadcast, then writes
// and flushes the returned line, so every connected client is woken by the
// same broadcast.
//
// The handler returns when a write fails or when the request context has been
// cancelled by the time it is woken. It responds with 500 if the
// ResponseWriter cannot flush.
//
// NOTE(review): the payload is the single tab-separated line produced by
// broadcastEvent.String, not the blank-line-terminated "field: value" framing
// of the SSE wire format, although the Content-Type is text/event-stream.
// Confirm clients are expected to read the stream raw.
func syncSSEHandler(w http.ResponseWriter, r *http.Request) {
	// Streaming needs an explicit flush after every chunk; fail cleanly
	// instead of panicking if the ResponseWriter cannot provide one.
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming unsupported", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")

	// Write the data.
	ctx := r.Context()
	for {
		msg := event.String() // blocks until the next broadcast

		// The client may have gone away while this goroutine was parked.
		if ctx.Err() != nil {
			return
		}
		if _, err := w.Write([]byte(msg)); err != nil {
			return
		}
		flusher.Flush()
	}
}

// broadcastEvent begin

// broadcastEvent holds the latest payload of a named event together with a
// condition variable used to wake every goroutine waiting for the next one.
type broadcastEvent struct {
	cond  *sync.Cond // cond.L guards data; Broadcast announces each update
	event string     // event name, set once by newBroadcastEvent
	data  string     // most recent payload
}

// newBroadcastEvent returns a broadcastEvent named event with an empty
// payload and a condition variable backed by its own mutex.
func newBroadcastEvent(event string) *broadcastEvent {
	b := new(broadcastEvent)
	b.event = event
	b.cond = sync.NewCond(new(sync.Mutex))
	return b
}

// StartEvent publishes an increasing counter ("0", "1", "2", ...) as the
// payload, once per second, waking all current waiters each time. It never
// returns, so callers run it in its own goroutine.
func (b *broadcastEvent) StartEvent() {
	for n := 0; ; n++ {
		b.publish(strconv.Itoa(n))
		time.Sleep(time.Second)
	}
}

// publish stores payload under the lock and then wakes every waiter.
func (b *broadcastEvent) publish(payload string) {
	b.cond.L.Lock()
	b.data = payload
	b.cond.L.Unlock()
	b.cond.Broadcast()
}

// String blocks until the next broadcast and then returns the event formatted
// as "event: <name>\tdata: <payload>\n". Only the payload current at wake-up
// is reported; updates published while the caller was not waiting are not
// replayed.
func (b *broadcastEvent) String() string {
	mu := b.cond.L
	mu.Lock()
	b.cond.Wait() // releases mu while parked, holds it again on return
	line := "event: " + b.event + "\tdata: " + b.data + "\n"
	mu.Unlock()
	return line
}

// broadcastEvent end
Licensed under CC BY-NC-SA 4.0