#!mkcmd # $Compile(*): %b %o -mMkcmd %f && %b %o -dHAVE_RLIMIT prog.c && mv -i prog %F # $Mkcmd(*): ${mkcmd-mkcmd} std_help.m std_version.m %f from '' from '' from '' from '' from '' from '' from '' from '' from '' from '' from '"machine.h"' from '"pipering.h"' require "std_help.m" "std_version.m" basename "Tee" "" %i #if HAVE_RLIMIT #include #endif #if USE_SYS_INODE_H #include #endif #if !defined(PIPSIZ) #if defined(PIPE_BUF) #define PIPSIZ PIPE_BUF #else #define PIPSIZ 5120 #endif #endif #if !defined(PIPS_PER_VLINE) #define PIPS_PER_VLINE 72 #endif static int iMaxFd; static char rcsid[] = "$Id: Tee.m,v 2.18 2012/11/06 17:25:31 ksb Exp $"; %% before { } escape integer { named "iMaxSpig" init "0" param "spigots" help "number of tees per process" } boolean 'a' { named "fAppend" help "append to the files in the list" } # From the FreeBSD tee manual page... action 'i' { update '(void)signal(SIGINT, SIG_IGN);' help "ignore interrupts" } boolean 'v' { named "fVerbose" help "trace progress on stderr" } # Takes "20b" and the like -- ksb type "bytes" 'b' { named "iChunk" "pcChunkText" param "blocksize" init '"20b"' help "set the blocksize for the transfer" } boolean 'x' { hidden named "fTrace" init "0" help "trace creation of data network" } integer 'R' { named "iRing" "pcRing" track "fGaveRing" noparam "%N = \"0\";" param "[ring]" help "size of the double buffer process ring" } after { } list { param "files" named "Tee" help "files to tee to, or shell processes with a leading pipe" } augment action 'V' { user 'after_func(-1);Version();' } %c static void Version() { printf("%s: spigots %d, max fd %d\n",progname, iMaxSpig, iMaxFd); printf("%s: blocksize %ld [%s]\n", progname, (long)iChunk, pcChunkText); if (fGaveRing) { printf("%s: ring %d\n", progname, iRing); } } static const char acPip[] = "."; /* insert runtime defaults and system params (ksb) */ static void before_func(int dummy) /*ARGSUSED*/ { #if defined(_SC_OPEN_MAX) iMaxFd = sysconf(_SC_OPEN_MAX); #else #if HAVE_RLIMIT auto struct rlimit rl; getrlimit(RLIMIT_NOFILE, &rl); iMaxFd = rl.rlim_cur; #else #if NEED_GETDTABLESIZE iMaxFd = NOFILE; #else iMaxFd = getdtablesize(); #endif #endif #endif /* Never ignore children, we need to see exit codes. * But some user processes might fail and we don't want * a SIGPIPE from those. */ (void)signal(SIGPIPE, SIG_IGN); } /* Figure out the best course of action (ksb) */ static void after_func(int dummy) /*ARGSUSED*/ { if (iMaxFd < 5) { fprintf(stderr, "%s: not enough system file descriptors (%d < 5)\n", progname, iMaxFd); exit(EX_SOFTWARE); } /* we count how many we can procees: * count 0, 1, 2 which are already open * count extra one needed if last in list is |cmd */ if (0 >= iMaxSpig || iMaxSpig > (iMaxFd - 4)) { iMaxSpig = ((unsigned long)iMaxFd * 4181)/6765; } if (fGaveRing && 0 == iRing) { register unsigned long iRoot; iRoot = (iMaxSpig-1)/2; while (iRoot*iRoot > iMaxSpig) { iRoot = (iRoot + iMaxSpig/iRoot)/2; } iRing = iRoot < 3 ? 3 : iRoot; } if (0 >= iChunk) { iChunk = PIPSIZ; } } /* Couple a command to the tee system. (ksb) */ static int MkProc(pcCmd) char *pcCmd; { register int i; auto int iKid; auto int aiPipe[2]; static char *pcShell = "/bin/sh"; if (-1 == pipe(aiPipe)) { fprintf(stderr, "%s: pipe: %s\n", progname, strerror(errno)); return -1; } switch (iKid = fork()) { case 0: break; case -1: fprintf(stderr, "%s: fork: %s\n", progname, strerror(errno)); close(aiPipe[0]), close(aiPipe[1]); return -1; default: if (fTrace) { fprintf(stderr, "%s: kid %d fork'd\n", progname, iKid); } close(aiPipe[0]); return aiPipe[1]; } close(0); if (0 != dup(aiPipe[0])) { fprintf(stderr, "%s: dup: %s\n", progname, strerror(errno)); exit(EX_OSERR); } close(aiPipe[0]); close(aiPipe[1]); /* Close the other streams we have open, we just pipe'd * aiPipe[0] so it should be a good upper bound on fd's */ for (i = 3; i < aiPipe[0]; ++i) { (void)close(i); } execl(pcShell, "sh", "-c", pcCmd, (char *)0); fprintf(stderr, "%s: execl: %s: %s\n", progname, pcShell, strerror(errno)); exit(EX_UNAVAILABLE); /*NOTREACHED*/ } #if !defined(O_ACCMODE) #define O_ACCMODE (O_RDONLY|O_WRONLY|O_RDWR) #endif /* Couple an existing file descriptor to the tee system. (ksb) * [/dev/fd/10 would be cool, if we had that available, but we don't.] */ static int FdOut(pcFd) char *pcFd; { register int iCvt, iFlag; iCvt = atoi(pcFd); if (-1 == (iFlag = fcntl(iCvt, F_GETFL, (void *)0))) { return -1; } if (O_RDONLY == (iFlag & O_ACCMODE)) { fprintf(stderr, "%s: %s: file descriptor %d is read only\n", progname, pcFd, iCvt); errno = EBADF; return -1; } return iCvt; } /* single threaded pumper (ksb) */ static void StdPump(int iMade, int *piSpigots) { register int i, cc; register char *pcBuf; register int iCnt = 0; pcBuf = (char *)calloc(iChunk, sizeof(char)); while (0 < (cc = read(0, pcBuf, iChunk))) { register int wc, oc; register char *pc; if (fVerbose) { if (1 != write(2, acPip, sizeof(acPip)-1)) { iCnt = fVerbose = 0; } if (PIPS_PER_VLINE == ++iCnt) { if (1 != write(2, "\n", 1)) { fVerbose = 0; } iCnt = 0; } } for (i = 0; i < iMade; ++i) { oc = cc; pc = pcBuf; while (0 < oc) { wc = write(piSpigots[i], pc, oc); if (-1 == wc) { fprintf(stderr, "%s: write: %d: %s\n", progname, piSpigots[i], strerror(errno)); break; } oc -= wc; pc += wc; } } } if (iCnt && 1 != write(2, "\n", 1)) { fprintf(stderr, "%s: write: 2: %s\n", progname, strerror(errno)); exit(EX_OSERR); } if (-1 == cc) { fprintf(stderr, "%s: read: stdin: %s\n", progname, strerror(errno)); } } typedef struct PWnode { int ibsize; int iin; int ispigots; int *piouts; } PIPE_WORKS; /* Read a block, write it many times in a pipe ring (ksb) * When don't send a 'q' around the ring to finish the workers. */ static int RingWorker(int iAm, int fdRead, int fdSend, void *pvData) { register int i, iCc, iHold, iOut, fEoln, iVoid; register char *pcBuf; register PIPE_WORKS *pPW; register int iCnt = iAm % PIPS_PER_VLINE; auto char acMsg[4]; /* Check for a snark, as the API should never call us with no data */ if ((PIPE_WORKS *)0 == (pPW = pvData)) { return EX_SOFTWARE; } if ((char *)0 == (pcBuf = malloc(pPW->ibsize))) { fprintf(stderr, "%s: malloc: %d: %s\n", progname, pPW->ibsize, strerror(errno)); return EX_OSERR; } while (1 == read(fdRead, acMsg, 1) && 'r' == acMsg[0]) { for (iHold = 0; iHold < pPW->ibsize; iHold += iCc) { iCc = read(pPW->iin, pcBuf+iHold, pPW->ibsize-iHold); if (0 >= iCc) break; } /* Actually we count reads here, not writes. --ksb */ if (fVerbose) { ++iCnt; fEoln = 0; if (1 != write(2, acPip, sizeof(acPip)-1)) { fVerbose = 0; } else if (iCnt != PIPS_PER_VLINE) { /* nada */ } else if (1 == write(2, "\n", 1)) { fEoln = 1; } else { fVerbose = 0; } iCnt += iRing-1; if (iCnt >= PIPS_PER_VLINE) { iCnt -= PIPS_PER_VLINE; } } /* If we didn't read any data, poison the workers after us * and wait for their write tokens, otherwise we'll SIGPIPE * the upstream workers when we close our read pipe. */ if (0 == iHold) { if (fVerbose && !fEoln) iVoid = write(2, "\n", 1); #if DEBUG fprintf(stderr, "%s: %d[%d]: EOF from stdin\n", progname, getpid(), iAm); #endif iVoid = write(fdSend, "q", 1); for (i = 0; i < pPW->ispigots; ++i) { iVoid = read(fdRead, acMsg, sizeof(acMsg)); #if DEBUG fprintf(stderr, "%s: %d[%d]: read w token#%d of %d\n", progname, getpid(), iAm, i+1, pPW->ispigots); #endif } iVoid = read(fdRead, acMsg, 1); acMsg[0] = 'r'; break; } /* Unblock the next reader and write our payload. */ if (1 != write(fdSend, "r", 1)) { return EX_SOFTWARE; } for (i = 0; 0 < iHold && i < pPW->ispigots; ++i, iVoid = write(fdSend, "w", 1)) { if (-1 == read(fdRead, acMsg, 1) || 'w' != acMsg[0]) break; #if DEBUG fprintf(stderr, "%s: %d[%d]: w -> %d\n", progname, getpid(), iAm, pPW->piouts[i]); #endif if (-1 == pPW->piouts[i]) continue; for (iOut = 0; iOut < iHold && -1 != (iCc = write(pPW->piouts[i], pcBuf+iOut, iHold-iOut)); iOut += iCc) /* sent some */; if (-1 == iCc) pPW->piouts[i] = -1; } if (i < pPW->ispigots) { return EX_OSERR; } } if ('q' == acMsg[0]) { iVoid = write(fdSend, acMsg, 1); } return EX_OK; /* which closes fdSend QED */ } /* Pump with a pipe ring on stdin (ksb) */ static void RingPump(int iMade, int *piSpigots) { register int i; auto char *pcSeed; auto int *piList; auto PIPE_WORKS *pPWList; /* We can't keep more than iMade workers writing, build the books for * the new processes. But if you asked for it, we'll do it. */ if (!fGaveRing && iRing > iMade+1) { iRing = iMade+1; } if ((int *)0 == (piList = calloc(iRing, sizeof(int)))) { fprintf(stderr, "%s: calloc: %ld,%lu: %s\n", progname, (long)iRing, (unsigned long)sizeof(int), strerror(errno)); exit(EX_OSERR); } if ((PIPE_WORKS *)0 == (pPWList = calloc(iRing, sizeof(PIPE_WORKS)))) { fprintf(stderr, "%s: calloc: %ld,%lu: %s\n", progname, (long)iRing, (unsigned long)sizeof(PIPE_WORKS), strerror(errno)); exit(EX_OSERR); } for (i = 0; i < iRing; ++i) { piList[i] = EX_OK; pPWList[i].ibsize = iChunk; pPWList[i].iin = 0; pPWList[i].ispigots = iMade; pPWList[i].piouts = piSpigots; } if ((char *)0 == (pcSeed = calloc(((iMade+1)|7)+9, sizeof(char)))) { fprintf(stderr, "%s: calloc: %d: %s\n", progname, ((iMade+1)|7)+9, strerror(errno)); exit(EX_OSERR); } pcSeed[0] = 'r'; for (i = 0; i < iMade; /*below*/) { pcSeed[++i] = 'w'; } pcSeed[iMade+1] = '\000'; PipeRing(RingWorker, iRing, (void *)pPWList, piList, pcSeed); } #if !defined(O_NOCTTY) #define O_NOCTTY 0 /* for platforms lacking this open option */ #endif /* tee to all the files given, if more are given than we can handle, (ksb) * fork another tee to do the rest, which is a pipe to our stdout. * If we can only fit 3 then * tee a1 a2 a3 a4 a5 a6 a7 a8 * would become * tee a1 a2 a3 | tee a4 a5 a6 | tee a7 a8 * If we have a ring set we might allow later copies a ring as well. * * There is a subtle bug in this program, it assumes that we do not * inherit a lot of open file descriptors. If we do we can run out * unexpectedly and not do our mission. -- ksb (XXX) */ static void Tee(argc, argv) int argc; char **argv; { register int i, iSend; auto int *piSpigots; auto int iKid; register int iReap; if (argc > iMaxSpig) { register int iSplit; auto int aiPipe[2]; /* If we have 9 and a max of 8 break at 5 + 4, not 8 + 1 */ i = (argc < iMaxSpig * 2) ? (argc+1)/2 : iMaxSpig; if (-1 == pipe(aiPipe)) { fprintf(stderr, "%s: pipe: %s\n", progname, strerror(errno)); exit(EX_OSERR); } iSplit = iRing/2; if (iSplit > iMaxSpig) { iSplit = iMaxSpig; } switch (iKid = fork()) { case -1: fprintf(stderr, "%s: fork: %s\n", progname, strerror(errno)); exit(EX_OSERR); /*XXX should retry here sometimes */ case 0: /* child */ /* only parent notes blocks as they pass */ fVerbose = 0; iRing = iSplit; if (0 == aiPipe[0]) { /* nothing */ } else if (-1 == dup2(aiPipe[0], 0)) { fprintf(stderr, "%s: dup: %s\n", progname, strerror(errno)); exit(EX_OSERR); } else { close(aiPipe[0]); } close(aiPipe[1]); Tee(argc-i, argv+i); exit(EX_OK); default: /* parrent */ iRing -= iSplit; if (1 == aiPipe[1]) { /* nothing */ } else if (-1 == dup2(aiPipe[1], 1)) { fprintf(stderr, "%s: dup: %s\n", progname, strerror(errno)); exit(EX_OSERR); } else { close(aiPipe[1]); } close(aiPipe[0]); break; } argc = i; } piSpigots = (int *)calloc(argc+2, sizeof(int)); iSend = 0; piSpigots[iSend++] = 1; for (i = 0; i < argc; ++i) { if ('-' == argv[i][0] && '\000' == argv[i][1]) { piSpigots[iSend++] = 1; continue; } if ('|' == argv[i][0]) { piSpigots[iSend] = MkProc(argv[i]+1); } else if ('>' == argv[i][0] && '&' == argv[i][1] && isdigit(argv[i][2])) { piSpigots[iSend] = FdOut(argv[i]+2); } else if ('>' == argv[i][0] && '>' == argv[i][1]) { piSpigots[iSend] = open(argv[i]+2, O_WRONLY|O_APPEND|O_NOCTTY, 0666); } else if ('>' == argv[i][0]) { piSpigots[iSend] = open(argv[i]+1, O_WRONLY|O_CREAT|O_TRUNC|O_NOCTTY, 0666); } else { piSpigots[iSend] = open(argv[i], fAppend ? O_WRONLY|O_APPEND : O_WRONLY|O_CREAT|O_TRUNC|O_NOCTTY, 0666); } if (-1 != piSpigots[iSend]) { if (piSpigots[iSend] > 2) (void)fcntl(piSpigots[iSend], F_SETFD, 1); ++iSend; continue; } fprintf(stderr, "%s: open: %s: %s\n", progname, argv[i], strerror(errno)); /* UNIX tee ignores this, sigh */ } if (fGaveRing && iRing > 1) { RingPump(iSend, piSpigots); } else { StdPump(iSend, piSpigots); } for (i = 0; i < iSend; ++i) { (void)close(piSpigots[i]); } while (-1 != (iReap = wait((void *)0)) && EINTR != errno) { if (fTrace) { fprintf(stderr, "%s: reap %d\n", progname, iReap); } } exit(EX_OK); }