1. Server端分析
muduo中关于chatserver有四个例子,第一个oneloopthread+Reactor模式较为简单,直接看第二个multi_Reactor + sub_Reactor例子,这里的sub_Reactor用了threadpool
我们看第三个例子,server_threaded_efficient,这里就是第二个例子加上copy_on_write手法,降低锁竞争。(减少使用读写锁)
#include "examples/asio/chat/codec.h"
#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpServer.h"
#include <set>
#include <stdio.h>
#include <unistd.h>
using namespace muduo;
using namespace muduo::net;
class ChatServer : noncopyable
{
public:
ChatServer(EventLoop* loop,
const InetAddress& listenAddr)
: server_(loop, listenAddr, "ChatServer"),
codec_(std::bind(&ChatServer::onStringMessage, this, _1, _2, _3)),
connections_(new ConnectionList)
{
server_.setConnectionCallback(
std::bind(&ChatServer::onConnection, this, _1));
server_.setMessageCallback(
std::bind(&LengthHeaderCodec::onMessage, &codec_, _1, _2, _3));
}
void setThreadNum(int numThreads)
{
server_.setThreadNum(numThreads);
}
void start()
{
server_.start();
}
private:
void onConnection(const TcpConnectionPtr& conn)
{
LOG_INFO << conn->peerAddress().toIpPort() << " -> "
<< conn->localAddress().toIpPort() << " is "
<< (conn->connected() ? "UP" : "DOWN");
MutexLockGuard lock(mutex_);
if (!connections_.unique())
{
connections_.reset(new ConnectionList(*connections_));
}
assert(connections_.unique());
if (conn->connected())
{
connections_->insert(conn);
}
else
{
connections_->erase(conn);
}
}
typedef std::set<TcpConnectionPtr> ConnectionList;
typedef std::shared_ptr<ConnectionList> ConnectionListPtr;
void onStringMessage(const TcpConnectionPtr&,
const string& message,
Timestamp)
{
ConnectionListPtr connections = getConnectionList();
for (ConnectionList::iterator it = connections->begin();
it != connections->end();
++it)
{
codec_.send(get_pointer(*it), message);
}
}
ConnectionListPtr getConnectionList()
{
MutexLockGuard lock(mutex_);
return connections_;
}
TcpServer server_;
LengthHeaderCodec codec_;
MutexLock mutex_;
ConnectionListPtr connections_ GUARDED_BY(mutex_);
};
int main(int argc, char* argv[])
{
LOG_INFO << "pid = " << getpid();
if (argc > 1)
{
EventLoop loop;
uint16_t port = static_cast<uint16_t>(atoi(argv[1]));
InetAddress serverAddr(port);
ChatServer server(&loop, serverAddr);
if (argc > 2)
{
server.setThreadNum(atoi(argv[2]));
}
server.start();
loop.loop();
}
else
{
printf("Usage: %s port [thread_num]\n", argv[0]);
}
}
1.1 chatServer如何工作
这里需要注意,TcpConnection对象使用了shared_ptr来管理,因为TcpConnection一般为短连接,其中有 std::unique_ptr< Socket> socket_,socket对象管理socketfd,如果想要长期持有socketfd,进而不会出现socketfd被不同线程关了又开,恰好使用的同一个socketfd的情况。muduo在处理这种情况时,使用shared_ptr来延长TcpConnection的生命,进而延长socket对象
typedef std::shared_ptr<TcpConnection> TcpConnectionPtr;
typedef std::set<TcpConnectionPtr> ConnectionList;
1.2 如果实现copy_on_write
使用shared_ptr与mutex可以完成读写锁的功能,可以降低读写锁的开销,同时,读写锁写优先,会导致读阻塞。
- 尽量减少临界区
- 使用shared_ptr 配合mutex,可以在读的时候(除去获取数据环节),同时写
- 读的时候必须shared_ptr完成指针拷贝,指向原地址。写的时候如果有人读,深拷贝一份,指向新对象完成写
- 适合读的频率比写的频率高很多的场合
2 Client端分析
2.1 Client源代码
#include "examples/asio/chat/codec.h"
#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoopThread.h"
#include "muduo/net/TcpClient.h"
#include <iostream>
#include <stdio.h>
#include <unistd.h>
using namespace muduo;
using namespace muduo::net;
class ChatClient : noncopyable
{
public:
ChatClient(EventLoop* loop, const InetAddress& serverAddr)
: client_(loop, serverAddr, "ChatClient"),
codec_(std::bind(&ChatClient::onStringMessage, this, _1, _2, _3))
{
client_.setConnectionCallback(
std::bind(&ChatClient::onConnection, this, _1));
client_.setMessageCallback(
std::bind(&LengthHeaderCodec::onMessage, &codec_, _1, _2, _3));
client_.enableRetry();
}
void connect()
{
client_.connect();
}
void disconnect()
{
client_.disconnect();
}
void write(const StringPiece& message)
{
MutexLockGuard lock(mutex_);
if (connection_)
{
codec_.send(get_pointer(connection_), message);
}
}
private:
void onConnection(const TcpConnectionPtr& conn)
{
LOG_INFO << conn->localAddress().toIpPort() << " -> "
<< conn->peerAddress().toIpPort() << " is "
<< (conn->connected() ? "UP" : "DOWN");
MutexLockGuard lock(mutex_);
if (conn->connected())
{
connection_ = conn;
}
else
{
connection_.reset();
}
}
void onStringMessage(const TcpConnectionPtr&,
const string& message,
Timestamp)
{
printf("<<< %s\n", message.c_str());
}
TcpClient client_;
LengthHeaderCodec codec_;
MutexLock mutex_;
TcpConnectionPtr connection_ GUARDED_BY(mutex_);
};
int main(int argc, char* argv[])
{
LOG_INFO << "pid = " << getpid();
if (argc > 2)
{
EventLoopThread loopThread;
uint16_t port = static_cast<uint16_t>(atoi(argv[2]));
InetAddress serverAddr(argv[1], port);
ChatClient client(loopThread.startLoop(), serverAddr);
client.connect();
std::string line;
while (std::getline(std::cin, line))
{
client.write(line);
}
client.disconnect();
CurrentThread::sleepUsec(1000*1000); // wait for disconnect, see ace/logging/client.cc
}
else
{
printf("Usage: %s host_ip port\n", argv[0]);
}
}
2.2 Client代码分析
- 回调函数的设置:
ChatClient类中组合了client类,通过设置client_.setMessageCallback,可以完成当channel配置的fd有可读事件发生,可以调用channel_->setReadCallback,但是为了接收完整一个message(头部4byte为length,后部为数据)触发一个回调,设置client_的setMessageCallback为LengthHeaderCodec::onMessage - 注意其设置了两个线程,一个为ioloop线程处理网络IO,一个为主线程,负责cin中接收数据
2.3 codec源代码
#ifndef MUDUO_EXAMPLES_ASIO_CHAT_LXZ_CODEC_H
#define MUDUO_EXAMPLES_ASIO_CHAT_LXZ_CODEC_H
#include "muduo/base/Logging.h"
#include "muduo/net/Buffer.h"
#include "muduo/net/Endian.h"
#include "muduo/net/TcpConnection.h"
using namespace muduo;
using namespace muduo::net;
class LengthHeaderCodec : noncopyable
{
public:
typedef std::function<void (const TcpConnectionPtr&,
const string& message,
Timestamp)> StringMessageCallback;
explicit LengthHeaderCodec(const StringMessageCallback& cb):messageCallback_(cb)
{
}
void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp receiveTime)
{
while(buf->readableBytes() >= kHeaderLen)
{
const void* cptr = buf->peek();
const int32_t be32 = *static_cast<const int32_t*>(cptr); //要用void*转换
const int32_t length = sockets::hostToNetwork32(be32);
if(buf->readableBytes() >= length + kHeaderLen)
{
buf->retrieve(kHeaderLen);
muduo::string message(buf->peek(),length);
messageCallback_(conn,message,receiveTime);
buf->retrieve(length);
}
else if(length > 65536 || length < 0)
{
LOG_ERROR << "Invalid length " << length;
conn->shutdown(); // FIXME: disable reading
break;
}
else
break;
}
}
void send( TcpConnection* conn, const StringPiece& strin) //
{
Buffer buf;
buf.append(strin.data(),strin.size());
int32_t len = static_cast<int32_t>(strin.size());
int32_t be32 = sockets::hostToNetwork32(len);
buf.prepend(&be32,sizeof be32);
conn->send(&buf); //注意, TcpConnection* conn不能是const,否则只能调用const 函数
}
private:
const static size_t kHeaderLen = sizeof(int32_t);
StringMessageCallback messageCallback_;
};
#endif
3. muduo如何实现跨线程runinloop
- 利用eventfd可被epoll监听,完成线程同步
- 利用全局变量pendingFunctors_完成pending事件的压入弹出
这里分析client端使用的
ChatClient client(loopThread.startLoop(), serverAddr);
client.connect();
很明显,client起了EventLoopThread,而client.connect()
是在主线程中,但是实际connect操作应该在ioloop中完成,下面看看muduo如何实现的。