~jstys-z/helioviewer.org/client5

« back to all changes in this revision

Viewing changes to install/hvpull/net/daemon.py

  • Committer: Keith Hughitt
  • Date: 2012-04-23 16:02:25 UTC
  • mto: This revision was merged to the branch mainline in revision 732.
  • Revision ID: keith.hughitt@nasa.gov-20120423160225-xzoh82ejf37c8yr7
Incorporated HVPull code

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""ImageRetreivalDaemon"""
 
2
# Licensed under MOZILLA PUBLIC LICENSE Version 1.1
 
3
# Author: Keith Hughitt <keith.hughitt@nasa.gov>
 
4
# Author: Jack Ireland <jack.ireland@nasa.gov>
 
5
# pylint: disable=E1121
 
6
import sys
 
7
import datetime
 
8
import time
 
9
import logging
 
10
import os
 
11
import redis
 
12
import sunpy
 
13
import Queue
 
14
 
 
15
class ImageRetrievalDaemon:
 
16
    """Retrieves images from the server as specified"""
 
17
    def __init__(self, server, browse_method, download_method, conf):
 
18
        """Explain."""
 
19
        self._db = None
 
20
        
 
21
        # Database info
 
22
        self.dbhost = conf.get('redis', 'host')
 
23
        self.dbnum = conf.get('redis', 'database')
 
24
        self.dbport = int(conf.get('redis', 'port'))
 
25
        
 
26
        # Maximum number of simultaneous downloads
 
27
        self.queue = Queue.Queue()
 
28
        self.max_downloads = 4
 
29
        
 
30
        # Filepaths
 
31
        self.working_dir = os.path.expanduser(conf.get('directories',
 
32
                                                       'working_dir'))
 
33
        self.image_archive = os.path.expanduser(conf.get('directories',
 
34
                                                         'image_archive'))
 
35
        # Check directory permission
 
36
        self._check_permissions() 
 
37
        
 
38
        # Load data server, browser, and downloader
 
39
        self.server = self._load_server(server)
 
40
        self.browser = self._load_browser(browse_method, self.server.get_uri())
 
41
        
 
42
        # Start downloaders
 
43
        self.downloaders = []
 
44
        
 
45
        for i in range(self.max_downloads):
 
46
            self.downloaders.append(self._load_downloader(download_method))
 
47
 
 
48
        # Shutdown switch
 
49
        self.shutdown_requested = False
 
50
        
 
51
        # Initialize database
 
52
        self._init_db()
 
53
 
 
54
    def _init_db(self):
 
55
        """Initialise the database"""
 
56
        try:
 
57
            self._db = redis.StrictRedis(host=self.dbhost, port=self.dbport, 
 
58
                                         db=self.dbnum)
 
59
            self._db.ping()
 
60
        except redis.ConnectionError:
 
61
            logging.error('Unable to connect to Redis. Is redis running?')
 
62
            print("Please start redis and try again...")
 
63
            sys.exit()
 
64
 
 
65
    def _check_permissions(self):
 
66
        """Checks to make sure we have write permissions to directories"""
 
67
        for d in [self.working_dir, self.image_archive]:
 
68
            if not (os.path.isdir(d) and os.access(d, os.W_OK)):
 
69
                print("Unable to write to specified directories. "
 
70
                      "Please check permissions for locations listed in "
 
71
                      "settings.cfg and try again...")
 
72
                sys.exit()
 
73
 
 
74
    def _load_server(self, server):
 
75
        """Loads a data server"""
 
76
        cls = self._load_class('hvpull.servers', 
 
77
                               server, self.get_servers().get(server))
 
78
        return cls()
 
79
            
 
80
    def _load_browser(self, browse_method, uri):
 
81
        """Loads a data browser"""
 
82
        cls = self._load_class('hvpull.browser', browse_method, 
 
83
                               self.get_browsers().get(browse_method))
 
84
        return cls(uri)
 
85
    
 
86
    def _load_downloader(self, download_method):
 
87
        """Loads a data downloader"""
 
88
        cls = self._load_class('hvpull.downloader', download_method, 
 
89
                               self.get_downloaders().get(download_method))
 
90
        downloader = cls(self.image_archive, self.working_dir, 
 
91
                         self.server.get_uri(), self.queue)
 
92
        
 
93
        downloader.setDaemon(True)
 
94
        downloader.start()
 
95
 
 
96
        return downloader
 
97
        
 
98
    def start(self, starttime=None, endtime=None):
 
99
        """Start daemon operation."""
 
100
        logging.info("Initializing HVPull")
 
101
        
 
102
        date_fmt = "%Y-%m-%d %H:%M:%S"
 
103
        
 
104
        # TODO: Process urls in batches of ~1-500.. this way images start
 
105
        # appearing more quickly when filling in large gaps, etc.
 
106
        
 
107
        # Determine starttime to use
 
108
        if starttime is not None:
 
109
            starttime = datetime.datetime.strptime(starttime, date_fmt)
 
110
        else:
 
111
            starttime = self.server.get_starttime()
 
112
            
 
113
        # If end time is specified, fill in data from start to end
 
114
        if endtime is not None:
 
115
            endtime = datetime.datetime.strptime(endtime, date_fmt)
 
116
            urls = self.query(starttime, endtime)
 
117
            self.acquire(urls)
 
118
            self.ingest(urls)
 
119
            return None
 
120
        else:
 
121
        # Otherwise, first query from start -> now
 
122
            now = datetime.datetime.utcnow()
 
123
            urls = self.query(starttime, now)
 
124
            self.acquire(urls)
 
125
            self.ingest(urls)
 
126
        
 
127
        # Begin main loop
 
128
        while not self.shutdown_requested:
 
129
            now = datetime.datetime.utcnow()
 
130
            starttime = self.server.get_starttime()
 
131
            
 
132
            # get a list of files available
 
133
            urls = self.query(starttime, now)
 
134
            
 
135
            # acquire the data files
 
136
            self.acquire(urls)
 
137
            self.ingest(urls)
 
138
            
 
139
            #time.sleep(self.server.pause.seconds)
 
140
            time.sleep(self.server.pause.seconds)
 
141
        
 
142
        # Shutdown
 
143
        self.stop()
 
144
        
 
145
    def stop(self):
 
146
        logging.info("Exiting HVPull")
 
147
        sys.exit()
 
148
        
 
149
    def ingest(self, urls):
 
150
        """
 
