/*
**	MUX CHANNEL, SESSION AND PROTOCOL MANAGEMENT
**
**	(c) COPYRIGHT MIT 1995.
**	Please first read the full copyright statement in the file COPYRIGH.
**	@(#) $Id: HTMuxCh.c,v 2.3 1998/05/04 19:37:02 frystyk Exp $
**
**	Handles a MUX Channel with sessions and protocols
**
** Authors
**	HFN	Henrik Frystyk Nielsen
**
** HISTORY:
**	Oct 96 HFN	Written
*/

/* Library Include files */
#include "wwwsys.h"
#include "WWWUtil.h"
#include "WWWCore.h"
#include "WWWTrans.h"
#include "WWWStream.h"

#include "HTMuxTx.h"
#include "HTMuxHeader.h"
#include "HTDemux.h"
#include "HTMuxCh.h"				 /* Implemented here */

/*
** The session table has MAX_SESSIONS (0xFF) slots, so the usable session
** ids are 0 .. MAX_SESSIONS-1.
*/
#define MAX_SESSIONS	0xFF
#define SID_BASE	2
#define RECEIVER_OFFSET	0			   /* Client control session */
#define SENDER_OFFSET	1			   /* Server control session */

/*
** True when `sid' can be used to index HTMuxChannel.sessions[]. The cast to
** unsigned long makes a negative value of a signed type fail the test too.
*/
#define VALID_SID(sid)	((unsigned long) (sid) < (unsigned long) MAX_SESSIONS)

struct _HTStream {
    const HTStreamClass *	isa;
    /* ... */
};

struct _HTOutputStream {
    const HTOutputStreamClass *	isa;
    /* ... */
};

/* Writes `l' bytes from `b' to the stream held in the local variable `me' */
#define PUTBLOCK(b,l)	(*me->isa->put_block)(me,(b),(l))

struct _HTMuxProtocol {
    HTAtom *		name;
    HTProtocolId	pid;
};

struct _HTMuxSession {
    HTMuxSessionId	sid;
    HTProtocolId	pid;
    HTNet *		net;

    /* State */
    HTMuxClose		close;			  /* State of the connection */
    int			credit;			       /* Available credit */
    int			fragment;			    /* Fragment size */
    int			read;		     /* Data read since last credit */

    /* Input */
    HTStream *		buffer;		     /* If we have to buffer data */
    BOOL		buffering;
};

struct _HTMuxChannel {
    int			hash;
    HTHost *		host;
    int			max_sid;	     /* A la max_sockfd in select */
    HTNet *		net;
    HTList *		protocols;	     /* List of defined protocols */
    HTMuxSession *	sessions[MAX_SESSIONS];
};

PRIVATE HTList	** muxchs = NULL;	       /* Hash table of mux channels */

/* ------------------------------------------------------------------------- */

/*
** Allocates a zeroed session object and gives it DEFAULT_CREDIT credit.
*/
PRIVATE HTMuxSession * session_new (void)
{
    HTMuxSession * me;
    if ((me = (HTMuxSession *) HT_CALLOC(1, sizeof(HTMuxSession))) == NULL)
	HT_OUTOFMEM("HTMuxSession_new");
    me->credit = DEFAULT_CREDIT;
    return me;
}

/*
** Frees a session object. Returns YES if something was freed, NO if
** `session' was NULL.
** NOTE(review): session->buffer is not released here - possible leak of the
** pipe buffer stream; confirm who owns it.
*/
PRIVATE BOOL session_delete (HTMuxSession * session)
{
    if (session) {
	HT_FREE(session);
	return YES;
    }
    return NO;
}

