~launchpad-p-s/sofastatistics/main

« back to all changes in this revision

Viewing changes to dbe_plugins/dbe_sqlite.py

  • Committer: Grant Paton-Simpson
  • Date: 2009-05-19 04:21:43 UTC
  • Revision ID: g@ubuntu-20090519042143-p561mbokz3inefvd
Initial import

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
 
 
2
from pysqlite2 import dbapi2 as sqlite
 
3
import pprint
 
4
import re
 
5
import wx
 
6
 
 
7
import getdata
 
8
import table_entry
 
9
 
 
10
# http://www.sqlite.org/lang_keywords.html
 
11
# The following is non-standard but will work
 
12
def quote_identifier(raw_val):
 
13
    return "`%s`" % raw_val
 
14
 
 
15
def DbeSyntaxElements():
 
16
    if_clause = "CASE WHEN %s THEN %s ELSE %s END"
 
17
    abs_wrapper_l = ""
 
18
    abs_wrapper_r = ""
 
19
    return if_clause, abs_wrapper_l, abs_wrapper_r
 
20
 
 
21
 
 
22
class DbDets(getdata.DbDets):
 
23
 
 
24
    def getDbTbls(self, cur, db):
 
25
        "Get table names given database and cursor"
 
26
        SQL_get_tbl_names = """SELECT name 
 
27
            FROM sqlite_master 
 
28
            WHERE type = 'table'
 
29
            ORDER BY name"""
 
30
        cur.execute(SQL_get_tbl_names)
 
31
        tbls = [x[0] for x in cur.fetchall()]
 
32
        tbls.sort(key=lambda s: s.upper())
 
33
        return tbls
 
34
 
 
35
    def getCharLen(self, type_text):
 
36
        """
 
37
        NB SQLite never truncates whatever you specify.
 
38
        http://www.sqlite.org/faq.html#q9
 
39
        """
 
40
        reobj = re.compile(r"\w*()")
 
41
        match = reobj.search(type_text)    
 
42
        try:
 
43
            return int(match.group(1))
 
44
        except ValueError:
 
45
            return None
 
46
 
 
47
    def getTblFlds(self, cur, db, tbl):
 
48
        "http://www.sqlite.org/pragma.html"
 
49
        # get encoding
 
50
        cur.execute("PRAGMA encoding")
 
51
        encoding = cur.fetchone()[0]
 
52
        # get field details
 
53
        cur.execute("PRAGMA table_info(%s)" % tbl)
 
54
        fld_dets = cur.fetchall() 
 
55
        flds = {}
 
56
        for cid, fld_name, fld_type, notnull, dflt_value, pk in fld_dets:
 
57
            bolnullable = True if notnull == 0 else False
 
58
            bolnumeric = fld_type.lower() in ["integer", "float", "numeric"]
 
59
            bolautonum = (pk == 1 and fld_type.lower() == "integer")            
 
60
            boldata_entry_ok = False if bolautonum else True
 
61
            boldatetime = fld_type.lower() in ["date"]
 
62
            fld_txt = not bolnumeric and not boldatetime
 
63
            dets_dic = {
 
64
                getdata.FLD_SEQ: cid,
 
65
                getdata.FLD_BOLNULLABLE: bolnullable,
 
66
                getdata.FLD_DATA_ENTRY_OK: boldata_entry_ok,
 
67
                getdata.FLD_COLUMN_DEFAULT: dflt_value,
 
68
                getdata.FLD_BOLTEXT: fld_txt,
 
69
                getdata.FLD_TEXT_LENGTH: self.getCharLen(fld_type),
 
70
                getdata.FLD_CHARSET: encoding,
 
71
                getdata.FLD_BOLNUMERIC: bolnumeric,
 
72
                getdata.FLD_BOLAUTONUMBER: bolautonum,
 
73
                getdata.FLD_DECPTS: None, # not really applicable - no limit
 
74
                getdata.FLD_NUM_WIDTH: None, # no limit (TODO unless check constraint)
 
75
                getdata.FLD_BOL_NUM_SIGNED: True,
 
76
                getdata.FLD_NUM_MIN_VAL: None, # not really applicable - no limit
 
77
                getdata.FLD_NUM_MAX_VAL: None, # not really applicable - no limit
 
78
                getdata.FLD_BOLDATETIME: boldatetime, 
 
79
                }
 
