xingo定时器

1) 延迟执行

func (this *Server) CallLater(durations time.Duration, f func(v ...interface{}), args ...interface{}){
       delayTask := timer.NewTimer(durations, f, args)
       delayTask.Run()
}

示例:

func testTimer(args ...interface {}){
       logger.Info(fmt.Sprintf("%s-%d-%f", args[0], args[1], args[2]))
}
s.CallLater(5*time.Second, testTimer, "viphxin", 10009, 10.999)

//s 为xingo server ,服务将会延迟5秒之后执行testTimer

2) 定时执行

func (this *Server) CallWhen(ts string, f func(v ...interface{}), args ...interface{}){
       loc, err_loc := time.LoadLocation("Local")
       if err_loc != nil{
              logger.Error(err_loc)
              return
       }
       t, err := time.ParseInLocation("2006-01-02 15:04:05", ts, loc)
       now := time.Now()
       //logger.Info(t)
       //logger.Info(now)
       //logger.Info(now.Before(t) == true)
       if err == nil{
              if now.Before(t){
                     this.CallLater(t.Sub(now), f, args...)
              }else{
                     logger.Error("CallWhen time before now")
              }
       }else{
              logger.Error(err)
       }
}

 

示例:

s.CallWhen("2016-12-15 18:35:10", testTimer, "viphxin", 10009, 10.999)

//s 为xingo server ,服务将会在2016-12-15 18:35:10执行testTimer,注意时间必须是将来的时间

3) 循环执行

func (this *Server)CallLoop(durations time.Duration, f func(v ...interface{}), args ...interface{}){
       go func() {
              delayTask := timer.NewTimer(durations, f, args)
              for {
                     time.Sleep(delayTask.GetDurations())
                     delayTask.GetFunc().Call()
              }
       }()
}

示例:

s.CallLoop(5*time.Second, testTimer, "loop--viphxin", 10009, 10.999)

//s 为xingo server ,服务将会每隔5秒执行testTimer

xingo集群服务器入门教程

xingo集群服务器推荐目录结构:

.
├── admin_server                           管理服务,可以实现集群的GM管理操作,支持http方式访问
│   ├── test_admin_http.go              对外管理的http接口
│   └── test_admin_rpc.go               对内提供给集群其他节点访问的rpc接口
├── conf                                         集群服务器配置
│   ├── clusterconf.json                    分布式架构定义
│   └── server.json                           服务器配置
├── game_server                             游戏服务器逻辑
├── gate_server                               gate服务器逻辑
│   ├── gateserver.go                        这里可以绑定节点链接和断开处理函数
│   └── test_gate_rpc.go                   对内rpc接口
├── log                                            集群服务器日志文件/支持按服务器/日志大小/时间切割日志
│   ├── cluster.log
│   ├── gate1.log
│   ├── gate2.log
│   └── net.log
├── master.go                                  管理服务
├── net_server                                 对外的网关服务器负责于客户端通信
│   ├── core
│   │   ├── player.go
│   │   └── playermgr.go
│   ├── netserver.go
│   ├── test_net_api.go
│   └── test_net_rpc.go
├── pb
│   └── msg.pb.go
├── README.md
└── server.go                                    xingo server

master.go

package main

import (
       "path/filepath"
       "github.com/viphxin/xingo/sys_rpc"
       "github.com/viphxin/xingo/clusterserver"
)

func main() {
       dir, err := filepath.Abs(filepath.Dir("."))
       if err == nil{
              s := clusterserver.NewMaster(filepath.Join(dir, "conf", "clusterconf.json"))//关联集群配置
              s.AddRpcRouter(&sys_rpc.MasterRpc{})//添加rpc接口
              s.StartMaster()//开启服务
       }
}

server.go

package main

import (
       "github.com/viphxin/xingo/clusterserver"
       "github.com/viphxin/xingo/sys_rpc"
       "os"
       "path/filepath"
       "xingo_cluster/net_server"
       "xingo_cluster/gate_server"
       "xingo_cluster/admin_server"
        _ "net/http"
       _ "net/http/pprof"
)

