tcp服务器
包括日志,定时处理,广播,超时
map写添加了锁(读不用锁)
添加了解码器
删除了addr-buf映射,添加删除锁
mark:今天听大神所要处理系统中断EINTR,以后做简单处理EINTR--retry
mark:用struct封装addr,net.Listener,exit(是否断开)等信息..最重要的是使用:
br := bufioNewReader(conn),bw :=bufio.NewWriter(conn)来取代读循环,这样就可以需要的时候再读/写
https://github.com/zhangpeihao/gortmp/blob/master/server.go
packagemain import( "bytes" "encoding/binary" "fmt" "log" "net" "os" "strconv" "strings" "sync" "time" ) funcmain(){ tcpStart(8889) } /* 定义相关锁 */ var( connMkMutexsync.Mutex connDelMutexsync.Mutex ) /* 定义logger */ varlogger*log.Logger /* 初始化log */ funcinitLog(logfile*os.File){ //logger=log.New(logfile,"log:",log.Ldate|log.Ltime) logger=log.New(logfile,"prefix",0) } /* 处理log */ funcdoLog(args...interface{}){ str:=time.Now().Format("2006-01-0215:04:05") varlogDatastring vartempstring for_,arg:=rangeargs{ switchval:=arg.(type){ caseint: temp=strconv.Itoa(val) casestring: temp=val } iflen(temp)>64{//限制只打印前64个字符 logData=temp[:64] }else{ logData=temp } str=str+""+logData } logger.Println(str) } /* 定义socketconn映射 */ varclisConnMapmap[string]*net.TCPConn /* 初始化socketconn映射 */ funcinitClisConnMap(){ clisConnMap=make(map[string]*net.TCPConn) } /* 建立socketconn映射 */ funcmkClisConn(keystring,conn*net.TCPConn){ connMkMutex.Lock() deferconnMkMutex.Unlock() clisConnMap[key]=conn } /* 删除socketconn映射 */ funcdelClisConn(keystring){ connDelMutex.Lock() deferconnDelMutex.Unlock() delete(clisConnMap,key) } /* 定义解码器 */ typeUnpackerstruct{ //头(xy)2bytes+标识1byte+包长度2bytes+data //当然了,头不可能是xy,这里举例子,而且一般还需要转义 _buf[]byte } func(unpacker*Unpacker)Feed(data[]byte){ unpacker._buf=append(unpacker._buf,data...) } func(unpacker*Unpacker)unpack()(flagbyte,msg[]byte){ str:=string(unpacker._buf) for{ iflen(str)<5{ break }else{ _,head,data:=Partition(str,"xy") iflen(head)==0{//没有头 ifstr[len(str)-1]==byte(120){//120=>'x' unpacker._buf=[]byte{byte(120)} }else{ unpacker._buf=[]byte{} } break } buf:=bytes.NewReader([]byte(data)) msg=make([]byte,buf.Len()) vardataLenuint16 binary.Read(buf,binary.LittleEndian,&flag) binary.Read(buf,&dataLen) fmt.Println("DEC:",flag,dataLen) ifbuf.Len()<int(dataLen){ break } binary.Read(buf,&msg) unpacker._buf=unpacker._buf[2+1+2+dataLen:] } } return } /* 启动服务 */ functcpStart(portint){ initLog(os.Stderr) initClisConnMap() doLog("tcpStart:") host:=":"+strconv.Itoa(port) tcpAddr,err:=net.ResolveTCPAddr("tcp4",host) checkError(err) listener,err:=net.ListenTCP("tcp",tcpAddr) checkError(err) for{ conn,err:=listener.AcceptTCP() iferr!=nil{ continue } gohandleClient(conn) } } /* socketconn */ funchandleClient(conn*net.TCPConn){ //****这里是初始化连接处理 addr:=conn.RemoteAddr().String() doLog("handleClient:",addr) connectionMade(conn) request:=make([]byte,128) deferconn.Close() buf:=make([]byte,0) for{ //****这里是读循环处理 readLoopHandled(conn) read_len,err:=conn.Read(request) iferr!=nil{ //这里没使用checkError因为不退出,只是break出去 doLog("ERR:","readerr",err.Error()) break } ifread_len==0{//在gprs时数据不能通过这个判断是否断开连接,要通过心跳包 doLog("ERR:","connectionalreadyclosedbyclient") break }else{ //request[:read_len]处理 buf=append(buf,request[:read_len]...) doLog("<=",addr,string(request[:read_len])) dataReceived(conn,&buf) request=make([]byte,128)//clearlastreadcontent } } //****这里是连接断开处理 connectionLost(conn) } /* 连接初始处理(ed) */ funcconnectionMade(conn*net.TCPConn){ //初始化连接这个函数被调用 //****建立conn映射 addr:=conn.RemoteAddr().String() ip:=strings.Split(addr,":")[0] mkClisConn(ip,conn) doLog("connectionMade:",addr) //****定时处理(心跳等) goloopingCall(conn) } /* 读循环处理(ed) */ funcreadLoopHandled(conn*net.TCPConn){ //当进入循环读数据这个函数被调用,主要用于设置超时(好刷新设置超时) //*****设置超时(要写在for循环里) setReadTimeout(conn,10*time.Minute) } /* 客户端连接发送来的消息处理(ed) */ funcdataReceived(conn*net.TCPConn,pBuf*[]byte){ //一般情况可以用pBuf参数,但是如果有分包粘包的情况就必须使用clisBufMap的buf //clisBufMap的buf不断增大,不管是否使用都应该处理 //addr:=conn.RemoteAddr().String() doLog("*pBuf:",string(*pBuf)) //sendData(clisConnMap["192.168.6.234"],[]byte("xxx")) sendData(conn,[]byte("echo")) } /* 连接断开(ed) */ funcconnectionLost(conn*net.TCPConn){ //连接断开这个函数被调用 addr:=conn.RemoteAddr().String() ip:=strings.Split(addr,":")[0] delClisConn(ip)//删除关闭的连接对应的clisMap项 doLog("connectionLost:",addr) } /* 发送数据 */ funcsendData(conn*net.TCPConn,data[]byte)(nint,errerror){ addr:=conn.RemoteAddr().String() n,err=conn.Write(data) iferr==nil{ doLog("=>",string(data)) } return } /* 广播数据 */ funcbroadcast(tclisMapmap[string]*net.TCPConn,data[]byte){ for_,conn:=rangetclisMap{ sendData(conn,data) } } /* 定时处理&延时处理 */ funcloopingCall(conn*net.TCPConn){ pingTicker:=time.NewTicker(30*time.Second)//定时 testAfter:=time.After(5*time.Second)//延时 for{ select{ case<-pingTicker.C: //发送心跳 _,err:=sendData(conn,[]byte("PING")) iferr!=nil{ pingTicker.Stop() return } case<-testAfter: doLog("testAfter:") } } } /* 设置读数据超时 */ funcsetReadTimeout(conn*net.TCPConn,ttime.Duration){ conn.SetReadDeadline(time.Now().Add(t)) } /* 错误处理 */ funccheckError(errerror){ iferr!=nil{ doLog("ERR:",err.Error()) os.Exit(1) } } funcPartition(sstring,sepstring)(headstring,retSepstring,tailstring){ //Partition(s,sep)->(head,sep,tail) index:=strings.Index(s,sep) ifindex==-1{ head=s retSep="" tail="" }else{ head=s[:index] retSep=sep tail=s[len(head)+len(sep):] } return }原文链接:https://www.f2er.com/go/191154.html