8. Reactorフレームワーク

最終回は、予定を変えてReactorフレームワークを詳解します。
TCPServerはスレッドプールによるマルチスレッドのソケット通信ですが、クライアントの数が巨大なシステムではC10K問題を起こしてしまいます。それを解決するノンブロッキングI/Oを実現するのにPocoではReactorフレームワークを提供しています。

Reactorフレームワークを使うNetライブラリのサンプルとして、EchoServerプロジェクトがPOCOに付属しています。EchoServerプロジェクトを、Reactorフレームワークが提供するすべてのイベントハンドラをサポートするように改造してみます。

まずは、サンプルをそのままコンパイルして実行してみましょう。
$ EchoServer
これで、サーバが起動されたので、telnetでサーバと通信してみます。
$ telnet 127.0.0.1 9977
以下のように、接続したあと入力した文字をエコーすると思います。

Shell
1
2
3
4
5
6
$ telnet localhost 9977
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello

それでは、何も入力しないで30秒経つとタイムアウトして接続を切断するように変更します。
変更したソースを以下に記載します。

C++
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
#include "Poco/Net/SocketReactor.h"
#include "Poco/Net/SocketAcceptor.h"
#include "Poco/Net/SocketNotification.h"
#include "Poco/Net/StreamSocket.h"
#include "Poco/Net/ServerSocket.h"
#include "Poco/NObserver.h"
#include "Poco/Exception.h"
#include "Poco/Thread.h"
#include "Poco/Timespan.h"
#include "Poco/Util/ServerApplication.h"
#include "Poco/Util/Option.h"
#include "Poco/Util/OptionSet.h"
#include "Poco/Util/HelpFormatter.h"
#include <iostream>
#include <sstream>
using Poco::Net::SocketReactor;
using Poco::Net::SocketAcceptor;
using Poco::Net::ReadableNotification;
using Poco::Net::ShutdownNotification;
using Poco::Net::WritableNotification;
using Poco::Net::ErrorNotification;
using Poco::Net::TimeoutNotification;
using Poco::Net::IdleNotification;
using Poco::Net::ServerSocket;
using Poco::Net::StreamSocket;
using Poco::NObserver;
using Poco::AutoPtr;
using Poco::Thread;
using Poco::Timespan;
using Poco::Util::ServerApplication;
using Poco::Util::Application;
using Poco::Util::Option;
using Poco::Util::OptionSet;
using Poco::Util::HelpFormatter;
class EchoServiceHandler
{
public:
    EchoServiceHandler(StreamSocket& socket, SocketReactor& reactor)
        :_socket(socket)
        ,_reactor(reactor)
        ,_pBuffer(new char[BUFFER_SIZE])
        ,_onWritableAdded(false)
        ,_onTimeoutAdded(false)
    {
        Application& app = Application::instance();
        app.logger().information("Connection from " + socket.peerAddress().toString());
        _reactor.addEventHandler(_socket, NObserver<EchoServiceHandler, ReadableNotification>(*this, &EchoServiceHandler::onReadable));
        _reactor.addEventHandler(_socket, NObserver<EchoServiceHandler, ShutdownNotification>(*this, &EchoServiceHandler::onShutdown));
        _reactor.addEventHandler(_socket, NObserver<EchoServiceHandler, ErrorNotification>(*this, &EchoServiceHandler::onError));
        _reactor.addEventHandler(_socket, NObserver<EchoServiceHandler, IdleNotification>(*this, &EchoServiceHandler::onIdle));
    }
    
    ~EchoServiceHandler()
    {
        Application& app = Application::instance();
        try
        {
            app.logger().information("Disconnecting " + _socket.peerAddress().toString());
        }
        catch (...)
        {
        }
        _reactor.removeEventHandler(_socket, NObserver<EchoServiceHandler, ReadableNotification>(*this, &EchoServiceHandler::onReadable));
        _reactor.removeEventHandler(_socket, NObserver<EchoServiceHandler, ShutdownNotification>(*this, &EchoServiceHandler::onShutdown));
        _reactor.removeEventHandler(_socket, NObserver<EchoServiceHandler, ErrorNotification>(*this, &EchoServiceHandler::onError));
        _reactor.removeEventHandler(_socket, NObserver<EchoServiceHandler, IdleNotification>(*this, &EchoServiceHandler::onIdle));
        if (_onWritableAdded)
            _reactor.removeEventHandler(_socket, NObserver<EchoServiceHandler, WritableNotification>(*this, &EchoServiceHandler::onWritable));
        if (_onTimeoutAdded)
            _reactor.removeEventHandler(_socket, NObserver<EchoServiceHandler, TimeoutNotification>(*this, &EchoServiceHandler::onTimeout));
        delete [] _pBuffer;
    }
    
