SocketInitiator.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026
00027 #include "SocketInitiator.h"
00028 #include "Session.h"
00029 #include "Settings.h"
00030
00031 namespace FIX
00032 {
00033 SocketInitiator::SocketInitiator( Application& application,
00034 MessageStoreFactory& factory,
00035 const SessionSettings& settings )
00036 throw( ConfigError )
00037 : Initiator( application, factory, settings ),
00038 m_connector( 1 ), m_lastConnect( 0 ),
00039 m_reconnectInterval( 30 ), m_noDelay( false ), m_sendBufSize( 0 ),
00040 m_rcvBufSize( 0 )
00041 {
00042 }
00043
00044 SocketInitiator::SocketInitiator( Application& application,
00045 MessageStoreFactory& factory,
00046 const SessionSettings& settings,
00047 LogFactory& logFactory )
00048 throw( ConfigError )
00049 : Initiator( application, factory, settings, logFactory ),
00050 m_connector( 1 ), m_lastConnect( 0 ),
00051 m_reconnectInterval( 30 ), m_noDelay( false ), m_sendBufSize( 0 ),
00052 m_rcvBufSize( 0 )
00053 {
00054 }
00055
00056 SocketInitiator::~SocketInitiator()
00057 {
00058 SocketConnections::iterator i;
00059 for (i = m_connections.begin();
00060 i != m_connections.end(); ++i)
00061 delete i->second;
00062
00063 for (i = m_pendingConnections.begin();
00064 i != m_pendingConnections.end(); ++i)
00065 delete i->second;
00066 }
00067
00068 void SocketInitiator::onConfigure( const SessionSettings& s )
00069 throw ( ConfigError )
00070 { QF_STACK_PUSH(SocketInitiator::onConfigure)
00071
00072 try { m_reconnectInterval = s.get().getLong( RECONNECT_INTERVAL ); }
00073 catch ( std::exception& ) {}
00074 if( s.get().has( SOCKET_NODELAY ) )
00075 m_noDelay = s.get().getBool( SOCKET_NODELAY );
00076 if( s.get().has( SOCKET_SEND_BUFFER_SIZE ) )
00077 m_sendBufSize = s.get().getLong( SOCKET_SEND_BUFFER_SIZE );
00078 if( s.get().has( SOCKET_RECEIVE_BUFFER_SIZE ) )
00079 m_rcvBufSize = s.get().getLong( SOCKET_RECEIVE_BUFFER_SIZE );
00080
00081 QF_STACK_POP
00082 }
00083
00084 void SocketInitiator::onInitialize( const SessionSettings& s )
00085 throw ( RuntimeError )
00086 { QF_STACK_PUSH(SocketInitiator::onInitialize)
00087 QF_STACK_POP
00088 }
00089
00090 void SocketInitiator::onStart()
00091 { QF_STACK_PUSH(SocketInitiator::onStart)
00092
00093 connect();
00094
00095 while ( !isStopped() )
00096 m_connector.block( *this );
00097
00098 time_t start = 0;
00099 time_t now = 0;
00100
00101 ::time( &start );
00102 while ( isLoggedOn() )
00103 {
00104 m_connector.block( *this );
00105 if( ::time(&now) -5 >= start )
00106 break;
00107 }
00108
00109 QF_STACK_POP
00110 }
00111
00112 bool SocketInitiator::onPoll( double timeout )
00113 { QF_STACK_PUSH(SocketInitiator::onPoll)
00114
00115 time_t start = 0;
00116 time_t now = 0;
00117
00118 if( isStopped() )
00119 {
00120 if( start == 0 )
00121 ::time( &start );
00122 if( !isLoggedOn() )
00123 return false;
00124 if( ::time(&now) - 5 >= start )
00125 return false;
00126 }
00127
00128 m_connector.block( *this, true, timeout );
00129 return true;
00130
00131 QF_STACK_POP
00132 }
00133
00134 void SocketInitiator::onStop()
00135 { QF_STACK_PUSH(SocketInitiator::onStop)
00136 QF_STACK_POP
00137 }
00138
00139 void SocketInitiator::doConnect( const SessionID& s, const Dictionary& d )
00140 { QF_STACK_PUSH(SocketInitiator::doConnect)
00141
00142 try
00143 {
00144 std::string address;
00145 short port = 0;
00146 Session* session = Session::lookupSession( s );
00147 if( !session->isSessionTime(UtcTimeStamp()) ) return;
00148
00149 Log* log = session->getLog();
00150
00151 getHost( s, d, address, port );
00152
00153 log->onEvent( "Connecting to " + address + " on port " + IntConvertor::convert((unsigned short)port) );
00154 int result = m_connector.connect( address, port, m_noDelay, m_sendBufSize, m_rcvBufSize );
00155
00156 if( result != -1 )
00157 {
00158 setPending( s );
00159
00160 m_pendingConnections[ result ]
00161 = new SocketConnection( *this, s, result, &m_connector.getMonitor() );
00162 }
00163 }
00164 catch ( std::exception& ) {}
00165
00166 QF_STACK_POP
00167 }
00168
00169 void SocketInitiator::onConnect( SocketConnector&, int s )
00170 { QF_STACK_PUSH(SocketInitiator::onConnect)
00171
00172 SocketConnections::iterator i = m_pendingConnections.find( s );
00173 if( i == m_pendingConnections.end() ) return;
00174 SocketConnection* pSocketConnection = i->second;
00175
00176 m_connections[s] = pSocketConnection;
00177 m_pendingConnections.erase( i );
00178 setConnected( pSocketConnection->getSession()->getSessionID() );
00179 pSocketConnection->onTimeout();
00180
00181 QF_STACK_POP
00182 }
00183
00184 void SocketInitiator::onWrite( SocketConnector& connector, int s )
00185 { QF_STACK_PUSH(SocketInitiator::onWrite)
00186
00187 SocketConnections::iterator i = m_connections.find( s );
00188 if ( i == m_connections.end() ) return ;
00189 SocketConnection* pSocketConnection = i->second;
00190 if( pSocketConnection->processQueue() )
00191 pSocketConnection->unsignal();
00192
00193 QF_STACK_POP
00194 }
00195
00196 bool SocketInitiator::onData( SocketConnector& connector, int s )
00197 { QF_STACK_PUSH(SocketInitiator::onData)
00198
00199 SocketConnections::iterator i = m_connections.find( s );
00200 if ( i == m_connections.end() ) return false;
00201 SocketConnection* pSocketConnection = i->second;
00202 return pSocketConnection->read( connector );
00203
00204 QF_STACK_POP
00205 }
00206
00207 void SocketInitiator::onDisconnect( SocketConnector&, int s )
00208 { QF_STACK_PUSH(SocketInitiator::onDisconnect)
00209
00210 SocketConnections::iterator i = m_connections.find( s );
00211 SocketConnections::iterator j = m_pendingConnections.find( s );
00212
00213 SocketConnection* pSocketConnection = 0;
00214 if( i != m_connections.end() )
00215 pSocketConnection = i->second;
00216 if( j != m_pendingConnections.end() )
00217 pSocketConnection = j->second;
00218 if( !pSocketConnection )
00219 return;
00220
00221 setDisconnected( pSocketConnection->getSession()->getSessionID() );
00222
00223 Session* pSession = pSocketConnection->getSession();
00224 if ( pSession )
00225 {
00226 pSession->disconnect();
00227 setDisconnected( pSession->getSessionID() );
00228 }
00229
00230 delete pSocketConnection;
00231 m_connections.erase( s );
00232 m_pendingConnections.erase( s );
00233
00234 QF_STACK_POP
00235 }
00236
00237 void SocketInitiator::onError( SocketConnector& connector )
00238 { QF_STACK_PUSH(SocketInitiator::onError)
00239 onTimeout( connector );
00240 QF_STACK_POP
00241 }
00242
00243 void SocketInitiator::onTimeout( SocketConnector& )
00244 { QF_STACK_PUSH(SocketInitiator::onTimeout)
00245
00246 time_t now;
00247 ::time( &now );
00248
00249 if ( (now - m_lastConnect) >= m_reconnectInterval )
00250 {
00251 connect();
00252 m_lastConnect = now;
00253 }
00254
00255 SocketConnections::iterator i;
00256 for ( i = m_connections.begin(); i != m_connections.end(); ++i )
00257 i->second->onTimeout();
00258
00259 QF_STACK_POP
00260 }
00261
00262 void SocketInitiator::getHost( const SessionID& s, const Dictionary& d,
00263 std::string& address, short& port )
00264 { QF_STACK_PUSH(SocketInitiator::getHost)
00265
00266 int num = 0;
00267 SessionToHostNum::iterator i = m_sessionToHostNum.find( s );
00268 if ( i != m_sessionToHostNum.end() ) num = i->second;
00269
00270 std::stringstream hostStream;
00271 hostStream << SOCKET_CONNECT_HOST << num;
00272 std::string hostString = hostStream.str();
00273
00274 std::stringstream portStream;
00275 std::string portString = portStream.str();
00276 portStream << SOCKET_CONNECT_PORT << num;
00277
00278 if( d.has(hostString) && d.has(portString) )
00279 {
00280 address = d.getString( hostString );
00281 port = ( short ) d.getLong( portString );
00282 }
00283 else
00284 {
00285 num = 0;
00286 address = d.getString( SOCKET_CONNECT_HOST );
00287 port = ( short ) d.getLong( SOCKET_CONNECT_PORT );
00288 }
00289
00290 m_sessionToHostNum[ s ] = ++num;
00291
00292 QF_STACK_POP
00293 }
00294 }