2
// $Id: searchd.cpp 3017 2011-11-15 22:22:49Z shodan $
2
// $Id: searchd.cpp 3127 2012-03-01 00:26:39Z shodan $
6
// Copyright (c) 2001-2011, Andrew Aksyonoff
7
// Copyright (c) 2008-2011, Sphinx Technologies Inc
6
// Copyright (c) 2001-2012, Andrew Aksyonoff
7
// Copyright (c) 2008-2012, Sphinx Technologies Inc
8
8
// All rights reserved
10
10
// This program is free software; you can redistribute it and/or modify
191
192
void SetupTLS ();
194
CrashQuery_t m_tQuery;
196
static SphThreadKey_t m_tLastQueryTLS; // last query ( non threaded workers could use dist_threads too )
195
CrashQuery_t m_tQuery; // per thread copy of last query for thread mode
196
static CrashQuery_t m_tForkQuery; // copy of last query for fork / prefork modes
197
static SphThreadKey_t m_tLastQueryTLS; // last query ( non threaded workers could use dist_threads too )
572
573
//////////////////////////////////////////////////////////////////////////
575
/// available uservar types
581
/// uservar name to value binding
582
CSphVector<SphAttr_t> * m_pVal;
585
UservarIntSet_c * m_pVal;
588
: m_eType ( USERVAR_INT_SET )
585
593
static CSphStaticMutex g_tUservarsMutex;
789
797
// hence, we also need to acquire a lock on entry, and an exclusive one
791
799
bool bRes = false;
792
ServedIndex_t * pEntry = GetWlockedEntry ( tKey );
800
ServedIndex_t * pEntry = BASE::operator() ( tKey );
795
804
pEntry->Unlock();
796
805
bRes = BASE::Delete ( tKey );
1276
1285
// tell rotation thread to shutdown, and wait until it does
1277
1286
g_bRotateShutdown = true;
1278
sphThreadJoin ( &g_tRotateThread );
1287
if ( g_bSeamlessRotate )
1289
sphThreadJoin ( &g_tRotateThread );
1279
1291
g_tRotateQueueMutex.Done();
1280
1292
g_tRotateConfigMutex.Done();
1526
1538
static char g_sMinidump[SPH_TIME_PID_MAX_SIZE] = "";
1541
CrashQuery_t SphCrashLogger_c::m_tForkQuery = CrashQuery_t();
1529
1542
SphThreadKey_t SphCrashLogger_c::m_tLastQueryTLS = SphThreadKey_t ();
1531
1544
void SphCrashLogger_c::Init ()
1690
1704
CrashQuery_t SphCrashLogger_c::GetQuery()
1692
1706
SphCrashLogger_c * pCrashLogger = (SphCrashLogger_c *)sphThreadGet ( m_tLastQueryTLS );
1693
return pCrashLogger ? pCrashLogger->m_tQuery : CrashQuery_t();
1707
return pCrashLogger ? pCrashLogger->m_tQuery : m_tForkQuery;
2239
2253
explicit NetOutputBuffer_c ( int iSock );
2241
2255
bool SendInt ( int iValue ) { return SendT<int> ( htonl ( iValue ) ); }
2256
bool SendAsDword ( int64_t iValue ) ///< sends the 32bit MAX_UINT if the value is greater than it.
2259
return SendDword ( 0 );
2260
if ( iValue > UINT_MAX )
2261
return SendDword ( UINT_MAX );
2262
return SendDword ( DWORD(iValue) );
2242
2264
/// send a DWORD, passed through htonl() (host to network byte order) first
bool SendDword ( DWORD iValue )
{
	const DWORD uNetOrder = htonl ( iValue );
	return SendT<DWORD> ( uNetOrder );
}
2243
2265
/// send a DWORD as four separate bytes, least significant byte first
/// NOTE: only the result of the last SendByte() call is returned; earlier results are ignored
bool SendLSBDword ( DWORD v )
{
	bool bSent = false;
	for ( int iShift=0; iShift<32; iShift+=8 )
		bSent = SendByte ( (BYTE)( ( v>>iShift ) & 0xff ) );
	return bSent;
}
2244
2266
/// send a WORD, passed through htons() (host to network byte order) first
bool SendWord ( WORD iValue )
{
	const WORD uNetOrder = htons ( iValue );
	return SendT<WORD> ( uNetOrder );
}
3251
3273
// send request
3252
3274
NetOutputBuffer_c tOut ( tAgent.m_iSock );
3253
3275
tBuilder.BuildRequest ( tAgent.m_sIndexes.cstr(), tOut, i );
3254
tOut.Flush (); // FIXME! handle flush failure?
3276
bool bFlushed = tOut.Flush (); // FIXME! handle flush failure?
3280
if ( bFlushed && tAgent.m_iFamily==AF_INET )
3281
setsockopt ( tAgent.m_iSock, IPPROTO_TCP, TCP_NODELAY, (char*)&iDisable, sizeof(iDisable) );
3256
3284
tAgent.m_eState = AGENT_QUERY;
3445
3473
} else if ( tAgent.m_iReplyStatus==SEARCHD_RETRY )
3447
3475
tAgent.m_eState = AGENT_RETRY;
3476
CSphString sAgentError = tReq.GetString ();
3477
tAgent.m_sFailure.SetSprintf ( "remote warning: %s", sAgentError.cstr() );
3450
3480
} else if ( tAgent.m_iReplyStatus!=SEARCHD_OK )
3554
3584
+ q.m_sGroupDistinct.Length()
3555
3585
+ q.m_sComment.Length()
3556
3586
+ q.m_sSelect.Length();
3557
if ( !q.m_bAgent ) // send the magic to agent ("*,*," + real query)
3558
iReqSize += q.m_sSelect.IsEmpty() ? 3 : 4;
3559
3587
iReqSize += q.m_sRawQuery.IsEmpty()
3560
3588
? q.m_sQuery.Length()
3561
3589
: q.m_sRawQuery.Length();
3590
if ( q.m_eRanker==SPH_RANK_EXPR )
3591
iReqSize += q.m_sRankerExpr.Length() + 4;
3562
3592
ARRAY_FOREACH ( j, q.m_dFilters )
3564
3594
const CSphFilterSettings & tFilter = q.m_dFilters[j];
3589
3619
tOut.SendInt ( q.m_iMaxMatches ); // limit is MAX_MATCHES
3590
3620
tOut.SendInt ( (DWORD)q.m_eMode ); // match mode
3591
3621
tOut.SendInt ( (DWORD)q.m_eRanker ); // ranking mode
3622
if ( q.m_eRanker==SPH_RANK_EXPR )
3623
tOut.SendString ( q.m_sRankerExpr.cstr() );
3592
3624
tOut.SendInt ( q.m_eSort ); // sort mode
3593
3625
tOut.SendString ( q.m_sSortBy.cstr() ); // sort attr
3594
3626
if ( q.m_sRawQuery.IsEmpty() )
3681
tOut.SendString ( q.m_sSelect.cstr() );
3684
int iLen = q.m_sSelect.Length();
3687
tOut.SendString ( "*,*" );
3690
// this was a fun subtle issue
3691
// SetSprintf() uses a static 1K buffer...
3692
tOut.SendInt ( iLen+4 );
3693
tOut.SendBytes ( "*,*,", 4 );
3694
tOut.SendBytes ( q.m_sSelect.cstr(), iLen );
3711
tOut.SendString ( q.m_sSelect.cstr() );
3698
3712
// master v.1.0
3699
3713
tOut.SendDword ( q.m_eCollation );
3846
3860
// read totals (retrieved count, total count, query time, word count)
3847
3861
int iRetrieved = tReq.GetInt ();
3848
tRes.m_iTotalMatches = tReq.GetInt ();
3862
tRes.m_iTotalMatches = (unsigned int)tReq.GetInt ();
3849
3863
tRes.m_iQueryTime = tReq.GetInt ();
3850
3864
const int iWordsCount = tReq.GetInt (); // FIXME! sanity check?
3851
3865
if ( iRetrieved!=iMatches )
3858
3872
for ( int i=0; i<iWordsCount; i++ )
3860
3874
const CSphString sWord = tReq.GetString ();
3861
const int iDocs = tReq.GetInt ();
3862
const int iHits = tReq.GetInt ();
3875
const int64_t iDocs = (unsigned int)tReq.GetInt ();
3876
const int64_t iHits = (unsigned int)tReq.GetInt ();
3863
3877
const bool bExpanded = ( tReq.GetByte()!=0 ); // agents always send expanded flag to master
3865
3879
tRes.AddStat ( sWord, iDocs, iHits, bExpanded );
4019
void ParseIndexList ( const CSphString & sIndexes, CSphVector<CSphNamedInt> & dOut )
4033
void ParseIndexList ( const CSphString & sIndexes, CSphVector<CSphString> & dOut )
4021
4035
CSphString sSplit = sIndexes;
4022
4036
char * p = (char*)sSplit.cstr();
4118
4132
while ( ( c = *szQuery++ )!=0 )
4120
4134
// must be in sync with EscapeString (php api)
4121
if ( c=='(' || c==')' || c=='|' || c=='-' || c=='!' || c=='@' || c=='~' || c=='\"' || c=='&' || c=='/' || c=='<' || c=='\\' )
4135
const char sMagics[] = "<\\()|-!@~\"&/^$=";
4136
for ( const char * s = sMagics; *s; s++ )
4122
4139
*szRes++ = '\\';
4417
4436
if ( iVer>=0x116 )
4419
CSphString sRawSelect = tReq.GetString ();
4420
CSphString sSelect = "";
4438
tQuery.m_sSelect = tReq.GetString ();
4439
tQuery.m_bAgent = ( iMasterVer>0 );
4421
4440
CSphString sError;
4422
bool bAgent = false;
4423
if ( sRawSelect.Begins ( "*,*" ) ) // this is the mark of agent.
4426
if ( sRawSelect.Length()>3 )
4427
sSelect = sRawSelect.SubString ( 4, sRawSelect.Length()-4 );
4430
tQuery.m_sSelect = bAgent?sSelect:sRawSelect;
4431
4441
if ( !tQuery.ParseSelectList ( sError ) )
4433
4443
tReq.SendErrorReply ( "select: %s", sError.cstr() );
4438
tQuery.m_sSelect = sRawSelect;
4439
tQuery.m_bAgent = true;
4443
4448
// master v.1.0
4510
4515
// [matchmode/numfilters/sortmode matches (offset,limit)
4511
4516
static const char * sModes [ SPH_MATCH_TOTAL ] = { "all", "any", "phr", "bool", "ext", "scan", "ext2" };
4512
4517
static const char * sSort [ SPH_SORT_TOTAL ] = { "rel", "attr-", "attr+", "tsegs", "ext", "expr" };
4513
p += snprintf ( p, pMax-p, " [%s/%d/%s %d (%d,%d)",
4518
p += snprintf ( p, pMax-p, " [%s/%d/%s "INT64_FMT" (%d,%d)",
4514
4519
sModes [ tQuery.m_eMode ], tQuery.m_dFilters.GetLength(), sSort [ tQuery.m_eSort ],
4515
4520
tRes.m_iTotalMatches, tQuery.m_iOffset, tQuery.m_iLimit );
4746
4751
tBuf.Append ( "/""* " );
4747
4752
tBuf.AppendCurrentTime();
4748
4753
if ( tRes.m_iMultiplier>1 )
4749
tBuf.Append ( " conn %d wall %d.%03d x%d found %d *""/ ",
4754
tBuf.Append ( " conn %d wall %d.%03d x%d found "INT64_FMT" *""/ ",
4750
4755
iCid, iQueryTime/1000, iQueryTime%1000, tRes.m_iMultiplier, tRes.m_iTotalMatches );
4752
tBuf.Append ( " conn %d wall %d.%03d found %d *""/ ",
4757
tBuf.Append ( " conn %d wall %d.%03d found "INT64_FMT" *""/ ",
4753
4758
iCid, iQueryTime/1000, iQueryTime%1000, tRes.m_iTotalMatches );
4755
4760
///////////////////////////////////
4971
4976
//////////////////////////////////////////////////////////////////////////
4978
// internal attributes come last, so there is no need to send them
4979
static int SendGetAttrCount ( const CSphSchema & tSchema )
4981
int iCount = tSchema.GetAttrsCount();
4983
&& sphIsSortStringInternal ( tSchema.GetAttr ( iCount-1 ).m_sName.cstr() ) )
4985
for ( int i=iCount-1; i>=0 && sphIsSortStringInternal ( tSchema.GetAttr(i).m_sName.cstr() ); i-- )
4973
4995
int CalcResultLength ( int iVer, const CSphQueryResult * pRes, const CSphVector<PoolPtrs_t> & dTag2Pools, bool bExtendedStat )
4975
4997
int iRespLen = 0;
4997
5019
iRespLen += 20;
5021
int iAttrsCount = SendGetAttrCount ( pRes->m_tSchema );
5000
5024
if ( iVer>=0x102 )
5002
5026
iRespLen += 8; // 4 for field count, 4 for attr count
5003
5027
ARRAY_FOREACH ( i, pRes->m_tSchema.m_dFields )
5004
5028
iRespLen += 4 + strlen ( pRes->m_tSchema.m_dFields[i].m_sName.cstr() ); // namelen, name
5005
for ( int i=0; i<pRes->m_tSchema.GetAttrsCount(); i++ )
5029
for ( int i=0; i<iAttrsCount; i++ )
5006
5030
iRespLen += 8 + strlen ( pRes->m_tSchema.GetAttr(i).m_sName.cstr() ); // namelen, name, type
5010
5034
if ( iVer<0x102 )
5011
5035
iRespLen += 16*pRes->m_iCount; // matches
5012
5036
else if ( iVer<0x108 )
5013
iRespLen += ( 8+4*pRes->m_tSchema.GetAttrsCount() )*pRes->m_iCount; // matches
5037
iRespLen += ( 8+4*iAttrsCount )*pRes->m_iCount; // matches
5015
iRespLen += 4 + ( 8+4*USE_64BIT+4*pRes->m_tSchema.GetAttrsCount() )*pRes->m_iCount; // id64 tag and matches
5039
iRespLen += 4 + ( 8+4*USE_64BIT+4*iAttrsCount )*pRes->m_iCount; // id64 tag and matches
5017
5041
if ( iVer>=0x114 )
5019
5043
// 64bit matches
5020
5044
int iWideAttrs = 0;
5021
for ( int i=0; i<pRes->m_tSchema.GetAttrsCount(); i++ )
5045
for ( int i=0; i<iAttrsCount; i++ )
5022
5046
if ( pRes->m_tSchema.GetAttr(i).m_eAttrType==SPH_ATTR_BIGINT )
5024
5048
iRespLen += 4*pRes->m_iCount*iWideAttrs; // extra 4 bytes per attr per match
5035
5059
// MVA and string values
5036
5060
CSphVector<CSphAttrLocator> dMvaItems;
5037
5061
CSphVector<CSphAttrLocator> dStringItems;
5038
for ( int i=0; i<pRes->m_tSchema.GetAttrsCount(); i++ )
5062
for ( int i=0; i<iAttrsCount; i++ )
5040
5064
const CSphColumnInfo & tCol = pRes->m_tSchema.GetAttr(i);
5041
5065
if ( tCol.m_eAttrType==SPH_ATTR_UINT32SET || tCol.m_eAttrType==SPH_ATTR_UINT64SET )
5122
5148
ARRAY_FOREACH ( i, pRes->m_tSchema.m_dFields )
5123
5149
tOut.SendString ( pRes->m_tSchema.m_dFields[i].m_sName.cstr() );
5125
tOut.SendInt ( pRes->m_tSchema.GetAttrsCount() );
5126
for ( int i=0; i<pRes->m_tSchema.GetAttrsCount(); i++ )
5151
tOut.SendInt ( iAttrsCount );
5152
for ( int i=0; i<iAttrsCount; i++ )
5128
5154
const CSphColumnInfo & tCol = pRes->m_tSchema.GetAttr(i);
5129
5155
tOut.SendString ( tCol.m_sName.cstr() );
5180
5206
assert ( !tMatch.m_pDynamic || (int)tMatch.m_pDynamic[-1]==pRes->m_tSchema.GetDynamicSize() );
5183
for ( int j=0; j<pRes->m_tSchema.GetAttrsCount(); j++ )
5209
for ( int j=0; j<iAttrsCount; j++ )
5185
5211
const CSphColumnInfo & tAttr = pRes->m_tSchema.GetAttr(j);
5186
5212
if ( tAttr.m_eAttrType==SPH_ATTR_UINT32SET || tAttr.m_eAttrType==SPH_ATTR_UINT64SET )
5254
5280
tOut.SendInt ( pRes->m_dMatches.GetLength() );
5255
tOut.SendInt ( pRes->m_iTotalMatches );
5281
tOut.SendAsDword ( pRes->m_iTotalMatches );
5256
5282
tOut.SendInt ( Max ( pRes->m_iQueryTime, 0 ) );
5257
5283
tOut.SendInt ( pRes->m_hWordStats.GetLength() );
5262
5288
const CSphQueryResultMeta::WordStat_t & tStat = pRes->m_hWordStats.IterateGet();
5263
5289
tOut.SendString ( pRes->m_hWordStats.IterateGetKey().cstr() );
5264
tOut.SendInt ( tStat.m_iDocs );
5265
tOut.SendInt ( tStat.m_iHits );
5290
tOut.SendAsDword ( tStat.m_iDocs );
5291
tOut.SendAsDword ( tStat.m_iHits );
5266
5292
if ( bExtendedStat )
5267
5293
tOut.SendByte ( tStat.m_bExpanded );
5373
5399
int iLimit = bMultiSchema
5374
? ( iCur + pRes->m_dMatchCounts[iSchema] )
5375
: pRes->m_iTotalMatches;
5376
iLimit = Min ( iLimit, pRes->m_dMatches.GetLength() );
5400
? (int)Min ( iCur + pRes->m_dMatchCounts[iSchema], pRes->m_dMatches.GetLength() )
5401
: (int)Min ( pRes->m_iTotalMatches, pRes->m_dMatches.GetLength() );
5377
5402
for ( int i=iCur; i<iLimit; i++ )
5379
5404
CSphMatch & tMatch = pRes->m_dMatches[i];
6084
6109
void RunLocalSearches ( ISphMatchSorter * pLocalSorter, const char * sDistName );
6085
6110
void RunLocalSearchesMT ();
6086
6111
bool RunLocalSearch ( int iLocal, ISphMatchSorter ** ppSorters, CSphQueryResult ** pResults ) const;
6112
bool HasExpresions ( int iStart, int iEnd ) const;
6088
6114
CSphVector<DWORD> m_dMvaStorage;
6089
6115
CSphVector<BYTE> m_dStringsStorage;
6091
6117
int m_iStart; ///< subset start
6092
6118
int m_iEnd; ///< subset end
6093
6119
bool m_bMultiQueue; ///< whether current subset is subject to multi-queue optimization
6094
mutable CSphVector<CSphNamedInt> m_dLocal; ///< local indexes for the current subset
6120
CSphVector<CSphString> m_dLocal; ///< local indexes for the current subset
6095
6121
mutable CSphVector<CSphSchemaMT> m_dExtraSchemas; ///< the extra fields for agents
6096
mutable CSphMutex m_tLock;
6097
6122
bool m_bSphinxql; ///< if the query get from sphinxql - to avoid applying sphinxql magick for others
6098
6123
CSphAttrUpdateEx * m_pUpdates; ///< holder for updates
6125
mutable CSphMutex m_tLock;
6126
mutable SmallStringHash_T<int> m_hUsed;
6100
6128
const ServedIndex_t * UseIndex ( int iLocal ) const;
6101
6129
void ReleaseIndex ( int iLocal ) const;
6134
6162
SearchHandler_c::~SearchHandler_c ()
6136
6164
m_tLock.Done();
6137
ARRAY_FOREACH ( i, m_dLocal )
6165
m_hUsed.IterateStart();
6166
while ( m_hUsed.IterateNext() )
6139
if ( m_dLocal[i].m_iValue>0 )
6140
g_pIndexes->GetUnlockedEntry ( m_dLocal[i].m_sName ).Unlock();
6168
if ( m_hUsed.IterateGet()>0 )
6169
g_pIndexes->GetUnlockedEntry ( m_hUsed.IterateGetKey() ).Unlock();
6145
6174
const ServedIndex_t * SearchHandler_c::UseIndex ( int iLocal ) const
6147
6176
assert ( iLocal>=0 && iLocal<m_dLocal.GetLength() );
6177
const CSphString & sName = m_dLocal[iLocal];
6148
6178
if ( g_eWorkers!=MPM_THREADS )
6149
return g_pIndexes->GetRlockedEntry ( m_dLocal[iLocal].m_sName );
6179
return g_pIndexes->GetRlockedEntry ( sName );
6151
6181
m_tLock.Lock();
6153
int iUseCount = m_dLocal[iLocal].m_iValue;
6155
assert ( ( m_pUpdates && iUseCount>0 ) || !m_pUpdates );
6182
int * pUseCount = m_hUsed ( sName );
6183
assert ( ( m_pUpdates && pUseCount && *pUseCount>0 ) || !m_pUpdates );
6157
6185
const ServedIndex_t * pServed = NULL;
6159
pServed = &g_pIndexes->GetUnlockedEntry ( m_dLocal[iLocal].m_sName );
6161
pServed = g_pIndexes->GetRlockedEntry ( m_dLocal[iLocal].m_sName );
6163
m_dLocal[iLocal].m_iValue = iUseCount + (pServed!=NULL);
6186
if ( pUseCount && *pUseCount>0 )
6188
pServed = &g_pIndexes->GetUnlockedEntry ( sName );
6189
*pUseCount += ( pServed!=NULL );
6192
pServed = g_pIndexes->GetRlockedEntry ( sName );
6198
m_hUsed.Add ( 1, sName );
6165
6202
m_tLock.Unlock();
6167
6203
return pServed;
6174
6210
if ( g_eWorkers!=MPM_THREADS )
6213
const CSphString & sName = m_dLocal[iLocal];
6177
6214
m_tLock.Lock();
6179
int iUseCount = m_dLocal[iLocal].m_iValue - 1;
6180
assert ( iUseCount>=0 );
6181
m_dLocal[iLocal].m_iValue = iUseCount;
6184
g_pIndexes->GetUnlockedEntry ( m_dLocal[iLocal].m_sName ).Unlock();
6186
assert ( ( m_pUpdates && iUseCount>0 ) || !m_pUpdates );
6216
int * pUseCount = m_hUsed ( sName );
6217
assert ( pUseCount && *pUseCount>=0 );
6221
g_pIndexes->GetUnlockedEntry ( sName ).Unlock();
6223
assert ( ( m_pUpdates && pUseCount && *pUseCount ) || !m_pUpdates );
6188
6225
m_tLock.Unlock();
6196
6233
m_dQueries[0] = tQuery;
6197
6234
m_dQueries[0].m_sIndexes = sIndex;
6199
m_dLocal.Add().m_sName = sIndex;
6200
m_dLocal.Last().m_iValue = 1;
6236
// let's add the index to prevent a deadlock
6237
// as the index is already r-locked or w-locked at this point
6238
m_dLocal.Add ( sIndex );
6239
m_hUsed.Add ( 1, sIndex );
6202
6241
CheckQuery ( tQuery, *pUpdates->m_pError );
6203
6242
if ( !pUpdates->m_pError->IsEmpty() )
6474
6513
ARRAY_FOREACH ( iLocal, dLocals )
6476
6515
bool bResult = dLocals[iLocal].m_bResult;
6477
const char * sLocal = m_dLocal[iLocal].m_sName.cstr();
6516
const char * sLocal = m_dLocal[iLocal].cstr();
6479
6518
if ( !bResult )
6640
6679
CSphVector <int> dLocked;
6641
6680
ARRAY_FOREACH ( iLocal, m_dLocal )
6643
const char * sLocal = m_dLocal[iLocal].m_sName.cstr();
6682
const char * sLocal = m_dLocal[iLocal].cstr();
6645
6684
const ServedIndex_t * pServed = UseIndex ( iLocal );
6646
6685
if ( !pServed )
6808
6847
// check expressions into a query to make sure that it's ready for multi query optimization
6809
static bool HasExpresions ( const CSphQuery & tQuery, const CSphVector<CSphNamedInt>& m_dIndices )
6848
bool SearchHandler_c::HasExpresions ( int iStart, int iEnd ) const
6811
ARRAY_FOREACH ( i, m_dIndices )
6850
ARRAY_FOREACH ( i, m_dLocal )
6813
const ServedIndex_t * pServedIndex = g_pIndexes->GetRlockedEntry ( m_dIndices[i].m_sName );
6852
const ServedIndex_t * pServedIndex = UseIndex ( i );
6815
6854
// check that it exists
6816
if ( !pServedIndex )
6855
if ( !pServedIndex || !pServedIndex->m_bEnabled )
6819
6862
bool bHasExpression = false;
6820
if ( pServedIndex->m_bEnabled )
6821
bHasExpression = sphHasExpressions ( tQuery, pServedIndex->m_pIndex->GetMatchSchema() );
6863
const CSphSchema & tSchema = pServedIndex->m_pIndex->GetMatchSchema();
6864
for ( int iCheck=iStart; iCheck<=iEnd && !bHasExpression; iCheck++ )
6865
bHasExpression = sphHasExpressions ( m_dQueries[iCheck], tSchema );
6823
pServedIndex->Unlock();
6825
6869
if ( bHasExpression )
6922
6966
// search through all local indexes
6923
6967
for ( IndexHashIterator_c it ( g_pIndexes ); it.Next(); )
6924
6968
if ( it.Get ().m_bEnabled )
6925
m_dLocal.Add().m_sName = it.GetKey();
6969
m_dLocal.Add ( it.GetKey() );
6928
6972
// search through specified local indexes
6944
6988
if ( iDistFound!=-1 )
6946
6990
for ( int iRes=iStart; iRes<=iEnd; iRes++ )
6947
m_dResults[iRes].m_sError.SetSprintf ( "distributed index '%s' in multi-index query found", m_dLocal[iDistFound].m_sName.cstr() );
6991
m_dResults[iRes].m_sError.SetSprintf ( "distributed index '%s' in multi-index query found", m_dLocal[iDistFound].cstr() );
6951
6995
ARRAY_FOREACH ( i, m_dLocal )
6953
const ServedIndex_t * pServedIndex = g_pIndexes->GetRlockedEntry ( m_dLocal[i].m_sName );
6997
const ServedIndex_t * pServedIndex = UseIndex ( i );
6955
6999
// check that it exists
6956
7000
if ( !pServedIndex )
6958
7002
for ( int iRes=iStart; iRes<=iEnd; iRes++ )
6959
m_dResults[iRes].m_sError.SetSprintf ( "unknown local index '%s' in search request", m_dLocal[i].m_sName.cstr() );
7003
m_dResults[iRes].m_sError.SetSprintf ( "unknown local index '%s' in search request", m_dLocal[i].cstr() );
7007
bool bEnabled = pServedIndex->m_bEnabled;
6963
7009
// if it exists but is not enabled, remove it from the list and force recheck
6964
if ( !pServedIndex->m_bEnabled )
6965
7011
m_dLocal.Remove ( i-- );
6967
pServedIndex->Unlock();
6981
7025
// copy local indexes list from distributed definition, but filter out disabled ones
6982
7026
ARRAY_FOREACH ( i, dDistLocal )
6984
const ServedIndex_t * pServedIndex = g_pIndexes->GetRlockedEntry ( dDistLocal[i] );
7028
int iDistLocal = m_dLocal.GetLength();
7029
m_dLocal.Add ( dDistLocal[i] );
7031
const ServedIndex_t * pServedIndex = UseIndex ( iDistLocal );
7032
bool bValidLocalIndex = pServedIndex && pServedIndex->m_bEnabled;
6985
7033
if ( pServedIndex )
6987
if ( pServedIndex->m_bEnabled )
6988
m_dLocal.Add().m_sName = dDistLocal[i];
7034
ReleaseIndex ( iDistLocal );
6990
pServedIndex->Unlock();
7036
if ( !bValidLocalIndex )
7002
7048
CSphString sError;
7004
// check if all schemas are equal
7050
// check if all schemas are equal
7005
7051
bool bAllEqual = true;
7007
const ServedIndex_t * pFirstIndex = g_pIndexes->GetRlockedEntry ( m_dLocal[0].m_sName );
7053
const ServedIndex_t * pFirstIndex = UseIndex ( 0 );
7008
7054
if ( !pFirstIndex )
7011
7057
const CSphSchema & tFirstSchema = pFirstIndex->m_pIndex->GetMatchSchema();
7012
7058
for ( int i=1; i<m_dLocal.GetLength() && bAllEqual; i++ )
7014
const ServedIndex_t * pNextIndex = g_pIndexes->GetRlockedEntry ( m_dLocal[i].m_sName );
7060
const ServedIndex_t * pNextIndex = UseIndex ( i );
7015
7061
if ( !pNextIndex )
7017
7063
bAllEqual = false;
7032
7078
pLocalSorter = sphCreateQueue ( &m_dQueries[iStart], tFirstSchema, sError, true, pExtraSchemaMT );
7035
pFirstIndex->Unlock ();
7039
7085
// select lists must have no expressions
7040
for ( int iCheck=iStart; iCheck<=iEnd && m_bMultiQueue; iCheck++ )
7086
if ( m_bMultiQueue )
7042
m_bMultiQueue = !HasExpresions ( m_dQueries[iCheck], m_dLocal );
7088
m_bMultiQueue = !HasExpresions ( iStart, iEnd );
7045
7091
// these are mutual exclusive
7221
7267
tRes.m_tSchema = tRes.m_dSchemas[0];
7222
7268
if ( tRes.m_iSuccesses>1 || tQuery.m_dItems.GetLength() )
7224
if ( g_bCompatResults )
7270
if ( g_bCompatResults && !tQuery.m_bAgent )
7226
7272
if ( !MinimizeAggrResultCompat ( tRes, tQuery, m_dLocal.GetLength()!=0 ) )
7250
7296
tRes.m_iCount = Max ( Min ( tQuery.m_iLimit, tRes.m_dMatches.GetLength()-tQuery.m_iOffset ), 0 );
7253
// remove internal attributes from result schema
7254
ARRAY_FOREACH ( i, m_dResults )
7255
sphSortRemoveInternalAttrs ( m_dResults[i].m_tSchema );
7258
7300
tmSubset = sphMicroTimer() - tmSubset;
7259
7301
tmCpu = sphCpuTimer() - tmCpu;
8054
8097
void SqlParser_c::UpdateAttr ( const CSphString& sName, const SqlNode_t * pValue, ESphAttr eType )
8099
assert ( eType==SPH_ATTR_FLOAT || eType==SPH_ATTR_INTEGER || eType==SPH_ATTR_BIGINT );
8056
8100
if ( eType==SPH_ATTR_FLOAT )
8057
8102
m_pStmt->m_tUpdate.m_dPool.Add ( *(const DWORD*)( &pValue->m_fValue ) );
8058
else // default: if ( eType==SPH_ATTR_INTEGER )
8104
} else if ( eType==SPH_ATTR_INTEGER || eType==SPH_ATTR_BIGINT )
8060
8106
m_pStmt->m_tUpdate.m_dPool.Add ( (DWORD) pValue->m_iValue );
8061
8107
DWORD uHi = (DWORD) ( pValue->m_iValue>>32 );
8068
8114
AddUpdatedAttr ( sName, eType );
8071
void SqlParser_c::UpdateMVAAttr ( const CSphString& sName, const SqlNode_t& dValues )
8117
void SqlParser_c::UpdateMVAAttr ( const CSphString & sName, const SqlNode_t & dValues )
8073
8119
CSphAttrUpdate & tUpd = m_pStmt->m_tUpdate;
8074
assert ( dValues.m_pValues.Ptr() && dValues.m_pValues->GetLength()>0 );
8075
dValues.m_pValues->Uniq(); // don't need dupes within MVA
8076
tUpd.m_dPool.Add ( dValues.m_pValues->GetLength()*2 );
8077
SphAttr_t * pVal = dValues.m_pValues.Ptr()->Begin();
8078
SphAttr_t * pValMax = pVal + dValues.m_pValues->GetLength();
8079
8120
ESphAttr eType = SPH_ATTR_UINT32SET;
8080
for ( ;pVal<pValMax; pVal++ )
8122
if ( dValues.m_pValues.Ptr() && dValues.m_pValues->GetLength()>0 )
8082
SphAttr_t uVal = *pVal;
8083
if ( uVal>UINT_MAX )
8124
// got MVA values, let's process them
8125
dValues.m_pValues->Uniq(); // don't need dupes within MVA
8126
tUpd.m_dPool.Add ( dValues.m_pValues->GetLength()*2 );
8127
SphAttr_t * pVal = dValues.m_pValues.Ptr()->Begin();
8128
SphAttr_t * pValMax = pVal + dValues.m_pValues->GetLength();
8129
for ( ;pVal<pValMax; pVal++ )
8085
eType = SPH_ATTR_UINT64SET;
8131
SphAttr_t uVal = *pVal;
8132
if ( uVal>UINT_MAX )
8134
eType = SPH_ATTR_UINT64SET;
8136
tUpd.m_dPool.Add ( (DWORD)uVal );
8137
tUpd.m_dPool.Add ( (DWORD)( uVal>>32 ) );
8087
tUpd.m_dPool.Add ( (DWORD)uVal );
8088
tUpd.m_dPool.Add ( (DWORD)( uVal>>32 ) );
8141
// no values, means we should delete the attribute
8142
// we signal that to the update code by putting a single zero
8143
// to the values pool (meaning a zero-length MVA values list)
8144
tUpd.m_dPool.Add ( 0 );
8090
8147
AddUpdatedAttr ( sName, eType );
8127
8184
bool SqlParser_c::AddUservarFilter ( const CSphString & sCol, const CSphString & sVar, bool bExclude )
8129
g_tUservarsMutex.Lock();
8186
CSphScopedLock<CSphStaticMutex> tLock ( g_tUservarsMutex );
8130
8187
Uservar_t * pVar = g_hUservars ( sVar );
8133
g_tUservarsMutex.Unlock();
8134
8190
yyerror ( this, "undefined global variable in IN clause" );
8142
8198
pFilter->m_bExclude = bExclude;
8144
// INT_SET uservars must get sorted on SET once
8145
// FIXME? maybe we should do a readlock instead of copying?
8146
pFilter->m_dValues = *pVar->m_pVal;
8147
g_tUservarsMutex.Unlock();
8200
// tricky black magic
8201
// we want to avoid copying the data, hence external values in the filter
8202
// we need to guarantee the data (uservar value) lifetime, then
8203
// suddenly, enter mutex-protected refcounted value objects
8204
// suddenly, we need to track those values in the statement object, too
8205
assert ( pVar->m_pVal );
8206
CSphRefcountedPtr<UservarIntSet_c> & tRef = m_pStmt->m_dRefs.Add();
8207
tRef = pVar->m_pVal; // take over semantics, and thus NO (!) automatic addref
8208
pVar->m_pVal->AddRef(); // so do that addref manually
8209
pFilter->SetExternalValues ( pVar->m_pVal->Begin(), pVar->m_pVal->GetLength() );
8222
8284
tQuery.m_sSelect.SetBinary ( tParser.m_pBuf + tQuery.m_iSQLSelectStart,
8223
8285
tQuery.m_iSQLSelectEnd - tQuery.m_iSQLSelectStart );
8225
// finally check for agent magic
8226
if ( tQuery.m_sSelect.Begins ( "*,*" ) ) // this is the mark of agent.
8228
tQuery.m_dItems.Remove(0);
8229
tQuery.m_dItems.Remove(0);
8230
tQuery.m_bAgent = true;
8448
8507
+ q.m_sRawPassageBoundary.Length();
8450
8509
m_iNumDocs = 0;
8451
for ( int iDoc = tWorker.m_iHead; iDoc!=-1; iDoc=dQueries[iDoc].m_iNext )
8510
for ( int iDoc = tWorker.m_iHead; iDoc!=EOF_ITEM; iDoc=dQueries[iDoc].m_iNext )
8454
8513
m_iReqLen += 4 + dQueries[iDoc].m_sSource.Length();
8463
8522
tOut.SendInt ( m_iReqLen );
8465
8524
tOut.SendInt ( 0 );
8466
tOut.SendInt ( q.m_iRawFlags );
8527
tOut.SendInt ( q.m_iRawFlags & ~EXCERPT_FLAG_LOAD_FILES );
8467
8529
tOut.SendString ( sIndex );
8468
8530
tOut.SendString ( q.m_sWords.cstr() );
8469
8531
tOut.SendString ( q.m_sBeforeMatch.cstr() );
8479
8541
tOut.SendString ( q.m_sRawPassageBoundary.cstr() );
8481
8543
tOut.SendInt ( m_iNumDocs );
8482
for ( int iDoc = tWorker.m_iHead; iDoc!=-1; iDoc=dQueries[iDoc].m_iNext )
8544
for ( int iDoc = tWorker.m_iHead; iDoc!=EOF_ITEM; iDoc=dQueries[iDoc].m_iNext )
8483
8545
tOut.SendString ( dQueries[iDoc].m_sSource.cstr() );
8507
8569
if ( strcmp ( sRes, dQueries[iDoc].m_sRes )!=0 )
8509
8571
SafeDelete ( dQueries[iDoc].m_sRes );
8573
dQueries[iDoc].m_sError = "";
8511
8574
dQueries[iDoc].m_sRes = sRes;
8517
8580
dQueries[iDoc].m_sRes = tReq.GetString().Leak();
8518
iDoc = dQueries[iDoc].m_iNext;
8519
dQueries[iDoc].m_iNext = -2; // mark as processed
8581
int iNextDoc = dQueries[iDoc].m_iNext;
8582
dQueries[iDoc].m_iNext = PROCESSED_ITEM;
8696
8760
bool bDone = ( *pDesc->m_pCurQuery==pDesc->m_iQueries );
8697
8761
pDesc->m_pLock->Unlock();
8699
if ( pQuery->m_iNext!=-2 )
8763
if ( pQuery->m_iNext!=PROCESSED_ITEM )
8702
8766
pQuery->m_sRes = sphBuildExcerpt ( *pQuery, tCtx.m_pDict, tCtx.m_tTokenizer.Ptr(),
8735
8799
g_tDistLock.Lock();
8736
8800
DistributedIndex_t * pDist = g_hDistIndexes ( sIndex );
8737
8801
bool bRemote = pDist!=NULL;
8803
// hack! load_files && load_files_scattered is the 'final' call. It will report the absent files as errors.
8804
// simple load_files_scattered without load_files just omits the absent files (returns empty strings).
8738
8805
bool bScattered = ( q.m_iLoadFiles & 2 )!=0;
8806
bool bSkipAbsentFiles = !( q.m_iLoadFiles & 1 );
8811
8879
// get file sizes
8812
8880
ARRAY_FOREACH ( i, dQueries )
8814
dQueries[i].m_iNext = -2;
8882
dQueries[i].m_iNext = PROCESSED_ITEM;
8815
8883
if ( dQueries[i].m_iLoadFiles )
8817
8885
struct stat st;
8836
8904
// tough jobs first
8837
dQueries.Sort ( bind ( &ExcerptQuery_t::m_iSize ) );
8906
dQueries.Sort ( bind ( &ExcerptQuery_t::m_iSize ) );
8839
8908
ARRAY_FOREACH ( i, dQueries )
8840
if ( dQueries[i].m_iNext==-1 )
8909
if ( dQueries[i].m_iNext==EOF_ITEM )
8842
8911
dQueries[i].m_iNext = iAbsentHead;
8843
8912
iAbsentHead = i;
8844
dQueries[i].m_sError.SetSprintf ( "failed to stat %s: %s", dQueries[i].m_sSource.cstr(), strerror(errno) );
8913
if ( !bSkipAbsentFiles )
8914
dQueries[i].m_sError.SetSprintf ( "failed to stat %s: %s", dQueries[i].m_sSource.cstr(), strerror(errno) );
8849
8919
// check if all files are available locally.
8850
if ( bScattered && iAbsentHead==-1 )
8920
if ( bScattered && iAbsentHead==EOF_ITEM )
8852
8922
bRemote = false;
8853
8923
dRemoteSnippets.m_dAgents.Reset();
8868
8938
if ( bScattered )
8870
// in the scattered case, the queries with m_iNext==-2 are here, and have to be scheduled to the local agent
8940
// in the scattered case, the queries with m_iNext==PROCESSED_ITEM are here, and have to be scheduled to the local agent
8871
8941
// the rest has to be sent to remotes, all of them!
8872
8942
for ( int i=0; i<iRemoteAgents; i++ )
8873
8943
dRemoteSnippets.m_dWorkers[iLocalPart+i].m_iHead = iAbsentHead;
8878
8948
dRemoteSnippets.m_dWorkers[0].m_iTotal -= dQueries[i].m_iSize;
8879
8949
if ( !dRemoteSnippets.m_dWorkers[0].m_bLocal )
8881
// queries scheduled for local still have iNext==-2
8951
// queries scheduled for local still have iNext==PROCESSED_ITEM
8882
8952
dQueries[i].m_iNext = dRemoteSnippets.m_dWorkers[0].m_iHead;
8883
8953
dRemoteSnippets.m_dWorkers[0].m_iHead = i;
8942
9012
// inverse the success/failed state - so that the queries with negative m_iNext are treated as failed
8943
9013
ARRAY_FOREACH ( i, dQueries )
8944
dQueries[i].m_iNext = (dQueries[i].m_iNext==-2)?0:-2;
9014
dQueries[i].m_iNext = (dQueries[i].m_iNext==PROCESSED_ITEM)?0:PROCESSED_ITEM;
8946
9016
// failsafe - one more turn for failed queries on local agent
8947
9017
SnippetThread_t & t = dThreads[0];
9033
9103
q.m_bEmitZones = ( iFlags & EXCERPT_FLAG_EMIT_ZONES )!=0;
9035
9105
int iCount = tReq.GetInt ();
9036
if ( iCount<0 || iCount>EXCERPT_MAX_ENTRIES )
9106
if ( iCount<=0 || iCount>EXCERPT_MAX_ENTRIES )
9038
9108
tReq.SendErrorReply ( "invalid entries count %d", iCount );
9238
9308
int & iSuccesses, int & iUpdated,
9239
9309
SearchFailuresLog_c & dFails, const ServedIndex_t * pServed )
9241
if ( !pServed || !pServed->m_pIndex )
9311
if ( !pServed || !pServed->m_pIndex || !pServed->m_bEnabled )
9243
9313
dFails.Submit ( sIndex, "index not available" );
9331
static const ServedIndex_t * UpdateGetLockedIndex ( const CSphString & sName, bool bMvaUpdate )
9333
const ServedIndex_t * pLocked = g_pIndexes->GetRlockedEntry ( sName );
9337
if ( !( bMvaUpdate && pLocked->m_bRT ) )
9341
return g_pIndexes->GetWlockedEntry ( sName );
9262
9345
void HandleCommandUpdate ( int iSock, int iVer, InputBuffer_c & tReq )
9333
9423
// check index names
9334
CSphVector<CSphNamedInt> dIndexNames;
9424
CSphVector<CSphString> dIndexNames;
9335
9425
ParseIndexList ( sIndexes, dIndexNames );
9337
9427
if ( !dIndexNames.GetLength() )
9343
9433
CSphVector<DistributedIndex_t> dDistributed ( dIndexNames.GetLength() ); // lock safe storage for distributed indexes
9344
9434
ARRAY_FOREACH ( i, dIndexNames )
9346
if ( !g_pIndexes->Exists ( dIndexNames[i].m_sName ) )
9436
if ( !g_pIndexes->Exists ( dIndexNames[i] ) )
9348
9438
// search amongst distributed and copy for further processing
9349
9439
g_tDistLock.Lock();
9350
const DistributedIndex_t * pDistIndex = g_hDistIndexes ( dIndexNames[i].m_sName );
9440
const DistributedIndex_t * pDistIndex = g_hDistIndexes ( dIndexNames[i] );
9352
9442
if ( pDistIndex )
9354
9444
dDistributed[i] = *pDistIndex;
9355
dDistributed[i].m_bToDelete = true; // our presence flag
9358
9447
g_tDistLock.Unlock();
9375
9464
ARRAY_FOREACH ( iIdx, dIndexNames )
9377
const char * sReqIndex = dIndexNames[iIdx].m_sName.cstr();
9378
const ServedIndex_t * pLocked = g_pIndexes->GetWlockedEntry ( sReqIndex );
9466
const char * sReqIndex = dIndexNames[iIdx].cstr();
9467
const ServedIndex_t * pLocked = UpdateGetLockedIndex ( sReqIndex, bMvaUpdate );
9381
9470
DoCommandUpdate ( sReqIndex, tUpd, iSuccesses, iUpdated, dFails, pLocked );
9382
9471
pLocked->Unlock();
9385
assert ( dDistributed[iIdx].m_bToDelete );
9474
assert ( dDistributed[iIdx].m_dLocal.GetLength() || dDistributed[iIdx].m_dAgents.GetLength() );
9386
9475
CSphVector<CSphString>& dLocal = dDistributed[iIdx].m_dLocal;
9388
9477
ARRAY_FOREACH ( i, dLocal )
9390
9479
const char * sLocal = dLocal[i].cstr();
9391
const ServedIndex_t * pServed = g_pIndexes->GetWlockedEntry ( sLocal );
9480
const ServedIndex_t * pServed = UpdateGetLockedIndex ( sLocal, bMvaUpdate );
9392
9481
DoCommandUpdate ( sLocal, tUpd, iSuccesses, iUpdated, dFails, pServed );
9394
9483
pServed->Unlock();
9568
9657
dStatus.Add().SetSprintf ( "%d", tMeta.m_iMatches );
9570
9659
dStatus.Add ( "total_found" );
9571
dStatus.Add().SetSprintf ( "%d", tMeta.m_iTotalMatches );
9660
dStatus.Add().SetSprintf ( INT64_FMT, tMeta.m_iTotalMatches );
9573
9662
dStatus.Add ( "time" );
9574
9663
dStatus.Add().SetSprintf ( "%d.%03d", tMeta.m_iQueryTime/1000, tMeta.m_iQueryTime%1000 );
9583
9672
dStatus.Add ( tMeta.m_hWordStats.IterateGetKey() );
9585
9674
dStatus.Add().SetSprintf ( "docs[%d]", iWord );
9586
dStatus.Add().SetSprintf ( "%d", tStat.m_iDocs );
9675
dStatus.Add().SetSprintf ( INT64_FMT, tStat.m_iDocs );
9588
9677
dStatus.Add().SetSprintf ( "hits[%d]", iWord );
9589
dStatus.Add().SetSprintf ( "%d", tStat.m_iHits );
9678
dStatus.Add().SetSprintf ( INT64_FMT, tStat.m_iHits );
9769
9858
|| iLength<0 || iLength>g_iMaxPacketSize )
9771
9860
// unknown command, default response header
9772
tBuf.SendErrorReply ( "unknown command (code=%d)", iCommand );
9861
tBuf.SendErrorReply ( "invalid command (code=%d, len=%d)", iCommand, iLength );
9774
9863
// if request length is insane, low level comm is broken, so we bail out
9775
9864
if ( iLength<0 || iLength>g_iMaxPacketSize )
9777
9865
sphWarning ( "ill-formed client request (length=%d out of bounds)", iLength );
9867
// if command is insane, low level comm is broken, so we bail out
9868
if ( iCommand<0 || iCommand>=SEARCHD_COMMAND_TOTAL )
9869
sphWarning ( "ill-formed client request (command=%d, SEARCHD_COMMAND_TOTAL=%d)", iCommand, SEARCHD_COMMAND_TOTAL );
9782
9874
// count commands
9791
9883
assert ( iLength>=0 && iLength<=g_iMaxPacketSize );
9792
9884
if ( iLength && !tBuf.ReadFrom ( iLength ) )
9794
sphWarning ( "failed to receive client request body (client=%s, exp=%d)", sClientIP, iLength );
9886
sphWarning ( "failed to receive client request body (client=%s, exp=%d, error='%s')", sClientIP, iLength, sphSockError() );
10014
10106
tOut.SendBytes ( sVarLen, iLen ); // packed affected rows & insert_id
10015
10107
if ( iWarns<0 ) iWarns = 0;
10016
10108
if ( iWarns>65535 ) iWarns = 65535;
10017
tOut.SendLSBDword ( iWarns ); // N warnings, 0 status
10109
DWORD uWarnStatus = iWarns<<16;
10110
tOut.SendLSBDword ( uWarnStatus ); // N warnings, 0 status
10018
10111
if ( iMsgLen > 0 )
10019
10112
tOut.SendBytes ( sMessage, iMsgLen );
10219
10312
sError.SetSprintf ( "raw %d, column %d: MVA value specified for a non-MVA column", 1+c, 1+iQuerySchemaIdx ); // 1 for human base
10315
if ( ( tCol.m_eAttrType==SPH_ATTR_UINT32SET || tCol.m_eAttrType==SPH_ATTR_UINT64SET ) && tVal.m_iType!=TOK_CONST_MVA )
10317
sError.SetSprintf ( "raw %d, column %d: non-MVA value specified for a MVA column", 1+c, 1+iQuerySchemaIdx ); // 1 for human base
10223
10321
if ( tCol.m_eAttrType==SPH_ATTR_UINT32SET || tCol.m_eAttrType==SPH_ATTR_UINT64SET )
10225
10323
// collect data from scattered insvals
10226
10324
// FIXME! maybe remove this mess, and just have a single m_dMvas pool in parser instead?
10227
tVal.m_pVals->Uniq();
10228
int iLen = tVal.m_pVals->GetLength();
10326
if ( tVal.m_pVals.Ptr() )
10328
tVal.m_pVals->Uniq();
10329
iLen = tVal.m_pVals->GetLength();
10229
10331
if ( tCol.m_eAttrType==SPH_ATTR_UINT64SET )
10231
10333
dMvas.Add ( iLen*2 );
10817
10919
CSphString sError;
10819
10921
// check index names
10820
CSphVector<CSphNamedInt> dIndexNames;
10922
CSphVector<CSphString> dIndexNames;
10821
10923
ParseIndexList ( tStmt.m_sIndex, dIndexNames );
10823
10925
if ( !dIndexNames.GetLength() )
10830
10932
CSphVector<DistributedIndex_t> dDistributed ( dIndexNames.GetLength() ); // lock safe storage for distributed indexes
10831
10933
ARRAY_FOREACH ( i, dIndexNames )
10833
if ( !g_pIndexes->Exists ( dIndexNames[i].m_sName ) )
10935
if ( !g_pIndexes->Exists ( dIndexNames[i] ) )
10835
10937
// search amongst distributed and copy for further processing
10836
10938
g_tDistLock.Lock();
10837
const DistributedIndex_t * pDistIndex = g_hDistIndexes ( dIndexNames[i].m_sName );
10939
const DistributedIndex_t * pDistIndex = g_hDistIndexes ( dIndexNames[i] );
10839
10941
if ( pDistIndex )
10841
10943
dDistributed[i] = *pDistIndex;
10842
dDistributed[i].m_bToDelete = true; // our presence flag
10845
10946
g_tDistLock.Unlock();
10851
sError.SetSprintf ( "unknown index '%s' in update request", dIndexNames[i].m_sName.cstr() );
10952
sError.SetSprintf ( "unknown index '%s' in update request", dIndexNames[i].cstr() );
10852
10953
SendMysqlErrorPacket ( tOut, uPacketID, tStmt.m_sStmt, sError.cstr() );
10861
10962
int iUpdated = 0;
10862
10963
int iWarns = 0;
10965
bool bMvaUpdate = false;
10966
ARRAY_FOREACH_COND ( i, tStmt.m_tUpdate.m_dAttrs, !bMvaUpdate )
10968
bMvaUpdate = ( tStmt.m_tUpdate.m_dAttrs[i].m_eAttrType==SPH_ATTR_UINT32SET
10969
|| tStmt.m_tUpdate.m_dAttrs[i].m_eAttrType==SPH_ATTR_UINT64SET );
10864
10972
ARRAY_FOREACH ( iIdx, dIndexNames )
10866
const char * sReqIndex = dIndexNames[iIdx].m_sName.cstr();
10867
const ServedIndex_t * pLocked = g_pIndexes->GetWlockedEntry ( sReqIndex );
10974
const char * sReqIndex = dIndexNames[iIdx].cstr();
10975
const ServedIndex_t * pLocked = UpdateGetLockedIndex ( sReqIndex, bMvaUpdate );
10868
10976
if ( pLocked )
10870
10978
DoExtendedUpdate ( sReqIndex, tStmt, iSuccesses, iUpdated, bCommit, dFails, pLocked );
10873
assert ( dDistributed[iIdx].m_bToDelete );
10981
assert ( dDistributed[iIdx].m_dLocal.GetLength() || dDistributed[iIdx].m_dAgents.GetLength() );
10874
10982
CSphVector<CSphString>& dLocal = dDistributed[iIdx].m_dLocal;
10876
10984
ARRAY_FOREACH ( i, dLocal )
10878
10986
const char * sLocal = dLocal[i].cstr();
10879
const ServedIndex_t * pServed = g_pIndexes->GetWlockedEntry ( sLocal );
10987
const ServedIndex_t * pServed = UpdateGetLockedIndex ( sLocal, bMvaUpdate );
10880
10988
DoExtendedUpdate ( sLocal, tStmt, iSuccesses, iUpdated, bCommit, dFails, pServed );
10884
10992
// update remote agents
10885
if ( dDistributed[iIdx].m_bToDelete )
10993
if ( dDistributed[iIdx].m_dAgents.GetLength() )
10887
10995
DistributedIndex_t & tDist = dDistributed[iIdx];
11060
11168
// empty result sets just might carry the full uberschema
11061
11169
// bummer! lets protect ourselves against that
11170
int iSchemaAttrsCount = 0;
11171
int iAttrsCount = 1;
11063
11172
if ( tRes.m_dMatches.GetLength() )
11065
iAttrs = tRes.m_tSchema.GetAttrsCount();
11174
iSchemaAttrsCount = SendGetAttrCount ( tRes.m_tSchema );
11175
iAttrsCount = iSchemaAttrsCount;
11066
11176
if ( g_bCompatResults )
11179
if ( iAttrsCount>=251 )
11071
11181
// this will show up as success in query log, as the query itself was ok
11072
11182
// but we need some kind of a notice anyway, to nail down issues based on logs only
11078
11188
// result set header packet
11079
11189
tOut.SendLSBDword ( ((uPacketID++)<<24) + 2 );
11080
tOut.SendByte ( BYTE(iAttrs) );
11190
tOut.SendByte ( BYTE(iAttrsCount) );
11081
11191
tOut.SendByte ( 0 ); // extra
11083
11193
// field packets
11094
11204
SendMysqlFieldPacket ( tOut, uPacketID++, "weight", MYSQL_COL_LONG );
11097
for ( int i=0; i<tRes.m_tSchema.GetAttrsCount(); i++ )
11207
for ( int i=0; i<iSchemaAttrsCount; i++ )
11099
11209
const CSphColumnInfo & tCol = tRes.m_tSchema.GetAttr(i);
11100
11210
MysqlColumnType_e eType = MYSQL_COL_STRING;
11131
11241
const CSphSchema & tSchema = tRes.m_tSchema;
11132
for ( int i=0; i<tSchema.GetAttrsCount(); i++ )
11242
for ( int i=0; i<iSchemaAttrsCount; i++ )
11134
11244
CSphAttrLocator tLoc = tSchema.GetAttr(i).m_tLocator;
11135
11245
ESphAttr eAttrType = tSchema.GetAttr(i).m_eAttrType;
11628
// INT_SET type must be sorted
11629
tStmt.m_dSetValues.Sort();
11631
// create or update the variable
11518
11632
g_tUservarsMutex.Lock();
11519
11633
Uservar_t * pVar = g_hUservars ( tStmt.m_sSetName );
11636
// variable exists, release previous value
11637
// actual destruction of the value (aka data) might happen later
11638
// as the concurrent queries might still be using and holding that data
11639
// from here, the old value becomes nameless, though
11522
11640
assert ( pVar->m_eType==USERVAR_INT_SET );
11523
pVar->m_pVal->SwapData ( tStmt.m_dSetValues );
11641
assert ( pVar->m_pVal );
11642
pVar->m_pVal->Release();
11643
pVar->m_pVal = NULL;
11646
// create a shiny new variable
11526
11647
Uservar_t tVar;
11527
tVar.m_eType = USERVAR_INT_SET;
11528
tVar.m_pVal = new CSphVector<SphAttr_t>;
11529
tVar.m_pVal->SwapData ( tStmt.m_dSetValues );
11530
tVar.m_pVal->Sort();
11531
g_hUservars.Add ( tVar, tStmt.m_sSetName ); // FIXME? free those on shutdown?
11648
g_hUservars.Add ( tVar, tStmt.m_sSetName );
11649
pVar = g_hUservars ( tStmt.m_sSetName );
11652
// swap in the new value
11654
assert ( !pVar->m_pVal );
11655
pVar->m_eType = USERVAR_INT_SET;
11656
pVar->m_pVal = new UservarIntSet_c();
11657
pVar->m_pVal->SwapData ( tStmt.m_dSetValues );
11533
11658
g_tUservarsMutex.Unlock();
11958
12084
if ( sphSockSend ( iSock, g_sMysqlHandshake, g_iMysqlHandshake )!=g_iMysqlHandshake )
11960
sphWarning ( "failed to send server version (client=%s)", sClientIP );
12086
int iErrno = sphSockGetErrno ();
12087
sphWarning ( "failed to send server version (client=%s, error: %d '%s')", sClientIP, iErrno, sphSockError ( iErrno ) );
12043
12170
} else if ( uMysqlCmd==MYSQL_COM_SET_OPTION )
12045
12172
// bMulti = ( tIn.GetWord()==MYSQL_OPTION_MULTI_STATEMENTS_ON ); // that's how we could double check and validate multi query
12046
SendMysqlOkPacket ( tOut, uPacketID );
12173
// server reporting success in response to COM_SET_OPTION and COM_DEBUG
12174
SendMysqlEofPacket ( tOut, uPacketID, 0 );
12049
12177
} else if ( uMysqlCmd!=MYSQL_COM_QUERY )
14658
void FailClient ( int iSock, SearchdStatus_e eStatus, const char * sMessage )
14660
assert ( eStatus==SEARCHD_RETRY || eStatus==SEARCHD_ERROR );
14662
int iRespLen = 4 + strlen(sMessage);
14664
NetOutputBuffer_c tOut ( iSock );
14665
tOut.SendInt ( SPHINX_SEARCHD_PROTO );
14666
tOut.SendWord ( (WORD)eStatus );
14667
tOut.SendWord ( 0 ); // version doesn't matter
14668
tOut.SendInt ( iRespLen );
14669
tOut.SendString ( sMessage );
14672
// FIXME? without some wait, client fails to receive the response on windows
14673
sphSockClose ( iSock );
14530
14677
Listener_t * DoAccept ( int * pClientSock, char * sClientName )
14532
14679
int iMaxFD = 0;
14622
14769
#if !USE_WINDOWS
14770
// FIXME!!! either get rid of select() or allocate a list of FDs (and dup2 back instead of closing those FDs)
14771
// with thread workers, to prevent dup2 from closing a valid FD
14623
14773
if ( SPH_FDSET_OVERFLOW ( iClientSock ) )
14624
iClientSock = dup2 ( iClientSock, g_iClientFD );
14775
if ( ( g_eWorkers==MPM_FORK || g_eWorkers==MPM_PREFORK ) )
14777
iClientSock = dup2 ( iClientSock, g_iClientFD );
14780
FailClient ( iClientSock, SEARCHD_RETRY, "server maxed out, retry in a second" );
14781
sphWarning ( "maxed out, dismissing client (socket=%d)", iClientSock );
14782
sphSockClose ( iClientSock );
14627
14788
*pClientSock = iClientSock;
14669
void FailClient ( int iSock, SearchdStatus_e eStatus, const char * sMessage )
14671
assert ( eStatus==SEARCHD_RETRY || eStatus==SEARCHD_ERROR );
14673
int iRespLen = 4 + strlen(sMessage);
14675
NetOutputBuffer_c tOut ( iSock );
14676
tOut.SendInt ( SPHINX_SEARCHD_PROTO );
14677
tOut.SendWord ( (WORD)eStatus );
14678
tOut.SendWord ( 0 ); // version doesn't matter
14679
tOut.SendInt ( iRespLen );
14680
tOut.SendString ( sMessage );
14683
// FIXME? without some wait, client fails to receive the response on windows
14684
sphSockClose ( iSock );
14688
14830
void HandlerThread ( void * pArg )
14690
14832
// setup query guard for threaded mode
14768
14908
if ( !pListener )
14771
if ( ( g_iMaxChildren && g_dChildren.GetLength()>=g_iMaxChildren )
14911
if ( ( g_iMaxChildren && ( g_dChildren.GetLength()>=g_iMaxChildren || g_dThd.GetLength()>=g_iMaxChildren ) )
14772
14912
|| ( g_iRotateCount && !g_bSeamlessRotate ) )
14774
14914
FailClient ( iClientSock, SEARCHD_RETRY, "server maxed out, retry in a second" );
15425
15565
if ( !g_bOptNoLock )
15426
15566
OpenDaemonLog ( hConf["searchd"]["searchd"] );
15427
15567
bVisualLoad = SetWatchDog ( iDevNull );
15568
close ( g_iLogFile ); // just the 'IT Happens' magic - switch off, then on.
15569
OpenDaemonLog ( hConf["searchd"]["searchd"] );
15858
extern bool ( *g_pUservarsHook )( const CSphString & sUservar, CSphVector<SphAttr_t> & dVals );
16000
extern UservarIntSet_c * ( *g_pUservarsHook )( const CSphString & sUservar );
15860
bool UservarsHook ( const CSphString & sUservar, CSphVector<SphAttr_t> & dVals )
16002
UservarIntSet_c * UservarsHook ( const CSphString & sUservar )
15862
g_tUservarsMutex.Lock();
16004
CSphScopedLock<CSphStaticMutex> tLock ( g_tUservarsMutex );
15863
16006
Uservar_t * pVar = g_hUservars ( sUservar );
15866
assert ( pVar->m_eType==USERVAR_INT_SET );
15867
dVals = *pVar->m_pVal;
15869
g_tUservarsMutex.Unlock();
15870
return ( pVar!=NULL );
16010
assert ( pVar->m_eType==USERVAR_INT_SET );
16011
pVar->m_pVal->AddRef();
16012
return pVar->m_pVal;