Logo
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
asyncchannel.hh
Go to the documentation of this file.
1 #pragma once
2 #include "channel.hh"
3 #include "../messages/message.hh"
4 #include "netobserver.hh"
5 
6 #include <string>
7 #include <queue>
8 #include <atomic>
9 #include <boost/asio.hpp>
10 #include <boost/asio/spawn.hpp>
11 #include <boost/asio/error.hpp>
12 #include <memory>
13 #include <mutex>
14 
15 namespace eclipse {
16 namespace network {
17 
18 using boost::asio::ip::tcp;
24 class AsyncChannel: public Channel, public std::enable_shared_from_this<AsyncChannel> {
25  public:
27  AsyncChannel(NetObserver* node_);
28  virtual ~AsyncChannel() ;
29 
31  void do_write(messages::Message*) override;
32 
34  void do_write(std::shared_ptr<std::string>&);
35 
37  void do_write_buffer();
38 
40  void do_read();
41 
43  tcp::socket& get_socket();
44 
46  void commit(std::shared_ptr<std::string>&);
47 
48  private:
49  void on_write(const boost::system::error_code&, size_t);
50  void do_write_impl();
51 
52  void read_coroutine(boost::asio::yield_context);
53 
54  NetObserver* node = nullptr;
55  tcp::socket socket;
56  std::queue<std::shared_ptr<std::string>> messages_queue;
57  std::atomic<bool> is_writing;
58  boost::asio::io_service& iosvc;
59 
60  std::mutex queue_mutex;
61 
62  std::string host;
63  uint32_t port;
64 };
65 
66 }
67 }
AsyncChannel(NetObserver *node_)
Definition: asyncchannel.cc:26
Definition: netobserver.hh:9
tcp::socket & get_socket()
Get internal socket.
Definition: asyncchannel.cc:38
Definition: message.hh:15
void do_write(messages::Message *) override
Write asynchronously the message.
Definition: asyncchannel.cc:53
void commit(std::shared_ptr< std::string > &)
Add a message to the sending queue.
Definition: asyncchannel.cc:73
void do_write_buffer()
Write asynchronously the message sharing the payload.
Definition: asyncchannel.cc:66
void do_read()
This method implements the reading loop.
Definition: asyncchannel.cc:157
Definition: channel.hh:10
virtual ~AsyncChannel()
Definition: asyncchannel.cc:33
Represent an opened channel between two endpoints.
Definition: asyncchannel.hh:24