0. Intro

NATS是一款基于Go语言的MQ,主要卖点是high thoughout,秒千万级别的吞吐量。当然没有银弹,高吞吐的一个原因就在于功能其实非常简单,可以说只实现了MQ的最基本功能,所以其自我描述更喜欢叫messaging system,主要缺失的功能包括持久化等。

之所以选择读NATS的源码而不是rabbitMQ等,主要原因也是因为他简单但是又足够,如果想要持久化功能还可以参考NATS Streaming,除去依赖和测试代码NATS只有不足8000行代码。

1. Codes

git-commit: 899b0849344f6b8386c8da40f4cbbd0f25f74396

LOC:

-------------------------------------------------------------------------------
Language                     files          blank        comment           code
-------------------------------------------------------------------------------
Go                              21           1157           1697           7936
-------------------------------------------------------------------------------
SUM:                            21           1157           1697           7936
-------------------------------------------------------------------------------

Tree:

├── auth.go 权限管理相关
├── ciphersuites.go 加密协议
├── client.go 连接处理
├── const.go 常量
├── errors.go 错误
├── log.go 日志接口
├── monitor.go 提供http查看服务器信息
├── monitor_sort_opts.go 同上
├── nkey.go 依赖,未深入,看起来像是自己写了一个加密解密的算法
├── opts.go 配置
├── parser.go 协议parser
├── reload.go reload服务器
├── ring.go monitor相关
├── route.go 集群相关
├── server.go server
├── signal.go 信号处理
├── sublist.go topic存储
└── util.go

Trim:

去掉一些内容

  • /conf:为了解析config自己写了个lex&parser。
  • /logger:其实用的logger也是自己写的,根目录这个logger只是另一层wrapper
  • /util:用来生成password的一个小工具
  • /server/pse、signal_windows.go、service_windows,service 这几个文件都是为了支持不同系统而存在的。
  • auth和monitor相关代码,前者是一个很简单的结构体和map权限管理,后者是提供http来查看服务器状态的一个http server,所以都略过。

3. 启动服务器

启动服务器的流程:处理options->启动monitor->判断是否加入集群->开启端口监听

处理options

处理options其实没太多好讲,主要就是从flags,config file和默认值里生成一个Options,比较关键的属性:

type Options struct {
	Host             string        `json:"addr"` // 监听地址
	Port             int           `json:"port"` // 监听端口
	MaxConn          int           `json:"max_connections"` // 最大连接数
	MaxSubs          int           `json:"max_subscriptions,omitempty"`// 最大订阅数
	MaxPayload       int           `json:"max_payload"` // 单条消息最大payload
	PidFile          string        `json:"-"` // 指定pid文件
	LogFile          string        `json:"-"` // 日志文件
}

启动monitor略,集群后面会单独写。

开启监听

从main开始最终会走到s.AcceptLoop(clientListenReady)

func (s *Server) AcceptLoop(clr chan struct{}) {
  ...
	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
	l, e := net.Listen("tcp", hp)
	s.listener = l // 监听TCP
  ...
	for s.isRunning() {
		conn, err := l.Accept()
    ...
		s.startGoRoutine(func() {
			s.createClient(conn) // 每当有一个新的connect,就创建一个新的client。
			s.grWG.Done()
		})
	}
}

startGoRoutine是一个用来启动goroutine的方法,有一个work_group用来track全部启动的goroutine:

func (s *Server) startGoRoutine(f func()) {
	s.grMu.Lock()
	if s.grRunning {
		s.grWG.Add(1)
		go f()
	}
	s.grMu.Unlock()
}

4. Create Client

createClient创建了一个新的client struct同时开启两个goroutine分别监听conn的read和write:

func (s *Server) createClient(conn net.Conn) *client {
  ...
  // 创建client
	c := &client{srv: s, nc: conn, opts: defaultOpts, mpay: max_pay, msubs: max_subs, start: now, last: now}
  // server追踪client
	s.totalClients++
	s.clients[c.cid] = c
	// ping-pong保持连接
	c.setPingTimer()
	// 监听读
	s.startGoRoutine(c.readLoop)
	// 监听写
	s.startGoRoutine(c.writeLoop)

}