Skip to content

Commit

Permalink
added --replay-flags switch to searchd (to control misc binlog replay…
Browse files Browse the repository at this point in the history
… settings)

git-svn-id: http://sphinxsearch.googlecode.com/svn/trunk@2943 8b96e2b9-35c5-2c16-bc47-5122d61876d4
  • Loading branch information
shodan committed Sep 8, 2011
1 parent 1c4ad1e commit 124ad5d
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 17 deletions.
7 changes: 6 additions & 1 deletion src/searchd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14784,6 +14784,8 @@ int WINAPI ServiceMain ( int argc, char **argv )
bool bOptListen = false;
bool bTestMode = false;

DWORD uReplayFlags = 0;

#define OPT(_a1,_a2) else if ( !strcmp(argv[i],_a1) || !strcmp(argv[i],_a2) )
#define OPT1(_a1) else if ( !strcmp(argv[i],_a1) )

Expand Down Expand Up @@ -14819,6 +14821,9 @@ int WINAPI ServiceMain ( int argc, char **argv )
OPT1 ( "--test" ) { g_bWatchdog = false; bTestMode = true; }
OPT1 ( "--strip-path" ) g_bStripPath = true;

// FIXME! add opt=(csv)val handling here
OPT1 ( "--replay-flags=accept-desc-timestamp" ) uReplayFlags |= SPH_REPLAY_ACCEPT_DESC_TIMESTAMP;

// handle 1-arg options
else if ( (i+1)>=argc ) break;
OPT ( "-c", "--config" ) g_sConfigFile = argv[++i];
Expand Down Expand Up @@ -15468,7 +15473,7 @@ int WINAPI ServiceMain ( int argc, char **argv )
hIndexes.Add ( it.Get().m_pIndex, it.GetKey() );

if ( g_eWorkers==MPM_THREADS )
sphReplayBinlog ( hIndexes, DumpMemStat );
sphReplayBinlog ( hIndexes, uReplayFlags, DumpMemStat );

if ( !g_bOptNoDetach )
g_bLogStdout = false;
Expand Down
35 changes: 22 additions & 13 deletions src/sphinxrt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,7 @@ class BinlogReader_c : protected CSphAutoreader
DWORD m_uCRC;
};


class RtBinlog_c : public ISphBinlog
{
public:
Expand All @@ -801,7 +802,7 @@ class RtBinlog_c : public ISphBinlog
void NotifyIndexFlush ( const char * sIndexName, int64_t iTID, bool bShutdown );

void Configure ( const CSphConfigSection & hSearchd, bool bTestMode );
void Replay ( const SmallStringHash_T<CSphIndex*> & hIndexes, ProgressCallbackSimple_t * pfnProgressCallback );
void Replay ( const SmallStringHash_T<CSphIndex*> & hIndexes, DWORD uReplayFlags, ProgressCallbackSimple_t * pfnProgressCallback );

void CreateTimerThread ();

Expand Down Expand Up @@ -853,8 +854,8 @@ class RtBinlog_c : public ISphBinlog
void CheckDoFlush ();
void OpenNewLog ( int iLastState=0 );

int ReplayBinlog ( const SmallStringHash_T<CSphIndex*> & hIndexes, int iBinlog );
bool ReplayCommit ( int iBinlog, BinlogReader_c & tReader ) const;
int ReplayBinlog ( const SmallStringHash_T<CSphIndex*> & hIndexes, DWORD uReplayFlags, int iBinlog );
bool ReplayCommit ( int iBinlog, DWORD uReplayFlags, BinlogReader_c & tReader ) const;
bool ReplayUpdateAttributes ( int iBinlog, BinlogReader_c & tReader ) const;
bool ReplayIndexAdd ( int iBinlog, const SmallStringHash_T<CSphIndex*> & hIndexes, BinlogReader_c & tReader ) const;
bool ReplayCacheAdd ( int iBinlog, BinlogReader_c & tReader ) const;
Expand Down Expand Up @@ -5109,7 +5110,7 @@ void RtBinlog_c::Configure ( const CSphConfigSection & hSearchd, bool bTestMode
}
}

void RtBinlog_c::Replay ( const SmallStringHash_T<CSphIndex*> & hIndexes, ProgressCallbackSimple_t * pfnProgressCallback )
void RtBinlog_c::Replay ( const SmallStringHash_T<CSphIndex*> & hIndexes, DWORD uReplayFlags, ProgressCallbackSimple_t * pfnProgressCallback )
{
if ( m_bDisabled || !hIndexes.GetLength() )
return;
Expand All @@ -5124,7 +5125,7 @@ void RtBinlog_c::Replay ( const SmallStringHash_T<CSphIndex*> & hIndexes, Progre
int iLastLogState = 0;
ARRAY_FOREACH ( i, m_dLogFiles )
{
iLastLogState = ReplayBinlog ( hIndexes, i );
iLastLogState = ReplayBinlog ( hIndexes, uReplayFlags, i );
if ( pfnProgressCallback ) // on each replayed binlog
pfnProgressCallback();
}
Expand Down Expand Up @@ -5409,7 +5410,7 @@ void RtBinlog_c::CheckDoFlush ()
}
}

