18
18
The servers are saved in the system table "servers"
20
Currently, when the user performs an ALTER SERVER or a DROP SERVER
21
operation, it will cause all open tables which refer to the named
22
server connection to be flushed. This may cause some undesirable
23
behaviour with regard to currently running transactions. It is
24
expected that the DBA knows what s/he is doing when s/he performs
25
the ALTER SERVER or DROP SERVER operation.
28
It is desirable for us to implement a callback mechanism instead where
29
callbacks can be registered for specific server protocols. The callback
30
will be fired when such a server name has been created/altered/dropped
31
or when statistics are to be gathered such as how many actual connections.
32
Storage engines etc will be able to make use of the callback so that
33
currently running transactions etc will not be disrupted.
21
36
#include "mysql_priv.h"
25
40
#include "sp_head.h"
29
pthread_mutex_t servers_cache_mutex; // To init the hash
30
uint servers_cache_initialised=FALSE;
31
/* Version of server table. incremented by servers_load */
32
static uint servers_version=0;
44
We only use 1 mutex to guard the data structures - THR_LOCK_servers.
45
Read locked when only reading data and write-locked for all other access.
48
static HASH servers_cache;
33
49
static MEM_ROOT mem;
34
50
static rw_lock_t THR_LOCK_servers;
52
static bool get_server_from_table_to_cache(TABLE *table);
54
/* insert functions */
55
static int insert_server(THD *thd, FOREIGN_SERVER *server_options);
56
static int insert_server_record(TABLE *table, FOREIGN_SERVER *server);
57
static int insert_server_record_into_cache(FOREIGN_SERVER *server);
58
static void prepare_server_struct_for_insert(LEX_SERVER_OPTIONS *server_options,
59
FOREIGN_SERVER *server);
61
static int delete_server_record(TABLE *table,
63
int server_name_length);
64
static int delete_server_record_in_cache(LEX_SERVER_OPTIONS *server_options);
66
/* update functions */
67
static void prepare_server_struct_for_update(LEX_SERVER_OPTIONS *server_options,
68
FOREIGN_SERVER *existing,
69
FOREIGN_SERVER *altered);
70
static int update_server(THD *thd, FOREIGN_SERVER *existing,
71
FOREIGN_SERVER *altered);
72
static int update_server_record(TABLE *table, FOREIGN_SERVER *server);
73
static int update_server_record_in_cache(FOREIGN_SERVER *existing,
74
FOREIGN_SERVER *altered);
75
/* utility functions */
76
static void merge_server_struct(FOREIGN_SERVER *from, FOREIGN_SERVER *to);
36
80
static byte *servers_cache_get_key(FOREIGN_SERVER *server, uint *length,
37
81
my_bool not_used __attribute__((unused)))
64
109
1 Could not initialize servers
67
my_bool servers_init(bool dont_read_servers_table)
112
bool servers_init(bool dont_read_servers_table)
70
my_bool return_val= 0;
115
bool return_val= FALSE;
71
116
DBUG_ENTER("servers_init");
73
118
/* init the mutex */
74
if (pthread_mutex_init(&servers_cache_mutex, MY_MUTEX_INIT_FAST))
77
119
if (my_rwlock_init(&THR_LOCK_servers, NULL))
80
122
/* initialise our servers cache */
81
123
if (hash_init(&servers_cache, system_charset_info, 32, 0, 0,
82
124
(hash_get_key) servers_cache_get_key, 0, 0))
84
return_val= 1; /* we failed, out of memory? */
126
return_val= TRUE; /* we failed, out of memory? */
88
130
/* Initialize the mem root for data */
89
131
init_alloc_root(&mem, ACL_ALLOC_BLOCK_SIZE, 0);
92
at this point, the cache is initialised, let it be known
94
servers_cache_initialised= TRUE;
96
133
if (dont_read_servers_table)
133
static my_bool servers_load(THD *thd, TABLE_LIST *tables)
170
static bool servers_load(THD *thd, TABLE_LIST *tables)
136
173
READ_RECORD read_record_info;
137
my_bool return_val= TRUE;
174
bool return_val= TRUE;
138
175
DBUG_ENTER("servers_load");
140
if (!servers_cache_initialised)
143
/* need to figure out how to utilise this variable */
144
servers_version++; /* servers updated */
146
177
/* First, discard all previously cached server rows by marking the
147
178
blocks of the servers mem root as free. */
148
179
free_root(&mem, MYF(MY_MARK_BLOCKS_FREE));
196
227
close_thread_tables(thd);
200
To avoid deadlocks we should obtain table locks before
201
obtaining servers_cache->lock mutex.
230
DBUG_PRINT("info", ("locking servers_cache"));
231
rw_wrlock(&THR_LOCK_servers);
203
233
bzero((char*) tables, sizeof(tables));
204
234
tables[0].alias= tables[0].table_name= (char*) "servers";
205
235
tables[0].db= (char*) "mysql";
208
238
if (simple_open_n_lock_tables(thd, tables))
210
sql_print_error("Fatal error: Can't open and lock privilege tables: %s",
240
sql_print_error("Can't open and lock privilege tables: %s",
211
241
thd->net.last_error);
215
DBUG_PRINT("info", ("locking servers_cache"));
216
VOID(pthread_mutex_lock(&servers_cache_mutex));
218
//old_servers_cache= servers_cache;
221
245
if ((return_val= servers_load(thd, tables)))
222
246
{ // Error. Revert to old list
223
247
/* blast, for now, we have no servers, discuss later way to preserve */
229
DBUG_PRINT("info", ("unlocking servers_cache"));
230
VOID(pthread_mutex_unlock(&servers_cache_mutex));
233
254
close_thread_tables(thd);
255
DBUG_PRINT("info", ("unlocking servers_cache"));
256
rw_unlock(&THR_LOCK_servers);
234
257
DBUG_RETURN(return_val);
238
262
Initialize structures responsible for servers used in federated
239
263
server scheme information for them from the server
308
333
DBUG_RETURN(FALSE);
313
server_exists_in_table()
314
THD *thd - thread pointer
315
LEX_SERVER_OPTIONS *server_options - pointer to Lex->server_options
318
This function takes a LEX_SERVER_OPTIONS struct, which is very much the
319
same type of structure as a FOREIGN_SERVER, it contains the values parsed
320
in any one of the [CREATE|ALTER|DROP] SERVER statements. Using the
321
member "server_name", index_read_idx either finds the record and returns
322
1, or doesn't find the record, and returns 0
329
my_bool server_exists_in_table(THD *thd, LEX_SERVER_OPTIONS *server_options)
335
DBUG_ENTER("server_exists");
337
bzero((char*) &tables, sizeof(tables));
338
tables.db= (char*) "mysql";
339
tables.alias= tables.table_name= (char*) "servers";
341
/* need to open before acquiring THR_LOCK_servers or it will deadlock */
342
if (! (table= open_ltable(thd, &tables, TL_WRITE)))
345
table->use_all_columns();
347
rw_wrlock(&THR_LOCK_servers);
348
VOID(pthread_mutex_lock(&servers_cache_mutex));
350
/* set the field that's the PK to the value we're looking for */
351
table->field[0]->store(server_options->server_name,
352
server_options->server_name_length,
353
system_charset_info);
355
if ((error= table->file->index_read_idx(table->record[0], 0,
356
(byte *)table->field[0]->ptr,
357
table->key_info[0].key_length,
360
if (error != HA_ERR_KEY_NOT_FOUND && error != HA_ERR_END_OF_FILE)
362
table->file->print_error(error, MYF(0));
366
DBUG_PRINT("info",("record for server '%s' not found!",
367
server_options->server_name));
370
VOID(pthread_mutex_unlock(&servers_cache_mutex));
371
rw_unlock(&THR_LOCK_servers);
382
344
This function takes a server object that has all members properly
383
345
prepared, ready to be inserted both into the mysql.servers table and
384
346
the servers cache.
348
THR_LOCK_servers must be write locked.
388
352
other - error code
391
int insert_server(THD *thd, FOREIGN_SERVER *server)
356
insert_server(THD *thd, FOREIGN_SERVER *server)
394
359
TABLE_LIST tables;
403
368
/* need to open before acquiring THR_LOCK_servers or it will deadlock */
404
369
if (! (table= open_ltable(thd, &tables, TL_WRITE)))
407
/* lock mutex to make sure no changes happen */
408
VOID(pthread_mutex_lock(&servers_cache_mutex));
411
rw_wrlock(&THR_LOCK_servers);
413
372
/* insert the server into the table */
414
373
if ((error= insert_server_record(table, server)))
434
391
This function takes a FOREIGN_SERVER pointer to an allocated (root mem)
435
392
and inserts it into the global servers cache
394
THR_LOCK_servers must be write locked.
443
int insert_server_record_into_cache(FOREIGN_SERVER *server)
403
insert_server_record_into_cache(FOREIGN_SERVER *server)
446
406
DBUG_ENTER("insert_server_record_into_cache");
555
518
/* read index until record is that specified in server_name */
556
519
if ((error= table->file->index_read_idx(table->record[0], 0,
557
(byte *)table->field[0]->ptr,
558
table->key_info[0].key_length,
520
(byte *)table->field[0]->ptr, HA_WHOLE_KEY,
559
521
HA_READ_KEY_EXACT)))
561
523
/* if not found, err */
607
569
int drop_server(THD *thd, LEX_SERVER_OPTIONS *server_options)
610
572
TABLE_LIST tables;
574
LEX_STRING name= { server_options->server_name,
575
server_options->server_name_length };
613
577
DBUG_ENTER("drop_server");
614
578
DBUG_PRINT("info", ("server name server->server_name %s",
618
582
tables.db= (char*) "mysql";
619
583
tables.alias= tables.table_name= (char*) "servers";
621
/* need to open before acquiring THR_LOCK_servers or it will deadlock */
622
if (! (table= open_ltable(thd, &tables, TL_WRITE)))
625
585
rw_wrlock(&THR_LOCK_servers);
626
VOID(pthread_mutex_lock(&servers_cache_mutex));
629
if ((error= delete_server_record(table,
630
server_options->server_name,
631
server_options->server_name_length)))
587
/* hit the memory first */
635
588
if ((error= delete_server_record_in_cache(server_options)))
591
if (! (table= open_ltable(thd, &tables, TL_WRITE)))
597
error= delete_server_record(table, name.str, name.length);
599
/* close the servers table before we call close_cached_connection_tables */
600
close_thread_tables(thd);
602
if (close_cached_connection_tables(thd, TRUE, &name))
604
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
605
ER_UNKNOWN_ERROR, "Server connection in use");
639
VOID(pthread_mutex_unlock(&servers_cache_mutex));
640
609
rw_unlock(&THR_LOCK_servers);
641
610
DBUG_RETURN(error);
677
648
DBUG_PRINT("info", ("server_name %s length %d not found!",
678
649
server_options->server_name,
679
650
server_options->server_name_length));
680
// what should be done if not found in the cache?
683
654
We succeeded in deleting the server from the table, now delete
687
658
server->server_name,
688
659
server->server_name_length));
691
VOID(hash_delete(&servers_cache, (byte*) server));
693
servers_version++; /* servers updated */
661
VOID(hash_delete(&servers_cache, (byte*) server));
695
666
DBUG_RETURN(error);
732
706
tables.alias= tables.table_name= (char*)"servers";
734
708
if (!(table= open_ltable(thd, &tables, TL_WRITE)))
737
rw_wrlock(&THR_LOCK_servers);
738
714
if ((error= update_server_record(table, altered)))
741
update_server_record_in_cache(existing, altered);
717
error= update_server_record_in_cache(existing, altered);
720
Perform a reload so we don't have a 'hole' in our mem_root
722
servers_load(thd, &tables);
744
rw_unlock(&THR_LOCK_servers);
745
725
DBUG_RETURN(error);
792
775
DBUG_PRINT("info", ("had a problem inserting server %s at %lx",
793
776
altered->server_name, (long unsigned int) altered));
777
error= ER_OUT_OF_RESOURCES;
797
servers_version++; /* servers updated */
798
780
DBUG_RETURN(error);
830
813
to->password= strdup_root(&mem, from->password);
831
814
if (to->port == -1)
832
815
to->port= from->port;
816
if (!to->socket && from->socket)
834
817
to->socket= strdup_root(&mem, from->socket);
818
if (!to->scheme && from->scheme)
836
819
to->scheme= strdup_root(&mem, from->scheme);
838
821
to->owner= strdup_root(&mem, from->owner);
873
859
system_charset_info);
875
861
if ((error= table->file->index_read_idx(table->record[0], 0,
876
(byte *)table->field[0]->ptr,
877
table->key_info[0].key_length,
862
(byte *)table->field[0]->ptr, ~(longlong)0,
878
863
HA_READ_KEY_EXACT)))
880
865
if (error != HA_ERR_KEY_NOT_FOUND && error != HA_ERR_END_OF_FILE)
882
866
table->file->print_error(error, MYF(0));
885
867
DBUG_PRINT("info",("server not found!"));
886
868
error= ER_FOREIGN_SERVER_DOESNT_EXIST;
919
int delete_server_record(TABLE *table,
921
int server_name_length)
903
delete_server_record(TABLE *table,
904
char *server_name, int server_name_length)
924
907
DBUG_ENTER("delete_server_record");
925
908
table->use_all_columns();
928
911
table->field[0]->store(server_name, server_name_length, system_charset_info);
930
913
if ((error= table->file->index_read_idx(table->record[0], 0,
931
(byte *)table->field[0]->ptr,
932
table->key_info[0].key_length,
914
(byte *)table->field[0]->ptr, HA_WHOLE_KEY,
933
915
HA_READ_KEY_EXACT)))
935
917
if (error != HA_ERR_KEY_NOT_FOUND && error != HA_ERR_END_OF_FILE)
937
918
table->file->print_error(error, MYF(0));
940
919
DBUG_PRINT("info",("server not found!"));
941
920
error= ER_FOREIGN_SERVER_DOESNT_EXIST;
966
945
int create_server(THD *thd, LEX_SERVER_OPTIONS *server_options)
947
int error= ER_FOREIGN_SERVER_EXISTS;
969
948
FOREIGN_SERVER *server;
971
950
DBUG_ENTER("create_server");
972
951
DBUG_PRINT("info", ("server_options->server_name %s",
973
952
server_options->server_name));
954
rw_wrlock(&THR_LOCK_servers);
956
/* hit the memory first */
957
if (hash_search(&servers_cache, (byte*) server_options->server_name,
958
server_options->server_name_length))
975
961
server= (FOREIGN_SERVER *)alloc_root(&mem,
976
962
sizeof(FOREIGN_SERVER));
978
if ((error= prepare_server_struct_for_insert(server_options, server)))
964
prepare_server_struct_for_insert(server_options, server);
981
if ((error= insert_server(thd, server)))
966
error= insert_server(thd, server);
984
968
DBUG_PRINT("info", ("error returned %d", error));
971
rw_unlock(&THR_LOCK_servers);
987
972
DBUG_RETURN(error);
1004
990
int alter_server(THD *thd, LEX_SERVER_OPTIONS *server_options)
992
int error= ER_FOREIGN_SERVER_DOESNT_EXIST;
1007
993
FOREIGN_SERVER *altered, *existing;
994
LEX_STRING name= { server_options->server_name,
995
server_options->server_name_length };
1008
996
DBUG_ENTER("alter_server");
1009
997
DBUG_PRINT("info", ("server_options->server_name %s",
1010
998
server_options->server_name));
1000
rw_wrlock(&THR_LOCK_servers);
1002
if (!(existing= (FOREIGN_SERVER *) hash_search(&servers_cache,
1012
1007
altered= (FOREIGN_SERVER *)alloc_root(&mem,
1013
1008
sizeof(FOREIGN_SERVER));
1015
VOID(pthread_mutex_lock(&servers_cache_mutex));
1017
if (!(existing= (FOREIGN_SERVER *) hash_search(&servers_cache,
1018
(byte*) server_options->server_name,
1019
server_options->server_name_length)))
1010
prepare_server_struct_for_update(server_options, existing, altered);
1012
error= update_server(thd, existing, altered);
1014
/* close the servers table before we call close_cached_connection_tables */
1015
close_thread_tables(thd);
1017
if (close_cached_connection_tables(thd, FALSE, &name))
1021
error= ER_FOREIGN_SERVER_DOESNT_EXIST;
1019
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
1020
ER_UNKNOWN_ERROR, "Server connection in use");
1025
if ((error= prepare_server_struct_for_update(server_options, existing, altered)))
1028
if ((error= update_server(thd, existing, altered)))
1032
1024
DBUG_PRINT("info", ("error returned %d", error));
1033
VOID(pthread_mutex_unlock(&servers_cache_mutex));
1025
rw_unlock(&THR_LOCK_servers);
1034
1026
DBUG_RETURN(error);
1051
int prepare_server_struct_for_insert(LEX_SERVER_OPTIONS *server_options,
1052
FOREIGN_SERVER *server)
1045
prepare_server_struct_for_insert(LEX_SERVER_OPTIONS *server_options,
1046
FOREIGN_SERVER *server)
1055
1048
char *unset_ptr= (char*)"";
1056
1049
DBUG_ENTER("prepare_server_struct");
1060
1051
/* these two MUST be set */
1061
1052
server->server_name= strdup_root(&mem, server_options->server_name);
1062
1053
server->server_name_length= server_options->server_name_length;
1105
int prepare_server_struct_for_update(LEX_SERVER_OPTIONS *server_options,
1106
FOREIGN_SERVER *existing,
1107
FOREIGN_SERVER *altered)
1097
prepare_server_struct_for_update(LEX_SERVER_OPTIONS *server_options,
1098
FOREIGN_SERVER *existing,
1099
FOREIGN_SERVER *altered)
1110
1101
DBUG_ENTER("prepare_server_struct_for_update");
1113
1103
altered->server_name= strdup_root(&mem, server_options->server_name);
1114
1104
altered->server_name_length= server_options->server_name_length;
1178
1168
void servers_free(bool end)
1180
1170
DBUG_ENTER("servers_free");
1181
if (!servers_cache_initialised)
1183
VOID(pthread_mutex_destroy(&servers_cache_mutex));
1184
servers_cache_initialised=0;
1171
if (!hash_inited(&servers_cache))
1175
free_root(&mem, MYF(MY_MARK_BLOCKS_FREE));
1176
my_hash_reset(&servers_cache);
1179
rwlock_destroy(&THR_LOCK_servers);
1185
1180
free_root(&mem,MYF(0));
1186
1181
hash_free(&servers_cache);
1187
1182
DBUG_VOID_RETURN;
1189
clone_server(MEM_ROOT *mem_root, FOREIGN_SERVER *orig, FOREIGN_SERVER *buff)
1191
Create a clone of FOREIGN_SERVER. If the supplied mem_root is of
1192
thd->mem_root then the copy is automatically disposed at end of statement.
1197
MEM_ROOT pointer (strings are copied into this mem root)
1198
FOREIGN_SERVER pointer (made a copy of)
1199
FOREIGN_SERVER buffer (if not-NULL, this pointer is returned)
1202
FOREIGN_SERVER pointer (copy of the supplied FOREIGN_SERVER)
1205
static FOREIGN_SERVER *clone_server(MEM_ROOT *mem, const FOREIGN_SERVER *server,
1206
FOREIGN_SERVER *buffer)
1208
DBUG_ENTER("sql_server.cc:clone_server");
1211
buffer= (FOREIGN_SERVER *) alloc_root(mem, sizeof(FOREIGN_SERVER));
1213
buffer->server_name= strmake_root(mem, server->server_name,
1214
server->server_name_length);
1215
buffer->port= server->port;
1216
buffer->server_name_length= server->server_name_length;
1218
/* TODO: We need to examine which of these can really be NULL */
1219
buffer->db= server->db ? strdup_root(mem, server->db) : NULL;
1220
buffer->scheme= server->scheme ? strdup_root(mem, server->scheme) : NULL;
1221
buffer->username= server->username? strdup_root(mem, server->username): NULL;
1222
buffer->password= server->password? strdup_root(mem, server->password): NULL;
1223
buffer->socket= server->socket ? strdup_root(mem, server->socket) : NULL;
1224
buffer->owner= server->owner ? strdup_root(mem, server->owner) : NULL;
1225
buffer->host= server->host ? strdup_root(mem, server->host) : NULL;
1227
DBUG_RETURN(buffer);
1205
FOREIGN_SERVER *get_server_by_name(const char *server_name)
1244
FOREIGN_SERVER *get_server_by_name(MEM_ROOT *mem, const char *server_name,
1245
FOREIGN_SERVER *buff)
1208
1247
uint server_name_length;
1209
FOREIGN_SERVER *server= 0;
1248
FOREIGN_SERVER *server;
1210
1249
DBUG_ENTER("get_server_by_name");
1211
1250
DBUG_PRINT("info", ("server_name %s", server_name));
1215
1254
if (! server_name || !strlen(server_name))
1217
1256
DBUG_PRINT("info", ("server_name not defined!"));
1219
1257
DBUG_RETURN((FOREIGN_SERVER *)NULL);
1222
1260
DBUG_PRINT("info", ("locking servers_cache"));
1223
VOID(pthread_mutex_lock(&servers_cache_mutex));
1261
rw_rdlock(&THR_LOCK_servers);
1224
1262
if (!(server= (FOREIGN_SERVER *) hash_search(&servers_cache,
1225
1263
(byte*) server_name,
1226
1264
server_name_length)))
1229
1267
server_name, server_name_length));
1230
1268
server= (FOREIGN_SERVER *) NULL;
1270
/* otherwise, make copy of server */
1272
server= clone_server(mem, server, buff);
1232
1274
DBUG_PRINT("info", ("unlocking servers_cache"));
1233
VOID(pthread_mutex_unlock(&servers_cache_mutex));
1275
rw_unlock(&THR_LOCK_servers);
1234
1276
DBUG_RETURN(server);