~jcsackett/charmworld/bac-tag-constraints

« back to all changes in this revision

Viewing changes to charmworld/jobs/index.py

  • Committer: Tarmac
  • Author(s): j.c.sackett
  • Date: 2013-02-12 18:15:28 UTC
  • mfrom: (142.3.15 workify-jobs-2)
  • Revision ID: tarmac-20130212181528-wf7ipovofsbwkc9w
[r=jcsackett][bug=][author=jcsackett] Split jobs into worker code and ingest code, in preparation for worker refactor.

Show diffs side-by-side

added

removed

Lines of Context:
11
11
from config import INDEX_IN_QUEUE
12
12
from config import MONGO_HOST
13
13
from config import MONGO_PORT
 
14
from worker import index_queue
 
15
from worker import reindex
 
16
from worker import setup_indexer
14
17
from utils import get_queues
15
18
 
16
 
log = logging.getLogger("charm.index")
17
 
 
18
 
 
19
 
def setup_indexer(indexer):
    """Register the xappy field actions used by the charm search index.

    Three groups of actions are registered, in this order:

    * exact-match indexing for filterable attributes,
    * stored content kept with each document,
    * English free-text indexing, some fields with an explicit weight.

    The registration order is identical to the original hand-written list.
    """
    actions = xappy.FieldActions

    # Fields matched exactly.
    exact_fields = ('owner', 'series', 'subordinate', 'provides', 'requires')
    for field_name in exact_fields:
        indexer.add_field_action(field_name, actions.INDEX_EXACT)

    # Store content
    stored_fields = ('name', 'store_url', 'summary', 'short_url', 'label',
                     'series', 'owner', 'subordinate')
    for field_name in stored_fields:
        indexer.add_field_action(field_name, actions.STORE_CONTENT)

    # Full text search fields.  A weight of None means no ``weight``
    # keyword is passed for that field at all.
    freetext_fields = (
        ('name', 10),
        ('summary', 5),
        ('description', 3),
        ('config', None),
        ('relations', None),
        ('changes', None),
    )
    for field_name, weight in freetext_fields:
        options = {'language': 'en'}
        if weight is not None:
            options['weight'] = weight
        indexer.add_field_action(
            field_name, actions.INDEX_FREETEXT, **options)
57
 
 
58
 
 
59
 
def index_charm(indexer, charm):
    """Build an xappy document for ``charm`` and replace it in ``indexer``.

    :param indexer: xappy indexer; ``replace`` and then ``flush`` are called
        on it once per charm.
    :param charm: charm dict.  Keys read unconditionally: ``_id``, ``owner``,
        ``name``, ``summary``, ``description``, ``short_url``, ``label`` and
        ``series``.  Optional keys: ``subordinate``, ``config``,
        ``requires``, ``provides``, ``changes`` and ``store_url``.
    :raises KeyError: if one of the unconditionally read keys is missing.
    """
    doc = xappy.UnprocessedDocument()

    # Weight critical fields higher for official charms.
    weight = 10 if charm['owner'] == 'charmers' else 1
    for field_name in ('name', 'summary', 'description'):
        doc.fields.append(
            xappy.Field(field_name, charm[field_name], weight=weight))

    for field_name in ('owner', 'short_url', 'label', 'series'):
        doc.fields.append(xappy.Field(field_name, charm[field_name]))

    if charm.get('subordinate'):
        doc.fields.append(xappy.Field('subordinate', 'true'))

    config_text = _config_text(charm)
    if config_text is not None:
        doc.fields.append(xappy.Field('config', config_text))

    relation_words, interface_fields = _relation_fields(charm)
    # Exact-match 'requires'/'provides' fields come before the combined
    # free-text 'relations' field, as in the original ordering.
    for field_name, interface in interface_fields:
        doc.fields.append(xappy.Field(field_name, interface))
    if relation_words:
        doc.fields.append(
            xappy.Field('relations', ' '.join(relation_words)))

    change_words = []
    # ``or ()`` also tolerates an explicit ``changes: None``.
    for change in charm.get('changes') or ():
        change_words.append(change['message'])
        change_words.append(change['committer'])
    if change_words:
        doc.fields.append(xappy.Field('changes', ' '.join(change_words)))

    if 'store_url' in charm:
        doc.fields.append(xappy.Field('store_url', charm['store_url']))
    else:
        log.warning("No store url found for %s", charm["_id"])
    doc.id = charm["_id"]
    indexer.replace(doc)
    indexer.flush()


