diff options
Diffstat (limited to 'Monitoring/MonitoringService/Driver')
-rw-r--r-- | Monitoring/MonitoringService/Driver/Driver.py | 20 | ||||
-rw-r--r-- | Monitoring/MonitoringService/Driver/LocalExec.py | 56 | ||||
-rw-r--r-- | Monitoring/MonitoringService/Driver/REST.py | 63 | ||||
-rw-r--r-- | Monitoring/MonitoringService/Driver/SOAPClient.py | 22 | ||||
-rw-r--r-- | Monitoring/MonitoringService/Driver/SshExec.py | 170 | ||||
-rw-r--r-- | Monitoring/MonitoringService/Driver/SshTunnel.py | 178 | ||||
-rw-r--r-- | Monitoring/MonitoringService/Driver/__init__.py | 0 | ||||
-rw-r--r-- | Monitoring/MonitoringService/Driver/test.py | 153 |
8 files changed, 662 insertions, 0 deletions
diff --git a/Monitoring/MonitoringService/Driver/Driver.py b/Monitoring/MonitoringService/Driver/Driver.py new file mode 100644 index 0000000..bd2ed18 --- /dev/null +++ b/Monitoring/MonitoringService/Driver/Driver.py @@ -0,0 +1,20 @@ +''' +Created on Oct 28, 2011 + +@author: steger, jozsef +@organization: ELTE +@contact: steger@complex.elte.hu +''' +import logging + +class Driver(object): + ''' + @summary: an empty driver to serve as an ancient class + @author: steger, jozsef + @cvar logger: an interface to the logger named "NOVI.DRIVER" + @type logger: logging.Logger + ''' + logger = logging.getLogger("NOVI.DRIVER") + +class DriverError(Exception): + pass
\ No newline at end of file diff --git a/Monitoring/MonitoringService/Driver/LocalExec.py b/Monitoring/MonitoringService/Driver/LocalExec.py new file mode 100644 index 0000000..2cf4f8f --- /dev/null +++ b/Monitoring/MonitoringService/Driver/LocalExec.py @@ -0,0 +1,56 @@ +''' +Created on Feb 4, 2013 + +@author: steger, jozsef +@organization: ELTE +@contact: steger@complex.elte.hu +''' + +#TODO: nested command execution is not working properly: e.g.: echo `hostname` + +from Driver import Driver +from subprocess import Popen, PIPE + +class LocalExec(Driver): + ''' + @summary: implements a driver to execute local commands + @ivar command: the default command + @type command: str + @ivar p: the process api + @type p: subprocess.Popen or None + ''' + + def __init__(self, command = "echo -n helloworld"): + ''' + @summary: save a default command + @param command: the default command + @type command: str + ''' + self.command = command + self.p = None + + def __del__(self): + ''' + @summary: an implicit deletion of the driver triggers a kill signal on a running process + ''' + if self.p: + self.p.kill() + self.p = None + + def execute(self, command = None): + ''' + @summary: executes a local command + @param command: the command to run, if None, the default command is issued + @type command: str or None + @return: the standard output of the command run + @rtype: str + ''' + if command is None: + command = self.command + self.p = Popen(args = command.split(' '), stdout = PIPE, stderr = PIPE) + stout, sterr = self.p.communicate() + self.p = None + self.logger.debug("executed '%s'" % (command)) + if len(sterr): + self.logger.warning("execution '%s' failed: %s" % (command, sterr)) + return stout diff --git a/Monitoring/MonitoringService/Driver/REST.py b/Monitoring/MonitoringService/Driver/REST.py new file mode 100644 index 0000000..bf4fc2f --- /dev/null +++ b/Monitoring/MonitoringService/Driver/REST.py @@ -0,0 +1,63 @@ +''' +Created on Feb 4, 2013 + +@author: steger, jozsef +@organization: ELTE +@contact: steger@complex.elte.hu +''' + +from Driver import Driver +from Credential.credentialtypes import UsernamePassword +from httplib2 import Http + +class RESTDriver(Driver): + ''' + @summary: implements REST driver to fetch using http GET + @cvar timeout: timeout of connection + @type timeout: float + @cvar cache: a cache directory + @type cache: str + @ivar url: a default document locator to be reused + @type url: str + @ivar proxy: an interface to the http server + @type proxy: httplib2.Http + ''' + timeout = 10 + cache = "/tmp/.cache" + + def __init__(self, url, credential = None, validate_ssl = False): + ''' + @summary: initializes a proxy to the http service and saves a default document locator + @param url: the default document locator + @type url: str + @param credential: an authentication secret + @type credential: L{Credential} or None + @param validate_ssl: whether to apply strick certificate validation, default is False + @type validate_ssl: bool + ''' + self.url = url + self.proxy = Http(cache = self.cache, timeout = self.timeout) + self.proxy.disable_ssl_certificate_validation = not validate_ssl + if isinstance(credential, UsernamePassword): + # use password authentication + self.proxy.add_credentials(credential.username, credential.password) + + def fetch(self, url = None): + ''' + @summary: retrieve the document + @param url: the document locator, if not present the default is used + @type url: str or None + @return: the remote document + @rtype: str or None + @note: if the remote content cached is not changed, None is returned + ''' + if url is None: + url = self.url + status, response = self.proxy.request(uri = url, method = "GET") + if status.status == 200: + return response + if status.status == 304: + self.logger.warning("remote content @ %s was not changed" % url) + return None + self.logger.error("%s -- retrieving @%s failed: %s" % (status, url, response)) + return None diff --git a/Monitoring/MonitoringService/Driver/SOAPClient.py b/Monitoring/MonitoringService/Driver/SOAPClient.py new file mode 100644 index 0000000..7de5b70 --- /dev/null +++ b/Monitoring/MonitoringService/Driver/SOAPClient.py @@ -0,0 +1,22 @@ +''' +Created on Sep 2, 2011 + +@author: laki, sandor +@organization: ELTE +@contact: laki@complex.elte.hu +@author: steger, jozsef +''' + +#TODO: catch exception using the .service attribute and log it in the Driver log +from suds import transport, client, wsse +from Driver import Driver + +class SOAPClient(Driver, client.Client): + ''' + @summary: implements SOAP driver to access remote procedures + ''' + pass + +SOAPSecurity=wsse.Security +SOAPUsernameToken=wsse.UsernameToken +SOAPHttpAuthenticated=transport.http.HttpAuthenticated diff --git a/Monitoring/MonitoringService/Driver/SshExec.py b/Monitoring/MonitoringService/Driver/SshExec.py new file mode 100644 index 0000000..454253c --- /dev/null +++ b/Monitoring/MonitoringService/Driver/SshExec.py @@ -0,0 +1,170 @@ +''' +Created on Jul 18, 2011 + +@author: steger, jozsef +@organization: ELTE +@contact: steger@complex.elte.hu +''' + +from Driver import Driver, DriverError +from paramiko import SSHClient, RSAKey, SSHException, AutoAddPolicy +from Credential.credentialtypes import UsernamePassword, UsernameRSAKey +from time import sleep +import socket +import logging + +class SshDriver(Driver): + ''' + @summary: implements a driver to build an SSH connection to a server using paramiko package. + @cvar timeout: a timeout to set up the connection + @type timeout: float + @cvar trials: how many times to retry if a time out event occurs or if the remote server is busy to respond + @type trials: int + @cvar wait: how long to wait between two trials + @type wait: .2 + @ivar client: the api of the ssh client + @type client: paramiko.SSHClient or None + @ivar host: the host name of the remote ssh server + @type host: str or None + @ivar isconnected: indicates whether the connection is set up + @type isconnected: bool + ''' + timeout = 5 + trials = 3 + wait = .2 + + def __init__(self): + ''' + @summary: bind the paramiko loggers to the "NOVI.DRIVER" + ''' + self.client = None + self.host = None + #bind paramiko.transport logger to the same handlers used by NOVI.DRIVER + l = logging.getLogger("paramiko.transport") + for hlr in self.logger.handlers: + l.addHandler(hlr) + + def __del__(self): + ''' + @summary: close connection upon an implicit deletion of the driver + ''' + self.close() + + def connect(self, host, credential, port = 22, known_host = None): + ''' + @summary: set up the connection + @param host: the host name of the remote server + @type host: str + @param credential: the secret to use for connection set up + @type credential: L{Credential} + @param port: the port remote ssh server is listening + @type port: int + @param known_host: a file name containing host signatures to check, if None AutoAddPolicy applies + @type known_host: str + ''' + self.client = SSHClient() + if known_host is None: + self.client.set_missing_host_key_policy( AutoAddPolicy() ) + else: + self.client.load_host_keys(filename = known_host) + if isinstance(credential, UsernamePassword): + # use password authentication + self.client.connect(hostname = host, port = port, + username = credential.username, password =credential.password, + timeout = self.timeout, look_for_keys = False, compress = True) + elif isinstance(credential, UsernameRSAKey): + # use the RSA key + if credential.password: + pw = credential.password + else: + pw = None + key = RSAKey(password = pw, filename = credential.rsakey) + n = self.trials + while n: + try: + self.client.connect(hostname = host, port = port, + username = credential.username, pkey = key, + timeout = self.timeout, look_for_keys = False, compress = True) + break + except SSHException, e: + if e.message.startswith("Error reading SSH protocol banner"): + n -= 1 + self.logger.warn("retry %d times to connect @%s in %f seconds" % (n, host, self.wait)) + sleep(self.wait) + else: + self.logger.error("cannot connect @%s" % (host)) + raise DriverError("Cannot connect @%s " % host) + except socket.timeout: + n -= 1 + self.logger.warn("time out, retry %d times to connect @%s in %f seconds" % (n, host, self.wait)) + sleep(self.wait) + if not self.isConnected: + self.close() + self.logger.error("cannot connect @%s" % (host)) + raise DriverError("Cannot connect @%s " % host) + self.host = host + self.logger.info("ssh connected @ %s:%d" % (self.host, port)) + + def close(self): + ''' + @summary: closes the ssh connection + ''' + try: + self.client.close() + self.logger.info("ssh disconnected @ %s" % (self.host)) + except: + pass + finally: + self.client = None + self.host = None + + @property + def isConnected(self): + try: + return self.client.get_transport().is_active() + except: + return False + +class SshExec(SshDriver): + ''' + @summary: an extension of the L{SshDriver} to execute commands on the remote machine + @ivar command: the string representation of the command to run + @type command: str + ''' + + def __init__(self, host, credential, port = 22, command = "echo helloworld @ `hostname`", known_host = None): + ''' + @summary: initializes an ssh connection and stores a default command + @param host: the host name of the remote server + @type host: str + @param credential: the secret to use for connection set up + @type credential: L{Credential} + @param port: the port remote ssh server is listening + @type port: int + @param command: the default remote command + @type command: str + @param known_host: a file name containing host signatures to check, if None AutoAddPolicy applies + @type known_host: str + ''' + SshDriver.__init__(self) + self.connect(host, credential, port, known_host) + self.command = command + + def execute(self, command = None): + ''' + @summary: executes a remote command + @param command: the command to run, if None, the default command is issued + @type command: str or None + @return: the standard output of the command run + @rtype: paramico.ChannelFile + ''' + if not self.isConnected: + raise DriverError("Not connected") + if command is None: + command = self.command + _, stout, sterr = self.client.exec_command(command = command) + e = sterr.read() + self.logger.debug("executed @%s '%s'" % (self.host, command)) + if len(e): + self.logger.warning("execution @%s '%s' failed: %s" % (self.host, command, e)) + return stout diff --git a/Monitoring/MonitoringService/Driver/SshTunnel.py b/Monitoring/MonitoringService/Driver/SshTunnel.py new file mode 100644 index 0000000..f2b7d39 --- /dev/null +++ b/Monitoring/MonitoringService/Driver/SshTunnel.py @@ -0,0 +1,178 @@ +''' +Created on Jan 14, 2013 + +@author: steger +''' + +import select +import SocketServer +from Driver import Driver +from SshExec import SshDriver +from threading import Thread +from SshExec import SshExec + +class SshTunnel(SshDriver): + ''' + @summary: this class extends L{SshDriver} and establishes a connection + to the requested SSH server and sets up local port + forwarding (the openssh -L option) from a local port through a tunneled + connection to a destination reachable from the SSH server machine. + @ivar t: the thread container + @type t: threading.Thread or None + ''' + + class ForwardServer (SocketServer.ThreadingTCPServer): + daemon_threads = True + allow_reuse_address = True + + class Handler (SocketServer.BaseRequestHandler): + def handle(self): + try: + chan = self.ssh_transport.open_channel('direct-tcpip', + (self.chain_host, self.chain_port), + self.request.getpeername()) + except Exception, e: + Driver.logger.debug('Incoming request to %s:%d failed: %s' % (self.chain_host, + self.chain_port, + repr(e))) + return + if chan is None: + Driver.logger.debug('Incoming request to %s:%d was rejected by the SSH server.' % + (self.chain_host, self.chain_port)) + return + + Driver.logger.debug('Tunnel open %r -> %r -> %r' % (self.request.getpeername(), + chan.getpeername(), (self.chain_host, self.chain_port))) + while True: + r, _, _ = select.select([self.request, chan], [], []) + if self.request in r: + data = self.request.recv(1024) + if len(data) == 0: + break + chan.send(data) + if chan in r: + data = chan.recv(1024) + if len(data) == 0: + break + self.request.send(data) + chan.close() + self.request.close() + Driver.logger.debug('Tunnel closed from %r' % (self.request.getpeername(),)) + + def __init__(self): + ''' + @summary: allocates thread pointer container + ''' + SshDriver.__init__(self) + self.t = None + + def connect(self, host, credential, localport, port, remoteserver, remoteport, known_host = None): + ''' + @summary: set up the tunnel connection + @param host: the host name of the remote server acting a port forwarder + @type host: str + @param credential: the secret to use for connection set up + @type credential: L{Credential} + @param localport: the local port entry mapped to the remoteport + @type localport: int + @param port: the port of the forwarder ssh server + @type port: int + @param remoteserver: the sink of the tunnel + @type remoteserver: str + @param remoteport: the port of the tunnel sink + @type remoteport: int + @param known_host: a file name containing host signatures to check, if None AutoAddPolicy applies + @type known_host: str + ''' + SshDriver.connect(self, host, credential, port, known_host) + self.logger.info('Now forwarding port %d to %s:%d ...' % (localport, remoteserver, remoteport)) + self.t = Thread(target = self._tran, kwargs = {'localport': localport, 'remoteserver': remoteserver, 'remoteport': remoteport}) + self.t.daemon = True + self.t.start() + + def _tran(self, localport, remoteserver, remoteport): + ''' + @summary: thread worker to transport data over the tunnel + @param localport: the local port entry mapped to the remoteport + @type localport: int + @param remoteserver: the sink of the tunnel + @type remoteserver: str + @param remoteport: the port of the tunnel sink + @type remoteport: int + ''' + try: + # this is a little convoluted, but lets me configure things for the Handler + # object. (SocketServer doesn't give Handlers any way to access the outer + # server normally.) + class SubHander (self.Handler): + chain_host = remoteserver + chain_port = remoteport + ssh_transport = self.client.get_transport() + self.service = self.ForwardServer(('', localport), SubHander) + self.service.serve_forever() + except KeyboardInterrupt: + self.logger.debug('C-c: Port forwarding stopped.') + self.close() + + def close(self): + ''' + @summary: stops the thread and tears down the tunnel + ''' + if self.t is None: + return + self.t.join(timeout = self.timeout) + self.t = None + self.service.shutdown() + self.logger.info('Port forwarding stopped @ %s.' % self.host) + SshDriver.close(self) + + +class SshExecTunnel(SshTunnel): + ''' + @summary: an extension of the L{SshTunnel} driver to execute commands + on the remote machine accessed via the tunnel + @ivar command: the string representation of the command to run + @type command: str + @ivar localdriver: the representation of an ssh client connecting over an existing ssh tunnel + @type localdriver: L{SshExec} + ''' + + def __init__(self, host, credential, localport, port, remoteserver, remoteport, remotecredential = None, command = "echo helloworld @ `hostname`", known_host = None): + ''' + @summary: initializes an ssh connection and stores a default command + @param host: the host name of the remote server acting a port forwarder + @type host: str + @param credential: the secret to use for tunnel set up + @type credential: L{Credential} + @param localport: the local port entry mapped to the remoteport + @type localport: int + @param port: the port of the forwarder ssh server + @type port: int + @param remoteserver: the sink of the tunnel + @type remoteserver: str + @param remoteport: the port of the tunnel sink + @type remoteport: int + @param remotecredential: the secret to use for connection set up, if None then we fall back to the credential + @type remotecredential: L{Credential} or None + @param command: the default remote command + @type command: str + @param known_host: a file name containing host signatures to check, if None AutoAddPolicy applies + @type known_host: str + ''' + SshTunnel.__init__(self) + self.connect(host, credential, localport, port, remoteserver, remoteport, known_host) + self.command = command + if remotecredential is None: + remotecredential = credential + self.localdriver = SshExec(host = 'localhost', credential = remotecredential, port = localport, command = command, known_host = None) + self.logger.info("connected over tunnel") + + def execute(self, command = None): + ''' + @summary: executes a remote command + @param command: the command to run, if None, the default command is issued + @type command: str or None + @return: the standard output of the command run + @rtype: paramico.ChannelFile + ''' + return self.localdriver.execute(command) diff --git a/Monitoring/MonitoringService/Driver/__init__.py b/Monitoring/MonitoringService/Driver/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/Monitoring/MonitoringService/Driver/__init__.py diff --git a/Monitoring/MonitoringService/Driver/test.py b/Monitoring/MonitoringService/Driver/test.py new file mode 100644 index 0000000..4395750 --- /dev/null +++ b/Monitoring/MonitoringService/Driver/test.py @@ -0,0 +1,153 @@ +''' +Created on Aug 10, 2011 + +@author: steger +''' +import unittest +from random import randint +from Example.Resources import PLnodes +from threading import Thread +from Example.credentials import noviCredential, sonomaCredential +from SshExec import SshExec +from SOAPClient import SOAPClient +from LocalExec import LocalExec +from logging import FileHandler +import logging +from time import sleep +from SshTunnel import SshExecTunnel + +class Test(unittest.TestCase): + testnodes = map(lambda x: x.get_ipaddress("eth0")[0], PLnodes) + cred_novi = noviCredential + url_sonoma = "http://complex.elte.hu/~steger/sonoma/user.wsdl" + cred_sonoma = sonomaCredential + + + def setUp(self): + pass + + def tearDown(self): + pass + + def gettestnode(self): + ''' + @summary: Return a test node IP address chosen random + @return: ip address + @rtype: string + ''' + return self.testnodes[randint(1, len(self.testnodes))-1] + + def test_helloworld(self): + ''' + @summary: Execute local command + ''' + proc = LocalExec() + result = proc.execute() + self.assertTrue(result.startswith("helloworld"), "Local command output differs from expected") + + def test_helloworldWithMaster(self): + ''' + @summary: Execute remote command in the name of the testuser authenticated with the master key + ''' + proc = SshExec(host = self.gettestnode(), credential = self.cred_novi) + result = proc.execute().read() + self.assertTrue(result.startswith("helloworld @ "), "Remote command output differs from expected") + + def echoWithMaster(self, address): + ''' + @summary: Execute remote echo command in the name of the testuser authenticated with the master key + @param address: ip address of the remote machine + @type address: string + ''' + try: + n = randint(0, 10000) + command = "echo %d" % n + proc = SshExec(host = address, credential = self.cred_novi, command = command) + result = proc.execute().read() + self.assertTrue(result.strip() == str(n), "Remote command @%s output differs from expected: (%s != %d)" % (address, result, n)) + except Exception, e: + self.assertFalse(True, "Got an error %s" % e) + + def test_echoWithMaster(self): + ''' + @summary: Execute remote echo command in the name of the testuser authenticated with the master key + ''' + self.echoWithMaster(self.gettestnode()) + + def test_distributedEcho(self): + ''' + @summary: Execute parallel remote echo commands in a distributed fashion + ''' + threads = [] + for n in self.testnodes: + t = Thread(target = self.echoWithMaster, args = (n,)) + t.daemon = True + t.start() + threads.append(t) + while len(threads): + t = threads.pop() + t.join(5) + + def test_parallelEcho(self): + ''' + @summary: Execute parallel remote echo commands in a test node + ''' + N = 20 + n = self.gettestnode() + threads = [] + while N: + N -= 1 + t = Thread(target = self.echoWithMaster, args = (n,)) + t.daemon = True + t.start() + threads.append(t) + while len(threads): + t = threads.pop() + t.join(5) + + def test_stress(self): + ''' + @summary: Consecutively execute parallel remote echo commands in a distributed fashion + ''' + threads = [] + for n in self.testnodes: + N = randint(5, 20) + while N: + N -= 1 + t = Thread(target = self.echoWithMaster, args = (n,)) + t.daemon = True + t.start() + threads.append(t) + while len(threads): + t = threads.pop() + t.join(5) + + def test_soap(self): + ''' + @summary: Run SONoMA getNodeList + ''' + client = SOAPClient(self.url_sonoma) + resources = client.service.getNodeList(filter = "AVAILABLE") + self.assertGreater(len(resources), 0, "sonoma reports no nodes") + + def test_tunnel(self): + T = SshExecTunnel(host = 'novilab.elte.hu', credential = noviCredential, localport = 4000, port = 22, + remoteserver = 'smilax1.man.poznan.pl', remoteport = 22) + response = T.execute().read().strip() + expected = "helloworld @ smilax1.man.poznan.pl" + self.assertEqual(response, expected, "Remote command output differs from expected %s != %s" % (expected, response)) + +if __name__ == "__main__": + #import sys;sys.argv = ['', 'Test.test_helloworldWithMaster'] + fn = "/tmp/Driver_test.log" + hdlr = FileHandler(filename = fn, mode = 'w') + l = logging.getLogger("NOVI.DRIVER") + l.setLevel(level = logging.DEBUG) +# l.setLevel(level = logging.INFO) + l.addHandler(hdlr = hdlr) + l.info("START TEST") + try: + unittest.main() + finally: + l.info("FINISHED TEST") + hdlr.close() |