~lifeless/python-oops-tools/bug-881400

« back to all changes in this revision

Viewing changes to src/oopstools/scripts/amqp2disk.py

  • Committer: Robert Collins
  • Date: 2011-10-16 22:35:01 UTC
  • mto: This revision was merged to the branch mainline in revision 2.
  • Revision ID: robertc@robertcollins.net-20111016223501-jl5vg9mduh7n29df
Add an amqpd listener, tweak some docs to match reality.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright 2011 Canonical Ltd.  All rights reserved.
 
2
#
 
3
# This program is free software: you can redistribute it and/or modify
 
4
# it under the terms of the GNU Affero General Public License as published by
 
5
# the Free Software Foundation, either version 3 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU Affero General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU Affero General Public License
 
14
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
15
 
 
16
# Receive OOPS reports from AMQP and publish them into the oops-tools
 
17
# repository.
 
18
 
 
19
__metaclass__ = type
 
20
 
 
21
import sys
 
22
import optparse
 
23
import StringIO
 
24
from textwrap import dedent
 
25
 
 
26
import amqplib.client_0_8 as amqp
 
27
from oops import Config
 
28
import oops_amqp
 
29
import oops_datedir_repo
 
30
 
 
31
from oopstools.oops.helpers import parsedate, load_prefixes
 
32
from oopstools.oops.models import (
 
33
    Oops,
 
34
    parsed_oops_to_model_oops,
 
35
    Prefix,
 
36
    Report,
 
37
    )
 
38
from oopstools.oops.oopsstore import OopsStore
 
39
from oopstools.oops import dbsummaries
 
40
from oopstools.oops.summaries import (
 
41
    WebAppErrorSummary,
 
42
    CheckwatchesErrorSummary,
 
43
    CodeHostingWithRemoteSectionSummary,
 
44
    GenericErrorSummary,
 
45
)
 
46
 
 
47
 
 
48
def main(argv=None):
 
49
    if argv is None:
 
50
        argv=sys.argv
 
51
    usage = dedent("""\
 
52
        %prog [options]
 
53
 
 
54
        The following options must be supplied:
 
55
         --output
 
56
         --host
 
57
         --username
 
58
         --password
 
59
         --vhost
 
60
         --queue
 
61
 
 
62
        e.g.
 
63
        amqp2disk --output /srv/oops-tools/amqpoopses --host "localhost:3472" \\
 
64
            --username "guest" --password "guest" --vhost "/" --queue "oops"
 
65
 
 
66
        The AMQP environment should be setup in advance with a persistent queue
 
67
        bound to your exchange : using transient queues would allow OOPSes to
 
68
        be lost if the amqp2disk process were to be shutdown for a non-trivial
 
69
        duration. The --bind-to option will cause the queue to be created and
 
70
        bound to the given exchange. This is only needed the first time as it
 
71
        is created persistently.
 
72
        """)
 
73
    description = "Load OOPS reports into oops-tools from AMQP."
 
74
    parser = optparse.OptionParser(
 
75
        description=description, usage=usage)
 
76
    parser.add_option('--output', help="Root directory to store OOPSes in.")
 
77
    parser.add_option('--host', help="AQMP host / host:port.")
 
78
    parser.add_option('--username', help="AQMP username.")
 
79
    parser.add_option('--password', help="AQMP password.")
 
80
    parser.add_option('--vhost', help="AMQP vhost.")
 
81
    parser.add_option('--queue', help="AMQP queue name.")
 
82
    parser.add_option(
 
83
        '--bind-to', help="AMQP exchange to bind to (only needed once).")
 
84
    options, args = parser.parse_args(argv[1:])
 
85
    def needed(optname):
 
86
        if getattr(options, optname, None) is None:
 
87
            raise ValueError('option "%s" must be supplied' % optname)
 
88
    needed('host')
 
89
    needed('output')
 
90
    needed('username')
 
91
    needed('password')
 
92
    needed('vhost')
 
93
    needed('queue')
 
94
    connection = amqp.Connection(host=options.host, userid=options.username,
 
95
        password=options.password, virtual_host=options.vhost)
 
96
    channel = connection.channel()
 
97
    if options.bind_to:
 
98
        channel.queue_declare(options.queue, durable=True, auto_delete=False)
 
99
        channel.queue_bind(options.queue, options.bind_to)
 
100
    config = make_amqp_config(options.output)
 
101
    receiver = oops_amqp.Receiver(config, channel, options.queue)
 
102
    try:
 
103
        receiver.run_forever()
 
104
    except KeyboardInterrupt:
 
105
        pass
 
106
 
 
107
 
 
108
def db_publisher(report):
 
109
    """Publish OOPS reports to the oops-tools django store."""
 
110
    # the first publisher will either inherit or assign, so this should be
 
111
    # impossible.
 
112
    assert report['id'] is not None
 
113
    # Some fallback methods could lead to duplicate paths into the DB: exit
 
114
    # early if the OOPS is already loaded.
 
115
    try:
 
116
        res = Oops.objects.get(oopsid__exact=report['id'])
 
117
    except Oops.DoesNotExist:
 
118
        res = parsed_oops_to_model_oops(
 
119
            report, report['datedir_repo_filepath'])
 
120
        return res.oopsid
 
121
    return None
 
122
 
 
123
 
 
124
def make_amqp_config(output_dir):
 
125
    """Create an OOPS Config for republishing amqp OOPSes.
 
126
 
 
127
    An OOPS published to this config will be written to disk and then loaded
 
128
    into the database.
 
129
 
 
130
    :param output_dir: The directory to write OOPSes too.
 
131
    """
 
132
    config = Config()
 
133
    disk_publisher = oops_datedir_repo.DateDirRepo(
 
134
        output_dir, inherit_id=True, stash_path=True)
 
135
    config.publishers.append(disk_publisher.publish)
 
136
    config.publishers.append(db_publisher)
 
137
    return config