summaryrefslogtreecommitdiffstats
path: root/Monitoring/MonitoringService/Driver
diff options
context:
space:
mode:
Diffstat (limited to 'Monitoring/MonitoringService/Driver')
-rw-r--r--Monitoring/MonitoringService/Driver/Driver.py20
-rw-r--r--Monitoring/MonitoringService/Driver/LocalExec.py56
-rw-r--r--Monitoring/MonitoringService/Driver/REST.py63
-rw-r--r--Monitoring/MonitoringService/Driver/SOAPClient.py22
-rw-r--r--Monitoring/MonitoringService/Driver/SshExec.py170
-rw-r--r--Monitoring/MonitoringService/Driver/SshTunnel.py178
-rw-r--r--Monitoring/MonitoringService/Driver/__init__.py0
-rw-r--r--Monitoring/MonitoringService/Driver/test.py153
8 files changed, 662 insertions, 0 deletions
diff --git a/Monitoring/MonitoringService/Driver/Driver.py b/Monitoring/MonitoringService/Driver/Driver.py
new file mode 100644
index 0000000..bd2ed18
--- /dev/null
+++ b/Monitoring/MonitoringService/Driver/Driver.py
@@ -0,0 +1,20 @@
+'''
+Created on Oct 28, 2011
+
+@author: steger, jozsef
+@organization: ELTE
+@contact: steger@complex.elte.hu
+'''
+import logging
+
+class Driver(object):
+ '''
+ @summary: an empty driver to serve as an ancient class
+ @author: steger, jozsef
+ @cvar logger: an interface to the logger named "NOVI.DRIVER"
+ @type logger: logging.Logger
+ '''
+ logger = logging.getLogger("NOVI.DRIVER")
+
+class DriverError(Exception):
+ pass \ No newline at end of file
diff --git a/Monitoring/MonitoringService/Driver/LocalExec.py b/Monitoring/MonitoringService/Driver/LocalExec.py
new file mode 100644
index 0000000..2cf4f8f
--- /dev/null
+++ b/Monitoring/MonitoringService/Driver/LocalExec.py
@@ -0,0 +1,56 @@
+'''
+Created on Feb 4, 2013
+
+@author: steger, jozsef
+@organization: ELTE
+@contact: steger@complex.elte.hu
+'''
+
+#TODO: nested command execution is not working properly: e.g.: echo `hostname`
+
+from Driver import Driver
+from subprocess import Popen, PIPE
+
+class LocalExec(Driver):
+ '''
+ @summary: implements a driver to execute local commands
+ @ivar command: the default command
+ @type command: str
+ @ivar p: the process api
+ @type p: subprocess.Popen or None
+ '''
+
+ def __init__(self, command = "echo -n helloworld"):
+ '''
+ @summary: save a default command
+ @param command: the default command
+ @type command: str
+ '''
+ self.command = command
+ self.p = None
+
+ def __del__(self):
+ '''
+ @summary: an implicit deletion of the driver triggers a kill signal on a running process
+ '''
+ if self.p:
+ self.p.kill()
+ self.p = None
+
+ def execute(self, command = None):
+ '''
+ @summary: executes a local command
+ @param command: the command to run, if None, the default command is issued
+ @type command: str or None
+ @return: the standard output of the command run
+ @rtype: str
+ '''
+ if command is None:
+ command = self.command
+ self.p = Popen(args = command.split(' '), stdout = PIPE, stderr = PIPE)
+ stout, sterr = self.p.communicate()
+ self.p = None
+ self.logger.debug("executed '%s'" % (command))
+ if len(sterr):
+ self.logger.warning("execution '%s' failed: %s" % (command, sterr))
+ return stout
diff --git a/Monitoring/MonitoringService/Driver/REST.py b/Monitoring/MonitoringService/Driver/REST.py
new file mode 100644
index 0000000..bf4fc2f
--- /dev/null
+++ b/Monitoring/MonitoringService/Driver/REST.py
@@ -0,0 +1,63 @@
+'''
+Created on Feb 4, 2013
+
+@author: steger, jozsef
+@organization: ELTE
+@contact: steger@complex.elte.hu
+'''
+
+from Driver import Driver
+from Credential.credentialtypes import UsernamePassword
+from httplib2 import Http
+
+class RESTDriver(Driver):
+ '''
+ @summary: implements REST driver to fetch using http GET
+ @cvar timeout: timeout of connection
+ @type timeout: float
+ @cvar cache: a cache directory
+ @type cache: str
+ @ivar url: a default document locator to be reused
+ @type url: str
+ @ivar proxy: an interface to the http server
+ @type proxy: httplib2.Http
+ '''
+ timeout = 10
+ cache = "/tmp/.cache"
+
+ def __init__(self, url, credential = None, validate_ssl = False):
+ '''
+ @summary: initializes a proxy to the http service and saves a default document locator
+ @param url: the default document locator
+ @type url: str
+ @param credential: an authentication secret
+ @type credential: L{Credential} or None
+ @param validate_ssl: whether to apply strick certificate validation, default is False
+ @type validate_ssl: bool
+ '''
+ self.url = url
+ self.proxy = Http(cache = self.cache, timeout = self.timeout)
+ self.proxy.disable_ssl_certificate_validation = not validate_ssl
+ if isinstance(credential, UsernamePassword):
+ # use password authentication
+ self.proxy.add_credentials(credential.username, credential.password)
+
+ def fetch(self, url = None):
+ '''
+ @summary: retrieve the document
+ @param url: the document locator, if not present the default is used
+ @type url: str or None
+ @return: the remote document
+ @rtype: str or None
+ @note: if the remote content cached is not changed, None is returned
+ '''
+ if url is None:
+ url = self.url
+ status, response = self.proxy.request(uri = url, method = "GET")
+ if status.status == 200:
+ return response
+ if status.status == 304:
+ self.logger.warning("remote content @ %s was not changed" % url)
+ return None
+ self.logger.error("%s -- retrieving @%s failed: %s" % (status, url, response))
+ return None
diff --git a/Monitoring/MonitoringService/Driver/SOAPClient.py b/Monitoring/MonitoringService/Driver/SOAPClient.py
new file mode 100644
index 0000000..7de5b70
--- /dev/null
+++ b/Monitoring/MonitoringService/Driver/SOAPClient.py
@@ -0,0 +1,22 @@
+'''
+Created on Sep 2, 2011
+
+@author: laki, sandor
+@organization: ELTE
+@contact: laki@complex.elte.hu
+@author: steger, jozsef
+'''
+
+#TODO: catch exception using the .service attribute and log it in the Driver log
+from suds import transport, client, wsse
+from Driver import Driver
+
+class SOAPClient(Driver, client.Client):
+ '''
+ @summary: implements SOAP driver to access remote procedures
+ '''
+ pass
+
+SOAPSecurity=wsse.Security
+SOAPUsernameToken=wsse.UsernameToken
+SOAPHttpAuthenticated=transport.http.HttpAuthenticated
diff --git a/Monitoring/MonitoringService/Driver/SshExec.py b/Monitoring/MonitoringService/Driver/SshExec.py
new file mode 100644
index 0000000..454253c
--- /dev/null
+++ b/Monitoring/MonitoringService/Driver/SshExec.py
@@ -0,0 +1,170 @@
+'''
+Created on Jul 18, 2011
+
+@author: steger, jozsef
+@organization: ELTE
+@contact: steger@complex.elte.hu
+'''
+
+from Driver import Driver, DriverError
+from paramiko import SSHClient, RSAKey, SSHException, AutoAddPolicy
+from Credential.credentialtypes import UsernamePassword, UsernameRSAKey
+from time import sleep
+import socket
+import logging
+
+class SshDriver(Driver):
+ '''
+ @summary: implements a driver to build an SSH connection to a server using paramiko package.
+ @cvar timeout: a timeout to set up the connection
+ @type timeout: float
+ @cvar trials: how many times to retry if a time out event occurs or if the remote server is busy to respond
+ @type trials: int
+ @cvar wait: how long to wait between two trials
+ @type wait: .2
+ @ivar client: the api of the ssh client
+ @type client: paramiko.SSHClient or None
+ @ivar host: the host name of the remote ssh server
+ @type host: str or None
+ @ivar isconnected: indicates whether the connection is set up
+ @type isconnected: bool
+ '''
+ timeout = 5
+ trials = 3
+ wait = .2
+
+ def __init__(self):
+ '''
+ @summary: bind the paramiko loggers to the "NOVI.DRIVER"
+ '''
+ self.client = None
+ self.host = None
+ #bind paramiko.transport logger to the same handlers used by NOVI.DRIVER
+ l = logging.getLogger("paramiko.transport")
+ for hlr in self.logger.handlers:
+ l.addHandler(hlr)
+
+ def __del__(self):
+ '''
+ @summary: close connection upon an implicit deletion of the driver
+ '''
+ self.close()
+
+ def connect(self, host, credential, port = 22, known_host = None):
+ '''
+ @summary: set up the connection
+ @param host: the host name of the remote server
+ @type host: str
+ @param credential: the secret to use for connection set up
+ @type credential: L{Credential}
+ @param port: the port remote ssh server is listening
+ @type port: int
+ @param known_host: a file name containing host signatures to check, if None AutoAddPolicy applies
+ @type known_host: str
+ '''
+ self.client = SSHClient()
+ if known_host is None:
+ self.client.set_missing_host_key_policy( AutoAddPolicy() )
+ else:
+ self.client.load_host_keys(filename = known_host)
+ if isinstance(credential, UsernamePassword):
+ # use password authentication
+ self.client.connect(hostname = host, port = port,
+ username = credential.username, password =credential.password,
+ timeout = self.timeout, look_for_keys = False, compress = True)
+ elif isinstance(credential, UsernameRSAKey):
+ # use the RSA key
+ if credential.password:
+ pw = credential.password
+ else:
+ pw = None
+ key = RSAKey(password = pw, filename = credential.rsakey)
+ n = self.trials
+ while n:
+ try:
+ self.client.connect(hostname = host, port = port,
+ username = credential.username, pkey = key,
+ timeout = self.timeout, look_for_keys = False, compress = True)
+ break
+ except SSHException, e:
+ if e.message.startswith("Error reading SSH protocol banner"):
+ n -= 1
+ self.logger.warn("retry %d times to connect @%s in %f seconds" % (n, host, self.wait))
+ sleep(self.wait)
+ else:
+ self.logger.error("cannot connect @%s" % (host))
+ raise DriverError("Cannot connect @%s " % host)
+ except socket.timeout:
+ n -= 1
+ self.logger.warn("time out, retry %d times to connect @%s in %f seconds" % (n, host, self.wait))
+ sleep(self.wait)
+ if not self.isConnected:
+ self.close()
+ self.logger.error("cannot connect @%s" % (host))
+ raise DriverError("Cannot connect @%s " % host)
+ self.host = host
+ self.logger.info("ssh connected @ %s:%d" % (self.host, port))
+
+ def close(self):
+ '''
+ @summary: closes the ssh connection
+ '''
+ try:
+ self.client.close()
+ self.logger.info("ssh disconnected @ %s" % (self.host))
+ except:
+ pass
+ finally:
+ self.client = None
+ self.host = None
+
+ @property
+ def isConnected(self):
+ try:
+ return self.client.get_transport().is_active()
+ except:
+ return False
+
+class SshExec(SshDriver):
+ '''
+ @summary: an extension of the L{SshDriver} to execute commands on the remote machine
+ @ivar command: the string representation of the command to run
+ @type command: str
+ '''
+
+ def __init__(self, host, credential, port = 22, command = "echo helloworld @ `hostname`", known_host = None):
+ '''
+ @summary: initializes an ssh connection and stores a default command
+ @param host: the host name of the remote server
+ @type host: str
+ @param credential: the secret to use for connection set up
+ @type credential: L{Credential}
+ @param port: the port remote ssh server is listening
+ @type port: int
+ @param command: the default remote command
+ @type command: str
+ @param known_host: a file name containing host signatures to check, if None AutoAddPolicy applies
+ @type known_host: str
+ '''
+ SshDriver.__init__(self)
+ self.connect(host, credential, port, known_host)
+ self.command = command
+
+ def execute(self, command = None):
+ '''
+ @summary: executes a remote command
+ @param command: the command to run, if None, the default command is issued
+ @type command: str or None
+ @return: the standard output of the command run
+ @rtype: paramico.ChannelFile
+ '''
+ if not self.isConnected:
+ raise DriverError("Not connected")
+ if command is None:
+ command = self.command
+ _, stout, sterr = self.client.exec_command(command = command)
+ e = sterr.read()
+ self.logger.debug("executed @%s '%s'" % (self.host, command))
+ if len(e):
+ self.logger.warning("execution @%s '%s' failed: %s" % (self.host, command, e))
+ return stout
diff --git a/Monitoring/MonitoringService/Driver/SshTunnel.py b/Monitoring/MonitoringService/Driver/SshTunnel.py
new file mode 100644
index 0000000..f2b7d39
--- /dev/null
+++ b/Monitoring/MonitoringService/Driver/SshTunnel.py
@@ -0,0 +1,178 @@
+'''
+Created on Jan 14, 2013
+
+@author: steger
+'''
+
+import select
+import SocketServer
+from Driver import Driver
+from SshExec import SshDriver
+from threading import Thread
+from SshExec import SshExec
+
+class SshTunnel(SshDriver):
+ '''
+ @summary: this class extends L{SshDriver} and establishes a connection
+ to the requested SSH server and sets up local port
+ forwarding (the openssh -L option) from a local port through a tunneled
+ connection to a destination reachable from the SSH server machine.
+ @ivar t: the thread container
+ @type t: threading.Thread or None
+ '''
+
+ class ForwardServer (SocketServer.ThreadingTCPServer):
+ daemon_threads = True
+ allow_reuse_address = True
+
+ class Handler (SocketServer.BaseRequestHandler):
+ def handle(self):
+ try:
+ chan = self.ssh_transport.open_channel('direct-tcpip',
+ (self.chain_host, self.chain_port),
+ self.request.getpeername())
+ except Exception, e:
+ Driver.logger.debug('Incoming request to %s:%d failed: %s' % (self.chain_host,
+ self.chain_port,
+ repr(e)))
+ return
+ if chan is None:
+ Driver.logger.debug('Incoming request to %s:%d was rejected by the SSH server.' %
+ (self.chain_host, self.chain_port))
+ return
+
+ Driver.logger.debug('Tunnel open %r -> %r -> %r' % (self.request.getpeername(),
+ chan.getpeername(), (self.chain_host, self.chain_port)))
+ while True:
+ r, _, _ = select.select([self.request, chan], [], [])
+ if self.request in r:
+ data = self.request.recv(1024)
+ if len(data) == 0:
+ break
+ chan.send(data)
+ if chan in r:
+ data = chan.recv(1024)
+ if len(data) == 0:
+ break
+ self.request.send(data)
+ chan.close()
+ self.request.close()
+ Driver.logger.debug('Tunnel closed from %r' % (self.request.getpeername(),))
+
+ def __init__(self):
+ '''
+ @summary: allocates thread pointer container
+ '''
+ SshDriver.__init__(self)
+ self.t = None
+
+ def connect(self, host, credential, localport, port, remoteserver, remoteport, known_host = None):
+ '''
+ @summary: set up the tunnel connection
+ @param host: the host name of the remote server acting a port forwarder
+ @type host: str
+ @param credential: the secret to use for connection set up
+ @type credential: L{Credential}
+ @param localport: the local port entry mapped to the remoteport
+ @type localport: int
+ @param port: the port of the forwarder ssh server
+ @type port: int
+ @param remoteserver: the sink of the tunnel
+ @type remoteserver: str
+ @param remoteport: the port of the tunnel sink
+ @type remoteport: int
+ @param known_host: a file name containing host signatures to check, if None AutoAddPolicy applies
+ @type known_host: str
+ '''
+ SshDriver.connect(self, host, credential, port, known_host)
+ self.logger.info('Now forwarding port %d to %s:%d ...' % (localport, remoteserver, remoteport))
+ self.t = Thread(target = self._tran, kwargs = {'localport': localport, 'remoteserver': remoteserver, 'remoteport': remoteport})
+ self.t.daemon = True
+ self.t.start()
+
+ def _tran(self, localport, remoteserver, remoteport):
+ '''
+ @summary: thread worker to transport data over the tunnel
+ @param localport: the local port entry mapped to the remoteport
+ @type localport: int
+ @param remoteserver: the sink of the tunnel
+ @type remoteserver: str
+ @param remoteport: the port of the tunnel sink
+ @type remoteport: int
+ '''
+ try:
+ # this is a little convoluted, but lets me configure things for the Handler
+ # object. (SocketServer doesn't give Handlers any way to access the outer
+ # server normally.)
+ class SubHander (self.Handler):
+ chain_host = remoteserver
+ chain_port = remoteport
+ ssh_transport = self.client.get_transport()
+ self.service = self.ForwardServer(('', localport), SubHander)
+ self.service.serve_forever()
+ except KeyboardInterrupt:
+ self.logger.debug('C-c: Port forwarding stopped.')
+ self.close()
+
+ def close(self):
+ '''
+ @summary: stops the thread and tears down the tunnel
+ '''
+ if self.t is None:
+ return
+ self.t.join(timeout = self.timeout)
+ self.t = None
+ self.service.shutdown()
+ self.logger.info('Port forwarding stopped @ %s.' % self.host)
+ SshDriver.close(self)
+
+
+class SshExecTunnel(SshTunnel):
+ '''
+ @summary: an extension of the L{SshTunnel} driver to execute commands
+ on the remote machine accessed via the tunnel
+ @ivar command: the string representation of the command to run
+ @type command: str
+ @ivar localdriver: the representation of an ssh client connecting over an existing ssh tunnel
+ @type localdriver: L{SshExec}
+ '''
+
+ def __init__(self, host, credential, localport, port, remoteserver, remoteport, remotecredential = None, command = "echo helloworld @ `hostname`", known_host = None):
+ '''
+ @summary: initializes an ssh connection and stores a default command
+ @param host: the host name of the remote server acting a port forwarder
+ @type host: str
+ @param credential: the secret to use for tunnel set up
+ @type credential: L{Credential}
+ @param localport: the local port entry mapped to the remoteport
+ @type localport: int
+ @param port: the port of the forwarder ssh server
+ @type port: int
+ @param remoteserver: the sink of the tunnel
+ @type remoteserver: str
+ @param remoteport: the port of the tunnel sink
+ @type remoteport: int
+ @param remotecredential: the secret to use for connection set up, if None then we fall back to the credential
+ @type remotecredential: L{Credential} or None
+ @param command: the default remote command
+ @type command: str
+ @param known_host: a file name containing host signatures to check, if None AutoAddPolicy applies
+ @type known_host: str
+ '''
+ SshTunnel.__init__(self)
+ self.connect(host, credential, localport, port, remoteserver, remoteport, known_host)
+ self.command = command
+ if remotecredential is None:
+ remotecredential = credential
+ self.localdriver = SshExec(host = 'localhost', credential = remotecredential, port = localport, command = command, known_host = None)
+ self.logger.info("connected over tunnel")
+
+ def execute(self, command = None):
+ '''
+ @summary: executes a remote command
+ @param command: the command to run, if None, the default command is issued
+ @type command: str or None
+ @return: the standard output of the command run
+ @rtype: paramico.ChannelFile
+ '''
+ return self.localdriver.execute(command)
diff --git a/Monitoring/MonitoringService/Driver/__init__.py b/Monitoring/MonitoringService/Driver/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/Monitoring/MonitoringService/Driver/__init__.py
diff --git a/Monitoring/MonitoringService/Driver/test.py b/Monitoring/MonitoringService/Driver/test.py
new file mode 100644
index 0000000..4395750
--- /dev/null
+++ b/Monitoring/MonitoringService/Driver/test.py
@@ -0,0 +1,153 @@
+'''
+Created on Aug 10, 2011
+
+@author: steger
+'''
+import unittest
+from random import randint
+from Example.Resources import PLnodes
+from threading import Thread
+from Example.credentials import noviCredential, sonomaCredential
+from SshExec import SshExec
+from SOAPClient import SOAPClient
+from LocalExec import LocalExec
+from logging import FileHandler
+import logging
+from time import sleep
+from SshTunnel import SshExecTunnel
+
+class Test(unittest.TestCase):
+ testnodes = map(lambda x: x.get_ipaddress("eth0")[0], PLnodes)
+ cred_novi = noviCredential
+ url_sonoma = "http://complex.elte.hu/~steger/sonoma/user.wsdl"
+ cred_sonoma = sonomaCredential
+
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def gettestnode(self):
+ '''
+ @summary: Return a test node IP address chosen random
+ @return: ip address
+ @rtype: string
+ '''
+ return self.testnodes[randint(1, len(self.testnodes))-1]
+
+ def test_helloworld(self):
+ '''
+ @summary: Execute local command
+ '''
+ proc = LocalExec()
+ result = proc.execute()
+ self.assertTrue(result.startswith("helloworld"), "Local command output differs from expected")
+
+ def test_helloworldWithMaster(self):
+ '''
+ @summary: Execute remote command in the name of the testuser authenticated with the master key
+ '''
+ proc = SshExec(host = self.gettestnode(), credential = self.cred_novi)
+ result = proc.execute().read()
+ self.assertTrue(result.startswith("helloworld @ "), "Remote command output differs from expected")
+
+ def echoWithMaster(self, address):
+ '''
+ @summary: Execute remote echo command in the name of the testuser authenticated with the master key
+ @param address: ip address of the remote machine
+ @type address: string
+ '''
+ try:
+ n = randint(0, 10000)
+ command = "echo %d" % n
+ proc = SshExec(host = address, credential = self.cred_novi, command = command)
+ result = proc.execute().read()
+ self.assertTrue(result.strip() == str(n), "Remote command @%s output differs from expected: (%s != %d)" % (address, result, n))
+ except Exception, e:
+ self.assertFalse(True, "Got an error %s" % e)
+
+ def test_echoWithMaster(self):
+ '''
+ @summary: Execute remote echo command in the name of the testuser authenticated with the master key
+ '''
+ self.echoWithMaster(self.gettestnode())
+
+ def test_distributedEcho(self):
+ '''
+ @summary: Execute parallel remote echo commands in a distributed fashion
+ '''
+ threads = []
+ for n in self.testnodes:
+ t = Thread(target = self.echoWithMaster, args = (n,))
+ t.daemon = True
+ t.start()
+ threads.append(t)
+ while len(threads):
+ t = threads.pop()
+ t.join(5)
+
+ def test_parallelEcho(self):
+ '''
+ @summary: Execute parallel remote echo commands in a test node
+ '''
+ N = 20
+ n = self.gettestnode()
+ threads = []
+ while N:
+ N -= 1
+ t = Thread(target = self.echoWithMaster, args = (n,))
+ t.daemon = True
+ t.start()
+ threads.append(t)
+ while len(threads):
+ t = threads.pop()
+ t.join(5)
+
+ def test_stress(self):
+ '''
+ @summary: Consecutively execute parallel remote echo commands in a distributed fashion
+ '''
+ threads = []
+ for n in self.testnodes:
+ N = randint(5, 20)
+ while N:
+ N -= 1
+ t = Thread(target = self.echoWithMaster, args = (n,))
+ t.daemon = True
+ t.start()
+ threads.append(t)
+ while len(threads):
+ t = threads.pop()
+ t.join(5)
+
+ def test_soap(self):
+ '''
+ @summary: Run SONoMA getNodeList
+ '''
+ client = SOAPClient(self.url_sonoma)
+ resources = client.service.getNodeList(filter = "AVAILABLE")
+ self.assertGreater(len(resources), 0, "sonoma reports no nodes")
+
+ def test_tunnel(self):
+ T = SshExecTunnel(host = 'novilab.elte.hu', credential = noviCredential, localport = 4000, port = 22,
+ remoteserver = 'smilax1.man.poznan.pl', remoteport = 22)
+ response = T.execute().read().strip()
+ expected = "helloworld @ smilax1.man.poznan.pl"
+ self.assertEqual(response, expected, "Remote command output differs from expected %s != %s" % (expected, response))
+
+if __name__ == "__main__":
+ #import sys;sys.argv = ['', 'Test.test_helloworldWithMaster']
+ fn = "/tmp/Driver_test.log"
+ hdlr = FileHandler(filename = fn, mode = 'w')
+ l = logging.getLogger("NOVI.DRIVER")
+ l.setLevel(level = logging.DEBUG)
+# l.setLevel(level = logging.INFO)
+ l.addHandler(hdlr = hdlr)
+ l.info("START TEST")
+ try:
+ unittest.main()
+ finally:
+ l.info("FINISHED TEST")
+ hdlr.close()