"""ImageRetrievalDaemon"""
# Licensed under MOZILLA PUBLIC LICENSE Version 1.1
# Author: Keith Hughitt <keith.hughitt@nasa.gov>
# Author: Jack Ireland <jack.ireland@nasa.gov>
# pylint: disable=E1121
import os
import sys
import datetime
import time
import logging
import Queue
import redis
import sunpy

class ImageRetrievalDaemon:
    """Retrieves images from the server as specified"""
    def __init__(self, server, browse_method, download_method, conf):
        # Redis database connection parameters
        self.dbhost = conf.get('redis', 'host')
        self.dbnum = conf.get('redis', 'database')
        self.dbport = int(conf.get('redis', 'port'))

        # Maximum number of simultaneous downloads
        self.queue = Queue.Queue()
        self.max_downloads = 4

        # Working and archive directories (option names assumed to match the
        # attribute names in settings.cfg)
        self.working_dir = os.path.expanduser(conf.get('directories',
                                                       'working_dir'))
        self.image_archive = os.path.expanduser(conf.get('directories',
                                                         'image_archive'))

        # Check directory permission
        self._check_permissions()

        # Load data server, browser, and downloader
        self.server = self._load_server(server)
        self.browser = self._load_browser(browse_method, self.server.get_uri())

        # Spawn the downloader threads
        self.downloaders = []
        for i in range(self.max_downloads):
            self.downloaders.append(self._load_downloader(download_method))

        self.shutdown_requested = False

    def _init_db(self):
        """Initialise the database"""
        try:
            self._db = redis.StrictRedis(host=self.dbhost, port=self.dbport,
                                         db=self.dbnum)
            # Force a connection attempt so a missing server is caught here
            self._db.ping()
        except redis.ConnectionError:
            logging.error('Unable to connect to Redis. Is redis running?')
            print("Please start redis and try again...")
            sys.exit()

    def _check_permissions(self):
        """Checks to make sure we have write permissions to directories"""
        for d in [self.working_dir, self.image_archive]:
            if not (os.path.isdir(d) and os.access(d, os.W_OK)):
                print("Unable to write to specified directories. "
                      "Please check permissions for locations listed in "
                      "settings.cfg and try again...")
                sys.exit()

    def _load_server(self, server):
        """Loads a data server"""
        cls = self._load_class('hvpull.servers',
                               server, self.get_servers().get(server))
        return cls()

    def _load_browser(self, browse_method, uri):
        """Loads a data browser"""
        cls = self._load_class('hvpull.browser', browse_method,
                               self.get_browsers().get(browse_method))
        return cls(uri)

    def _load_downloader(self, download_method):
        """Loads a data downloader"""
        cls = self._load_class('hvpull.downloader', download_method,
                               self.get_downloaders().get(download_method))
        downloader = cls(self.image_archive, self.working_dir,
                         self.server.get_uri(), self.queue)

        # Run the downloader as a daemon thread so it will not block shutdown
        downloader.setDaemon(True)
        downloader.start()

        return downloader
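
    # All downloader threads share self.queue; self.max_downloads (the
    # "maximum number of simultaneous downloads" set in __init__) bounds how
    # many such threads are spawned.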

    def start(self, starttime=None, endtime=None):
        """Start daemon operation."""
        logging.info("Initializing HVPull")

        date_fmt = "%Y-%m-%d %H:%M:%S"

        # TODO: Process urls in batches of ~1-500.. this way images start
        # appearing more quickly when filling in large gaps, etc.

        # Determine starttime to use
        if starttime is not None:
            starttime = datetime.datetime.strptime(starttime, date_fmt)
        else:
            starttime = self.server.get_starttime()

        # If end time is specified, fill in data from start to end
        if endtime is not None:
            endtime = datetime.datetime.strptime(endtime, date_fmt)
            urls = self.query(starttime, endtime)
            self.acquire(urls)
        else:
            # Otherwise, first query from start -> now
            now = datetime.datetime.utcnow()
            urls = self.query(starttime, now)
            self.acquire(urls)

        # Main loop: keep polling the server until a shutdown is requested
        while not self.shutdown_requested:
            now = datetime.datetime.utcnow()
            starttime = self.server.get_starttime()

            # get a list of files available
            urls = self.query(starttime, now)

            # acquire the data files
            self.acquire(urls)

            # Wait for the server-specific pause before polling again
            time.sleep(self.server.pause.seconds)

        logging.info("Exiting HVPull")

    def ingest(self, urls):
        """
        Add images to helioviewer images db.
          (1) Make sure the file exists
          (2) Make sure the file is 'good', and quarantine if it is not.
          (3) Apply the ESA JPIP encoding.
          (5) Update database to say that the file has been successfully
              ingested.
        """
        base_url = self.server.get_uri()

        # Map each downloaded URL back to its location in the image archive
        filepaths = []
        for url in urls:
            p = os.path.join(self.image_archive, url.replace(base_url, ""))
            if os.path.isfile(p):
                filepaths.append(p)

        for filepath in filepaths:
            info = sunpy.read_header(filepath)

            img_counter = self._db.incr('counter:img_id')
            img_id = 'img:%s' % os.path.basename(filepath)

            params = {
                "timestamp": datetime.datetime.utcnow(),
                "observatory": info['observatory'],
                "instrument": info['instrument'],
                "detector": info['detector'],
                "measurement": info['measurement'],
                "date_obs": info['date']
            }
            self._db.hmset(img_id, params)
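
        # After ingest, redis holds one hash per file, keyed
        # "img:<file basename>", carrying the observatory/instrument/detector/
        # measurement/date_obs fields above plus an ingest timestamp; the
        # counter:img_id key counts ingested images.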

    def acquire(self, urls):
        """Acquires all the available files."""
        # If no new files are available do nothing
        if not urls:
            return

        print("Found %d new files" % len(urls))

        # Download files 20 at a time to avoid blocking shutdown requests
        for i in range(20):  # pylint: disable=W0612
            if self.shutdown_requested:
                return

    def stop(self):
        print("Stopping HVPull...")
        self.shutdown_requested = True

        for downloader in self.downloaders:
            downloader.stop()

    def query(self, starttime, endtime):
        """Query and retrieve data within the specified range.

        Checks for data in the specified range and retrieves any new files.
        After execution is completed, the same range is checked again to see
        if any new files have appeared since the first execution. This
        continues until no new files are found (for xxx minutes?)
        """
        # Get the nickname subdirectory list present at the server
        root_url = self.server.get_uri()
        nicknames = self.browser.get_directories(root_url)

        # No nicknames found.
        if nicknames == []:
            return None

        logging.info("Querying time range %s - %s", starttime, endtime)

        # Get the list of dates (the YYYY/MM/DD subdirectory format is an
        # assumption about the server layout)
        fmt = "%Y/%m/%d"
        dates = [starttime.strftime(fmt)]

        date = starttime.date()
        while date < endtime.date():
            date = date + datetime.timedelta(days=1)
            dates.append(date.strftime(fmt))

        # Ensure the dates are most recent first
        dates.sort(reverse=True)

        # Get the measurement subdirectories present at the server
        measurements = []
        for nickname in nicknames:
            for date in dates:
                location = os.path.join(nickname, date)
                measurement = self.browser.get_directories(location)
                measurements.extend(measurement)

        # No measurements found
        if measurements == []:
            return None

        # Get all the unique measurements
        measurements = list(set(measurements))

        # Get a sorted list of available JP2 files via browser
        files = []
        # measurements = [measurements[1]]

        # Check each remote directory for new files
        for measurement in measurements:
            if self.shutdown_requested:
                return None

            logging.info('Scanning ' + measurement)
            matches = self.browser.get_files(measurement, "jp2")
            files.extend(matches)

        # Remove any duplicates
        files = list(set(files))

        # Only return files that have not already been ingested
        return filter(self._filter_new, files) or None
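
    # query() walks the remote layout level by level:
    #     <server root>/<nickname>/<date>/<measurement>/*.jp2
    # using browser.get_directories() for the directory levels and
    # browser.get_files(..., "jp2") for the files themselves.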

    def _load_class(self, base_package, packagename, classname):
        """Dynamically loads a class given a set of strings indicating its
        package and class name."""
        # Import the module and look up the class by name
        modname = "%s.%s" % (base_package, packagename)
        __import__(modname)

        # Return the class object; callers instantiate it themselves
        return getattr(sys.modules[modname], classname)
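
    # For example, _load_server('soho') resolves via _load_class to
    # getattr(sys.modules['hvpull.servers.soho'], 'SOHODataServer'),
    # using the name mapping returned by get_servers() below.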

    def _filter_new(self, url):
        """Returns True if the given remote file has not yet been acquired
        (used as the predicate when filtering query results)."""
        filename = os.path.basename(url)
        return not self._db.exists("img:%s" % filename)
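
    # Note: the "img:<filename>" keys checked here are the same hashes that
    # ingest() writes, so a file stops being reported as new once ingested.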

    @classmethod
    def get_servers(cls):
        """Returns a mapping of valid servers to interact with"""
        return {
            "lmsal": "LMSALDataServer",
            "soho": "SOHODataServer",
            "stereo": "STEREODataServer"
        }

    @classmethod
    def get_browsers(cls):
        """Returns a mapping of valid data browsers to interact with"""
        return {
            "httpbrowser": "HTTPDataBrowser",
            "localbrowser": "LocalDataBrowser"
        }

    @classmethod
    def get_downloaders(cls):
        """Returns a mapping of valid data downloaders to interact with"""
        return {
            "urllib": "URLLibDownloader"
        }