11 Commits

Author SHA1 Message Date
9dd92ac53e mob next [ci-skip] [ci skip] [skip ci]
lastFile:pkgs/kernel/default.nix
2025-10-14 10:07:38 +07:00
Daniel Barlow
29fbb5461d send log timestamp parse errors to victorialogs 2025-10-08 20:10:11 +01:00
Daniel Barlow
70786712b3 make victorialogsend ucspi-compatible 2025-10-08 20:10:11 +01:00
Daniel Barlow
be26df4e95 don't send 9 digits of nanosecond to victorialogs
It fails to parse. I havent investigated how many digits it will
parse, but let's try 3
2025-10-08 20:10:11 +01:00
Daniel Barlow
71aed767f2 switch logtap to use fifo unstead of af_unix 2025-10-08 20:10:11 +01:00
Daniel Barlow
af6e41db7a logshippers: add victorialogsend script
it's only very lightly tested but it seems to work.  the _stream
may be quite wrong, or perhaps that's a local admin decision anyway
2025-10-08 20:10:11 +01:00
Daniel Barlow
947a1c1373 return utc as (values seconds nanos)
fractional seconds is just inviting too much fun with floating point
error
2025-10-08 20:10:11 +01:00
Daniel Barlow
59eea64985 add tai64 module to anoia 2025-10-08 20:10:11 +01:00
Daniel Barlow
a343e63231 rename logshipper -> logtap
- it matches the executable name
- it doesn't ship anything anyway, it's just plumbing
2025-10-08 20:10:11 +01:00
Daniel Barlow
cacde953cb don't ask chrony to drop privs, it needs libpcap 2025-10-08 20:10:11 +01:00
Daniel Barlow
9f64eabeb4 rename incz package to logshippers 2025-10-08 20:10:11 +01:00
17 changed files with 415 additions and 198 deletions

View File

@@ -61,9 +61,8 @@ OpenWrt web page: https://openwrt.org/toh/gl.inet/gl-ar750
let let
inherit (lib) mkIf; inherit (lib) mkIf;
openwrt = pkgs.openwrt; openwrt = pkgs.openwrt;
firmwareBlobs = pkgs.pkgsBuildBuild.fetchFromGitHub { firmwareBlobs = pkgs.pkgsBuildBuild.fetchgit {
owner = "kvalo"; url = "https://git.codelinaro.org/clo/ath-firmware/ath10k-firmware";
repo = "ath10k-firmware";
rev = "5d63529ffc6e24974bc7c45b28fd1c34573126eb"; rev = "5d63529ffc6e24974bc7c45b28fd1c34573126eb";
sha256 = "1bwpifrwl5mvsmbmc81k8l22hmkwk05v7xs8dxag7fgv2kd6lv2r"; sha256 = "1bwpifrwl5mvsmbmc81k8l22hmkwk05v7xs8dxag7fgv2kd6lv2r";
}; };

View File

