一、main.go该代码从项目中分离出来,自行修改后再运行)
packagemain import( "flag" "fmt" "log" "os" "runtime" ) var( Port=flag.String("i",":12345","IPporttolistenon") logFileName=flag.String("log","cServer.log","Logfilename") configFileName=flag.String("configfile","config.ini","Generalconfigurationfile") ) var( configFile=flag.String("configfile","Generalconfigurationfile") ) funcmain(){ runtime.GOMAXPROCS(runtime.Numcpu()) flag.Parse() //setlogfileStdout logFile,logErr:=os.OpenFile(*logFileName,os.O_CREATE|os.O_RDWR|os.O_APPEND,0666) iflogErr!=nil{ fmt.Println("Failtofind",*logFile,"cServerstartFailed") os.Exit(1) } log.SetOutput(logFile) log.SetFlags(log.Ldate|log.Ltime|log.Lshortfile) //setlogfileStdoutEnd //startlisten listenErr:=StartListen(*Port) iflistenErr!=nil{ log.Fatalf("Serverabort!Cause:%v\n",listenErr) } }
二、listener.go该代码从项目中分离出来,自行修改后再运行)
packagemain import( "code.google.com/p/go-uuid/uuid" "errors" "fmt" "goCS/consistent" "log" "net" "sync" "time" ) const( ConnectionMax=100//CSmaxconnect ) //connPoolinfo var( poolLocksync.RWMutex poolCli[ConnectionMax]*CliInfo ) //Webuserinfo typeUserInfostruct{ WS_IDint WS_Namestring ServiceNamestring } //Cliinfo typeCliInfostruct{ AssignIDint//csassignID Connnet.Conn//TheTCP/IPconnectintotheplayer. ConnTimetime.Time//连接时间 VerifyKeystring//连接验证KEY ConnVerifybool//是否验证 ServerTypeint32//服务器类型(1DB服务器,2WEB服务器) NodeStatint32//服务器当前状态(0、宕机;1、正常;2、数据导入中;3、准备中;4、数据迁出中 Addressstring//服务地址 Portint//服务端口 BackupSermap[string]int//备份服务器列表map(ip:port) sync.RWMutex } typehashPoolstruct{ Versionint Circlemap[uint32]string//hash圈节点分布 Replicasmap[string]int//hash圈节点范围 } varSerHashPool*consistent.Consistent=consistent.New() //Clientdisconnect func(cli*CliInfo)disconnect(clientIDint){ poolLock.Lock() deferpoolLock.Unlock() cli.Conn.Close() log.Printf("Client:%squit\n",cli.VerifyKey) ifcli.ServerType==1{ //掉线处理 ifok:=cli.removeDBS();ok{ poolCli[clientID]=nil } }else{ } } //listenhandle func(cli*CliInfo)listenHandle(clientIDint){ headBuff:=make([]byte,12)//setreadstreamsize defercli.Conn.Close() //sendverifyKey: b:=[]byte(cli.VerifyKey) cli.Conn.Write(b) //fmt.Println("cli-IP:",cli.Conn.RemoteAddr().String()) //await10secondverify cli.Conn.SetDeadline(time.Now().Add(time.Duration(10)*time.Second)) forControl:=true forforControl{ varheadNumint forheadNum<cap(headBuff){ readHeadNum,readHeadErr:=cli.Conn.Read(headBuff[headNum:]) ifreadHeadErr!=nil{ log.Println("errHead:",readHeadErr) forControl=false break } headNum+=readHeadNum } ifheadNum==cap(headBuff){ //packheadHandle packHead:=packHeadAnalysis(headBuff) bodyBuff:=make([]byte,packHead.PackLen) varbodyNumint forbodyNum<cap(bodyBuff){ readBodyNum,readBodyErr:=cli.Conn.Read(bodyBuff[bodyNum:]) ifreadBodyErr!=nil{ log.Println("errBody:",readBodyErr) forControl=false break } bodyNum+=readBodyNum } ifbodyNum==int(packHead.PackLen){ //packbodyHandle cli.packBodyAnalysis(clientID,packHead,bodyBuff) //fmt.Printf("packType:%d;packOther:%d;packLen:%d\n",packHead.PackType,packHead.PackOther,packHead.PackLen) } } } cli.disconnect(clientID) } //Checkorassignnewconn funcNewConnection_CS(connnet.Conn)(okbool,indexint,info*CliInfo){ poolLock.Lock() deferpoolLock.Unlock() //AssignIDforclient variint fori=0;i<ConnectionMax;i++{ ifpoolCli[i]==nil{ break } } //Toomanyconnections ifi>ConnectionMax{ log.Printf("Toomanyconnections!ActiveDenial%s\n",conn.RemoteAddr().String()) conn.Close() returnfalse,nil } //Createclientbaseinfo Cli:=new(CliInfo) Cli.Conn=conn Cli.ConnTime=time.Now() Cli.VerifyKey=uuid.New() Cli.BackupSer=make(map[string]int) //UpdatePoolinfo poolCli[i]=Cli log.Println("CliIDassignok:",i) returntrue,i,Cli } //startlistens funcStartListen(addrstring)error{ listener,err:=net.Listen("tcp",addr) iferr!=nil{ returnerr } //ifErrorsacceptarrive100.listenerstop. forfailures:=0;failures<100;{ conn,listenErr:=listener.Accept() iflistenErr!=nil{ log.Printf("number:%d,Failedlistening:%v\n",failures,listenErr) failures++ } ifok,index,Cli:=NewConnection_CS(conn);ok{ //Anewconnectionisestablished.SpawnanewgoroutingtohandlethatClient. goCli.listenHandle(index) } } returnerrors.New("Toomanylistener.Accept()errors,listenerstop") }
三、原理
一个新的连接建立。产生一个新的gorouting来处理客户端。
一个客户端进来首先分配一个唯一ID,并初始化该客户端的基本信息(见:NewConnection_CS方法),产生一个新的gorouting来处理客户端。
如果服务器达到设定的连接上限,将抛弃该客户端。
客户端连接(分配ID)正常后,将等待10秒来给客户端进行验证超期将断开该客户端连接(见:listenHandle中的cli.Conn.SetDeadline)。
验证成功后,开接与用户数据进行分析处理:接收原理为:前4字节为包类型,4-12字节为包长,首先接收够12字节后进行包头解析(如不够12字节将进行等待直到够12字节),解析出4-12字节来得到整个包体的长度进行读取(如不够将等待直到够包体长度)
整包读取完后,根据0-4字节判断包的类型进行包的处理。
四、服务器连接出错达到100条后将终止运行