int RtBinlog_c::ReplayBinlog ( const SmallStringHash_T<CSphIndex*> & hIndexes, int iBinlog )
int RtBinlog_c::ReplayBinlog ( const SmallStringHash_T<CSphIndex*> & hIndexes, DWORD uReplayFlags, int iBinlog )
{
assert ( iBinlog>=0 && iBinlog<m_dLogFiles.GetLength() );
CSphString sError;
Expand Down Expand Up @@ -5477,7 +5478,7 @@ int RtBinlog_c::ReplayBinlog ( const SmallStringHash_T<CSphIndex*> & hIndexes, i
switch ( uOp )
{
case BLOP_COMMIT:
bReplayOK = ReplayCommit ( iBinlog, tReader );
bReplayOK = ReplayCommit ( iBinlog, uReplayFlags, tReader );
break;

case BLOP_UPDATE_ATTRS:
Expand Down Expand Up @@ -5559,7 +5560,7 @@ static BinlogIndexInfo_t & ReplayIndexID ( BinlogReader_c & tReader, BinlogFileD
}


bool RtBinlog_c::ReplayCommit ( int iBinlog, BinlogReader_c & tReader ) const
bool RtBinlog_c::ReplayCommit ( int iBinlog, DWORD uReplayFlags, BinlogReader_c & tReader ) const
{
// load and lookup index
const int64_t iTxnPos = tReader.GetPos();
Expand Down Expand Up @@ -5599,13 +5600,21 @@ bool RtBinlog_c::ReplayCommit ( int iBinlog, BinlogReader_c & tReader ) const
if ( tReader.GetErrorFlag() || !tReader.CheckCrc ( "commit", tIndex.m_sName.cstr(), iTID, iTxnPos ) )
return false;

// check TID, time order in log
// check TID
if ( iTID<tIndex.m_iMaxTID )
sphDie ( "binlog: commit: descending tid (index=%s, lasttid="INT64_FMT", logtid="INT64_FMT", pos="INT64_FMT")",
tIndex.m_sName.cstr(), tIndex.m_iMaxTID, iTID, iTxnPos );

// check timestamp
if ( tmStamp<tIndex.m_tmMax )
sphDie ( "binlog: commit: descending time (index=%s, lasttime="INT64_FMT", logtime="INT64_FMT", pos="INT64_FMT")",
tIndex.m_sName.cstr(), tIndex.m_tmMax, tmStamp, iTxnPos );
{
if (!( uReplayFlags & SPH_REPLAY_ACCEPT_DESC_TIMESTAMP ))
sphDie ( "binlog: commit: descending time (index=%s, lasttime="INT64_FMT", logtime="INT64_FMT", pos="INT64_FMT")",
tIndex.m_sName.cstr(), tIndex.m_tmMax, tmStamp, iTxnPos );
else
sphWarning ( "binlog: commit: replaying txn despite descending time (index=%s, logtid="INT64_FMT", lasttime="INT64_FMT", logtime="INT64_FMT", pos="INT64_FMT")",
tIndex.m_sName.cstr(), iTID, tIndex.m_tmMax, tmStamp, iTxnPos );
}

// only replay transaction when index exists and does not have it yet (based on TID)
if ( tIndex.m_pRT && iTID > tIndex.m_pRT->m_iTID )
Expand Down Expand Up @@ -5846,10 +5855,10 @@ void sphRTDone ()
}


void sphReplayBinlog ( const SmallStringHash_T<CSphIndex*> & hIndexes, ProgressCallbackSimple_t * pfnProgressCallback )
void sphReplayBinlog ( const SmallStringHash_T<CSphIndex*> & hIndexes, DWORD uReplayFlags, ProgressCallbackSimple_t * pfnProgressCallback )
{
MEMORY ( SPH_MEM_BINLOG );
g_pRtBinlog->Replay ( hIndexes, pfnProgressCallback );
g_pRtBinlog->Replay ( hIndexes, uReplayFlags, pfnProgressCallback );
g_pRtBinlog->CreateTimerThread();
g_bRTChangesAllowed = true;
}
Expand Down
9 changes: 8 additions & 1 deletion src/sphinxrt.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,15 @@ ISphRtIndex * sphGetCurrentIndexRT();

typedef void ProgressCallbackSimple_t ();

//////////////////////////////////////////////////////////////////////////

enum ESphBinlogReplayFlags
{
SPH_REPLAY_ACCEPT_DESC_TIMESTAMP = 1
};

/// replay stored binlog
void sphReplayBinlog ( const SmallStringHash_T<CSphIndex*> & hIndexes, ProgressCallbackSimple_t * pfnProgressCallback=NULL );
void sphReplayBinlog ( const SmallStringHash_T<CSphIndex*> & hIndexes, DWORD uReplayFlags, ProgressCallbackSimple_t * pfnProgressCallback=NULL );

#endif // _sphinxrt_

Expand Down
2 changes: 1 addition & 1 deletion src/testrt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ int main ()
sphRTInit();
sphRTConfigure ( tRTConfig, true );
SmallStringHash_T< CSphIndex * > dTemp;
sphReplayBinlog ( dTemp );
sphReplayBinlog ( dTemp, 0 );
ISphRtIndex * pIndex = sphCreateIndexRT ( tSchema, "testrt", 32*1024*1024, "data/dump" );
pIndex->SetTokenizer ( pTok ); // index will own this pair from now on
pIndex->SetDictionary ( pDict );
Expand Down
2 changes: 1 addition & 1 deletion src/tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1665,7 +1665,7 @@ void TestRTInit ()
sphRTConfigure ( tRTConfig, true );

SmallStringHash_T<CSphIndex*> hIndexes;
sphReplayBinlog ( hIndexes );
sphReplayBinlog ( hIndexes, 0 );
}


Expand Down

0 comments on commit 124ad5d

Please sign in to comment.