/*
** Returns the session registered under `sid' on `muxch', creating it with
** protocol id `pid' if the slot is empty. An already existing session is
** returned untouched. Returns NULL if `muxch' is NULL or `sid' is outside
** the session table.
*/
PUBLIC HTMuxSession * HTMuxSession_register (HTMuxChannel * muxch,
					     HTMuxSessionId sid,
					     HTProtocolId pid)
{
    if (muxch && VALID_SID(sid)) {
	HTMuxSession * session = muxch->sessions[sid];
	if (session == NULL) {
	    session = session_new();
	    session->sid = sid;
	    session->pid = pid;
	    muxch->sessions[sid] = session;
	    if (MUX_TRACE)
		HTTrace("Mux Channel. Registered session %d on channel %p\n",
			sid, muxch);
	}
	return session;
    }
    if (MUX_TRACE) HTTrace("Mux Channel. Can't register new session\n");
    return NULL;
}

/*
** Looks for a registered session that speaks `pid' and has no Net object
** yet, attaches `net' to it and returns its id. Returns INVSID if there is
** no such session.
**
** The scan uses an int counter so that it terminates even if
** HTMuxSessionId is a narrow unsigned type that would wrap around before
** reaching MAX_SESSIONS.
** NOTE(review): the loop header (start SID_BASE + RECEIVER_OFFSET, step 2,
** bound MAX_SESSIONS) was reconstructed from a truncated source line -
** verify against the upstream file.
*/
PUBLIC HTMuxSessionId HTMuxSession_accept (HTMuxChannel * muxch, HTNet * net,
					   HTProtocolId pid)
{
    if (muxch && net) {
	int sid;
	for (sid = SID_BASE + RECEIVER_OFFSET; sid < MAX_SESSIONS; sid += 2) {
	    HTMuxSession * session = muxch->sessions[sid];
	    if (session && session->net == NULL && session->pid == pid) {
		if (MUX_TRACE)
		    HTTrace("Mux Channel. Accepting session %d on channel %p\n",
			    sid, muxch);

		/*
		** HTMuxSession_disposeData() treats a session as accepted
		** only when it has a Net object, so attach it here.
		*/
		session->net = net;
		return (HTMuxSessionId) sid;
	    }
	}
    }
    if (MUX_TRACE) HTTrace("Mux Channel. Can't accept new session\n");
    return INVSID;
}

/*
** Creates a new session for `net' with protocol id `pid' in the first free
** slot of `muxch' and returns its id. Returns INVSID if no slot is free or
** an argument is NULL.
** NOTE(review): the loop header (start SID_BASE + SENDER_OFFSET, step 2,
** bound MAX_SESSIONS) was reconstructed from a truncated source line -
** verify against the upstream file.
*/
PUBLIC HTMuxSessionId HTMuxSession_connect (HTMuxChannel * muxch, HTNet * net,
					    HTProtocolId pid)
{
    if (muxch && net) {
	int sid;
	for (sid = SID_BASE + SENDER_OFFSET; sid < MAX_SESSIONS; sid += 2) {
	    if (muxch->sessions[sid] == NULL) {
		HTMuxSession * session = session_new();
		session->sid = (HTMuxSessionId) sid;
		session->pid = pid;
		session->net = net;
		muxch->sessions[sid] = session;
		if (MUX_TRACE)
		    HTTrace("Mux Channel. Creating session %d on channel %p\n",
			    sid, muxch);
		return (HTMuxSessionId) sid;
	    }
	}
    }
    if (MUX_TRACE) HTTrace("Mux Channel. Can't create new session\n");
    return INVSID;
}

/*
** Marks the write direction of session `sid' as closed. Returns YES if
** `muxch' is non-NULL and `sid' is inside the session table (also when no
** session is registered under `sid'), otherwise NO.
*/
PUBLIC int HTMuxSession_close (HTMuxChannel * muxch, HTMuxSessionId sid)
{
    if (muxch && VALID_SID(sid)) {
	HTMuxSession * session = muxch->sessions[sid];
	HTMuxSession_setClose(muxch, session, MUX_S_END_WRITE);
	return YES;
    }
    return NO;
}

/* Returns the session id, or INVSID for a NULL session */
PUBLIC HTMuxSessionId HTMuxSession_id (HTMuxSession * session)
{
    return session ? session->sid : INVSID;
}

