/*
 *         PVM version 3.4:  Parallel Virtual Machine System
 *               University of Tennessee, Knoxville TN.
 *           Oak Ridge National Laboratory, Oak Ridge TN.
 *                   Emory University, Atlanta GA.
 *      Authors:  J. J. Dongarra, G. E. Fagg, M. Fischer
 *          G. A. Geist, J. A. Kohl, R. J. Manchek, P. Mucci,
 *         P. M. Papadopoulos, S. L. Scott, and V. S. Sunderam
 *                   (C) 1997 All Rights Reserved
 *
 *                              NOTICE
 *
 * Permission to use, copy, modify, and distribute this software and
 * its documentation for any purpose and without fee is hereby granted
 * provided that the above copyright notice appear in all copies and
 * that both the copyright notice and this permission notice appear in
 * supporting documentation.
 *
 * Neither the Institutions (Emory University, Oak Ridge National
 * Laboratory, and University of Tennessee) nor the Authors make any
 * representations about the suitability of this software for any
 * purpose.  This software is provided ``as is'' without express or
 * implied warranty.
 *
 * PVM version 3 was funded in part by the U.S. Department of Energy,
 * the National Science Foundation and the State of Tennessee.
 */

/*
 *	pvmdmimd.c
 *
 *  MPP interface.
 *
 *		void mpp_init(int argc, char **argv):	
 *			Initialization. Create a table to keep track of active nodes.
 *			argc, argv: passed from main.
 *
 *		int mpp_load( struct waitc_spawn *wxp ) 
 *
 *			Load executable onto nodes; create new entries in task table,
 *			encode node number and process type into task IDs, etc.
 *
 *				Construction of Task ID:
 *
 *			 1 0 9 8 7 6 5 4 3 2 1 0 9 8 7 6 5 4 3 2 1 0 9 8 7 6 5 4 3 2 1 0
 *			+-+-+-----------------------+-+-----+--------------------------+
 *			|s|g|		host index	    |n| prt |    	node # (16384)	   |
 *			+-+-+-----------------------+-+-----+--------------------------+
 *
 *			The "n" bit is set for node task but clear for host task.
 *
 *			flags:	exec options;
 *			name:	executable to be loaded;
 *			argv:	command line argument for executable
 *			count:	number of tasks to be created;
 *			tids:	array to store new task IDs;
 *			ptid:	parent task ID.
 *
 *			mpp_new(int count, int ptid):		
 *				Allocate a set of nodes. (called by mpp_load())
 *				count: number of nodes;  ptid: parent task ID.
 *
 *		int mpp_output():	
 *			Send all pending packets to nodes via native send. Node number
 *			and process type are extracted from task ID.
 *
 *		struct task *mpp_conn(int pid):
 *			Find a task in task table by its Unix pid.
 *
 *		void mpp_free(struct task *tp):
 *			Remove node/process-type from active list.
 *			tp: task pointer.
 *
$Log: pvmdmimd.c,v $
Revision 1.4  2009/01/22 23:04:23  pvmsrc
New version submitted by Paul Springer...
(Spanker=kohl)

Revision 1.3  2005/08/22 15:13:46  pvmsrc
Added #include <pvmtev.h> for #include global.h...  :-Q
	- submitted by Skipper Hartley <charles.l.hartley@bankofamerica.com>
(Spanker=kohl)

Revision 1.2  2002/02/21 23:19:15  pvmsrc
Added new (not to be documented!) PVM_MAX_TASKS env var support.
	- for Mahir Lokvancic <mahir@math.ufl.edu>.
	- forcefully limits the number of tasks that can attach to a
		pvmd, required on Solaris in rare circumstances when hard
		FD_SETSIZE limit is reached, and all hell breaks loose...
	- check return for task_new() call, can now produce NULL ptr,
		indicating PvmOutOfRes...
(Spanker=kohl)

Revision 1.1  2000/02/17 21:02:24  pvmsrc
Architecture-specific Makefile and Pvmd code.
	- submitted by Paul Springer <pls@smokeymt.jpl.nasa.gov>.
	- hacked a little by Jeembo for convention compliance...  :-)
(Spanker=kohl)

 *
 */

#include <stdlib.h>
#include <sys/param.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <stdio.h>
#include <signal.h>
#include <netdb.h>
#ifdef  SYSVSTR
#include <string.h>
#define CINDEX(s,c) strchr(s,c)
#else
#include <strings.h>
#define CINDEX(s,c) index(s,c)
#endif

