~skinny.moey/drizzle/innodb-replication

« back to all changes in this revision

Viewing changes to drizzled/session.cc

  • Committer: Brian Aker
  • Date: 2010-11-08 22:35:57 UTC
  • mfrom: (1802.1.114 trunk)
  • Revision ID: brian@tangent.org-20101108223557-w3xzwp9hjjtjhtc1
MergeĀ inĀ trunk.

Show diffs side-by-side

added added

removed removed

Lines of Context:
22
22
 */
23
23
 
24
24
#include "config.h"
25
 
#include <drizzled/session.h>
 
25
#include "drizzled/session.h"
26
26
#include "drizzled/session_list.h"
27
27
#include <sys/stat.h>
28
 
#include <drizzled/error.h>
29
 
#include <drizzled/gettext.h>
30
 
#include <drizzled/query_id.h>
31
 
#include <drizzled/data_home.h>
32
 
#include <drizzled/sql_base.h>
33
 
#include <drizzled/lock.h>
34
 
#include <drizzled/item/cache.h>
35
 
#include <drizzled/item/float.h>
36
 
#include <drizzled/item/return_int.h>
37
 
#include <drizzled/item/empty_string.h>
38
 
#include <drizzled/show.h>
39
 
#include <drizzled/plugin/client.h>
 
28
#include "drizzled/error.h"
 
29
#include "drizzled/gettext.h"
 
30
#include "drizzled/query_id.h"
 
31
#include "drizzled/data_home.h"
 
32
#include "drizzled/sql_base.h"
 
33
#include "drizzled/lock.h"
 
34
#include "drizzled/item/cache.h"
 
35
#include "drizzled/item/float.h"
 
36
#include "drizzled/item/return_int.h"
 
37
#include "drizzled/item/empty_string.h"
 
38
#include "drizzled/show.h"
 
39
#include "drizzled/plugin/client.h"
40
40
#include "drizzled/plugin/scheduler.h"
41
41
#include "drizzled/plugin/authentication.h"
42
42
#include "drizzled/plugin/logging.h"
48
48
#include "drizzled/transaction_services.h"
49
49
#include "drizzled/drizzled.h"
50
50
 
51
 
#include "drizzled/table_share_instance.h"
 
51
#include "drizzled/table/instance.h"
52
52
 
53
53
#include "plugin/myisam/myisam.h"
54
54
#include "drizzled/internal/iocache.h"
55
55
#include "drizzled/internal/thread_var.h"
56
56
#include "drizzled/plugin/event_observer.h"
57
57
 
 
58
#include "drizzled/util/functors.h"
 
59
 
58
60
#include <fcntl.h>
59
61
#include <algorithm>
60
62
#include <climits>
61
 
#include "boost/filesystem.hpp" 
 
63
#include <boost/filesystem.hpp>
62
64
 
63
65
using namespace std;
64
66
 
155
157
Session::Session(plugin::Client *client_arg) :
156
158
  Open_tables_state(refresh_version),
157
159
  mem_root(&main_mem_root),
 
160
  xa_id(0),
158
161
  lex(&main_lex),
159
 
  query(),
 
162
  catalog("LOCAL"),
160
163
  client(client_arg),
161
164
  scheduler(NULL),
162
165
  scheduler_arg(NULL),
167
170
  first_successful_insert_id_in_prev_stmt(0),
168
171
  first_successful_insert_id_in_cur_stmt(0),
169
172
  limit_found_rows(0),
170
 
  global_read_lock(0),
 
173
  _global_read_lock(NONE),
 
174
  _killed(NOT_KILLED),
171
175
  some_tables_deleted(false),
172
176
  no_errors(false),
173
177
  password(false),
194
198
  memory::init_sql_alloc(&main_mem_root, memory::ROOT_MIN_BLOCK_SIZE, 0);
195
199
  thread_stack= NULL;
196
200
  count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
197
 
  killed= NOT_KILLED;
198
201
  col_access= 0;
199
202
  tmp_table= 0;
200
203
  used_tables= 0;
303
306
    return;
304
307
 
305
308
  setAbort(true);
306
 
  boost::mutex::scoped_lock scopedLock(mysys_var->mutex);
 