80
            flds[fld_name] = dets_dic
 
81
        return flds
 
82
    
 
83
    def getIndexDets(self, cur, db, tbl):
 
84
        """
 
85
        has_unique - booleanself.dropDefault_Dbe
 
86
        idxs = [idx0, idx1, ...]
 
87
        each idx is a dict name, is_unique, flds
 
88
        """
 
89
        debug = False
 
90
        cur.execute("PRAGMA index_list(\"%s\")" % tbl)
 
91
        idx_lst = cur.fetchall() # [(seq, name, unique), ...]
 
92
        if debug: pprint.pprint(idx_lst)
 
93
        names_idx_name = 1
 
94
        names_idx_unique = 2
 
95
        # initialise
 
96
        has_unique = False
 
97
        idxs = []
 
98
        if idx_lst:
 
99
            idx_names = [x[names_idx_name] for x in idx_lst]
 
100
            for i, idx_name in enumerate(idx_names):
 
101
                cur.execute("PRAGMA index_info(\"%s\")" % idx_name)
 
102
                # [(seqno, cid, name), ...]
 
103
                flds_idx_names = 2
 
104
                index_info = cur.fetchall()
 
105
                if debug: pprint.pprint(index_info)
 
106
                fld_names = [x[flds_idx_names] for x in index_info]
 
107
                unique = (idx_lst[i][names_idx_unique] == 1)
 
108
                if unique:
 
109
                    has_unique = True
 
110
                idx_dic = {getdata.IDX_NAME: idx_name, 
 
111
                           getdata.IDX_IS_UNIQUE: unique, 
 
112
                           getdata.IDX_FLDS: fld_names}
 
113
                idxs.append(idx_dic)
 
114
        if debug:
 
115
            pprint.pprint(idxs)
 
116
            print has_unique
 
117
        return has_unique, idxs
 
118
        
 
119
    def getDbDets(self):
 
120
        """
 
121
        Return connection, cursor, and get lists of 
 
122
            databases (only one for SQLite), tables, and fields 
 
123
            based on the SQLite database connection details provided.
 
124
        The table used will be the first if none provided.
 
125
        The field dets will be taken from the table used.
 
126
        Returns conn, cur, dbs, tbls, flds, has_unique, idxs.
 
127
        """
 
128
        conn_dets_sqlite = self.conn_dets.get(getdata.DBE_SQLITE)
 
129
        if not conn_dets_sqlite:
 
130
            raise Exception, "No connection details available for SQLite"
 
131
        if not conn_dets_sqlite.get(self.db):
 
132
            raise Exception, "No connections for SQLite database %s" % \
 
133
                self.db
 
134
        conn = sqlite.connect(**conn_dets_sqlite[self.db])
 
135
        cur = conn.cursor() # must return tuples not dics
 
136
        dbs = [self.db]
 
137
        db_to_use = self.db
 
138
        tbls = self.getDbTbls(cur, self.db)
 
139
        # get field names (from first table if none provided)
 
140
        tbl_to_use = self.tbl if self.tbl else tbls[0]
 
141
        flds = self.getTblFlds(cur, self.db, tbl_to_use)
 
142
        has_unique, idxs = self.getIndexDets(cur, db_to_use, tbl_to_use)
 
143
        debug = False
 
144
        if debug:
 
145
            print self.db
 
146
            print self.tbl
 
147
            pprint.pprint(tbls)
 
148
            pprint.pprint(flds)
 
149
            pprint.pprint(idxs)
 
150
        return conn, cur, dbs, tbls, flds, has_unique, idxs
 
151
 
 
152
 
 
153
def InsertRow(conn, cur, tbl_name, data):
 
154
    """
 
155
    data = [(value as string, fld_name, fld_dets), ...]
 
156
    Modify any values (according to field details) to be ready for insertion.
 
157
    Use placeholders in execute statement.
 
158
    Commit insert statement.
 
159
    """
 