@@ -107,6 +107,7 @@ rec {
}; };
services.ntp = svc.ntp.build { services.ntp = svc.ntp.build {
user = "root";
pools = { pools = {
"pool.ntp.org" = [ "iburst" ]; "pool.ntp.org" = [ "iburst" ];
}; };

View File

@@ -14,7 +14,7 @@ let
(mapAttrsToList (name: opts: "server ${name} ${concatStringsSep "" opts}") p.servers) (mapAttrsToList (name: opts: "server ${name} ${concatStringsSep "" opts}") p.servers)
++ (mapAttrsToList (name: opts: "pool ${name} ${concatStringsSep "" opts}") p.pools) ++ (mapAttrsToList (name: opts: "pool ${name} ${concatStringsSep "" opts}") p.pools)
++ (mapAttrsToList (name: opts: "peer ${name} ${concatStringsSep "" opts}") p.peers) ++ (mapAttrsToList (name: opts: "peer ${name} ${concatStringsSep "" opts}") p.peers)
++ lib.optional (p.user != null) "user ${p.user}" ++ lib.optional (p.user != "root") "user ${p.user}"
++ (lib.optional ( ++ (lib.optional (
p.makestep != null p.makestep != null
) "makestep ${toString p.makestep.threshold} ${toString p.makestep.limit}") ) "makestep ${toString p.makestep.threshold} ${toString p.makestep.limit}")

View File

@@ -31,7 +31,7 @@ let
pipecmds = pipecmds =
[ "${s6}/bin/s6-log -bpd3 -- ${cfg.script} 1" ] [ "${s6}/bin/s6-log -bpd3 -- ${cfg.script} 1" ]
++ (lib.optional (cfg ? persistent && cfg.persistent.enable) "/bin/tee /dev/pmsg0") ++ (lib.optional (cfg ? persistent && cfg.persistent.enable) "/bin/tee /dev/pmsg0")
++ (lib.optional cfg.shipping.enable "${pkgs.logshipper}/bin/logtap ${cfg.shipping.socket} logshipper-socket-event"); ++ (lib.optional cfg.shipping.enable "${pkgs.logtap}/bin/logtap ${cfg.shipping.socket} logshipper-socket-event");
in in
'' ''
#!${execline}/bin/execlineb -P #!${execline}/bin/execlineb -P
@@ -278,10 +278,7 @@ in
eat = longrun { eat = longrun {
name = "log-shipping-pipe-eat"; name = "log-shipping-pipe-eat";
run = '' run = ''
fdmove -c 12 1 \ cat ${cfg.shipping.socket}
${pkgs.s6}/bin/s6-ipcserver ${cfg.shipping.socket} \
fdmove -c 1 12 \
cat
''; '';
producer-for = spew.name; producer-for = spew.name;
}; };

View File

@@ -1,9 +1,9 @@
servicedir:=$(shell mktemp -d) servicedir:=$(shell mktemp -d)
outputdir:=$(servicedir)/.outputs outputdir:=$(servicedir)/.outputs
default: fs.lua init.lua nl.lua svc.lua process.lua net/constants.lua default: fs.lua init.lua nl.lua svc.lua process.lua net/constants.lua tai64.lua
CHECK=fs.fnl init.fnl svc.fnl process.fnl CHECK=fs.fnl init.fnl svc.fnl process.fnl tai64.fnl
check: check:
ln -s . anoia ln -s . anoia

79
pkgs/anoia/tai64.fnl Normal file
View File

@@ -0,0 +1,79 @@
(local { : base64 : assoc } (require :anoia))
(import-macros { : expect= : define-tests } :anoia.assert)
(local
leap-seconds-list
(let [tbl
[
;; https://data.iana.org/time-zones/data/leap-seconds.list
;; comments are the _start_ of the day where the second was
;; added at the end of the previous day
[2272060800 10] ; 1 jan 1972 ; baseline, not a leap second
[2287785600 11] ; 1 jul 1972
[2303683200 12] ; 1 jan 1973
[2335219200 13] ; 1 jan 1974
[2366755200 14] ; 1 Jan 1975
[2398291200 15] ; 1 Jan 1976
[2429913600 16] ; 1 Jan 1977
[2461449600 17] ; 1 Jan 1978
[2492985600 18] ; 1 Jan 1979
[2524521600 19] ; 1 Jan 1980
[2571782400 20] ; 1 Jul 1981
[2603318400 21] ; 1 Jul 1982
[2634854400 22] ; 1 Jul 1983
[2698012800 23] ; 1 Jul 1985
[2776982400 24] ; 1 Jan 1988
[2840140800 25] ; 1 Jan 1990
[2871676800 26] ; 1 Jan 1991
[2918937600 27] ; 1 Jul 1992
[2950473600 28] ; 1 Jul 1993
[2982009600 29] ; 1 Jul 1994
[3029443200 30] ; 1 Jan 1996
[3076704000 31] ; 1 Jul 1997
[3124137600 32] ; 1 Jan 1999
[3345062400 33] ; 1 Jan 2006
[3439756800 34] ; 1 Jan 2009
[3550089600 35] ; 1 Jul 2012
[3644697600 36] ; 1 Jul 2015
[3692217600 37] ; 1 Jan 2017
]]
(icollect [_ [ts dtai] (ipairs tbl)]
[(+ (- ts 2208988800) dtai) dtai])))
(fn leap-seconds [timestamp]
(accumulate [secs 10
_ [epoch leap-seconds] (ipairs leap-seconds-list)
&until (> epoch timestamp)]
leap-seconds))
(define-tests :leap-seconds
(expect= (leap-seconds 104694412) 12)
(expect= (leap-seconds 23) 10)
(expect= (leap-seconds (+ 3692217600 60)) 37)
(expect= (leap-seconds (+ 10 773020829)) 29)
(expect= (leap-seconds 362793520) 19))
(fn from-timestamp [str]
(if (= (string.sub str 1 1) "@")
(let [s (tonumber (string.sub str 2 17) 16)
two_62 (lshift 1 62)
sec (if (>= s two_62)
(- s two_62)
(- two_62 s))
nano (tonumber (string.sub str 18 25) 16)]
{:s sec :n nano})
nil))
(fn to-utc [tai]
(values (- tai.s (leap-seconds tai.s)) tai.n))
(define-tests
(expect=
(from-timestamp "@4000000068e2f0d3257dc09b")
{:s 1759703251 :n 628998299})
(let [(s n) (to-utc (from-timestamp "@4000000068e2f0d3257dc09b"))]
(expect= [s n] [1759703214 628998299]))
)
{ : from-timestamp : to-utc }

View File

@@ -93,14 +93,14 @@ in
hi = callPackage ./hi { }; hi = callPackage ./hi { };
ifwait = callPackage ./ifwait { }; ifwait = callPackage ./ifwait { };
initramfs-peek = callPackage ./initramfs-peek { }; initramfs-peek = callPackage ./initramfs-peek { };
incz = callPackage ./incz { }; logshippers = callPackage ./logshippers { };
json-to-fstree = callPackage ./json-to-fstree { }; json-to-fstree = callPackage ./json-to-fstree { };
kernel-backport = callPackage ./kernel-backport { }; kernel-backport = callPackage ./kernel-backport { };
kmodloader = callPackage ./kmodloader { }; kmodloader = callPackage ./kmodloader { };
levitate = callPackage ./levitate { }; levitate = callPackage ./levitate { };
libubootenv = callPackage ./libubootenv { }; libubootenv = callPackage ./libubootenv { };
linotify = callPackage ./linotify { }; linotify = callPackage ./linotify { };
logshipper = callPackage ./logshipper { }; logtap = callPackage ./logtap { };
lualinux = callPackage ./lualinux { }; lualinux = callPackage ./lualinux { };
# we need to build real lzma instead of using xz, because the lzma # we need to build real lzma instead of using xz, because the lzma

View File

@@ -1,42 +0,0 @@
{
fetchurl,
writeFennel,
fennel,
fennelrepl,
runCommand,
lua,
anoia,
lualinux,
stdenv,
}:
let
name = "incz";
in
stdenv.mkDerivation {
inherit name;
src = ./.;
buildInputs = [ lua ];
nativeBuildInputs = [ fennelrepl ];
buildPhase = ''
fennelrepl --test ./incz.fnl
cp -p ${
writeFennel name {
packages = [
anoia
lualinux
fennel
];
macros = [
anoia.dev
];
mainFunction = "run";
} ./incz.fnl
} ${name}
'';
installPhase = ''
install -D ${name} $out/bin/${name}
'';
}

View File

@@ -98,6 +98,11 @@ stdenv.mkDerivation rec {
checkConfigurationPhase = '' checkConfigurationPhase = ''
echo Checking required config items: echo Checking required config items:
echo ======================================================
echo ======================================================
echo ------------------------------------------------------
echo $modulesupport
echo ------------------------------------------------------
if comm -2 -3 <(grep 'CONFIG' ${kconfigFile} |sort) <(grep 'CONFIG' .config|sort) |grep '.' ; then if comm -2 -3 <(grep 'CONFIG' ${kconfigFile} |sort) <(grep 'CONFIG' .config|sort) |grep '.' ; then
echo -e "^^^ Some configuration lost :-(\nPerhaps you have mutually incompatible settings, or have disabled options on which these depend.\n" echo -e "^^^ Some configuration lost :-(\nPerhaps you have mutually incompatible settings, or have disabled options on which these depend.\n"
exit 0 exit 0
@@ -119,5 +124,9 @@ stdenv.mkDerivation rec {
make modules make modules
cp -a . $modulesupport cp -a . $modulesupport
cp .config $config cp .config $config
echo ------------------------------------------------------
cat .config
echo $config
echo ------------------------------------------------------
''; '';
} }

View File

@@ -24,6 +24,11 @@ stdenv.mkDerivation {
phases = [ "buildPhase" ]; phases = [ "buildPhase" ];
nativeBuildInputs = [ dtc ]; nativeBuildInputs = [ dtc ];
buildPhase = '' buildPhase = ''
echo ==================================================
echo ${dtcSearchFlags}
echo ==
echo ${cppDtSearchFlags}
echo ==================================================
${stdenv.cc.targetPrefix}cpp -nostdinc -x assembler-with-cpp ${cppDtSearchFlags} -undef -D__DTS__ -o dtb.tmp ${combined} ${stdenv.cc.targetPrefix}cpp -nostdinc -x assembler-with-cpp ${cppDtSearchFlags} -undef -D__DTS__ -o dtb.tmp ${combined}
dtc ${dtcSearchFlags} -I dts -O dtb -o $out dtb.tmp dtc ${dtcSearchFlags} -I dts -O dtb -o $out dtb.tmp
# dtc -I dtb -O dts $out # dtc -I dtb -O dts $out

View File

@@ -1,142 +0,0 @@
#include <poll.h>
#include <sys/timerfd.h>
#include <time.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <signal.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <errno.h>
#define PROGRAM_NAME "logtap"
#ifdef _GNU_SOURCE
#include <error.h>
#else
#include <stdarg.h>
static void error(int status, int errnum, const char * fmt, ...) {
va_list ap;
va_start(ap, fmt);
fprintf(stderr, PROGRAM_NAME ": ");
vfprintf(stderr, fmt, ap);
if(errnum) fprintf(stderr, ": %s", strerror(errnum));
fprintf(stderr, "\n");
if(status) exit(status);
}
#endif
int open_shipper_socket(char *pathname) {
int fd;
static int fail_count = 0;
struct sockaddr_un sa = {
.sun_family = AF_LOCAL
};
strncpy(sa.sun_path, pathname, sizeof(sa.sun_path) - 1);
fd = socket(AF_LOCAL, SOCK_STREAM, 0);
if(fd >= 0) {
if(connect(fd, (struct sockaddr *) &sa, sizeof sa)) {
if((fail_count % 30) == 0) {
printf(PROGRAM_NAME ": cannot connect socket \"%s\": %s\n",
pathname,
strerror(errno));
}
fail_count++;
close(fd);
return -1;
}
int flags = fcntl(fd, F_GETFL);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
return fd;
}
struct pollfd fds[] = {
{ .fd = 0, .events = POLLIN },
{ .fd = 1, .events = POLLERR },
{ .fd = -1, .events = POLLERR },
};
int is_connected(void) {
return (fds[2].fd >= 0);
}
int main(int argc, char * argv[]) {
char * buf = malloc(8192);
int out_bytes = 0;
int tee_bytes = 0;
if(argc != 3) {
error(1, 0, "usage: " PROGRAM_NAME " /path/to/socket cookie-text");
}
char * socket_pathname = argv[1];
char * cookie = argv[2];
char * start_cookie = malloc(strlen(cookie) + 8);
char * stop_cookie = malloc(strlen(cookie) + 7);
if(strlen(socket_pathname) > 108) {
error(1, 0, "socket pathname \"%s\" is too long, max 108 bytes",
socket_pathname);
};
strcpy(start_cookie, cookie); strcat(start_cookie, " START\n");
strcpy(stop_cookie, cookie); strcat(stop_cookie, " STOP\n");
signal(SIGPIPE, SIG_IGN);
int flags = fcntl(STDOUT_FILENO, F_GETFL);
fcntl(STDOUT_FILENO, F_SETFL, flags | O_NONBLOCK);
int quitting = 0;
while(! quitting) {
int nfds = poll(fds, 3, 2000);
if(nfds > 0) {
if((fds[0].revents & (POLLIN|POLLHUP)) &&
(out_bytes == 0) &&
(tee_bytes == 0)) {
out_bytes = read(fds[0].fd, buf, 8192);
if(out_bytes == 0) {
quitting = 1;
buf = PROGRAM_NAME " detected eof of file on stdin, exiting\n";
out_bytes = strlen(buf);
};
if(is_connected()) tee_bytes = out_bytes;
};
if(out_bytes) {
out_bytes -= write(fds[1].fd, buf, out_bytes);
};
if(fds[1].revents & (POLLERR|POLLHUP)) {
exit(1); // can't even log an error if the logging stream fails
};
if(is_connected()) {
if(tee_bytes) {
tee_bytes -= write(fds[2].fd, buf, tee_bytes);
};
if(fds[2].revents & (POLLERR|POLLHUP)) {
close(fds[2].fd);
fds[2].fd = -1;
(void) write(1, stop_cookie, strlen(stop_cookie));
};
};
} else {
if(! is_connected()) {
fds[2].fd = open_shipper_socket(argv[1]);
if(is_connected()) {
/* write cookie to stdout so that the backfill
* process knows we are now logging realtime
*/
write(fds[1].fd, start_cookie, strlen(start_cookie));
}
}
};
};
}

View File

@@ -0,0 +1,41 @@
{
fetchurl,
writeFennel,
fennel,
fennelrepl,
runCommand,
lua,
anoia,
lualinux,
stdenv,
}:
let
name = "logshippers";
luafy = name : source :
writeFennel name {
packages = [ anoia lualinux fennel ];
macros = [ anoia.dev ];
mainFunction = "run";
} source;
incz = luafy name ./incz.fnl;
victorialogsend = luafy name ./victorialogsend.fnl;
in
stdenv.mkDerivation {
inherit name;
src = ./.;
buildInputs = [ lua ];
nativeBuildInputs = [ fennelrepl ];
buildPhase = ''
fennelrepl --test ./incz.fnl
fennelrepl --test ./victorialogsend.fnl
cp -p ${incz} incz
cp -p ${victorialogsend} victorialogsend
'';
installPhase = ''
install -D incz $out/bin/incz
install -D victorialogsend $out/bin/victorialogsend
'';
}

View File

@@ -0,0 +1,100 @@
(local { : base64 : assoc } (require :anoia))
(local tai64 (require :anoia.tai64))
(local ll (require :lualinux))
(import-macros { : expect= : define-tests } :anoia.assert)
(local crlf "\r\n")
(fn chunk [str]
(let [len (# str)]
(string.format "%x%s%s%s" len crlf str crlf)))
(fn parse-url [str]
;; this is a very poor parser as it won't recognise
;; credentials in the authority and it lumps query-string/fragment
;; into the path
(let [(scheme host path)
(string.match str "(.-)://(.-)(/.+)")]
{ : scheme : host : path }))
(define-tests
(expect= (parse-url "https://www.example.com/stairway/to/heaven")
{ :scheme "https"
:host "www.example.com"
:path "/stairway/to/heaven"
}))
(fn parse-args [args]
(case args
["--basic-auth" auth & rest]
(assoc (parse-args rest) :auth auth)
[url] { :url (parse-url url) }
_ (error "invalid args")))
(fn http-header [host path auth]
(let [b64 (base64 :url)
authstr
(if auth
(string.format "Authorization: basic %s\n" (b64:encode auth))
"")]
(string.format
"POST %s HTTP/1.1\r
Host: %s\
%sTransfer-Encoding: chunked\r
\r
"
path host authstr)))
(fn format-timestamp-rfc3339 [timestamp prec]
(let [(sec nano) (-> timestamp tai64.from-timestamp tai64.to-utc)
subsec (string.sub (string.format "%09d" nano) 1 prec)]
(.. (os.date "!%FT%T" sec)
"." subsec
"Z")))
(define-tests
(expect= (format-timestamp-rfc3339 "@4000000068e2f0d3257dc09b" 9)
"2025-10-05T22:26:54.628998299Z")
(expect= (format-timestamp-rfc3339 "@4000000068e2f0d3257dc09b" 3)
"2025-10-05T22:26:54.628Z"))
(fn process-line [line]
(let [(timestamp hostname service msg) (string.match line "(@%x+) (%g+) (%g+) (.+)$")]
(->
(if timestamp
(string.format
"{%q:%q,%q:%q,%q:%q,%q:%q}\n"
:_time (format-timestamp-rfc3339 timestamp 3)
:service service
:_msg msg
:host hostname)
(string.format
"{%q:%q,%q:%q,%q:%q,%q:%q}\n"
:_time (os.date "!%FT%TZ")
:service "ERROR"
:_msg (string.format "can't parse log %q" msg)
:host hostname))
chunk)
))
(fn writefd [fd body]
(case (ll.write fd body)
(bytes) (when (< bytes (# body)) (writefd fd (string.sub body bytes)))
(nil errno)
(error (string.format "write to fd %d failed: %s" fd (ll.strerror errno))))
true)
(fn run []
(let [{ : auth : url } (parse-args arg)
in-fd 6
out-fd 7]
(writefd out-fd (http-header url.host url.path auth))
(while (case (io.stdin:read "l")
line (writefd out-fd (process-line line))))
(writefd out-fd (chunk ""))
(io.stderr:write (ll.read in-fd))))
{ : run }

View File

@@ -2,7 +2,7 @@
stdenv, stdenv,
}: }:
stdenv.mkDerivation { stdenv.mkDerivation {
name = "logshipper"; name = "logtap";
makeFlags = [ "PREFIX=${placeholder "out"}" ]; makeFlags = [ "PREFIX=${placeholder "out"}" ];
src = ./.; src = ./.;
} }

170
pkgs/logtap/logtap.c Normal file
View File

@@ -0,0 +1,170 @@
#include <errno.h> // for errno
#include <fcntl.h> // for fcntl, O_NONBLOCK, open, F_GETFL
#include <poll.h> // for POLLERR, POLLHUP, POLLIN
#include <signal.h> // for signal, SIGPIPE, SIG_IGN
#include <stdarg.h> // for va_end, va_list, va_start
#include <stdio.h> // for fprintf, stderr, vfprintf
#include <stdlib.h> // for malloc, exit
#include <string.h> // for strlen, strcat, strcpy, strerror
#include <sys/stat.h> // for stat, mkfifo, S_IFIFO
#include <unistd.h> // for write, STDOUT_FILENO, read
#define PROGRAM_NAME "logtap"
#ifdef _GNU_SOURCE
#include <error.h>
#else
#include <stdarg.h>
static void error(int status, int errnum, const char* fmt, ...)
{
va_list ap;
va_start(ap, fmt);
fprintf(stderr, PROGRAM_NAME ": ");
vfprintf(stderr, fmt, ap);
if (errnum)
fprintf(stderr, ": %s", strerror(errnum));
fprintf(stderr, "\n");
va_end(ap);
if (status)
exit(status);
}
#endif
int open_shipper_fifo(char* pathname)
{
int fd = -1;
struct stat statbuf;
if (stat(pathname, &statbuf)) {
switch (errno) {
case ENOENT:
if(mkfifo(pathname, 0700)) {
error(1, errno, "mkfifo %s failed", pathname);
}
break;
default:
error(1, errno, "stat %s failed", pathname);
break;
}
} else {
if (!(statbuf.st_mode & S_IFIFO)) {
error(1, errno, "%s exists already and is not a fifo", pathname);
}
}
if (fd < 0) {
fd = open(pathname, O_NONBLOCK | O_RDWR, 0);
if (fd < 0)
error(1, errno, "failed to open fifo %s", pathname);
};
return fd;
}
struct pollfd fds[] = {
{ .fd = 0, .events = POLLIN },
{ .fd = 1, .events = POLLERR },
{ .fd = -1, .events = POLLERR },
};
#define FIFO_STATE_GOOD (-1)
#define FIFO_STATE_TIMEOUT_EXPIRED (0)
#define FIFO_STATE_TIMEOUT_MAX (50) /* ? probably going to depend on log volume */
char *start_cookie, *stop_cookie;
static int fifo_state = FIFO_STATE_TIMEOUT_EXPIRED;
int write_fifo(int fd, char* buf, int count)
{
int written_bytes = 0;
if (fifo_state == FIFO_STATE_GOOD) {
written_bytes = write(fd, buf, count);
if (written_bytes == -1) {
fifo_state = FIFO_STATE_TIMEOUT_MAX;
write(1, stop_cookie, strlen(stop_cookie));
}
} else if (fifo_state > 0) {
fifo_state--;
} else if (fifo_state == FIFO_STATE_TIMEOUT_EXPIRED) {
written_bytes = write(fd, buf, count);
if (written_bytes >= 0) {
fifo_state = FIFO_STATE_GOOD;
write(1, start_cookie, strlen(start_cookie));
} else {
fifo_state = FIFO_STATE_TIMEOUT_MAX;
/* don't log again, we're in this state because it was bad
already, and it's still bad */
}
} else {
error(1, 0, "impossible(sic) fifo state %d", fifo_state);
};
/* if the fifo can't be written, pretend to caller that we wrote
everything so that it doesn't back up. the backfill process
will write these entries later when the shipper is online
again */
return (fifo_state == FIFO_STATE_GOOD) ? written_bytes : count;
}
#define WRITE_LITERAL(fd, c) write(fd, c, sizeof c)
int main(int argc, char* argv[])
{
char* buf = malloc(8192);
int out_bytes = 0;
int fifo_bytes = 0;
if (argc != 3) {
error(1, 0, "usage: " PROGRAM_NAME " /path/to/fifo cookie-text");
}
char* fifo_pathname = argv[1];
char* cookie = argv[2];
start_cookie = malloc(strlen(cookie) + 8);
stop_cookie = malloc(strlen(cookie) + 7);
strcpy(start_cookie, cookie);
strcat(start_cookie, " START\n");
strcpy(stop_cookie, cookie);
strcat(stop_cookie, " STOP\n");
signal(SIGPIPE, SIG_IGN);
fds[2].fd = open_shipper_fifo(fifo_pathname);
int flags = fcntl(STDOUT_FILENO, F_GETFL);
fcntl(STDOUT_FILENO, F_SETFL, flags | O_NONBLOCK);
int quitting = 0;
while (!quitting) {
int nfds = poll(fds, 3, 2000);
if (nfds > 0) {
if ((fds[0].revents & (POLLIN | POLLHUP)) && (out_bytes == 0) && (fifo_bytes == 0)) {
out_bytes = read(fds[0].fd, buf, 8192);
if (out_bytes == 0) {
quitting = 1;
WRITE_LITERAL(1, PROGRAM_NAME " detected eof of file on stdin, exiting\n");
};
fifo_bytes = out_bytes;
};
if (fds[1].revents & (POLLERR | POLLHUP)) {
exit(1); // can't even log an error if the logging stream fails
};
if (fds[2].revents & (POLLERR | POLLHUP)) {
error(1, 0, "error or hangup writing to log fifo (revents=%d)", fds[2].revents);
};
if (out_bytes) {
out_bytes -= write(fds[1].fd, buf, out_bytes);
};
if (fifo_bytes) {
fifo_bytes -= write_fifo(fds[2].fd, buf, fifo_bytes);
};
} else {
/* poll timed out, may as well try and see if the shipper
* is alive again
*/
if (fifo_state > FIFO_STATE_TIMEOUT_EXPIRED)
fifo_state = FIFO_STATE_TIMEOUT_EXPIRED;
};
};
}