/* Returns the protocol id, or INVPID for a NULL session */
PUBLIC HTProtocolId HTMuxSession_pid (HTMuxSession * session)
{
    return session ? session->pid : INVPID;
}

/* Returns the Net object of the session, or NULL */
PUBLIC HTNet * HTMuxSession_net (HTMuxSession * session)
{
    return session ? session->net : NULL;
}

/*
** ORs `close' into the close state of `session'. When the state becomes
** MUX_S_END the session is removed from the channel's table and freed, so
** the caller must not use `session' afterwards.
** Returns NO if either argument is NULL, otherwise YES.
*/
PUBLIC BOOL HTMuxSession_setClose (HTMuxChannel * muxch,
				   HTMuxSession * session, HTMuxClose close)
{
    if (muxch && session) {
	session->close |= close;

	/*
	** If both directions are closed down then we can put the session
	** to sleep.
	*/
	if (session->close == MUX_S_END) {
	    if (MUX_TRACE)
		HTTrace("Mux Channel. Closing session %d on channel %p\n",
			session->sid, muxch);
	    muxch->sessions[session->sid] = NULL;
	    session_delete(session);
	}
	return YES;
    }
    return NO;
}

/* Returns the available credit of the session, or -1 for a NULL session */
PUBLIC int HTMuxSession_credit (HTMuxSession * session)
{
    return session ? session->credit : -1;
}

/*
** Sets the credit of session `sid'. Returns NO if the channel is NULL, the
** sid is outside the table or no session is registered under it.
*/
PUBLIC BOOL HTMuxSession_setCredit (HTMuxChannel * muxch,
				    HTMuxSessionId sid, int credit)
{
    HTMuxSession * session;
    if (muxch && VALID_SID(sid) && (session = muxch->sessions[sid])) {
	session->credit = credit;
	return YES;
    }
    return NO;
}

/* Returns the fragment size of the session, or -1 for a NULL session */
PUBLIC int HTMuxSession_fragment (HTMuxSession * session)
{
    return session ? session->fragment : -1;
}

/*
** Sets the fragment size of session `sid'. Returns NO if the channel is
** NULL, the sid is outside the table or no session is registered under it.
*/
PUBLIC BOOL HTMuxSession_setFragment (HTMuxChannel * muxch,
				      HTMuxSessionId sid, int fragment)
{
    HTMuxSession * session;
    if (muxch && VALID_SID(sid) && (session = muxch->sessions[sid])) {
	session->fragment = fragment;
	return YES;
    }
    return NO;
}

