先坦白说明:我是学生,这是一项课程作业。我已经几乎不间断地做了一个多星期(还不算之前花掉的时间),仍然弄不清楚自己哪里做错了。服务器只完成了“少量”recv 之后就一直挂在 epoll_wait 上(说“少量”,是因为我预期会有几 GB 的数据,而实际只收到几十 MB)。我认为客户端的工作方式没有问题,因为它与我的 select 服务器和多线程服务器配合得都很好。请快速浏览一下代码,如果有什么地方一眼看上去就可能是问题的根源,请告诉我。
客户端/服务器的基本思想是用大量连接(10k)轰炸服务器,并多次传输给定数量的数据。这个 epoll 服务器在 2000 个连接时就遇到了麻烦,而我的多线程服务器基本能够处理 10k 的目标。
我不是要求你们替我完成作业(我已经差不多做完了),只是需要有人帮我弄清楚我在这里做错了什么。提前感谢你们提供的任何帮助 :)
- 1 #include "common.h"
- 2 #include <sys/epoll.h>
- 3
- 4 uint16_t ready[MAX_CONNS]; /* ring of descriptors with pending input: main() stores an fd from an epoll event, a worker reads it and later resets the slot to 0 (0 = empty) */
- 5 uint16_t next; /* index in ready[] that the workers examine next; starts at 4 and wraps back to 4 at MAX_CONNS */
- 6 pthread_mutex_t mutex; /* held by a worker while it scans ready[] and advances next; main() writes ready[] without taking it */
- 7
- 8 void *worker_thread(void *param) {
- 9 int my_sock,pos;
- 10 struct conn_params *conn_ps = (struct conn_params *)param;
- 11
- 12 while (1) {
- 13 pthread_mutex_lock(&mutex);
- 14
- 15 while (1) {
- 16 if (next == MAX_CONNS) {
- 17 printf("balls\n");
- 18 next = 4;
- 19 }
- 20
- 21 if (ready[next] != 0) {
- 22 pos = next;
- 23 my_sock = ready[pos];
- 24 next++;
- 25 break;
- 26 }
- 27 }
- 28
- 29 pthread_mutex_unlock(&mutex);
- 30 /* handle recv/send */
- 31 if (echo_recv(&conn_ps[my_sock],MULTIPLE) == 0) { /* closed conn */
- 32 shutdown(my_sock,SHUT_RDWR);
- 33 close(my_sock);
- 34 serv_stats.active_connections--;
- 35 }
- 36 ready[pos] = 0;
- 37 /* print_conn_stats(&conn_ps[my_sock]);*/
- 38 }
- 39 }
- 40
- 41 void *add_client_thread(void *param) {
- 42 struct epoll_accept_thread *eat = (struct epoll_accept_thread *)param;
- 43 struct sockaddr client;
- 44 struct epoll_event event;
- 45 socklen_t client_len;
- 46 int new_sock,ret;
- 47 char hostbuf[NI_MAXHOST],servbuf[NI_MAXSERV];
- 48
- 49 bzero(&client,sizeof(struct sockaddr));
- 50 event.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLET;
- 51
- 52 while ((new_sock = accept(eat->listen_sock,&client,&client_len)) != -1) {
- 53 set_nonblock(new_sock);
- 54 event.data.fd = new_sock;
- 55 if (epoll_ctl(eat->fd_epoll,EPOLL_CTL_ADD,new_sock,&event) == -1) {
- 56 perror("epoll_ctl");
- 57 printf("%u\n",new_sock);
- 58 continue;
- 59 }
- 60
- 61 bzero(&(eat->conn_ps[new_sock]),sizeof(struct conn_params));
- 62 eat->conn_ps[new_sock].sock = new_sock;
- 63 if ((ret = getnameinfo(&client,client_len,hostbuf,NI_MAXHOST,servbuf,NI_MAXSERV,NI_NUMERICHOST)) != 0) {
- 64 gai_strerror(ret);
- 65 }
- 66
- 67 update_server_stats();
- 68 printf("added client\n");
- 69 }
- 70
- 71 if (errno != EAGAIN) {
- 72 perror("Couldn't accept connection");
- 73 }
- 74
- 75 pthread_exit(NULL);
- 76 }
- 77
/*
 * Entry point of the epoll echo server.
 *
 * Parses "-l <port>", binds and listens on that port, starts five worker
 * threads and then dispatches epoll events: new connections are handed to
 * an accept thread, readable sockets are published to the workers through
 * the shared `ready` ring.
 *
 * Returns 1 if epoll_wait() fails; exits with status 1 on setup errors.
 */
