2
(cl-interpol:enable-interpol-syntax)
5
(clsql-sys:execute-command sql))
7
(defmethod format-csv-value ((val clsql-sys:date))
8
(clsql-helper:print-nullable-date val))
10
(defmethod format-csv-value ((val clsql-sys:wall-time))
11
(clsql-helper:print-nullable-datetime val))
13
(defun export-query ( sql &key stream path)
14
(with-csv-output-stream (s (or stream path))
15
(multiple-value-bind (rows cols)
16
(clsql:query sql :flatp t)
17
(write-csv-row cols :stream s)
18
(write-csv rows :stream s))))
20
(defun import-from-csv (table-name &rest keys
21
&key file data-table (sample-size 1000)
22
schema (should-have-serial-id "id")
23
excluded-columns row-fn
24
(log-fn #'(lambda (&rest args) (declare (ignore args))))
26
&aux (cl-interpol:*list-delimiter* ",")
28
"Will make a best effor to create a table matching the csv's schema and then
30
row-fn (data-row schema table columns)
31
allows you to take actions on a row (before insert).
32
returning false will prevent the default insert
34
log-fn (msg &rest args)
35
function to log progress
37
log-frequency : how frequent (measured in rows) to log progress
40
;;; TODO: accept column names from params
41
;;; TODO: figure out how to process files without columns in the first row
42
;;; TODO: figure out how to override type guesses
43
;;; TODO: check if the table already exists and skip the guessing
44
(funcall log-fn "Starting import ~a" table-name)
45
(let ((dt (or data-table (get-data-table-from-csv file t t sample-size)))
46
(keys (copy-list keys)))
47
(dolist (k '(:file :data-table :row-fn :sample-size :log-fn :log-frequency))
49
(funcall log-fn "CSV scanned for type information")
50
(when (and should-have-serial-id
51
(member should-have-serial-id (data-table:column-names dt) :test #'string-equal))
52
(error #?"This table already has an id column name `${should-have-serial-id}` Column! Perhaps you wish to turn off should-have-serial-id or assign it a different name?"))
53
(apply #'data-table:ensure-table-for-data-table dt table-name keys)
54
(funcall log-fn "Created table, starting import")
55
(let ((start-time (get-universal-time)))
56
(flet ((log-progress (row-num &optional (msg "Processing row"))
57
(let ((elapsed (- (get-universal-time) start-time)))
58
(funcall log-fn "~a ~a. ~ds elapsed (~,2f rows/sec) "
60
(if (zerop elapsed) "Inf"
61
(/ row-num elapsed))))))
63
(with importer = (data-table::make-row-importer
64
dt table-name :schema schema :excluded-columns excluded-columns
66
(for row in-csv file SKIPPING-HEADER T)
68
(funcall importer row)
69
(when (zerop (mod row-num log-frequency)) (log-progress row-num))
70
(finally (log-progress row-num "Finished, total processed: ")))))))
72
(defun serial-import-from-csv (table-name
74
(column-names :first-row)
75
(schema "public") (column-transform
76
#'data-table::english->postgres)
77
(progress-stream t) (progress-mod 5000)
78
(data-munger (lambda (row)
79
(mapcar #'clsql-helper:format-value-for-database
81
(log (lambda (msg &rest args)
82
(apply #'format progress-stream msg args)))
83
(on-error 'continue-importing)
84
&aux columns (cl-interpol:*list-delimiter* ", "))
85
"Will make a best effor to create a table matching the csv's schema and then
87
data-munger : a function that changes the the data row to be inserted
88
(for conversion to other types etc)
89
progress-stream: the default log stream to print to
90
progress-mod: how often we should report progress (in number of rows)
91
log: is a function that accepts msg and args to display status updates to the user
92
on-error: is a restart to run when an error is experienced, if nil, it will simply
93
allow the error to go up the stack. the default is to continue with the
94
import, skipping the row that caused errors
96
(unless (eql column-names :first-row)
97
(setf columns (data-table::sql-escaped-column-names
99
:transform column-transform)))
101
(for row in-csv file)
103
(if (and (eql column-names :first-row) (first-iteration-p))
104
(setf columns (data-table::sql-escaped-column-names row :transform column-transform))
108
(funcall log "Error importing ROW ~D of file: ~S~%~S~%~A~%~S"
111
(when (find-restart on-error) (invoke-restart on-error))))))
112
(exec #?"INSERT INTO ${schema}.${table-name} (@{ columns })
113
VALUES ( @{ (funcall data-munger row) } )")
114
(when (zerop (mod cnt progress-mod))
115
(funcall log "Imported ~D rows~%" cnt )))
116
(continue-importing ()
117
:report "Continue Importing the file, skipping this row of data")))))