PWLib
1.10.10
|
00001 /* 00002 * sockagg.h 00003 * 00004 * Generalised Socket Aggregation functions 00005 * 00006 * Portable Windows Library 00007 * 00008 * Copyright (C) 2005 Post Increment 00009 * 00010 * The contents of this file are subject to the Mozilla Public License 00011 * Version 1.0 (the "License"); you may not use this file except in 00012 * compliance with the License. You may obtain a copy of the License at 00013 * http://www.mozilla.org/MPL/ 00014 * 00015 * Software distributed under the License is distributed on an "AS IS" 00016 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See 00017 * the License for the specific language governing rights and limitations 00018 * under the License. 00019 * 00020 * The Original Code is Portable Windows Library. 00021 * 00022 * The Initial Developer of the Original Code is Post Increment 00023 * 00024 * Portions of this code were written with the financial assistance of 00025 * Metreos Corporation (http://www.metros.com). 00026 * 00027 * Contributor(s): ______________________________________. 00028 * 00029 * $Log: sockagg.h,v $ 00030 * Revision 1.4 2006/01/03 04:23:32 csoutheren 00031 * Fixed Unix implementation 00032 * 00033 * Revision 1.3 2005/12/23 06:44:30 csoutheren 00034 * Working implementation 00035 * 00036 * Revision 1.2 2005/12/22 07:27:36 csoutheren 00037 * More implementation 00038 * 00039 * Revision 1.1 2005/12/22 03:55:52 csoutheren 00040 * Added initial version of socket aggregation classes 00041 * 00042 * 00043 */ 00044 00045 00046 #ifndef _SOCKAGG_H 00047 #define _SOCKAGG_H 00048 00049 #ifdef P_USE_PRAGMA 00050 #pragma interface 00051 #endif 00052 00053 #include <ptlib.h> 00054 #include <ptlib/sockets.h> 00055 00056 /* 00057 00058 These classes implements a generalised method for aggregating sockets so that they can be handled by a single thread. It is 00059 intended to provide a backwards-compatible mechanism to supplant the "one socket - one thread" model used by OpenH323 00060 and OPAL with a model that provides a better thread to call ratio. A more complete explanation of this model can be 00061 found in the white paper "Increasing the Maximum Call Density of OpenH323 and OPAL" which can be at: 00062 00063 http://www.voxgratia.org/docs/call%20thread%20handling%20model%201.0.pdf 00064 00065 There are two assumptions made by this code: 00066 00067 1) The most efficient way to handle I/O is for a thread to be blocked on I/O. Any sort of timer or other 00068 polling mechanism is less efficient 00069 00070 2) The time taken to handle a received PDU is relatively small, and will not interfere with the handling of 00071 other calls that are handled in the same thread 00072 00073 UDP and TCP sockets are aggregated in different ways. UDP sockets are aggregated on the basis of a simple loop that looks 00074 for received datagrams and then processes them. TCP sockets are more complex because there is no intrinsic record-marking 00075 protocol, so it is difficult to know when a complete PDU has been received. This problem is solved by having the loop collect 00076 received data into a buffer until the handling routine decides that a full PDU has been received. 00077 00078 At the heart of each socket aggregator is a select statement that contains all of the file descriptors that are managed 00079 by the thread. One extra handle for a pipe (or on Windows, a local socket) is added to each handle list so that the thread can 00080 be woken up in order to allow the addition or removal of sockets to the list 00081 00082 */ 00083 00084 #include <ptlib.h> 00085 #include <functional> 00086 #include <vector> 00087 00089 // 00090 // this class encapsulates the system specific handle needed to specifiy a socket. 00091 // On Unix systems, this is a simple file handle. This file handle is used to uniquely 00092 // identify the socket and used in the "select" system call 00093 // On Windows systems the SOCKET handle is used to identify the socket, but a seperate WSAEVENT 00094 // handle is needed for the WSWaitForMultpleEvents call. 00095 // This is further complicated by the fact that we need to treat some pairs of sockets as a single 00096 // entity (i.e. RTP sockets) to avoid rewriting the RTP handler code. 00097 // 00098 00099 class PAggregatedHandle; 00100 00101 class PAggregatorFD 00102 { 00103 public: 00104 #ifdef _WIN32 00105 typedef WSAEVENT FD; 00106 typedef SOCKET FDType; 00107 SOCKET socket; 00108 #else 00109 typedef int FD; 00110 typedef int FDType; 00111 #endif 00112 00113 PAggregatorFD(FDType fd); 00114 00115 FD fd; 00116 00117 ~PAggregatorFD(); 00118 bool IsValid(); 00119 }; 00120 00121 typedef std::vector<PAggregatorFD *> PAggregatorFDList_t; 00122 00124 // 00125 // This class defines an abstract class used to define a handle that can be aggregated 00126 // 00127 // Normally this will correspond directly to a socket, but for RTP this actually corresponds to two sockets 00128 // which greatly complicates things 00129 // 00130 00131 #ifdef _MSC_VER 00132 #pragma warning(push) 00133 #pragma warning(disable:4127) 00134 #endif 00135 00136 class PAggregatedHandle : public PObject 00137 { 00138 PCLASSINFO(PAggregatedHandle, PObject); 00139 public: 00140 PAggregatedHandle(BOOL _autoDelete = FALSE) 00141 : autoDelete(_autoDelete), preReadDone(FALSE) 00142 { } 00143 00144 virtual PAggregatorFDList_t GetFDs() = 0; 00145 00146 virtual PTimeInterval GetTimeout() 00147 { return PMaxTimeInterval; } 00148 00149 virtual BOOL Init() { return TRUE; } 00150 virtual BOOL PreRead() { return TRUE; } 00151 virtual BOOL OnRead() = 0; 00152 virtual void DeInit() { } 00153 00154 virtual BOOL IsPreReadDone() const 00155 { return preReadDone; } 00156 00157 virtual void SetPreReadDone(BOOL v = TRUE) 00158 { preReadDone = v; } 00159 00160 BOOL autoDelete; 00161 00162 protected: 00163 BOOL preReadDone; 00164 }; 00165 00166 #ifdef _MSC_VER 00167 #pragma warning(pop) 00168 #endif 00169 00170 00172 // 00173 // This class is the actual socket aggregator 00174 // 00175 00176 class PHandleAggregator : public PObject 00177 { 00178 PCLASSINFO(PHandleAggregator, PObject) 00179 public: 00180 class EventBase 00181 { 00182 public: 00183 virtual PAggregatorFD::FD GetHandle() = 0; 00184 virtual void Set() = 0; 00185 virtual void Reset() = 0; 00186 }; 00187 00188 typedef std::vector<PAggregatedHandle *> HandleContextList_t; 00189 00190 class WorkerThreadBase : public PThread 00191 { 00192 public: 00193 WorkerThreadBase(EventBase & _event) 00194 : PThread(100, NoAutoDeleteThread), event(_event), listChanged(TRUE) 00195 { } 00196 00197 virtual void Trigger() = 0; 00198 void Main(); 00199 00200 EventBase & event; 00201 PMutex mutex; 00202 BOOL listChanged; 00203 00204 HandleContextList_t handleList; 00205 }; 00206 00207 typedef std::vector<WorkerThreadBase *> WorkerList_t; 00208 00209 PHandleAggregator(unsigned _max = 10); 00210 00211 BOOL AddHandle(PAggregatedHandle * handle); 00212 00213 BOOL RemoveHandle(PAggregatedHandle * handle); 00214 00215 PMutex mutex; 00216 WorkerList_t workers; 00217 unsigned maxWorkerSize; 00218 unsigned minWorkerSize; 00219 }; 00220 00221 00223 // 00224 // This template class allows the creation of aggregators for sockets that are 00225 // descendants of PIPSocket 00226 // 00227 00228 template <class PSocketType> 00229 class PSocketAggregator : public PHandleAggregator 00230 { 00231 PCLASSINFO(PSocketAggregator, PHandleAggregator) 00232 public: 00233 class AggregatedPSocket : public PAggregatedHandle 00234 { 00235 public: 00236 AggregatedPSocket(PSocketType * _s) 00237 : psocket(_s), fd(_s->GetHandle()) { } 00238 00239 BOOL OnRead() 00240 { return psocket->OnRead(); } 00241 00242 PAggregatorFDList_t GetFDs() 00243 { PAggregatorFDList_t list; list.push_back(&fd); return list; } 00244 00245 protected: 00246 PSocketType * psocket; 00247 PAggregatorFD fd; 00248 }; 00249 00250 typedef std::map<PSocketType *, AggregatedPSocket *> SocketList_t; 00251 SocketList_t socketList; 00252 00253 BOOL AddSocket(PSocketType * sock) 00254 { 00255 PWaitAndSignal m(mutex); 00256 00257 AggregatedPSocket * handle = new AggregatedPSocket(sock); 00258 if (AddHandle(handle)) { 00259 socketList.insert(SocketList_t::value_type(sock, handle)); 00260 return true; 00261 } 00262 00263 delete handle; 00264 return false; 00265 } 00266 00267 BOOL RemoveSocket(PSocketType * sock) 00268 { 00269 PWaitAndSignal m(mutex); 00270 00271 typename SocketList_t::iterator r = socketList.find(sock); 00272 if (r == socketList.end()) 00273 return FALSE; 00274 00275 AggregatedPSocket * handle = r->second; 00276 RemoveHandle(handle); 00277 delete handle; 00278 socketList.erase(r); 00279 return TRUE; 00280 } 00281 }; 00282 00283 #endif