使用Golang的Channel实现PubSub
利用Go管道实现订阅
目录
使用Golang的Channel实现PubSub
核心的订阅代码借鉴自 Docker 的 “github.com/docker/docker/pkg/pubsub” 包。
先上代码
import (
"sync"
"time"
)
// wgPool recycles *sync.WaitGroup values so that Publish does not have to
// allocate a new WaitGroup for every broadcast.
var wgPool = sync.Pool{
	New: func() interface{} {
		return &sync.WaitGroup{}
	},
}
// NewPublisher builds an empty pub/sub publisher.
//
// publishTimeout is the per-subscriber send timeout: it keeps one slow or
// unresponsive client from blocking delivery to the others. buffer is the
// capacity given to channels created for new subscribers.
func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
	pub := new(Publisher)
	pub.buffer = buffer
	pub.timeout = publishTimeout
	pub.subscribers = make(map[subscriber]topicFunc)
	return pub
}
type (
	// subscriber is the channel on which a single subscriber receives messages.
	subscriber chan interface{}

	// topicFunc is a message filter: it reports whether v should be delivered
	// to the subscriber it is registered for.
	topicFunc func(v interface{}) bool
)
// Publisher is a basic pub/sub structure. It allows sending events and
// subscribing to them. Can be safely used from multiple goroutines.
type Publisher struct {
// m guards subscribers.
m sync.RWMutex
// buffer is the channel capacity SubscribeTopic uses for new subscribers.
buffer int
// timeout bounds how long sendTopic waits on one subscriber; when it is not
// positive, sendTopic performs a non-blocking send instead.
timeout time.Duration
// subscribers maps each subscriber channel to its topic filter; a nil filter
// means the subscriber receives every message.
subscribers map[subscriber]topicFunc
}
// Len reports how many subscribers are currently registered.
func (p *Publisher) Len() int {
	p.m.RLock()
	defer p.m.RUnlock()
	return len(p.subscribers)
}
// Subscribe registers a new unfiltered subscriber and returns its channel.
func (p *Publisher) Subscribe() chan interface{} {
	// A nil filter makes sendTopic deliver every published value.
	var matchAll topicFunc
	return p.SubscribeTopic(matchAll)
}
// SubscribeTopic registers a new subscriber that only receives the messages
// accepted by topic. The returned channel uses the publisher's default buffer
// size.
func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
	return p.SubscribeTopicWithBuffer(topic, p.buffer)
}
// SubscribeTopicWithBuffer registers a new subscriber that only receives the
// messages accepted by topic. The returned channel has capacity buffer.
func (p *Publisher) SubscribeTopicWithBuffer(topic topicFunc, buffer int) chan interface{} {
	out := make(chan interface{}, buffer)

	p.m.Lock()
	defer p.m.Unlock()
	p.subscribers[out] = topic

	return out
}
// Evict unregisters sub and closes it so that it receives no more messages.
// A channel that is not registered is left untouched.
func (p *Publisher) Evict(sub chan interface{}) {
	p.m.Lock()
	defer p.m.Unlock()

	if _, registered := p.subscribers[sub]; !registered {
		return
	}
	delete(p.subscribers, sub)
	close(sub)
}
// Publish delivers v to every subscriber currently registered, one goroutine
// per subscriber, and returns once all of those sends have finished.
func (p *Publisher) Publish(v interface{}) {
	// The read lock is held for the whole broadcast, so the subscriber set
	// cannot change (and no channel can be closed) while sends are in flight.
	p.m.RLock()
	defer p.m.RUnlock()

	if len(p.subscribers) == 0 {
		return
	}

	pending := wgPool.Get().(*sync.WaitGroup)
	pending.Add(len(p.subscribers))
	for ch, filter := range p.subscribers {
		go p.sendTopic(ch, filter, v, pending)
	}
	pending.Wait()
	wgPool.Put(pending)
}
// Close unregisters every subscriber and closes its channel.
func (p *Publisher) Close() {
	p.m.Lock()
	defer p.m.Unlock()

	for ch := range p.subscribers {
		delete(p.subscribers, ch)
		close(ch)
	}
}
// sendTopic delivers v to sub if topic accepts it (a nil topic accepts
// everything) and always marks wg as done before returning.
//
// With a positive publisher timeout the send waits at most that long; with no
// timeout the value is dropped immediately when sub cannot take it.
func (p *Publisher) sendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) {
	defer wg.Done()

	wanted := topic == nil || topic(v)
	if !wanted {
		return
	}

	if p.timeout <= 0 {
		// Non-blocking send: skip the subscriber if it is not ready.
		select {
		case sub <- v:
		default:
		}
		return
	}

	// Bounded wait so one slow receiver cannot stall the publisher.
	timer := time.NewTimer(p.timeout)
	defer timer.Stop()
	select {
	case sub <- v:
	case <-timer.C:
	}
}
Server代码
import (
"fmt"
"sync"
"time"
)
// Server keeps one Publisher per topic name.
type Server struct {
// Bucket maps a topic name to its Publisher; entries are created by newPublisher.
Bucket map[string]*Publisher
// m is the lock newPublisher takes around Bucket access.
m sync.RWMutex
}
// NewServer returns a Server with an empty topic table.
func NewServer() *Server {
	srv := new(Server)
	srv.Bucket = make(map[string]*Publisher)
	return srv
}
// Sub subscribes to the topic named subName and prints every string message
// published to it, one per line, on standard output. Non-string messages are
// filtered out. Sub blocks until the subscriber channel is closed, which
// happens when the topic's publisher is closed.
//
// No "ping" handshake is sent: ranging over the channel already blocks until
// a message arrives. A ping would also be broadcast to every other subscriber
// of the topic, and unconditionally discarding the first received value could
// drop a real message published concurrently.
func (s *Server) Sub(subName string) {
	p := s.newPublisher(subName)
	c := p.SubscribeTopic(func(v interface{}) bool {
		// Only string payloads are delivered.
		_, ok := v.(string)
		return ok
	})
	for v := range c {
		// The published content.
		fmt.Println(v)
	}
}
// Pub publishes v to every subscriber of the topic named subName. The
// topic's publisher is created on first use.
func (s *Server) Pub(subName string, v interface{}) {
	s.newPublisher(subName).Publish(v)
}
// Close closes all subscriber channels of the topic named subName, which
// ends every Sub call blocked on that topic.
func (s *Server) Close(subName string) {
	s.newPublisher(subName).Close()
}
// UnSub is meant to cancel a subscription to the topic named subName.
// It is currently an empty placeholder and does nothing.
func (s *Server) UnSub(subName string) {
// TODO: cancel the subscription.
// Implement your own unsubscribe logic here.
}
// newPublisher returns the Publisher for the topic named subName, creating
// and registering one (100ms send timeout, buffer of 10) if none exists yet.
//
// Lookups take the read lock. Creation re-checks under the write lock because
// another goroutine may have registered the topic in between; inserting into
// the map while holding only the read lock would be a data race.
func (s *Server) newPublisher(subName string) *Publisher {
	s.m.RLock()
	p := s.Bucket[subName]
	s.m.RUnlock()
	if p != nil {
		return p
	}

	s.m.Lock()
	defer s.m.Unlock()
	if p = s.Bucket[subName]; p == nil {
		p = NewPublisher(100*time.Millisecond, 10)
		s.Bucket[subName] = p
	}
	return p
}
测试代码
func TestRunPubSub(t *testing.T) {
s := NewPublisherServer()
fmt.Println("server running...")
// 多个订阅者
go s.Sub("hello")
go s.Sub("golang")
go s.Sub("php")
time.Sleep(time.Millisecond * 10)
go func() {
i := 0
for {
i++
s.Pub("hello", fmt.Sprintf("hello channel %d", i))
if i > 10 {
break
}
}
}()
go s.Pub("golang", "golang 123")
go s.Pub("php", "php 123")
time.Sleep(time.Millisecond * 100)
s.Close("hello")
s.Close("golang")
s.Close("php")
}
测试结果
=== RUN TestRunPubSub
server running...
golang 123
hello channel 1
hello channel 2
hello channel 3
hello channel 4
hello channel 5
hello channel 6
hello channel 7
hello channel 8
hello channel 9
hello channel 10
hello channel 11
php 123
--- PASS: TestRunPubSub (0.13s)
PASS
进程 已完成,退出代码为 0
测试代码2：启动一个TCP服务监听9898端口，使用终端命令telnet连接测试
// TestTCPPubSub runs a TCP pub/sub server on port 9898 for manual testing
// with telnet. It serves connections until the test process is stopped.
func TestTCPPubSub(t *testing.T) {
	listen, err := net.Listen("tcp", "0.0.0.0:9898")
	if err != nil {
		// Report through the test framework instead of panicking.
		t.Fatalf("listen on 0.0.0.0:9898: %v", err)
	}
	defer listen.Close()

	fmt.Println("server running...")
	p := GetPublisherInstance()
	for {
		conn, err := listen.Accept()
		if err != nil {
			// Keep serving, but do not hide the failure.
			t.Logf("accept: %v", err)
			continue
		}
		go handleRequest(conn, p)
	}
}
// Conns maps "<remote address><topic>" to the connection subscribed to that
// topic. handleRequest runs concurrently, once per connection, so every
// access must hold connsMu.
var Conns = make(map[string]net.Conn)