309
  boost_unique_lock_t scopedLock(mysys_var->mutex);
307
310
  if (mysys_var->current_cond)
308
311
  {
309
312
    mysys_var->current_mutex->lock();
329
332
{
330
333
  assert(cleanup_done == false);
331
334
 
332
 
  killed= KILL_CONNECTION;
 
335
  setKilled(KILL_CONNECTION);
333
336
#ifdef ENABLE_WHEN_BINLOG_WILL_BE_ABLE_TO_PREPARE
334
337
  if (transaction.xid_state.xa_state == XA_PREPARED)
335
338
  {
355
358
  close_temporary_tables();
356
359
 
357
360
  if (global_read_lock)
358
 
    unlock_global_read_lock(this);
 
361
  {
 
362
    unlockGlobalReadLock();
 
363
  }
359
364
 
360
365
  cleanup_done= true;
361
366
}
405
410
  LOCK_delete.unlock();
406
411
}
407
412
 
408
 
void Session::awake(Session::killed_state state_to_set)
 
413
void Session::awake(Session::killed_state_t state_to_set)
409
414
{
410
415
  this->checkSentry();
411
416
  safe_mutex_assert_owner(&LOCK_delete);
412
417
 
413
 
  killed= state_to_set;
 
418
  setKilled(state_to_set);
414
419
  if (state_to_set != Session::KILL_QUERY)
415
420
  {
416
421
    scheduler->killSession(this);
418
423
  }
419
424
  if (mysys_var)
420
425
  {
421
 
    boost::mutex::scoped_lock scopedLock(mysys_var->mutex);
 
426
    boost_unique_lock_t scopedLock(mysys_var->mutex);
422
427
    /*
423
428
      "
424
429
      This broadcast could be up in the air if the victim thread
528
533
 
529
534
  prepareForQueries();
530
535
 
531
 
  while (! client->haveError() && killed != KILL_CONNECTION)
 
536
  while (not client->haveError() && getKilled() != KILL_CONNECTION)
532
537
  {
533
 
    if (! executeStatement())
 
538
    if (not executeStatement())
534
539
      break;
535
540
  }
536
541
 
552
557
  current_global_counters.connections++;
553
558
  thread_id= variables.pseudo_thread_id= global_thread_id++;
554
559
 
555
 
  LOCK_thread_count.lock();
556
 
  getSessionList().push_back(this);
557
 
  LOCK_thread_count.unlock();
 
560
  {
 
561
    boost::mutex::scoped_lock scoped(LOCK_thread_count);
 
562
    getSessionList().push_back(this);
 
563
  }
 
564
 
 
565
  if (unlikely(plugin::EventObserver::connectSession(*this)))
 
566
  {
 
567
    // We should do something about an error...
 
568
  }
 
569
 
 
570
  if (unlikely(plugin::EventObserver::connectSession(*this)))
 
571
  {
 
572
    // We should do something about an error...
 
573
  }
558
574
 
559
575
  if (scheduler->addSession(this))
560
576
  {
561
577
    DRIZZLE_CONNECTION_START(thread_id);
562
578
    char error_message_buff[DRIZZLE_ERRMSG_SIZE];
563
579
 
564
 
    killed= Session::KILL_CONNECTION;
 
580
    setKilled(Session::KILL_CONNECTION);
565
581
 
566
582
    status_var.aborted_connects++;
567
583
 
577
593
}
578
594
 
579
595
 
580
 
const char* Session::enter_cond(boost::condition_variable &cond, boost::mutex &mutex, const char* msg)
 
596
const char* Session::enter_cond(boost::condition_variable_any &cond, boost::mutex &mutex, const char* msg)
581
597
{
582
598
  const char* old_msg = get_proc_info();
583
599
  safe_mutex_assert_owner(mutex);
596
612
    does a Session::awake() on you).
597
613
  */
598
614
  mysys_var->current_mutex->unlock();
599
 
  boost::mutex::scoped_lock scopedLock(mysys_var->mutex);
 
615
  boost_unique_lock_t scopedLock(mysys_var->mutex);
600
616
  mysys_var->current_mutex = 0;
601
617
  mysys_var->current_cond = 0;
602
618
  this->set_proc_info(old_msg);
604
620
 
605
621
bool Session::authenticate()
606
622
{
607
 
  lex_start(this);
 
623
  lex->start(this);
608
624
  if (client->authenticate())
609
625
    return false;
610
626
 
613
629
  return true;
614
630
}
615
631
 
616
 
bool Session::checkUser(const char *passwd, uint32_t passwd_len, const char *in_db)
 
632
bool Session::checkUser(const std::string &passwd_str,
 
633
                        const std::string &in_db)
617
634
{
618
 
  const string passwd_str(passwd, passwd_len);
619
635
  bool is_authenticated=
620
636
    plugin::Authentication::isAuthenticated(getSecurityContext(),
621
637
                                            passwd_str);
628
644
  }
629
645
 
630
646
  /* Change database if necessary */
631
 
  if (in_db && in_db[0])
 
647
  if (not in_db.empty())
632
648
  {
633
649
    SchemaIdentifier identifier(in_db);
634
650
    if (mysql_change_db(this, identifier))
638
654
    }
639
655
  }
640
656
  my_ok();
641
 
  password= test(passwd_len);          // remember for error messages
 
657
  password= not passwd_str.empty();
642
658
 
643
659
  /* Ready to handle queries */
644
660
  return true;
662
678
  if (client->readCommand(&l_packet, &packet_length) == false)
663
679
    return false;
664
680
 
665
 
  if (killed == KILL_CONNECTION)
 
681
  if (getKilled() == KILL_CONNECTION)
666
682
    return false;
667
683
 
668
684
  if (packet_length == 0)
748
764
  }
749
765
 
750
766
  if (result == false)
 
767
  {
751
768
    my_error(killed_errno(), MYF(0));
 
769
  }
752
770
  else if ((result == true) && do_release)
753
 
    killed= Session::KILL_CONNECTION;
 
771
  {
 
772
    setKilled(Session::KILL_CONNECTION);
 
773
  }
754
774
 
755
775
  return result;
756
776
}
821
841
  where= Session::DEFAULT_WHERE;