160
    debug = False
 
161
    if debug: pprint.pprint(data)
 
162
    fld_dics = [x[2] for x in data]
 
163
    fld_names = [x[1] for x in data]
 
164
    # http://www.sqlite.org/lang_keywords.html
 
165
    # The following is non-standard but will work
 
166
    fld_names_clause = " (`" + "`, `".join(fld_names) + "`) "
 
167
    # e.g. (`fname`, `lname`, `dob` ...)
 
168
    # http://docs.python.org/library/sqlite3.html re placeholders
 
169
    fld_placeholders_clause = " (" + \
 
170
        ", ".join(["?" for x in range(len(data))]) + ") "
 
171
    # e.g. " (%s, %s, %s ...) "
 
172
    SQL_insert = "INSERT INTO `%s` " % tbl_name + fld_names_clause + \
 
173
        "VALUES %s" % fld_placeholders_clause
 
174
    if debug: print SQL_insert
 
175
    data_lst = []
 
176
    for i, data_dets in enumerate(data):
 
177
        if debug: pprint.pprint(data_dets)
 
178
        str_val, fld_name, fld_dic = data_dets
 
179
        if fld_dic[getdata.FLD_BOLDATETIME]:
 
180
            valid_datetime, t = util.datetime_str_valid(str_val)
 
181
            if not valid_datetime:
 
182
                return False # should have been tested already but ...
 
183
            else:
 
184
                if debug: print t
 
185
            # might as well store in same way as MySQL
 
186
            val2add = "%s-%s-%s %s:%s:%s" % (t[0], t[1], t[2], 
 
187
                                             t[3], t[4], t[5])
 
188
            if debug: print val2add 
 
189
        else:
 
190
            val2add = str_val
 
191
        data_lst.append(val2add)
 
192
    data_tup = tuple(data_lst)
 
193
    try:
 
194
        cur.execute(SQL_insert, data_tup)
 
195
        conn.commit()
 
196
        return True
 
197
    except Exception, e:
 
198
        if debug: print "Failed to insert row.  SQL: %s, Data: %s" % \
 
199
            (SQL_insert, str(data_tup)) + "\n\nOriginal error: %s" % e
 
200
        return False
 
201
 
 
202
def setDataConnGui(parent, read_only, scroll, szr, lblfont):
 
203
    ""
 
204
    # default database
 
205
    parent.lblSqliteDefaultDb = wx.StaticText(scroll, -1, "Default Database:")
 
206
    parent.lblSqliteDefaultDb.SetFont(lblfont)
 
207
    sqlite_default_db = parent.sqlite_default_db if parent.sqlite_default_db \
 
208
        else ""
 
209
    parent.txtSqliteDefaultDb = wx.TextCtrl(scroll, -1, 
 
210
                                            sqlite_default_db, 
 
211
                                            size=(250,-1))
 
212
    parent.txtSqliteDefaultDb.Enable(not read_only)
 
213
    # default table
 
214
    parent.lblSqliteDefaultTbl = wx.StaticText(scroll, -1, "Default Table:")
 
215
    parent.lblSqliteDefaultTbl.SetFont(lblfont)
 
216
    sqlite_default_tbl = parent.sqlite_default_tbl if parent.sqlite_default_tbl \
 
217
        else ""
 
218
    parent.txtSqliteDefaultTbl = wx.TextCtrl(scroll, -1,
 
219
                                             sqlite_default_tbl, 
 
220
                                             size=(250,-1))
 
221
    parent.txtSqliteDefaultTbl.Enable(not read_only)
 
222
    bxSqlite = wx.StaticBox(scroll, -1, "SQLite")
 
223
    parent.szrSqlite = wx.StaticBoxSizer(bxSqlite, wx.VERTICAL)
 
224
    #3 SQLITE INNER
 
225
    szrSqliteInner = wx.BoxSizer(wx.HORIZONTAL)
 
226
    szrSqliteInner.Add(parent.lblSqliteDefaultDb, 0, wx.LEFT|wx.RIGHT, 5)
 
227
    szrSqliteInner.Add(parent.txtSqliteDefaultDb, 1, wx.GROW|wx.RIGHT, 10)
 
