~ubuntu-branches/debian/squeeze/rmysql/squeeze

« back to all changes in this revision

Viewing changes to R/MySQLSupport.R

  • Committer: Bazaar Package Importer
  • Author(s): Dirk Eddelbuettel
  • Date: 2006-10-28 11:47:26 UTC
  • mfrom: (1.2.1 upstream) (2.1.3 edgy)
  • Revision ID: james.westby@ubuntu.com-20061028114726-qta5l3v6hel32mpp
* New upstream release

* debian/control: Upgraded (Build-)Depends: to r-base-core (>= 2.4.0)
  and r-cran-dbi (>= 0.1.11)
* debian/control: Removed non-DD Steffen as an uploader

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
##
 
2
## $Id: MySQLSupport.R 190 2006-09-25 20:39:18Z sethf $
 
3
##
 
4
## Copyright (C) 1999 The Omega Project for Statistical Computing.
 
5
##
 
6
## This library is free software; you can redistribute it and/or
 
7
## modify it under the terms of the GNU General Public
 
8
## License as published by the Free Software Foundation; either
 
9
## version 2 of the License, or (at your option) any later version.
 
10
##
 
11
## This library is distributed in the hope that it will be useful,
 
12
## but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 
14
## General Public License for more details.
 
15
##
 
16
## You should have received a copy of the GNU General Public
 
17
## License along with this library; if not, write to the Free Software
 
18
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
19
##
 
20
 
 
21
"mysqlInitDriver" <- 
 
22
function(max.con=16, fetch.default.rec = 500, force.reload=FALSE)
 
23
## create a MySQL database connection manager.  By default we allow
 
24
## up to "max.con" connections and single fetches of up to "fetch.default.rec"
 
25
## records.  These settings may be changed by re-loading the driver
 
26
## using the "force.reload" = T flag (note that this will close all 
 
27
## currently open connections).
 
28
## Returns an object of class "MySQLManger".  
 
29
## Note: This class is a singleton.
 
30
{
 
31
   if(fetch.default.rec<=0)
 
32
      stop("default num of records per fetch must be positive")
 
33
   config.params <- as.integer(c(max.con, fetch.default.rec))
 
34
   force <- as.logical(force.reload)
 
35
   drvId <- .Call("RS_MySQL_init", config.params, force, 
 
36
                  PACKAGE = .MySQLPkgName)
 
37
   new("MySQLDriver", Id = drvId)
 
38
}
 
39
 
 
40
"mysqlCloseDriver"<- 
 
41
function(drv, ...)
 
42
{
 
43
   if(!isIdCurrent(drv))
 
44
      return(TRUE)
 
45
   drvId <- as(drv, "integer")
 
46
   .Call("RS_MySQL_closeManager", drvId, PACKAGE = .MySQLPkgName)
 
47
}
 
48
 
 
49
"mysqlDescribeDriver" <-
 
50
function(obj, verbose = FALSE, ...)
 
51
## Print out nicely a brief description of the connection Driver
 
52
{
 
53
   info <- dbGetInfo(obj)
 
54
   print(obj)
 
55
   cat("  Driver name: ", info$drvName, "\n")
 
56
   cat("  Max  connections:", info$length, "\n")
 
57
   cat("  Conn. processed:", info$counter, "\n")
 
58
   cat("  Default records per fetch:", info$"fetch_default_rec", "\n")
 
59
   if(verbose){
 
60
      cat("  DBI API version: ", dbGetDBIVersion(), "\n")
 
61
      cat("  MySQL client version: ", info$clientVersion, "\n")
 
62
   }
 
63
   cat("  Open connections:", info$"num_con", "\n")
 
64
   if(verbose && !is.null(info$connectionIds)){
 
65
      for(i in seq(along = info$connectionIds)){
 
66
         cat("   ", i, " ")
 
67
         print(info$connectionIds[[i]])
 
68
      }
 
69
   }
 
70
   invisible(NULL)
 
71
}
 
72
 
 
73
"mysqlDriverInfo" <-
 
74
function(obj, what="", ...)
 
75
{
 
76
   if(!isIdCurrent(obj))
 
77
      stop(paste("expired", class(obj)))
 
78
   drvId <- as(obj, "integer")
 
79
   info <- .Call("RS_MySQL_managerInfo", drvId, PACKAGE = .MySQLPkgName)  
 
80
   ## replace drv/connection id w. actual drv/connection objects
 
81
   conObjs <- vector("list", length = info$"num_con")
 
82
   ids <- info$connectionIds
 
83
   for(i in seq(along = ids))
 
84
      conObjs[[i]] <- new("MySQLConnection", Id = c(drvId, ids[i]))
 
85
   info$connectionIds <- conObjs
 
86
   info$managerId <- new("MySQLDriver", Id = drvId)
 
87
   if(!missing(what))
 
88
      info[what]
 
89
   else
 
90
      info
 
91
}
 
92
 
 
93
"mysqlNewConnection" <-
 
