Browse Source

it works

master
abbycin 4 months ago
parent
commit
ccaeb4bc69
7 changed files with 429 additions and 50 deletions
  1. 9
    1
      CMakeLists.txt
  2. 45
    0
      acceptor.h
  3. 102
    0
      aux.h
  4. 109
    0
      context.h
  5. 0
    45
      future_adapter.h
  6. 87
    4
      main.cpp
  7. 77
    0
      peer.h

+ 9
- 1
CMakeLists.txt View File

@@ -2,5 +2,13 @@ cmake_minimum_required(VERSION 3.18)
project(conet)

set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)

add_executable(conet future_adapter.h main.cpp)
add_executable(conet
aux.h
context.h
peer.h
acceptor.h
main.cpp)
target_compile_options(conet PRIVATE -fcoroutines)

+ 45
- 0
acceptor.h View File

@@ -0,0 +1,45 @@
/***********************************************
File Name: acceptor.h
Author: Abby Cin
Mail: abbytsing@gmail.com
Created Time: 10/22/20 8:05 PM
***********************************************/

#ifndef _CONET_ACCEPTOR_H_
#define _CONET_ACCEPTOR_H_

class acceptor
{
public:
acceptor(context& ctx, resolver r) : ctx_{ctx}, fd_{r.fd_} {}

[[nodiscard]] int listen() const { return ::listen(fd_, SOMAXCONN); }

auto accept()
{
struct awaiter
{
context& ctx_;
int fd_;
awaiter(context& ctx, int fd) : ctx_{ctx}, fd_{fd} {}

bool await_ready() { return false; }
void await_suspend(handle_t co) { ctx_.push(fd_, co); }
peer await_resume()
{
if(!ctx_.running())
{
return {ctx_, -1};
}
int sock = ::accept4(fd_, nullptr, nullptr, SOCK_NONBLOCK);
return {ctx_, sock};
}
};
return awaiter{ctx_, fd_};
}

private:
context& ctx_;
int fd_;
};
#endif //_CONET_ACCEPTOR_H_

+ 102
- 0
aux.h View File

@@ -0,0 +1,102 @@
/***********************************************
File Name: aux.h
Author: Abby Cin
Mail: abbytsing@gmail.com
Created Time: 10/24/20 2:43 PM
***********************************************/

#ifndef _CONET_AUX_H_
#define _CONET_AUX_H_

#include <sys/socket.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <signal.h>
#include <errno.h>
#include <netdb.h>
#include <string.h>
#include <unistd.h>
#include <thread>
#include <mutex>
#include <string>
#include <coroutine>
#include <vector>
#include <map>
#include <iostream>

struct debugger
{
debugger(const char* s, int n) { std::cout << s << ':' << n << " => "; }
template<typename... Args>
void print(Args&&... args)
{
((std::cout << args << ' '), ...);
std::cout << '\n';
}
};

#define debug(...) debugger(__func__, __LINE__).print(__VA_ARGS__)

using handle_t = std::coroutine_handle<>;

class acceptor;
class resolver
{
friend acceptor;
int fd_{0};
explicit resolver(int fd) : fd_{fd} {}

public:
static resolver from_str(std::string s)
{
struct addrinfo hints
{
};
::memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = 0;
struct addrinfo* addrs;

auto sep = s.find(':');
if(sep == std::string::npos)
{
throw std::runtime_error("invalid address");
}
auto port = s.substr(sep + 1);
auto domain = s.substr(0, sep);
if(port.empty() || domain.empty())
{
throw std::runtime_error("invalid address");
}

int r = ::getaddrinfo(domain.data(), port.data(), &hints, &addrs);
if(r < 0)
{
throw std::runtime_error(::gai_strerror(r));
}
int fd = 0;
for(auto p = addrs; p != nullptr; p = p->ai_next)
{
fd = ::socket(p->ai_family, p->ai_socktype, p->ai_protocol);
if(fd == -1)
{
continue;
}

if(::bind(fd, p->ai_addr, p->ai_addrlen) == 0)
{
break;
}
::close(fd);
}
::freeaddrinfo(addrs);
if(fd == 0)
{
throw std::runtime_error("can't bind to any interface");
}
return resolver{fd};
}
};

#endif //_CONET_AUX_H_

+ 109
- 0
context.h View File

@@ -0,0 +1,109 @@
/***********************************************
File Name: context.h
Author: Abby Cin
Mail: abbytsing@gmail.com
Created Time: 10/24/20 2:43 PM
***********************************************/

#ifndef _CONET_CONTEXT_H_
#define _CONET_CONTEXT_H_

class peer;
class acceptor;
class context
{
public:
context() : efd_{-1}, stop_{true}, revs_{1}, conn_{} { efd_ = ::epoll_create1(EPOLL_CLOEXEC); }
~context() { cleanup(); }

int run()
{
stop_ = false;
while(!stop_)
{
errno = 0;
int res = ::epoll_wait(efd_, revs_.data(), revs_.size(), -1);
if(res < 0)
{
return res;
}
for(int i = 0; i < res; ++i)
{
if((revs_[i].events & EPOLLIN) || (revs_[i].events & EPOLLPRI) || (revs_[i].events & EPOLLHUP) ||
(revs_[i].events & EPOLLERR))
{
auto co = conn_.find(revs_[i].data.fd);
if(co != conn_.end())
{
co->second.resume();
}
}
}
}
return 0;
}

void stop()
{
stop_ = true;
cleanup();
}

bool running() { return !stop_; }

private:
int efd_;
std::atomic_bool stop_;
std::vector<epoll_event> revs_;
std::map<int, handle_t> conn_;

friend peer;
friend acceptor;

void cleanup()
{
for(auto& [_, v]: conn_)
{
v.destroy();
}
conn_.clear();
}

int push(int fd, handle_t co)
{
if(!conn_.contains(fd))
{
epoll_event ev{};
ev.data.fd = fd;
ev.events = EPOLLIN;
int r = ::epoll_ctl(efd_, EPOLL_CTL_ADD, fd, &ev);
if(r < 0)
{
return r;
}
}
if(conn_.size() + 1 == revs_.size())
{
revs_.resize(revs_.size() * 2);
}
conn_[fd] = co;
return 0;
}

int pop(int fd)
{
debug("remove from loop:", fd);
auto c = conn_.find(fd);
if(c != conn_.end())
{
c->second = nullptr;
conn_.erase(c);
}
epoll_event ev{};
ev.events = EPOLLIN;
ev.data.fd = fd;
return ::epoll_ctl(efd_, EPOLL_CTL_DEL, fd, &ev);
}
};