#include <pvm3.h>
#include <pvmproto.h>
#include <pvmtev.h>
#include "global.h"
#include "host.h"
#include "waitc.h"
#include "pvmalloc.h"
#include "pkt.h"
#include "task.h"
#include "listmac.h"
#if 0
#include "tvdefs.h"
#endif
#include "pvmdmp.h"
#include "pvmmimd.h"
#include "bfunc.h"

#define BLCOMM		"/usr/bin/rsh"
#define BLARGC		3
#define NNAMELENGTH	128

char *getenv();

/* Global */

extern int pvmdebmask;			/* from pvmd.c */
extern char **epaths;			/* from pvmd.c */
extern int myhostpart;			/* from pvmd.c */
extern int myndf;			/* from pvmd.c */
extern struct htab *hosts;		/* from pvmd.c */
extern struct task *locltasks;	/* from task.c */
extern int pvmmydsig;			/* from pvmd.c */
extern int pvmudpmtu;			/* from pvmd.c */


int tidtmask = TIDPTYPE;		/* mask for ptype field of tids */
int tidnmask = TIDNODE;			/* mask for node field of tids */

/* private */

static char rcsid[] = "$Id: pvmdmimd.c,v 1.4 2009/01/22 23:04:23 pvmsrc Exp $";
static struct nodeset *busynodes;	/* active nodes; ordered by proc type */
static char pvmtxt[512];		/* scratch for error log */
static int ptypemask;			/* mask; we use these bits of ptype in tids */
static u_long *nodeaddr = 0;
static char **nodelist = 0;		/* default poe node list */
static int *nodepart = 0;		/* processor # within node */
#if 9
static int *nodeconn = 0;		/* true if connection is made */
static int *nodehost = 0;		/* host task ID */
#endif
static int partsize = 0;		/* total number of nodes allocated */

static int mpppvminfo[SIZEHINFO];