94
## note that dbname may be a database name, an empty string "", or NULL.
 
95
## The distinction between "" and NULL is that "" is interpreted by 
 
96
## the MySQL API as the default database (MySQL config specific)
 
97
## while NULL means "no database".
 
98
function(drv, dbname = "", username="",
 
99
   password="", host="",
 
100
   unix.socket = "", port = 0, client.flag = 0, 
 
101
   groups = NULL, default.file = character(0))
 
102
{
 
103
   if(!isIdCurrent(drv))
 
104
      stop("expired manager")
 
105
   con.params <- as.character(c(username, password, host, 
 
106
                                dbname, unix.socket, port, 
 
107
                                client.flag))
 
108
   groups <- as.character(groups)
 
109
   if(length(default.file)==1){
 
110
      default.file <- file.path(dirname(default.file), basename(default.file))
 
111
      if(!file.exists(default.file))
 
112
         stop(sprintf("mysql default file %s does not exist", default.file))
 
113
   }
 
114
   drvId <- as(drv, "integer")
 
115
   conId <- .Call("RS_MySQL_newConnection", drvId, con.params, groups, 
 
116
               default.file, PACKAGE = .MySQLPkgName)
 
117
   new("MySQLConnection", Id = conId)
 
118
}
 
119
 
 
120
"mysqlCloneConnection" <-
 
121
function(con, ...)
 
122
{
 
123
   if(!isIdCurrent(con))
 
124
      stop(paste("expired", class(con)))
 
125
   conId <- as(con, "integer")
 
126
   newId <- .Call("RS_MySQL_cloneConnection", conId, PACKAGE = .MySQLPkgName)
 
127
   new("MySQLConnection", Id = newId)
 
128
}
 
129
 
 
130
"mysqlDescribeConnection" <-
 
131
function(obj, verbose = FALSE, ...)
 
132
{
 
133
   info <- dbGetInfo(obj)
 
134
   print(obj)
 
135
   cat("  User:", info$user, "\n")
 
136
   cat("  Host:", info$host, "\n")
 
137
   cat("  Dbname:", info$dbname, "\n")
 
138
   cat("  Connection type:", info$conType, "\n")
 
139
   if(verbose){
 
140
      cat("  MySQL server version: ", info$serverVersion, "\n")
 
141
      cat("  MySQL client version: ", 
 
142
         dbGetInfo(as(obj, "MySQLDriver"), what="clientVersion")[[1]], "\n")
 
143
      cat("  MySQL protocol version: ", info$protocolVersion, "\n")
 
144
      cat("  MySQL server thread id: ", info$threadId, "\n")
 
145
   }
 
146
   if(length(info$rsId)>0){
 
147
      for(i in seq(along = info$rsId)){
 
148
         cat("   ", i, " ")
 
149
         print(info$rsId[[i]])
 
150
      }
 
151
   } else 
 
152
      cat("  No resultSet available\n")
 
153
   invisible(NULL)
 
154
}
 
155
 
 
156
"mysqlCloseConnection" <-
 
157
function(con, ...)
 
158
{
 
159
   if(!isIdCurrent(con))
 
160
      return(TRUE)
 
161
   rs <- dbListResults(con)
 
162
   if(length(rs)>0){
 
163
      if(dbHasCompleted(rs[[1]]))
 
164
         dbClearResult(rs[[1]])
 
165
      else
 
166
         stop("connection has pending rows (close open results set first)")
 
167
   }
 
168
   conId <- as(con, "integer")
 
169
   .Call("RS_MySQL_closeConnection", conId, PACKAGE = .MySQLPkgName)
 
170
}
 
171
 
 
172
"mysqlConnectionInfo" <-
 
173
function(obj, what="", ...)
 
174
{
 
175
   if(!isIdCurrent(obj))
 
176
      stop(paste("expired", class(obj), deparse(substitute(obj))))
 
177
   id <- as(obj, "integer")
 
178
   info <- .Call("RS_MySQL_connectionInfo", id, PACKAGE = .MySQLPkgName)
 
179
   rsId <- vector("list", length = length(info$rsId))
 
180
   for(i in seq(along = info$rsId))
 
181
       rsId[[i]] <- new("MySQLResult", Id = c(id, info$rsId[i]))
 
182
   info$rsId <- rsId
 
183
   if(!missing(what))
 
184
      info[what]
 
185
   else
 
186
      info
 
187
}
 
188
       
 
189
"mysqlExecStatement" <-
 
190
function(con, statement)
 
191
## submits the sql statement to MySQL and creates a
 
192
## dbResult object if the SQL operation does not produce
 
193
## output, otherwise it produces a resultSet that can
 
194
## be used for fetching rows.
 
195
{
 
196
   if(!isIdCurrent(con))
 
197
      stop(paste("expired", class(con)))
 
198
   conId <- as(con, "integer")
 
199
   statement <- as(statement, "character")
 
200
   rsId <- .Call("RS_MySQL_exec", conId, statement, PACKAGE = .MySQLPkgName)
 
201
   new("MySQLResult", Id = rsId)
 
202
}
 