    void onReadable(const AutoPtr<ReadableNotification>& pNf)
    {
        int n = _socket.receiveBytes(_pBuffer, BUFFER_SIZE);
        if (n > 0) {
            std::stringstream str;
            str.write(_pBuffer, n);
            _data = str.str();
            _reactor.addEventHandler(_socket, NObserver<EchoServiceHandler, WritableNotification>(*this, &EchoServiceHandler::onWritable));
            _onWritableAdded = true;
        }
        else {
            delete this;
        }
    }
    
    void onShutdown(const AutoPtr<ShutdownNotification>& pNf)
    {
        delete this;
    }
    
    void onWritable(const AutoPtr<WritableNotification>& pNf)
    {
        Application::instance().logger().information("onWritable called");
        std::string data = Poco::format("received %s", _data);
        _socket.sendBytes(data.data(), data.length());
        _reactor.removeEventHandler(_socket, NObserver<EchoServiceHandler, WritableNotification>(*this, &EchoServiceHandler::onWritable));
        if (_onTimeoutAdded) {
            _reactor.removeEventHandler(_socket, NObserver<EchoServiceHandler, TimeoutNotification>(*this, &EchoServiceHandler::onTimeout));
        }
        _reactor.addEventHandler(_socket, NObserver<EchoServiceHandler, TimeoutNotification>(*this, &EchoServiceHandler::onTimeout));
        _onTimeoutAdded = true;
    }
    void onError(const AutoPtr<ErrorNotification>& pNf)
    {
        Application::instance().logger().information("onError called");
    }
    void onTimeout(const AutoPtr<TimeoutNotification>& pNf)
    {
        Application::instance().logger().information("onTimeout called");
        _reactor.removeEventHandler(_socket, NObserver<EchoServiceHandler, TimeoutNotification>(*this, &EchoServiceHandler::onTimeout));
        delete this;
    }
    void onIdle(const AutoPtr<IdleNotification>& pNf)
    {
        Application::instance().logger().information("onIdle called");
    }
private:
    enum
    {
        BUFFER_SIZE = 1024
    };
    
    StreamSocket _socket;
    SocketReactor& _reactor;
    char* _pBuffer;
    std::string   _data;
    bool          _onWritableAdded;
    bool          _onTimeoutAdded;
};
class EchoServer: public Poco::Util::ServerApplication
    /// The main application class.
    ///
    /// This class handles command-line arguments and
    /// configuration files.
    /// Start the EchoServer executable with the help
    /// option (/help on Windows, --help on Unix) for
    /// the available command line options.
    ///
    /// To use the sample configuration file (EchoServer.properties),
    /// copy the file to the directory where the EchoServer executable
    /// resides. If you start the debug version of the EchoServer
    /// (EchoServerd[.exe]), you must also create a copy of the configuration
    /// file named EchoServerd.properties. In the configuration file, you
    /// can specify the port on which the server is listening (default
    /// 9977) and the format of the date/time string sent back to the client.
    ///
    /// To test the EchoServer you can use any telnet client (telnet localhost 9977).
{
public:
    EchoServer(): _helpRequested(false)
    {
    }
    
    ~EchoServer()
    {
    }
protected:
    void initialize(Application& self)
    {
        loadConfiguration(); // load default configuration files, if present
        ServerApplication::initialize(self);
    }
        
    void uninitialize()
    {
        ServerApplication::uninitialize();
    }
    void defineOptions(OptionSet& options)
    {
        ServerApplication::defineOptions(options);
        
        options.addOption(
            Option("help", "h", "display help information on command line arguments")
                .required(false)
                .repeatable(false));
    }
    void handleOption(const std::string& name, const std::string& value)
    {
        ServerApplication::handleOption(name, value);
        if (name == "help")
            _helpRequested = true;
    }
    void displayHelp()
    {
        HelpFormatter helpFormatter(options());
        helpFormatter.setCommand(commandName());
        helpFormatter.setUsage("OPTIONS");
        helpFormatter.setHeader("An echo server implemented using the Reactor and Acceptor patterns.");
        helpFormatter.format(std::cout);
    }
    int main(const std::vector<std::string>& args)
    {
        if (_helpRequested)
        {
            displayHelp();
        }
        else
        {
            // get parameters from configuration file
            unsigned short port = (unsigned short) config().getInt("EchoServer.port", 9977);
            
            // set-up a server socket
            ServerSocket svs(port);
            // set-up a SocketReactor...
            Timespan timeout(30000000);    // timeout is set to 30sec
            SocketReactor reactor(timeout);
            // ... and a SocketAcceptor
            SocketAcceptor<EchoServiceHandler> acceptor(svs, reactor);
            // run the reactor in its own thread so that we can wait for
            // a termination request
            Thread thread;
            thread.start(reactor);
            // wait for CTRL-C or kill
            waitForTerminationRequest();
            // Stop the SocketReactor
            reactor.stop();
            thread.join();
        }
        return Application::EXIT_OK;
    }
    
private:
    bool _helpRequested;
};
int main(int argc, char** argv)
{
    EchoServer app;
    return app.run(argc, argv);
}

Comments are closed.