151
        Add images to helioviewer images db.
 
152
          (1) Make sure the file exists
 
153
          (2) Make sure the file is 'good', and quarantine if it is not.
 
154
          (3) Apply the ESA JPIP encoding.
 
155
          (4) Ingest
 
156
          (5) Update database to say that the file has been successfully 
 
157
              'ingested'.
 
158
              
 
159
        """
 
160
        base_url = self.server.get_uri()
 
161
        
 
162
        # Get filepaths
 
163
        filepaths = []
 
164
        
 
165
        for url in urls:
 
166
            p = os.path.join(self.image_archive, url.replace(base_url, ""))
 
167
            if os.path.isfile(p):
 
168
                filepaths.append(p)
 
169
            
 
170
        # Add to database
 
171
        for filepath in filepaths:
 
172
            info = sunpy.read_header(filepath)
 
173
            
 
174
            img_counter = self._db.incr('counter:img_id')
 
175
            img_id = 'img:%s' % os.path.basename(filepath)
 
176
            
 
177
            params = {
 
178
                "id": img_counter,
 
179
                "timestamp": datetime.datetime.utcnow(),
 
180
                "observatory": info['observatory'],
 
181
                "instrument": info['instrument'],
 
182
                "detector": info['detector'],
 
183
                "measurement": info['measurement'],
 
184
                "date_obs": info['date']
 
185
            }
 
186
            self._db.hmset(img_id, params)
 
187
 
 
188
 
 
189
    def acquire(self, urls):
 
190
        """Acquires all the available files."""
 
191
        # If no new files are available do nothing
 
192
        if not urls:
 
193
            return
 
194
        
 
195
        print("Found %d new files" % len(urls))
 
196
        
 
197
        # Download files
 
198
        while len(urls) > 0:
 
199
            # Download files 20 at a time to avoid blocking shutdown requests
 
200
            for i in range(20): #pylint: disable=W0612
 
201
                if len(urls) > 0:
 
202
                    url = urls.pop()
 
203
                    self.queue.put(url)
 
204
                
 
205
            self.queue.join()
 
206
            
 
207
            if self.shutdown_requested:
 
208
                self.stop()
 
209
 
 
210
    def shutdown(self):
 
211
        print("Stopping HVPull...")
 
212
        self.shutdown_requested = True
 
213
        
 
214
        for downloader in self.downloaders:
 
215
            downloader.stop()
 
216
        
 
217
    def query(self, starttime, endtime):
 
218
        """Query and retrieve data within the specified range.
 
