0%

从0开始构建muduo(二)

从0开始构建muduo(二)

加入base模块下相关的代码,主要是对进程、线程相关操作的封装,包括:原子操作类、线程操作、互斥锁、条件变量、日志类。这些在原版中都是使用pthread相关接口实现的,自己实现时可以改用C++11的标准库来实现。

加入base模块之后,可以将echo的处理改成多线程(使用线程池)。

我这里的实现是将接收到的消息做一次简单的翻转(字符串反转)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
#include <unistd.h>
#include <sys/types.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/epoll.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <algorithm>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "base/ThreadPool.h"
#include "base/CurrentThread.h"
#include "base/Logging.h"
#include "base/LogFile.h"

/// Growable array of epoll_event records, used as the output buffer of epoll_wait().
using EventList = std::vector<epoll_event>;

/**
 * @brief Report a fatal error and terminate the process.
 *
 * Prints @p m followed by the description of the current errno value (via
 * perror) and exits with EXIT_FAILURE. Call it immediately after the failing
 * call so that errno has not been overwritten.
 *
 * Implemented as a real function rather than a macro so that it is type-checked
 * and the compiler knows it never returns; existing call sites of the form
 * `ERR_EXIT("text");` keep working unchanged.
 *
 * @param m Prefix for the error message, typically the name of the failing call.
 */
[[noreturn]] inline void ERR_EXIT(const char *m)
{
    perror(m);
    exit(EXIT_FAILURE);
}

/// TCP port the server binds to and listens on.
constexpr int PORT = 8080;

// Log file that outputFunc()/flushFunc() forward to; stays null until
// setLogger() creates it.
std::unique_ptr<tinyMuduo::LogFile> g_logFile;
// Pending replies keyed by client socket fd: strFunc() (executed on thread-pool
// threads) stores the processed message here, and the EPOLLOUT branch of the
// epoll loop in main() sends and erases it.
// NOTE(review): std::unordered_map is not safe for unsynchronized concurrent
// access, so every reader and writer of this map has to be serialized.
std::unordered_map<int, std::string> buf_map;
void outputFunc(const char *msg, int len)
{
g_logFile->append(msg, len);
}

void flushFunc()
{
g_logFile->flush();
}

/**
 * @brief Route all tinyMuduo::Logger output into a log file.
 *
 * Creates the global LogFile (base name "epoll") and registers outputFunc /
 * flushFunc as the logger's output and flush callbacks, so it must run before
 * the first LOG_* statement that should end up in the file.
 */
void setLogger()
{
    // Second LogFile constructor argument; presumably the roll-over size in
    // bytes as in muduo's LogFile -- confirm against base/LogFile.h.
    const int rollSizeArg = 500 * 1000 * 1000;

    // NOTE(review): the trailing `false` looks like muduo's `threadSafe` flag.
    // If so, it deserves a second look because LOG_* is also used from the
    // thread-pool tasks -- confirm against base/LogFile.h.
    g_logFile.reset(new tinyMuduo::LogFile("epoll", rollSizeArg, false));

    tinyMuduo::Logger::setOutput(outputFunc);
    tinyMuduo::Logger::setFlush(flushFunc);
}

void strFunc(std::string s, const int epollfd, const int connfd)
{
LOG_INFO << "tid=" << tinyMuduo::CurrentThread::tid() << s;
// 字符串翻转,相当于是处理计算任务
reverse(s.begin(), s.end());
buf_map[connfd] = s;

// 添加读取事件
epoll_event event;
event.data.fd = connfd;
event.events = EPOLLOUT;
epoll_ctl(epollfd, EPOLL_CTL_MOD, connfd, &event);
}

int main()
{
// 设置日志
setLogger();

/***
* 屏蔽 SIGPIPE SIGCHLD 信号
*/
signal(SIGPIPE, SIG_IGN);
signal(SIGCHLD, SIG_IGN);

/***
* 打开一个文件描述符,解决EMFILE错误
*/
int idlefd = open("/dev/null", O_RDONLY | O_CLOEXEC);

int listenfd;
if ((listenfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_TCP)) < 0)
ERR_EXIT("socket");

struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(PORT);
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);

/***
* 设置端口重用,无需 TIME_WAIT
*/
int on = 1;
if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
ERR_EXIT("setsockopt");

if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0)
ERR_EXIT("bind");
if (listen(listenfd, SOMAXCONN) < 0)
ERR_EXIT("listen");

std::vector<int> clients;
int epollfd;
epollfd = epoll_create1(EPOLL_CLOEXEC);

struct epoll_event event;
event.data.fd = listenfd;
event.events = EPOLLIN /* | EPOLLET*/;
epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &event);

EventList events(16);
struct sockaddr_in peeraddr;
socklen_t peerlen;
int connfd;

// 线程池相关设置
tinyMuduo::ThreadPool pool("pool");
pool.start(5);

int nready;
while (1)
{
nready = epoll_wait(epollfd, &*events.begin(), static_cast<int>(events.size()), -1);
if (nready == -1)
{
if (errno == EINTR)
continue;

ERR_EXIT("epoll_wait");
}
if (nready == 0) // 什么都没有发生
continue;

/***
* 当nready达到events的大小,说明可能有更多的事件触发
*/
if ((size_t)nready == events.size())
events.resize(events.size() * 2);

for (int i = 0; i < nready; ++i)
{
if (events[i].data.fd == listenfd)
{
peerlen = sizeof(peeraddr);
connfd = accept4(listenfd, (struct sockaddr *)&peeraddr,
&peerlen, SOCK_NONBLOCK | SOCK_CLOEXEC);

if (connfd == -1)
{
if (errno == EMFILE)
{
/***
* 发生EMFILE时的处理方法
*/

close(idlefd);
idlefd = accept(listenfd, NULL, NULL);
close(idlefd);
idlefd = open("/dev/null", O_RDONLY | O_CLOEXEC);
continue;
}
else
ERR_EXIT("accept4");
}
// 使用日志
LOG_INFO << "ip=" << inet_ntoa(peeraddr.sin_addr) << " port=" << ntohs(peeraddr.sin_port);

clients.push_back(connfd);

event.data.fd = connfd;
event.events = EPOLLIN /* | EPOLLET*/;
epoll_ctl(epollfd, EPOLL_CTL_ADD, connfd, &event);
}
else if (events[i].events & EPOLLIN)
{
connfd = events[i].data.fd;
if (connfd < 0)
continue;

char buf[1024] = {0};
int ret = read(connfd, buf, 1024);
if (ret == -1)
ERR_EXIT("read");
if (ret == 0)
{
/***
* 说明对方断开了连接
*/
LOG_INFO << "client close";
close(connfd);
event = events[i];
epoll_ctl(epollfd, EPOLL_CTL_DEL, connfd, &event);
clients.erase(std::remove(clients.begin(), clients.end(), connfd), clients.end());
continue;
}

LOG_INFO << buf;
// 放入线程池中运行
// strFunc(std::string(buf), epollfd, connfd);
pool.run(std::bind(strFunc, std::string(buf), epollfd, connfd));
}
else if (events[i].events & EPOLLOUT)
{
connfd = events[i].data.fd;
if (connfd < 0)
continue;

if (buf_map.count(connfd))
{
// 读取处理好的缓存
std::string s = buf_map[connfd];
const char *buf = s.c_str();
write(connfd, buf, strlen(buf));
// 删除缓存
buf_map.erase(connfd);
// 写完后将事件改为监听
event = events[i];
event.events = EPOLLIN;
epoll_ctl(epollfd, EPOLL_CTL_MOD, connfd, &event);
}
else
{
LOG_ERROR << "connfd " << connfd << "dont have buf";
}
}
}
}

return 0;
}