822
842
 
823
843
  /* Reset the temporary shares we built */
824
 
  for (std::vector<TableShareInstance *>::iterator iter= temporary_shares.begin();
825
 
       iter != temporary_shares.end(); iter++)
826
 
  {
827
 
    delete *iter;
828
 
  }
 
844
  for_each(temporary_shares.begin(),
 
845
           temporary_shares.end(),
 
846
           DeletePtr());
829
847
  temporary_shares.clear();
830
848
}
831
849
 
911
929
  my_message(errcode, err, MYF(0));
912
930
  if (file > 0)
913
931
  {
914
 
    (void) end_io_cache(cache);
 
932
    (void) cache->end_io_cache();
915
933
    (void) internal::my_close(file, MYF(0));
916
 
    (void) internal::my_delete(path, MYF(0));           // Delete file on error
 
934
    (void) internal::my_delete(path.file_string().c_str(), MYF(0));             // Delete file on error
917
935
    file= -1;
918
936
  }
919
937
}
921
939
 
922
940
bool select_to_file::send_eof()
923
941
{
924
 
  int error= test(end_io_cache(cache));
 
942
  int error= test(cache->end_io_cache());
925
943
  if (internal::my_close(file, MYF(MY_WME)))
926
944
    error= 1;
927
945
  if (!error)
943
961
  /* In case of error send_eof() may be not called: close the file here. */
944
962
  if (file >= 0)
945
963
  {
946
 
    (void) end_io_cache(cache);
 
964
    (void) cache->end_io_cache();
947
965
    (void) internal::my_close(file, MYF(0));
948
966
    file= -1;
949
967
  }
950
 
  path[0]= '\0';
 
968
  path= "";
951
969
  row_count= 0;
952
970
}
953
971
 
957
975
    cache(static_cast<internal::IO_CACHE *>(memory::sql_calloc(sizeof(internal::IO_CACHE)))),
958
976
    row_count(0L)
959
977
{
960
 
  path[0]=0;
 
978
  path= "";
961
979
}
962
980
 