228
    szrSqliteInner.Add(parent.lblSqliteDefaultTbl, 0, wx.LEFT|wx.RIGHT, 5)
 
229
    szrSqliteInner.Add(parent.txtSqliteDefaultTbl, 1, wx.GROW|wx.RIGHT, 10)
 
230
    parent.szrSqlite.Add(szrSqliteInner, 0)
 
231
    sqlite_col_dets = [("Database(s)", table_entry.COL_TEXT_BROWSE, 400, 
 
232
                        "Choose an SQLite database file")]
 
233
    parent.sqlite_new_grid_data = []
 
234
    parent.sqlite_grid = table_entry.TableEntry(frame=parent, 
 
235
        panel=scroll, szr=parent.szrSqlite, read_only=read_only, 
 
236
        grid_size=(550, 100), col_dets=sqlite_col_dets, 
 
237
        data=parent.sqlite_data, new_grid_data=parent.sqlite_new_grid_data)
 
238
    szr.Add(parent.szrSqlite, 0, wx.GROW|wx.ALL, 10)
 
239
 
 
240
def getProjSettings(parent, proj_dic):
 
241
    ""
 
242
    parent.sqlite_default_db = proj_dic["default_dbs"][getdata.DBE_SQLITE]
 
243
    parent.sqlite_default_tbl = proj_dic["default_tbls"][getdata.DBE_SQLITE]
 
244
    if proj_dic["conn_dets"].get(getdata.DBE_SQLITE):
 
245
        parent.sqlite_data = [(x["database"],) \
 
246
             for x in proj_dic["conn_dets"][getdata.DBE_SQLITE].values()]
 
247
    else:
 
248
        parent.sqlite_data = []
 
249
 
 
250
def setConnDetDefaults(parent):
 
251
    try:            
 
252
        parent.sqlite_default_db
 
253
    except AttributeError: 
 
254
        parent.sqlite_default_db = ""
 
255
    try:
 
256
        parent.sqlite_default_tbl
 
257
    except AttributeError: 
 
258
        parent.sqlite_default_tbl = ""
 
259
    try:
 
260
        parent.sqlite_data
 
261
    except AttributeError: 
 
262
        parent.sqlite_data = []
 
263
 
 
264
def processConnDets(parent, default_dbs, default_tbls, conn_dets):
 
265
    parent.sqlite_grid.UpdateNewGridData()
 
266
    sqlite_default_db = parent.txtSqliteDefaultDb.GetValue()
 
267
    sqlite_default_tbl = parent.txtSqliteDefaultTbl.GetValue()
 
268
    has_sqlite_conn = sqlite_default_db and sqlite_default_tbl
 
269
    incomplete_sqlite = (sqlite_default_db or sqlite_default_tbl) \
 
270
        and not has_sqlite_conn
 
271
    if incomplete_sqlite:
 
272
        wx.MessageBox("The SQLite details are incomplete")
 
273
        parent.txtSqliteDefaultDb.SetFocus()
 
274
    default_dbs[getdata.DBE_SQLITE] = sqlite_default_db \
 
275
        if sqlite_default_db else None
 
276
    default_tbls[getdata.DBE_SQLITE] = sqlite_default_tbl \
 
277
        if sqlite_default_tbl else None
 
278
    #pprint.pprint(parent.sqlite_new_grid_data) # debug
 
279
    sqlite_settings = parent.sqlite_new_grid_data
 
280
    if sqlite_settings:
 
281
        conn_dets_sqlite = {}
 
282
        for sqlite_setting in sqlite_settings:
 
283
            # e.g. ("C:\.....\my_sqlite_db",)
 
284
            db_path = sqlite_setting[0]
 
285
            db_name = parent.getFileName(db_path)
 
286
            new_sqlite_dic = {}
 
287
            new_sqlite_dic["database"] = db_path
 
288
            conn_dets_sqlite[db_name] = new_sqlite_dic
 
289
        conn_dets[getdata.DBE_SQLITE] = conn_dets_sqlite        
 
290
    return incomplete_sqlite, has_sqlite_conn
 
291