1. Server

续上,在实现上有两种方式,一种是只做func的wrapper,即go-kit里面使用的,把对应的service func封装成指定的格式然后自定义使用的协议来自己启动server。另一种是直接把server包含在对应的package中。前者更加灵活,后者用起来更加简单,两者原理基本一致,所以实现一下第二种。

首先假设调用格式如下:

srv := postService{}
server := Server{
  ServiceKey: "service.form",
  NatsURL:    "localhost:4222",
  Mapper:         make(map[string]pkg.ServiceFunc),
  SubcribeMapper: make(map[string]pkg.ServiceFunc),
  HttpPort:   ":8888",
  Protocols:  []string{"http", "nats"},
}
server.Register("show", srv.Show)
server.Register("create", srv.Create)
server.Register("update", srv.Update)
server.Register("list", srv.List)
server.Register("delete", srv.Delete)
server.Register("publish", srv.Publish)
server.Subcribe("comment.create", srv.AddCommentCount) // 完全没有实际意义,只是演示
server.Start() // 非block

如上调用之后,访问show方法就是 http://post_service:8888/show,这里并没有区分http method,也就是说无论是get还是post都一致,或者说可以干脆都使用post来实现。

接下来是server,

type Server struct {
	HttpPort   string
	NatsURL    string
	ServiceKey string
	Mapper     map[string]ServiceFunc
  SubcribeMapper map[string]ServiceFunc
	Protocols  []string
}

func (s Server) Register(key string, serviceFunc ServiceFunc) {
	s.Mapper[key] = serviceFunc
}

func (s Server) Start() {
  for _,k:= range s.Protocols {
		switch k {
		case "http":
			go s.StartHttpServer()
		case "nats":
			go s.StartNatsSubcriber()
		}
	}
}

同时我们要对之前的Request和Response制定一下json化的策略:

2. Http

Http相对来说,比较简单,只需要监听接口即可。

func (s Server) StartHttpServer() {
	for k, v := range s.Mapper {
		http.HandleFunc("/"+k, httpWrapper(v))
	}
	http.ListenAndServe(s.HttpPort, nil)
}

func httpWrapper(s ServiceFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		var request Request
		json.NewDecoder(r.Body).Decode(&request)
		result := s(request)
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(&result)
	}
}

3. Nats

nats部分相对来说,暂时需要考虑两种情况:一种是像http一样对外提供接口,另一种是接收异步的消息。

func (s Server) StartNatsSubcriber() {
	nc, _ := nats.Connect(s.NatsURL)
	nc.Subscribe(s.ServiceKey, func(msg *nats.Msg) {
		req := Request{}
		json.Unmarshal(msg.Data, &req)
		result := s.Mapper[req.Method](req)
		reply, _ := json.Marshal(result)
		nc.Publish(msg.Reply, reply)
	})
	for k, v := range s.SubcribeMapper {
		nc.Subscribe(k, func(msg *nats.Msg) {
			req := Request{}
			json.Unmarshal(msg.Data, &req)
			result := v(req)
			reply, _ := json.Marshal(result)
			nc.Publish(msg.Reply, reply)
		})
	}
}

同时需要对原来的request和response自定义一下json处理:

func (req *Request) UnmarshalJSON(jsonBytes []byte) error {
	var pre map[string]interface{}
	err := json.Unmarshal(jsonBytes, &pre)
	if err != nil {
		return err
	}
	req.Method = pre["method"].(string)
	req.Protocol = pre["protocol"].(string)
	req.TrackID = pre["track_id"].(string)
	payload, err := json.Marshal(pre["payload"])
	if err != nil {
		return err
	}
	req.Payload = payload
	return nil
}
func (res *Response) MarshalJSON() ([]byte, error) {
	if res.Err != nil {
		return json.Marshal(&struct {
			Err string `json:"err"`
		}{
			Err: res.Err.Error(),
		})
	} else {
		return json.Marshal(&struct {
			Data interface{} `json:"data,omitempty"`
		}{
			Data: res.Data,
		})
	}
}

4. 其他

到目前为止,只是实现了一个非常简单的传输层,距离真正生产使用还很大。

首先我们并没有考虑负载均衡,所以目前还不能对微服务进行横向扩展。

其次是还没有实现服务发现,所以只实现了服务端还没有实现client,因为当我们试图call一个service的时候,我们需要知道它支持的协议。

不过对于大多数情况来说,我们的service之间都是internal并且假设互相都是可信的通信,所以不会做授权检查同时协议也应该是相对固定的,上面同时写了http和nats只是为了展示,大多数情况我们应该尽量统一通信协议。