963
981
select_to_file::~select_to_file()
991
1009
*/
992
1010
 
993
1011
 
994
 
static int create_file(Session *session, char *path, file_exchange *exchange, internal::IO_CACHE *cache)
 
1012
static int create_file(Session *session,
 
1013
                       fs::path &target_path,
 
1014
                       file_exchange *exchange,
 
1015
                       internal::IO_CACHE *cache)
995
1016
{
 
1017
  fs::path to_file(exchange->file_name);
996
1018
  int file;
997
 
  uint32_t option= MY_UNPACK_FILENAME | MY_RELATIVE_PATH;
998
 
 
999
 
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
1000
 
  option|= MY_REPLACE_DIR;                      // Force use of db directory
1001
 
#endif
1002
 
 
1003
 
  if (!internal::dirname_length(exchange->file_name))
 
1019
 
 
1020
  if (not to_file.has_root_directory())
1004
1021
  {
1005
 
    strcpy(path, getDataHomeCatalog().c_str());
1006
 
    strncat(path, "/", 1);
1007
 
    if (! session->db.empty())
1008
 
      strncat(path, session->db.c_str(), FN_REFLEN-getDataHomeCatalog().size());
1009
 
    (void) internal::fn_format(path, exchange->file_name, path, "", option);
 
1022
    target_path= fs::system_complete(getDataHomeCatalog());
 
1023
    if (not session->db.empty())
 
1024
    {
 
1025
      int count_elements= 0;
 
1026
      for (fs::path::iterator iter= to_file.begin();
 
1027
           iter != to_file.end();
 
1028
           ++iter, ++count_elements)
 
1029
      { }
 
1030
 
 
1031
      if (count_elements == 1)
 
1032
      {
 
1033
        target_path /= session->db;
 
1034
      }
 
1035
    }
 
1036
    target_path /= to_file;
1010
1037
  }
1011
1038
  else
1012
 
    (void) internal::fn_format(path, exchange->file_name, getDataHomeCatalog().c_str(), "", option);
 
1039
  {
 
1040
    target_path = exchange->file_name;
 
1041
  }
1013
1042
 
1014
 
  if (opt_secure_file_priv)
 
1043
  if (not secure_file_priv.string().empty())
1015
1044
  {
1016
 
    fs::path secure_file_path(fs::system_complete(fs::path(opt_secure_file_priv)));
1017
 
    fs::path target_path(fs::system_complete(fs::path(path)));
1018
 
    if (target_path.file_string().substr(0, secure_file_path.file_string().size()) != secure_file_path.file_string())
 
1045
    if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
1019
1046
    {
1020
1047
      /* Write only allowed to dir or subdir specified by secure_file_priv */
1021
1048
      my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
1023
1050
    }
1024
1051
  }
1025
1052
 
1026
 
  if (!access(path, F_OK))
 
1053
  if (!access(target_path.file_string().c_str(), F_OK))
1027
1054
  {
1028
1055
    my_error(ER_FILE_EXISTS_ERROR, MYF(0), exchange->file_name);
1029
1056
    return -1;
1030
1057
  }
1031
1058
  /* Create the file world readable */
1032
 
  if ((file= internal::my_create(path, 0666, O_WRONLY|O_EXCL, MYF(MY_WME))) < 0)
 
1059
  if ((file= internal::my_create(target_path.file_string().c_str(), 0666, O_WRONLY|O_EXCL, MYF(MY_WME))) < 0)
1033
1060
    return file;
1034
1061
  (void) fchmod(file, 0666);                    // Because of umask()
1035
 
  if (init_io_cache(cache, file, 0L, internal::WRITE_CACHE, 0L, 1, MYF(MY_WME)))
 
1062
  if (cache->init_io_cache(file, 0L, internal::WRITE_CACHE, 0L, 1, MYF(MY_WME)))
1036
1063
  {
1037
1064
    internal::my_close(file, MYF(0));
1038
 
    internal::my_delete(path, MYF(0));  // Delete file on error, it was just created
 
1065
    internal::my_delete(target_path.file_string().c_str(), MYF(0));  // Delete file on error, it was just created
1039
1066
    return -1;
1040
1067
  }
1041
1068
  return file;
1049
1076
  bool string_results= false, non_string_results= false;
1050
1077
  unit= u;
1051
1078
  if ((uint32_t) strlen(exchange->file_name) + NAME_LEN >= FN_REFLEN)
1052
 
    strncpy(path,exchange->file_name,FN_REFLEN-1);
 
1079
  {
 
1080
    path= exchange->file_name;
 
1081
  }
1053
1082
 
1054
1083
  /* Check if there is any blobs in data */
1055
1084
  {
1112
1141
  if (unit->offset_limit_cnt)
1113
1142
  {                                             // using limit offset,count
1114
1143
    unit->offset_limit_cnt--;
1115
 
    return(0);
 
1144
    return false;
1116
1145
  }
1117
1146
  row_count++;
1118
1147
  Item *item;
1121
1150
 
1122
1151
  if (my_b_write(cache,(unsigned char*) exchange->line_start->ptr(),
1123
1152
                 exchange->line_start->length()))
1124
 
    goto err;
 
1153
    return true;
 
1154
 
1125
1155
  while ((item=li++))
1126
1156
  {
1127
1157
    Item_result result_type=item->result_type();
1132
1162
    {
1133
1163
      if (my_b_write(cache,(unsigned char*) exchange->enclosed->ptr(),
1134
1164
                     exchange->enclosed->length()))
1135
 
        goto err;
 
1165
        return true;
1136
1166
    }
1137
1167
    if (!res)
1138
1168
    {                                           // NULL
1143
1173
          null_buff[0]=escape_char;
1144
1174
          null_buff[1]='N';
1145
1175
          if (my_b_write(cache,(unsigned char*) null_buff,2))
1146
 
            goto err;
 
1176
            return true;
1147
1177
        }
1148
1178
        else if (my_b_write(cache,(unsigned char*) "NULL",4))
1149
 
          goto err;
 
1179
          return true;
1150
1180
      }
1151
1181
      else
1152
1182
      {
1156
1186
    else
1157
1187
    {
1158
1188
      if (fixed_row_size)
1159
 
        used_length= min(res->length(),item->max_length);
 
1189
        used_length= min(res->length(), static_cast<size_t>(item->max_length));
1160
1190
      else
1161
1191
        used_length= res->length();
1162
1192
 
1237
1267
            tmp_buff[1]= *pos ? *pos : '0';
1238
1268
            if (my_b_write(cache,(unsigned char*) start,(uint32_t) (pos-start)) ||
1239
1269
                my_b_write(cache,(unsigned char*) tmp_buff,2))
1240
 
              goto err;
 
1270
              return true;
1241
1271
            start=pos+1;
1242
1272
          }
1243
1273
        }
1244
1274
        if (my_b_write(cache,(unsigned char*) start,(uint32_t) (pos-start)))
1245
 
          goto err;
 
1275
          return true;
1246
1276
      }
1247
1277
      else if (my_b_write(cache,(unsigned char*) res->ptr(),used_length))
1248
 
        goto err;
 
1278
        return true;
1249
1279
    }
