~ubuntu-branches/ubuntu/vivid/sphinxsearch/vivid

« back to all changes in this revision

Viewing changes to src/searchd.cpp

  • Committer: Package Import Robot
  • Author(s): Clint Byrum
  • Date: 2012-04-05 09:25:55 UTC
  • mfrom: (1.2.1) (7.1.3 sid)
  • Revision ID: package-import@ubuntu.com-20120405092555-65tc91rowhls3kob
Tags: 2.0.4-0ubuntu1
* New upstream release (LP: #930747)
* Remove explicit depends on libmysqlcient16 (LP: #974427)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
//
2
 
// $Id: searchd.cpp 3017 2011-11-15 22:22:49Z shodan $
 
2
// $Id: searchd.cpp 3127 2012-03-01 00:26:39Z shodan $
3
3
//
4
4
 
5
5
//
6
 
// Copyright (c) 2001-2011, Andrew Aksyonoff
7
 
// Copyright (c) 2008-2011, Sphinx Technologies Inc
 
6
// Copyright (c) 2001-2012, Andrew Aksyonoff
 
7
// Copyright (c) 2008-2012, Sphinx Technologies Inc
8
8
// All rights reserved
9
9
//
10
10
// This program is free software; you can redistribute it and/or modify
63
63
        // UNIX-specific headers and calls
64
64
        #include <unistd.h>
65
65
        #include <netinet/in.h>
 
66
        #include <netinet/tcp.h>
66
67
        #include <sys/file.h>
67
68
        #include <sys/socket.h>
68
69
        #include <sys/time.h>
191
192
        void SetupTLS ();
192
193
 
193
194
private:
194
 
        CrashQuery_t                    m_tQuery;
195
 
 
196
 
        static SphThreadKey_t   m_tLastQueryTLS; // last query ( non threaded workers could use dist_threads too )
 
195
        CrashQuery_t                    m_tQuery;                       // per thread copy of last query for thread mode
 
196
        static CrashQuery_t             m_tForkQuery;           // copy of last query for fork / prefork modes
 
197
        static SphThreadKey_t   m_tLastQueryTLS;        // last query ( non threaded workers could use dist_threads too )
197
198
};
198
199
 
199
200
enum LogFormat_e
571
572
 
572
573
//////////////////////////////////////////////////////////////////////////
573
574
 
 
575
/// available uservar types
574
576
enum Uservar_e
575
577
{
576
578
        USERVAR_INT_SET
577
579
};
578
580
 
 
581
/// uservar name to value binding
579
582
struct Uservar_t
580
583
{
581
 
        Uservar_e                                               m_eType;
582
 
        CSphVector<SphAttr_t> *                 m_pVal;
 
584
        Uservar_e                       m_eType;
 
585
        UservarIntSet_c *       m_pVal;
 
586
 
 
587
        Uservar_t ()
 
588
                : m_eType ( USERVAR_INT_SET )
 
589
                , m_pVal ( NULL )
 
590
        {}
583
591
};
584
592
 
585
593
static CSphStaticMutex                          g_tUservarsMutex;
789
797
        // hence, we also need to acquire a lock on entry, and an exclusive one
790
798
        Wlock();
791
799
        bool bRes = false;
792
 
        ServedIndex_t * pEntry = GetWlockedEntry ( tKey );
 
800
        ServedIndex_t * pEntry = BASE::operator() ( tKey );
793
801
        if ( pEntry )
794
802
        {
 
803
                pEntry->WriteLock();
795
804
                pEntry->Unlock();
796
805
                bRes = BASE::Delete ( tKey );
797
806
        }
1275
1284
 
1276
1285
                        // tell rotation thread to shutdown, and wait until it does
1277
1286
                        g_bRotateShutdown = true;
1278
 
                        sphThreadJoin ( &g_tRotateThread );
 
1287
                        if ( g_bSeamlessRotate )
 
1288
                        {
 
1289
                                sphThreadJoin ( &g_tRotateThread );
 
1290
                        }
1279
1291
                        g_tRotateQueueMutex.Done();
1280
1292
                        g_tRotateConfigMutex.Done();
1281
1293
 
1526
1538
static char             g_sMinidump[SPH_TIME_PID_MAX_SIZE] = "";
1527
1539
#endif
1528
1540
 
 
1541
CrashQuery_t SphCrashLogger_c::m_tForkQuery = CrashQuery_t();
1529
1542
SphThreadKey_t SphCrashLogger_c::m_tLastQueryTLS = SphThreadKey_t ();
1530
1543
 
1531
1544
void SphCrashLogger_c::Init ()
1667
1680
 
1668
1681
void SphCrashLogger_c::SetLastQuery ( const CrashQuery_t & tQuery )
1669
1682
{
 
1683
        m_tForkQuery = tQuery;
1670
1684
        SphCrashLogger_c * pCrashLogger = (SphCrashLogger_c *)sphThreadGet ( m_tLastQueryTLS );
1671
1685
        if ( pCrashLogger )
1672
1686
        {
1690
1704
CrashQuery_t SphCrashLogger_c::GetQuery()
1691
1705
{
1692
1706
        SphCrashLogger_c * pCrashLogger = (SphCrashLogger_c *)sphThreadGet ( m_tLastQueryTLS );
1693
 
        return pCrashLogger ? pCrashLogger->m_tQuery : CrashQuery_t();
 
1707
        return pCrashLogger ? pCrashLogger->m_tQuery : m_tForkQuery;
1694
1708
}
1695
1709
 
1696
1710
 
2239
2253
        explicit        NetOutputBuffer_c ( int iSock );
2240
2254
 
2241
2255
        bool            SendInt ( int iValue )                  { return SendT<int> ( htonl ( iValue ) ); }
 
2256
        bool            SendAsDword ( int64_t iValue ) ///< sends the 32bit MAX_UINT if the value is greater than it.
 
2257
                {
 
2258
                        if ( iValue < 0 )
 
2259
                                return SendDword ( 0 );
 
2260
                        if ( iValue > UINT_MAX )
 
2261
                                return SendDword ( UINT_MAX );
 
2262
                        return SendDword ( DWORD(iValue) );
 
2263
                }
2242
2264
        bool            SendDword ( DWORD iValue )              { return SendT<DWORD> ( htonl ( iValue ) ); }
2243
2265
        bool            SendLSBDword ( DWORD v )                { SendByte ( (BYTE)( v&0xff ) ); SendByte ( (BYTE)( (v>>8)&0xff ) ); SendByte ( (BYTE)( (v>>16)&0xff ) ); return SendByte ( (BYTE)( (v>>24)&0xff) ); }
2244
2266
        bool            SendWord ( WORD iValue )                { return SendT<WORD> ( htons ( iValue ) ); }
3251
3273
                                // send request
3252
3274
                                NetOutputBuffer_c tOut ( tAgent.m_iSock );
3253
3275
                                tBuilder.BuildRequest ( tAgent.m_sIndexes.cstr(), tOut, i );
3254
 
                                tOut.Flush (); // FIXME! handle flush failure?
 
3276
                                bool bFlushed = tOut.Flush (); // FIXME! handle flush failure?
 
3277
 
 
3278
#ifdef  TCP_NODELAY
 
3279
                                int iDisable = 1;
 
3280
                                if ( bFlushed && tAgent.m_iFamily==AF_INET )
 
3281
                                        setsockopt ( tAgent.m_iSock, IPPROTO_TCP, TCP_NODELAY, (char*)&iDisable, sizeof(iDisable) );
 
3282
#endif
3255
3283
 
3256
3284
                                tAgent.m_eState = AGENT_QUERY;
3257
3285
                                iAgents++;
3445
3473
                                        } else if ( tAgent.m_iReplyStatus==SEARCHD_RETRY )
3446
3474
                                        {
3447
3475
                                                tAgent.m_eState = AGENT_RETRY;
 
3476
                                                CSphString sAgentError = tReq.GetString ();
 
3477
                                                tAgent.m_sFailure.SetSprintf ( "remote warning: %s", sAgentError.cstr() );
3448
3478
                                                break;
3449
3479
 
3450
3480
                                        } else if ( tAgent.m_iReplyStatus!=SEARCHD_OK )
3554
3584
                + q.m_sGroupDistinct.Length()
3555
3585
                + q.m_sComment.Length()
3556
3586
                + q.m_sSelect.Length();
3557
 
        if ( !q.m_bAgent ) // send the magic to agent ("*,*," + real query)
3558
 
                iReqSize += q.m_sSelect.IsEmpty() ? 3 : 4;
3559
3587
        iReqSize += q.m_sRawQuery.IsEmpty()
3560
3588
                ? q.m_sQuery.Length()
3561
3589
                : q.m_sRawQuery.Length();
 
3590
        if ( q.m_eRanker==SPH_RANK_EXPR )
 
3591
                iReqSize += q.m_sRankerExpr.Length() + 4;
3562
3592
        ARRAY_FOREACH ( j, q.m_dFilters )
3563
3593
        {
3564
3594
                const CSphFilterSettings & tFilter = q.m_dFilters[j];
3589
3619
        tOut.SendInt ( q.m_iMaxMatches ); // limit is MAX_MATCHES
3590
3620
        tOut.SendInt ( (DWORD)q.m_eMode ); // match mode
3591
3621
        tOut.SendInt ( (DWORD)q.m_eRanker ); // ranking mode
 
3622
        if ( q.m_eRanker==SPH_RANK_EXPR )
 
3623
                tOut.SendString ( q.m_sRankerExpr.cstr() );
3592
3624
        tOut.SendInt ( q.m_eSort ); // sort mode
3593
3625
        tOut.SendString ( q.m_sSortBy.cstr() ); // sort attr
3594
3626
        if ( q.m_sRawQuery.IsEmpty() )
3676
3708
                        }
3677
3709
                }
3678
3710
        }
3679
 
        if ( q.m_bAgent )
3680
 
        {
3681
 
                tOut.SendString ( q.m_sSelect.cstr() );
3682
 
        } else
3683
 
        {
3684
 
                int iLen = q.m_sSelect.Length();
3685
 
                if ( !iLen )
3686
 
                {
3687
 
                        tOut.SendString ( "*,*" );
3688
 
                } else
3689
 
                {
3690
 
                        // this was a fun subtle issue
3691
 
                        // SetSprintf() uses a static 1K buffer...
3692
 
                        tOut.SendInt ( iLen+4 );
3693
 
                        tOut.SendBytes ( "*,*,", 4 );
3694
 
                        tOut.SendBytes ( q.m_sSelect.cstr(), iLen );
3695
 
                }
3696
 
        }
3697
 
 
 
3711
        tOut.SendString ( q.m_sSelect.cstr() );
3698
3712
        // master v.1.0
3699
3713
        tOut.SendDword ( q.m_eCollation );
3700
3714
}
3845
3859
 
3846
3860
                // read totals (retrieved count, total count, query time, word count)
3847
3861
                int iRetrieved = tReq.GetInt ();
3848
 
                tRes.m_iTotalMatches = tReq.GetInt ();
 
3862
                tRes.m_iTotalMatches = (unsigned int)tReq.GetInt ();
3849
3863
                tRes.m_iQueryTime = tReq.GetInt ();
