diff options
-rw-r--r-- | .gitmodules | 9 | ||||
-rw-r--r-- | Makefile | 31 | ||||
-rw-r--r-- | cc_const.cc | 106 | ||||
-rw-r--r-- | const.c | 198 | ||||
m--------- | libaco | 0 | ||||
-rw-r--r-- | list.h | 140 | ||||
-rw-r--r-- | log.c | 88 | ||||
-rw-r--r-- | log.h | 39 | ||||
m--------- | picohttpparser | 0 | ||||
m--------- | sds | 0 | ||||
-rw-r--r-- | stats.h | 17 | ||||
-rw-r--r-- | very_const.c | 106 |
12 files changed, 734 insertions, 0 deletions
diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..d3d848b --- /dev/null +++ b/.gitmodules @@ -0,0 +1,9 @@ +[submodule "libaco"] + path = libaco + url = ./libaco +[submodule "picohttpparser"] + path = picohttpparser + url = ./picohttpparser +[submodule "sds"] + path = sds + url = ./sds diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..43e275c --- /dev/null +++ b/Makefile @@ -0,0 +1,31 @@ +TARGET = const +CFLAGS += -Wall -Wextra -Wshadow -pedantic -g +CFLAGS += $(shell echo $(fast) | sed -e 's/yes/-DFAST_AS_FUCK/g') +LDLIBS += -L$(shell jemalloc-config --libdir) -Wl,-rpath,$(shell jemalloc-config --libdir) -ljemalloc $(shell jemalloc-config --libs) +LDLIBS += -lprofiler +ACO_FLAGS = -Wall -Werror -DACO_USE_VALGRIND +RM ?= rm -f + +all: $(TARGET) + +ACO_SRC = libaco/aco.c +ACO_ASM = libaco/acosw.S +ACO = aco.o +$(ACO): $(ACO_SRC) + $(CC) -c -o $@ $^ $(ACO_FLAGS) $(LDLIBS) $(CPPFLAGS) + + +log.o: log.c log.h + +$(TARGET): const.c $(ACO) $(ACO_ASM) log.o + +very_const: very_const.c log.o + +cc: cc_const.cc log.c log.h + $(CXX) cc_const.cc log.c -o cc_const $(CFLAGS) $(LDLIBS) -lstdc++ --std=c++14 + +run: $(TARGET) + ./$(TARGET) + +clean: + $(RM) $(TARGET) *.o diff --git a/cc_const.cc b/cc_const.cc new file mode 100644 index 0000000..85e9ec7 --- /dev/null +++ b/cc_const.cc @@ -0,0 +1,106 @@ +#define _GNU_SOURCE + +#include <unistd.h> +#include <errno.h> +#include <fcntl.h> +#include <string.h> +#include <aio.h> + +#include <sys/socket.h> +#include <sys/epoll.h> +#include <sys/types.h> +#include <netinet/in.h> +#include <sys/epoll.h> +#include <jemalloc/jemalloc.h> + +#include <string> + +#include "log.h" + +#define PORT 8080 +#define LISTEN_N 50 +#define EVENTS_N 50 + +void die (const char *msg) { + (void)msg; + log_error (" %s", msg); + log_error (" %s", strerror (errno)); + exit (EXIT_FAILURE); +} + + +#define assertfd(FD) if (FD < 0) die (#FD " is not a valid file descriptor") +#define assert(EX) if (!(EX)) die (#EX " is false") + +std::string resp = + "HTTP/1.1 200 OK\r\n" + "Content-Length: 25\r\n" + "Content-Type: text/html\r\n" + "Connection: close\r\n" + "\r\n" + "Hello from const server\r\n"; + +void dostuff (int sock); + +int main () { + + int sockfd, newsockfd; + unsigned clilen; + struct sockaddr_in serv_addr, cli_addr; + log_set_level (LOG_INFO); + + + sockfd = socket(AF_INET, SOCK_STREAM, 0); + assertfd (sockfd); + int enable = 1; + assert (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) >= 0); + + bzero((char *) &serv_addr, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + serv_addr.sin_addr.s_addr = INADDR_ANY; + serv_addr.sin_port = htons(PORT); + + assert (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) >= 0); + + struct epoll_event ev, events[EVENTS_N]; + int epollfd = epoll_create1 (0); + ev.data.ptr = NULL; + ev.events = EPOLLIN | EPOLLERR; + assert (epoll_ctl (epollfd, EPOLL_CTL_ADD, sockfd, &ev) == 0); + + listen(sockfd, LISTEN_N); + clilen = sizeof(cli_addr); + for (;;) { + /*int nfds = epoll_wait (epollfd, events, EVENTS_N, 3000); + if (nfds == 0) { + exit (EXIT_SUCCESS); + } + assert (nfds != -1); + log_trace ("nfds: %d", nfds); + for (int i = 0; i < nfds; ++i) {*/ + for (;;) { //if (events[i].data.ptr == NULL) { + newsockfd = accept4(sockfd, (struct sockaddr *) &cli_addr, &clilen, SOCK_NONBLOCK); + assertfd (newsockfd); + log_trace ("new: %d", newsockfd); + + dostuff (newsockfd); + //} + //else { + // die ("pffff"); + //} + } + } + return 0; +} + +ssize_t write_sock (int sock); + +void dostuff (int sock) { + write_sock (sock); + close (sock); +} + +inline ssize_t write_sock (int sock) { + + return write (sock, resp.c_str (), resp.size ()); +} @@ -0,0 +1,198 @@ +#define _GNU_SOURCE + +#include <unistd.h> +#include <stdlib.h> +#include <stdio.h> +#include <errno.h> +#include <string.h> +#include <fcntl.h> + +#include <sys/socket.h> +#include <sys/epoll.h> +#include <sys/types.h> +#include <netinet/in.h> +#include <sys/epoll.h> +#include <jemalloc/jemalloc.h> + +#include "libaco/aco.h" +#include "list.h" +#include "log.h" +#include "stats.h" + +#define PORT 8080 +#define LISTEN_N 50 +#define EVENTS_N 50 + +void die (const char *msg) { + (void)msg; + log_error (" %s", msg); + log_error (" %s", strerror (errno)); + exit (EXIT_FAILURE); +} + + +#define assertfd(FD) if (FD < 0) die (#FD " is not a valid file descriptor") +#define assert(EX) if (!(EX)) die (#EX " is false") + +const char resp[] = + "HTTP/1.1 200 OK\r\n" + "Content-Length: 25\r\n" + "Content-Type: text/html\r\n" + "Connection: close\r\n" + "\r\n" + "Hello from const server\r\n"; + +void dostuff (int sock); +void do_aco_stuff (); + +void co_cleanup (aco_t *co); +typedef struct co_ctx { + int sock; + struct aco_list *noderef; +} co_ctx_t; + + +int main () { + memset (&stats, 0, sizeof (stats)); + + int sockfd, newsockfd; + unsigned clilen; + struct sockaddr_in serv_addr, cli_addr; + log_set_level (LOG_INFO); + + + sockfd = socket(AF_INET, SOCK_STREAM, 0); + assertfd (sockfd); + int enable = 1; + assert (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) >= 0); + + bzero((char *) &serv_addr, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + serv_addr.sin_addr.s_addr = INADDR_ANY; + serv_addr.sin_port = htons(PORT); + + assert (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) >= 0); + + + struct epoll_event ev, events[EVENTS_N]; + int epollfd = epoll_create1 (0); + ev.data.ptr = NULL; + ev.events = EPOLLIN | EPOLLERR; + assert (epoll_ctl (epollfd, EPOLL_CTL_ADD, sockfd, &ev) == 0); + + + aco_thread_init (NULL); + struct aco_list *coros = aco_list_new (); + aco_t *main_co = aco_create (NULL, NULL, 0, NULL, NULL); + aco_share_stack_t *sstk = aco_share_stack_new (0); + log_trace ("main_co: %p, sstk: %p", (void*)main_co, (void*)sstk); + + + listen(sockfd, LISTEN_N); + clilen = sizeof(cli_addr); + for (;;) { + int nfds = epoll_wait (epollfd, events, EVENTS_N, 3000); + if (nfds == 0) { + report_stat (); + exit (EXIT_SUCCESS); + } + stats.nfds_cnt++; + stats.nfds_sum += nfds; + if (stats.nfds_max < (size_t)nfds) + stats.nfds_max = (size_t)nfds; + assert (nfds != -1); + log_trace ("nfds: %d", nfds); + for (int i = 0; i < nfds; ++i) { + if (events[i].data.ptr == NULL) { + newsockfd = accept4(sockfd, (struct sockaddr *) &cli_addr, &clilen, SOCK_NONBLOCK); + assertfd (newsockfd); + log_trace ("new: %d", newsockfd); + + //int flags = fcntl (newsockfd, F_GETFL); + //fcntl (newsockfd, F_SETFL, flags | O_NONBLOCK); + + co_ctx_t *ctx = malloc (sizeof (co_ctx_t)); + ctx->sock = newsockfd; + aco_t *co = aco_create (main_co, sstk, 0, do_aco_stuff, ctx); + struct aco_list *node = aco_list_add (coros, co); + ctx->noderef = node; + log_trace("co: %p, node: %p, node->co: %p", (void*)co, (void*)node, (void*)(node->co)); + log_trace("arg: %p", co->arg); + + memset (&ev, 0, sizeof (ev)); + ev.data.ptr = node; + ev.events = EPOLLIN | EPOLLERR; + assert (epoll_ctl (epollfd, EPOLL_CTL_ADD, newsockfd, &ev) == 0); + aco_resume (co); + co_cleanup (co); + } + else { + aco_t *co = ((struct aco_list *)events[i].data.ptr)->co; + log_trace("ev. node: %p, co: %p", events[i].data.ptr, (void*)co); + if (co->is_end) + die ("coro ended but left socket open"); + else { + aco_resume (co); + co_cleanup (co); + } + stats.yield_cnt++; + } + } + } + return 0; +} + + +void co_cleanup (aco_t *co) { + if (!co->is_end) + return; + + co_ctx_t *ctx = co->arg; + close (ctx->sock); + aco_list_remove (ctx->noderef); + free (ctx); +} + +ssize_t write_sock (int sock); +int check_req_finished (sbo_buf_t *buf); + + +void do_aco_stuff () { + co_ctx_t *ctx = aco_get_arg (); + sbo_buf_t buf; + sbo_buf_init0 (&buf); + + + for (; ;) { + const int size = 256; + char tmpbuf[size]; + ssize_t rv = read (ctx->sock, tmpbuf, size); + if (rv == 0) + break; + if (rv == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) { + if (check_req_finished (&buf)) + break; + else + aco_yield (); + } + sbo_buf_grow (&buf, tmpbuf, (size_t)rv); + log_trace("> received %ld bytes", rv); + } + + sbo_buf_free (&buf); + write_sock (ctx->sock); + aco_exit (); +} + +inline int check_req_finished (sbo_buf_t *buf) { + /* Check if we finished transmission (e.g. request ends with "\r\n\r\n") */ + size_t len = sbo_buf_size (buf); + if (len < 4ULL) + return 0; + char *base = sbo_buf_mem (buf); + return strncmp (base + (len - 4), "\r\n\r\n", 4) == 0; +} + +inline ssize_t write_sock (int sock) { + return write (sock, resp, sizeof (resp) - 1); +} diff --git a/libaco b/libaco new file mode 160000 +Subproject e3058b3b3a8ed8390a3bedf6d093e568d344966 @@ -0,0 +1,140 @@ +#include <stdlib.h> +#include <string.h> +#include <errno.h> +#include "libaco/aco.h" + +struct aco_list { + aco_t *co; + int fd; + struct aco_list *next, *prev; +}; + +struct aco_list *aco_list_new () { + struct aco_list *head = malloc (sizeof (struct aco_list)); + memset (head, 0, sizeof (struct aco_list)); + head->next = head->prev = head; + return head; +} + +struct aco_list *aco_list_add (struct aco_list *head, aco_t *co) { + struct aco_list *node = malloc (sizeof (struct aco_list)); + node->co = co; + node->prev = head->prev; + node->next = head; + head->prev = node; + return node; +} + +void aco_list_remove (struct aco_list *iter) { + if (iter->next == iter) + abort(); + iter->prev->next = iter->next; + iter->next->prev = iter->prev; +} + +#define SBO_BUF_SMALL 0x80 +#define SBO_BUF_BIG 0x00 +#define SBO_BUF_MASK 0x80 +#define SBO_BUF_SIZEMASK 0x7f +#define SBO_BUF_SMALL_MAX_SIZE 0x7f /* 0x80 - 0x1 */ + + +typedef struct sbo_buf { + unsigned char hdr; + union { + char small[SBO_BUF_SMALL_MAX_SIZE]; + struct { + char *base; + size_t size; + size_t alloc; + } big; + } buf; +} sbo_buf_t; + + +int sbo_buf_init (sbo_buf_t *buf, char *init, size_t count); +void sbo_buf_free (sbo_buf_t *buf); +char *sbo_buf_mem (sbo_buf_t *buf); +size_t sbo_buf_size (sbo_buf_t *buf); +int sbo_buf_type (sbo_buf_t *buf); +int sbo_buf_grow (sbo_buf_t *buf, char *grow, size_t count); + + +int sbo_buf_init0 (sbo_buf_t *buf) { + memset (buf, 0, sizeof (*buf)); + buf->hdr = 0 | SBO_BUF_SMALL; + return 0; +} + + +int sbo_buf_init (sbo_buf_t *buf, char *init, size_t count) { + if (count <= (size_t)SBO_BUF_SMALL_MAX_SIZE) { + buf->hdr = 0 | SBO_BUF_SMALL; + buf->hdr |= (unsigned char) count; + memcpy (buf->buf.small, init, count); + } + else { + buf->buf.big.base = malloc (count); + if (!buf->buf.big.base) + return errno; + buf->hdr = 0 | SBO_BUF_BIG; + buf->buf.big.size = count; + buf->buf.big.alloc = count; + } + return 0; +} + +void sbo_buf_free (sbo_buf_t *buf) { + if (sbo_buf_type (buf) == SBO_BUF_BIG) + free (buf->buf.big.base); +} + +inline char *sbo_buf_mem (sbo_buf_t *buf) { + if (sbo_buf_type (buf) == SBO_BUF_BIG) + return buf->buf.big.base; + return buf->buf.small; +} + +inline size_t sbo_buf_size (sbo_buf_t *buf) { + if (sbo_buf_type (buf) == SBO_BUF_BIG) + return buf->buf.big.size; + + return buf->hdr & SBO_BUF_SIZEMASK; +} + +inline int sbo_buf_type (sbo_buf_t *buf) { + return buf->hdr & SBO_BUF_MASK; +} + +int sbo_buf_grow (sbo_buf_t *buf, char *grow, size_t count) { + if (count == 0) + return 0; + //size_t bufsize = sbo_buf_size (buf); + if (sbo_buf_type (buf) == SBO_BUF_SMALL) { + size_t current_size = sbo_buf_size (buf); + if (current_size + count <= (size_t)SBO_BUF_SMALL_MAX_SIZE) { + /* no need to become big. just concat two existing buffers */ + char *dst = sbo_buf_mem (buf) + current_size; + memcpy (dst, grow, count); + buf->hdr += count; + return 0; + } + /* fallthrough to realloc */ + } + + char *base = sbo_buf_mem (buf); + size_t len = sbo_buf_size (buf); + char *newbuf = malloc (len + count); + if (!newbuf) + return errno; + + memcpy (newbuf, base, len); + memcpy (newbuf + len, grow, count); + sbo_buf_free (buf); + buf->hdr = 0 | SBO_BUF_BIG; /* We definitely exceeded small buffer */ + buf->buf.big.base = newbuf; + buf->buf.big.alloc = len + count; + buf->buf.big.size = len + count; + + return 0; +}
\ No newline at end of file @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2017 rxi + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include <stdio.h> +#include <stdlib.h> +#include <stdarg.h> +#include <string.h> +#include <time.h> + +#include "log.h" + +static struct { + void *udata; + FILE *fp; + int level; + int quiet; +} L; + + +static const char *level_names[] = { + "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL" +}; + +static const char *level_colors[] = { + "\x1b[94m", "\x1b[36m", "\x1b[32m", "\x1b[33m", "\x1b[31m", "\x1b[35m" +}; + + +void log_set_fp(FILE *fp) { + L.fp = fp; +} + + +void log_set_level(int level) { + L.level = level; +} + + +void log_set_quiet(int enable) { + L.quiet = enable ? 1 : 0; +} + + +void log_log(int level, const char *file, int line, const char *fmt, ...) { + if (level < L.level) { + return; + } + + /* Get current time */ + time_t t = time(NULL); + struct tm *lt = localtime(&t); + + /* Log to stderr */ + if (!L.quiet) { + va_list args; + char buf[16]; + buf[strftime(buf, sizeof(buf), "%H:%M:%S", lt)] = '\0'; + + fprintf( + stderr, "%s %s%-5s\x1b[0m \x1b[96m%s:%d:\x1b[0m ", + buf, level_colors[level], level_names[level], file, line); + + va_start(args, fmt); + vfprintf(stderr, fmt, args); + va_end(args); + fprintf(stderr, "\n"); + fflush(stderr); + } +}
\ No newline at end of file @@ -0,0 +1,39 @@ +/** + * Copyright (c) 2017 rxi + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the MIT license. See `log.c` for details. + */ + +#ifndef LOG_H +#define LOG_H + +#include <stdio.h> +#include <stdarg.h> + +enum { LOG_TRACE, LOG_DEBUG, LOG_INFO, LOG_WARN, LOG_ERROR, LOG_FATAL }; + +#ifndef FAST_AS_FUCK +#define log_trace(...) log_log(LOG_TRACE, __FILE__, __LINE__, __VA_ARGS__) +#define log_debug(...) log_log(LOG_DEBUG, __FILE__, __LINE__, __VA_ARGS__) +#define log_info(...) log_log(LOG_INFO, __FILE__, __LINE__, __VA_ARGS__) +#define log_warn(...) log_log(LOG_WARN, __FILE__, __LINE__, __VA_ARGS__) +#define log_error(...) log_log(LOG_ERROR, __FILE__, __LINE__, __VA_ARGS__) +#define log_fatal(...) log_log(LOG_FATAL, __FILE__, __LINE__, __VA_ARGS__) +#else +#define log_trace(...) +#define log_debug(...) +#define log_info(...) +#define log_warn(...) +#define log_error(...) +#define log_fatal(...) +#endif + +void log_set_udata(void *udata); +void log_set_fp(FILE *fp); +void log_set_level(int level); +void log_set_quiet(int enable); + +void log_log(int level, const char *file, int line, const char *fmt, ...); + +#endif
\ No newline at end of file diff --git a/picohttpparser b/picohttpparser new file mode 160000 +Subproject 81fe3d99fd90a55cafb993e53fd3000dbc4d564 diff --git a/sds b/sds new file mode 160000 +Subproject a3087cf2be1300649495ff9dfc4327bb8c980ab @@ -0,0 +1,17 @@ +#define STAT_APPLY(XX)\ + XX(nfds_cnt)\ + XX(nfds_sum)\ + XX(nfds_max)\ + XX(yield_cnt) + + +#define STAT_STRUCT_FN(name) size_t name; +struct { + STAT_APPLY(STAT_STRUCT_FN) +} stats; + + +#define STAT_PRINT_FN(name) printf ( #name ": %zu\n", stats. name); +void report_stat () { + STAT_APPLY(STAT_PRINT_FN); +} diff --git a/very_const.c b/very_const.c new file mode 100644 index 0000000..7342034 --- /dev/null +++ b/very_const.c @@ -0,0 +1,106 @@ +#define _GNU_SOURCE + +#include <unistd.h> +#include <stdlib.h> +#include <stdio.h> +#include <errno.h> +#include <string.h> +#include <fcntl.h> +#include <aio.h> + +#include <sys/socket.h> +#include <sys/epoll.h> +#include <sys/types.h> +#include <netinet/in.h> +#include <sys/epoll.h> +#include <jemalloc/jemalloc.h> + +#include "log.h" + +#define PORT 8080 +#define LISTEN_N 50 +#define EVENTS_N 50 + +void die (const char *msg) { + (void)msg; + log_error (" %s", msg); + log_error (" %s", strerror (errno)); + exit (EXIT_FAILURE); +} + + +#define assertfd(FD) if (FD < 0) die (#FD " is not a valid file descriptor") +#define assert(EX) if (!(EX)) die (#EX " is false") + +const char resp[] = + "HTTP/1.1 200 OK\r\n" + "Content-Length: 25\r\n" + "Content-Type: text/html\r\n" + "Connection: close\r\n" + "\r\n" + "Hello from const server\r\n"; + +void dostuff (int sock); + +int main () { + + int sockfd, newsockfd; + unsigned clilen; + struct sockaddr_in serv_addr, cli_addr; + log_set_level (LOG_INFO); + + + sockfd = socket(AF_INET, SOCK_STREAM, 0); + assertfd (sockfd); + int enable = 1; + assert (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) >= 0); + + bzero((char *) &serv_addr, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + serv_addr.sin_addr.s_addr = INADDR_ANY; + serv_addr.sin_port = htons(PORT); + + assert (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) >= 0); + + struct epoll_event ev, events[EVENTS_N]; + int epollfd = epoll_create1 (0); + ev.data.ptr = NULL; + ev.events = EPOLLIN | EPOLLERR; + assert (epoll_ctl (epollfd, EPOLL_CTL_ADD, sockfd, &ev) == 0); + + listen(sockfd, LISTEN_N); + clilen = sizeof(cli_addr); + for (;;) { + /*int nfds = epoll_wait (epollfd, events, EVENTS_N, 3000); + if (nfds == 0) { + exit (EXIT_SUCCESS); + } + assert (nfds != -1); + log_trace ("nfds: %d", nfds); + for (int i = 0; i < nfds; ++i) {*/ + for (;;) { //if (events[i].data.ptr == NULL) { + newsockfd = accept4(sockfd, (struct sockaddr *) &cli_addr, &clilen, SOCK_NONBLOCK); + assertfd (newsockfd); + log_trace ("new: %d", newsockfd); + + dostuff (newsockfd); + //} + //else { + // die ("pffff"); + //} + } + } + return 0; +} + +ssize_t write_sock (int sock); + +void dostuff (int sock) { + write_sock (sock); + close (sock); +} + +inline ssize_t write_sock (int sock) { + + return write (sock, resp, sizeof (resp) - 1); +} |