1250
1280
    if (fixed_row_size)
1251
1281
    {                                           // Fill with space
1261
1291
        for (; length > sizeof(space) ; length-=sizeof(space))
1262
1292
        {
1263
1293
          if (my_b_write(cache,(unsigned char*) space,sizeof(space)))
1264
 
            goto err;
 
1294
            return true;
1265
1295
        }
1266
1296
        if (my_b_write(cache,(unsigned char*) space,length))
1267
 
          goto err;
 
1297
          return true;
1268
1298
      }
1269
1299
    }
1270
1300
    if (res && enclosed)
1271
1301
    {
1272
1302
      if (my_b_write(cache, (unsigned char*) exchange->enclosed->ptr(),
1273
1303
                     exchange->enclosed->length()))
1274
 
        goto err;
 
1304
        return true;
1275
1305
    }
1276
1306
    if (--items_left)
1277
1307
    {
1278
1308
      if (my_b_write(cache, (unsigned char*) exchange->field_term->ptr(),
1279
1309
                     field_term_length))
1280
 
        goto err;
 
1310
        return true;
1281
1311
    }
1282
1312
  }
1283
1313
  if (my_b_write(cache,(unsigned char*) exchange->line_term->ptr(),
1284
1314
                 exchange->line_term->length()))
1285
 
    goto err;