3850
3864
                const int iWordsCount = tReq.GetInt (); // FIXME! sanity check?
3851
3865
                if ( iRetrieved!=iMatches )
3858
3872
                for ( int i=0; i<iWordsCount; i++ )
3859
3873
                {
3860
3874
                        const CSphString sWord = tReq.GetString ();
3861
 
                        const int iDocs = tReq.GetInt ();
3862
 
                        const int iHits = tReq.GetInt ();
 
3875
                        const int64_t iDocs = (unsigned int)tReq.GetInt ();
 
3876
                        const int64_t iHits = (unsigned int)tReq.GetInt ();
3863
3877
                        const bool bExpanded = ( tReq.GetByte()!=0 ); // agents always send expanded flag to master
3864
3878
 
3865
3879
                        tRes.AddStat ( sWord, iDocs, iHits, bExpanded );
4016
4030
}
4017
4031
 
4018
4032
 
4019
 
void ParseIndexList ( const CSphString & sIndexes, CSphVector<CSphNamedInt> & dOut )
 
4033
void ParseIndexList ( const CSphString & sIndexes, CSphVector<CSphString> & dOut )
4020
4034
{
4021
4035
        CSphString sSplit = sIndexes;
4022
4036
        char * p = (char*)sSplit.cstr();
4033
4047
                assert ( sNext!=p );
4034
4048
                if ( *p ) *p++ = '\0'; // if it was not the end yet, we'll continue from next char
4035
4049
 
4036
 
                dOut.Add().m_sName = sNext;
 
4050
                dOut.Add ( sNext );
4037
4051
        }
4038
4052
}
4039
4053
 
4118
4132
        while ( ( c = *szQuery++ )!=0 )
4119
4133
        {
4120
4134
                // must be in sync with EscapeString (php api)
4121
 
                if ( c=='(' || c==')' || c=='|' || c=='-' || c=='!' || c=='@' || c=='~' || c=='\"' || c=='&' || c=='/' || c=='<' || c=='\\' )
 
4135
                const char sMagics[] = "<\\()|-!@~\"&/^$=";
 
4136
                for ( const char * s = sMagics; *s; s++ )
 
4137
                        if ( c==*s )
 
4138
                {
4122
4139
                        *szRes++ = '\\';
 
4140
                        break;
 
4141
                }
4123
4142
 
4124
4143
                *szRes++ = c;
4125
4144
        }
4416
4435
        // v.1.22
4417
4436
        if ( iVer>=0x116 )
4418
4437
        {
4419
 
                CSphString sRawSelect = tReq.GetString ();
4420
 
                CSphString sSelect = "";
 
4438
                tQuery.m_sSelect = tReq.GetString ();
 
4439
                tQuery.m_bAgent = ( iMasterVer>0 );
4421
4440
                CSphString sError;
4422
 
                bool bAgent = false;
4423
 
                if ( sRawSelect.Begins ( "*,*" ) ) // this is the mark of agent.
4424
 
                {
4425
 
                        bAgent = true;
4426
 
                        if ( sRawSelect.Length()>3 )
4427
 
                                sSelect = sRawSelect.SubString ( 4, sRawSelect.Length()-4 );
4428
 
                }
4429
 
 
4430
 
                tQuery.m_sSelect = bAgent?sSelect:sRawSelect;
4431
4441
                if ( !tQuery.ParseSelectList ( sError ) )
4432
4442
                {
4433
4443
                        tReq.SendErrorReply ( "select: %s", sError.cstr() );
4434
4444
                        return false;
4435
4445
                }
4436
 
                if ( bAgent )
4437
 
                {
4438
 
                        tQuery.m_sSelect = sRawSelect;
4439
 
                        tQuery.m_bAgent = true;
4440
 
                }
4441
4446
        }
4442
4447
 
4443
4448
        // master v.1.0
4510
4515
        // [matchmode/numfilters/sortmode matches (offset,limit)
4511
4516
        static const char * sModes [ SPH_MATCH_TOTAL ] = { "all", "any", "phr", "bool", "ext", "scan", "ext2" };
4512
4517
        static const char * sSort [ SPH_SORT_TOTAL ] = { "rel", "attr-", "attr+", "tsegs", "ext", "expr" };
