~ubuntu-branches/ubuntu/trusty/ceilometer/trusty-proposed

« back to all changes in this revision

Viewing changes to ceilometer/storage/impl_sqlalchemy.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, James Page, Chuck Short
  • Date: 2014-01-23 15:08:11 UTC
  • mfrom: (1.1.11)
  • Revision ID: package-import@ubuntu.com-20140123150811-1zaismsuyh1hcl8y
Tags: 2014.1~b2-0ubuntu1
[ James Page ]
* d/control: Add python-jsonpath-rw to BD's.
* d/p/fix-setup-requirements.patch: Bump WebOb to support < 1.4.
 (LP: #1261101)

[ Chuck Short ]
* New upstream version.
* debian/control, debian/ceilometer-common.install: Split out
  ceilometer-alarm-evaluator and ceilometer-alarm-notifier into their
  own packages. (LP: #1250002)
* debian/ceilometer-agent-central.logrotate,
  debian/ceilometer-agent-compute.logrotate,
  debian/ceilometer-api.logrotate,
  debian/ceilometer-collector.logrotate: Add logrotate files, 
  thanks to Ahmed Rahal. (LP: #1224223)
* Fix typos in upstart files.

Show diffs side-by-side

added added

removed removed

Lines of Context:
19
19
 
20
20
from __future__ import absolute_import
21
21
import datetime
 
22
import eventlet
22
23
import operator
23
24
import os
24
25
import types
27
28
from sqlalchemy import desc
28
29
from sqlalchemy import func
29
30
from sqlalchemy.orm import aliased
 
31
from sqlalchemy import pool
30
32
 
31
33
from ceilometer.openstack.common.db import exception as dbexc
32
34
import ceilometer.openstack.common.db.sqlalchemy.session as sqlalchemy_session
157
159
        query = query.filter_by(project_id=sample_filter.project)
158
160
    if sample_filter.resource:
159
161
        query = query.filter_by(resource_id=sample_filter.resource)
 
162
    if sample_filter.message_id:
 
163
        query = query.filter_by(message_id=sample_filter.message_id)
160
164
 
161
165
    if sample_filter.metaquery:
162
166
        query = apply_metaquery_filter(session, query,
174
178
            conf.database.connection = \
175
179
                os.environ.get('CEILOMETER_TEST_SQL_URL', url)
176
180
 
 
181
        session = sqlalchemy_session.get_session()
 
182
        engine = session.get_bind()
 
183
        if isinstance(engine.pool, pool.QueuePool):
 
184
            poolsize = engine.pool.size() + engine.pool._max_overflow
 
185
            self.pool = eventlet.GreenPool(poolsize)
 
186
        else:
 
187
            self.pool = None
 
188
 
177
189
    def upgrade(self):
178
190
        session = sqlalchemy_session.get_session()
179
191
        migration.db_sync(session.get_bind())
219
231
            setattr(obj, k, kwargs[k])
220
232
        return obj
221
233
 
 
234
    def record_metering_data(self, data):
 
235
        if self.pool:
 
236
            if self.pool.waiting() > 0:
 
237
                LOG.warn(_("Sqlalchemy connection pool is full, "
 
238
                           "perhaps pool_size should be increased"))
 
239
            self.pool.spawn(self._real_record_metering_data, data)
 
240
        else:
 
241
            self._real_record_metering_data(data)
 
242
 
222
243
    @classmethod
223
 
    def record_metering_data(cls, data):
 
244
    def _real_record_metering_data(cls, data):
224
245
        """Write the data to the backend storage system.
225
246
 
226
247
        :param data: a dictionary such as returned by
279
300
        :param ttl: Number of seconds to keep records for.
280
301
 
281
302
        """
 
303
 
282
304
        session = sqlalchemy_session.get_session()
283
 
        query = session.query(models.Meter.id)
284
 
        end = timeutils.utcnow() - datetime.timedelta(seconds=ttl)
285
 
        query = query.filter(models.Meter.timestamp < end)
286
 
        query.delete()
287
 
 
288
 
        query = session.query(models.User.id).filter(~models.User.id.in_(
289
 
            session.query(models.Meter.user_id).group_by(models.Meter.user_id)
290
 
        ))
291
 
        query.delete(synchronize_session='fetch')
292
 
 
293
 
        query = session.query(models.Project.id)\
294
 
            .filter(~models.Project.id.in_(
295
 
                session.query(models.Meter.project_id).group_by(
296
 
                    models.Meter.project_id)))
297
 
        query.delete(synchronize_session='fetch')
298
 
 
299
 
        query = session.query(models.Resource.id)\
300
 
            .filter(~models.Resource.id.in_(
301
 
                session.query(models.Meter.resource_id).group_by(
302
 
                    models.Meter.resource_id)))
303
 
        query.delete(synchronize_session='fetch')
 
305
        with session.begin():
 
306
            end = timeutils.utcnow() - datetime.timedelta(seconds=ttl)
 
307
            meter_query = session.query(models.Meter)\
 
308
                .filter(models.Meter.timestamp < end)
 
309
            for meter_obj in meter_query.all():
 
310
                session.delete(meter_obj)
 
311
 
 
312
            query = session.query(models.User).filter(
 
313
                ~models.User.id.in_(session.query(models.Meter.user_id)
 
314
                                    .group_by(models.Meter.user_id)),
 
315
                ~models.User.id.in_(session.query(models.Alarm.user_id)
 
316
                                    .group_by(models.Alarm.user_id)),
 
317
                ~models.User.id.in_(session.query(models.AlarmChange.user_id)
 
318
                                    .group_by(models.AlarmChange.user_id))
 
319
            )
 
320
            for user_obj in query.all():
 
321
                session.delete(user_obj)
 
322
 
 
323
            query = session.query(models.Project)\
 
324
                .filter(~models.Project.id.in_(
 
325
                    session.query(models.Meter.project_id)
 
326
                        .group_by(models.Meter.project_id)),
 
327
                        ~models.Project.id.in_(
 
328
                            session.query(models.Alarm.project_id)
 
329
                            .group_by(models.Alarm.project_id)),
 
330
                        ~models.Project.id.in_(
 
331
                            session.query(models.AlarmChange.project_id)
 
332
                            .group_by(models.AlarmChange.project_id)),
 
333
                        ~models.Project.id.in_(
 
334
                            session.query(models.AlarmChange.on_behalf_of)
 
335
                            .group_by(models.AlarmChange.on_behalf_of))
 
336
                        )
 
337
            for project_obj in query.all():
 
338
                session.delete(project_obj)
 
339
 
 
340
            query = session.query(models.Resource)\
 
341
                .filter(~models.Resource.id.in_(
 
342
                    session.query(models.Meter.resource_id).group_by(
 
343
                        models.Meter.resource_id)))
 
344
            for res_obj in query.all():
 
345
                session.delete(res_obj)
304
346
 
305
347
    @staticmethod
306
348
    def get_users(source=None):
427
469
                source=meter.sources[0].id,
428
470
                user_id=meter.user_id,
429
471
                metadata=meter.resource_metadata,
430
 
                meter=[
431
 
                    api_models.ResourceMeter(
432
 
                        counter_name=m.counter_name,
433
 
                        counter_type=m.counter_type,
434
 
                        counter_unit=m.counter_unit,
435
 
                    )
436
 
                    for m in meter.resource.meters
437
 
                ],
438
472
            )
439
473
 
440
474
    @staticmethod
702
736
            alarm_row = models.Alarm(id=alarm.alarm_id)
703
737
            alarm_row.update(alarm.as_dict())
704
738
            session.add(alarm_row)
705
 
            session.flush()
706
739
 
707
740
        return self._row_to_alarm_model(alarm_row)
708
741
 
713
746
        """
714
747
        session = sqlalchemy_session.get_session()
715
748
        with session.begin():
 
749
            Connection._create_or_update(session, models.User,
 
750
                                         alarm.user_id)
 
751
            Connection._create_or_update(session, models.Project,
 
752
                                         alarm.project_id)
716
753
            alarm_row = session.merge(models.Alarm(id=alarm.alarm_id))
717
754
            alarm_row.update(alarm.as_dict())
718
 
            session.flush()
719
755
 
720
756
        return self._row_to_alarm_model(alarm_row)
721
757
 
729
765
        with session.begin():
730
766
            session.query(models.Alarm).filter(
731
767
                models.Alarm.id == alarm_id).delete()
732
 
            session.flush()
733
768
 
734
769
    @staticmethod
735
770
    def _row_to_alarm_change_model(row):
748
783
                          end_timestamp=None, end_timestamp_op=None):
749
784
        """Yields list of AlarmChanges describing alarm history
750
785
 
751
 
        Changes are always sorted in reverse order of occurence, given
 
786
        Changes are always sorted in reverse order of occurrence, given
752
787
        the importance of currency.
753
788
 
754
789
        Segregation for non-administrative users is done on the basis
815
850
                event_id=alarm_change['event_id'])
816
851
            alarm_change_row.update(alarm_change)
817
852
            session.add(alarm_change_row)
818
 
            session.flush()
819
853
 
820
854
    @staticmethod
821
855
    def _get_or_create_trait_type(trait_type, data_type, session=None):
831
865
            if not tt:
832
866
                tt = models.TraitType(trait_type, data_type)
833
867
                session.add(tt)
834
 
                session.flush()
835
868
        return tt
836
869
 
837
870
    @classmethod
847
880
        values = {'t_string': None, 't_float': None,
848
881
                  't_int': None, 't_datetime': None}
849
882
        value = trait_model.value
850
 
        if trait_model.dtype == api_models.Trait.DATETIME_TYPE:
851
 
            value = utils.dt_to_decimal(value)
852
883
        values[value_map[trait_model.dtype]] = value
853
884
        return models.Trait(trait_type, event, **values)
854
885
 
867
898
            if not et:
868
899
                et = models.EventType(event_type)
869
900
                session.add(et)
870
 
                session.flush()
871
901
        return et
872
902
 
873
903
    @classmethod
878
908
            event_type = cls._get_or_create_event_type(event_model.event_type,
879
909
                                                       session=session)
880
910
 
881
 
            generated = utils.dt_to_decimal(event_model.generated)
882
 
            event = models.Event(event_model.message_id, event_type, generated)
 
911
            event = models.Event(event_model.message_id, event_type,
 
912
                                 event_model.generated)
883
913
            session.add(event)
884
914
 
885
915
            new_traits = []
913
943
            try:
914
944
                with session.begin():
915
945
                    event = self._record_event(session, event_model)
916
 
                    session.flush()
917
946
            except dbexc.DBDuplicateEntry:
918
947
                problem_events.append((api_models.Event.DUPLICATE,
919
948
                                       event_model))
925
954
        return problem_events
926
955
 
927
956
    def get_events(self, event_filter):
928
 
        """Return an iterable of model.Event objects. The event model objects
929
 
        have their Trait model objects available -- filtered by any traits
930
 
        in the event_filter.
 
957
        """Return an iterable of model.Event objects.
931
958
 
932
959
        :param event_filter: EventFilter instance
933
960
        """
934
961
 
935
 
        start = utils.dt_to_decimal(event_filter.start)
936
 
        end = utils.dt_to_decimal(event_filter.end)
 
962
        start = event_filter.start_time
 
963
        end = event_filter.end_time
937
964
        session = sqlalchemy_session.get_session()
 
965
        LOG.debug(_("Getting events that match filter: %s") % event_filter)
938
966
        with session.begin():
939
 
            sub_query = session.query(models.Event.id)\
940
 
                .join(models.EventType,
941
 
                      models.Event.event_type_id == models.EventType.id)\
942
 
                .join(models.Trait,
943
 
                      models.Trait.event_id == models.Event.id)\
944
 
                .filter(models.Event.generated >= start,
945
 
                        models.Event.generated <= end)
 
967
            event_query = session.query(models.Event)
 
968
 
 
969
            # Build up the join conditions
 
970
            event_join_conditions = [models.EventType.id ==
 
971
                                     models.Event.event_type_id]
946
972
 
947
973
            if event_filter.event_type:
948
 
                event_type = event_filter.event_type
949
 
                sub_query = sub_query\
950
 
                    .filter(models.EventType.desc == event_type)
 
974
                event_join_conditions\
 
975
                    .append(models.EventType.desc == event_filter.event_type)
 
976
 
 
977
            event_query = event_query.join(models.EventType,
 
978
                                           and_(*event_join_conditions))
 
979
 
 
980
            # Build up the where conditions
 
981
            event_filter_conditions = []
 
982
            if event_filter.message_id:
 
983
                event_filter_conditions\
 
984
                    .append(models.Event.message_id == event_filter.message_id)
 
985
            if start:
 
986
                event_filter_conditions.append(models.Event.generated >= start)
 
987
            if end:
 
988
                event_filter_conditions.append(models.Event.generated <= end)
 
989
 
 
990
            if event_filter_conditions:
 
991
                event_query = event_query\
 
992
                    .filter(and_(*event_filter_conditions))
951
993
 
952
994
            event_models_dict = {}
953
 
            if event_filter.traits:
954
 
                sub_query = sub_query.join(models.TraitType,
955
 
                                           models.TraitType.id ==
956
 
                                           models.Trait.trait_type_id)
957
 
                for key, value in event_filter.traits.iteritems():
958
 
                    if key == 'key':
959
 
                        sub_query = sub_query.filter(models.TraitType.desc ==
960
 
                                                     value)
961
 
                    elif key == 't_string':
962
 
                        sub_query = sub_query.filter(
963
 
                            models.Trait.t_string == value)
964
 
                    elif key == 't_int':
965
 
                        sub_query = sub_query.filter(
966
 
                            models.Trait.t_int == value)
967
 
                    elif key == 't_datetime':
968
 
                        dt = utils.dt_to_decimal(value)
969
 
                        sub_query = sub_query.filter(
970
 
                            models.Trait.t_datetime == dt)
971
 
                    elif key == 't_float':
972
 
                        sub_query = sub_query.filter(
973
 
                            models.Trait.t_datetime == value)
 
995
            if event_filter.traits_filter:
 
996
                for trait_filter in event_filter.traits_filter:
 
997
 
 
998
                    # Build a sub query that joins Trait to TraitType
 
999
                    # where the trait name matches
 
1000
                    trait_name = trait_filter.pop('key')
 
1001
                    conditions = [models.Trait.trait_type_id ==
 
1002
                                  models.TraitType.id,
 
1003
                                  models.TraitType.desc == trait_name]
 
1004
 
 
1005
                    for key, value in trait_filter.iteritems():
 
1006
                        if key == 'string':
 
1007
                            conditions.append(models.Trait.t_string == value)
 
1008
                        elif key == 'integer':
 
1009
                            conditions.append(models.Trait.t_int == value)
 
1010
                        elif key == 'datetime':
 
1011
                            conditions.append(models.Trait.t_datetime == value)
 
1012
                        elif key == 'float':
 
1013
                            conditions.append(models.Trait.t_float == value)
 
1014
 
 
1015
                    trait_query = session.query(models.Trait.event_id)\
 
1016
                        .join(models.TraitType, and_(*conditions)).subquery()
 
1017
 
 
1018
                    event_query = event_query\
 
1019
                        .join(trait_query,
 
1020
                              models.Event.id == trait_query.c.event_id)
974
1021
            else:
975
 
                # Pre-populate event_models_dict to cover Events without traits
976
 
                events = session.query(models.Event)\
977
 
                    .filter(models.Event.generated >= start)\
978
 
                    .filter(models.Event.generated <= end)
979
 
                if event_filter.event_type:
980
 
                    events = events\
981
 
                        .join(models.EventType,
982
 
                              models.EventType.id ==
983
 
                              models.Event.event_type_id)\
984
 
                        .filter(models.EventType.desc ==
985
 
                                event_filter.event_type)
986
 
                for db_event in events.all():
987
 
                    generated = utils.decimal_to_dt(db_event.generated)
988
 
                    api_event = api_models.Event(db_event.message_id,
989
 
                                                 db_event.event_type.desc,
990
 
                                                 generated, [])
991
 
                    event_models_dict[db_event.id] = api_event
992
 
 
993
 
            sub_query = sub_query.subquery()
994
 
 
995
 
            all_data = session.query(models.Trait)\
996
 
                .join(sub_query, models.Trait.event_id == sub_query.c.id)
 
1022
                # If there are no trait filters, grab the events from the db
 
1023
                query = session.query(models.Event.id,
 
1024
                                      models.Event.generated,
 
1025
                                      models.Event.message_id,
 
1026
                                      models.EventType.desc)\
 
1027
                    .join(models.EventType,
 
1028
                          and_(*event_join_conditions))
 
1029
                if event_filter_conditions:
 
1030
                    query = query.filter(and_(*event_filter_conditions))
 
1031
                for (id, generated, message_id, desc) in query.all():
 
1032
                    event_models_dict[id] = api_models.Event(message_id,
 
1033
                                                             desc,
 
1034
                                                             generated,
 
1035
                                                             [])
 
1036
 
 
1037
            # Build event models for the events
 
1038
            event_query = event_query.subquery()
 
1039
            query = session.query(models.Trait)\
 
1040
                .join(models.TraitType,
 
1041
                      models.Trait.trait_type_id == models.TraitType.id)\
 
1042
                .join(event_query, models.Trait.event_id == event_query.c.id)
997
1043
 
998
1044
            # Now convert the sqlalchemy objects back into Models ...
999
 
            for trait in all_data.all():
 
1045
            for trait in query.all():
1000
1046
                event = event_models_dict.get(trait.event_id)
1001
1047
                if not event:
1002
 
                    generated = utils.decimal_to_dt(trait.event.generated)
1003
 
                    event = api_models.Event(trait.event.message_id,
1004
 
                                             trait.event.event_type.desc,
1005
 
                                             generated, [])
 
1048
                    event = api_models.Event(
 
1049
                        trait.event.message_id,
 
1050
                        trait.event.event_type.desc,
 
1051
                        trait.event.generated, [])
1006
1052
                    event_models_dict[trait.event_id] = event
1007
 
                value = trait.get_value()
1008
1053
                trait_model = api_models.Trait(trait.trait_type.desc,
1009
1054
                                               trait.trait_type.data_type,
1010
 
                                               value)
 
1055
                                               trait.get_value())
1011
1056
                event.append_trait(trait_model)
1012
1057
 
1013
1058
        event_models = event_models_dict.values()
1014
1059
        return sorted(event_models, key=operator.attrgetter('generated'))
 
1060
 
 
1061
    @staticmethod
 
1062
    def get_event_types():
 
1063
        """Return all event types as an iterable of strings.
 
1064
        """
 
1065
 
 
1066
        session = sqlalchemy_session.get_session()
 
1067
        with session.begin():
 
1068
            query = session.query(models.EventType.desc)\
 
1069
                .order_by(models.EventType.desc)
 
1070
            for name in query.all():
 
1071
                # The query returns a tuple with one element.
 
1072
                yield name[0]
 
1073
 
 
1074
    @staticmethod
 
1075
    def get_trait_types(event_type):
 
1076
        """Return a dictionary containing the name and data type of
 
1077
        the trait type. Only trait types for the provided event_type are
 
1078
        returned.
 
1079
 
 
1080
        :param event_type: the type of the Event
 
1081
        """
 
1082
        session = sqlalchemy_session.get_session()
 
1083
 
 
1084
        LOG.debug(_("Get traits for %s") % event_type)
 
1085
        with session.begin():
 
1086
            query = (session.query(models.TraitType.desc,
 
1087
                                   models.TraitType.data_type)
 
1088
                     .join(models.Trait,
 
1089
                           models.Trait.trait_type_id ==
 
1090
                           models.TraitType.id)
 
1091
                     .join(models.Event,
 
1092
                           models.Event.id ==
 
1093
                           models.Trait.event_id)
 
1094
                     .join(models.EventType,
 
1095
                           and_(models.EventType.id ==
 
1096
                                models.Event.id,
 
1097
                                models.EventType.desc ==
 
1098
                                event_type))
 
1099
                     .group_by(models.TraitType.desc,
 
1100
                               models.TraitType.data_type)
 
1101
                     .distinct())
 
1102
 
 
1103
            for desc, type in query.all():
 
1104
                yield {'name': desc, 'data_type': type}
 
1105
 
 
1106
    @staticmethod
 
1107
    def get_traits(event_type, trait_type=None):
 
1108
        """Return all trait instances associated with an event_type. If
 
1109
        trait_type is specified, only return instances of that trait type.
 
1110
 
 
1111
        :param event_type: the type of the Event to filter by
 
1112
        :param trait_type: the name of the Trait to filter by
 
1113
        """
 
1114
 
 
1115
        session = sqlalchemy_session.get_session()
 
1116
        with session.begin():
 
1117
            trait_type_filters = [models.TraitType.id ==
 
1118
                                  models.Trait.trait_type_id]
 
1119
            if trait_type:
 
1120
                trait_type_filters.append(models.TraitType.desc == trait_type)
 
1121
 
 
1122
            query = (session.query(models.Trait)
 
1123
                     .join(models.TraitType, and_(*trait_type_filters))
 
1124
                     .join(models.Event,
 
1125
                           models.Event.id == models.Trait.event_id)
 
1126
                     .join(models.EventType,
 
1127
                           and_(models.EventType.id ==
 
1128
                                models.Event.event_type_id,
 
1129
                                models.EventType.desc == event_type)))
 
1130
 
 
1131
            for trait in query.all():
 
1132
                type = trait.trait_type
 
1133
                yield api_models.Trait(name=type.desc,
 
1134
                                       dtype=type.data_type,
 
1135
                                       value=trait.get_value())