1286
 
  return(0);
1287
 
err:
1288
 
  return(1);
 
1315
  {
 
1316
    return true;
 
1317
  }
 
1318
 
 
1319
  return false;
1289
1320
}
1290
1321
 
1291
1322
 
1318
1349
  if (row_count++ > 1)
1319
1350
  {
1320
1351
    my_message(ER_TOO_MANY_ROWS, ER(ER_TOO_MANY_ROWS), MYF(0));
1321
 
    goto err;
 
1352
    return 1;
1322
1353
  }
1323
1354
  while ((item=li++))
1324
1355
  {
1326
1357
    if (!res)                                   // If NULL
1327
1358
    {
1328
1359
      if (my_b_write(cache,(unsigned char*) "",1))
1329
 
        goto err;
 
1360
        return 1;
1330
1361
    }
1331
1362
    else if (my_b_write(cache,(unsigned char*) res->ptr(),res->length()))
1332
1363
    {
1333
 
      my_error(ER_ERROR_ON_WRITE, MYF(0), path, errno);
1334
 
      goto err;
 
1364
      my_error(ER_ERROR_ON_WRITE, MYF(0), path.file_string().c_str(), errno);
 
1365
      return 1;
1335
1366
    }
1336
1367
  }
1337
1368
  return(0);
1338
 
err:
1339
 
  return(1);
1340
1369
}
1341
1370
 
1342
1371
 
1394
1423
      switch (val_item->result_type())
1395
1424
      {
1396
1425
      case REAL_RESULT:
1397
 
        op= &select_max_min_finder_subselect::cmp_real;
1398
 
        break;
 
1426
        op= &select_max_min_finder_subselect::cmp_real;
 
1427
        break;
1399
1428
      case INT_RESULT:
1400
 
        op= &select_max_min_finder_subselect::cmp_int;
1401
 
        break;
 
1429
        op= &select_max_min_finder_subselect::cmp_int;
 
1430
        break;
1402
1431
      case STRING_RESULT:
1403
 
        op= &select_max_min_finder_subselect::cmp_str;
1404
 
        break;
 
1432
        op= &select_max_min_finder_subselect::cmp_str;
 
1433
        break;
1405
1434
      case DECIMAL_RESULT:
1406
1435
        op= &select_max_min_finder_subselect::cmp_decimal;
1407
1436
        break;
1408
1437
      case ROW_RESULT:
1409
1438
        // This case should never be choosen
1410
 
        assert(0);
1411
 
        op= 0;
 
1439
        assert(0);
 
1440
        op= 0;
1412
1441
      }
1413
1442
    }
1414
1443
    cache->store(val_item);
1497
1526
void Session::end_statement()
1498
1527
{
1499
1528
  /* Cleanup SQL processing state to reuse this statement in next query. */
1500
 
  lex_end(lex);
 
1529
  lex->end();
1501
1530
  query_cache_key= ""; // reset the cache key
1502
1531
  resetResultsetMessage();
1503
1532
}
1562
1591
}
1563
1592
 
1564
1593
 
1565
 
 
1566
 
 
1567
 
/**
1568
 
  Check the killed state of a user thread
1569
 
  @param session  user thread
1570
 
  @retval 0 the user thread is active
1571
 
  @retval 1 the user thread has been killed
1572
 
*/
1573
 
int session_killed(const Session *session)
1574
 
{
1575
 
  return(session->killed);
1576
 
}
1577
 
 
1578
 
 
1579
 
const struct charset_info_st *session_charset(Session *session)
1580
 
{
1581
 
  return(session->charset());
1582
 
}
1583
 
 
1584
1594
/**
1585
1595
  Mark transaction to rollback and mark error as fatal to a sub-statement.
1586
1596
 
1602
1612
  plugin_sessionvar_cleanup(this);
1603
1613
 
1604
1614
  /* If necessary, log any aborted or unauthorized connections */