4513
 
        p += snprintf ( p, pMax-p, " [%s/%d/%s %d (%d,%d)",
 
4518
        p += snprintf ( p, pMax-p, " [%s/%d/%s "INT64_FMT" (%d,%d)",
4514
4519
                sModes [ tQuery.m_eMode ], tQuery.m_dFilters.GetLength(), sSort [ tQuery.m_eSort ],
4515
4520
                tRes.m_iTotalMatches, tQuery.m_iOffset, tQuery.m_iLimit );
4516
4521
 
4746
4751
        tBuf.Append ( "/""* " );
4747
4752
        tBuf.AppendCurrentTime();
4748
4753
        if ( tRes.m_iMultiplier>1 )
4749
 
                tBuf.Append ( " conn %d wall %d.%03d x%d found %d *""/ ",
 
4754
                tBuf.Append ( " conn %d wall %d.%03d x%d found "INT64_FMT" *""/ ",
4750
4755
                        iCid, iQueryTime/1000, iQueryTime%1000, tRes.m_iMultiplier, tRes.m_iTotalMatches );
4751
4756
        else
4752
 
                tBuf.Append ( " conn %d wall %d.%03d found %d *""/ ",
 
4757
                tBuf.Append ( " conn %d wall %d.%03d found "INT64_FMT" *""/ ",
4753
4758
                        iCid, iQueryTime/1000, iQueryTime%1000, tRes.m_iTotalMatches );
4754
4759
 
4755
4760
        ///////////////////////////////////
4970
4975
 
4971
4976
//////////////////////////////////////////////////////////////////////////
4972
4977
 
 
4978
// internals attributes are last no need to send them
 
4979
static int SendGetAttrCount ( const CSphSchema & tSchema )
 
4980
{
 
4981
        int iCount = tSchema.GetAttrsCount();
 
4982
        if ( iCount
 
4983
                && sphIsSortStringInternal ( tSchema.GetAttr ( iCount-1 ).m_sName.cstr() ) )
 
4984
        {
 
4985
                for ( int i=iCount-1; i>=0 && sphIsSortStringInternal ( tSchema.GetAttr(i).m_sName.cstr() ); i-- )
 
4986
                {
 
4987
                        iCount = i;
 
4988
                }
 
4989
        }
 
4990
 
 
4991
        return iCount;
 
4992
}
 
4993
 
 
4994
 
4973
4995
int CalcResultLength ( int iVer, const CSphQueryResult * pRes, const CSphVector<PoolPtrs_t> & dTag2Pools, bool bExtendedStat )
4974
4996
{
4975
4997
        int iRespLen = 0;
4996
5018
        // query stats
4997
5019
        iRespLen += 20;
4998
5020
 
 
5021
        int iAttrsCount = SendGetAttrCount ( pRes->m_tSchema );
 
5022
 
4999
5023
        // schema
5000
5024
        if ( iVer>=0x102 )
5001
5025
        {
5002
5026
                iRespLen += 8; // 4 for field count, 4 for attr count
5003
5027
                ARRAY_FOREACH ( i, pRes->m_tSchema.m_dFields )
5004
5028
                        iRespLen += 4 + strlen ( pRes->m_tSchema.m_dFields[i].m_sName.cstr() ); // namelen, name
5005
 
                for ( int i=0; i<pRes->m_tSchema.GetAttrsCount(); i++ )
 
5029
                for ( int i=0; i<iAttrsCount; i++ )
5006
5030
                        iRespLen += 8 + strlen ( pRes->m_tSchema.GetAttr(i).m_sName.cstr() ); // namelen, name, type
5007
5031
        }
5008
5032
 
5010
5034
        if ( iVer<0x102 )
5011
5035
                iRespLen += 16*pRes->m_iCount; // matches
5012
5036
        else if ( iVer<0x108 )
5013
 
                iRespLen += ( 8+4*pRes->m_tSchema.GetAttrsCount() )*pRes->m_iCount; // matches
 
5037
                iRespLen += ( 8+4*iAttrsCount )*pRes->m_iCount; // matches
5014
5038
        else
5015
 
                iRespLen += 4 + ( 8+4*USE_64BIT+4*pRes->m_tSchema.GetAttrsCount() )*pRes->m_iCount; // id64 tag and matches
 
5039
                iRespLen += 4 + ( 8+4*USE_64BIT+4*iAttrsCount )*pRes->m_iCount; // id64 tag and matches
5016
5040
 
5017
5041
        if ( iVer>=0x114 )
5018
5042
        {
5019
5043
                // 64bit matches
5020
5044
                int iWideAttrs = 0;
5021
 
                for ( int i=0; i<pRes->m_tSchema.GetAttrsCount(); i++ )
 
5045
                for ( int i=0; i<iAttrsCount; i++ )
5022
5046
                        if ( pRes->m_tSchema.GetAttr(i).m_eAttrType==SPH_ATTR_BIGINT )
5023
5047
                                iWideAttrs++;
5024
5048
                iRespLen += 4*pRes->m_iCount*iWideAttrs; // extra 4 bytes per attr per match
5035
5059
        // MVA and string values
5036
5060
        CSphVector<CSphAttrLocator> dMvaItems;
5037
5061
        CSphVector<CSphAttrLocator> dStringItems;
5038
 
        for ( int i=0; i<pRes->m_tSchema.GetAttrsCount(); i++ )
 
5062
        for ( int i=0; i<iAttrsCount; i++ )
5039
5063
        {
5040
5064
                const CSphColumnInfo & tCol = pRes->m_tSchema.GetAttr(i);
5041
5065
                if ( tCol.m_eAttrType==SPH_ATTR_UINT32SET || tCol.m_eAttrType==SPH_ATTR_UINT64SET )
5115
5139
                        tOut.SendString ( pRes->m_sWarning.cstr() );
5116
5140
        }
5117
5141
 
 
5142
        int iAttrsCount = SendGetAttrCount ( pRes->m_tSchema );
 
5143
 
5118
5144
        // send schema
5119
5145
        if ( iVer>=0x102 )
5120
5146
        {
5122
5148
                ARRAY_FOREACH ( i, pRes->m_tSchema.m_dFields )
5123
5149
                        tOut.SendString ( pRes->m_tSchema.m_dFields[i].m_sName.cstr() );
5124
5150
 
5125
 
                tOut.SendInt ( pRes->m_tSchema.GetAttrsCount() );
5126
 
                for ( int i=0; i<pRes->m_tSchema.GetAttrsCount(); i++ )
 
5151
                tOut.SendInt ( iAttrsCount );
 
5152
                for ( int i=0; i<iAttrsCount; i++ )
5127
5153
                {
5128
5154
                        const CSphColumnInfo & tCol = pRes->m_tSchema.GetAttr(i);
5129
5155
                        tOut.SendString ( tCol.m_sName.cstr() );
5180
5206
                        assert ( !tMatch.m_pDynamic || (int)tMatch.m_pDynamic[-1]==pRes->m_tSchema.GetDynamicSize() );
5181
5207
#endif
5182
5208
 
5183
 
                        for ( int j=0; j<pRes->m_tSchema.GetAttrsCount(); j++ )
 
5209
                        for ( int j=0; j<iAttrsCount; j++ )
5184
5210
                        {
5185
5211
                                const CSphColumnInfo & tAttr = pRes->m_tSchema.GetAttr(j);
5186
5212
                                if ( tAttr.m_eAttrType==SPH_ATTR_UINT32SET || tAttr.m_eAttrType==SPH_ATTR_UINT64SET )
5252
5278
                }
5253
5279
        }
5254
5280
        tOut.SendInt ( pRes->m_dMatches.GetLength() );
5255
 
        tOut.SendInt ( pRes->m_iTotalMatches );
 
5281
        tOut.SendAsDword ( pRes->m_iTotalMatches );
5256
5282
        tOut.SendInt ( Max ( pRes->m_iQueryTime, 0 ) );
5257
5283
        tOut.SendInt ( pRes->m_hWordStats.GetLength() );
5258
5284
 
5261
5287
        {
5262
5288
                const CSphQueryResultMeta::WordStat_t & tStat = pRes->m_hWordStats.IterateGet();
5263
5289
                tOut.SendString ( pRes->m_hWordStats.IterateGetKey().cstr() );
5264
 
                tOut.SendInt ( tStat.m_iDocs );
5265
 
                tOut.SendInt ( tStat.m_iHits );
 
5290
                tOut.SendAsDword ( tStat.m_iDocs );
 
5291
                tOut.SendAsDword ( tStat.m_iHits );
5266
5292
                if ( bExtendedStat )
5267
5293
                        tOut.SendByte ( tStat.m_bExpanded );
5268
5294
        }
5371
5397
                                );
5372
5398
                }
5373
5399
                int iLimit = bMultiSchema
5374
 
                        ? ( iCur + pRes->m_dMatchCounts[iSchema] )
5375
 
                        : pRes->m_iTotalMatches;
5376
 
                iLimit = Min ( iLimit, pRes->m_dMatches.GetLength() );
 
5400
                        ? (int)Min ( iCur + pRes->m_dMatchCounts[iSchema], pRes->m_dMatches.GetLength() )
 
5401
                        : (int)Min ( pRes->m_iTotalMatches, pRes->m_dMatches.GetLength() );
5377
5402
                for ( int i=iCur; i<iLimit; i++ )
5378
5403
                {
5379
5404
                        CSphMatch & tMatch = pRes->m_dMatches[i];
6084
6109
        void                                                    RunLocalSearches ( ISphMatchSorter * pLocalSorter, const char * sDistName );
6085
6110
        void                                                    RunLocalSearchesMT ();
6086
6111
        bool                                                    RunLocalSearch ( int iLocal, ISphMatchSorter ** ppSorters, CSphQueryResult ** pResults ) const;
 
6112
        bool                                                    HasExpresions ( int iStart, int iEnd ) const;
6087
6113
 
6088
6114
        CSphVector<DWORD>                               m_dMvaStorage;
6089
6115
        CSphVector<BYTE>                                m_dStringsStorage;
6091
6117
        int                                                             m_iStart;               ///< subset start
6092
6118
        int                                                             m_iEnd;                 ///< subset end
6093
6119
        bool                                                    m_bMultiQueue;  ///< whether current subset is subject to multi-queue optimization
6094
 
        mutable CSphVector<CSphNamedInt>                m_dLocal;               ///< local indexes for the current subset
 
6120
        CSphVector<CSphString>                  m_dLocal;               ///< local indexes for the current subset
6095
6121
        mutable CSphVector<CSphSchemaMT>                m_dExtraSchemas; ///< the extra fields for agents
6096
 
        mutable CSphMutex                               m_tLock;
6097
6122
        bool                                                    m_bSphinxql;    ///< if the query get from sphinxql - to avoid applying sphinxql magick for others
6098
6123
        CSphAttrUpdateEx *                              m_pUpdates;             ///< holder for updates
6099
6124
 
 
6125
        mutable CSphMutex                               m_tLock;
 
6126
        mutable SmallStringHash_T<int>  m_hUsed;
 
6127
 
6100
6128
        const ServedIndex_t *                   UseIndex ( int iLocal ) const;
6101
6129
        void                                                    ReleaseIndex ( int iLocal ) const;
6102
6130
 
6134
6162
SearchHandler_c::~SearchHandler_c ()
6135
6163
{
6136
6164
        m_tLock.Done();
6137
 
        ARRAY_FOREACH ( i, m_dLocal )
 
6165
        m_hUsed.IterateStart();
 
6166
        while ( m_hUsed.IterateNext() )
6138
6167
        {
6139
 
                if ( m_dLocal[i].m_iValue>0 )
6140
 
                        g_pIndexes->GetUnlockedEntry ( m_dLocal[i].m_sName ).Unlock();
 
6168
                if ( m_hUsed.IterateGet()>0 )
 
6169
                        g_pIndexes->GetUnlockedEntry ( m_hUsed.IterateGetKey() ).Unlock();
6141
6170
        }
6142
6171
}
6143
6172
 
6145
6174
const ServedIndex_t * SearchHandler_c::UseIndex ( int iLocal ) const
6146
6175
{
6147
6176
        assert ( iLocal>=0 && iLocal<m_dLocal.GetLength() );
 
6177
        const CSphString & sName = m_dLocal[iLocal];
6148
6178
        if ( g_eWorkers!=MPM_THREADS )
6149
 
                return g_pIndexes->GetRlockedEntry ( m_dLocal[iLocal].m_sName );
 
6179
                return g_pIndexes->GetRlockedEntry ( sName );
6150
6180
 
6151
6181
        m_tLock.Lock();
6152
 
 
6153
 
        int iUseCount = m_dLocal[iLocal].m_iValue;
6154
 
 
6155
 
        assert ( ( m_pUpdates && iUseCount>0 ) || !m_pUpdates );
 
6182
        int * pUseCount = m_hUsed ( sName );
 
6183
        assert ( ( m_pUpdates && pUseCount && *pUseCount>0 ) || !m_pUpdates );
6156
6184
 
6157
6185
        const ServedIndex_t * pServed = NULL;
6158
 
        if ( iUseCount )
6159
 
                pServed = &g_pIndexes->GetUnlockedEntry ( m_dLocal[iLocal].m_sName );
6160
 
        else
6161
 
                pServed = g_pIndexes->GetRlockedEntry ( m_dLocal[iLocal].m_sName );
6162
 
 
6163
 
        m_dLocal[iLocal].m_iValue = iUseCount + (pServed!=NULL);
 
6186
        if ( pUseCount && *pUseCount>0 )
 
6187
        {
 
6188
                pServed = &g_pIndexes->GetUnlockedEntry ( sName );
 
6189
                *pUseCount += ( pServed!=NULL );
 
6190
        } else
 
6191
        {
 
6192
                pServed = g_pIndexes->GetRlockedEntry ( sName );
 
6193
                if ( pServed )
 
6194
                {
 
6195
                        if ( pUseCount )
 
6196
                                (*pUseCount)++;
 
6197
                        else
 
6198
                                m_hUsed.Add ( 1, sName );
 
6199
                }
 
6200
        }
6164
6201
 
6165
6202
        m_tLock.Unlock();
6166
 
 
6167
6203
        return pServed;
6168
6204
}
6169
6205
 
6174
6210
        if ( g_eWorkers!=MPM_THREADS )
6175
6211
                return;
6176
6212
 
 
6213
        const CSphString & sName = m_dLocal[iLocal];
6177
6214
        m_tLock.Lock();
6178
6215
 
6179
 
        int iUseCount = m_dLocal[iLocal].m_iValue - 1;
6180
 
        assert ( iUseCount>=0 );
6181
 
        m_dLocal[iLocal].m_iValue = iUseCount;
6182
 
 
6183
 
        if ( !iUseCount )
6184
 
                g_pIndexes->GetUnlockedEntry ( m_dLocal[iLocal].m_sName ).Unlock();
6185
 
 
6186
 
        assert ( ( m_pUpdates && iUseCount>0 ) || !m_pUpdates );
 
6216
        int * pUseCount = m_hUsed ( sName );
 
6217
        assert ( pUseCount && *pUseCount>=0 );
 
6218
        (*pUseCount)--;
 
6219
 
 
6220
        if ( !*pUseCount )
 
6221
                g_pIndexes->GetUnlockedEntry ( sName ).Unlock();
 
6222
 
 
6223
        assert ( ( m_pUpdates && pUseCount && *pUseCount ) || !m_pUpdates );
6187
6224
 
6188
6225
        m_tLock.Unlock();
6189
6226
}
6196
6233
        m_dQueries[0] = tQuery;
6197
6234
        m_dQueries[0].m_sIndexes = sIndex;
6198
6235
 
6199
 
        m_dLocal.Add().m_sName = sIndex;
6200
 
        m_dLocal.Last().m_iValue = 1;
 
6236
        // lets add index to prevent deadlock
 
6237
        // as index already r-locker or w-locked at this point
 
6238
        m_dLocal.Add ( sIndex );
 
6239
        m_hUsed.Add ( 1, sIndex );
6201
6240
 
6202
6241
        CheckQuery ( tQuery, *pUpdates->m_pError );
6203
6242
        if ( !pUpdates->m_pError->IsEmpty() )
6474
6513
        ARRAY_FOREACH ( iLocal, dLocals )
6475
6514
        {
6476
6515
                bool bResult = dLocals[iLocal].m_bResult;
6477
 
                const char * sLocal = m_dLocal[iLocal].m_sName.cstr();
 
6516
                const char * sLocal = m_dLocal[iLocal].cstr();
6478
6517
 
6479
6518
                if ( !bResult )
6480
6519
                {
6640
6679
        CSphVector <int> dLocked;
6641
6680
        ARRAY_FOREACH ( iLocal, m_dLocal )
6642
6681
        {
6643
 
                const char * sLocal = m_dLocal[iLocal].m_sName.cstr();
 
6682
                const char * sLocal = m_dLocal[iLocal].cstr();
6644
6683
 
6645
6684
                const ServedIndex_t * pServed = UseIndex ( iLocal );
6646
6685
                if ( !pServed )
6806
6845
 
6807
6846
 
6808
6847
// check expressions into a query to make sure that it's ready for multi query optimization
6809
 
static bool HasExpresions ( const CSphQuery & tQuery, const CSphVector<CSphNamedInt>& m_dIndices )
 
6848
bool SearchHandler_c::HasExpresions ( int iStart, int iEnd ) const
6810
6849
{
6811
 
        ARRAY_FOREACH ( i, m_dIndices )
 
6850
        ARRAY_FOREACH ( i, m_dLocal )
6812
6851
        {
6813
 
                const ServedIndex_t * pServedIndex = g_pIndexes->GetRlockedEntry ( m_dIndices[i].m_sName );
 
6852
                const ServedIndex_t * pServedIndex = UseIndex ( i );
6814
6853
 
6815
6854
                // check that it exists
6816
 
                if ( !pServedIndex )
6817
 
                        return false;
 
6855
                if ( !pServedIndex || !pServedIndex->m_bEnabled )
 
6856
                {
 
6857
                        if ( pServedIndex )
 
6858
                                ReleaseIndex ( i );
 
6859
                        continue;
 
6860
                }
6818
6861
 
6819
6862
                bool bHasExpression = false;
6820
 
                if ( pServedIndex->m_bEnabled )
6821
 
                        bHasExpression = sphHasExpressions ( tQuery, pServedIndex->m_pIndex->GetMatchSchema() );
 
6863
                const CSphSchema & tSchema = pServedIndex->m_pIndex->GetMatchSchema();
 
6864
                for ( int iCheck=iStart; iCheck<=iEnd && !bHasExpression; iCheck++ )
 
6865
                        bHasExpression = sphHasExpressions ( m_dQueries[iCheck], tSchema );
6822
6866
 
6823
 
                pServedIndex->Unlock();
 
6867
                ReleaseIndex ( i );
6824
6868
 
6825
6869
                if ( bHasExpression )
6826
6870
                        return true;
6833
6877
{
6834
6878
        m_iStart = iStart;
6835
6879
        m_iEnd = iEnd;
6836
 
        m_dLocal.Reset ();
 
6880
        m_dLocal.Reset();
6837
6881
 
6838
6882
        // all my stats
6839
6883
        int64_t tmSubset = sphMicroTimer();
6922
6966
                        // search through all local indexes
6923
6967
                        for ( IndexHashIterator_c it ( g_pIndexes ); it.Next(); )
6924
6968
                                if ( it.Get ().m_bEnabled )
6925
 
                                        m_dLocal.Add().m_sName = it.GetKey();
 
6969
                                        m_dLocal.Add ( it.GetKey() );
6926
6970
                } else
6927
6971
                {
6928
6972
                        // search through specified local indexes
6933
6977
                        g_tDistLock.Lock();
6934
6978
 
6935
6979
                        ARRAY_FOREACH ( i, m_dLocal )
6936
 
                                if ( g_hDistIndexes.Exists ( m_dLocal[i].m_sName ) )
 
6980
                                if ( g_hDistIndexes.Exists ( m_dLocal[i] ) )
6937
6981
                                {
6938
6982
                                        iDistFound = i;
6939
6983
                                        break;
6944
6988
                        if ( iDistFound!=-1 )
6945
6989
                        {
6946
6990
                                for ( int iRes=iStart; iRes<=iEnd; iRes++ )
6947
 
                                        m_dResults[iRes].m_sError.SetSprintf ( "distributed index '%s' in multi-index query found", m_dLocal[iDistFound].m_sName.cstr() );
 
6991
                                        m_dResults[iRes].m_sError.SetSprintf ( "distributed index '%s' in multi-index query found", m_dLocal[iDistFound].cstr() );
6948
6992
                                return;
6949
6993
                        }
6950
6994
 
6951
6995
                        ARRAY_FOREACH ( i, m_dLocal )
6952
6996
                        {
6953
 
                                const ServedIndex_t * pServedIndex = g_pIndexes->GetRlockedEntry ( m_dLocal[i].m_sName );
 
6997
                                const ServedIndex_t * pServedIndex = UseIndex ( i );
6954
6998
 
6955
6999
                                // check that it exists
6956
7000
                                if ( !pServedIndex )
6957
7001
                                {
6958
7002
                                        for ( int iRes=iStart; iRes<=iEnd; iRes++ )
6959
 
                                                m_dResults[iRes].m_sError.SetSprintf ( "unknown local index '%s' in search request", m_dLocal[i].m_sName.cstr() );
 
7003
                                                m_dResults[iRes].m_sError.SetSprintf ( "unknown local index '%s' in search request", m_dLocal[i].cstr() );
6960
7004
                                        return;
6961
7005
                                }
6962
7006
 
 
7007
                                bool bEnabled = pServedIndex->m_bEnabled;
 
7008
                                ReleaseIndex ( i );
6963
7009
                                // if it exists but is not enabled, remove it from the list and force recheck
6964
 
                                if ( !pServedIndex->m_bEnabled )
 
7010
                                if ( !bEnabled )
6965
7011
                                        m_dLocal.Remove ( i-- );
6966
 
 
6967
 
                                pServedIndex->Unlock();
6968
7012
                        }
6969
7013
                }
6970
7014
 
6981
7025
                // copy local indexes list from distributed definition, but filter out disabled ones
6982
7026
                ARRAY_FOREACH ( i, dDistLocal )
6983
7027
                {
6984
 
                        const ServedIndex_t * pServedIndex = g_pIndexes->GetRlockedEntry ( dDistLocal[i] );
 
7028
                        int iDistLocal = m_dLocal.GetLength();
 
7029
                        m_dLocal.Add ( dDistLocal[i] );
 
7030
 
 
7031
                        const ServedIndex_t * pServedIndex = UseIndex ( iDistLocal );
 
7032
                        bool bValidLocalIndex = pServedIndex && pServedIndex->m_bEnabled;
6985
7033
                        if ( pServedIndex )
6986
 
                        {
6987
 
                                if ( pServedIndex->m_bEnabled )
6988
 
                                        m_dLocal.Add().m_sName = dDistLocal[i];
 
7034
                                ReleaseIndex ( iDistLocal );
6989
7035
 
6990
 
                                pServedIndex->Unlock();
6991
 
                        }
 
7036
                        if ( !bValidLocalIndex )
 
7037
                                m_dLocal.Pop();
6992
7038
                }
6993
7039
        }
6994
7040
 
7001
7047
        {
7002
7048
                CSphString sError;
7003
7049
 
7004
 
                // check if all schemas are equal
 
7050
                // check if all schemes are equal
7005
7051
                bool bAllEqual = true;
7006
7052
 
7007
 
                const ServedIndex_t * pFirstIndex = g_pIndexes->GetRlockedEntry ( m_dLocal[0].m_sName );
 
7053
                const ServedIndex_t * pFirstIndex = UseIndex ( 0 );
7008
7054
                if ( !pFirstIndex )
7009
7055
                        break;
7010
7056
 
7011
7057
                const CSphSchema & tFirstSchema = pFirstIndex->m_pIndex->GetMatchSchema();
7012
7058
                for ( int i=1; i<m_dLocal.GetLength() && bAllEqual; i++ )
7013
7059
                {
7014
 
                        const ServedIndex_t * pNextIndex = g_pIndexes->GetRlockedEntry ( m_dLocal[i].m_sName );
 
7060
                        const ServedIndex_t * pNextIndex = UseIndex ( i );
7015
7061
                        if ( !pNextIndex )
7016
7062
                        {
7017
7063
                                bAllEqual = false;
7021
7067
                        if ( !tFirstSchema.CompareTo ( pNextIndex->m_pIndex->GetMatchSchema(), sError ) )
7022
7068
                                bAllEqual = false;
7023
7069
 
7024
 
                        pNextIndex->Unlock();
 
7070
                        ReleaseIndex ( i );
7025
7071
                }
7026
7072
 
7027
7073
                // we can reuse the very same sorter
7032
7078
                        pLocalSorter = sphCreateQueue ( &m_dQueries[iStart], tFirstSchema, sError, true, pExtraSchemaMT );
7033
7079
                }
7034
7080
 
7035
 
                pFirstIndex->Unlock ();
 
7081
                ReleaseIndex ( 0 );
7036
7082
                break;
7037
7083
        }
7038
7084
 
7039
7085
        // select lists must have no expressions
7040
 
        for ( int iCheck=iStart; iCheck<=iEnd && m_bMultiQueue; iCheck++ )
 
7086
        if ( m_bMultiQueue )
7041
7087
        {
7042
 
                m_bMultiQueue = !HasExpresions ( m_dQueries[iCheck], m_dLocal );
 
7088
                m_bMultiQueue = !HasExpresions ( iStart, iEnd );
7043
7089
        }
7044
7090
 
7045
7091
        // these are mutual exclusive
7221
7267
                        tRes.m_tSchema = tRes.m_dSchemas[0];
7222
7268
                if ( tRes.m_iSuccesses>1 || tQuery.m_dItems.GetLength() )
7223
7269
                {
7224
 
                        if ( g_bCompatResults )
 
7270
                        if ( g_bCompatResults && !tQuery.m_bAgent )
7225
7271
                        {
7226
7272
                                if ( !MinimizeAggrResultCompat ( tRes, tQuery, m_dLocal.GetLength()!=0 ) )
7227
7273
                                        return;
7250
7296
                tRes.m_iCount = Max ( Min ( tQuery.m_iLimit, tRes.m_dMatches.GetLength()-tQuery.m_iOffset ), 0 );
7251
7297
        }
7252
7298
 
7253
 
        // remove internal attributes from result schema
7254
 
        ARRAY_FOREACH ( i, m_dResults )
7255
 
                sphSortRemoveInternalAttrs ( m_dResults[i].m_tSchema );
7256
 
 
7257
7299
        // stats
7258
7300
        tmSubset = sphMicroTimer() - tmSubset;
7259
7301
        tmCpu = sphCpuTimer() - tmCpu;
7516
7558
 
7517
7559
        // SELECT specific
7518
7560
        CSphQuery                               m_tQuery;
 
7561
        CSphVector < CSphRefcountedPtr<UservarIntSet_c> > m_dRefs;
7519
7562
 
7520
7563
        // used by INSERT, DELETE, CALL, DESC, ATTACH
7521
7564
        CSphString                              m_sIndex;
8053
8096
 
8054
8097
void SqlParser_c::UpdateAttr ( const CSphString& sName, const SqlNode_t * pValue, ESphAttr eType )
8055
8098
{
 
8099
        assert ( eType==SPH_ATTR_FLOAT || eType==SPH_ATTR_INTEGER || eType==SPH_ATTR_BIGINT );
8056
8100
        if ( eType==SPH_ATTR_FLOAT )
 
8101
        {
8057
8102
                m_pStmt->m_tUpdate.m_dPool.Add ( *(const DWORD*)( &pValue->m_fValue ) );
8058
 
        else // default: if ( eType==SPH_ATTR_INTEGER )
 
8103
 
 
8104
        } else if ( eType==SPH_ATTR_INTEGER || eType==SPH_ATTR_BIGINT )
8059
8105
        {
8060
8106
                m_pStmt->m_tUpdate.m_dPool.Add ( (DWORD) pValue->m_iValue );
8061
8107
                DWORD uHi = (DWORD) ( pValue->m_iValue>>32 );
8068
8114
        AddUpdatedAttr ( sName, eType );
8069
8115
}
8070
8116
 
8071
 
void SqlParser_c::UpdateMVAAttr ( const CSphString& sName, const SqlNode_t& dValues )
 
8117
void SqlParser_c::UpdateMVAAttr ( const CSphString & sName, const SqlNode_t & dValues )
8072
8118
{
8073
8119
        CSphAttrUpdate & tUpd = m_pStmt->m_tUpdate;
8074
 
        assert ( dValues.m_pValues.Ptr() && dValues.m_pValues->GetLength()>0 );
8075
 
        dValues.m_pValues->Uniq(); // don't need dupes within MVA
8076
 
        tUpd.m_dPool.Add ( dValues.m_pValues->GetLength()*2 );
8077
 
        SphAttr_t * pVal = dValues.m_pValues.Ptr()->Begin();
8078
 
        SphAttr_t * pValMax = pVal + dValues.m_pValues->GetLength();
8079
8120
        ESphAttr eType = SPH_ATTR_UINT32SET;
8080
 
        for ( ;pVal<pValMax; pVal++ )
 
8121
 
 
8122
        if ( dValues.m_pValues.Ptr() && dValues.m_pValues->GetLength()>0 )
8081
8123
        {
8082
 
                SphAttr_t uVal = *pVal;
8083
 
                if ( uVal>UINT_MAX )
 
8124
                // got MVA values, let's process them
 
8125
                dValues.m_pValues->Uniq(); // don't need dupes within MVA
 
8126
                tUpd.m_dPool.Add ( dValues.m_pValues->GetLength()*2 );
 
8127
                SphAttr_t * pVal = dValues.m_pValues.Ptr()->Begin();
 
8128
                SphAttr_t * pValMax = pVal + dValues.m_pValues->GetLength();
 
8129
                for ( ;pVal<pValMax; pVal++ )
8084
8130
                {
8085
 
                        eType = SPH_ATTR_UINT64SET;
 
8131
                        SphAttr_t uVal = *pVal;
 
8132
                        if ( uVal>UINT_MAX )
 
8133
                        {
 
8134
                                eType = SPH_ATTR_UINT64SET;
 
8135
                        }
 
8136
                        tUpd.m_dPool.Add ( (DWORD)uVal );
 
8137
                        tUpd.m_dPool.Add ( (DWORD)( uVal>>32 ) );
8086
8138
                }
8087
 
                tUpd.m_dPool.Add ( (DWORD)uVal );
8088
 
                tUpd.m_dPool.Add ( (DWORD)( uVal>>32 ) );
 
8139
        } else
 
8140
        {
 
8141
                // no values, means we should delete the attribute
 
8142
                // we signal that to the update code by putting a single zero
 
8143
                // to the values pool (meaning a zero-length MVA values list)
 
8144
                tUpd.m_dPool.Add ( 0 );
8089
8145
        }
 
8146
 
8090
8147
        AddUpdatedAttr ( sName, eType );
8091
8148
}
8092
8149
 
8126
8183
 
8127
8184
bool SqlParser_c::AddUservarFilter ( const CSphString & sCol, const CSphString & sVar, bool bExclude )
8128
8185
{
8129
 
        g_tUservarsMutex.Lock();
 
8186
        CSphScopedLock<CSphStaticMutex> tLock ( g_tUservarsMutex );
8130
8187
        Uservar_t * pVar = g_hUservars ( sVar );
8131
8188
        if ( !pVar )
8132
8189
        {
8133
 
                g_tUservarsMutex.Unlock();
8134
8190
                yyerror ( this, "undefined global variable in IN clause" );
8135
8191
                return false;
8136
8192
        }
8141
8197
                return false;
8142
8198
        pFilter->m_bExclude = bExclude;
8143
8199
 
8144
 
        // INT_SET uservars must get sorted on SET once
8145
 
        // FIXME? maybe we should do a readlock instead of copying?
8146
 
        pFilter->m_dValues = *pVar->m_pVal;
8147
 
        g_tUservarsMutex.Unlock();
 
8200
        // tricky black magic
 
8201
        // we want to avoid copying the data, hence external values in the filter
 
8202
        // we need to guarantee the data (uservar value) lifetime, then
 
8203
        // suddenly, enter mutex-protected refcounted value objects
 
8204
        // suddenly, we need to track those values in the statement object, too
 
8205
        assert ( pVar->m_pVal );
 
8206
        CSphRefcountedPtr<UservarIntSet_c> & tRef = m_pStmt->m_dRefs.Add();
 
8207
        tRef = pVar->m_pVal; // take over semantics, and thus NO (!) automatic addref
 
8208
        pVar->m_pVal->AddRef(); // so do that addref manually
 
8209
        pFilter->SetExternalValues ( pVar->m_pVal->Begin(), pVar->m_pVal->GetLength() );
8148
8210
        return true;
8149
8211
}
8150
8212
 
8221
8283
                {
8222
8284
                        tQuery.m_sSelect.SetBinary ( tParser.m_pBuf + tQuery.m_iSQLSelectStart,
8223
8285
                                tQuery.m_iSQLSelectEnd - tQuery.m_iSQLSelectStart );
8224
 
 
8225
 
                        // finally check for agent magic
8226
 
                        if ( tQuery.m_sSelect.Begins ( "*,*" ) ) // this is the mark of agent.
8227
 
                        {
8228
 
                                tQuery.m_dItems.Remove(0);
8229
 
                                tQuery.m_dItems.Remove(0);
8230
 
                                tQuery.m_bAgent = true;
8231
 
                        }
8232
8286
                }
8233
8287
        }
8234
8288
 
8311
8365
        EXCERPT_FLAG_FILES_SCATTERED    = 1024
8312
8366
};
8313
8367
 
 
8368
enum
 
8369
{
 
8370
        PROCESSED_ITEM                                  = -2,
 
8371
        EOF_ITEM                                                = -1
 
8372
};
8314
8373
struct SnippetWorker_t
8315
8374
{
8316
8375
        int64_t                                         m_iTotal;
8319
8378
 
8320
8379
        SnippetWorker_t()
8321
8380
                : m_iTotal ( 0 )
8322
 
                , m_iHead ( -1 ) // -1 is the marker of the end of the list
 
8381
                , m_iHead ( EOF_ITEM )
8323
8382
                , m_bLocal ( false )
8324
8383
        {}
8325
8384
};
8448
8507
                + q.m_sRawPassageBoundary.Length();
8449
8508
 
8450
8509
                m_iNumDocs = 0;
8451
 
                for ( int iDoc = tWorker.m_iHead; iDoc!=-1; iDoc=dQueries[iDoc].m_iNext )
 
8510
                for ( int iDoc = tWorker.m_iHead; iDoc!=EOF_ITEM; iDoc=dQueries[iDoc].m_iNext )
8452
8511
                {
8453
8512
                        ++m_iNumDocs;
8454
8513
                        m_iReqLen += 4 + dQueries[iDoc].m_sSource.Length();
8463
8522
        tOut.SendInt ( m_iReqLen );
8464
8523
 
8465
8524
        tOut.SendInt ( 0 );
8466
 
        tOut.SendInt ( q.m_iRawFlags );
 
8525
 
 
8526
        if ( m_bScattered )
 
8527
                tOut.SendInt ( q.m_iRawFlags & ~EXCERPT_FLAG_LOAD_FILES );
 
8528
 
8467
8529
        tOut.SendString ( sIndex );
8468
8530
        tOut.SendString ( q.m_sWords.cstr() );
8469
8531
        tOut.SendString ( q.m_sBeforeMatch.cstr() );
8479
8541
        tOut.SendString ( q.m_sRawPassageBoundary.cstr() );
8480
8542
 
8481
8543
        tOut.SendInt ( m_iNumDocs );
8482
 
        for ( int iDoc = tWorker.m_iHead; iDoc!=-1; iDoc=dQueries[iDoc].m_iNext )
 
8544
        for ( int iDoc = tWorker.m_iHead; iDoc!=EOF_ITEM; iDoc=dQueries[iDoc].m_iNext )
8483
8545
                tOut.SendString ( dQueries[iDoc].m_sSource.cstr() );
8484
8546
}
8485
8547
 
8493
8555
 
8494
8556
        int iDoc = tWorker.m_iHead;
8495
8557
        bool bOk = true;
8496
 
        while ( iDoc!=-1 )
 
8558
        while ( iDoc!=EOF_ITEM )
8497
8559
        {
8498
8560
                if ( ( dQueries[iDoc].m_iLoadFiles&2 )!=0 ) // NOLINT
8499
8561
                {
8507
8569
                                        if ( strcmp ( sRes, dQueries[iDoc].m_sRes )!=0 )
8508
8570
                                                bOk = false;
8509
8571
                                        SafeDelete ( dQueries[iDoc].m_sRes );
8510
 
                                }
 
8572
                                } else
 
8573
                                dQueries[iDoc].m_sError = "";
8511
8574
                                dQueries[iDoc].m_sRes = sRes;
8512
8575
                        }
8513
8576
 
8515
8578
                        continue;
8516
8579
                }
8517
8580
                dQueries[iDoc].m_sRes = tReq.GetString().Leak();
8518
 
                iDoc = dQueries[iDoc].m_iNext;
8519
 
                dQueries[iDoc].m_iNext = -2; // mark as processed
 
8581
                int iNextDoc = dQueries[iDoc].m_iNext;
 
8582
                dQueries[iDoc].m_iNext = PROCESSED_ITEM;
 
8583
                iDoc = iNextDoc;
8520
8584
        }
8521
8585
 
8522
8586
        return bOk;
8696
8760
                bool bDone = ( *pDesc->m_pCurQuery==pDesc->m_iQueries );
8697
8761
                pDesc->m_pLock->Unlock();
8698
8762
 
8699
 
                if ( pQuery->m_iNext!=-2 )
 
8763
                if ( pQuery->m_iNext!=PROCESSED_ITEM )
8700
8764
                        continue;
8701
8765
 
8702
8766
                pQuery->m_sRes = sphBuildExcerpt ( *pQuery, tCtx.m_pDict, tCtx.m_tTokenizer.Ptr(),
8735
8799
        g_tDistLock.Lock();
8736
8800
        DistributedIndex_t * pDist = g_hDistIndexes ( sIndex );
8737
8801
        bool bRemote = pDist!=NULL;
 
8802
 
 
8803
        // hack! load_files && load_files_scattered is the 'final' call. It will report the absent files as errors.
 
8804
        // simple load_files_scattered without load_files just omits the absent files (returns empty strings).
8738
8805
        bool bScattered = ( q.m_iLoadFiles & 2 )!=0;
 
8806
        bool bSkipAbsentFiles = !( q.m_iLoadFiles & 1 );
8739
8807
 
8740
8808
        if ( bRemote )
8741
8809
        {
8793
8861
        ///////////////////
8794
8862
 
8795
8863
        bool bOk = true;
8796
 
        int iAbsentHead = -1;
 
8864
        int iAbsentHead = EOF_ITEM;
8797
8865
        if ( g_iDistThreads<=1 || dQueries.GetLength()<2 )
8798
8866
        {
8799
8867
                // boring single threaded loop
8811
8879
                // get file sizes
8812
8880
                ARRAY_FOREACH ( i, dQueries )
8813
8881
                {
8814
 
                        dQueries[i].m_iNext = -2;
 
8882
                        dQueries[i].m_iNext = PROCESSED_ITEM;
8815
8883
                        if ( dQueries[i].m_iLoadFiles )
8816
8884
                        {
8817
8885
                                struct stat st;
8823
8891
                                                pServed->Unlock();
8824
8892
                                                return false;
8825
8893
                                        }
8826
 
                                        dQueries[i].m_iNext = -1;
 
8894
                                        dQueries[i].m_iNext = EOF_ITEM;
8827
8895
                                }
8828
8896
                                dQueries[i].m_iSize = -st.st_size; // so that sort would put bigger ones first
8829
8897
                        } else
8834
8902
                }
8835
8903
 
8836
8904
                // tough jobs first
8837
 
                dQueries.Sort ( bind ( &ExcerptQuery_t::m_iSize ) );
 
8905
                if ( !bScattered )
 
8906
                        dQueries.Sort ( bind ( &ExcerptQuery_t::m_iSize ) );
8838
8907
 
8839
8908
                ARRAY_FOREACH ( i, dQueries )
8840
 
                        if ( dQueries[i].m_iNext==-1 )
 
8909
                        if ( dQueries[i].m_iNext==EOF_ITEM )
8841
8910
                        {
8842
8911
                                dQueries[i].m_iNext = iAbsentHead;
8843
8912
                                iAbsentHead = i;
8844
 
                                dQueries[i].m_sError.SetSprintf ( "failed to stat %s: %s", dQueries[i].m_sSource.cstr(), strerror(errno) );
 
8913
                                if ( !bSkipAbsentFiles )
 
8914
                                        dQueries[i].m_sError.SetSprintf ( "failed to stat %s: %s", dQueries[i].m_sSource.cstr(), strerror(errno) );
8845
8915
                        }
8846
8916
 
8847
8917
 
8848
8918
 
8849
8919
                // check if all files are available locally.
8850
 
                if ( bScattered && iAbsentHead==-1 )
 
8920
                if ( bScattered && iAbsentHead==EOF_ITEM )
8851
8921
                {
8852
8922
                        bRemote = false;
8853
8923
                        dRemoteSnippets.m_dAgents.Reset();
8867
8937
 
8868
8938
                        if ( bScattered )
8869
8939
                        {
8870
 
                                // on scattered case - the queries with m_iNext==-2 are here, and has to be scheduled to local agent
 
8940
                                // on scattered case - the queries with m_iNext==PROCESSED_ITEM are here, and has to be scheduled to local agent
8871
8941
                                // the rest has to be sent to remotes, all of them!
8872
8942
                                for ( int i=0; i<iRemoteAgents; i++ )
8873
8943
                                        dRemoteSnippets.m_dWorkers[iLocalPart+i].m_iHead = iAbsentHead;
8878
8948
                                        dRemoteSnippets.m_dWorkers[0].m_iTotal -= dQueries[i].m_iSize;
8879
8949
                                        if ( !dRemoteSnippets.m_dWorkers[0].m_bLocal )
8880
8950
                                        {
8881
 
                                                // queries sheduled for local still have iNext==-2
 
8951
                                                // queries sheduled for local still have iNext==PROCESSED_ITEM
8882
8952
                                                dQueries[i].m_iNext = dRemoteSnippets.m_dWorkers[0].m_iHead;
8883
8953
                                                dRemoteSnippets.m_dWorkers[0].m_iHead = i;
8884
8954
                                        }
8941
9011
                        {
8942
9012
                                // inverse the success/failed state - so that the queries with negative m_iNext are treated as failed
8943
9013
                                ARRAY_FOREACH ( i, dQueries )
8944
 
                                        dQueries[i].m_iNext = (dQueries[i].m_iNext==-2)?0:-2;
 
9014
                                        dQueries[i].m_iNext = (dQueries[i].m_iNext==PROCESSED_ITEM)?0:PROCESSED_ITEM;
8945
9015
 
8946
9016
                                // failsafe - one more turn for failed queries on local agent
8947
9017
                                SnippetThread_t & t = dThreads[0];
9033
9103
        q.m_bEmitZones = ( iFlags & EXCERPT_FLAG_EMIT_ZONES )!=0;
9034
9104
 
9035
9105
        int iCount = tReq.GetInt ();
9036
 
        if ( iCount<0 || iCount>EXCERPT_MAX_ENTRIES )
 
9106
        if ( iCount<=0 || iCount>EXCERPT_MAX_ENTRIES )
9037
9107
        {
9038
9108
                tReq.SendErrorReply ( "invalid entries count %d", iCount );
9039
9109
                return;
9238
9308
        int & iSuccesses, int & iUpdated,
9239
9309
        SearchFailuresLog_c & dFails, const ServedIndex_t * pServed )
9240
9310
{
9241
 
        if ( !pServed || !pServed->m_pIndex )
 
9311
        if ( !pServed || !pServed->m_pIndex || !pServed->m_bEnabled )
9242
9312
        {
9243
9313
                dFails.Submit ( sIndex, "index not available" );
9244
9314
                return;
9258
9328
        }
9259
9329
}
9260
9330
 
 
9331
static const ServedIndex_t * UpdateGetLockedIndex ( const CSphString & sName, bool bMvaUpdate )
 
9332
{
 
9333
        const ServedIndex_t * pLocked = g_pIndexes->GetRlockedEntry ( sName );
 
9334
        if ( !pLocked )
 
9335
                return NULL;
 
9336
 
 
9337
        if ( !( bMvaUpdate && pLocked->m_bRT ) )
 
9338
                return pLocked;
 
9339
 
 
9340
        pLocked->Unlock();
 
9341
        return g_pIndexes->GetWlockedEntry ( sName );
 
9342
}
 
9343
 
9261
9344
 
9262
9345
void HandleCommandUpdate ( int iSock, int iVer, InputBuffer_c & tReq )
9263
9346
{
9269
9352
        CSphAttrUpdate tUpd;
9270
9353
        CSphVector<DWORD> dMva;
9271
9354
 
 
9355
        bool bMvaUpdate = false;
 
9356
 
9272
9357
        tUpd.m_dAttrs.Resize ( tReq.GetDword() ); // FIXME! check this
9273
9358
        ARRAY_FOREACH ( i, tUpd.m_dAttrs )
9274
9359
        {
9277
9362
 
9278
9363
                tUpd.m_dAttrs[i].m_eAttrType = SPH_ATTR_INTEGER;
9279
9364
                if ( iVer>=0x102 )
 
9365
                {
9280
9366
                        if ( tReq.GetDword() )
 
9367
                        {
9281
9368
                                tUpd.m_dAttrs[i].m_eAttrType = SPH_ATTR_UINT32SET;
 
9369
                                bMvaUpdate = true;
 
9370
                        }
 
9371
                }
9282
9372
        }
9283
9373
 
9284
9374
        int iNumUpdates = tReq.GetInt (); // FIXME! check this
9331
9421
        }
9332
9422
 
9333
9423
        // check index names
9334
 
        CSphVector<CSphNamedInt> dIndexNames;
 
9424
        CSphVector<CSphString> dIndexNames;
9335
9425
        ParseIndexList ( sIndexes, dIndexNames );
9336
9426
 
9337
9427
        if ( !dIndexNames.GetLength() )
9343
9433
        CSphVector<DistributedIndex_t> dDistributed ( dIndexNames.GetLength() ); // lock safe storage for distributed indexes
9344
9434
        ARRAY_FOREACH ( i, dIndexNames )
9345
9435
        {
9346
 
                if ( !g_pIndexes->Exists ( dIndexNames[i].m_sName ) )
 
9436
                if ( !g_pIndexes->Exists ( dIndexNames[i] ) )
9347
9437
                {
9348
9438
                        // search amongst distributed and copy for further processing
9349
9439
                        g_tDistLock.Lock();
9350
 
                        const DistributedIndex_t * pDistIndex = g_hDistIndexes ( dIndexNames[i].m_sName );
 
9440
                        const DistributedIndex_t * pDistIndex = g_hDistIndexes ( dIndexNames[i] );
9351
9441
 
9352
9442
                        if ( pDistIndex )
9353
9443
                        {
9354
9444
                                dDistributed[i] = *pDistIndex;
9355
 
                                dDistributed[i].m_bToDelete = true; // our presence flag
9356
9445
                        }
9357
9446
 
9358
9447
                        g_tDistLock.Unlock();
9361
9450
                                continue;
9362
9451
                        else
9363
9452
                        {
9364
 
                                tReq.SendErrorReply ( "unknown index '%s' in update request", dIndexNames[i].m_sName.cstr() );
 
9453
                                tReq.SendErrorReply ( "unknown index '%s' in update request", dIndexNames[i].cstr() );
9365
9454
                                return;
9366
9455
                        }
9367
9456
                }
9374
9463
 
9375
9464
        ARRAY_FOREACH ( iIdx, dIndexNames )
9376
9465
        {
9377
 
                const char * sReqIndex = dIndexNames[iIdx].m_sName.cstr();
9378
 
                const ServedIndex_t * pLocked = g_pIndexes->GetWlockedEntry ( sReqIndex );
 
9466
                const char * sReqIndex = dIndexNames[iIdx].cstr();
 
9467
                const ServedIndex_t * pLocked = UpdateGetLockedIndex ( sReqIndex, bMvaUpdate );
9379
9468
                if ( pLocked )
9380
9469
                {
9381
9470
                        DoCommandUpdate ( sReqIndex, tUpd, iSuccesses, iUpdated, dFails, pLocked );
9382
9471
                        pLocked->Unlock();
9383
9472
                } else
9384
9473
                {
9385
 
                        assert ( dDistributed[iIdx].m_bToDelete );
 
9474
                        assert ( dDistributed[iIdx].m_dLocal.GetLength() || dDistributed[iIdx].m_dAgents.GetLength() );
9386
9475
                        CSphVector<CSphString>& dLocal = dDistributed[iIdx].m_dLocal;
9387
9476
 
9388
9477
                        ARRAY_FOREACH ( i, dLocal )
9389
9478
                        {
9390
9479
                                const char * sLocal = dLocal[i].cstr();
9391
 
                                const ServedIndex_t * pServed = g_pIndexes->GetWlockedEntry ( sLocal );
 
9480
                                const ServedIndex_t * pServed = UpdateGetLockedIndex ( sLocal, bMvaUpdate );
9392
9481
                                DoCommandUpdate ( sLocal, tUpd, iSuccesses, iUpdated, dFails, pServed );
9393
9482
                                if ( pServed )
9394
9483
                                        pServed->Unlock();
9396
9485
                }
9397
9486
 
9398
9487
                // update remote agents
9399
 
                if ( dDistributed[iIdx].m_bToDelete )
 
9488
                if ( dDistributed[iIdx].m_dAgents.GetLength() )
9400
9489
                {
9401
9490
                        DistributedIndex_t & tDist = dDistributed[iIdx];
9402
9491
 
9568
9657
        dStatus.Add().SetSprintf ( "%d", tMeta.m_iMatches );
9569
9658
 
9570
9659
        dStatus.Add ( "total_found" );
9571
 
        dStatus.Add().SetSprintf ( "%d", tMeta.m_iTotalMatches );
 
9660
        dStatus.Add().SetSprintf ( INT64_FMT, tMeta.m_iTotalMatches );
9572
9661
 
9573
9662
        dStatus.Add ( "time" );
9574
9663
        dStatus.Add().SetSprintf ( "%d.%03d", tMeta.m_iQueryTime/1000, tMeta.m_iQueryTime%1000 );
9583
9672
                dStatus.Add ( tMeta.m_hWordStats.IterateGetKey() );
9584
9673
 
9585
9674
                dStatus.Add().SetSprintf ( "docs[%d]", iWord );
9586
 
                dStatus.Add().SetSprintf ( "%d", tStat.m_iDocs );
 
9675
                dStatus.Add().SetSprintf ( INT64_FMT, tStat.m_iDocs );
9587
9676
 
9588
9677
                dStatus.Add().SetSprintf ( "hits[%d]", iWord );
9589
 
                dStatus.Add().SetSprintf ( "%d", tStat.m_iHits );
 
9678
                dStatus.Add().SetSprintf ( INT64_FMT, tStat.m_iHits );
9590
9679
 
9591
9680
                iWord++;
9592
9681
        }
9769
9858
                        || iLength<0 || iLength>g_iMaxPacketSize )
9770
9859
                {
9771
9860
                        // unknown command, default response header
9772
 
                        tBuf.SendErrorReply ( "unknown command (code=%d)", iCommand );
 
9861
                        tBuf.SendErrorReply ( "invalid command (code=%d, len=%d)", iCommand, iLength );
9773
9862
 
9774
9863
                        // if request length is insane, low level comm is broken, so we bail out
9775
9864
                        if ( iLength<0 || iLength>g_iMaxPacketSize )
9776
 
                        {
9777
9865
                                sphWarning ( "ill-formed client request (length=%d out of bounds)", iLength );
9778
 
                                return;
9779
 
                        }
 
9866
 
 
9867
                        // if command is insane, low level comm is broken, so we bail out
 
9868
                        if ( iCommand<0 || iCommand>=SEARCHD_COMMAND_TOTAL )
 
9869
                                sphWarning ( "ill-formed client request (command=%d, SEARCHD_COMMAND_TOTAL=%d)", iCommand, SEARCHD_COMMAND_TOTAL );
 
9870
 
 
9871
                        return;
9780
9872
                }
9781
9873
 
9782
9874
                // count commands
9791
9883
                assert ( iLength>=0 && iLength<=g_iMaxPacketSize );
9792
9884
                if ( iLength && !tBuf.ReadFrom ( iLength ) )
9793
9885
                {
9794
 
                        sphWarning ( "failed to receive client request body (client=%s, exp=%d)", sClientIP, iLength );
 
9886
                        sphWarning ( "failed to receive client request body (client=%s, exp=%d, error='%s')", sClientIP, iLength, sphSockError() );
9795
9887
                        return;
9796
9888
                }
9797
9889
 
10014
10106
        tOut.SendBytes ( sVarLen, iLen );       // packed affected rows & insert_id
10015
10107
        if ( iWarns<0 ) iWarns = 0;
10016
10108
        if ( iWarns>65535 ) iWarns = 65535;
10017
 
        tOut.SendLSBDword ( iWarns );           // N warnings, 0 status
 
10109
        DWORD uWarnStatus = iWarns<<16;
 
10110
        tOut.SendLSBDword ( uWarnStatus );              // N warnings, 0 status
10018
10111
        if ( iMsgLen > 0 )
10019
10112
                tOut.SendBytes ( sMessage, iMsgLen );
10020
10113
}
10219
10312
                                        sError.SetSprintf ( "raw %d, column %d: MVA value specified for a non-MVA column", 1+c, 1+iQuerySchemaIdx ); // 1 for human base
10220
10313
                                        break;
10221
10314
                                }
 
10315
                                if ( ( tCol.m_eAttrType==SPH_ATTR_UINT32SET || tCol.m_eAttrType==SPH_ATTR_UINT64SET ) && tVal.m_iType!=TOK_CONST_MVA )
 
10316
                                {
 
10317
                                        sError.SetSprintf ( "raw %d, column %d: non-MVA value specified for a MVA column", 1+c, 1+iQuerySchemaIdx ); // 1 for human base
 
10318
                                        break;
 
10319
                                }
10222
10320
 
10223
10321
                                if ( tCol.m_eAttrType==SPH_ATTR_UINT32SET || tCol.m_eAttrType==SPH_ATTR_UINT64SET )
10224
10322
                                {
10225
10323
                                        // collect data from scattered insvals
10226
10324
                                        // FIXME! maybe remove this mess, and just have a single m_dMvas pool in parser instead?
10227
 
                                        tVal.m_pVals->Uniq();
10228
 
                                        int iLen = tVal.m_pVals->GetLength();
 
10325
                                        int iLen = 0;
 
10326
                                        if ( tVal.m_pVals.Ptr() )
 
10327
                                        {
 
10328
                                                tVal.m_pVals->Uniq();
 
10329
                                                iLen = tVal.m_pVals->GetLength();
 
10330
                                        }
10229
10331
                                        if ( tCol.m_eAttrType==SPH_ATTR_UINT64SET )
10230
10332
                                        {
10231
10333
                                                dMvas.Add ( iLen*2 );
10817
10919
        CSphString sError;
10818
10920
 
10819
10921
        // check index names
10820
 
        CSphVector<CSphNamedInt> dIndexNames;
 
10922
        CSphVector<CSphString> dIndexNames;
10821
10923
        ParseIndexList ( tStmt.m_sIndex, dIndexNames );
10822
10924
 
10823
10925
        if ( !dIndexNames.GetLength() )
10830
10932
        CSphVector<DistributedIndex_t> dDistributed ( dIndexNames.GetLength() ); // lock safe storage for distributed indexes
10831
10933
        ARRAY_FOREACH ( i, dIndexNames )
10832
10934
        {
10833
 
                if ( !g_pIndexes->Exists ( dIndexNames[i].m_sName ) )
 
10935
                if ( !g_pIndexes->Exists ( dIndexNames[i] ) )
10834
10936
                {
10835
10937
                        // search amongst distributed and copy for further processing
10836
10938
                        g_tDistLock.Lock();
10837
 
                        const DistributedIndex_t * pDistIndex = g_hDistIndexes ( dIndexNames[i].m_sName );
 
10939
                        const DistributedIndex_t * pDistIndex = g_hDistIndexes ( dIndexNames[i] );
10838
10940
 
10839
10941
                        if ( pDistIndex )
10840
10942
                        {
10841
10943
                                dDistributed[i] = *pDistIndex;
10842
 
                                dDistributed[i].m_bToDelete = true; // our presence flag
10843
10944
                        }
10844
10945
 
10845
10946
                        g_tDistLock.Unlock();
10848
10949
                                continue;
10849
10950
                        else
10850
10951
                        {
10851
 
                                sError.SetSprintf ( "unknown index '%s' in update request", dIndexNames[i].m_sName.cstr() );
 
10952
                                sError.SetSprintf ( "unknown index '%s' in update request", dIndexNames[i].cstr() );
10852
10953
                                SendMysqlErrorPacket ( tOut, uPacketID, tStmt.m_sStmt, sError.cstr() );
10853
10954
                                return;
10854
10955
                        }
10861
10962
        int iUpdated = 0;
10862
10963
        int iWarns = 0;
10863
10964
 
 
10965
        bool bMvaUpdate = false;
 
10966
        ARRAY_FOREACH_COND ( i, tStmt.m_tUpdate.m_dAttrs, !bMvaUpdate )
 
10967
        {
 
10968
                bMvaUpdate = ( tStmt.m_tUpdate.m_dAttrs[i].m_eAttrType==SPH_ATTR_UINT32SET
 
10969
                        || tStmt.m_tUpdate.m_dAttrs[i].m_eAttrType==SPH_ATTR_UINT64SET );
 
10970
        }
 
10971
 
10864
10972
        ARRAY_FOREACH ( iIdx, dIndexNames )
10865
10973
        {
10866
 
                const char * sReqIndex = dIndexNames[iIdx].m_sName.cstr();
10867
 
                const ServedIndex_t * pLocked = g_pIndexes->GetWlockedEntry ( sReqIndex );
 
10974
                const char * sReqIndex = dIndexNames[iIdx].cstr();
 
10975
                const ServedIndex_t * pLocked = UpdateGetLockedIndex ( sReqIndex, bMvaUpdate );
10868
10976
                if ( pLocked )
10869
10977
                {
10870
10978
                        DoExtendedUpdate ( sReqIndex, tStmt, iSuccesses, iUpdated, bCommit, dFails, pLocked );
10871
10979
                } else
10872
10980
                {
10873
 
                        assert ( dDistributed[iIdx].m_bToDelete );
 
10981
                        assert ( dDistributed[iIdx].m_dLocal.GetLength() || dDistributed[iIdx].m_dAgents.GetLength() );
10874
10982
                        CSphVector<CSphString>& dLocal = dDistributed[iIdx].m_dLocal;
10875
10983
 
10876
10984
                        ARRAY_FOREACH ( i, dLocal )
10877
10985
                        {
10878
10986
                                const char * sLocal = dLocal[i].cstr();
10879
 
                                const ServedIndex_t * pServed = g_pIndexes->GetWlockedEntry ( sLocal );
 
10987
                                const ServedIndex_t * pServed = UpdateGetLockedIndex ( sLocal, bMvaUpdate );
10880
10988
                                DoExtendedUpdate ( sLocal, tStmt, iSuccesses, iUpdated, bCommit, dFails, pServed );
10881
10989
                        }
10882
10990
                }
10883
10991
 
10884
10992
                // update remote agents
10885
 
                if ( dDistributed[iIdx].m_bToDelete )
 
10993
                if ( dDistributed[iIdx].m_dAgents.GetLength() )
10886
10994
                {
10887
10995
                        DistributedIndex_t & tDist = dDistributed[iIdx];
10888
10996
 
11059
11167
 
11060
11168
        // empty result sets just might carry the full uberschema
11061
11169
        // bummer! lets protect ourselves against that
11062
 
        int iAttrs = 1;
 
11170
        int iSchemaAttrsCount = 0;
 
11171
        int iAttrsCount = 1;
11063
11172
        if ( tRes.m_dMatches.GetLength() )
11064
11173
        {
11065
 
                iAttrs = tRes.m_tSchema.GetAttrsCount();
 
11174
                iSchemaAttrsCount = SendGetAttrCount ( tRes.m_tSchema );
 
11175
                iAttrsCount = iSchemaAttrsCount;
11066
11176
                if ( g_bCompatResults )
11067
 
                        iAttrs += 2;
 
11177
                        iAttrsCount += 2;
11068
11178
        }
11069
 
        if ( iAttrs>=251 )
 
11179
        if ( iAttrsCount>=251 )
11070
11180
        {
11071
11181
                // this will show up as success in query log, as the query itself was ok
11072
11182
                // but we need some kind of a notice anyway, to nail down issues based on logs only
11077
11187
 
11078
11188
        // result set header packet
11079
11189
        tOut.SendLSBDword ( ((uPacketID++)<<24) + 2 );
11080
 
        tOut.SendByte ( BYTE(iAttrs) );
 
11190
        tOut.SendByte ( BYTE(iAttrsCount) );
11081
11191
        tOut.SendByte ( 0 ); // extra
11082
11192
 
11083
11193
        // field packets
11094
11204
                        SendMysqlFieldPacket ( tOut, uPacketID++, "weight", MYSQL_COL_LONG );
11095
11205
                }
11096
11206
 
11097
 
                for ( int i=0; i<tRes.m_tSchema.GetAttrsCount(); i++ )
 
11207
                for ( int i=0; i<iSchemaAttrsCount; i++ )
11098
11208
                {
11099
11209
                        const CSphColumnInfo & tCol = tRes.m_tSchema.GetAttr(i);
11100
11210
                        MysqlColumnType_e eType = MYSQL_COL_STRING;
11129
11239
                }
11130
11240
 
11131
11241
                const CSphSchema & tSchema = tRes.m_tSchema;
11132
 
                for ( int i=0; i<tSchema.GetAttrsCount(); i++ )
 
11242
                for ( int i=0; i<iSchemaAttrsCount; i++ )
11133
11243
                {
11134
11244
                        CSphAttrLocator tLoc = tSchema.GetAttr(i).m_tLocator;
11135
11245
                        ESphAttr eAttrType = tSchema.GetAttr(i).m_eAttrType;
11515
11625
                        return;
11516
11626
                }
11517
11627
 
 
11628
                // INT_SET type must be sorted
 
11629
                tStmt.m_dSetValues.Sort();
 
11630
 
 
11631
                // create or update the variable
11518
11632
                g_tUservarsMutex.Lock();
11519
11633
                Uservar_t * pVar = g_hUservars ( tStmt.m_sSetName );
11520
11634
                if ( pVar )
11521
11635
                {
 
11636
                        // variable exists, release previous value
 
11637
                        // actual destruction of the value (aka data) might happen later
 
11638
                        // as the concurrent queries might still be using and holding that data
 
11639
                        // from here, the old value becomes nameless, though
11522
11640
                        assert ( pVar->m_eType==USERVAR_INT_SET );
11523
 
                        pVar->m_pVal->SwapData ( tStmt.m_dSetValues );
 
11641
                        assert ( pVar->m_pVal );
 
11642
                        pVar->m_pVal->Release();
 
11643
                        pVar->m_pVal = NULL;
11524
11644
                } else
11525
11645
                {
 
11646
                        // create a shiny new variable
11526
11647
                        Uservar_t tVar;
11527
 
                        tVar.m_eType = USERVAR_INT_SET;
11528
 
                        tVar.m_pVal = new CSphVector<SphAttr_t>;
11529
 
                        tVar.m_pVal->SwapData ( tStmt.m_dSetValues );
11530
 
                        tVar.m_pVal->Sort();
11531
 
                        g_hUservars.Add ( tVar, tStmt.m_sSetName ); // FIXME? free those on shutdown?
 
11648
                        g_hUservars.Add ( tVar, tStmt.m_sSetName );
 
11649
                        pVar = g_hUservars ( tStmt.m_sSetName );
11532
11650
                }
 
11651
 
 
11652
                // swap in the new value
 
11653
                assert ( pVar );
 
11654
                assert ( !pVar->m_pVal );
 
11655
                pVar->m_eType = USERVAR_INT_SET;
 
11656
                pVar->m_pVal = new UservarIntSet_c();
 
11657
                pVar->m_pVal->SwapData ( tStmt.m_dSetValues );
11533
11658
                g_tUservarsMutex.Unlock();
11534
11659
                break;
11535
11660
        }
11643
11768
 
11644
11769
        if ( !pIndex || !pIndex->m_bEnabled || !pIndex->m_bRT )
11645
11770
        {
11646
 
                pIndex->Unlock();
 
11771
                if ( pIndex )
 
11772
                        pIndex->Unlock();
11647
11773
                SendMysqlErrorPacket ( tOut, uPacketID, tStmt.m_sStmt, "FLUSH RTINDEX requires an existing RT index" );
11648
11774
                return;
11649
11775
        }
11957
12083
 
11958
12084
        if ( sphSockSend ( iSock, g_sMysqlHandshake, g_iMysqlHandshake )!=g_iMysqlHandshake )
11959
12085
        {
11960
 
                sphWarning ( "failed to send server version (client=%s)", sClientIP );
 
12086
                int iErrno = sphSockGetErrno ();
 
12087
                sphWarning ( "failed to send server version (client=%s, error: %d '%s')", sClientIP, iErrno, sphSockError ( iErrno ) );
11961
12088
                return;
11962
12089
        }
11963
12090
 
12043
12170
                } else if ( uMysqlCmd==MYSQL_COM_SET_OPTION )
12044
12171
                {
12045
12172
                        // bMulti = ( tIn.GetWord()==MYSQL_OPTION_MULTI_STATEMENTS_ON ); // that's how we could double check and validate multi query
12046
 
                        SendMysqlOkPacket ( tOut, uPacketID );
 
12173
                        // server reporting success in response to COM_SET_OPTION and COM_DEBUG
 
12174
                        SendMysqlEofPacket ( tOut, uPacketID, 0 );
12047
12175
                        continue;
12048
12176
 
12049
12177
                } else if ( uMysqlCmd!=MYSQL_COM_QUERY )
14527
14655
}
14528
14656
 
14529
14657
 
 
14658
void FailClient ( int iSock, SearchdStatus_e eStatus, const char * sMessage )
 
14659
{
 
14660
        assert ( eStatus==SEARCHD_RETRY || eStatus==SEARCHD_ERROR );
 
14661
 
 
14662
        int iRespLen = 4 + strlen(sMessage);
 
14663
 
 
14664
        NetOutputBuffer_c tOut ( iSock );
 
14665
        tOut.SendInt ( SPHINX_SEARCHD_PROTO );
 
14666
        tOut.SendWord ( (WORD)eStatus );
 
14667
        tOut.SendWord ( 0 ); // version doesn't matter
 
14668
        tOut.SendInt ( iRespLen );
 
14669
        tOut.SendString ( sMessage );
 
14670
        tOut.Flush ();
 
14671
 
 
14672
        // FIXME? without some wait, client fails to receive the response on windows
 
14673
        sphSockClose ( iSock );
 
14674
}
 
14675
 
 
14676
 
14530
14677
Listener_t * DoAccept ( int * pClientSock, char * sClientName )
14531
14678
{
14532
14679
        int iMaxFD = 0;
14620
14767
 
14621
14768
                // accepted!
14622
14769
#if !USE_WINDOWS
 
14770
                // FIXME!!! either get git of select() or allocate list of FD (with dup2 back instead close for thouse FD)
 
14771
                // with threads workers to prevent dup2 closes valid FD
 
14772
 
14623
14773
                if ( SPH_FDSET_OVERFLOW ( iClientSock ) )
14624
 
                        iClientSock = dup2 ( iClientSock, g_iClientFD );
 
14774
                {
 
14775
                        if ( ( g_eWorkers==MPM_FORK || g_eWorkers==MPM_PREFORK ) )
 
14776
                        {
 
14777
                                iClientSock = dup2 ( iClientSock, g_iClientFD );
 
14778
                        } else
 
14779
                        {
 
14780
                                FailClient ( iClientSock, SEARCHD_RETRY, "server maxed out, retry in a second" );
 
14781
                                sphWarning ( "maxed out, dismissing client (socket=%d)", iClientSock );
 
14782
                                sphSockClose ( iClientSock );
 
14783
                                return NULL;
 
14784
                        }
 
14785
                }
14625
14786
#endif
14626
14787
 
14627
14788
                *pClientSock = iClientSock;
14666
14827
}
14667
14828
 
14668
14829
 
14669
 
void FailClient ( int iSock, SearchdStatus_e eStatus, const char * sMessage )
14670
 
{
14671
 
        assert ( eStatus==SEARCHD_RETRY || eStatus==SEARCHD_ERROR );
14672
 
 
14673
 
        int iRespLen = 4 + strlen(sMessage);
14674
 
 
14675
 
        NetOutputBuffer_c tOut ( iSock );
14676
 
        tOut.SendInt ( SPHINX_SEARCHD_PROTO );
14677
 
        tOut.SendWord ( (WORD)eStatus );
14678
 
        tOut.SendWord ( 0 ); // version doesn't matter
14679
 
        tOut.SendInt ( iRespLen );
14680
 
        tOut.SendString ( sMessage );
14681
 
        tOut.Flush ();
14682
 
 
14683
 
        // FIXME? without some wait, client fails to receive the response on windows
14684
 
        sphSockClose ( iSock );
14685
 
}
14686
 
 
14687
 
 
14688
14830
void HandlerThread ( void * pArg )
14689
14831
{
14690
14832
        // setup query guard for threaded mode
14753
14895
        CheckFlush ();
14754
14896
        CheckChildrenTerm();
14755
14897
 
14756
 
        sphInfo ( NULL ); // flush dupes
14757
 
 
14758
14898
        if ( pAcceptMutex )
14759
14899
        {
14760
14900
                // FIXME! what if all children are busy; we might want to accept here and temp fork more
14768
14908
        if ( !pListener )
14769
14909
                return;
14770
14910
 
14771
 
        if ( ( g_iMaxChildren && g_dChildren.GetLength()>=g_iMaxChildren )
 
14911
        if ( ( g_iMaxChildren && ( g_dChildren.GetLength()>=g_iMaxChildren || g_dThd.GetLength()>=g_iMaxChildren ) )
14772
14912
                || ( g_iRotateCount && !g_bSeamlessRotate ) )
14773
14913
        {
14774
14914
                FailClient ( iClientSock, SEARCHD_RETRY, "server maxed out, retry in a second" );
15425
15565
                if ( !g_bOptNoLock )
15426
15566
                        OpenDaemonLog ( hConf["searchd"]["searchd"] );
15427
15567
                bVisualLoad = SetWatchDog ( iDevNull );
 
15568
                close ( g_iLogFile ); // just the 'IT Happens' magic - switch off, then on.
 
15569
                OpenDaemonLog ( hConf["searchd"]["searchd"] );
15428
15570
        }
15429
15571
#endif
15430
15572
 
15855
15997
}
15856
15998
 
15857
15999
 
15858
 
extern bool ( *g_pUservarsHook )( const CSphString & sUservar, CSphVector<SphAttr_t> & dVals );
 
16000
extern UservarIntSet_c * ( *g_pUservarsHook )( const CSphString & sUservar );
15859
16001
 
15860
 
bool UservarsHook ( const CSphString & sUservar, CSphVector<SphAttr_t> & dVals )
 
16002
UservarIntSet_c * UservarsHook ( const CSphString & sUservar )
15861
16003
{
15862
 
        g_tUservarsMutex.Lock();
 
16004
        CSphScopedLock<CSphStaticMutex> tLock ( g_tUservarsMutex );
 
16005
 
15863
16006
        Uservar_t * pVar = g_hUservars ( sUservar );
15864
 
        if ( pVar )
15865
 
        {
15866
 
                assert ( pVar->m_eType==USERVAR_INT_SET );
15867
 
                dVals = *pVar->m_pVal;
15868
 
        }
15869
 
        g_tUservarsMutex.Unlock();
15870
 
        return ( pVar!=NULL );
 
16007
        if ( !pVar )
 
16008
                return NULL;
 
16009
 
 
16010
        assert ( pVar->m_eType==USERVAR_INT_SET );
 
16011
        pVar->m_pVal->AddRef();
 
16012
        return pVar->m_pVal;
15871
16013
}
15872
16014
 
15873
16015
 
15919
16061
}
15920
16062
 
15921
16063
//
15922
 
// $Id: searchd.cpp 3017 2011-11-15 22:22:49Z shodan $
 
16064
// $Id: searchd.cpp 3127 2012-03-01 00:26:39Z shodan $
15923
16065
//