switch logtap to use fifo unstead of af_unix

This commit is contained in:
Daniel Barlow
2025-10-08 19:02:53 +01:00
parent af6e41db7a
commit 71aed767f2
2 changed files with 120 additions and 95 deletions

View File

@@ -278,10 +278,7 @@ in
eat = longrun { eat = longrun {
name = "log-shipping-pipe-eat"; name = "log-shipping-pipe-eat";
run = '' run = ''
fdmove -c 12 1 \ cat ${cfg.shipping.socket}
${pkgs.s6}/bin/s6-ipcserver ${cfg.shipping.socket} \
fdmove -c 1 12 \
cat
''; '';
producer-for = spew.name; producer-for = spew.name;
}; };

View File

@@ -1,15 +1,13 @@
#include <poll.h> #include <errno.h> // for errno
#include <sys/timerfd.h> #include <fcntl.h> // for fcntl, O_NONBLOCK, open, F_GETFL
#include <time.h> #include <poll.h> // for POLLERR, POLLHUP, POLLIN
#include <stdio.h> #include <signal.h> // for signal, SIGPIPE, SIG_IGN
#include <string.h> #include <stdarg.h> // for va_end, va_list, va_start
#include <stdlib.h> #include <stdio.h> // for fprintf, stderr, vfprintf
#include <unistd.h> #include <stdlib.h> // for malloc, exit
#include <fcntl.h> #include <string.h> // for strlen, strcat, strcpy, strerror
#include <signal.h> #include <sys/stat.h> // for stat, mkfifo, S_IFIFO
#include <sys/socket.h> #include <unistd.h> // for write, STDOUT_FILENO, read
#include <sys/un.h>
#include <errno.h>
#define PROGRAM_NAME "logtap" #define PROGRAM_NAME "logtap"
@@ -17,43 +15,48 @@
#include <error.h> #include <error.h>
#else #else
#include <stdarg.h> #include <stdarg.h>
static void error(int status, int errnum, const char * fmt, ...) { static void error(int status, int errnum, const char* fmt, ...)
{
va_list ap; va_list ap;
va_start(ap, fmt); va_start(ap, fmt);
fprintf(stderr, PROGRAM_NAME ": "); fprintf(stderr, PROGRAM_NAME ": ");
vfprintf(stderr, fmt, ap); vfprintf(stderr, fmt, ap);
if(errnum) fprintf(stderr, ": %s", strerror(errnum)); if (errnum)
fprintf(stderr, ": %s", strerror(errnum));
fprintf(stderr, "\n"); fprintf(stderr, "\n");
if(status) exit(status); va_end(ap);
if (status)
exit(status);
} }
#endif #endif
int open_shipper_fifo(char* pathname)
{
int fd = -1;
struct stat statbuf;
int open_shipper_socket(char *pathname) { if (stat(pathname, &statbuf)) {
int fd; switch (errno) {
static int fail_count = 0; case ENOENT:
if(mkfifo(pathname, 0700)) {
struct sockaddr_un sa = { error(1, errno, "mkfifo %s failed", pathname);
.sun_family = AF_LOCAL }
}; break;
strncpy(sa.sun_path, pathname, sizeof(sa.sun_path) - 1); default:
error(1, errno, "stat %s failed", pathname);
fd = socket(AF_LOCAL, SOCK_STREAM, 0); break;
if(fd >= 0) { }
if(connect(fd, (struct sockaddr *) &sa, sizeof sa)) { } else {
if((fail_count % 30) == 0) { if (!(statbuf.st_mode & S_IFIFO)) {
printf(PROGRAM_NAME ": cannot connect socket \"%s\": %s\n", error(1, errno, "%s exists already and is not a fifo", pathname);
pathname,
strerror(errno));
}
fail_count++;
close(fd);
return -1;
} }
int flags = fcntl(fd, F_GETFL);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
} }
if (fd < 0) {
fd = open(pathname, O_NONBLOCK | O_RDWR, 0);
if (fd < 0)
error(1, errno, "failed to open fifo %s", pathname);
};
return fd; return fd;
} }
@@ -63,80 +66,105 @@ struct pollfd fds[] = {
{ .fd = -1, .events = POLLERR }, { .fd = -1, .events = POLLERR },
}; };
int is_connected(void) { #define FIFO_STATE_GOOD (-1)
return (fds[2].fd >= 0); #define FIFO_STATE_TIMEOUT_EXPIRED (0)
} #define FIFO_STATE_TIMEOUT_MAX (50) /* ? probably going to depend on log volume */
char *start_cookie, *stop_cookie;
static int fifo_state = FIFO_STATE_TIMEOUT_EXPIRED;
int main(int argc, char * argv[]) { int write_fifo(int fd, char* buf, int count)
{
char * buf = malloc(8192); int written_bytes = 0;
int out_bytes = 0; if (fifo_state == FIFO_STATE_GOOD) {
int tee_bytes = 0; written_bytes = write(fd, buf, count);
if (written_bytes == -1) {
if(argc != 3) { fifo_state = FIFO_STATE_TIMEOUT_MAX;
error(1, 0, "usage: " PROGRAM_NAME " /path/to/socket cookie-text"); write(1, stop_cookie, strlen(stop_cookie));
} }
char * socket_pathname = argv[1]; } else if (fifo_state > 0) {
char * cookie = argv[2]; fifo_state--;
char * start_cookie = malloc(strlen(cookie) + 8); } else if (fifo_state == FIFO_STATE_TIMEOUT_EXPIRED) {
char * stop_cookie = malloc(strlen(cookie) + 7); written_bytes = write(fd, buf, count);
if (written_bytes >= 0) {
if(strlen(socket_pathname) > 108) { fifo_state = FIFO_STATE_GOOD;
error(1, 0, "socket pathname \"%s\" is too long, max 108 bytes", write(1, start_cookie, strlen(start_cookie));
socket_pathname); } else {
fifo_state = FIFO_STATE_TIMEOUT_MAX;
/* don't log again, we're in this state because it was bad
already, and it's still bad */
}
} else {
error(1, 0, "impossible(sic) fifo state %d", fifo_state);
}; };
strcpy(start_cookie, cookie); strcat(start_cookie, " START\n"); /* if the fifo can't be written, pretend to caller that we wrote
strcpy(stop_cookie, cookie); strcat(stop_cookie, " STOP\n"); everything so that it doesn't back up. the backfill process
will write these entries later when the shipper is online
again */
return (fifo_state == FIFO_STATE_GOOD) ? written_bytes : count;
}
#define WRITE_LITERAL(fd, c) write(fd, c, sizeof c)
int main(int argc, char* argv[])
{
char* buf = malloc(8192);
int out_bytes = 0;
int fifo_bytes = 0;
if (argc != 3) {
error(1, 0, "usage: " PROGRAM_NAME " /path/to/fifo cookie-text");
}
char* fifo_pathname = argv[1];
char* cookie = argv[2];
start_cookie = malloc(strlen(cookie) + 8);
stop_cookie = malloc(strlen(cookie) + 7);
strcpy(start_cookie, cookie);
strcat(start_cookie, " START\n");
strcpy(stop_cookie, cookie);
strcat(stop_cookie, " STOP\n");
signal(SIGPIPE, SIG_IGN); signal(SIGPIPE, SIG_IGN);
fds[2].fd = open_shipper_fifo(fifo_pathname);
int flags = fcntl(STDOUT_FILENO, F_GETFL); int flags = fcntl(STDOUT_FILENO, F_GETFL);
fcntl(STDOUT_FILENO, F_SETFL, flags | O_NONBLOCK); fcntl(STDOUT_FILENO, F_SETFL, flags | O_NONBLOCK);
int quitting = 0; int quitting = 0;
while(! quitting) { while (!quitting) {
int nfds = poll(fds, 3, 2000); int nfds = poll(fds, 3, 2000);
if(nfds > 0) { if (nfds > 0) {
if((fds[0].revents & (POLLIN|POLLHUP)) && if ((fds[0].revents & (POLLIN | POLLHUP)) && (out_bytes == 0) && (fifo_bytes == 0)) {
(out_bytes == 0) &&
(tee_bytes == 0)) {
out_bytes = read(fds[0].fd, buf, 8192); out_bytes = read(fds[0].fd, buf, 8192);
if(out_bytes == 0) { if (out_bytes == 0) {
quitting = 1; quitting = 1;
buf = PROGRAM_NAME " detected eof of file on stdin, exiting\n"; WRITE_LITERAL(1, PROGRAM_NAME " detected eof of file on stdin, exiting\n");
out_bytes = strlen(buf);
}; };
if(is_connected()) tee_bytes = out_bytes; fifo_bytes = out_bytes;
}; };
if(out_bytes) { if (fds[1].revents & (POLLERR | POLLHUP)) {
out_bytes -= write(fds[1].fd, buf, out_bytes);
};
if(fds[1].revents & (POLLERR|POLLHUP)) {
exit(1); // can't even log an error if the logging stream fails exit(1); // can't even log an error if the logging stream fails
}; };
if(is_connected()) { if (fds[2].revents & (POLLERR | POLLHUP)) {
if(tee_bytes) { error(1, 0, "error or hangup writing to log fifo (revents=%d)", fds[2].revents);
tee_bytes -= write(fds[2].fd, buf, tee_bytes); };
};
if(fds[2].revents & (POLLERR|POLLHUP)) { if (out_bytes) {
close(fds[2].fd); out_bytes -= write(fds[1].fd, buf, out_bytes);
fds[2].fd = -1; };
(void) write(1, stop_cookie, strlen(stop_cookie)); if (fifo_bytes) {
}; fifo_bytes -= write_fifo(fds[2].fd, buf, fifo_bytes);
}; };
} else { } else {
if(! is_connected()) { /* poll timed out, may as well try and see if the shipper
fds[2].fd = open_shipper_socket(argv[1]); * is alive again
if(is_connected()) { */
/* write cookie to stdout so that the backfill if (fifo_state > FIFO_STATE_TIMEOUT_EXPIRED)
* process knows we are now logging realtime fifo_state = FIFO_STATE_TIMEOUT_EXPIRED;
*/
write(fds[1].fd, start_cookie, strlen(start_cookie));
}
}
}; };
}; };
} }