func main() {
       //pprof
       //go func() {
       //     println(http.ListenAndServe("localhost:6060", nil))
       //}()

       //server code
       args := os.Args
       dir, err := filepath.Abs(filepath.Dir("."))
       if err == nil{
              s := clusterserver.NewClusterServer(args[1], filepath.Join(dir, "conf", "clusterconf.json"))
              s.AddRpcRouter(&sys_rpc.ChildRpc{})
              s.AddRpcRouter(&sys_rpc.RootRpc{})
              /*
              注册分布式服务器
              */
              //net server
              s.AddModule("net", &net_server.TestNetApi{}, &net_server.TestNetRpc{})
              //gate server
              s.AddModule("gate", nil, &gate_server.TestGateRpc{})
              //admin server
              s.AddModule("admin", &admin_server.TestAdminHttp{}, nil)

              s.StartClusterServer()
       }
}

 

启动:

go run master.go
go run server.go gate1
go run server.go gate2
go run server.go net1
go run server.go net2
go run server.go net3
go run server.go net4
go run server.go admin

xingo单进程服务器入门教程

推荐项目目录结构:

.
├── api                                             用户自定义协议
│   └── test_api.go
├── client_walk.go                             自动化游戏全逻辑测试脚本
├── conf
│   └── server.json                             服务器全局配置
├── core                                           游戏核心逻辑/各种游戏玩法,活动相关逻辑放到这儿
│   ├── aoi.go
│   ├── player.go
│   └── worldmgr.go
├── log                                             默认日志文件存放路径/日志可按照时间和大小切割
│   ├── server.log
│   ├── server.log.2016-12-20
│   ├── server.log.2016-12-26
├── pb                                              消息定义
│   ├── msg.pb.go
│   └── msg.proto
├── README.md
├── server.go                                    服务器主逻辑

推荐clone demo项目到本地:

git clone https://git.oschina.net/viphxin/xingo_demo.git

配置服务器

vim conf/server.json

{
   "TcpPort": 8999,//监听端口
   "StepPerMs": 2000,//demo逻辑配置,非通用配置项
   "PoolSize": 5,    //工作队列数
   "IsUsePool": true,//是否使用工作队列
   "LogLevel": 1,    //日志级别 0 ALL 1 DEBUG 2 INFO 3 WARN 4 ERROR 5 FATAL 6 OFF
   "MaxConn": 2000   //支持最大连接数
}

google protobuf消息定义:

message Talk{
   string Content=1;
}

定义一个测试接口api,实现功能世界聊天(广播每个链接发过来的talk消息):

导入xingo

import (
       "xingo_demo/pb"
       "xingo_demo/core"
       "github.com/golang/protobuf/proto"
       "github.com/viphxin/xingo/fnet"
       "github.com/viphxin/xingo/logger"
       "fmt"
)

定义api接口:

type TestRouter struct {
}

/*
ping test
*/
func (this *TestRouter) Api_0(request *fnet.PkgAll) {
       logger.Debug("call Api_0")
       // request.Fconn.SendBuff(0, nil)
       packdata, err := fnet.DefaultDataPack.Pack(0, nil)
       if err == nil{
              request.Fconn.Send(packdata)
       }else{
              logger.Error("pack data error")
       }
}

/*
世界聊天
 */
func (this *TestRouter) Api_2(request *fnet.PkgAll) {
       msg := &pb.Talk{}
       err := proto.Unmarshal(request.Pdata.Data, msg)//解析Talk消息
       if err == nil {
              logger.Debug(fmt.Sprintf("user talk: content: %s.", msg.Content))
              pid, err1 := request.Fconn.GetProperty("pid")
              if err1 == nil{
                     p, _ := core.WorldMgrObj.GetPlayer(pid.(int32))
                     p.Talk(msg.Content)//广播
              }else{
                     logger.Error(err1)
                     request.Fconn.LostConnection()
              }

       } else {
              logger.Error(err)
              request.Fconn.LostConnection()
       }
}

 

实现自己的xingo server

package main

import (
       "fmt"
       "github.com/viphxin/xingo/fserver"
       "github.com/viphxin/xingo/iface"
       "github.com/viphxin/xingo/logger"
       "github.com/viphxin/xingo/utils"
       "xingo_demo/api"
       "xingo_demo/core"
       "os"
       "os/signal"
)

func DoConnectionMade(fconn iface.Iconnection) {
       logger.Debug("111111111111111111111111")
       p, _ := core.WorldMgrObj.AddPlayer(fconn)
       fconn.SetProperty("pid", p.Pid)
}