1605
 
  if (killed || client->wasAborted())
 
1615
  if (getKilled() || client->wasAborted())
1606
1616
  {
1607
1617
    status_var.aborted_threads++;
1608
1618
  }
1609
1619
 
1610
1620
  if (client->wasAborted())
1611
1621
  {
1612
 
    if (! killed && variables.log_warnings > 1)
 
1622
    if (not getKilled() && variables.log_warnings > 1)
1613
1623
    {
1614
1624
      SecurityContext *sctx= &security_ctx;
1615
1625
 
1625
1635
  /* Close out our connection to the client */
1626
1636
  if (should_lock)
1627
1637
    LOCK_thread_count.lock();
1628
 
  killed= Session::KILL_CONNECTION;
 
1638
 
 
1639
  setKilled(Session::KILL_CONNECTION);
 
1640
 
1629
1641
  if (client->isConnected())
1630
1642
  {
1631
1643
    if (errcode)
1635
1647
    }
1636
1648
    client->close();
1637
1649
  }
 
1650
 
1638
1651
  if (should_lock)
 
1652
  {
1639
1653
    (void) LOCK_thread_count.unlock();
 
1654
  }
1640
1655
}
1641
1656
 
1642
1657
void Session::reset_for_next_command()
1751
1766
 
1752
1767
user_var_entry *Session::getVariable(LEX_STRING &name, bool create_if_not_exists)
1753
1768
{
 
1769
  return getVariable(std::string(name.str, name.length), create_if_not_exists);
 
1770
}
 
1771
 
 
1772
user_var_entry *Session::getVariable(const std::string  &name, bool create_if_not_exists)
 
1773
{
 
1774
  UserVarsRange ppp= user_vars.equal_range(name);
 
1775
 
 
1776
  for (UserVars::iterator iter= ppp.first;
 
1777
       iter != ppp.second; ++iter)
 
1778
  {
 
1779
    return (*iter).second;
 
1780
  }
 
1781
 
 
1782
  if (not create_if_not_exists)
 
1783
    return NULL;
 
1784
 
1754
1785
  user_var_entry *entry= NULL;
1755
 
  UserVarsRange ppp= user_vars.equal_range(std::string(name.str, name.length));
1756
 
 
1757
 
  for (UserVars::iterator iter= ppp.first;
1758
 
         iter != ppp.second; ++iter)
1759
 
  {
1760
 
    entry= (*iter).second;
1761
 
  }
1762
 
 
1763
 
  if ((entry == NULL) && create_if_not_exists)
1764
 
  {
1765
 
    entry= new (nothrow) user_var_entry(name.str, query_id);
1766
 
 
1767
 
    if (entry == NULL)
1768
 
      return NULL;
1769
 
 
1770
 
    std::pair<UserVars::iterator, bool> returnable= user_vars.insert(make_pair(std::string(name.str, name.length), entry));
1771
 
 
1772
 
    if (not returnable.second)
1773
 
    {
1774
 
      delete entry;
1775
 
      return NULL;
1776
 
    }
 
1786
  entry= new (nothrow) user_var_entry(name.c_str(), query_id);
 
1787
 
 
1788
  if (entry == NULL)
 
1789
    return NULL;
 
1790
 
 
1791
  std::pair<UserVars::iterator, bool> returnable= user_vars.insert(make_pair(name, entry));
 
1792
 
 
1793
  if (not returnable.second)
 
1794
  {
 
1795
    delete entry;
1777
1796
  }
1778
1797
 
1779
1798
  return entry;
1780
1799
}
1781
1800
 
 
1801
void Session::setVariable(const std::string &name, const std::string &value)
 
1802
{
 
1803
  user_var_entry *updateable_var= getVariable(name.c_str(), true);
 
1804
 
 
1805
  updateable_var->update_hash(false,
 
1806
                              (void*)value.c_str(),
 
1807
                              static_cast<uint32_t>(value.length()), STRING_RESULT,
 
1808
                              &my_charset_bin,
 
1809
                              DERIVATION_IMPLICIT, false);
 
1810
}
 
1811
 
1782
1812
void Session::mark_temp_tables_as_free_for_reuse()
1783
1813
{
1784
1814
  for (Table *table= temporary_tables ; table ; table= table->getNext())
1848
1878
      handled either before writing a query log event (inside
1849
1879
      binlog_query()) or when preparing a pending event.
1850
1880
     */
1851
 
    mysql_unlock_tables(this, lock);
 
1881
    unlockTables(lock);
1852
1882
    lock= 0;
1853
1883
  }
1854
1884
  /*
1855
1885
    Note that we need to hold LOCK_open while changing the
1856
1886
    open_tables list. Another thread may work on it.
1857
 
    (See: remove_table_from_cache(), mysql_wait_completed_table())
 
1887
    (See: table::Cache::singleton().removeTable(), mysql_wait_completed_table())
1858
1888
    Closing a MERGE child before the parent would be fatal if the
1859
1889
    other thread tries to abort the MERGE lock in between.
1860
1890
  */
1893
1923
    close_tables_for_reopen(&tables);
1894
1924
  }
