0. Trie树

续上,在继续看nats如何处理命令之前,先看一下如何处理订阅之间的关联。

在继续之前需要了解trie树,字典树也算是很经典的一种数据结构了,大体结构如图:

trie

其中最左侧节点连续下来就代表了aaaa,更多细节百度之,在nats里存储topic的结构就是一颗trie树。

具体的协议我们只查看sub和pub两个命令。

1. Sublist

nats里的sublist结构如下:

trie

type node struct {
	next  *level
	psubs map[*subscription]*subscription
	qsubs map[string](map[*subscription]*subscription)
	plist []*subscription
}

type level struct {
	nodes    map[string]*node
	pwc, fwc *node
}

分隔符是基于.,例如最左侧的就代表了foo.bar topic。

由于nats是支持publish和queue publish的,所以每个节点会同时存储psubs和qsubs:

其中每个subscription存储了subject和对应的*client:

type subscription struct {
	client  *client
	subject []byte
}

所以只要找到了对应的topic需要发送出去的client就都可以找到了。

这里比较关键的操作是Insert,Remove和Match。

在创建server的时候,就会创建一个新的sublist:

func New(opts *Options) *Server {
  ...
	s := &Server{
		configFile: opts.ConfigFile,
		info:       info,
		prand:      rand.New(rand.NewSource(time.Now().UnixNano())),
		sl:         NewSublist(),// 新建sublist
		opts:       opts,
		done:       make(chan bool, 1),
		start:      now,
		configTime: now,
	}

}

2. processSub

processSub相对来说比较简单,只要插入到我们上面的sublist中即可:

func (c *client) processSub(argo []byte) (err error) {
	sub := &subscription{client: c}
	switch len(args) { // 判断是不是queue
	case 2:
		sub.subject = args[0]
		sub.queue = nil
		sub.sid = args[1]
	case 3:
		sub.subject = args[0]
		sub.queue = args[1]
		sub.sid = args[2]
	default:
		return fmt.Errorf("processSub Parse Error: '%s'", arg)
	}

	sid := string(sub.sid)
	if c.subs[sid] == nil { // 在client里维护自己的subscription,当断开连接的时候需要在sublist里删除
		c.subs[sid] = sub
		if c.srv != nil {
			err = c.srv.sl.Insert(sub) // 插入到sublist当中
			if err != nil {
				delete(c.subs, sid)
			} else {
				shouldForward = c.typ != ROUTER
			}
		}
	}

}

3. processPub

在parser里面,处理完pub之后状态机并没有停止,还会继续处理payload部分,也就是说一个pub message会触发processPub和processMsg两个操作:

func (c *client) processPub(arg []byte) error {
  ...
  // 主要是写pubArg,在processMsg的时候会用到。
	switch len(args) {
	case 2:
		c.pa.subject = args[0]
		c.pa.reply = nil
		c.pa.size = parseSize(args[1])
		c.pa.szb = args[1]
	case 3:
		c.pa.subject = args[0]
		c.pa.reply = args[1]
		c.pa.size = parseSize(args[2])
		c.pa.szb = args[2]
	default:
		return fmt.Errorf("processPub Parse Error: '%s'", arg)
	}
  ...
}

4. processMsg

写消息会追加client的outbound的buff里:

// processMsg is called to process an inbound msg from a client.
func (c *client) processMsg(msg []byte) {
  ...
	var r *SublistResult
  // 找到所有订阅者
	subject := string(c.pa.subject)
	r = srv.sl.Match(subject)
	c.in.results[subject] = r
	fanout := len(r.psubs) + len(r.qsubs)
	for _, sub := range r.psubs {
    // 发送消息
		mh := c.msgHeader(msgh[:si], sub)
		c.deliverMsg(sub, mh, msg)
	}
  ...
}

func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool {
	client := sub.client
	client.mu.Lock()
	srv := client.srv
	client.outMsgs++
	client.outBytes += msgSize

	// 加入message head和msg到out buf
	client.queueOutbound(mh)
	client.queueOutbound(msg)

}

func (c *client) queueOutbound(data []byte) {
    ...
		c.out.p = append(c.out.p, data...)
    ...
}