203
 
 
204
## helper function: it exec's *and* retrieves a statement. It should
 
205
## be named somehting else.
 
206
"mysqlQuickSQL" <-
 
207
function(con, statement)
 
208
{
 
209
   if(!isIdCurrent(con))
 
210
      stop(paste("expired", class(con)))
 
211
   nr <- length(dbListResults(con))
 
212
   if(nr>0){                     ## are there resultSets pending on con?
 
213
      new.con <- dbConnect(con)   ## yep, create a clone connection
 
214
      on.exit(dbDisconnect(new.con))
 
215
      rs <- dbSendQuery(new.con, statement)
 
216
   } else rs <- dbSendQuery(con, statement)
 
217
   if(dbHasCompleted(rs)){
 
218
      dbClearResult(rs)            ## no records to fetch, we're done
 
219
      invisible()
 
220
      return(NULL)
 
221
   }
 
222
   res <- fetch(rs, n = -1)
 
223
   if(dbHasCompleted(rs))
 
224
      dbClearResult(rs)
 
225
   else 
 
226
      warning("pending rows")
 
227
   res
 
228
}
 
229
 
 
230
"mysqlDescribeFields" <-
 
231
function(res, ...)
 
232
{
 
233
   flds <- dbGetInfo(res, "fieldDescription")[[1]][[1]]
 
234
   if(!is.null(flds)){
 
235
      flds$Sclass <- .Call("RS_DBI_SclassNames", flds$Sclass, 
 
236
                        PACKAGE = .MySQLPkgName)
 
237
      flds$type <- .Call("RS_MySQL_typeNames", as.integer(flds$type), 
 
238
                        PACKAGE = .MySQLPkgName)
 
239
      ## no factors
 
240
      structure(flds, row.names = paste(seq(along=flds$type)),
 
241
                            class = "data.frame")
 
242
   }
 
243
   else data.frame(flds)
 
244
}
 
245
 
 
246
"mysqlDBApply" <-
 
247
function(res, INDEX, FUN = stop("must specify FUN"), 
 
248
         begin = NULL, 
 
249
         group.begin =  NULL, 
 
250
         new.record = NULL, 
 
251
         end = NULL, 
 
252
         batchSize = 100, maxBatch = 1e6, 
 
253
         ..., simplify = TRUE)
 
254
## (Experimental)
 
255
## This function is meant to handle somewhat gracefully(?) large amounts 
 
256
## of data from the DBMS by bringing into R manageable chunks (about 
 
257
## batchSize records at a time, but not more than maxBatch); the idea
 
258
## is that the data from individual groups can be handled by R, but
 
259
## not all the groups at the same time.  
 
260
##
 
261
## dbApply apply functions to groups of rows coming from a remote
 
262
## database resultSet upon the following fetching events: 
 
263
##   begin         (prior to fetching the first record)
 
264
##   group.begin   (the record just fetched begins a new group)
 
265
##   new_record    (a new record just fetched)
 
266
##   group.end     (the record just fetched ends the current group)
 
267
##   end           (the record just fetched is the very last record)
 
268
##
 
269
## The "begin", "begin.group", etc., specify R functions to be
 
270
## invoked upon the corresponding events.  (For compatibility 
 
271
## with other apply functions the arg FUN is used to specify the
 
272
## most common case where we only specify the "group.end" event.)
 
273
## 
 
274
## The following describes the exact order and form of invocation for the
 
275
## various callbacks in the underlying  C code.  All callback functions
 
276
## (except FUN) are optional.
 
277
##  begin()
 
278
##    group.begin(group.name)   
 
279
##    new.record(df.record)
 
280
##    FUN(df.group, group.name)   (aka group.end)
 
281
##  end()
 
282
##
 
283
## TODO: (1) add argument output=F/T to suppress the creation of
 
284
##           an expensive(?) output list.
 
285
##       (2) allow INDEX to be a list as in tapply()
 
286
##       (3) add a "counter" event, to callback every k rows
 
287
##       (3) should we implement a simplify argument, as in sapply()?
 
288
##       (4) should it report (instead of just warning) when we're forced
 
289
##           to handle partial groups (groups larger than maxBatch).
 
290
##       (5) extend to the case where even individual groups are too
 
291
##           big for R (as in incremental quantiles).
 
292
##       (6) Highly R-dependent, not sure yet how to port it to S-plus.
 
