summaryrefslogtreecommitdiffstats
path: root/const.c
diff options
context:
space:
mode:
Diffstat (limited to 'const.c')
-rw-r--r--const.c198
1 files changed, 198 insertions, 0 deletions
diff --git a/const.c b/const.c
new file mode 100644
index 0000000..b0ec1e6
--- /dev/null
+++ b/const.c
@@ -0,0 +1,198 @@
+#define _GNU_SOURCE
+
+#include <unistd.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <fcntl.h>
+
+#include <sys/socket.h>
+#include <sys/epoll.h>
+#include <sys/types.h>
+#include <netinet/in.h>
+#include <sys/epoll.h>
+#include <jemalloc/jemalloc.h>
+
+#include "libaco/aco.h"
+#include "list.h"
+#include "log.h"
+#include "stats.h"
+
+#define PORT 8080
+#define LISTEN_N 50
+#define EVENTS_N 50
+
+void die (const char *msg) {
+ (void)msg;
+ log_error (" %s", msg);
+ log_error (" %s", strerror (errno));
+ exit (EXIT_FAILURE);
+}
+
+
+#define assertfd(FD) if (FD < 0) die (#FD " is not a valid file descriptor")
+#define assert(EX) if (!(EX)) die (#EX " is false")
+
+const char resp[] =
+ "HTTP/1.1 200 OK\r\n"
+ "Content-Length: 25\r\n"
+ "Content-Type: text/html\r\n"
+ "Connection: close\r\n"
+ "\r\n"
+ "Hello from const server\r\n";
+
+void dostuff (int sock);
+void do_aco_stuff ();
+
+void co_cleanup (aco_t *co);
+typedef struct co_ctx {
+ int sock;
+ struct aco_list *noderef;
+} co_ctx_t;
+
+
+int main () {
+ memset (&stats, 0, sizeof (stats));
+
+ int sockfd, newsockfd;
+ unsigned clilen;
+ struct sockaddr_in serv_addr, cli_addr;
+ log_set_level (LOG_INFO);
+
+
+ sockfd = socket(AF_INET, SOCK_STREAM, 0);
+ assertfd (sockfd);
+ int enable = 1;
+ assert (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) >= 0);
+
+ bzero((char *) &serv_addr, sizeof(serv_addr));
+ serv_addr.sin_family = AF_INET;
+ serv_addr.sin_addr.s_addr = INADDR_ANY;
+ serv_addr.sin_port = htons(PORT);
+
+ assert (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) >= 0);
+
+
+ struct epoll_event ev, events[EVENTS_N];
+ int epollfd = epoll_create1 (0);
+ ev.data.ptr = NULL;
+ ev.events = EPOLLIN | EPOLLERR;
+ assert (epoll_ctl (epollfd, EPOLL_CTL_ADD, sockfd, &ev) == 0);
+
+
+ aco_thread_init (NULL);
+ struct aco_list *coros = aco_list_new ();
+ aco_t *main_co = aco_create (NULL, NULL, 0, NULL, NULL);
+ aco_share_stack_t *sstk = aco_share_stack_new (0);
+ log_trace ("main_co: %p, sstk: %p", (void*)main_co, (void*)sstk);
+
+
+ listen(sockfd, LISTEN_N);
+ clilen = sizeof(cli_addr);
+ for (;;) {
+ int nfds = epoll_wait (epollfd, events, EVENTS_N, 3000);
+ if (nfds == 0) {
+ report_stat ();
+ exit (EXIT_SUCCESS);
+ }
+ stats.nfds_cnt++;
+ stats.nfds_sum += nfds;
+ if (stats.nfds_max < (size_t)nfds)
+ stats.nfds_max = (size_t)nfds;
+ assert (nfds != -1);
+ log_trace ("nfds: %d", nfds);
+ for (int i = 0; i < nfds; ++i) {
+ if (events[i].data.ptr == NULL) {
+ newsockfd = accept4(sockfd, (struct sockaddr *) &cli_addr, &clilen, SOCK_NONBLOCK);
+ assertfd (newsockfd);
+ log_trace ("new: %d", newsockfd);
+
+ //int flags = fcntl (newsockfd, F_GETFL);
+ //fcntl (newsockfd, F_SETFL, flags | O_NONBLOCK);
+
+ co_ctx_t *ctx = malloc (sizeof (co_ctx_t));
+ ctx->sock = newsockfd;
+ aco_t *co = aco_create (main_co, sstk, 0, do_aco_stuff, ctx);
+ struct aco_list *node = aco_list_add (coros, co);
+ ctx->noderef = node;
+ log_trace("co: %p, node: %p, node->co: %p", (void*)co, (void*)node, (void*)(node->co));
+ log_trace("arg: %p", co->arg);
+
+ memset (&ev, 0, sizeof (ev));
+ ev.data.ptr = node;
+ ev.events = EPOLLIN | EPOLLERR;
+ assert (epoll_ctl (epollfd, EPOLL_CTL_ADD, newsockfd, &ev) == 0);
+ aco_resume (co);
+ co_cleanup (co);
+ }
+ else {
+ aco_t *co = ((struct aco_list *)events[i].data.ptr)->co;
+ log_trace("ev. node: %p, co: %p", events[i].data.ptr, (void*)co);
+ if (co->is_end)
+ die ("coro ended but left socket open");
+ else {
+ aco_resume (co);
+ co_cleanup (co);
+ }
+ stats.yield_cnt++;
+ }
+ }
+ }
+ return 0;
+}
+
+
+void co_cleanup (aco_t *co) {
+ if (!co->is_end)
+ return;
+
+ co_ctx_t *ctx = co->arg;
+ close (ctx->sock);
+ aco_list_remove (ctx->noderef);
+ free (ctx);
+}
+
+ssize_t write_sock (int sock);
+int check_req_finished (sbo_buf_t *buf);
+
+
+void do_aco_stuff () {
+ co_ctx_t *ctx = aco_get_arg ();
+ sbo_buf_t buf;
+ sbo_buf_init0 (&buf);
+
+
+ for (; ;) {
+ const int size = 256;
+ char tmpbuf[size];
+ ssize_t rv = read (ctx->sock, tmpbuf, size);
+ if (rv == 0)
+ break;
+ if (rv == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+ if (check_req_finished (&buf))
+ break;
+ else
+ aco_yield ();
+ }
+ sbo_buf_grow (&buf, tmpbuf, (size_t)rv);
+ log_trace("> received %ld bytes", rv);
+ }
+
+ sbo_buf_free (&buf);
+ write_sock (ctx->sock);
+ aco_exit ();
+}
+
+inline int check_req_finished (sbo_buf_t *buf) {
+ /* Check if we finished transmission (e.g. request ends with "\r\n\r\n") */
+ size_t len = sbo_buf_size (buf);
+ if (len < 4ULL)
+ return 0;
+ char *base = sbo_buf_mem (buf);
+ return strncmp (base + (len - 4), "\r\n\r\n", 4) == 0;
+}
+
+inline ssize_t write_sock (int sock) {
+ return write (sock, resp, sizeof (resp) - 1);
+}