0. Intro

nats的集群模型其实很简单,因为nats本身并不支持数据存储,所以不涉及到数据一致性问题,集群只是做了个简单的route功能。几点基础知识:

  • 所有的node之间互相链接
  • 新加入的node连接到一个node之后就会跟其他的node建立新连接
  • 收到client的消息之后会转发给其他node一份
  • 从其他node收到的信息只发给client不会再次转发

所以按照如下顺序简单介绍一下:

  • 启动一个支持集群的node
  • 启动一个新node并加入之前node
  • 处理新node加入的信息
  • 转发client信息到其他node
  • 接收到其他的node的转发信息

1. 启动server

 gnatsd -p 4222 -cluster nats://127.0.0.1:5222

首先还是要回到server.Start()方法:

if opts.Cluster.Port != 0 {
  s.startGoRoutine(func() {
    s.StartRouting(clientListenReady)
  })
}

当配置中的cluster不为空的时候就会启动一个对node的监听:

func (s *Server) StartRouting(clientListenReady chan struct{}) {
	defer s.grWG.Done()

	// 等待监听客户端先结束之后再启动
	<-clientListenReady

	// 等待启动监听其他routes之后再加入指定的node
	ch := make(chan struct{})
	go s.routeAcceptLoop(ch)
	<-ch

	// 当有指定其他routes的时候加入其他node
	s.solicitRoutes(s.getOpts().Routes)
}

启动监听routes的代码跟启动正常server监听的代码基本一致,每当有新连接的时候启动一个新的goroutine,这里进入的也是createRoute,跟下面的建立连接是同一个方法:

s.startGoRoutine(func() {
  s.createRoute(conn, nil)
  s.grWG.Done()
})

2. 建立node连接

gnatsd -p 6222 -cluster nats://127.0.0.1:7222 -routes nats://127.0.0.1:5222

启动的时候指定routes,如果想加入其他node本身必须也以cluster模式启动。启动自己的cluster监听之后接下来进入上面的s.solicitRoutes(s.getOpts().Routes)连接指定的其他node。

func (s *Server) solicitRoutes(routes []*url.URL) {
	for _, r := range routes {
		route := r
		s.startGoRoutine(func() { s.connectToRoute(route, true) })
	}
}

之后就会进行TCP连接并且对对应的读写进行监听,这个createRoute同时用于监听有新的node连接:

func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client {
  ...
  // 连接之后新建client并且开始监听读写,因为都是client所以读写流程跟server一致,只是typ不一样会进入不同的判断流程
	c := &client{srv: s, nc: conn, opts: clientOpts{}, typ: ROUTER, route: r}
	c.initClient()
	c.setPingTimer()
	s.startGoRoutine(func() { c.readLoop() })
	s.startGoRoutine(c.writeLoop)
  ...
}

3. 转发消息

先启动一个socat监听cluster接口:

 socat -v  tcp-l:5223,fork tcp-connect:127.0.0.1:5222

可以看到当第二个server启动的时候回先发送一个connect+info,然后服务器会返回一个info。

当有一个新的server加入的时候,收到的INFO会转发给其他nodes,而其他的node收到info之后就会去试图连接这个新的node,保证了所有node之间都互相连接。所以我们只看一下info的处理就好:

 // parser.go
 // 如前所述,所有的命令都是通过parser来解析的:info命令会一直解析到INFO_ARG
case INFO_ARG:
    ...
    if err := c.processInfo(arg); err != nil {
      return err
    }
    ...

上面也介绍了,对于命令的处理其实只是client的类型不一样:

 // client.go processInfo
 if c.typ == ROUTER {
   c.processRouteInfo(&info)
 }

在processRouteInfo中有两种处理,一种是收到了一个新加入节点的info的转发:

 // 当前client已经是route client,同时不是新收到info route的client
 // 这种情况已经不会继续转发
 if remoteID != "" && remoteID != info.ID {
   c.mu.Unlock()
   s.processImplicitRoute(info)
   return
 }

如上的情况会调用connect去连接这个新的node:

 func (s *Server) processImplicitRoute(info *Info) {
  ...
 	s.startGoRoutine(func() { s.connectToRoute(r, false) })
 }

另一种情况就是收到了一个新的info,这种情况下我们需要把这个info转发给其他node:

func (c *client) processRouteInfo(info *Info) {
  ...
	s.forwardNewRouteInfoToKnownServers(info)
  ...

}
func (s *Server) forwardNewRouteInfoToKnownServers(info *Info) {
	s.mu.Lock()
	defer s.mu.Unlock()

	b, _ := json.Marshal(info)
	infoJSON := []byte(fmt.Sprintf(InfoProto, b))

	for _, r := range s.routes {
		r.mu.Lock()
		if r.route.remoteID != info.ID {
			r.sendInfo(infoJSON)
		}
		r.mu.Unlock()
	}
}

4. 总结

因为是边读边整理的blog,所以整体逻辑有些乱,之后再重新整理下,大体上关键点如下:

  • 管理client连接 readloop&writeloop
  • sublist存储订阅信息
  • 状态机命令parser
  • 集群处理

nats最大的特点是高throughout,从源码中我们也都能找到答案:

  • 不存储message(对比mq,kafka)
  • 因为不需要存储message,所以集群也不涉及一致性问题
  • 因为不涉及一致性问题,所以信息在集群之间只是简单转发,同时不做错误处理,更简单
  • 出错统一由应用自己解决

所以nats更好的对比对象应该是service discovery+rpc,同时也有nats streaming实现了持久化的机制(还没看),在通信方式上已经具备了一个mq的基础功能,可以作为了解mq的一个很好起点,同时也有其自身的适用场景。