1
# Copyright (c) 2013 Mirantis Inc.
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
7
# http://www.apache.org/licenses/LICENSE-2.0
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
16
"""Implementation of SQLAlchemy backend."""
21
from oslo.config import cfg
22
from oslo.db import exception as db_exc
23
from oslo.db.sqlalchemy import session as db_session
25
import sqlalchemy as sa
27
from sahara.db.sqlalchemy import models as m
28
from sahara import exceptions as ex
29
from sahara.i18n import _
30
from sahara.i18n import _LE
31
from sahara.openstack.common import log as logging
34
LOG = logging.getLogger(__name__)
39
_LOCK = threading.Lock()
42
def _create_facade_lazily():
48
_FACADE = db_session.EngineFacade.from_config(CONF,
54
facade = _create_facade_lazily()
55
return facade.get_engine()
58
def get_session(**kwargs):
    """Return a session obtained from the lazily created engine facade.

    All keyword arguments are forwarded unchanged to the facade's
    ``get_session`` call.
    """
    return _create_facade_lazily().get_session(**kwargs)
69
"""The backend is this module itself."""
70
return sys.modules[__name__]
73
def model_query(model, context, session=None, project_only=True):
76
:param model: base model to query
77
:param context: context to query under
78
:param project_only: if present and context is user-type, then restrict
79
query to match the context's tenant_id.
81
session = session or get_session()
83
query = session.query(model)
85
if project_only and not context.is_admin:
86
query = query.filter_by(tenant_id=context.tenant_id)
91
def count_query(model, context, session=None, project_only=None):
92
"""Count query helper.
94
:param model: base model to query
95
:param context: context to query under
96
:param project_only: if present and context is user-type, then restrict
97
query to match the context's project_id.
99
return model_query(sa.func.count(model.id), context, session, project_only)
104
engine = get_engine()
105
m.Cluster.metadata.create_all(engine)
106
except sa.exc.OperationalError as e:
107
LOG.exception(_LE("Database registration exception: %s"), e)
114
engine = get_engine()
115
m.Cluster.metadata.drop_all(engine)
116
except Exception as e:
117
LOG.exception(_LE("Database shutdown exception: %s"), e)
122
# Helpers for building constraints / equality checks
125
def constraint(**conditions):
    """Wrap the given keyword conditions in a Constraint object."""
    wrapped = Constraint(conditions)
    return wrapped
129
def equal_any(*values):
    """Build an EqualityCondition from the positional values."""
    condition = EqualityCondition(values)
    return condition
133
def not_equal(*values):
    """Build an InequalityCondition from the positional values."""
    condition = InequalityCondition(values)
    return condition
137
class Constraint(object):
    """A set of per-attribute conditions that can be applied to a query."""

    def __init__(self, conditions):
        """Store the conditions.

        :param conditions: mapping of model attribute name to a condition
                           object exposing ``clauses(field)``.
        """
        self.conditions = conditions

    def apply(self, model, query):
        """Filter the query by every clause of every stored condition.

        :param model: object whose attributes are looked up by condition key
        :param query: object exposing ``filter(clause)``
        :returns: the query after all clauses have been applied
        """
        # items() exists on both Python 2 and 3 dicts; iteritems() does not.
        for key, condition in self.conditions.items():
            for clause in condition.clauses(getattr(model, key)):
                # The result of filter() is rebound each time, so the final
                # value must be returned or the filtering is lost.
                query = query.filter(clause)
        return query
148
class EqualityCondition(object):
    """Condition satisfied when a field equals any of the stored values."""

    def __init__(self, values):
        """Remember the candidate values that ``clauses`` reads back.

        :param values: iterable of values the field may be equal to
        """
        self.values = values

    def clauses(self, field):
        """Return a one-element list holding the OR of all equality tests.

        :param field: column-like object compared against each value
        :returns: list containing a single ``sa.or_`` clause
        """
        # or_() takes its clauses as positional arguments, and
        # Constraint.apply() iterates whatever this method returns, so the
        # single OR clause is wrapped in a list to be applied as one filter.
        return [sa.or_(*[field == value for value in self.values])]
156
class InequalityCondition(object):
    """Condition satisfied when a field differs from every stored value."""

    def __init__(self, values):
        """Remember the excluded values that ``clauses`` reads back.

        :param values: iterable of values the field must not be equal to
        """
        self.values = values

    def clauses(self, field):
        """Return one ``field != value`` clause per stored value.

        :param field: column-like object compared against each value
        :returns: list of inequality clauses, empty when there are no values
        """
        return [field != value for value in self.values]
166
def _cluster_get(context, session, cluster_id):
    """Fetch a single Cluster by id within the given session.

    :returns: the first matching Cluster, or None when nothing matches.
    """
    by_id = model_query(m.Cluster, context, session).filter_by(id=cluster_id)
    return by_id.first()
171
def cluster_get(context, cluster_id):
    """Look up a Cluster by id using a freshly obtained session."""
    session = get_session()
    return _cluster_get(context, session, cluster_id)
175
def cluster_get_all(context, **kwargs):
    """Return every Cluster matching the given equality filters.

    :param kwargs: column=value pairs passed straight to ``filter_by``.
    """
    filtered = model_query(m.Cluster, context).filter_by(**kwargs)
    return filtered.all()
180
def cluster_create(context, values):
181
values = values.copy()
182
cluster = m.Cluster()
183
node_groups = values.pop("node_groups", [])
184
cluster.update(values)
186
session = get_session()
187
with session.begin():
189
cluster.save(session=session)
190
except db_exc.DBDuplicateEntry as e:
191
raise ex.DBDuplicateEntry(
192
_("Duplicate entry for Cluster: %s") % e.columns)
195
for ng in node_groups:
196
node_group = m.NodeGroup()
197
node_group.update({"cluster_id": cluster.id})
198
node_group.update(ng)
199
node_group.save(session=session)
200
except db_exc.DBDuplicateEntry as e:
201
raise ex.DBDuplicateEntry(
202
_("Duplicate entry for NodeGroup: %s") % e.columns)
204
return cluster_get(context, cluster.id)
207
def cluster_update(context, cluster_id, values):
208
session = get_session()
210
with session.begin():
211
cluster = _cluster_get(context, session, cluster_id)
213
raise ex.NotFoundException(cluster_id,
214
_("Cluster id '%s' not found!"))
215
cluster.update(values)
220
def cluster_destroy(context, cluster_id):
    """Delete the Cluster with the given id inside one transaction.

    :param context: context to query under
    :param cluster_id: id of the cluster to delete
    :raises NotFoundException: when _cluster_get finds no such cluster
    """
    session = get_session()
    with session.begin():
        cluster = _cluster_get(context, session, cluster_id)
        # Only a missing cluster is an error; raising unconditionally would
        # make the delete below unreachable.
        if not cluster:
            raise ex.NotFoundException(cluster_id,
                                       _("Cluster id '%s' not found!"))

        session.delete(cluster)
233
def _node_group_get(context, session, node_group_id):
    """Fetch a single NodeGroup by id within the given session.

    :returns: the first matching NodeGroup, or None when nothing matches.
    """
    candidates = model_query(m.NodeGroup, context, session)
    return candidates.filter_by(id=node_group_id).first()
238
def node_group_add(context, cluster_id, values):
239
session = get_session()
241
with session.begin():
242
cluster = _cluster_get(context, session, cluster_id)
244
raise ex.NotFoundException(cluster_id,
245
_("Cluster id '%s' not found!"))
247
node_group = m.NodeGroup()
248
node_group.update({"cluster_id": cluster_id})
249
node_group.update(values)
250
session.add(node_group)
255
def node_group_update(context, node_group_id, values):
    """Apply the given values to an existing NodeGroup in one transaction.

    :param context: context to query under
    :param node_group_id: id of the node group to modify
    :param values: mapping handed to the model's ``update``
    :raises NotFoundException: when _node_group_get finds no such group
    """
    session = get_session()
    with session.begin():
        node_group = _node_group_get(context, session, node_group_id)
        # Only a missing node group is an error; raising unconditionally
        # would make the update below unreachable.
        if not node_group:
            raise ex.NotFoundException(node_group_id,
                                       _("Node Group id '%s' not found!"))

        node_group.update(values)
266
def node_group_remove(context, node_group_id):
    """Delete the NodeGroup with the given id inside one transaction.

    :param context: context to query under
    :param node_group_id: id of the node group to delete
    :raises NotFoundException: when _node_group_get finds no such group
    """
    session = get_session()

    with session.begin():
        node_group = _node_group_get(context, session, node_group_id)
        # Only a missing node group is an error; raising unconditionally
        # would make the delete below unreachable.
        if not node_group:
            raise ex.NotFoundException(node_group_id,
                                       _("Node Group id '%s' not found!"))

        session.delete(node_group)
280
def _instance_get(context, session, instance_id):
    """Fetch a single Instance by id within the given session.

    :returns: the first matching Instance, or None when nothing matches.
    """
    candidates = model_query(m.Instance, context, session)
    return candidates.filter_by(id=instance_id).first()
285
def instance_add(context, node_group_id, values):
286
session = get_session()
288
with session.begin():
289
node_group = _node_group_get(context, session, node_group_id)
291
raise ex.NotFoundException(node_group_id,
292
_("Node Group id '%s' not found!"))
294
instance = m.Instance()
295
instance.update({"node_group_id": node_group_id})
296
instance.update(values)
297
session.add(instance)
299
node_group = _node_group_get(context, session, node_group_id)
300
node_group.count += 1
305
def instance_update(context, instance_id, values):
    """Apply the given values to an existing Instance in one transaction.

    :param context: context to query under
    :param instance_id: id of the instance to modify
    :param values: mapping handed to the model's ``update``
    :raises NotFoundException: when _instance_get finds no such instance
    """
    session = get_session()
    with session.begin():
        instance = _instance_get(context, session, instance_id)
        # Only a missing instance is an error; raising unconditionally
        # would make the update below unreachable.
        if not instance:
            raise ex.NotFoundException(instance_id,
                                       _("Instance id '%s' not found!"))

        instance.update(values)
316
def instance_remove(context, instance_id):
    """Delete an Instance and decrement its node group's count.

    Both changes are made inside the same transaction.

    :param context: context to query under
    :param instance_id: id of the instance to delete
    :raises NotFoundException: when _instance_get finds no such instance
    """
    session = get_session()
    with session.begin():
        instance = _instance_get(context, session, instance_id)
        # Only a missing instance is an error; raising unconditionally
        # would make the delete and the count update unreachable.
        if not instance:
            raise ex.NotFoundException(instance_id,
                                       _("Instance id '%s' not found!"))

        session.delete(instance)

        # Keep the owning node group's instance counter in step.
        node_group_id = instance.node_group_id
        node_group = _node_group_get(context, session, node_group_id)
        node_group.count -= 1
333
def append_volume(context, instance_id, volume_id):
    """Append volume_id to the volumes of an existing Instance.

    :param context: context to query under
    :param instance_id: id of the instance to modify
    :param volume_id: value appended to ``instance.volumes``
    :raises NotFoundException: when _instance_get finds no such instance
    """
    session = get_session()
    with session.begin():
        instance = _instance_get(context, session, instance_id)
        # Only a missing instance is an error; raising unconditionally
        # would make the append below unreachable.
        if not instance:
            raise ex.NotFoundException(instance_id,
                                       _("Instance id '%s' not found!"))

        instance.volumes.append(volume_id)
344
def remove_volume(context, instance_id, volume_id):
    """Remove volume_id from the volumes of an existing Instance.

    :param context: context to query under
    :param instance_id: id of the instance to modify
    :param volume_id: value removed from ``instance.volumes``
    :raises NotFoundException: when _instance_get finds no such instance
    """
    session = get_session()
    with session.begin():
        instance = _instance_get(context, session, instance_id)
        # Only a missing instance is an error; raising unconditionally
        # would make the removal below unreachable.
        if not instance:
            raise ex.NotFoundException(instance_id,
                                       _("Instance id '%s' not found!"))

        instance.volumes.remove(volume_id)
355
# Cluster Template ops
357
def _cluster_template_get(context, session, cluster_template_id):
    """Fetch a single ClusterTemplate by id within the given session.

    :returns: the first matching ClusterTemplate, or None when nothing
              matches.
    """
    candidates = model_query(m.ClusterTemplate, context, session)
    return candidates.filter_by(id=cluster_template_id).first()
362
def cluster_template_get(context, cluster_template_id):
    """Look up a ClusterTemplate by id using a freshly obtained session."""
    session = get_session()
    return _cluster_template_get(context, session, cluster_template_id)
366
def cluster_template_get_all(context):
367
query = model_query(m.ClusterTemplate, context)
371
def cluster_template_create(context, values):
372
values = values.copy()
373
cluster_template = m.ClusterTemplate()
374
node_groups = values.pop("node_groups") or []
375
cluster_template.update(values)
377
session = get_session()
378
with session.begin():
380
cluster_template.save(session=session)
381
except db_exc.DBDuplicateEntry as e:
382
raise ex.DBDuplicateEntry(
383
_("Duplicate entry for ClusterTemplate: %s") % e.columns)
386
for ng in node_groups:
387
node_group = m.TemplatesRelation()
388
node_group.update({"cluster_template_id": cluster_template.id})
389
node_group.update(ng)
390
node_group.save(session=session)
392
except db_exc.DBDuplicateEntry as e:
393
raise ex.DBDuplicateEntry(
394
_("Duplicate entry for TemplatesRelation: %s") % e.columns)
396
return cluster_template_get(context, cluster_template.id)
399
def cluster_template_destroy(context, cluster_template_id):
400
session = get_session()
401
with session.begin():
402
cluster_template = _cluster_template_get(context, session,
404
if not cluster_template:
405
raise ex.NotFoundException(
407
_("Cluster Template id '%s' not found!"))
409
session.delete(cluster_template)
412
# Node Group Template ops
414
def _node_group_template_get(context, session, node_group_template_id):
    """Fetch a single NodeGroupTemplate by id within the given session.

    :returns: the first matching NodeGroupTemplate, or None when nothing
              matches.
    """
    candidates = model_query(m.NodeGroupTemplate, context, session)
    matching = candidates.filter_by(id=node_group_template_id)
    return matching.first()
419
def node_group_template_get(context, node_group_template_id):
    """Look up a NodeGroupTemplate by id using a fresh session."""
    session = get_session()
    return _node_group_template_get(context, session,
                                    node_group_template_id)
424
def node_group_template_get_all(context):
425
query = model_query(m.NodeGroupTemplate, context)
429
def node_group_template_create(context, values):
430
node_group_template = m.NodeGroupTemplate()
431
node_group_template.update(values)
433
session = get_session()
434
with session.begin():
436
node_group_template.save(session=session)
437
except db_exc.DBDuplicateEntry as e:
438
raise ex.DBDuplicateEntry(
439
_("Duplicate entry for NodeGroupTemplate: %s") % e.columns)
441
return node_group_template
444
def node_group_template_destroy(context, node_group_template_id):
    """Delete a NodeGroupTemplate by id inside one transaction.

    :raises NotFoundException: when no template with that id is found.
    """
    session = get_session()
    with session.begin():
        template = _node_group_template_get(context, session,
                                            node_group_template_id)
        if template:
            session.delete(template)
        else:
            raise ex.NotFoundException(
                node_group_template_id,
                _("Node Group Template id '%s' not found!"))
459
def _data_source_get(context, session, data_source_id):
    """Fetch a single DataSource by id within the given session.

    :returns: the first matching DataSource, or None when nothing matches.
    """
    candidates = model_query(m.DataSource, context, session)
    return candidates.filter_by(id=data_source_id).first()
464
def data_source_get(context, data_source_id):
    """Look up a DataSource by id using a freshly obtained session."""
    session = get_session()
    return _data_source_get(context, session, data_source_id)
468
def data_source_get_all(context):
469
query = model_query(m.DataSource, context)
473
def data_source_create(context, values):
474
data_source = m.DataSource()
475
data_source.update(values)
477
session = get_session()
478
with session.begin():
480
data_source.save(session=session)
481
except db_exc.DBDuplicateEntry as e:
482
raise ex.DBDuplicateEntry(
483
_("Duplicate entry for DataSource: %s") % e.columns)
488
def data_source_destroy(context, data_source_id):
489
session = get_session()
491
with session.begin():
492
data_source = _data_source_get(context, session, data_source_id)
494
raise ex.NotFoundException(
496
_("Data Source id '%s' not found!"))
497
session.delete(data_source)
498
except db_exc.DBError as e:
499
msg = ("foreign key constraint" in six.text_type(e) and
500
_(" on foreign key constraint") or "")
501
raise ex.DeletionFailed(_("Data Source deletion failed%s") % msg)
506
def _job_execution_get(context, session, job_execution_id):
    """Fetch a single JobExecution by id within the given session.

    :returns: the first matching JobExecution, or None when nothing
              matches.
    """
    candidates = model_query(m.JobExecution, context, session)
    return candidates.filter_by(id=job_execution_id).first()
511
def job_execution_get(context, job_execution_id):
    """Look up a JobExecution by id using a freshly obtained session."""
    session = get_session()
    return _job_execution_get(context, session, job_execution_id)
515
def job_execution_get_all(context, **kwargs):
    """Return every JobExecution matching the given equality filters.

    :param kwargs: column=value pairs passed straight to ``filter_by``.
    """
    filtered = model_query(m.JobExecution, context).filter_by(**kwargs)
    return filtered.all()
520
def job_execution_count(context, **kwargs):
    """Return the number of JobExecutions matching the given filters.

    :param kwargs: column=value pairs passed straight to ``filter_by``.
    """
    counted = count_query(m.JobExecution, context).filter_by(**kwargs)
    first_row = counted.first()
    # The count is the only column of the single result row.
    return first_row[0]
525
def job_execution_create(context, values):
526
session = get_session()
528
with session.begin():
529
job_ex = m.JobExecution()
530
job_ex.update(values)
532
job_ex.save(session=session)
533
except db_exc.DBDuplicateEntry as e:
534
raise ex.DBDuplicateEntry(
535
_("Duplicate entry for JobExecution: %s") % e.columns)
540
def job_execution_update(context, job_execution_id, values):
541
session = get_session()
543
with session.begin():
544
job_ex = _job_execution_get(context, session, job_execution_id)
546
raise ex.NotFoundException(job_execution_id,
547
_("JobExecution id '%s' not found!"))
548
job_ex.update(values)
553
def job_execution_destroy(context, job_execution_id):
    """Delete the JobExecution with the given id inside one transaction.

    :param context: context to query under
    :param job_execution_id: id of the job execution to delete
    :raises NotFoundException: when _job_execution_get finds no such row
    """
    session = get_session()
    with session.begin():
        job_ex = _job_execution_get(context, session, job_execution_id)
        # Only a missing job execution is an error; raising unconditionally
        # would make the delete below unreachable.
        if not job_ex:
            raise ex.NotFoundException(job_execution_id,
                                       _("JobExecution id '%s' not found!"))

        session.delete(job_ex)
566
def _job_get(context, session, job_id):
    """Fetch a single Job by id within the given session.

    :returns: the first matching Job, or None when nothing matches.
    """
    by_id = model_query(m.Job, context, session).filter_by(id=job_id)
    return by_id.first()
571
def job_get(context, job_id):
    """Look up a Job by id using a freshly obtained session."""
    session = get_session()
    return _job_get(context, session, job_id)
575
def job_get_all(context):
576
query = model_query(m.Job, context)
580
def _append_job_binaries(context, session, from_list, to_list):
    """Append to to_list the JobBinary found for each id in from_list.

    Ids for which no JobBinary is found are silently skipped.
    """
    for binary_id in from_list:
        lookup = model_query(m.JobBinary, context, session)
        found = lookup.filter_by(id=binary_id).first()
        if found is None:
            continue
        to_list.append(found)
588
def job_create(context, values):
589
mains = values.pop("mains", [])
590
libs = values.pop("libs", [])
592
session = get_session()
593
with session.begin():
596
# libs and mains are 'lazy' objects. The initialization below
597
# is needed here because it provides libs and mains to be initialized
598
# within a session even if the lists are empty
602
_append_job_binaries(context, session, mains, job.mains)
603
_append_job_binaries(context, session, libs, job.libs)
605
job.save(session=session)
606
except db_exc.DBDuplicateEntry as e:
607
raise ex.DBDuplicateEntry(
608
_("Duplicate entry for Job: %s") % e.columns)
613
def job_update(context, job_id, values):
614
session = get_session()
616
with session.begin():
617
job = _job_get(context, session, job_id)
619
raise ex.NotFoundException(job_id,
620
_("Job id '%s' not found!"))
626
def job_destroy(context, job_id):
627
session = get_session()
629
with session.begin():
630
job = _job_get(context, session, job_id)
632
raise ex.NotFoundException(job_id,
633
_("Job id '%s' not found!"))
635
except db_exc.DBError as e:
636
msg = ("foreign key constraint" in six.text_type(e) and
637
_(" on foreign key constraint") or "")
638
raise ex.DeletionFailed(_("Job deletion failed%s") % msg)
643
def _job_binary_get(context, session, job_binary_id):
    """Fetch a single JobBinary by id within the given session.

    :returns: the first matching JobBinary, or None when nothing matches.
    """
    candidates = model_query(m.JobBinary, context, session)
    return candidates.filter_by(id=job_binary_id).first()
648
def job_binary_get_all(context):
649
"""Returns JobBinary objects that do not contain a data field
651
The data column uses deferred loading.
653
query = model_query(m.JobBinary, context)
657
def job_binary_get(context, job_binary_id):
658
"""Returns a JobBinary object that does not contain a data field
660
The data column uses deferred loading.
662
return _job_binary_get(context, get_session(), job_binary_id)
665
def job_binary_create(context, values):
666
"""Returns a JobBinary that does not contain a data field
668
The data column uses deferred loading.
670
job_binary = m.JobBinary()
671
job_binary.update(values)
673
session = get_session()
674
with session.begin():
676
job_binary.save(session=session)
677
except db_exc.DBDuplicateEntry as e:
678
raise ex.DBDuplicateEntry(
679
_("Duplicate entry for JobBinary: %s") % e.columns)
684
def _check_job_binary_referenced(ctx, session, job_binary_id):
    """Report whether the mains or libs association references the binary.

    Both association queries are made with project_only=False.

    :returns: True when either association has a row whose JobBinary_id
              equals job_binary_id, otherwise False.
    """
    criteria = {"JobBinary_id": job_binary_id}
    in_mains = model_query(m.mains_association, ctx, session,
                           project_only=False).filter_by(**criteria)
    in_libs = model_query(m.libs_association, ctx, session,
                          project_only=False).filter_by(**criteria)

    # The libs lookup only runs when no mains row is found.
    if in_mains.first() is not None:
        return True
    return in_libs.first() is not None
694
def job_binary_destroy(context, job_binary_id):
    """Delete a JobBinary unless it is missing or still referenced.

    :param context: context to query under
    :param job_binary_id: id of the job binary to delete
    :raises NotFoundException: when _job_binary_get finds no such binary
    :raises DeletionFailed: when _check_job_binary_referenced reports that
                            the binary is still referenced
    """
    session = get_session()
    with session.begin():
        job_binary = _job_binary_get(context, session, job_binary_id)
        # Only a missing binary is an error; raising unconditionally would
        # make the reference check and the delete unreachable.
        if not job_binary:
            raise ex.NotFoundException(job_binary_id,
                                       _("JobBinary id '%s' not found!"))

        if _check_job_binary_referenced(context, session, job_binary_id):
            raise ex.DeletionFailed(
                _("JobBinary is referenced and cannot be deleted"))

        session.delete(job_binary)
709
# JobBinaryInternal ops
711
def _job_binary_internal_get(context, session, job_binary_internal_id):
    """Fetch a single JobBinaryInternal by id within the given session.

    :returns: the first matching JobBinaryInternal, or None when nothing
              matches.
    """
    candidates = model_query(m.JobBinaryInternal, context, session)
    matching = candidates.filter_by(id=job_binary_internal_id)
    return matching.first()
716
def job_binary_internal_get_all(context):
717
"""Returns JobBinaryInternal objects that do not contain a data field
719
The data column uses deferred loading.
721
query = model_query(m.JobBinaryInternal, context)
725
def job_binary_internal_get(context, job_binary_internal_id):
726
"""Returns a JobBinaryInternal object that does not contain a data field
728
The data column uses deferred loading.
730
return _job_binary_internal_get(context, get_session(),
731
job_binary_internal_id)
734
def job_binary_internal_get_raw_data(context, job_binary_internal_id):
735
"""Returns only the data field for the specified JobBinaryInternal."""
736
query = model_query(m.JobBinaryInternal, context)
737
res = query.filter_by(id=job_binary_internal_id).first()
740
datasize_KB = res.datasize / 1024.0
741
if datasize_KB > CONF.job_binary_max_KB:
742
raise ex.DataTooBigException(
743
round(datasize_KB, 1), CONF.job_binary_max_KB,
744
_("Size of internal binary (%(size)sKB) is greater than the "
745
"maximum (%(maximum)sKB)"))
747
# This assignment is sufficient to load the deferred column
752
def job_binary_internal_create(context, values):
753
"""Returns a JobBinaryInternal that does not contain a data field
755
The data column uses deferred loading.
757
values["datasize"] = len(values["data"])
758
datasize_KB = values["datasize"] / 1024.0
759
if datasize_KB > CONF.job_binary_max_KB:
760
raise ex.DataTooBigException(
761
round(datasize_KB, 1), CONF.job_binary_max_KB,
762
_("Size of internal binary (%(size)sKB) is greater "
763
"than the maximum (%(maximum)sKB)"))
765
job_binary_int = m.JobBinaryInternal()
766
job_binary_int.update(values)
768
session = get_session()
769
with session.begin():
771
job_binary_int.save(session=session)
772
except db_exc.DBDuplicateEntry as e:
773
raise ex.DBDuplicateEntry(
774
_("Duplicate entry for JobBinaryInternal: %s") % e.columns)
776
return job_binary_internal_get(context, job_binary_int.id)
779
def job_binary_internal_destroy(context, job_binary_internal_id):
    """Delete a JobBinaryInternal by id inside one transaction.

    :raises NotFoundException: when no internal binary with that id is
                               found.
    """
    session = get_session()
    with session.begin():
        internal = _job_binary_internal_get(context, session,
                                            job_binary_internal_id)
        if internal:
            session.delete(internal)
        else:
            raise ex.NotFoundException(
                job_binary_internal_id,
                _("JobBinaryInternal id '%s' not found!"))