/*
**  Tries really hard to get rid of the data.
**  Returns:
**	-1 Error (NULL session, or the data could be neither delivered
**	   nor buffered)
**	 0 The data was delivered to the read stream or buffered
**	 1 The data was delivered and at least DEFAULT_CREDIT/2 bytes have
**	   been read since the counter was last reset
*/
PUBLIC int HTMuxSession_disposeData (HTMuxSession * me,
				     const char * buf, int len)
{
    HTNet * net;
    HTStream * sink = NULL;	 /* Stays NULL while the session has no Net */
    int status;

    if (MUX_TRACE)
	HTTrace("Mux Channel. Writing %d bytes to session %p\n", len, me);
    if (!me) return (-1);

    /*
    ** There are two situations that can occur: Either we have an accepted
    ** session with a Net object or we have an unaccepted session with no
    ** Net object. In the former case we try to get rid of the data by
    ** pushing it directly to the read stream of the Net object. In the
    ** latter case we buffer as much as we can.
    */
    if ((net = me->net) && (sink = HTNet_readStream(net))) {

	/*
	** Look first to see if we have old data that we can dispose down
	** the sink. We keep the buffer stream so that we can reuse it later.
	*/
	if (me->buffer && me->buffering) {
	    if ((*me->buffer->isa->flush)(me->buffer) == HT_OK) {
		if (MUX_TRACE) HTTrace("Mux Channel. Flushed buffered data\n");
		me->buffering = NO;
		/* Old data is gone - carry on and deliver the new data */
	    } else {
		/* Keep the ordering: new data goes behind the old data */
		if ((*me->buffer->isa->put_block)(me->buffer, buf, len) >= 0) {
		    if (MUX_TRACE)
			HTTrace("Mux Channel. Buffer accepted data\n");
		    return 0;
		}
		if (MUX_TRACE) HTTrace("Mux Channel. Can't buffer data\n");
		return (-1);
	    }
	}

	/*
	** See if we can get rid of the new data. If not then try to buffer
	** it. A non-negative return code from the stream means that we got
	** rid of the data successfully.
	*/
	if ((status = (*sink->isa->put_block)(sink, buf, len)) >= 0) {
	    if (MUX_TRACE) HTTrace("Mux Channel. Stream returned %d\n", status);

	    /*
	    ** If we get back a HT_LOADED then we have all the data we need
	    ** and we can terminate the request
	    */
	    if (status == HT_LOADED) {
		HTNet_execute (net, HTEvent_END);
		return 0;
	    }

	    /*
	    ** Decide whether we should send a credit message
	    ** MORE TO COME
	    */
	    me->read += len;
	    if (me->read >= DEFAULT_CREDIT / 2) {
		me->read = 0;
		return 1;
	    }
	    return 0;
	}
    }

    /*
    ** The stream is not ready and we try to buffer the data in
    ** the meantime.
    */
    if (!me->buffer) {
	me->buffer = HTPipeBuffer(sink, DEFAULT_CREDIT);
	if (!me->buffer) return (-1);
    }

    /* Re-arm the flag so a reused buffer is flushed on the next call */
    me->buffering = YES;
    status = (*me->buffer->isa->put_block)(me->buffer, buf, len);
    if (status >= 0) {
	if (MUX_TRACE) HTTrace("Mux Channel. Buffer accepted data\n");
	return 0;
    }
    if (MUX_TRACE) HTTrace("Mux Channel. Buffer returned %d\n", status);
    return (-1);
}

/* ------------------------------------------------------------------------- */

/*
** Frees the channel object itself. Returns NO for a NULL channel.
** NOTE(review): the sessions, the protocol list and the Net object held by
** the channel are not released here - confirm ownership.
*/
PRIVATE BOOL channel_delete (HTMuxChannel * ch)
{
    if (ch) {
	HT_FREE(ch);
	return YES;
    }
    return NO;
}

/*
** Creates a MUX channel for `host', switches the host to interleave mode,
** gives the channel its own Net object with a demux read stream and enters
** the channel in the hash table. Returns NULL if `host' is NULL.
*/
PUBLIC HTMuxChannel * HTMuxChannel_new (HTHost * host)
{
    if (host) {
	HTMuxChannel * me = NULL;

	/* Create new object */
	if ((me = (HTMuxChannel *) HT_CALLOC(1, sizeof(HTMuxChannel))) == NULL)
	    HT_OUTOFMEM("HTMuxChannel_new");
	me->hash = HTHost_hash(host);
	me->host = host;

	/*
	**  Make sure that we are in interleave mode
	*/
	HTHost_setMode(host, HT_TP_INTERLEAVE);

	/*
	**  Get a special MUX Net object that we keep to our selves. We don't
	**  associate a request object as the Net object lives longer.
	*/
	me->net = HTNet_new(NULL);
	HTNet_setReadStream(me->net, HTDemux_new(host, me));

	/* Insert into hash table */
	if (!muxchs) {
	    if ((muxchs = (HTList **) HT_CALLOC(HOST_HASH_SIZE,
						sizeof(HTList *))) == NULL)
		HT_OUTOFMEM("HTMuxChannel_new");
	}
	if (!muxchs[me->hash]) muxchs[me->hash] = HTList_new();
	HTList_addObject(muxchs[me->hash], (void *) me);
	if (MUX_TRACE)
	    HTTrace("Mux Channel. %p created with hash %d\n",me, me->hash);
	return me;
    }
    return NULL;
}