func DoConnectionLost(fconn iface.Iconnection) {
       logger.Debug("222222222222222222222222")
       pid, _ := fconn.GetProperty("pid")
       p, _ := core.WorldMgrObj.GetPlayer(pid.(int32))
       //移除玩家
       core.WorldMgrObj.RemovePlayer(pid.(int32))
       //消失在地图
       p.LostConnection()
}

func main() {
       s := fserver.NewServer()
       //add api ---------------start
       TestRouterObj := &api.TestRouter{}
       s.AddRouter(TestRouterObj)//注册接口api
       //add api ---------------end
       //regest callback
       utils.GlobalObject.OnConnectioned = DoConnectionMade//绑定链接建立处理函数
       utils.GlobalObject.OnClosed = DoConnectionLost//绑定链接丢失处理函数

       s.Start()//开启服务器
       // close
       c := make(chan os.Signal, 1)
       signal.Notify(c, os.Interrupt, os.Kill)
       sig := <-c
       fmt.Println("=======", sig)
       s.Stop()
}

xingo代码结构

.
├── cluster                                 集群相关
│   ├── asyncresult.go                 rpc异步调用等待结果放回
│   ├── child.go                           集群子节点管理器
│   ├── clusterconf.go                  集群配置文件解析

│   ├── cmdinterpreter.go            telnet在线调试命令解释器

│   ├── router.go                        rpc接口模板

│   ├── rpc.go                             json rpc实现
│   ├── rpchandle.go                    rpc消息处理逻辑
│   ├── rpcpack.go                       rpc消息序列化和反序列化

│   ├── rpcprotocol.go                 rpc消息处理协议
│   └── telnetprotocol.go              在线调试工具telnet协议
├── clusterserver                         集群服务相关
│   ├── clusterglobal.go                集群全局变量
│   ├── clusterserver.go                集群服务
│   └── master.go                         集群管理服务

├── db                                        数据库操作
│   ├── mongo/dboperate.go        数据库操作
│   └── mongo/dboperate_test.go 单元测试
├── fnet                                      底层网络相关
│   ├── connection.go                   网络链接抽象
│   ├── connectionmgr.go              链接管理
│   ├── datapack.go                      序列化和反序列化
│   ├── msghandle.go                   网络数据包处理,消息路由
│   ├── protocol.go                       默认通信协议实现

│   ├── router.go                          api接口模板
│   └── tcpclient.go                      一个基于事件驱动的tcp客户端
├── fserver
│   └── server.go                          xingo 服务器抽象
├── iface                                     接口描述目录
│   ├── iclient.go

│   ├── icommand.go

│   ├── icommandinterpreter.go
│   ├── iconnection.go

│   ├── iconnectionmgr.go

│   ├── idatapack.go

│   ├── imsghandle.go
│   ├── iprotocol.go

│   ├── irouter.go
│   ├── iserver.go
│   └── iwriter.go
├── LICENSE
├── logger
│   ├── logger.go                           日志实现
│   └── logger_test.go
├── sys_rpc                                  系统的rpc通信
│   ├── child_rpc.go                       子节点rpc
│   ├── master_rpc.go                    管理节点rpc
│   └── root_rpc.go                        父节点rpc

├── telnetcmd
│   ├── mastercommand.go            master节点支持的telnet命令(集群服务器关闭,集群配置热更新)
│   └── pprofcpucommand.go         telnet在线调试工具之性能分析命令
├── timer

│   ├── hashwheel.go                     xingo hash时间轮定时器实现

│   ├── safetimer.go                      xingo goroutine安全的定时器

│   ├── safetimer_test.go               单元测试
│   └── timer.go                            goroutine版本的定时器实现
└── utils
│   ├── globalobj.go                       xingo全局变量,服务器相关事件回调绑定

│   ├── tools.go                            工具函数

│   ├── uuid_test.go                      单元测试
│   └── uuidfactory.go                   uuid生成器(暂时只用于rpc异步结果对应的key)

├── .gitignore

├── LICENSE

├── README.md

├── version.go

├── xingo.go

xingo之hello world

 xingo helloworld主要实现功能,xingo客户端helloworld_client.go连接xingo helloworld服务器发送HelloReq消息,服务器收到消息后发送ack HelloAck给客户端,之后延迟5秒发送DelayNtf消息给客户端报时,所有代码见仓库[地址:点我]。 

