diff --git a/api/java/SphinxClient.java b/api/java/SphinxClient.java index b8e8597b..321fd436 100644 --- a/api/java/SphinxClient.java +++ b/api/java/SphinxClient.java @@ -80,6 +80,7 @@ public class SphinxClient private final static int SEARCHD_COMMAND_UPDATE = 2; private final static int SEARCHD_COMMAND_KEYWORDS = 3; private final static int SEARCHD_COMMAND_PERSIST = 4; + private final static int SEARCHD_COMMAND_FLUSHATTRS = 7; /* searchd command versions */ private final static int VER_MAJOR_PROTO = 0x1; @@ -87,6 +88,7 @@ public class SphinxClient private final static int VER_COMMAND_EXCERPT = 0x100; private final static int VER_COMMAND_UPDATE = 0x101; private final static int VER_COMMAND_KEYWORDS = 0x100; + private final static int VER_COMMAND_FLUSHATTRS = 0x100; /* filter types */ private final static int SPH_FILTER_VALUES = 0; @@ -1289,6 +1291,37 @@ public Map[] BuildKeywords ( String query, String index, boolean hits ) throws S } } + + + /** + * Force attribute flush, and block until it completes. + * Returns current internal flush tag on success, -1 on failure. + */ + public int FlushAttrs() throws SphinxException + { + /* build request */ + ByteArrayOutputStream reqBuf = new ByteArrayOutputStream(); + + /* run request */ + DataInputStream in = _DoRequest ( SEARCHD_COMMAND_FLUSHATTRS, VER_COMMAND_FLUSHATTRS, reqBuf ); + if ( in==null ) + return -1; + + /* parse reply */ + try + { + int iFlushTag = in.readInt (); + return iFlushTag; + + } catch ( Exception e ) + { + _error = "incomplete reply"; + return -1; + } + } + + + /** Escape the characters with special meaning in query syntax. */ static public String EscapeString ( String s ) { diff --git a/api/sphinxapi.php b/api/sphinxapi.php index e8ec772e..c001c9e6 100644 --- a/api/sphinxapi.php +++ b/api/sphinxapi.php @@ -18,13 +18,14 @@ ///////////////////////////////////////////////////////////////////////////// /// known searchd commands -define ( "SEARCHD_COMMAND_SEARCH", 0 ); -define ( "SEARCHD_COMMAND_EXCERPT", 1 ); -define ( "SEARCHD_COMMAND_UPDATE", 2 ); -define ( "SEARCHD_COMMAND_KEYWORDS",3 ); -define ( "SEARCHD_COMMAND_PERSIST", 4 ); -define ( "SEARCHD_COMMAND_STATUS", 5 ); -define ( "SEARCHD_COMMAND_QUERY", 6 ); +define ( "SEARCHD_COMMAND_SEARCH", 0 ); +define ( "SEARCHD_COMMAND_EXCERPT", 1 ); +define ( "SEARCHD_COMMAND_UPDATE", 2 ); +define ( "SEARCHD_COMMAND_KEYWORDS", 3 ); +define ( "SEARCHD_COMMAND_PERSIST", 4 ); +define ( "SEARCHD_COMMAND_STATUS", 5 ); +define ( "SEARCHD_COMMAND_QUERY", 6 ); +define ( "SEARCHD_COMMAND_FLUSHATRS", 7 ); /// current client-side command implementation versions define ( "VER_COMMAND_SEARCH", 0x117 ); @@ -33,6 +34,7 @@ define ( "VER_COMMAND_KEYWORDS", 0x100 ); define ( "VER_COMMAND_STATUS", 0x100 ); define ( "VER_COMMAND_QUERY", 0x100 ); +define ( "VER_COMMAND_FLUSHATTRS", 0x100 ); /// known searchd status codes define ( "SEARCHD_OK", 0 ); @@ -1604,6 +1606,36 @@ function Status () $this->_MBPop (); return $res; } + + ////////////////////////////////////////////////////////////////////////// + // flush + ////////////////////////////////////////////////////////////////////////// + + function FlushAttrs () + { + $this->_MBPush (); + if (!( $fp = $this->_Connect() )) + { + $this->_MBPop(); + return false; + } + + $req = pack ( "nnN", SEARCHD_COMMAND_FLUSHATTRS, VER_COMMAND_FLUSHATTRS, 0 ); // len=0 + if ( !( $this->_Send ( $fp, $req, 8 ) ) || + !( $response = $this->_GetResponse ( $fp, VER_COMMAND_FLUSHATTRS ) ) ) + { + $this->_MBPop (); + return false; + } + + $tag = -1; + if ( strlen($response)==4 ) + list(,$tag) = unpack ( "N*", $response ); + + $this->_MBPop (); + return $tag; + } + } // diff --git a/api/sphinxapi.py b/api/sphinxapi.py index 875ca2c2..c8033f4b 100644 --- a/api/sphinxapi.py +++ b/api/sphinxapi.py @@ -21,17 +21,19 @@ # known searchd commands -SEARCHD_COMMAND_SEARCH = 0 -SEARCHD_COMMAND_EXCERPT = 1 -SEARCHD_COMMAND_UPDATE = 2 -SEARCHD_COMMAND_KEYWORDS= 3 -SEARCHD_COMMAND_PERSIST = 4 +SEARCHD_COMMAND_SEARCH = 0 +SEARCHD_COMMAND_EXCERPT = 1 +SEARCHD_COMMAND_UPDATE = 2 +SEARCHD_COMMAND_KEYWORDS = 3 +SEARCHD_COMMAND_PERSIST = 4 +SEARCHD_COMMAND_FLUSHATTRS = 7 # current client-side command implementation versions VER_COMMAND_SEARCH = 0x116 VER_COMMAND_EXCERPT = 0x100 VER_COMMAND_UPDATE = 0x101 VER_COMMAND_KEYWORDS = 0x100 +VER_COMMAND_FLUSHATTRS = 0x100 # known searchd status codes SEARCHD_OK = 0 @@ -964,6 +966,22 @@ def Close(self): def EscapeString(self, string): return re.sub(r"([=\(\)|\-!@~\"&/\\\^\$\=])", r"\\\1", string) + + def FlushAttrs(self): + sock = self._Connect() + if not sock: + return None + + request = pack ( '>hhI', SEARCHD_COMMAND_FLUSHATTRS, VER_COMMAND_FLUSHATTRS, 0 ) # cmd, ver, bodylen + sock.send ( request ) + + response = self._GetResponse ( sock, VER_COMMAND_FLUSHATTRS ) + if not response or len(response)!=4: + return -1 + + tag = unpack ( '>L', response[0:4] )[0] + return tag + # # $Id$ # diff --git a/src/searchd.cpp b/src/searchd.cpp index 5b3ec8c5..386f6029 100644 --- a/src/searchd.cpp +++ b/src/searchd.cpp @@ -106,7 +106,8 @@ enum ESphLogLevel { LOG_FATAL = 0, LOG_WARNING = 1, - LOG_INFO = 2 + LOG_INFO = 2, + LOG_DEBUG = 3 }; enum ProtocolType_e @@ -228,10 +229,6 @@ static CSphVector g_dRotating; // names of indexes to be rota static const char * g_sPrereading = NULL; // name of index currently being preread static CSphIndex * g_pPrereading = NULL; // rotation "buffer" -static int g_iUpdateTag = 0; // ever-growing update tag -static bool g_bFlushing = false; // update flushing in progress -static int g_iFlushTag = 0; // last flushed tag - enum { SPH_PIPE_UPDATED_ATTRS, @@ -272,6 +269,7 @@ enum SearchdCommand_e SEARCHD_COMMAND_PERSIST = 4, SEARCHD_COMMAND_STATUS = 5, SEARCHD_COMMAND_QUERY = 6, + SEARCHD_COMMAND_FLUSHATTRS = 7, SEARCHD_COMMAND_TOTAL }; @@ -285,7 +283,8 @@ enum VER_COMMAND_UPDATE = 0x102, VER_COMMAND_KEYWORDS = 0x100, VER_COMMAND_STATUS = 0x100, - VER_COMMAND_QUERY = 0x100 + VER_COMMAND_QUERY = 0x100, + VER_COMMAND_FLUSHATTRS = 0x100 }; @@ -301,8 +300,6 @@ enum SearchdStatus_e const int MAX_RETRY_COUNT = 8; const int MAX_RETRY_DELAY = 1000; -////////////////////////////////////////////////////////////////////////// -// PERF COUNTERS ////////////////////////////////////////////////////////////////////////// struct SearchdStats_t @@ -332,6 +329,18 @@ static SearchdStats_t * g_pStats = NULL; static CSphSharedBuffer g_tStatsBuffer; static CSphProcessSharedMutex g_tStatsMutex; + +struct FlushState_t +{ + int m_iUpdateTag; ///< ever-growing update tag + int m_bFlushing; ///< update flushing in progress + int m_iFlushTag; ///< last flushed tag + bool m_bForceCheck; ///< forced check/flush flag +}; + +static volatile FlushState_t * g_pFlush = NULL; +static CSphSharedBuffer g_tFlushBuffer; + ///////////////////////////////////////////////////////////////////////////// // MACHINE-DEPENDENT STUFF ///////////////////////////////////////////////////////////////////////////// @@ -525,6 +534,7 @@ void sphLog ( ESphLogLevel eLevel, const char * sFmt, va_list ap ) if ( sFmt==NULL ) eLevel = eLastLevel; if ( eLevel==LOG_FATAL ) sBanner = "FATAL: "; if ( eLevel==LOG_WARNING ) sBanner = "WARNING: "; + if ( eLevel==LOG_DEBUG ) sBanner = "DEBUG: "; char sBuf [ 1024 ]; if ( !g_bLogTty ) @@ -616,6 +626,14 @@ void sphLogFatal ( const char * sFmt, ... ) va_end ( ap ); } +void sphLogDebug ( const char * sFmt, ... ) +{ + va_list ap; + va_start ( ap, sFmt ); + sphLog ( LOG_DEBUG, sFmt, ap ); + va_end ( ap ); +} + void LogInternalError ( const char * sError ) { sphWarning( "INTERNAL ERROR: %s", sError ); @@ -5718,6 +5736,7 @@ void BuildStatus ( CSphVector & dStatus ) dStatus.Add ( "command_keywords" ); dStatus.Add().SetSprintf ( FMT64, g_pStats->m_iCommandCount[SEARCHD_COMMAND_KEYWORDS] ); dStatus.Add ( "command_persist" ); dStatus.Add().SetSprintf ( FMT64, g_pStats->m_iCommandCount[SEARCHD_COMMAND_PERSIST] ); dStatus.Add ( "command_status" ); dStatus.Add().SetSprintf ( FMT64, g_pStats->m_iCommandCount[SEARCHD_COMMAND_STATUS] ); + dStatus.Add ( "command_flushattrs" ); dStatus.Add().SetSprintf ( FMT64, g_pStats->m_iCommandCount[SEARCHD_COMMAND_FLUSHATTRS] ); dStatus.Add ( "agent_connect" ); dStatus.Add().SetSprintf ( FMT64, g_pStats->m_iAgentConnect ); dStatus.Add ( "agent_retry" ); dStatus.Add().SetSprintf ( FMT64, g_pStats->m_iAgentRetry ); dStatus.Add ( "queries" ); dStatus.Add().SetSprintf ( FMT64, g_pStats->m_iQueries ); @@ -5839,6 +5858,51 @@ void HandleCommandStatus ( int iSock, int iVer, InputBuffer_c & tReq ) assert ( tOut.GetError()==true || tOut.GetSentCount()==8+iRespLen ); } +////////////////////////////////////////////////////////////////////////// +// FLUSH HANDLER +////////////////////////////////////////////////////////////////////////// + +void HandleCommandFlush ( int iSock, int iVer, InputBuffer_c & tReq ) +{ + if ( !CheckCommandVersion ( iVer, VER_COMMAND_FLUSHATTRS, tReq ) ) + return; + + // only if flushes are enabled + if ( g_iAttrFlushPeriod<=0 ) + { + // flushes are disabled + sphLogDebug ( "attrflush: attr_flush_period<=0, command ignored" ); + + } else if ( g_eWorkers==MPM_NONE ) + { + // --console mode, no async thread/process to handle the check + sphLogDebug ( "attrflush: --console mode, command ignored" ); + + } else + { + // force a check in head process, and wait it until completes + // FIXME! semi active wait.. + sphLogDebug ( "attrflush: forcing check, tag=%d", g_pFlush->m_iFlushTag ); + g_pFlush->m_bForceCheck = true; + while ( g_pFlush->m_bForceCheck ) + sphSleepMsec ( 1 ); + + // if we are flushing now, wait until flush completes + while ( g_pFlush->m_bFlushing ) + sphSleepMsec ( 10 ); + sphLogDebug ( "attrflush: check finished, tag=%d", g_pFlush->m_iFlushTag ); + } + + // return last flush tag, just for the fun of it + NetOutputBuffer_c tOut ( iSock ); + tOut.SendWord ( SEARCHD_OK ); + tOut.SendWord ( VER_COMMAND_FLUSHATTRS ); + tOut.SendInt ( 4 ); // resplen, 1 dword + tOut.SendInt ( g_pFlush->m_iFlushTag ); + tOut.Flush (); + assert ( tOut.GetError()==true || tOut.GetSentCount()==12 ); // 8+resplen +} + ///////////////////////////////////////////////////////////////////////////// // GENERAL HANDLER ///////////////////////////////////////////////////////////////////////////// @@ -5890,13 +5954,13 @@ void HandleClientSphinx ( int iSock, const char * sClientIP, int iPipeFD ) // check request if ( iCommand<0 || iCommand>=SEARCHD_COMMAND_TOTAL - || iLength<=0 || iLength>g_iMaxPacketSize ) + || iLength<0 || iLength>g_iMaxPacketSize ) { // unknown command, default response header tBuf.SendErrorReply ( "unknown command (code=%d)", iCommand ); // if request length is insane, low level comm is broken, so we bail out - if ( iLength<=0 || iLength>g_iMaxPacketSize ) + if ( iLength<0 || iLength>g_iMaxPacketSize ) { sphWarning ( "ill-formed client request (length=%d out of bounds)", iLength ); return; @@ -5912,8 +5976,8 @@ void HandleClientSphinx ( int iSock, const char * sClientIP, int iPipeFD ) } // get request body - assert ( iLength>0 && iLength<=g_iMaxPacketSize ); - if ( !tBuf.ReadFrom ( iLength ) ) + assert ( iLength>=0 && iLength<=g_iMaxPacketSize ); + if ( iLength && !tBuf.ReadFrom ( iLength ) ) { sphWarning ( "failed to receive client request body (client=%s, exp=%d)", sClientIP, iLength ); return; @@ -5936,6 +6000,7 @@ void HandleClientSphinx ( int iSock, const char * sClientIP, int iPipeFD ) case SEARCHD_COMMAND_PERSIST: bPersist = ( tBuf.GetInt()!=0 ); iTimeout = g_iClientTimeout; break; case SEARCHD_COMMAND_STATUS: HandleCommandStatus ( iSock, iCommandVer, tBuf ); break; case SEARCHD_COMMAND_QUERY: HandleCommandQuery ( iSock, iCommandVer, tBuf ); break; + case SEARCHD_COMMAND_FLUSHATTRS:HandleCommandFlush ( iSock,iCommandVer, tBuf ); break; default: assert ( 0 && "INTERNAL ERROR: unhandled command" ); break; } } while ( bPersist ); @@ -6890,7 +6955,7 @@ void HandlePipeUpdate ( PipeReader_t & tPipe, bool bFailure ) if ( bFailure ) return; // silently ignore errors - ++g_iUpdateTag; + ++g_pFlush->m_iUpdateTag; int iUpdIndexes = tPipe.GetInt (); for ( int i=0; im_iUpdateTag = g_iUpdateTag; + pServed->m_iUpdateTag = g_pFlush->m_iUpdateTag; pServed->m_pIndex->m_uAttrsStatus |= uStatus; } else sphWarning ( "INTERNAL ERROR: unknown index '%s' in HandlePipeUpdate()", sIndex.cstr() ); @@ -7015,7 +7080,7 @@ void HandlePipePreread ( PipeReader_t & tPipe, bool bFailure ) void HandlePipeSave ( PipeReader_t & tPipe, bool bFailure ) { // in any case, we're no more flushing - g_bFlushing = false; + g_pFlush->m_bFlushing = false; // silently ignore errors if ( bFailure ) @@ -7033,7 +7098,7 @@ void HandlePipeSave ( PipeReader_t & tPipe, bool bFailure ) ServedIndex_t * pServed = g_hIndexes(sIndex); if ( pServed ) { - if ( pServed->m_iUpdateTag<=g_iFlushTag ) + if ( pServed->m_iUpdateTag<=g_pFlush->m_iFlushTag ) pServed->m_pIndex->m_uAttrsStatus = 0; } else { @@ -7639,16 +7704,24 @@ void CheckReopen () void CheckFlush () { - if ( g_iAttrFlushPeriod<=0 || g_bFlushing ) + if ( g_iAttrFlushPeriod<=0 || g_pFlush->m_bFlushing ) return; - static int64_t tmLastCheck = -1000; - int64_t tmNow = sphMicroTimer(); + // do a periodic check, unless we have a forced check + if ( !g_pFlush->m_bForceCheck ) + { + static int64_t tmLastCheck = -1000; + int64_t tmNow = sphMicroTimer(); - if ( tmLastCheck + int64_t(g_iAttrFlushPeriod)*I64C(1000000) >= tmNow ) - return; + if ( tmLastCheck + int64_t(g_iAttrFlushPeriod)*I64C(1000000) >= tmNow ) + return; - tmLastCheck = tmNow; + tmLastCheck = tmNow; + sphLogDebug ( "attrflush: doing periodic check" ); + } else + { + sphLogDebug ( "attrflush: doing forced check" ); + } // check if there are dirty indexes bool bDirty = false; @@ -7656,21 +7729,35 @@ void CheckFlush () while ( g_hIndexes.IterateNext () ) { const ServedIndex_t & tServed = g_hIndexes.IterateGet (); - if ( tServed.m_bEnabled && tServed.m_iUpdateTag>g_iFlushTag ) + if ( tServed.m_bEnabled && tServed.m_iUpdateTag>g_pFlush->m_iFlushTag ) { bDirty = true; break; } } + + // need to set this before clearing check flag + if ( bDirty ) + g_pFlush->m_bFlushing = true; + + // if there was a forced check in progress, it no longer is + if ( g_pFlush->m_bForceCheck ) + g_pFlush->m_bForceCheck = false; + + // nothing to do, no indexes were updated if ( !bDirty ) + { + sphLogDebug ( "attrflush: no dirty indexes found" ); return; + } // launch the flush! - g_bFlushing = true; + sphLogDebug ( "attrflush: forking writer" ); + int iUpdateTag = g_pFlush->m_iUpdateTag; // avoid a race between forking writer and updating flush tag int iPipeFD = PipeAndFork ( false, SPH_PIPE_SAVED_ATTRS ); // FIXME! gracefully handle fork() failures, Windows, etc if ( g_bHeadDaemon ) { - g_iFlushTag = g_iUpdateTag; + g_pFlush->m_iFlushTag = iUpdateTag; return; } @@ -7681,7 +7768,7 @@ void CheckFlush () while ( g_hIndexes.IterateNext () ) { const ServedIndex_t & tServed = g_hIndexes.IterateGet (); - if ( tServed.m_bEnabled && tServed.m_iUpdateTag>g_iFlushTag ) + if ( tServed.m_bEnabled && tServed.m_iUpdateTag > g_pFlush->m_iFlushTag ) if ( tServed.m_pIndex->SaveAttributes () ) // FIXME? report errors somehow? dSaved.Add ( g_hIndexes.IterateGetKey() ); } @@ -7910,6 +7997,7 @@ void ShowHelp () "-l, --listen \tlisten on given address, port or path (overrides\n" "\t\t\tconfig settings)\n" "-i, --index \tonly serve one given index\n" + "--logdebug\t\tenable additional debug information logging\n" #if !USE_WINDOWS "--nodetach\t\tdo not detach into background\n" #endif @@ -8409,6 +8497,17 @@ void TickHead ( CSphProcessSharedMutex * pAcceptMutex ) } +void InitSharedBuffer ( CSphSharedBuffer & tBuffer, void ** ppBuffer, int iLen ) +{ + CSphString sError, sWarning; + if ( !tBuffer.Alloc ( iLen, sError, sWarning ) ) + sphDie ( "failed to allocate shared buffer (msg=%s)", sError.cstr() ); + + *ppBuffer = tBuffer.GetWritePtr(); // FIXME? should be ok even on strange platforms but.. + memset ( *ppBuffer, 0, iLen ); // reset +} + + int WINAPI ServiceMain ( int argc, char **argv ) { g_bLogTty = isatty ( g_iLogFile )!=0; @@ -8487,6 +8586,7 @@ int WINAPI ServiceMain ( int argc, char **argv ) #else OPT1 ( "--nodetach" ) g_bOptNoDetach = true; #endif + OPT1 ( "--logdebug" ) g_eLogLevel = LOG_DEBUG; // handle 1-arg options else if ( (i+1)>=argc ) break; @@ -8806,21 +8906,13 @@ int WINAPI ServiceMain ( int argc, char **argv ) } } - ///////////////////////// - // perf counters startup - ///////////////////////// + ////////////////////////////////////////////////// + // shared stuff (perf counters, flushing) startup + ////////////////////////////////////////////////// - CSphString sError, sWarning; - if ( !g_tStatsBuffer.Alloc ( sizeof(SearchdStats_t), sError, sWarning ) ) - { - sphWarning ( "performance counters disabled (alloc error: %s)", sError.cstr() ); - g_pStats = NULL; - } else - { - g_pStats = (SearchdStats_t*) g_tStatsBuffer.GetWritePtr(); // FIXME? should be ok even on strange platforms but.. - memset ( g_pStats, 0, sizeof(SearchdStats_t) ); // reset - g_pStats->m_uStarted = (DWORD)time(NULL); - } + InitSharedBuffer ( g_tStatsBuffer, (void**)&g_pStats, sizeof(SearchdStats_t) ); + InitSharedBuffer ( g_tFlushBuffer, (void**)&g_pFlush, sizeof(FlushState_t) ); + g_pStats->m_uStarted = (DWORD)time(NULL); //////////////////// // network startup