#endif //_CONET_CONTEXT_H_

+ 0
- 45
future_adapter.h View File

@@ -1,45 +0,0 @@
/***********************************************
File Name: future_adapter.h.h
Author: Abby Cin
Mail: abbytsing@gmail.com
Created Time: 10/15/20 8:37 PM
***********************************************/

#ifndef FUTURE_ADAOTER_H_
#define FUTURE_ADAOTER_H_

#include <coroutine>
#include <future>

namespace std
{
template<typename R, typename... Args>
struct coroutine_traits<future<R>, Args...>
{
struct promise_type
{
promise<R> p;
auto initial_suspend() { return suspend_never{}; }

auto finial_suspend() { return suspend_never{}; }

template<typename T>
void return_value(T&& t)
{
p.set_value(forward<T>(t));
}

template<typename T>
void yield_value(T&& t)
{
p.set_value(forward<T>(t));
}

void unhandled_exception() { p.set_exception(current_exception()); }

future<R> get_return_object() { return p.get_future(); }
};
};
} // namespace std

#endif // FUTURE_ADAOTER_H_

+ 87
- 4
main.cpp View File

@@ -1,7 +1,90 @@
#include <iostream>
/***********************************************
File Name: main.cpp
Author: Abby Cin
Mail: abbytsing@gmail.com
Created Time: 10/24/20 2:45 PM
***********************************************/

int main()
#include "aux.h"
#include "context.h"
#include "peer.h"
#include "acceptor.h"

struct task
{
};
template<typename... Args>
struct std::coroutine_traits<task, Args...>
{
struct promise_type
{
task get_return_object() { return {}; }
suspend_never initial_suspend() { return {}; }
suspend_never final_suspend() { return {}; }
void return_void(){};
void unhandled_exception() { std::terminate(); }
};
};

task session(peer&& tmp)
{
peer client{std::move(tmp)};
char data[128]{};
for(;;)
{
auto n = co_await client.read_some(data, sizeof(data));
if(n < 1)
{
break;
}

int sent = 0;
while(sent < n)
{
auto x = co_await client.write_some(data + sent, n - sent);
if(x < 0)
{
sent = x;
break;
}
sent += x;
}
if(sent < 0)
{
break;
}
}
}

task spawn(context& ctx, resolver& rr)
{
std::cout << "Hello, World!" << std::endl;
return 0;
acceptor a{ctx, rr};
if(a.listen() < 0)
{
ctx.stop();
co_return;
}
for(;;)
{
session(co_await a.accept());
if(!ctx.running())
{
break;
}
}
}

context* gctx = nullptr;
void handler(int) { gctx->stop(); }

int main()
{
context ctx{};
auto addr = resolver::from_str("127.0.0.1:8889");
spawn(ctx, addr);

::signal(SIGINT, handler);
::signal(SIGTERM, handler);
gctx = &ctx;
ctx.run();
}

+ 77
- 0
peer.h View File

@@ -0,0 +1,77 @@
/***********************************************
File Name: peer.h
Author: Abby Cin
Mail: abbytsing@gmail.com
Created Time: 10/24/20 2:42 PM
***********************************************/

#ifndef _CONET_PEER_H_
#define _CONET_PEER_H_

class peer
{
public:
peer(context& ctx, int fd) : ctx_{ctx}, fd_{fd} {}
peer(peer&& r) noexcept : ctx_{r.ctx_}, fd_{r.fd_} { r.fd_ = -1; }
~peer() { this->close(); }

auto read_some(char* data, int n)
{
struct awaiter
{
context& ctx_;
int fd_;
char* data_;
int n_;
awaiter(context& ctx, int f, char* data, int n) : ctx_{ctx}, fd_{f}, data_{data}, n_{n} {}

bool await_ready() { return false; }

bool await_suspend(handle_t co)
{
ctx_.push(fd_, co);
return true; // same to `void await_suspend()`, immediately back to caller
}

int await_resume() { return ::read(fd_, data_, n_); }
};
return awaiter{ctx_, fd_, data, n};
}

// no guarantee that all data will be sent
auto write_some(char* data, int n)
{
struct awaiter
{
peer* p_;
int fd_;
char* data_;
int n_;
int res_{0};
awaiter(peer* p, int f, char* data, int n) : p_{p}, fd_{f}, data_{data}, n_{n} {}

bool await_ready() { return false; }

void await_suspend(handle_t) { res_ = ::write(fd_, data_, n_); }

int await_resume() { return res_; }
};
return awaiter{this, fd_, data, n};
}

void close()
{
if(fd_ > 0)
{
ctx_.pop(fd_);
::close(fd_);
fd_ = -1;
}
}

private:
context& ctx_;
int fd_;
};

#endif //_CONET_PEER_H_

Loading…
Cancel
Save