~datafinder-team/datafinder/trunk

« back to all changes in this revision

Viewing changes to src/datafinder/persistence/adapters/sftp/factory.py

  • Committer: tobias-schlauch
  • Date: 2014-09-17 11:47:52 UTC
  • Revision ID: tobias.schlauch@dlr.de-20140917114752-uh8wtl8g9q1gyhwx
Added interface to deteremine available disk space for data stores. Provide first implementation in context of the SFTP store which makes use of the df command.

Show diffs side-by-side

added added

removed removed

Lines of Context:
41
41
"""
42
42
 
43
43
 
 
44
import decimal
 
45
import logging
 
46
import socket
 
47
 
 
48
from paramiko import SSHException
 
49
 
 
50
from datafinder.persistence.error import PersistenceError
 
51
 
44
52
from datafinder.persistence.adapters.sftp import constants, utils
45
53
from datafinder.persistence.adapters.sftp.configuration import Configuration
46
54
from datafinder.persistence.adapters.sftp.connection_pool import SftpConnectionPool
107
115
        self._configuration.username = credentials["username"]
108
116
        self._configuration.password = credentials["password"]
109
117
        self._connectionPool.reload()
 
118
 
 
119
    def determineFreeDiskSpace(self):
 
120
        """
 
121
        @see: L{FileSystem.determineFreeDiskSpace<datafinder.persistence.factory.FileSystem.determineFreeDiskSpace>}
 
122
        """
 
123
        
 
124
        connection = self._connectionPool.acquire()
 
125
        try:
 
126
            transport = connection.get_channel().get_transport()
 
127
            diskFreeCommand = "df %s" % self._configuration.basePath
 
128
            print diskFreeCommand
 
129
            commandRunner = _SshCommandRunner(diskFreeCommand, transport)
 
130
            diskFreeCommandOutput, _ = commandRunner.executeCommand()
 
131
            return _parseDiskFreeCommandOutForAvailableSpace(diskFreeCommandOutput)
 
132
        finally:
 
133
            self._connectionPool.release(connection)
 
134
 
 
135
            
 
136
def _parseDiskFreeCommandOutForAvailableSpace(diskFreeCommandOutput):
 
137
    """ This helper function parses the output of the df command to determine the available
 
138
    free space of the first device listed. This functions is a first attempt in this direction and is intended
 
139
    to work fine with the result of 'df <PATH>'. 
 
140
    Consider introducing a separate helper class when introducing different/alternative parsing strategy. """
 
141
    
 
142
    for line in diskFreeCommandOutput.split("\n"):
 
143
        line = line.strip()
 
144
        if line and not line.startswith("Filesystem"): # Ignoring header
 
145
            token = line.split()
 
146
            if len(token) > 3:
 
147
                try:
 
148
                    return decimal.Decimal(token[3])
 
149
                except decimal.InvalidOperation:
 
150
                    raise PersistenceError("Unable to parse df command output '%s' for avaialble disk space." % diskFreeCommandOutput)
 
151
    # Handle all non-conformant df command outputs
 
152
    raise PersistenceError("Unable to parse df command output '%s' for avaialble disk space." % diskFreeCommandOutput)
 
153
 
 
154
 
 
155
class _SshCommandRunner(object):
 
156
    """ Helper class which executes a specific SSH command on the basis of an 
 
157
    authenticated transport channel. It creates a new channel and properly closes it.
 
158
    This class might be reused in the TSM adapter.
 
159
    """
 
160
    
 
161
    _log = logging.getLogger()
 
162
    
 
163
    def __init__(self, command, transport, timeout=500.0, maxReceivedBytes=1024):
 
164
        """
 
165
        @param command: String representing the command that should be executed.
 
166
        @type command: C{str}
 
167
        @param transport: An authenticated paramiko transport channel.
 
168
        @type transport: C{paramiko.Transport}
 
169
        """
 
170
        
 
171
        self._command = command
 
172
        self._transport = transport
 
173
        self._timeout = timeout
 
174
        self._maxReceivedBytes = maxReceivedBytes
 
175
        
 
176
    def executeCommand(self):
 
177
        """ 
 
178
        Executes the given command on the connected host and returns corresponding
 
179
        standard output and standard error output.
 
180
 
 
181
        @raise PersistenceError: Indicating problem executing the specific command.
 
182
        """
 
183
        
 
184
        channel = self._transport.open_session()
 
185
        channel.settimeout(self._timeout)
 
186
        try:
 
187
            channel.exec_command(self._command)
 
188
        except SSHException, sshException:
 
189
            errorMessage = "Cannot send command '%s' to host.\nReason: '%s'" % (self._command, str(sshException)) 
 
190
            raise PersistenceError(errorMessage)
 
191
        else:
 
192
            standardOutput =  self._getCommandOutput(channel.recv)
 
193
            standardError = self._getCommandOutput(channel.recv_stderr)
 
194
            if standardError:
 
195
                self._log.debug(standardError)
 
196
            return standardOutput, standardError
 
197
        finally:
 
198
            channel.close()
 
199
 
 
200
    def _getCommandOutput(self, outputFunction):
 
201
        output = ""
 
202
        try:
 
203
            outputContentPart = outputFunction(self._maxReceivedBytes)
 
204
            while len(outputContentPart) > 0:
 
205
                output = output + outputContentPart
 
206
                outputContentPart = outputFunction(self._maxReceivedBytes)
 
207
        except socket.timeout:
 
208
            raise PersistenceError("Receiving out put from '%s' command timed out." % self._command)
 
209
        else:
 
210
            return output