17
import psycopg2.extras
18
import psycopg2.extensions
25
GeoTypes.initialisePsycopgTypes(psycopg_module=psycopg2, psycopg_extensions_module=psycopg2.extensions)
27
from abc import ABCMeta, abstractmethod
31
def __init__(self, start = None, end = None):
42
return '[' + str(self.start) + ' - ' + str(self.end) + ']'
47
simple class for building of complex queries
54
self.table_name = None
59
def set_table_name(self, table_name):
60
self.table_name = table_name
62
def set_columns(self, columns):
63
self.columns = columns
65
def add_order(self, order):
66
self.order.append(order)
68
def set_limit(self, limit):
69
if self.limit != None:
70
raise Error("overriding Query.limit")
73
def add_condition(self, condition, parameters = None):
74
self.conditions.append(condition)
75
if parameters != None:
76
self.parameters.update(parameters)
78
def add_parameters(self, parameters):
79
self.parameters.update(parameters)
85
for index, column in enumerate(self.columns):
91
sql += 'FROM ' + self.table_name + ' '
93
for index, condition in enumerate(self.conditions):
98
sql += condition + ' '
100
if len(self.order) > 0:
102
for index, order in enumerate(self.order):
105
sql += order.get_column() + ' '
110
sql += 'LIMIT ' + str(self.limit.get_number()) + ' '
114
def get_parameters(self):
115
return self.parameters
117
def parse_args(self, args):
120
if isinstance(arg, TimeInterval):
122
if arg.get_start() != None:
123
self.add_condition('timestamp >= %(starttime)s', {'starttime': arg.get_start()})
125
if arg.get_end() != None:
126
self.add_condition('timestamp < %(endtime)s', {'endtime': arg.get_end()})
128
elif isinstance(arg, shapely.geometry.base.BaseGeometry):
132
self.add_condition('SetSRID(CAST(%(envelope)s AS geometry), %(srid)s) && st_transform(the_geom, %(srid)s)', {'envelope': shapely.wkb.dumps(arg.envelope).encode('hex')})
134
if not arg.equals(arg.envelope):
135
self.add_condition('Intersects(SetSRID(CAST(%(geometry)s AS geometry), %(srid)s), st_transform(the_geom, %(srid)s))', {'geometry': shapely.wkb.dumps(arg).encode('hex')})
138
raise Error("invalid geometry in db.Stroke.select()")
140
elif isinstance(arg, Order):
143
elif isinstance(arg, Limit):
147
print 'WARNING: ' + __name__ + ' unhandled object ' + str(type(arg))
149
def get_results(self, db):
151
resulting_strokes = []
152
if db.cur.rowcount > 0:
153
for result in db.cur.fetchall():
154
resulting_strokes.append(db.create(result))
156
return resulting_strokes
158
class RasterQuery(Query):
160
def __init__(self, raster):
165
env = self.raster.getEnv()
168
self.add_condition('SetSRID(CAST(%(envelope)s AS geometry), %(srid)s) && st_transform(the_geom, %(srid)s)', {'envelope': shapely.wkb.dumps(env).encode('hex')})
170
raise Error("invalid Raster geometry in db.Stroke.select()")
175
sql += 'TRUNC((ST_X(ST_TRANSFORM(the_geom, %(srid)s)) - ' + str(self.raster.getXMin()) + ') /' + str(self.raster.getXDiv()) + ') AS rx, '
176
sql += 'TRUNC((ST_Y(ST_TRANSFORM(the_geom, %(srid)s)) - ' + str(self.raster.getYMin()) + ') /' + str(self.raster.getYDiv()) + ') AS ry, '
177
sql += 'count(*) AS count FROM ('
179
sql += Query.__str__(self)
181
sql += ') AS ' + self.table_name + ' GROUP BY rx, ry'
185
def get_results(self, db):
187
if db.cur.rowcount > 0:
188
for result in db.cur.fetchall():
189
self.raster.set(result['rx'], result['ry'], result['count'])
194
definition for query search order
197
def __init__(self, column, desc = False):
201
def get_column(self):
210
definition of query result limit
213
def __init__(self, limit):
216
def get_number(self):
220
class Center(object):
222
definition of query center point
225
def __init__(self, center):
234
abstract base class for database access objects
238
psql as user postgres:
240
CREATE USER blitzortung PASSWORD 'blitzortung' INHERIT;
242
createdb -T postgistemplate -E utf8 -O blitzortung blitzortung
246
GRANT SELECT ON spatial_ref_sys TO blitzortung;
247
GRANT SELECT ON geometry_columns TO blitzortung;
248
GRANT INSERT, DELETE ON geometry_columns TO blitzortung;
251
__metaclass__ = ABCMeta
253
DefaultTimezone = pytz.UTC
257
create PostgreSQL db access object
260
connection = "host='localhost' dbname='blitzortung' user='blitzortung' password='blitzortung'"
261
self.schema_name = None
265
self.srid = geom.Geometry.DefaultSrid
266
self.tz = Base.DefaultTimezone
269
self.conn = psycopg2.connect(connection)
270
self.cur = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
280
if self.conn != None:
286
def is_connected(self):
287
if self.conn != None:
288
return not self.conn.closed
292
def set_table_name(self, table_name):
293
self.table_name = table_name
295
def get_table_name(self):
296
return self.table_name
298
def get_full_table_name(self):
299
if self.get_schema_name() != None:
300
return '"' + self.get_schema_name() + '"."' + self.get_table_name() + '"'
302
return self.get_table_name()
304
def set_schema_name(self, schema_name):
305
self.schema_name = schema_name
307
def get_schema_name(self):
308
return self.schema_name
313
def set_srid(self, srid):
316
def get_timezone(self):
319
def set_timezone(self, tz):
323
''' commit pending database transaction '''
327
''' rollback pending database transaction '''
331
def insert(self, object):
335
def select(self, args):
341
stroke db access class
343
database table creation (as db user blitzortung, database blitzortung):
345
CREATE TABLE strokes (id bigserial, timestamp timestamptz, nanoseconds SMALLINT, PRIMARY KEY(id));
346
SELECT AddGeometryColumn('public','strokes','the_geom','4326','POINT',2);
347
GRANT SELECT ON TABLE strokes TO bogroup_ro;
349
ALTER TABLE strokes ADD COLUMN amplitude REAL;
350
ALTER TABLE strokes ADD COLUMN error2d SMALLINT;
351
ALTER TABLE strokes ADD COLUMN type SMALLINT;
352
ALTER TABLE strokes ADD COLUMN stationcount SMALLINT;
354
CREATE INDEX strokes_timestamp ON strokes USING btree("timestamp");
355
CREATE INDEX strokes_geom ON strokes USING gist(the_geom);
356
CREATE INDEX strokes_timestamp_geom ON strokes USING gist("timestamp", the_geom);
357
CREATE INDEX strokes_id_timestamp_geom ON strokes USING gist(id, "timestamp", the_geom);
359
empty the table with the following commands:
362
ALTER SEQUENCE strokes_id_seq RESTART 1;
368
self.set_table_name('strokes')
370
def insert(self, stroke):
371
sql = 'INSERT INTO ' + self.get_full_table_name() + \
372
' ("timestamp", nanoseconds, the_geom, amplitude, error2d, type, stationcount) ' + \
373
'VALUES (\'%s\', %d, st_setsrid(makepoint(%f, %f), 4326), %f, %d, %d, %d)' \
374
%(stroke.get_time(), stroke.get_nanoseconds(), stroke.get_location().x, stroke.get_location().y, stroke.get_amplitude(), stroke.get_lateral_error(), stroke.get_type(), stroke.get_station_count())
375
self.cur.execute(sql)
377
def get_latest_time(self):
378
sql = 'SELECT timestamp FROM ' + self.get_full_table_name() + \
379
' ORDER BY timestamp DESC LIMIT 1'
380
self.cur.execute(sql)
381
if self.cur.rowcount == 1:
382
result = self.cur.fetchone()
383
return result['timestamp']
387
def create(self, result):
388
stroke = data.Stroke()
390
stroke.set_time(result['timestamp'])
391
stroke.set_nanoseconds(result['nanoseconds'])
392
stroke.set_location(shapely.wkb.loads(result['the_geom'].decode('hex')))
393
stroke.set_amplitude(result['amplitude'])
394
stroke.set_type(result['type'])
395
stroke.set_station_count(result['stationcount'])
396
stroke.set_lateral_error(result['error2d'])
400
def select_query(self, args, query = Query()):
401
' build up query object for select statement '
402
query.set_table_name(self.get_full_table_name())
403
query.set_columns(['"timestamp"', 'nanoseconds', 'st_transform(the_geom, %i) AS the_geom' % self.srid, 'amplitude', 'type', 'error2d', 'stationcount'])
404
query.add_parameters({'srid': self.srid})
406
query.add_condition('the_geom IS NOT NULL')
407
query.parse_args(args)
410
def select(self, *args):
413
query = self.select_query(args)
415
return self.select_execute(query)
417
def select_raster(self, raster, *args):
420
query = self.select_query(args, RasterQuery(raster))
422
return self.select_execute(query)
424
def select_execute(self, query):
425
' set timezone for query '
426
self.cur.execute('SET TIME ZONE \'%s\'' %(str(self.tz)))
429
self.cur.execute(str(query), query.get_parameters())
431
' collect and return data '
432
return query.get_results(self)
434
class Location(Base):
436
geonames db access class
440
CREATE TABLE geo.geonames (id bigserial, "name" character varying, PRIMARY KEY(id));
441
SELECT AddGeometryColumn('geo','geonames','the_geom','4326','POINT',2);
443
ALTER TABLE geo.geonames ADD COLUMN "class" INTEGER;
444
ALTER TABLE geo.geonames ADD COLUMN feature_class CHARACTER(1);
445
ALTER TABLE geo.geonames ADD COLUMN feature_code VARCHAR;
446
ALTER TABLE geo.geonames ADD COLUMN country_code VARCHAR;
447
ALTER TABLE geo.geonames ADD COLUMN admin_code_1 VARCHAR;
448
ALTER TABLE geo.geonames ADD COLUMN admin_code_2 VARCHAR;
449
ALTER TABLE geo.geonames ADD COLUMN population INTEGER;
450
ALTER TABLE geo.geonames ADD COLUMN elevation SMALLINT;
452
CREATE INDEX geonames_geom ON geo.geonames USING gist(the_geom);
458
self.set_schema_name('geo')
459
self.set_table_name('geonames')
461
def delete_all(self):
462
self.cur.execute('DELETE FROM ' + self.get_full_table_name())
464
def insert(self, line):
465
fields = line.strip().split('\t')
467
latitude = float(fields[4])
468
longitude = float(fields[5])
469
feature_class = fields[6]
470
feature_code = fields[7]
471
country_code = fields[8]
472
admin_code_1 = fields[10]
473
admin_code_2 = fields[11]
474
admin_code_3 = fields[12]
475
admin_code_4 = fields[13]
476
population = int(fields[14])
478
elevation = int(fields[15])
482
name = name.replace("'", "''")
484
classification = self.size_class(population)
486
if classification is not None:
487
self.cur.execute('INSERT INTO ' + self.get_full_table_name() + '''
488
(the_geom, name, class, feature_class, feature_code, country_code, admin_code_1, admin_code_2, population, elevation)
490
GeomFromText('POINT(%f %f)', 4326), '%s', %d, '%s', '%s', '%s', '%s', '%s', %d, %d)'''
491
% (longitude, latitude, name, classification, feature_class, feature_code, country_code, admin_code_1, admin_code_2, population, elevation))
493
def size_class(self, n):
496
base = math.floor(math.log(n)/math.log(10)) - 1
497
relative = n / math.pow(10, base)
498
order = min(2, math.floor(relative/25))
501
return min(15, base * 3 + order)
503
def select(self, *args):
505
self.min_population = 1000
506
self.max_distance = 10000
511
if isinstance(arg, Center):
512
' center point information given '
514
elif isinstance(arg, Limit):
515
' limit information given '
518
if self.is_connected():
519
queryString = '''SELECT
527
st_transform(the_geom, %(srid)s) AS the_geom,
529
distance_sphere(the_geom, c.center) AS distance,
530
st_azimuth(the_geom, c.center) AS azimuth
532
(SELECT SetSRID(MakePoint(%(center_x)s, %(center_y)s), %(srid)s) as center ) as c,
536
AND population >= %(min_population)s
537
AND st_transform(the_geom, %(srid)s) && st_expand(c.center, %(max_distance)s) order by distance limit %(limit)s''';
540
params['srid'] = self.get_srid()
541
params['table_name'] = self.get_full_table_name()
542
params['center_x'] = self.center.get_point().x
543
params['center_y'] = self.center.get_point().y
544
params['min_population'] = self.min_population
545
params['max_distance'] = self.max_distance
546
params['limit'] = self.limit
548
self.cur.execute(queryString % params)
551
if self.cur.rowcount > 0:
552
for result in self.cur.fetchall():
554
location['name'] = result['name']
555
location['distance'] = result['distance']
556
location['azimuth'] = result['azimuth']
557
locations.append(location)