void
mpp_init(argc, argv)
	int *argc;
	char **argv;
{
	struct hostent *hostaddr;
	int 	i, j;
	int		n;
	struct in_addr	node_sin_addr;
	char 	nname[NNAMELENGTH];	/* node name */
	char 	*p, *q;
	int		partNum;	/* task ID number for a node */
	char 	*plist;		/* processor list */

	if ((plist = getenv("PROC_LIST"))) 
		{
		sprintf(pvmtxt, 
		  "PROC_LIST=%s\n",plist);
		pvmlogerror(pvmtxt);
		p = plist;
		while (1)
			{
			while (*p == ':')
				p++;
			if (!*p)
				break;
			n = (q = CINDEX(p, ':')) ? q - p : strlen(p);
			partsize++;
			p += n;
			}

		nodeaddr = TALLOC(partsize, u_long, "nodeaddr");
		nodelist = TALLOC(partsize, char*, "nname");
		nodepart = TALLOC(partsize, int, "nodepart");
#if 9
		nodeconn = TALLOC( partsize, int, "nodeconn" );
		nodehost = TALLOC( partsize, int, "nodehost" );
#endif
		p = plist;
		i = 0;
		while (1)
			{	/* now get the node names */
			while (*p == ':')
				p++;
			if (!*p)
				break;
			n = (q = CINDEX(p, ':')) ? q - p : strlen(p);
			if (n >= NNAMELENGTH)
				{		/* node name too long */
				pvmlogerror( "mpp_init():  node name is too long\n" );
				}
			else
				{
				strncpy(nname, p, n);
				nname[n] = 0;
				if (!(hostaddr = gethostbyname( nname )))
					{
					sprintf( pvmtxt, "mpp_init() can't gethostbyname() for %s\n", 
							nname );
					pvmlogerror( pvmtxt );
					}
				else
					{	/* got addr, now save it */
					BCOPY( hostaddr->h_addr_list[0], (char*)&node_sin_addr,
						sizeof(struct in_addr));
					}
				partNum = 0;
				for (j = i - 1; j >= 0; j--)
					{	/* check for multiple procs on one node */
					/* hash this if it takes too long */
					if (!strcmp( nodelist[j], nname ))
						{	/* found last reference--save info & exit */
						partNum = nodepart[j] + 1;
						break;
						}
					}
				if (partNum >= NPARTITIONS)
					{	/* too many task requests for this processor */
					sprintf( pvmtxt, 
						"mpp_init():  only %d tasks per processor allowed\n,
							NPARTITIONS" );
					pvmlogerror( pvmtxt );
					}
				else
					{
					nodepart[i] = partNum;
					nodeaddr[i] = node_sin_addr.s_addr;
#if 9
					nodeconn[i] = 0;	/* no connection now */
					nodehost[i] = 0;	/* zero out host task ID field */
#endif
					nodelist[i++] = STRALLOC(nname);
					}
				}
			p += n;
			}
		} 
	else 
		pvmlogerror("mpp_init() PROC_LIST must be set for parallelism.\n");

	sprintf(pvmtxt, "%d nodes in list.\n", partsize);
	pvmlogerror(pvmtxt);

	busynodes = TALLOC(1, struct nodeset, "nsets");
	BZERO((char*)busynodes, sizeof(struct nodeset));
	busynodes->n_link = busynodes;
	busynodes->n_rlink = busynodes;	/* make the list empty */

	ptypemask = tidtmask >> (ffs(tidtmask) - 1);
}	/* mpp_init() */


/*
 * find a set of free nodes from nodelist; assign ptype sequentially,
 * only tasks spawned together get the same ptype--really a partition ID
 *
 * the code depends on the n_first field being usable as an index into
 * the node___ arrays, and also assumes that
 * all nodes in a set are sequential in the same order as in the node___
 * arrays (eg. see code just before forkexec())
 */
struct nodeset *
mpp_new(count, ptid)
	int count;		/* number of nodes requested */
	int ptid;		/* parent's tid */
{
	struct nodeset *sp, *newp, *sp2;
	int last = -1;
	int ptype = 0;

	if (!(newp = TALLOC(1, struct nodeset, "nsets"))) {
		pvmlogerror("mpp_new() can't get memory\n");
		pvmbailout(0);
	}
	BZERO((char*)newp, sizeof(struct nodeset));

	newp->n_size = count;
	for (sp = busynodes->n_link; sp != busynodes; sp = sp->n_link) {
		if (sp->n_first - last > count)
			break;
		last = sp->n_first + sp->n_size - 1;
/*
		if (sp->n_link == busynodes && partsize - last > count)
			break;
*/
		if (ptype <= sp->n_ptype)
			ptype = sp->n_ptype + 1;	/* find lowest unused ptype */
	}
	if (sp == busynodes && partsize - last <= count) {
		pvmlogerror("mpp_new() not enough nodes in partition\n");
		PVM_FREE(newp);
		return (struct nodeset *)0;
	}
	for (sp2 = busynodes->n_link; sp2 != busynodes; sp2 = sp2->n_link)
		if ((sp2->n_ptype & ptypemask) == (ptype & ptypemask))
			break;						/* break if ptype is in use */
	if (sp2 != busynodes || ptype == NPARTITIONS) {
		for (ptype = 0; ptype < NPARTITIONS; ptype++) {
			for (sp2 = busynodes->n_link; sp2 != busynodes; sp2 = sp2->n_link)
				if ((sp2->n_ptype & ptypemask) == (ptype & ptypemask))
					break;				/* search for any unused ptype */
			if (sp2 == busynodes)
				break;
		}
		if (ptype == NPARTITIONS) {
			pvmlogerror("mpp_new() out of ptypes: too many spawns\n");
			return (struct nodeset *)0;
		}
	}

done:
	if (pvmdebmask & PDMNODE) {
		sprintf(pvmtxt, "mpp_new() %d nodes %d ... ptype=%d ptid=%x\n",
			count, last+1, ptype, ptid);
		pvmlogerror(pvmtxt);
	}
	newp->n_first = last + 1;
	newp->n_ptype = ptype;
	newp->n_ptid = ptid;
	newp->n_alive = count;
#if 0
	sprintf( pvmtxt, "mpp_new() sp=%x sp->n_link=%x\n", sp, sp->n_link );
	pvmlogerror( pvmtxt );
#endif
	LISTPUTBEFORE(sp, newp, n_link, n_rlink);
#if 0
	sprintf( pvmtxt, "mpp_new() sp=%x sp->n_link=%x\n", sp, sp->n_link );
	pvmlogerror( pvmtxt );
#endif

	return newp;
}

/*
 * remove node/ptype from active list; if tid is the last to go, shutdown
 * pvmhost's socket, but do not destroy the node set because pvmhost may 
 * not exit immediately. To avoid a race condition, let mpp_output()
 * do the cleanup.
 */
void
mpp_free(tp)
	struct task *tp;
{
	struct nodeset *sp;
	int PLIndex;
#if 0
	struct timeval tout;
#endif
	struct task *tp2;
	int tid = tp->t_tid;

	if (!TIDISNODE(tid))
		return;

	PLIndex = tid & TIDNODE;	/* get processor index (into PROC_LIST) */
	tp->t_txq = 0;				/* don't free pvmhost's txq */
	sp = busynodes->n_link;
#if 0
	sprintf( pvmtxt, "mpp_free() sp=%x sp->n_link=%x\n", sp, sp->n_link );
	pvmlogerror( pvmtxt );
#endif
	for (sp = busynodes->n_link; sp != busynodes; sp = sp->n_link) {
#if 0
		sprintf(pvmtxt, "mpp_free() n_ptype = %d, PLIndex = %d\n",
			sp->n_ptype, PLIndex );
		pvmlogerror(pvmtxt);
#endif
		/* check if index is in range of this node set */
		if ((PLIndex >= sp->n_first) && (PLIndex < sp->n_first + sp->n_size)) {

			if (pvmdebmask & PDMNODE) {
				sprintf(pvmtxt, "mpp_free() t%x type=%ld alive=%d\n",
					tid, sp->n_ptype, sp->n_alive);
				pvmlogerror(pvmtxt);
			}
			if (sp->n_alive == 0)
				{
				sprintf( pvmtxt, "mpp_free called for dead task t%x\n", tid );
				pvmlogerror( pvmtxt );
				return;
				}
			if (tp2 = task_findpid(tp->t_pid)) {
				/* find corresponding pvmhost and shut it down */
				tp2->t_flag |= TF_CLOSE;
#if 0
				pvmlogprintf( "mpp_free() Marked t%x for closure\n", tp2->t_tid );
#endif
				if (tp2->t_sock != -1) {
/*
					wrk_fds_delete(tp2->t_sock, 3);
					(void)close(tp2->t_sock);
					tp2->t_sock = -1;
*/
					shutdown(tp2->t_sock, 1);
				}
					/* close stdout after pvmhost dies */
				tp2->t_out = tp->t_out;
#if 0
				pvmlogprintf( "mpp_free() Set t%x t_tout to %d\n", tp2->t_tid, tp->t_out );
#endif
			}
			if (--sp->n_alive == 0) {
/*
				LISTDELETE(sp, n_link, n_rlink);
				PVM_FREE(sp);
*/
			}
#if 0
			if (tp->t_out >= 0) {

				fd_set rfds;
				FD_ZERO(&rfds);
				FD_SET(tp->t_out, &rfds);
				TVCLEAR(&tout);
				if (select(tp->t_out + 1,
						(fd_set *)&rfds, (fd_set *)0, (fd_set *)0,
						&tout) == 1)
					{
					pvmlogprintf( "mpp_free() unprinted stdout on t%x\n", tp->t_tid );
					loclstout( tp );
					}
				else
					pvmlogprintf( "mpp_free() no stdout on t%x\n", tp->t_tid );
			}
#endif
			tp->t_out = -1;		/* don't free shared stdout if alive > 0 */
			return;
		}
	}
	sprintf(pvmtxt, "mpp_free() t%x not active\n", tid);
	pvmlogerror(pvmtxt);
	return;
}


/* load executable onto the given set of nodes */
int
mpp_load( wxp )
struct waitc_spawn *wxp;
{
	int flags = 0;              /* exec options */
	char *name;             /* executable */
	char **argv;            /* arg list (argv[-1] must be there) */
	int count;				/* how many */
	int *tids;				/* array to store new tids */
	int	partNum;			/* unique task ID for this processor */
	int ptid;				/* parent task ID */
	int ptypepart;			/* type field */
	int j;
	struct task *tp;
	struct pkt *hosttxq;	/* out-going queue of pvmhost */
	int err = 0;
	struct nodeset *sp;
	char c[128];			/* buffer to store count, name.host */
	int nargs;
	char **ep, **eplist;
	char path[MAXPATHLEN];
	struct stat sb;
	char **av;
	int hostout;			/* stdout of pvmhost */
	int hostpid;			/* Unix pid of pvmhost */

	static char *nullep[] = { "", 0 };

    /* -- initialize some variables from the waitc_spawn struct -- */

    name = wxp->w_argv[0];
    argv = wxp->w_argv;
    count = wxp->w_veclen;
    tids = wxp->w_vec;
    ptid = wxp->w_ptid;

	eplist = CINDEX(name, '/') ? nullep : epaths;

	for (ep = eplist; *ep; ep++) {
		/* search for file */
		(void)strcpy(path, *ep);
		if (path[0])
			(void)strcat(path, "/");
		(void)strncat(path, name, sizeof(path) - strlen(path) - 1);

		if (stat(path, &sb) == -1
		|| ((sb.st_mode & S_IFMT) != S_IFREG)
		|| !(sb.st_mode & S_IEXEC)) {
			if (pvmdebmask & PDMTASK) {
				sprintf(pvmtxt, "mpp_load() stat failed <%s>\n", path);
				pvmlogerror(pvmtxt);
			}
			continue;
		}

		if (!(sp = mpp_new(count, ptid))) {
			err = PvmOutOfRes;
			goto done;
		}

		if (argv)
			for (nargs = 0; argv[nargs]; nargs++);
		else
			nargs = 0;

		/* ar[-1], rsh, host, command */
		nargs += BLARGC + 1;
		av = TALLOC(nargs + 1, char*, "argv");
		av++;				/* reserve room for debugger */
		BZERO((char*)av, nargs * sizeof(char*));
		av[0] = BLCOMM;
		av[2] = path;
		av[--nargs] = 0;
		for (j = 3; j < nargs; j++)
			av[j] = argv[j - 2];	/* poe name argv -procs # -euilib us */
/*
		if ((sock = mksock()) == -1) {
			err = PvmSysErr;
			goto done;
		}
*/
		if (flags & PvmTaskDebug)
			av++;					/* pdbx name -procs # -euilib us */

		for (j = 0; j < count; j++) 
			{
			av[1] = nodelist[sp->n_first + j];
			partNum = nodepart[sp->n_first + j];
			ptypepart = (partNum << (ffs(tidtmask) - 1)) | TIDONNODE;
			sprintf(pvmtxt, "mppload(): Forking to %s\n", av[1]);
			pvmlogerror(pvmtxt);
			/* if (err = forkexec(flags, av[0], av, 0, (char **)0, &tp)) */
			if (err = forkexec(flags, av[0], av, 0, (char **)0, 0,
				0, 0, &tp))
				goto done;
			tp->t_ptid = ptid;
			PVM_FREE(tp->t_a_out);
			sprintf(c, "%s.host", name);
			tp->t_a_out = STRALLOC(c);
			if (pvmdebmask & PDMNODE) 
				pvmlogprintf( "mpp_load() setting n_ptid to %x\n", tp->t_tid );
			sp->n_ptid = tp->t_tid;		/* pvmhost's tid */
			nodehost[sp->n_first + j] = tp->t_tid;
			hosttxq = tp->t_txq;
			hostout = tp->t_out;
			hostpid = tp->t_pid;
			tp->t_out = -1;
			mpppvminfo[0] = TDPROTOCOL;
			mpppvminfo[1] = myhostpart + ptypepart;
			mpppvminfo[2] = ptid;
			mpppvminfo[3] = wxp->w_outtid;
			mpppvminfo[4] = wxp->w_outtag;
			mpppvminfo[5] = wxp->w_outctx;
			mpppvminfo[6] = wxp->w_trctid;
			mpppvminfo[7] = wxp->w_trctag;
			mpppvminfo[8] = wxp->w_trcctx;
			mpppvminfo[9] = pvmudpmtu;
			mpppvminfo[10] = pvmmydsig;
/*
		pvminfo[0] = TDPROTOCOL;
		pvminfo[1] = myhostpart + ptypepart;
		pvminfo[2] = ptid;
		pvminfo[3] = MAXFRAGSIZE;
		pvminfo[4] = myndf;
		if (sockconn(sock, tp, pvminfo) == -1) {
			err = PvmSysErr;
			task_free(tp);
			goto done;
		}
*/
			if (pvmdebmask & PDMTASK) {
				sprintf(pvmtxt, "mpp_load() %d type=%d ptid=%x t%x...\n",
					count, 0, ptid, myhostpart + ptypepart + sp->n_first + j);
				pvmlogerror(pvmtxt);
			}

		/* create new task structs */

			if ((tp = task_new(myhostpart + ptypepart
					+ sp->n_first + j)) == NULL) {
				err = PvmOutOfRes;
				goto done;
			}
			tp->t_a_out = STRALLOC(name);
			tp->t_ptid = ptid;
			tids[j] = tp->t_tid;
			PVM_FREE(tp->t_txq);
			tp->t_txq = hosttxq;		/* node tasks share pvmhost's txq */
			tp->t_out = hostout;        /* and stdout */
#if 0
			pvmlogprintf( "Setting t%x pid to p%d\n", tp->t_tid, hostpid );
#endif
			tp->t_pid = hostpid;		/* pvm_kill should kill pvmhost */
			tp->t_outtid = wxp->w_outtid;  /* catch stdout/stderr  */
			tp->t_outtag = wxp->w_outtag; 
			tp->t_outctx = wxp->w_outctx; 
		}
		return 0;
	}
    if (pvmdebmask & PDMTASK) {
        sprintf(pvmtxt, "mpp_load() didn't find <%s>\n", name);
        pvmlogerror(pvmtxt);
    }
    err = PvmNoFile;

done:
	for (j = 0; j < count; j++)
		tids[j] = err;
	return err;
}

/* Find task by socket address */

struct task *mpp_conn( tp )
	struct task	*tp;
{
	int			i;
	struct task	*nodetp = 0;
	int			ptypepart;

#if 0
	sprintf(pvmtxt, "mpp_conn asked for t%x\n", tp->t_tid);
	pvmlogerror(pvmtxt);
#endif
	ptypepart = (tp->t_tid & tidtmask) | TIDONNODE;
	for (i = 0; i < partsize; i++)
		if (nodeaddr[i] == tp->t_sad.sin_addr.s_addr)
			{	/* we have an IP match--check partition */
#if 0
			sprintf(pvmtxt, "mpp_conn looking for t%x\n", myhostpart + ptypepart + i);
			pvmlogerror(pvmtxt);
#endif
#if 9
			if (!nodeconn[i])
				{	/* no connection here yet */
				ptypepart = (nodepart[i] << (ffs(tidtmask) - 1)) | TIDONNODE;
#endif
				if (nodetp = task_find( myhostpart + ptypepart + i ))
#if 9
					/* from tm_conn2 in tdpro.c */
					if (!(nodetp->t_flag & (TF_AUTH|TF_CONN)) )
						{
						nodeconn[i] = 1;	/* making a connection */
#endif
						break;	/* found match */
#if 9
						}
				}
#endif
			}
	if (!nodetp)
		pvmlogerror( "mpp_conn:  Task not found\n" );
	return nodetp;
}	/* mpp_conn() */


/*
 * Add pvmhost's socket to wfds if there are packets waiting to
 * be sent to a related node task. Node tasks have no sockets;
 * they share pvmhost's packet queue (txq). Pvmhost simply
 * forwards any packets it receives to the appropriate node.
 */

int
mpp_output(dummy1, dummy2)
	struct task *dummy1;
	struct pkt *dummy2;
{
	struct nodeset *sp, *sp2;
	struct task *tp;
#if 9
    int     hostsFound;
    int     i;
	int		PLIndex;
#endif
	int ptype;

	sp = busynodes->n_link;
	for (sp = busynodes->n_link; sp != busynodes; sp = sp->n_link)
      {
      hostsFound = 0;
      for (PLIndex = sp->n_first; PLIndex < sp->n_first + sp->n_size;
            PLIndex++)
        {   /* cycle through the list for this group of nodes */
		if (tp = task_find(nodehost[PLIndex])) 
            {   /* got the host's tp */
			if (tp->t_txq->pk_link->pk_buf && tp->t_sock != -1)
				wrk_fds_add(tp->t_sock, 2);
            hostsFound++;
		    } 
        }
        if (!hostsFound) 
            {   /* no pvmhost for this entry */
			if (sp->n_alive) 
                {
				sprintf(pvmtxt, "mpp_output() pvmhost %x died for ptype %d!\n", sp->n_ptid, sp->n_ptype);
				pvmlogerror(pvmtxt);
				/* clean up tasks it serves */
				ptype = sp->n_ptype & ptypemask;
				for (tp = locltasks->t_link; tp != locltasks; tp = tp->t_link)
					if (TIDISNODE(tp->t_tid) && TIDTOTYPE(tp->t_tid) == ptype) {
						tp->t_txq = 0;
						tp = tp->t_rlink;
						task_cleanup(tp->t_link);
						task_free(tp->t_link);
					}
			    }
			/* all pvmhosts have died, destroy the node set */
			sp2 = sp;
			sp = sp->n_rlink;
#if 9
			for (i = 0; i < sp2->n_size; i++)
				{
				nodeconn[sp2->n_first + i] = 0;	/* disconnect */
				nodehost[sp2->n_first + i] = 0;	/* zero host task ID */
				}
#endif
			LISTDELETE(sp2, n_link, n_rlink);
			PVM_FREE(sp2);
		    }
      }   /* for sp... */
	return 0;
}