int main(int argc, char **argv) {
    int opt; /* getopt() returns int; a plain char may be unable to hold -1 */
    char *port = NULL;
    struct addrinfo hints, *results, *p;
    int listen_sock = new_tcp_sock(), nfds, i, ret;
    int fd_epoll, next_avail = 4;
    struct conn_params conn_ps[MAX_CONNS];
    struct epoll_event evs[MAX_CONNS];
    struct epoll_event event;
    struct epoll_accept_thread eat;
    pthread_t thread;

    while ((opt = getopt(argc, argv, ":l:")) != -1) {
        switch (opt) {
        case 'l': /* port to listen on */
            port = optarg;
            break;
        case '?': /* unknown option; the offending character is in optopt */
            fprintf(stderr, "The option -%c is not supported.\n", optopt);
            exit(1);
        case ':': /* required arg not supplied for option */
            fprintf(stderr, "The option -%c requires an argument.\n", optopt);
            exit(1);
        }
    } /* command line arg processing done */

    if (port == NULL) {
        fprintf(stderr, "You must provide the port to listen on (-l).\n");
        exit(1);
    }

    signal(SIGINT, handle_interrupt);

    bzero(&hints, sizeof(struct addrinfo));
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_PASSIVE;

    set_nonblock(listen_sock);
    set_reuseaddr(listen_sock);

    /* parenthesised so that ret receives the error code, not the comparison result */
    if ((ret = getaddrinfo(NULL, port, &hints, &results)) != 0) {
        fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(ret));
        exit(1);
    }

    for (p = results; p != NULL; p = p->ai_next) { /* try each candidate address */
        if (bind(listen_sock, p->ai_addr, p->ai_addrlen) == -1) {
            perror("Bind Failed");
        } else {
            break;
        }
    }

    if (p == NULL) { /* none of the addresses could be bound */
        fprintf(stderr, "Unable to bind to the specified port. Exiting...\n");
        exit(1);
    }

    freeaddrinfo(results);

    /* connections arrive in large bursts; ask for the largest backlog the system allows */
    if (listen(listen_sock, SOMAXCONN) == -1) {
        perror("Listen Failed");
        exit(1);
    }

    /* everything is set up. method-specific code goes below */

    start_server_stats();
    next = 4;

    if ((fd_epoll = epoll_create(MAX_CONNS)) == -1) {
        perror("epoll_create");
        exit(1);
    }

    bzero(&event, sizeof(event));
    event.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLET;
    event.data.fd = listen_sock;
    if (epoll_ctl(fd_epoll, EPOLL_CTL_ADD, listen_sock, &event) == -1) {
        perror("epoll_ctl");
        exit(1);
    }

    signal(SIGPIPE, SIG_IGN);
    bzero(ready, MAX_CONNS * sizeof(uint16_t));
    pthread_mutex_init(&mutex, NULL);

    for (i = 0; i < 5; i++) { /* five workers should be enough */
        if (pthread_create(&thread, NULL, worker_thread, (void *)conn_ps) != 0) {
            fprintf(stderr, "Unable to start worker thread. Exiting...\n");
            exit(1);
        }
    }

    /* The accept-thread arguments never change, so fill them in once. */
    eat.listen_sock = listen_sock;
    eat.fd_epoll = fd_epoll;
    eat.conn_ps = conn_ps;

    while (1) {
        nfds = epoll_wait(fd_epoll, evs, MAX_CONNS, -1);
        if (nfds == -1) {
            if (errno == EINTR) { /* interrupted by a signal: just retry */
                continue;
            }
            break; /* genuine failure: report it below */
        }

        for (i = 0; i < nfds; i++) { /* loop through all FDs */
            if (evs[i].events & (EPOLLERR | EPOLLHUP)) { /* if there's an error or a hangup */
                /*fprintf(stderr,"Error! Danger,Will Robinson! Danger!");*/
                close(evs[i].data.fd);
                continue;
            } else if (evs[i].data.fd == listen_sock) { /* we have a new connection coming in */
                if (pthread_create(&thread, NULL, add_client_thread, (void *)&eat) != 0) {
                    fprintf(stderr, "Unable to start accept thread.\n");
                }
            } else { /* inbound data */
                /*
                 * Workers consume slots strictly in order (4, 5, 6, ...), so
                 * slots must be filled in that same order. If the next slot
                 * is still occupied, wait for a worker to release it rather
                 * than skipping ahead.
                 */
                while (ready[next_avail] != 0) {
                    sched_yield();
                }
                ready[next_avail] = evs[i].data.fd;

                /*
                 * Always advance after publishing. Reusing a slot the workers
                 * have already passed would leave that event unserviced until
                 * their index wraps around, stalling the connection.
                 */
                next_avail++;
                if (next_avail == MAX_CONNS) {
                    next_avail = 4;
                }
            } /* end inbound data */
        } /* end iterating through FDs */
    } /* end epoll_wait loop */

    perror("epoll_wait");

    return 1;
}
下面是 echo_recv 函数,因为我猜有人也会想看看它:
- 14 int echo_recv(struct conn_params *conn_p,int single) {
- 15 char client_buf[CLIENT_BUF_SIZE],buffer[BUF_SIZE];
- 16 int nread,nwrite,nsent = 0,i;
- 17
- 18 while ((nread = recv(conn_p->sock,client_buf,CLIENT_BUF_SIZE,0)) > 0) {
- 19 /* create buffer of MULTIPLIER(int) times what was received */
- 20 for (i = 0; i < MULTIPLIER && nread*i < BUF_SIZE; i++) {
- 21 memcpy(buffer+(nread*i),nread);
- 22 }
- 23
- 24 /* send the created buffer */
- 25 while ((nwrite = send(conn_p->sock,buffer+nsent,(nread*MULTIPLIER)-nsent,0)) > 0) {
- 26 nsent += nwrite;
- 27 }
- 28
- 29 conn_p->total_recvd += nread; /* update our stats for this conn */
- 30 conn_p->total_sent += nsent; /* update our status for this conn */
- 31 serv_stats.total_recvd += nread;
- 32 serv_stats.total_sent += nsent;
- 33 nsent = 0;
- 34
- 35 if (single) {
- 36 return 1;
- 37 }
- 38 }
- 39
- 40 if (nread == -1 && (errno & EAGAIN)) {
- 41 return 1;
- 42 }
- 43
- 44 if (nread == -1) {
- 45 perror("wtf?");
- 46 }
- 47
- 48 shutdown(conn_p->sock,SHUT_RDWR);
- 49 close(conn_p->sock);
- 50
- 51 return 0; /* recv Failed */
- 52 }