(1)定义protobuf通信协议 

syntax="proto3";
package pb;
option csharp_namespace="Pb";

//msg=1
message HelloReq{
    string name         = 1;
}

//msg=2
message HelloAck{
    string content      = 1;
}

//msg=3
message DelayNtf{
    string ts          = 1;
}

 生成msg.pb.go 

\path\to\protoc.exe –plugin=protoc-gen-go=%GOPATH%\bin\protoc-gen-go.exe –go_out F:\workspace\src\xingo_examples\helloword\pb -I F:\workspace\src\xingo_examples\helloword\pb F:\workspace\src\xingo_examples\helloword\pb\msg.proto 

 (2)api
这里需要实现HelloReq协议的处理函数

func (this *TestRouter)sendDelayMsg(fconn iface.Iconnection){
	utils.GlobalObject.GetSafeTimer().CreateTimer(5000, func(args ...interface{}){
		con := args[0].(iface.Iconnection)
		ntf := &pb.DelayNtf{
			Ts: time.Now().String(),
		}
		ntfRaw, err := utils.GlobalObject.Protoc.GetDataPack().Pack(3, ntf)
		if err == nil {
			con.Send(ntfRaw)
		}
	},[]interface{}{fconn})//xingo定时器
}

/*
HelloReq
*/
func (this *TestRouter)Handle(request iface.IRequest){
	msg := &pb.HelloReq{}
	err := proto.Unmarshal(request.GetData(), msg)
	if err == nil {
		request.GetConnection().SetProperty("name", msg.Name)
		//send ack
		ack := &pb.HelloAck{
			Content: fmt.Sprintf("Hello %s.You will receive a Ntf after 5 seconds.\n", msg.Name),
		}
		data, err := utils.GlobalObject.Protoc.GetDataPack().Pack(2, ack)
		if err == nil{
			request.GetConnection().Send(data)
			this.sendDelayMsg(request.GetConnection())
		}
	} else {
		logger.Error(err)
		request.GetConnection().LostConnection()
	}
}

 (3)实现自己的xingo server
首先需要修改配置文件conf/server.json

{
	"WriteList": ["127.0.0.1"],
	"DebugPort": 8881,
	"TcpPort": 8999,
	"PoolSize": 1,
	"LogLevel": 1,
	"MaxConn": 2000,
	"MaxPacketSize": 1024,
	"FrequencyControl": "10/s"
}

 helloworld_server.go

func DoConnectionMade(fconn iface.Iconnection) {
	logger.Debug(fmt.Sprintf("session %d connectioned helloworld server.", fconn.GetSessionId()))
}

func DoConnectionLost(fconn iface.Iconnection) {
	logger.Debug(fmt.Sprintf("session %d disconnectioned helloworld server.", fconn.GetSessionId()))
}

func main() {
	s := xingo.NewXingoTcpServer()

	//add api ---------------start
	TestRouterObj := &api.TestRouter{}
	s.AddRouter("1", TestRouterObj)
	//add api ---------------end

	//regest callback
	utils.GlobalObject.OnConnectioned = DoConnectionMade
	utils.GlobalObject.OnClosed = DoConnectionLost
	s.Serve()
}

 (4) helloworld_client.go
首先需要实现自己的clientprotocol ->HelloWorldCPtotoc
然后再HelloWorldCPtotoc的DoMsg中处理服务器返回的ack和ntf协议消息,详情见代码。

func (this *HelloWorldCPtotoc)DoMsg(fconn iface.Iclient, pdata *fnet.PkgData){
	//处理消息
	fmt.Println(fmt.Sprintf("msg id :%d, data len: %d", pdata.MsgId, pdata.Len))
	switch pdata.MsgId {
	case 2:
		ack := &pb.HelloAck{}
		err := proto.Unmarshal(pdata.Data, ack)
		if err == nil {
			logger.Debug(ack.Content)
		}else{
			logger.Error("Unmarshal ack err: ", err)
		}
	case 3:
		nft := &pb.DelayNtf{}
		err := proto.Unmarshal(pdata.Data, nft)
		if err == nil {
			logger.Debug(nft.Ts)
		}else{
			logger.Error("Unmarshal ntf err: ", err)
		}
	default:
		logger.Error("Unkown message!!!!")
	}
}