/*
** Returns the channel registered for `host', or NULL if there is none or
** `host' is NULL.
*/
PUBLIC HTMuxChannel * HTMuxChannel_find (HTHost * host)
{
    if (muxchs && host) {
	int hash = HTHost_hash(host);
	HTList * list = muxchs[hash];
	if (list) {
	    HTMuxChannel * pres = NULL;
	    while ((pres = (HTMuxChannel *) HTList_nextObject(list)))
		if (pres->host == host) return pres;
	}
    }
    return NULL;
}

/*
** Removes the channel from the hash table and frees it. Returns NO - and
** frees nothing - if `me' is NULL or its hash bucket does not exist.
*/
PUBLIC BOOL HTMuxChannel_delete (HTMuxChannel * me)
{
    if (me) {
	HTList * list = NULL;
	if (MUX_TRACE) HTTrace("Mux Channel. Deleting %p\n", me);
	if (muxchs && (list = muxchs[me->hash])) {
	    HTList_removeObject(list, (void *) me);
	    channel_delete(me);
	    return YES;
	}
    }
    return NO;
}

/*
** Deletes every channel and the hash table itself.
** NOTE(review): only the start of the loop header survived in the source
** this was edited from; the loop bound (HOST_HASH_SIZE, the size the table
** is allocated with), the body and the return value were reconstructed -
** verify against the upstream file.
*/
PUBLIC BOOL HTMuxChannel_deleteAll (void)
{
    if (muxchs) {
	HTList * cur;
	int cnt;
	for (cnt=0; cnt<HOST_HASH_SIZE; cnt++) {
	    if ((cur = muxchs[cnt])) {
		HTMuxChannel * pres;
		while ((pres = (HTMuxChannel *) HTList_nextObject(cur)))
		    channel_delete(pres);
		HTList_delete(muxchs[cnt]);
	    }
	}
	HT_FREE(muxchs);
    }
    return YES;
}

/*
** Returns the channel's own Net object, or NULL.
** NOTE(review): the function head was reconstructed from a truncated source
** line - verify the signature against HTMuxCh.h.
*/
PUBLIC HTNet * HTMuxChannel_net (HTMuxChannel * me)
{
    return me ? me->net : NULL;
}

/*
** Returns the session registered under `sid', or NULL if there is none,
** the channel is NULL or `sid' is outside the session table.
*/
PUBLIC HTMuxSession * HTMuxChannel_findSession (HTMuxChannel * me,
						HTMuxSessionId sid)
{
    return (me && VALID_SID(sid)) ? me->sessions[sid] : NULL;
}

#if 0
/* NOTE(review): loop condition reconstructed from a truncated source line */
PRIVATE HTMuxSession * HTMuxChannel_findSessionFromNet (HTMuxChannel * me,
							HTNet * net)
{
    if (me && net) {
	int cnt = 0;
	HTMuxSession **session = me->sessions;
	while (cnt<MAX_SESSIONS) {
	    if ((*session)->net == net) return *session;
	    session++, cnt++;
	}
    }
    return NULL;
}
#endif

/* Returns the host the channel belongs to, or NULL */
PUBLIC HTHost * HTMuxChannel_host (HTMuxChannel * muxch)
{
    return muxch ? muxch->host : NULL;
}

/*
** Writes `size' bytes of `data' followed by as many zero bytes as it takes
** to reach MUX_LONG_ALIGN(size). Only `size' bytes are ever read from
** `data'.
*/
PRIVATE void put_padded (HTOutputStream * me, const char * data, int size)
{
    static const char zeros[8] = { 0 };
    int pad = MUX_LONG_ALIGN(size) - size;
    PUTBLOCK(data, size);
    while (pad > 0) {
	int chunk = pad < (int) sizeof(zeros) ? pad : (int) sizeof(zeros);
	PUTBLOCK(zeros, chunk);
	pad -= chunk;
    }
}

