00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026
00027 #include "FileStore.h"
00028 #include "SessionID.h"
00029 #include "Parser.h"
00030 #include "Utility.h"
00031 #include <fstream>
00032
00033 namespace FIX
00034 {
00035 FileStore::FileStore( std::string path, const SessionID& s )
00036 : m_msgFile( 0 ), m_headerFile( 0 ), m_seqNumsFile( 0 ), m_sessionFile( 0 )
00037 {
00038 file_mkdir( path.c_str() );
00039
00040 if ( path.empty() ) path = ".";
00041 const std::string& begin =
00042 s.getBeginString().getString();
00043 const std::string& sender =
00044 s.getSenderCompID().getString();
00045 const std::string& target =
00046 s.getTargetCompID().getString();
00047 const std::string& qualifier =
00048 s.getSessionQualifier();
00049
00050 std::string sessionid = begin + "-" + sender + "-" + target;
00051 if( qualifier.size() )
00052 sessionid += "-" + qualifier;
00053
00054 std::string prefix
00055 = file_appendpath(path, sessionid + ".");
00056
00057 m_msgFileName = prefix + "body";
00058 m_headerFileName = prefix + "header";
00059 m_seqNumsFileName = prefix + "seqnums";
00060 m_sessionFileName = prefix + "session";
00061
00062 try
00063 {
00064 open( false );
00065 }
00066 catch ( IOException & e )
00067 {
00068 throw ConfigError( e.what() );
00069 }
00070 }
00071
00072 FileStore::~FileStore()
00073 {
00074 if( m_msgFile ) fclose( m_msgFile );
00075 if( m_headerFile ) fclose( m_headerFile );
00076 if( m_seqNumsFile ) fclose( m_seqNumsFile );
00077 if( m_sessionFile ) fclose( m_sessionFile );
00078 }
00079
00080 void FileStore::open( bool deleteFile )
00081 { QF_STACK_PUSH(FileStore::open)
00082
00083 if ( m_msgFile ) fclose( m_msgFile );
00084 if ( m_headerFile ) fclose( m_headerFile );
00085 if ( m_seqNumsFile ) fclose( m_seqNumsFile );
00086 if ( m_sessionFile ) fclose( m_sessionFile );
00087
00088 m_msgFile = 0;
00089 m_headerFile = 0;
00090 m_seqNumsFile = 0;
00091 m_sessionFile = 0;
00092
00093 if ( deleteFile )
00094 {
00095 file_unlink( m_msgFileName.c_str() );
00096 file_unlink( m_headerFileName.c_str() );
00097 file_unlink( m_seqNumsFileName.c_str() );
00098 file_unlink( m_sessionFileName.c_str() );
00099 }
00100
00101 populateCache();
00102 m_msgFile = file_fopen( m_msgFileName.c_str(), "r+" );
00103 if ( !m_msgFile ) m_msgFile = file_fopen( m_msgFileName.c_str(), "w+" );
00104 if ( !m_msgFile ) throw ConfigError( "Could not open body file: " + m_msgFileName );
00105
00106 m_headerFile = file_fopen( m_headerFileName.c_str(), "r+" );
00107 if ( !m_headerFile ) m_headerFile = file_fopen( m_headerFileName.c_str(), "w+" );
00108 if ( !m_headerFile ) throw ConfigError( "Could not open header file: " + m_headerFileName );
00109
00110 m_seqNumsFile = file_fopen( m_seqNumsFileName.c_str(), "r+" );
00111 if ( !m_seqNumsFile ) m_seqNumsFile = file_fopen( m_seqNumsFileName.c_str(), "w+" );
00112 if ( !m_seqNumsFile ) throw ConfigError( "Could not open seqnums file: " + m_seqNumsFileName );
00113
00114 bool setCreationTime = false;
00115 m_sessionFile = file_fopen( m_sessionFileName.c_str(), "r" );
00116 if ( !m_sessionFile ) setCreationTime = true;
00117 else fclose( m_sessionFile );
00118
00119 m_sessionFile = file_fopen( m_sessionFileName.c_str(), "r+" );
00120 if ( !m_sessionFile ) m_sessionFile = file_fopen( m_sessionFileName.c_str(), "w+" );
00121 if ( !m_sessionFile ) throw ConfigError( "Could not open session file" );
00122 if ( setCreationTime ) setSession();
00123
00124 setNextSenderMsgSeqNum( getNextSenderMsgSeqNum() );
00125 setNextTargetMsgSeqNum( getNextTargetMsgSeqNum() );
00126
00127 QF_STACK_POP
00128 }
00129
00130 void FileStore::populateCache()
00131 { QF_STACK_PUSH(FileStore::populateCache)
00132
00133 std::string msg;
00134 Message message;
00135
00136 FILE* headerFile;
00137 headerFile = file_fopen( m_headerFileName.c_str(), "r+" );
00138 if ( headerFile )
00139 {
00140 int num, offset, size;
00141 while ( FILE_FSCANF( headerFile, "%d,%d,%d ", &num, &offset, &size ) == 3 )
00142 m_offsets[ num ] = std::make_pair( offset, size );
00143 fclose( headerFile );
00144 }
00145
00146 FILE* seqNumsFile;
00147 seqNumsFile = file_fopen( m_seqNumsFileName.c_str(), "r+" );
00148 if ( seqNumsFile )
00149 {
00150 int sender, target;
00151 if ( FILE_FSCANF( seqNumsFile, "%d : %d", &sender, &target ) == 2 )
00152 {
00153 m_cache.setNextSenderMsgSeqNum( sender );
00154 m_cache.setNextTargetMsgSeqNum( target );
00155 }
00156 fclose( seqNumsFile );
00157 }
00158
00159 FILE* sessionFile;
00160 sessionFile = file_fopen( m_sessionFileName.c_str(), "r+" );
00161 if ( sessionFile )
00162 {
00163 char time[ 22 ];
00164 #ifdef HAVE_FSCANF_S
00165 int result = FILE_FSCANF( sessionFile, "%s", time, 22 );
00166 #else
00167 int result = FILE_FSCANF( sessionFile, "%s", time );
00168 #endif
00169 if( result == 1 )
00170 {
00171 m_cache.setCreationTime( UtcTimeStampConvertor::convert( time, true ) );
00172 }
00173 fclose( sessionFile );
00174 }
00175
00176 QF_STACK_POP
00177 }
00178
00179 MessageStore* FileStoreFactory::create( const SessionID& s )
00180 { QF_STACK_PUSH(FileStoreFactory::create)
00181
00182 if ( m_path.size() ) return new FileStore( m_path, s );
00183
00184 std::string path;
00185 Dictionary settings = m_settings.get( s );
00186 path = settings.getString( FILE_STORE_PATH );
00187 return new FileStore( path, s );
00188
00189 QF_STACK_POP
00190 }
00191
00192 void FileStoreFactory::destroy( MessageStore* pStore )
00193 { QF_STACK_PUSH(FileStoreFactory::destroy)
00194 delete pStore;
00195 QF_STACK_POP
00196 }
00197
00198 bool FileStore::set( int msgSeqNum, const std::string& msg )
00199 throw ( IOException )
00200 { QF_STACK_PUSH(FileStore::set)
00201
00202 if ( fseek( m_msgFile, 0, SEEK_END ) )
00203 throw IOException( "Cannot seek to end of " + m_msgFileName );
00204 if ( fseek( m_headerFile, 0, SEEK_END ) )
00205 throw IOException( "Cannot seek to end of " + m_headerFileName );
00206
00207 int offset = ftell( m_msgFile );
00208 if ( offset < 0 )
00209 throw IOException( "Unable to get file pointer position from " + m_msgFileName );
00210 int size = msg.size();
00211
00212 if ( fprintf( m_headerFile, "%d,%d,%d ", msgSeqNum, offset, size ) < 0 )
00213 throw IOException( "Unable to write to file " + m_headerFileName );
00214 m_offsets[ msgSeqNum ] = std::make_pair( offset, size );
00215 fwrite( msg.c_str(), sizeof( char ), msg.size(), m_msgFile );
00216 if ( ferror( m_msgFile ) )
00217 throw IOException( "Unable to write to file " + m_msgFileName );
00218 if ( fflush( m_msgFile ) == EOF )
00219 throw IOException( "Unable to flush file " + m_msgFileName );
00220 if ( fflush( m_headerFile ) == EOF )
00221 throw IOException( "Unable to flush file " + m_headerFileName );
00222 return true;
00223
00224 QF_STACK_POP
00225 }
00226
00227 void FileStore::get( int begin, int end,
00228 std::vector < std::string > & result ) const
00229 throw ( IOException )
00230 { QF_STACK_PUSH(FileStore::get)
00231
00232 result.clear();
00233 std::string msg;
00234 for ( int i = begin; i <= end; ++i )
00235 {
00236 if ( get( i, msg ) )
00237 result.push_back( msg );
00238 }
00239
00240 QF_STACK_POP
00241 }
00242
00243 int FileStore::getNextSenderMsgSeqNum() const throw ( IOException )
00244 { QF_STACK_PUSH(FileStore::getNextSenderMsgSeqNum)
00245 return m_cache.getNextSenderMsgSeqNum();
00246 QF_STACK_POP
00247 }
00248
00249 int FileStore::getNextTargetMsgSeqNum() const throw ( IOException )
00250 { QF_STACK_PUSH(FileStore::getNextTargetMsgSeqNum)
00251 return m_cache.getNextTargetMsgSeqNum();
00252 QF_STACK_POP
00253 }
00254
00255 void FileStore::setNextSenderMsgSeqNum( int value ) throw ( IOException )
00256 { QF_STACK_PUSH(FileStore::setNextSenderMsgSeqNum)
00257 m_cache.setNextSenderMsgSeqNum( value );
00258 setSeqNum();
00259 QF_STACK_POP
00260 }
00261
00262 void FileStore::setNextTargetMsgSeqNum( int value ) throw ( IOException )
00263 { QF_STACK_PUSH(FileStore::setNextTargetMsgSeqNum)
00264 m_cache.setNextTargetMsgSeqNum( value );
00265 setSeqNum();
00266 QF_STACK_POP
00267 }
00268
00269 void FileStore::incrNextSenderMsgSeqNum() throw ( IOException )
00270 { QF_STACK_PUSH(FileStore::incrNextSenderMsgSeqNum)
00271 m_cache.incrNextSenderMsgSeqNum();
00272 setSeqNum();
00273 QF_STACK_POP
00274 }
00275
00276 void FileStore::incrNextTargetMsgSeqNum() throw ( IOException )
00277 { QF_STACK_PUSH(FileStore::incrNextTargetMsgSeqNum)
00278 m_cache.incrNextTargetMsgSeqNum();
00279 setSeqNum();
00280 QF_STACK_POP
00281 }
00282
00283 UtcTimeStamp FileStore::getCreationTime() const throw ( IOException )
00284 { QF_STACK_PUSH(FileStore::getCreationTime)
00285 return m_cache.getCreationTime();
00286 QF_STACK_POP
00287 }
00288
00289 void FileStore::reset() throw ( IOException )
00290 { QF_STACK_PUSH(FileStore::reset)
00291
00292 m_cache.reset();
00293 open( true );
00294 setSession();
00295
00296 QF_STACK_POP
00297 }
00298
00299 void FileStore::refresh() throw ( IOException )
00300 { QF_STACK_PUSH(FileStore::refresh)
00301
00302 m_cache.reset();
00303 open( false );
00304
00305 QF_STACK_POP
00306 }
00307
00308 void FileStore::setSeqNum()
00309 { QF_STACK_PUSH(FileStore::setSeqNum)
00310
00311 rewind( m_seqNumsFile );
00312 fprintf( m_seqNumsFile, "%10.10d : %10.10d",
00313 getNextSenderMsgSeqNum(), getNextTargetMsgSeqNum() );
00314 if ( ferror( m_seqNumsFile ) )
00315 throw IOException( "Unable to write to file " + m_seqNumsFileName );
00316 if ( fflush( m_seqNumsFile ) )
00317 throw IOException( "Unable to flush file " + m_seqNumsFileName );
00318
00319 QF_STACK_POP
00320 }
00321
00322 void FileStore::setSession()
00323 { QF_STACK_PUSH(FileStore::setSession)
00324
00325 rewind( m_sessionFile );
00326 fprintf( m_sessionFile, "%s",
00327 UtcTimeStampConvertor::convert( m_cache.getCreationTime() ).c_str() );
00328 if ( ferror( m_sessionFile ) )
00329 throw IOException( "Unable to write to file " + m_sessionFileName );
00330 if ( fflush( m_sessionFile ) )
00331 throw IOException( "Unable to flush file " + m_sessionFileName );
00332
00333 QF_STACK_POP
00334 }
00335
00336 bool FileStore::get( int msgSeqNum, std::string& msg ) const
00337 throw ( IOException )
00338 { QF_STACK_PUSH(FileStore::get)
00339
00340 NumToOffset::const_iterator find = m_offsets.find( msgSeqNum );
00341 if ( find == m_offsets.end() ) return false;
00342 const OffsetSize& offset = find->second;
00343 if ( fseek( m_msgFile, offset.first, SEEK_SET ) )
00344 throw IOException( "Unable to seek in file " + m_msgFileName );
00345 char* buffer = new char[ offset.second + 1 ];
00346 fread( buffer, sizeof( char ), offset.second, m_msgFile );
00347 if ( ferror( m_msgFile ) )
00348 throw IOException( "Unable to read from file " + m_msgFileName );
00349 buffer[ offset.second ] = 0;
00350 msg = buffer;
00351 delete [] buffer;
00352 return true;
00353
00354 QF_STACK_POP
00355 }
00356
00357 }