293
{
 
294
   if(dbHasCompleted(res))
 
295
      stop("result set has completed")
 
296
   if(is.character(INDEX)){
 
297
      flds <- tolower(as.character(dbColumnInfo(res)$name))
 
298
      INDEX <- match(tolower(INDEX[1]), flds, 0)
 
299
   }
 
300
   if(INDEX<1)
 
301
      stop(paste("INDEX field", INDEX, "not in result set"))
 
302
 
 
303
   "null.or.fun" <- function(fun) # get fun obj, but a NULL is ok 
 
304
   {
 
305
      if(is.null(fun)) 
 
306
         fun 
 
307
      else 
 
308
         match.fun(fun)
 
309
   }
 
310
   begin <- null.or.fun(begin)
 
311
   group.begin <- null.or.fun(group.begin)
 
312
   group.end <- null.or.fun(FUN)     ## probably this is the most important
 
313
   end <- null.or.fun(end)
 
314
   new.record <- null.or.fun(new.record)
 
315
   rsId <- as(res, "integer")
 
316
   con <- as(res, "MySQLConnection")
 
317
   on.exit({
 
318
      rc <- dbGetException(con)
 
319
      if(!is.null(rc$errorNum) && rc$errorNum!=0)
 
320
         cat("dbApply aborted with MySQL error ", rc$errorNum,
 
321
             " (", rc$errorMsg, ")\n", sep = "")
 
322
 
 
323
      })
 
324
   ## BEGIN event handler (re-entrant, only prior to reading first row)
 
325
   if(!is.null(begin) && dbGetRowCount(res)==0) 
 
326
      begin()
 
327
   rho <- environment()
 
328
   funs <- list(begin = begin, end = end,
 
329
                group.begin = group.begin,
 
330
                group.end = group.end, new.record = new.record)
 
331
   out <- .Call("RS_MySQL_dbApply",
 
332
                rs = rsId,
 
333
                INDEX = as.integer(INDEX-1),
 
334
                funs, rho, as.integer(batchSize), as.integer(maxBatch),
 
335
                PACKAGE = .MySQLPkgName)
 
336
   if(!is.null(end) && dbHasCompleted(res))
 
337
      end()
 
338
   out
 
339
}
 
340
 
 
341
"mysqlFetch" <-
 
342
function(res, n=0, ...)
 
343
## Fetch at most n records from the opened resultSet (n = -1 means
 
344
## all records, n=0 means extract as many as "default_fetch_rec",
 
345
## as defined by MySQLDriver (see describe(drv, T)).
 
346
## The returned object is a data.frame. 
 
347
## Note: The method dbHasCompleted() on the resultSet tells you whether
 
348
## or not there are pending records to be fetched. 
 
349
## 
 
350
## TODO: Make sure we don't exhaust all the memory, or generate
 
351
## an object whose size exceeds option("object.size").  Also,
 
352
## are we sure we want to return a data.frame?
 
353
{    
 
354
   n <- as(n, "integer")
 
355
   rsId <- as(res, "integer")
 
356
   rel <- .Call("RS_MySQL_fetch", rsId, nrec = n, PACKAGE = .MySQLPkgName)
 
357
   if(length(rel)==0 || length(rel[[1]])==0) 
 
358
      return(NULL)
 
359
   ## create running row index as of previous fetch (if any)
 
360
   cnt <- dbGetRowCount(res)
 
361
   nrec <- length(rel[[1]])
 
362
   indx <- seq(from = cnt - nrec + 1, length = nrec)
 
363
   attr(rel, "row.names") <- as.character(indx)
 
364
   if(usingR())
 
365
      class(rel) <- "data.frame"
 
366
   else
 
367
      oldClass(rel) <- "data.frame"
 
368
   rel
 
369
}
 
370
 
 
371
## Note that originally we had only resultSet both for SELECTs
 
372
## and INSERTS, ...  Later on we created a base class dbResult
 
373
## for non-Select SQL and a derived class resultSet for SELECTS.
 
374
 
 
375
"mysqlResultInfo" <-
 
376
function(obj, what = "", ...)
 
377
{
 
378
   if(!isIdCurrent(obj))
 
379
      stop(paste("expired", class(obj), deparse(substitute(obj))))
 
380
   id <- as(obj, "integer")
 
381
   info <- .Call("RS_MySQL_resultSetInfo", id, PACKAGE = .MySQLPkgName)
 
382
   if(!missing(what))
 
383
      info[what]
 
384
   else
 
385
      info
 
386
}
 
387
 
 
388
"mysqlDescribeResult" <-
 
389
function(obj, verbose = FALSE, ...)
 
390
{
 
391
 
 
392
   if(!isIdCurrent(obj)){
 
393
      print(obj)
 
394
      invisible(return(NULL))
 
395
   }
 
396
   print(obj)
 
397
   cat("  Statement:", dbGetStatement(obj), "\n")
 
398
   cat("  Has completed?", if(dbHasCompleted(obj)) "yes" else "no", "\n")
 
399
   cat("  Affected rows:", dbGetRowsAffected(obj), "\n")
 
400
   cat("  Rows fetched:", dbGetRowCount(obj), "\n")
 
401
   flds <- dbColumnInfo(obj)
 
402
   if(verbose && !is.null(flds)){
 
403
      cat("  Fields:\n")  
 
404
      out <- print(dbColumnInfo(obj))
 
405
   }
 
406
   invisible(NULL)
 
407
}
 