// connsMu serializes access to Conns.
var connsMu sync.Mutex

// handleRequest serves one client connection of the line-based pub/sub
// protocol:
//
//	subscribe <topic>       forward every published string containing <topic>
//	publish <topic> <text>  publish "<topic><text>\n" to all subscribers
//	quit                    end the session
//
// Empty lines are ignored. Any other input is logged as chat and ends the
// session. When the session ends (quit, unknown input, or a read error such
// as the peer disconnecting) the connection's subscriptions are evicted, its
// Conns entries are removed, and the connection is closed.
func handleRequest(conn net.Conn, p *Publisher) {
	addr := conn.RemoteAddr().String()

	var (
		subs []chan interface{} // subscriptions opened by this connection
		keys []string           // Conns keys registered by this connection
	)
	defer func() {
		// Evict closes each channel, which ends its forwarding goroutine.
		for _, c := range subs {
			p.Evict(c)
		}
		connsMu.Lock()
		for _, k := range keys {
			delete(Conns, k)
		}
		connsMu.Unlock()
		// Best-effort close; nothing useful can be done with the error here.
		_ = conn.Close()
	}()

	// One reader for the whole session: a new reader per line would throw
	// away input that was already buffered.
	reader := bufio.NewReader(conn)
	for {
		raw, err := reader.ReadString('\n')
		if err != nil {
			return
		}
		line := strings.TrimRight(raw, "\r\n")
		fmt.Printf("request string: [%s]\n", line)

		// At most three fields, so the publish payload may contain spaces.
		content := strings.SplitN(line, " ", 3)
		switch {
		case line == "":
			continue

		case content[0] == "subscribe" && len(content) >= 2:
			topic := content[1]
			key := addr + topic

			connsMu.Lock()
			Conns[key] = conn
			connsMu.Unlock()
			keys = append(keys, key)

			c := p.SubscribeTopic(func(v interface{}) bool {
				s, ok := v.(string)
				return ok && strings.Contains(s, topic)
			})
			subs = append(subs, c)

			go func() {
				// Drop the subscription once the client can no longer be
				// written to, so the publisher stops waiting on it.
				defer p.Evict(c)
				for v := range c {
					n, err := fmt.Fprintf(conn, "%s topic: %v", topic, v)
					if err != nil {
						return
					}
					fmt.Println("write cant ", n)
				}
			}()

		case content[0] == "publish" && len(content) == 3:
			go p.Publish(content[1] + content[2] + "\n")

		case content[0] == "quit":
			return

		default:
			fmt.Println("common chat " + line)
			return
		}
	}
}
测试结果
订阅hello
$ telnet 127.0.0.1 9898
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
subscribe hello
hello topic: hello123
hello topic: hello123
hello topic: hello123
订阅golang
telnet 127.0.0.1 9898
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
subscribe golang
golang topic: golang👍123
golang topic: golang👍123
golang topic: golang👍123
向hello发送123
telnet 127.0.0.1 9898
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
publish hello 123
publish hello 123
publish hello 123
向golang发送👍123
publish golang 👍123
publish golang 👍123
publish golang 👍123
核心原理
// subscriber is the channel on which one subscriber receives messages.
type subscriber chan interface{}
// topicFunc is a topic filter: it reports whether a message should be delivered.
type topicFunc func(v interface{}) bool
type Publisher struct {
m sync.RWMutex // read-write lock
buffer int // channel buffer size
timeout time.Duration // send timeout
subscribers map[subscriber]topicFunc // subscriber channel -> topic filter
}
Github地址