def _config_text(charm):
    """Return the free text for the charm's config options, or None.

    The text is each option name followed by its description (empty when
    missing, None, or when the option entry is not a dict), joined by spaces.
    None is returned when the charm has no usable ``config['options']``.
    """
    config = charm.get('config')
    if not isinstance(config, dict):
        return None
    options = config.get('options')
    if not options:
        return None
    words = []
    for key, option in options.items():
        words.append(key)
        description = None
        if isinstance(option, dict):
            description = option.get('description')
        words.append(description or '')
    return ' '.join(words)


def _relation_fields(charm):
    """Collect relation search words and exact-match interface fields.

    Returns ``(words, fields)``: ``words`` alternates relation name and
    interface for the free-text 'relations' field; ``fields`` holds
    ``(kind, interface)`` pairs where ``kind`` is 'requires' or 'provides'.
    Non-dict relation entries are logged and skipped for both kinds.
    """
    words = []
    fields = []
    for kind in ('requires', 'provides'):
        relations = charm.get(kind)
        if not relations:
            continue
        for key, option in relations.items():
            if not isinstance(option, dict):
                log.warning("invalid charm %s %s", kind, charm['branch_spec'])
                continue
            words.append(key)
            words.append(option["interface"])
            fields.append((kind, option["interface"]))
    return words, fields
127
 
 
128
 
 
129
 
def reindex(db, indexer):
    """Re-index every charm returned by ``db.find()``.

    :param db: collection whose ``find()`` yields charm dicts carrying a
        ``branch_spec`` key (presumably a MongoDB collection; verify).
    :param indexer: xappy indexer passed through to ``index_charm``.
    :raises Exception: any error from ``index_charm`` is logged with the
        failing charm's ``branch_spec`` and re-raised, aborting the run.
    """
    count = 0
    for charm in db.find():
        log.debug("Indexing %s", charm['branch_spec'])
        try:
            index_charm(indexer, charm)
        except Exception:
            # Identify the failing charm, then let the error propagate.
            log.error("Indexing charm %s", charm['branch_spec'])
            raise
        count += 1
    log.info("Indexed %d Charms", count)
140
 
 
141
 
 
142
 
def index_queue(db, indexer, index_queue):
    """Index every charm referenced by the items on ``index_queue``.

    Items are consumed until ``index_queue.next()`` returns a falsy value.
    Each item's ``payload['branch_spec']`` is looked up as a charm ``_id`` in
    ``db``; known charms are indexed and unknown ones are skipped with an
    info message.  An error on one item is logged and does not stop the loop.
    Every fetched item is marked complete, whether or not indexing succeeded.
    The number of charms actually indexed is logged at the end.

    :param db: collection providing ``find_one``.
    :param indexer: xappy indexer passed through to ``index_charm``.
    :param index_queue: queue whose ``next()`` returns items exposing
        ``payload`` and ``complete()``.
    """
    count = 0
    while True:
        item = index_queue.next()
        if not item:
            # ``break`` (not ``return``) so the summary below is logged.
            break
        # Bound before ``try`` so the error handler never sees an unbound
        # name (or the previous item's payload) if reading ``payload`` fails.
        charm_data = None
        try:
            charm_data = item.payload
            branch_spec = charm_data["branch_spec"]
            charm = db.find_one({"_id": branch_spec})

            log.debug("Indexing charm %s", branch_spec)
            if charm:
                index_charm(indexer, charm)
                count += 1
            else:
                log.info("Skipping unknown charm %s", branch_spec)
        except Exception:
            # KeyboardInterrupt and SystemExit are not Exception subclasses,
            # so they still propagate (after ``finally`` runs).
            log.exception("Error indexing %s", charm_data)
        finally:
            # Remove the job from the input queue.
            item.complete()
    log.info("Indexed %d Charms", count)
168
 
 
169
 
 
170
 
def main():
 
19
 
 
20
if __name__ == '__main__':
 
21
    log = logging.getLogger("charm.index")
171
22
    logging.basicConfig(
172
23
        level=logging.WARNING,
173
24
        format="%(asctime)s: %(name)s@%(levelname)s: %(message)s")
181
32
    # Index queue
182
33
    in_queue = get_queues(INDEX_IN_QUEUE)
183
34
    index_queue(db, indexer, in_queue)
 
35
    # XXX j.c.sackett Reindexing should be its own job, outside of the ingest
 
36
    # queue.
184
37
    #reindex(db, indexer)
185
38
 
186
39
    indexer.close()
187
40
    log.info("Indexed Repo")
188
 
 
189
 
if __name__ == '__main__':
190
 
    main()