408
 
 
409
"mysqlCloseResult" <-
 
410
function(res, ...)
 
411
{
 
412
   if(!isIdCurrent(res))
 
413
      return(TRUE)
 
414
   rsId <- as(res, "integer")
 
415
   .Call("RS_MySQL_closeResultSet", rsId, PACKAGE = .MySQLPkgName)
 
416
}
 
417
 
 
418
"mysqlReadTable" <- 
 
419
function(con, name, row.names = "row_names", check.names = TRUE, ...)
 
420
## Use NULL, "", or 0 as row.names to prevent using any field as row.names.
 
421
{
 
422
   out <- dbGetQuery(con, paste("SELECT * from", name))
 
423
   if(check.names)
 
424
       names(out) <- make.names(names(out), unique = TRUE)
 
425
   ## should we set the row.names of the output data.frame?
 
426
   nms <- names(out)
 
427
   j <- switch(mode(row.names),
 
428
           "character" = if(row.names=="") 0 else
 
429
               match(tolower(row.names), tolower(nms), 
 
430
                     nomatch = if(missing(row.names)) 0 else -1),
 
431
           "numeric" = row.names,
 
432
           "NULL" = 0,
 
433
           0)
 
434
   if(j==0) 
 
435
      return(out)
 
436
   if(j<0 || j>ncol(out)){
 
437
      warning("row.names not set on output data.frame (non-existing field)")
 
438
      return(out)
 
439
   }
 
440
   rnms <- as.character(out[,j])
 
441
   if(all(!duplicated(rnms))){
 
442
      out <- out[,-j, drop = FALSE]
 
443
      row.names(out) <- rnms
 
444
   } else warning("row.names not set on output (duplicate elements in field)")
 
445
   out
 
446
 
447
 
 
448
"mysqlImportFile" <-
 
449
function(con, name, value, field.types = NULL, overwrite = FALSE, 
 
450
  append = FALSE, header, row.names, nrows = 50, sep = ",", 
 
451
  eol="\n", skip = 0, quote = '"', ...)
 
452
{
 
453
  if(overwrite && append)
 
454
    stop("overwrite and append cannot both be TRUE")
 
455
 
 
456
  ## Do we need to clone the connection (ie., if it is in use)?
 
457
  if(length(dbListResults(con))!=0){ 
 
458
    new.con <- dbConnect(con)              ## there's pending work, so clone
 
459
    on.exit(dbDisconnect(new.con))
 
460
  } 
 
461
  else 
 
462
    new.con <- con
 
463
 
 
464
  if(dbExistsTable(con,name)){
 
465
    if(overwrite){
 
466
      if(!dbRemoveTable(con, name)){
 
467
        warning(paste("table", name, "couldn't be overwritten"))
 
468
        return(FALSE)
 
469
      }
 
470
    }
 
471
    else if(!append){
 
472
      warning(paste("table", name, "exists in database: aborting dbWriteTable"))
 
473
      return(FALSE)
 
474
    }
 
475
  }
 
476
 
 
477
  ## compute full path name (have R expand ~, etc)
 
478
  fn <- file.path(dirname(value), basename(value))
 
479
  if(missing(header) || missing(row.names)){
 
480
    f <- file(fn, open="r")
 
481
    if(skip>0) 
 
482
      readLines(f, n=skip)
 
483
    txtcon <- textConnection(readLines(f, n=2))
 
484
    flds <- count.fields(txtcon, sep)
 
485
    close(txtcon)
 
486
    close(f)
 
487
    nf <- length(unique(flds))
 
488
  }
 
489
  if(missing(header)){
 
490
    header <- nf==2
 
491
  }
 
492
  if(missing(row.names)){
 
493
    if(header)
 
494
      row.names <- if(nf==2) TRUE else FALSE
 
495
    else
 
496
      row.names <- FALSE
 
497
  }
 
498
 
 
499
  new.table <- !dbExistsTable(con, name)
 
500
  if(new.table){
 
501
    ## need to init table, say, with the first nrows lines
 
502
    d <- read.table(fn, sep=sep, header=header, skip=skip, nrows=nrows, ...)
 
503
    sql <- 
 
504
      dbBuildTableDefinition(new.con, name, obj=d, field.types = field.types,
 
505
        row.names = row.names)
 
506
    rs <- try(dbSendQuery(new.con, sql))
 
507
    if(inherits(rs, ErrorClass)){
 
508
      warning("could not create table: aborting sqliteImportFile")
 
509
      return(FALSE)
 
510
    } 
 
511
    else 
 
512
      dbClearResult(rs)
 
513
  }
 
514
  else if(!append){
 
515
    warning(sprintf("table %s already exists -- use append=TRUE?", name))
 
516
  }
 
517
 
 
518
  fmt <- 
 
519
     paste("LOAD DATA LOCAL INFILE '%s' INTO TABLE  %s ",
 
520
           "FIELDS TERMINATED BY '%s' ",
 
521
           if(!is.null(quote)) "OPTIONALLY ENCLOSED BY '%s' " else "",
 
522
           "LINES TERMINATED BY '%s' ",
 
523
           "IGNORE %d LINES ", sep="")
 
524
  if(is.null(quote))
 
525
     sql <- sprintf(fmt, fn, name, sep, eol, skip + as.integer(header))
 
526
  else
 
527
     sql <- sprintf(fmt, fn, name, sep, quote, eol, skip + as.integer(header))
 
528
 
 
529
  rs <- try(dbSendQuery(new.con, sql))
 
530
  if(inherits(rs, ErrorClass)){
 
531
     warning("could not load data into table")
 
532
     return(FALSE)
 
533
  } 
 
534
  dbClearResult(rs)
 
535
  TRUE
 
536
}
 
537
 
 
538
"mysqlWriteTable" <-
 
539
function(con, name, value, field.types, row.names = TRUE, 
 
540
   overwrite = FALSE, append = FALSE, ..., allow.keywords = FALSE)
 
541
## Create table "name" (must be an SQL identifier) and populate
 
542
## it with the values of the data.frame "value"
 
543
## TODO: This function should execute its sql as a single transaction,
 
544
##       and allow converter functions.
 
545
## TODO: In the unlikely event that value has a field called "row_names"
 
546
##       we could inadvertently overwrite it (here the user should set 
 
547
##       row.names=F)  I'm (very) reluctantly adding the code re: row.names,
 
548
##       because I'm not 100% comfortable using data.frames as the basic 
 
549
##       data for relations.
 
550
{
 
551
   if(overwrite && append)
 
552
      stop("overwrite and append cannot both be TRUE")
 
553
   if(!is.data.frame(value))
 
554
      value <- as.data.frame(value)
 
555
   if(row.names){
 
556
      value <- cbind(row.names(value), value)  ## can't use row.names= here
 
557
      names(value)[1] <- "row.names"
 
558
   }
 
559
   if(missing(field.types) || is.null(field.types)){
 
560
      ## the following mapping should be coming from some kind of table
 
561
      ## also, need to use converter functions (for dates, etc.)
 
562
      field.types <- sapply(value, dbDataType, dbObj = con)
 
563
   } 
 
564
 
 
565
   ## Do we need to coerce any field prior to write it out?
 
566
   ## TODO: MySQL 4.1 introduces the boolean data type.  
 
567
   for(i in seq(along = value)){
 
568
      if(is(value[[i]], "logical"))
 
569
         value[[i]] <- as(value[[i]], "integer")
 
570
   }
 
571
   i <- match("row.names", names(field.types), nomatch=0)
 
572
   if(i>0) ## did we add a row.names value?  If so, it's a text field.
 
573
      field.types[i] <- dbDataType(dbObj=con, field.types$row.names)
 
574
   names(field.types) <- make.db.names(con, names(field.types), 
 
575
                             allow.keywords = allow.keywords)
 
576
   ## Do we need to clone the connection (ie., if it is in use)?
 
577
   if(length(dbListResults(con))!=0){ 
 
578
      new.con <- dbConnect(con)              ## there's pending work, so clone
 
579
      on.exit(dbDisconnect(new.con))
 
580
   } 
 
581
   else {
 
582
      new.con <- con
 
583
   }
 
584
 
 
585
   if(dbExistsTable(con,name)){
 
586
      if(overwrite){
 
587
         if(!dbRemoveTable(con, name)){
 
588
         warning(paste("table", name, "couldn't be overwritten"))
 
589
         return(F)
 
590
         }
 
591
      }
 
592
      else if(!append){
 
593
         warning(paste("table",name,"exists in database: aborting assignTable"))
 
594
         return(F)
 
595
      }
 
596
   } 
 
597
   if(!dbExistsTable(con,name)){      ## need to re-test table for existance 
 
598
      ## need to create a new (empty) table
 
599
      sql1 <- paste("create table ", name, "\n(\n\t", sep="")
 
600
      sql2 <- paste(paste(names(field.types), field.types), collapse=",\n\t",
 
601
                          sep="")
 
602
      sql3 <- "\n)\n"
 
603
      sql <- paste(sql1, sql2, sql3, sep="")
 
604
      rs <- try(dbSendQuery(new.con, sql))
 
605
      if(inherits(rs, ErrorClass)){
 
606
         warning("could not create table: aborting assignTable")
 
607
         return(F)
 
608
      } 
 
609
      else 
 
610
         dbClearResult(rs)
 
611
   }
 
612
 
 
613
   ## TODO: here, we should query the MySQL to find out if it supports
 
614
   ## LOAD DATA thru pipes; if so, should open the pipe instead of a file.
 
615
 
 
616
   fn <- tempfile("rsdbi")
 
617
   fn <- gsub("\\\\", "/", fn)  # Since MySQL on Windows wants \ double (BDR)
 
618
   safe.write(value, file = fn)
 
619
   on.exit(unlink(fn), add = TRUE)
 
620
   sql4 <- paste("LOAD DATA LOCAL INFILE '", fn, "'",
 
621
                  " INTO TABLE ", name, 
 
622
                  " LINES TERMINATED BY '\n' ", sep="")
 
623
   rs <- try(dbSendQuery(new.con, sql4))
 
624
   if(inherits(rs, ErrorClass)){
 
625
      warning("could not load data into table")
 
626
      return(F)
 
627
   } 
 
628
   else 
 
629
      dbClearResult(rs)
 
630
   TRUE
 
631
}
 
632
 
 
633
"dbBuildTableDefinition" <-
 
634
function(dbObj, name, obj, field.types = NULL, row.names = TRUE, ...)
 
635
{
 
636
  if(!is.data.frame(obj))
 
637
    obj <- as.data.frame(obj)
 
638
  if(!is.null(row.names) && row.names){
 
639
    obj  <- cbind(row.names(obj), obj)  ## can't use row.names= here
 
640
    names(obj)[1] <- "row.names" 
 
641
  }
 
642
  if(is.null(field.types)){
 
643
    ## the following mapping should be coming from some kind of table
 
644
    ## also, need to use converter functions (for dates, etc.)
 
645
    field.types <- sapply(obj, dbDataType, dbObj = dbObj)
 
646
  } 
 
647
  i <- match("row.names", names(field.types), nomatch=0)
 
648
  if(i>0) ## did we add a row.names value?  If so, it's a text field.
 
649
    field.types[i] <- dbDataType(dbObj, field.types$row.names)
 
650
  names(field.types) <- 
 
651
    make.db.names(dbObj, names(field.types), allow.keywords = FALSE)
 
652
 
 
653
  ## need to create a new (empty) table
 
654
  flds <- paste(names(field.types), field.types)
 
655
  paste("CREATE TABLE", name, "\n(", paste(flds, collapse=",\n\t"), "\n)")
 
656
}
 
657
 
 
658
## the following is almost exactly from the ROracle driver 
 
659
"safe.write" <- 
 
660
function(value, file, batch, ...)
 
661
## safe.write makes sure write.table doesn't exceed available memory by batching
 
662
## at most batch rows (but it is still slowww)
 
663
{  
 
664
   N <- nrow(value)
 
665
   if(N<1){
 
666
      warning("no rows in data.frame")
 
667
      return(NULL)
 
668
   }
 
669
   digits <- options(digits = 17)
 
670
   on.exit(options(digits))
 
671
   if(missing(batch) || is.null(batch))
 
672
      batch <- 10000
 
673
   else if(batch<=0) 
 
674
      batch <- N
 
675
   from <- 1 
 
676
   to <- min(batch, N)
 
677
   while(from<=N){
 
678
      if(usingR())
 
679
         write.table(value[from:to,, drop=FALSE], file = file, append = TRUE, 
 
680
               quote = FALSE, sep="\t", na = .MySQL.NA.string,
 
681
               row.names=FALSE, col.names=FALSE, eol = '\n', ...)
 
682
      else
 
683
         write.table(value[from:to,, drop=FALSE], file = file, append = TRUE, 
 
684
               quote.string = FALSE, sep="\t", na = .MySQL.NA.string,
 
685
               dimnames.write=FALSE, end.of.row = '\n', ...)
 
686
      from <- to+1
 
687
      to <- min(to+batch, N)
 
688
   }
 
689
   invisible(NULL)
 
690
}
 
691
 
 
692
"mysqlDataType" <-
 
693
function(obj, ...)
 
694
## find a suitable SQL data type for the R/S object obj
 
695
## TODO: Lots and lots!! (this is a very rough first draft)
 
696
## need to register converters, abstract out MySQL and generalize 
 
697
## to Oracle, Informix, etc.  Perhaps this should be table-driven.
 
698
## NOTE: MySQL data types differ from the SQL92 (e.g., varchar truncate
 
699
## trailing spaces).  MySQL enum() maps rather nicely to factors (with
 
700
## up to 65535 levels)
 
701
{
 
702
   rs.class <- data.class(obj)    ## this differs in R 1.4 from older vers
 
703
   rs.mode <- storage.mode(obj)
 
704
   if(rs.class=="numeric" || rs.class == "integer"){
 
705
      sql.type <- if(rs.mode=="integer") "bigint" else  "double"
 
706
   } 
 
707
   else {
 
708
      sql.type <- switch(rs.class,
 
709
                     character = "text",
 
710
                     logical = "tinyint",  ## but we need to coerce to int!!
 
711
                     factor = "text",      ## up to 65535 characters
 
712
                     ordered = "text",
 
713
                     "text")
 
714
   }
 
715
   sql.type
 
716
}
 
717
 
 
718
## the following reserved words were taken from Section 6.1.7
 
719
## of the MySQL Manual, Version 4.1.1-alpha, html format.
 
720
 
 
721
".MySQLKeywords" <-
 
722
c("ADD", "ALL", "ALTER", "ANALYZE", "AND", "AS", "ASC", "ASENSITIVE", 
 
723
  "AUTO_INCREMENT", "BDB", "BEFORE", "BERKELEYDB", "BETWEEN", "BIGINT", 
 
724
  "BINARY", "BLOB", "BOTH", "BY", "CALL", "CASCADE", "CASE", "CHANGE", 
 
725
  "CHAR", "CHARACTER", "CHECK", "COLLATE", "COLUMN", "COLUMNS", 
 
726
  "CONDITION", "CONNECTION", "CONSTRAINT", "CONTINUE", "CREATE", 
 
727
  "CROSS", "CURRENT_DATE", "CURRENT_TIME", "CURRENT_TIMESTAMP", 
 
728
  "CURSOR", "DATABASE", "DATABASES", "DAY_HOUR", "DAY_MICROSECOND", 
 
729
  "DAY_MINUTE", "DAY_SECOND", "DEC", "DECIMAL", "DECLARE", "DEFAULT", 
 
730
  "DELAYED", "DELETE", "DESC", "DESCRIBE", "DISTINCT", "DISTINCTROW", 
 
731
  "DIV", "DOUBLE", "DROP", "ELSE", "ELSEIF", "ENCLOSED", "ESCAPED", 
 
732
  "EXISTS", "EXIT", "EXPLAIN", "FALSE", "FETCH", "FIELDS", "FLOAT", 
 
733
  "FOR", "FORCE", "FOREIGN", "FOUND", "FROM", "FULLTEXT", "GRANT", 
 
734
  "GROUP", "HAVING", "HIGH_PRIORITY", "HOUR_MICROSECOND", "HOUR_MINUTE", 
 
735
  "HOUR_SECOND", "IF", "IGNORE", "IN", "INDEX", "INFILE", "INNER", 
 
736
  "INNODB", "INOUT", "INSENSITIVE", "INSERT", "INT", "INTEGER", 
 
737
  "INTERVAL", "INTO", "IO_THREAD", "IS", "ITERATE", "JOIN", "KEY", 
 
738
  "KEYS", "KILL", "LEADING", "LEAVE", "LEFT", "LIKE", "LIMIT", 
 
739
  "LINES", "LOAD", "LOCALTIME", "LOCALTIMESTAMP", "LOCK", "LONG", 
 
740
  "LONGBLOB", "LONGTEXT", "LOOP", "LOW_PRIORITY", "MASTER_SERVER_ID", 
 
741
  "MATCH", "MEDIUMBLOB", "MEDIUMINT", "MEDIUMTEXT", "MIDDLEINT", 
 
742
  "MINUTE_MICROSECOND", "MINUTE_SECOND", "MOD", "NATURAL", "NOT", 
 
743
  "NO_WRITE_TO_BINLOG", "NULL", "NUMERIC", "ON", "OPTIMIZE", "OPTION", 
 
744
  "OPTIONALLY", "OR", "ORDER", "OUT", "OUTER", "OUTFILE", "PRECISION", 
 
745
  "PRIMARY", "PRIVILEGES", "PROCEDURE", "PURGE", "READ", "REAL", 
 
746
  "REFERENCES", "REGEXP", "RENAME", "REPEAT", "REPLACE", "REQUIRE", 
 
747
  "RESTRICT", "RETURN", "RETURNS", "REVOKE", "RIGHT", "RLIKE", 
 
748
  "SECOND_MICROSECOND", "SELECT", "SENSITIVE", "SEPARATOR", "SET", 
 
749
  "SHOW", "SMALLINT", "SOME", "SONAME", "SPATIAL", "SPECIFIC", 
 
750
  "SQL", "SQLEXCEPTION", "SQLSTATE", "SQLWARNING", "SQL_BIG_RESULT", 
 
751
  "SQL_CALC_FOUND_ROWS", "SQL_SMALL_RESULT", "SSL", "STARTING", 
 
752
  "STRAIGHT_JOIN", "STRIPED", "TABLE", "TABLES", "TERMINATED", 
 
753
  "THEN", "TINYBLOB", "TINYINT", "TINYTEXT", "TO", "TRAILING", 
 
754
  "TRUE", "TYPES", "UNDO", "UNION", "UNIQUE", "UNLOCK", "UNSIGNED", 
 
755
  "UPDATE", "USAGE", "USE", "USER_RESOURCES", "USING", "UTC_DATE", 
 
756
  "UTC_TIME", "UTC_TIMESTAMP", "VALUES", "VARBINARY", "VARCHAR", 
 
757
  "VARCHARACTER", "VARYING", "WHEN", "WHERE", "WHILE", "WITH", 
 
758
  "WRITE", "XOR", "YEAR_MONTH", "ZEROFILL"
 
759
  )