1895
1925
  if ((mysql_handle_derived(lex, &mysql_derived_prepare) ||
1896
 
       (fill_derived_tables() &&
 
1926
       (
1897
1927
        mysql_handle_derived(lex, &mysql_derived_filling))))
1898
1928
    return true;
1899
1929
 
1900
1930
  return false;
1901
1931
}
1902
1932
 
1903
 
bool Session::openTables(TableList *tables, uint32_t flags)
1904
 
{
1905
 
  uint32_t counter;
1906
 
  bool ret= fill_derived_tables();
1907
 
  assert(ret == false);
1908
 
  if (open_tables_from_list(&tables, &counter, flags) ||
1909
 
      mysql_handle_derived(lex, &mysql_derived_prepare))
1910
 
  {
1911
 
    return true;
1912
 
  }
1913
 
  return false;
1914
 
}
1915
 
 
1916
1933
/*
1917
1934
  @note "best_effort" is used in cases were if a failure occurred on this
1918
1935
  operation it would not be surprising because we are only removing because there
2050
2067
  return true;
2051
2068
}
2052
2069
 
2053
 
TableShareInstance *Session::getTemporaryShare(TableIdentifier::Type type_arg)
2054
 
{
2055
 
  temporary_shares.push_back(new TableShareInstance(type_arg)); // This will not go into the tableshare cache, so no key is used.
2056
 
 
2057
 
  TableShareInstance *tmp_share= temporary_shares.back();
 
2070
table::Instance *Session::getInstanceTable()
 
2071
{
 
2072
  temporary_shares.push_back(new table::Instance()); // This will not go into the tableshare cache, so no key is used.
 
2073
 
 
2074
  table::Instance *tmp_share= temporary_shares.back();
 
2075
 
 
2076
  assert(tmp_share);
 
2077
 
 
2078
  return tmp_share;
 
2079
}
 
2080
 
 
2081
 
 
2082
/**
 
2083
  Create a reduced Table object with properly set up Field list from a
 
2084
  list of field definitions.
 
2085
 
 
2086
    The created table doesn't have a table Cursor associated with
 
2087
    it, has no keys, no group/distinct, no copy_funcs array.
 
2088
    The sole purpose of this Table object is to use the power of Field
 
2089
    class to read/write data to/from table->getInsertRecord(). Then one can store
 
2090
    the record in any container (RB tree, hash, etc).
 
2091
    The table is created in Session mem_root, so are the table's fields.
 
2092
    Consequently, if you don't BLOB fields, you don't need to free it.
 
2093
 
 
2094
  @param session         connection handle
 
2095
  @param field_list  list of column definitions
 
2096
 
 
2097
  @return
 
2098
    0 if out of memory, Table object in case of success
 
2099
*/
 
2100
table::Instance *Session::getInstanceTable(List<CreateField> &field_list)
 
2101
{
 
2102
  temporary_shares.push_back(new table::Instance(this, field_list)); // This will not go into the tableshare cache, so no key is used.
 
2103
 
 
2104
  table::Instance *tmp_share= temporary_shares.back();
2058
2105
 
2059
2106
  assert(tmp_share);
2060
2107