diff --git a/modules/s6/default.nix b/modules/s6/default.nix index b31e606..786cbda 100644 --- a/modules/s6/default.nix +++ b/modules/s6/default.nix @@ -278,10 +278,7 @@ in eat = longrun { name = "log-shipping-pipe-eat"; run = '' - fdmove -c 12 1 \ - ${pkgs.s6}/bin/s6-ipcserver ${cfg.shipping.socket} \ - fdmove -c 1 12 \ - cat + cat ${cfg.shipping.socket} ''; producer-for = spew.name; }; diff --git a/pkgs/logtap/logtap.c b/pkgs/logtap/logtap.c index f038f0d..921f31d 100644 --- a/pkgs/logtap/logtap.c +++ b/pkgs/logtap/logtap.c @@ -1,15 +1,13 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include // for errno +#include // for fcntl, O_NONBLOCK, open, F_GETFL +#include // for POLLERR, POLLHUP, POLLIN +#include // for signal, SIGPIPE, SIG_IGN +#include // for va_end, va_list, va_start +#include // for fprintf, stderr, vfprintf +#include // for malloc, exit +#include // for strlen, strcat, strcpy, strerror +#include // for stat, mkfifo, S_IFIFO +#include // for write, STDOUT_FILENO, read #define PROGRAM_NAME "logtap" @@ -17,43 +15,48 @@ #include #else #include -static void error(int status, int errnum, const char * fmt, ...) { +static void error(int status, int errnum, const char* fmt, ...) +{ va_list ap; va_start(ap, fmt); - fprintf(stderr, PROGRAM_NAME ": "); + fprintf(stderr, PROGRAM_NAME ": "); vfprintf(stderr, fmt, ap); - if(errnum) fprintf(stderr, ": %s", strerror(errnum)); + if (errnum) + fprintf(stderr, ": %s", strerror(errnum)); fprintf(stderr, "\n"); - if(status) exit(status); + va_end(ap); + if (status) + exit(status); } #endif +int open_shipper_fifo(char* pathname) +{ + int fd = -1; + struct stat statbuf; -int open_shipper_socket(char *pathname) { - int fd; - static int fail_count = 0; - - struct sockaddr_un sa = { - .sun_family = AF_LOCAL - }; - strncpy(sa.sun_path, pathname, sizeof(sa.sun_path) - 1); - - fd = socket(AF_LOCAL, SOCK_STREAM, 0); - if(fd >= 0) { - if(connect(fd, (struct sockaddr *) &sa, sizeof sa)) { - if((fail_count % 30) == 0) { - printf(PROGRAM_NAME ": cannot connect socket \"%s\": %s\n", - pathname, - strerror(errno)); - } - fail_count++; - close(fd); - return -1; + if (stat(pathname, &statbuf)) { + switch (errno) { + case ENOENT: + if(mkfifo(pathname, 0700)) { + error(1, errno, "mkfifo %s failed", pathname); + } + break; + default: + error(1, errno, "stat %s failed", pathname); + break; + } + } else { + if (!(statbuf.st_mode & S_IFIFO)) { + error(1, errno, "%s exists already and is not a fifo", pathname); } - int flags = fcntl(fd, F_GETFL); - fcntl(fd, F_SETFL, flags | O_NONBLOCK); } + if (fd < 0) { + fd = open(pathname, O_NONBLOCK | O_RDWR, 0); + if (fd < 0) + error(1, errno, "failed to open fifo %s", pathname); + }; return fd; } @@ -63,80 +66,105 @@ struct pollfd fds[] = { { .fd = -1, .events = POLLERR }, }; -int is_connected(void) { - return (fds[2].fd >= 0); -} +#define FIFO_STATE_GOOD (-1) +#define FIFO_STATE_TIMEOUT_EXPIRED (0) +#define FIFO_STATE_TIMEOUT_MAX (50) /* ? probably going to depend on log volume */ +char *start_cookie, *stop_cookie; +static int fifo_state = FIFO_STATE_TIMEOUT_EXPIRED; -int main(int argc, char * argv[]) { - - char * buf = malloc(8192); - int out_bytes = 0; - int tee_bytes = 0; - - if(argc != 3) { - error(1, 0, "usage: " PROGRAM_NAME " /path/to/socket cookie-text"); - } - char * socket_pathname = argv[1]; - char * cookie = argv[2]; - char * start_cookie = malloc(strlen(cookie) + 8); - char * stop_cookie = malloc(strlen(cookie) + 7); - - if(strlen(socket_pathname) > 108) { - error(1, 0, "socket pathname \"%s\" is too long, max 108 bytes", - socket_pathname); +int write_fifo(int fd, char* buf, int count) +{ + int written_bytes = 0; + if (fifo_state == FIFO_STATE_GOOD) { + written_bytes = write(fd, buf, count); + if (written_bytes == -1) { + fifo_state = FIFO_STATE_TIMEOUT_MAX; + write(1, stop_cookie, strlen(stop_cookie)); + } + } else if (fifo_state > 0) { + fifo_state--; + } else if (fifo_state == FIFO_STATE_TIMEOUT_EXPIRED) { + written_bytes = write(fd, buf, count); + if (written_bytes >= 0) { + fifo_state = FIFO_STATE_GOOD; + write(1, start_cookie, strlen(start_cookie)); + } else { + fifo_state = FIFO_STATE_TIMEOUT_MAX; + /* don't log again, we're in this state because it was bad + already, and it's still bad */ + } + } else { + error(1, 0, "impossible(sic) fifo state %d", fifo_state); }; - strcpy(start_cookie, cookie); strcat(start_cookie, " START\n"); - strcpy(stop_cookie, cookie); strcat(stop_cookie, " STOP\n"); + /* if the fifo can't be written, pretend to caller that we wrote + everything so that it doesn't back up. the backfill process + will write these entries later when the shipper is online + again */ + return (fifo_state == FIFO_STATE_GOOD) ? written_bytes : count; +} + +#define WRITE_LITERAL(fd, c) write(fd, c, sizeof c) + +int main(int argc, char* argv[]) +{ + char* buf = malloc(8192); + int out_bytes = 0; + int fifo_bytes = 0; + + if (argc != 3) { + error(1, 0, "usage: " PROGRAM_NAME " /path/to/fifo cookie-text"); + } + char* fifo_pathname = argv[1]; + char* cookie = argv[2]; + start_cookie = malloc(strlen(cookie) + 8); + stop_cookie = malloc(strlen(cookie) + 7); + + strcpy(start_cookie, cookie); + strcat(start_cookie, " START\n"); + strcpy(stop_cookie, cookie); + strcat(stop_cookie, " STOP\n"); signal(SIGPIPE, SIG_IGN); + fds[2].fd = open_shipper_fifo(fifo_pathname); + int flags = fcntl(STDOUT_FILENO, F_GETFL); fcntl(STDOUT_FILENO, F_SETFL, flags | O_NONBLOCK); int quitting = 0; - while(! quitting) { + while (!quitting) { int nfds = poll(fds, 3, 2000); - if(nfds > 0) { - if((fds[0].revents & (POLLIN|POLLHUP)) && - (out_bytes == 0) && - (tee_bytes == 0)) { + if (nfds > 0) { + if ((fds[0].revents & (POLLIN | POLLHUP)) && (out_bytes == 0) && (fifo_bytes == 0)) { out_bytes = read(fds[0].fd, buf, 8192); - if(out_bytes == 0) { + if (out_bytes == 0) { quitting = 1; - buf = PROGRAM_NAME " detected eof of file on stdin, exiting\n"; - out_bytes = strlen(buf); + WRITE_LITERAL(1, PROGRAM_NAME " detected eof of file on stdin, exiting\n"); }; - if(is_connected()) tee_bytes = out_bytes; + fifo_bytes = out_bytes; }; - if(out_bytes) { - out_bytes -= write(fds[1].fd, buf, out_bytes); - }; - if(fds[1].revents & (POLLERR|POLLHUP)) { + if (fds[1].revents & (POLLERR | POLLHUP)) { exit(1); // can't even log an error if the logging stream fails }; - if(is_connected()) { - if(tee_bytes) { - tee_bytes -= write(fds[2].fd, buf, tee_bytes); - }; - if(fds[2].revents & (POLLERR|POLLHUP)) { - close(fds[2].fd); - fds[2].fd = -1; - (void) write(1, stop_cookie, strlen(stop_cookie)); - }; + if (fds[2].revents & (POLLERR | POLLHUP)) { + error(1, 0, "error or hangup writing to log fifo (revents=%d)", fds[2].revents); + }; + + if (out_bytes) { + out_bytes -= write(fds[1].fd, buf, out_bytes); + }; + if (fifo_bytes) { + fifo_bytes -= write_fifo(fds[2].fd, buf, fifo_bytes); }; } else { - if(! is_connected()) { - fds[2].fd = open_shipper_socket(argv[1]); - if(is_connected()) { - /* write cookie to stdout so that the backfill - * process knows we are now logging realtime - */ - write(fds[1].fd, start_cookie, strlen(start_cookie)); - } - } + /* poll timed out, may as well try and see if the shipper + * is alive again + */ + if (fifo_state > FIFO_STATE_TIMEOUT_EXPIRED) + fifo_state = FIFO_STATE_TIMEOUT_EXPIRED; }; }; }