1. WriteLoop

写循环相对来说比较简单,只要达到触发条件就flush一次:

func (c *client) writeLoop() {
  ...
	for {
		c.mu.Lock()
		if waitOk && (c.out.pb == 0 || c.out.fsp > 0) && len(c.out.nb) == 0 && !c.flags.isSet(clearConnection) {
			c.out.sg.Wait()
		}
		// Flush data
		waitOk = c.flushOutbound()
    ...
	}
}

这里主要检测的是cliet里的out对象,out就是写缓存

2. ReadLoop

readLoop会更复杂一些,里面的工作主要包含:读数据->处理数据->调整缓存,主要跟读取相关的数据在client的in里面:

func (c *client) readLoop() {

	b := make([]byte, c.in.rsz)
  ...
	for {
		n, err := nc.Read(b) // 读数据
		if err := c.parse(b[:n]); err != nil { // parse 数据
      ...
		}

  ...
	}
}

这里nats还实现了一个动态调整读取缓存的机制:

c.in.rsz = startBufSize // 设置为初始的bufsize
...
if n >= cap(b) { // 如果读的数据超过bufsize 清空srs
	c.in.srs = 0
} else if n < cap(b)/2 { // 读的数据小于bufsize的一般 +1
	c.in.srs++
}
// 如果读的数据大于当前缓冲区,并且当前缓冲大小小于最大,放大一倍
if n >= cap(b) && cap(b) < maxBufSize { //
	c.in.rsz = cap(b) * 2
	b = make([]byte, c.in.rsz)
// 如果读的数据连续三次小于当前缓冲大小并且没有达到最小缓存,缩小一倍
} else if n < cap(b) && cap(b) > minBufSize && c.in.srs > shortsToShrink {
	c.in.rsz = cap(b) / 2
	b = make([]byte, c.in.rsz)
}

3. Parser

nats协议跟redis的格式很像,都是基于文本协议的而且命令很少:

INFO	Server	Sent to client after initial TCP/IP connection
CONNECT	Client	Sent to server to specify connection information
PUB	Client	Publish a message to a subject, with optional reply subject
SUB	Client	Subscribe to a subject (or subject wildcard)
UNSUB	Client	Unsubscribe (or auto-unsubscribe) from subject
MSG	Server	Delivers a message payload to a subscriber
PING	Both	PING keep-alive message
PONG	Both	PONG keep-alive response
+OK	Server	Acknowledges well-formed protocol message in verbose mode
-ERR	Server	Indicates a protocol error. May cause client disconnect.

以sub为例:SUB FOO 1\r\n.

上面读取到[]byte之后,就进入了parser.go处理接收到的信息,parser只用了一个状态机(conf用lex&parser,协议本体用状态机,hmmmm…)实现,关于这个状态机其实也不复杂,以sub为例:

...
case SUB_ARG:
  switch b {
  case '\r':
    c.drop = 1
  case '\n':
    var arg []byte
    if c.argBuf != nil {
      arg = c.argBuf
      c.argBuf = nil
    } else {
      arg = buf[c.as : i-c.drop]
    }
    if err := c.processSub(arg); err != nil {
      return err
    }
    c.drop, c.as, c.state = 0, i+1, OP_START
  default:
    if c.argBuf != nil {
      c.argBuf = append(c.argBuf, b)
    }
  }
  ...

最终会走到sub_arg然后调用c.processSub(arg)。