219
        
 
220
        Checks for data in the specified range and retrieves any new files.
 
221
        After execution is completed, the same range is checked again to see
 
222
        if any new files have appeared since the first execution. This continues
 
223
        until no new files are found (for xxx minutes?)
 
224
        """
 
225
        # Get the nickname subdirectory list present at the server
 
226
        root_url = self.server.get_uri()
 
227
        nicknames = self.browser.get_directories(root_url)
 
228
 
 
229
        # No nicknames found.
 
230
        if nicknames == []:
 
231
            return None
 
232
        
 
233
        logging.info("Querying time range %s - %s", starttime, endtime)
 
234
                
 
235
        # Get the list of dates
 
236
        fmt = "%Y/%m/%d"
 
237
        dates = [starttime.strftime(fmt)]
 
238
 
 
239
        date = starttime.date()
 
240
        while date < endtime.date():
 
241
            date = date + datetime.timedelta(days=1)
 
242
            dates.append(date.strftime(fmt))
 
243
        
 
244
        # Ensure the dates are most recent first
 
245
        dates.sort()
 
246
        dates.reverse()
 
247
        
 
248
        # Get the measurement subdirectories present at the server        
 
249
        measurements = []
 
250
        for nickname in nicknames:
 
251
            for date in dates:
 
252
                location = os.path.join(nickname, date)
 
253
                measurement = self.browser.get_directories(location)
 
254
                measurements.extend(measurement)
 
255
 
 
256
        # No measurements found
 
257
        if measurements == []:
 
258
            return None
 
259
 
 
260
        # Get all the unique measurements
 
261
        measurements = list(set(measurements))
 
262
 
 
263
        # Get a sorted list of available JP2 files via browser
 
264
        files = []
 
265
        
 
266
        # TESTING>>>>>>
 
267
        # measurements = [measurements[1]]
 
268
 
 
269
        # Check each remote directory for new files
 
270
        for measurement in measurements:
 
271
            if self.shutdown_requested:
 
272
                return            
 
273
            logging.info('Scanning ' + measurement)
 
274
            matches = self.browser.get_files(measurement, "jp2")
 
275
            files.extend(matches)
 
276
 
 
277
        # Remove any duplicates
 
278
        files = list(set(files))
 
279
        
 
280
        return filter(self._filter_new, files) or None
 
281
    
 
282
    def _load_class(self, base_package, packagename, classname):
 
283
        """Dynamically loads a class given a set of strings indicating its 
 
284
        location"""
 
285
        # Import module
 
286
        modname = "%s.%s" % (base_package, packagename)
 
287
        __import__(modname)
 
288
    
 
289
        # Instantiate class and return
 
290
        return getattr(sys.modules[modname], classname)
 
291
    
 
292
    def _filter_new(self, url):
 
293
        """For a given list of remote files determines which ones have not
 
294
        yet been acquired."""
 
295
        filename = os.path.basename(url)
 
296
        return not self._db.exists("img:%s" % filename)
 
297
    
 
298
    @classmethod
 
299
    def get_servers(cls):
 
300
        """Returns a list of valid servers to interact with"""
 
301
        return {
 
302
            "lmsal": "LMSALDataServer",
 
303
            "soho": "SOHODataServer",
 
304
            "stereo": "STEREODataServer"
 
305
        }
 
306
        
 
307
    @classmethod
 
308
    def get_browsers(cls):
 
309
        """Returns a list of valid data browsers to interact with"""
 
310
        return {
 
311
        "httpbrowser": "HTTPDataBrowser",
 
312
        "localbrowser": "LocalDataBrowser"
 
313
        }
 
314
 
 
315
    @classmethod
 
316
    def get_downloaders(cls):
 
317
        """Returns a list of valid data downloaders to interact with"""
 
318
        return {
 
319
            "urllib": "URLLibDownloader"
 
320
        }