/*
** Writes a MUX control message to the output stream of the channel's host
** and flushes the stream.
**
**	opcode		MUX_STRING, MUX_STACK, MUX_FRAGMENT or MUX_CREDIT
**	sid		session id (used by MUX_FRAGMENT and MUX_CREDIT)
**	value		length field (MUX_STRING, MUX_STACK, MUX_FRAGMENT)
**			or the long length word (MUX_CREDIT)
**	param		payload for MUX_STRING and MUX_STACK; nothing is
**	param_size	written for these opcodes unless param is non-NULL
**			and param_size is positive
**
** Returns the result of the flush, or HT_ERROR if the channel has no host
** or output stream or the opcode is unknown.
** NOTE(review): the return values of the individual writes are ignored.
*/
PUBLIC int HTMuxChannel_sendControl (HTMuxChannel * muxch, HTMuxSessionId sid,
				     HTMuxHeader opcode, int value,
				     const void * param, int param_size)
{
    HTOutputStream * me;
    HTMuxHeader header[2];

    if (!muxch || !muxch->host) return HT_ERROR;
    if ((me = HTChannel_output(HTHost_channel(muxch->host))) == NULL)
	return HT_ERROR;

    switch (opcode) {
    case MUX_STRING:
    case MUX_STACK:
	/* Both opcodes use the same long-length frame */
	if (param && param_size > 0) {
	    header[0] = HT_WORDSWAP(MUX_CONTROL | MUX_LONG_LENGTH | MUX_SET_LEN(value));
	    header[1] = HT_WORDSWAP(param_size);
	    PUTBLOCK((const char *) header, 8);

	    /* Pad with zeros rather than reading past the end of `param' */
	    put_padded(me, (const char *) param, param_size);
	}
	break;

    case MUX_FRAGMENT:
	header[0] = HT_WORDSWAP(MUX_CONTROL | MUX_SET_SID(sid) | MUX_SET_LEN(value));
	PUTBLOCK((const char *) header, 4);
	break;

    case MUX_CREDIT:
	header[0] = HT_WORDSWAP(MUX_CONTROL | MUX_LONG_LENGTH | MUX_SET_SID(sid));
	header[1] = HT_WORDSWAP(value);
	PUTBLOCK((const char *) header, 8);
	break;

    default:
	if (MUX_TRACE) HTTrace("Mux Channel. UNKNOWN OPCODE %d\n", opcode);
	return HT_ERROR;
    }

    /* Flush for now */
#if 1
    return (*me->isa->flush)(me);
#else
    return HT_OK;
#endif
}

/* ------------------------------------------------------------------------- */

/*
** Adds the protocol `protocol' with id `pid' to the channel's protocol
** list, creating the list on first use. Returns the result of
** HTList_addObject(), or NO if an argument is NULL.
*/
PUBLIC BOOL HTMuxProtocol_add (HTMuxChannel * muxch,
			       HTProtocolId pid, const char * protocol)
{
    if (muxch && protocol) {
	HTMuxProtocol * ms;
	if ((ms = (HTMuxProtocol *) HT_CALLOC(1, sizeof(HTMuxProtocol))) == NULL)
	    HT_OUTOFMEM("HTMuxProtocol_new");
	ms->name = HTAtom_caseFor(protocol);
	ms->pid = pid;
	if (!muxch->protocols) muxch->protocols = HTList_new();
	return HTList_addObject(muxch->protocols, ms);
    }
    return NO;
}

/*
** Removes and frees the first protocol entry with id `pid'. Returns YES if
** an entry was removed, otherwise NO.
*/
PUBLIC BOOL HTMuxProtocol_delete (HTMuxChannel * muxch, HTProtocolId pid)
{
    if (muxch && muxch->protocols) {
	HTList * cur = muxch->protocols;
	HTMuxProtocol * pres;
	while ((pres = (HTMuxProtocol *) HTList_nextObject(cur))) {
	    if (pres->pid == pid) {
		/* Safe while iterating: we return right after the removal */
		HTList_removeObject(muxch->protocols, pres);
		HT_FREE(pres);
		return YES;
	    }
	}
    }
    return NO;
}