From 2f2a3a129c91de540e66c3bfbe30b0df1942cd4b Mon Sep 17 00:00:00 2001 From: pikusa Date: Wed, 03 Apr 2013 13:18:17 +0000 Subject: project commit and dir tree change --- (limited to 'Monitoring/MonitoringService/Service') diff --git a/Monitoring/MonitoringService/Service/MonitoringService.py b/Monitoring/MonitoringService/Service/MonitoringService.py new file mode 100644 index 0000000..9b1f87d --- /dev/null +++ b/Monitoring/MonitoringService/Service/MonitoringService.py @@ -0,0 +1,342 @@ +''' +Created on Mar 22, 2012 + +@author: steger +''' +from time import sleep +from Semantics.InformationModel import Ontology +from Semantics.UnitModel import UnitModel +from Semantics.TaskModel import TaskModel +from Semantics.QueryInterpreter import QueryInterpreter +from Task.Task import SubtaskManager, TaskError, STRAT_PERIODICAL,\ + STRAT_ONDEMAND +from DataProcessing.Parameter import ParameterList +from Resource.node import node +from DataProcessing.AggregatorManager import AggregatorManager +from paramiko.ssh_exception import BadAuthenticationType +import logging +from StringIO import StringIO +from DataProcessing.DataFormatter import JsonFormatter +from DataProcessing.DataHeaderCell import CellRequestByFeature,\ + CellRequestByName +from DataProcessing.DataError import SamplerError +from Example.model import owl_unit, owl_param, owl_features, owl_task, owl_query,\ + owl_stat, owl_core +from Service.WatchdogManager import WatchdogManager + +ontology = { + 'unit': (owl_unit, "http://fp7-novi.eu/unit.owl#"), + 'param': (owl_param, "http://fp7-novi.eu/monitoring_parameter.owl#"), + 'feature': (owl_features, "http://fp7-novi.eu/monitoring_features.owl#"), + 'task': (owl_task, "http://fp7-novi.eu/monitoring_task.owl#"), + 'query': (owl_query, "http://fp7-novi.eu/monitoring_query.owl#"), + 'stat': (owl_stat, 'http://fp7-novi.eu/monitoring_stat.owl#'), + 'core': (owl_core, "http://fp7-novi.eu/im.owl#"), +} + + + + +class MonitoringService(object): + ''' + classdocs + ''' + version = "0.0" + + def __str__(self): + return "NOVI Monitoring Service v%s @ %s" % (self.version, self.platform) + + @property + def platform(self): + return self._if.platform + + def __init__(self, interface, config_owl): + ''' + @summary: constructor + @param interface: + @type interface: MSInterface + @param config_owl: platform specific configuration model + @type config_owl: str + ''' + self._if = interface + self.logger = logging.getLogger(name = "NOVI.MS.%s" % self.platform) + self.log = self._if.log # to be removed + self.ontology = Ontology() + for prefix, (owl_url, ns) in ontology.iteritems(): + if owl_url is None: + continue + self.ontology.load(prefix, owl_url, ns) + self.ontology.load('config', config_owl, 'http://fp7-novi.eu/config.owl#') + self.unitmodel = UnitModel(self.ontology) + self.stm = SubtaskManager(self.um) + self.am = AggregatorManager() + self.wm = WatchdogManager(self.am) + + self.taskmodel = TaskModel(self.dm, self.um, self.ontology) + self.QI = QueryInterpreter(self.taskmodel) + + self._nextID = 0 + self.subtaskIDs = {} + self.aggregatorIDs = {} + self.watchdogIDs = {} + self.formatters = {} + + @property + def pm(self): return self.unitmodel.pm + + @property + def um(self): return self.unitmodel.um + + @property + def dm(self): return self.unitmodel.dm + + + def newProcessID(self): + try: + return "%s:process:%d" % (self.platform, self._nextID) + finally: + self._nextID += 1 + + def newAggregateID(self): + try: + return "%s:aggregate:%d" % (self.platform, self._nextID) + finally: + self._nextID += 1 + + def newWatchdogID(self): + try: + return "%s:watchdog:%d" % (self.platform, self._nextID) + finally: + self._nextID += 1 + + + def measure(self, credential, query): + #TODO: docs + ''' + ''' + g = self.ontology.g + sio = StringIO(query) + g.parse(source = sio) + responses = [] + errors = [] + queries = self.QI.inferBundleQueries(qgraph = g) + self.log(shortmsg = "measurements starting...", message = "Attempt to launch %d measurement threads" % len(queries)) + for q in queries: + feature_uri = q.feature + domain = self.ontology.ns('task')['Substrate'] + taskgen = self.taskmodel.inferTasks(domain, feature_uri) + no_tool = True + (resource_uri, resource) = q.resource + #we are ugly here: use the first tool + for task_uri, _ in taskgen: + print task_uri + no_tool = False + _, task = self.newTask(task = task_uri, cred = credential, resource = resource, parameters = q.paramlist) + if q.samplechain: + task.strategy = STRAT_PERIODICAL + task.enable() + # we apply some aggregation to the data + flow = [] + for skeleton, parlist in q.samplechain: + flow.append((skeleton, parlist.formkeyvaldict())) + _, A = self.am.newAggregator(task.datasource, CellRequestByFeature(feature = q.feature), flow) + formatter = JsonFormatter(datasource = A) + while True: + try: + task.dataAdded.wait( 15 ) + responses.append( formatter.serialize() ) + break + except SamplerError: + task.dataAdded.clear() + sleep(1) + else: + task.strategy = STRAT_ONDEMAND + task.enable() + task.dataAdded.wait( 15 ) + formatter = JsonFormatter(datasource = task.datasource) + formatter.reader.extract(cellrequest = [ + CellRequestByName(name = "Run"), + CellRequestByFeature(feature = feature_uri) + ]) + responses.append( formatter.serialize() ) + task.destroy() + if no_tool: + err_description = "No tools to measure %s @ %s" % (feature_uri, resource_uri) + errors.append(err_description) + self.log(shortmsg = "Limited result set", message = err_description) + useful_data = ",\n".join( responses ) + error_data = ",\n".join(errors) + if len(errors): + if len(useful_data): + response = "[%s,\n{\"errors\" : \"%s\"}]" % (useful_data, error_data) + else: + response = "[{\"errors\" : \"%s\"}]" % (error_data) + else: + response = "[%s]" % useful_data + return response + + def launchTasks(self, credential, query): + #TODO: many things in common with measure!!! + g = self.ontology.g + sio = StringIO(query) + g.parse(source = sio) + taskID = self.newProcessID() + idstore = self.subtaskIDs[taskID] = [] + formatters = self.formatters[taskID] = [] + for q in self.QI.inferBundleQueries(qgraph = g): + feature_uri = q.feature + domain = self.ontology.ns('task')['Slice'] + taskgen = self.taskmodel.inferTasks(domain, feature_uri) + #we are ugly here: use the first tool + for task_uri, _ in taskgen: + subtaskID, task = self.newTask(task = task_uri, cred = credential, resource = q.resource[1], parameters = q.paramlist) + task.strategy = STRAT_PERIODICAL + task.enable() + idstore.append(subtaskID) + f = q.formatter(datasource = task.datasource) + formatters.append(f) + if len(idstore): + print "KONYVELT", taskID + return taskID + else: + self.subtaskIDs.pop(taskID) + self.formatters.pop(taskID) + return None + + def newTask(self, task, cred, resource = None, parameters = ParameterList()): + ''' + @summary: initialize a Task object, which is referenced by a uri + @param task: the reference to the task description + @type task: URIRef + @param cred: an iterable over dictionaries, which are used as input parameters to initialize Credential templates passed to the Task object for authentication, authorization purposes + @type cred: dict generator + @param resource: the resource to measure + @type resource: resource or None + @param parameters: the parameter list to refresh the default parameters of the Task object + @type parameters: ParameterList + @return: the tuple of taskID and the initialized measurement Task object + @rtype: int, Task + ''' + name = self.ontology._tail(task) + credset = self.taskmodel.inferCredentialOf(task) + driver = self.taskmodel.inferDriverOf(task) + hdr = self.taskmodel.inferDataheaderOf(task) + hooks = self.taskmodel.inferHookdefinitionsOf(task) + hookpar = self.taskmodel.inferHookparametersOf(task) + taskparameters = self.taskmodel.inferParametersOf(task) + + taskparameters.update_by_list(parameters) + + #TODO: maybe better push resource to the Task as an argument + if isinstance(resource, node): + addr, unit = resource.get_ipaddress("eth0") + taskparameters.update("SourceAddress", addr, unit) + else: + print "EEEEE unhandled resource", resource +# print taskparameters + + while len(credset): + ct = credset.pop() + for c in cred: + try: + credential = ct(**c) + except: + # credential mismatch go on with the next + continue + try: + return self.stm.generate(name = name, driver = driver, dataheader = hdr, + hookimplementations = hooks, parameters = taskparameters, credential = credential, **hookpar) + except BadAuthenticationType: + pass + raise TaskError("Cannot initialize the Task with the credential set provided for %s" % name) + + def delTask(self, taskidentifier): + self.stm.pop( taskidentifier ) + + def getTask(self, taskidentifier): + return self.stm[ taskidentifier ] + + def attachAggregators(self, credential, query): + g = self.ontology.g + sio = StringIO(query) + g.parse(source = sio) + aggregatorID = self.newAggregateID() + idstore = self.aggregatorIDs[aggregatorID] = [] + formatters = self.formatters[aggregatorID] = [] + + for q in self.QI.inferSampleManipulationQueries(qgraph = g): + _, sourcetype, _ = q.sourceid.split(':') + if sourcetype == 'process': + #FIXME: csak egy subprocessre muxik perpill + sourceid = self.subtaskIDs[q.sourceid][0] + + ds = self.stm[ sourceid ].datasource + elif sourcetype == 'aggregate': + sourceid = self.aggregatorIDs[q.sourceid][0] + + ds = self.am[ sourceid ] + else: + raise Exception("Unknown source type %s" % sourcetype) + cr = CellRequestByFeature(feature = q.feature) + aggID, A = self.am.newAggregator(dataSource = ds, cellrequest = cr, commandflow = q.samplechain) + idstore.append(aggID) + f = q.formatter(datasource = A) + formatters.append(f) + if len(idstore): + return aggregatorID + else: + self.aggregatorIDs.pop(aggregatorID) + self.formatters.pop(aggregatorID) + return None + + def delAggregator(self, aggregatoridentifier): + self.am.pop( aggregatoridentifier ) + + def getAggregator(self, aggregatoridentifier): + return self.am[ aggregatoridentifier ] + + def attachWatchdogs(self, credential, query): + g = self.ontology.g + sio = StringIO(query) + g.parse(source = sio) + watchdogID = self.newWatchdogID() + idstore = self.watchdogIDs[watchdogID] = [] + + for q in self.QI.inferConditionQueries(qgraph = g): + _, sourcetype, _ = q.sourceid.split(':') + if sourcetype == 'process': + #FIXME: csak egy subprocessre muxik perpill + sourceid = self.subtaskIDs[q.sourceid][0] + + ds = self.stm[ sourceid ].datasource + elif sourcetype == 'aggregate': + sourceid = self.aggregatorIDs[q.sourceid][0] + + ds = self.am[ sourceid ] + else: + raise Exception("Unknown source type %s" % sourcetype) + + #ITT A TENNIVALO + cr = CellRequestByFeature(feature = q.feature) + watchID, _ = self.wm.newConditional(dataSource = ds, cellrequest = cr, conditiontype = q.conditiontype, operation = q.operation) + + + + + idstore.append(watchID) + if len(idstore): + return watchdogID + else: + self.watchdogIDs.pop(watchdogID) + return None + + def delWatchdog(self, watchdogidentifier): + self.wm.pop( watchdogidentifier ) + + + def checkWatchdog(self, watchdogidentifier): + resp = [] + for watchID in self.watchdogIDs[watchdogidentifier]: + WD = self.wm[watchID] + resp.append("\'%s\': %s" % (WD.name, WD.value)) + return "{\n\t%s\n}" % ",\n\t".join(resp) diff --git a/Monitoring/MonitoringService/Service/WatchdogManager.py b/Monitoring/MonitoringService/Service/WatchdogManager.py new file mode 100644 index 0000000..e40f1c3 --- /dev/null +++ b/Monitoring/MonitoringService/Service/WatchdogManager.py @@ -0,0 +1,46 @@ +''' +Created on Dec 10, 2012 + +@author: steger +''' +from DataProcessing.LinearCombination import LinearCombination +from DataProcessing.Bool import IsPositive, IsNotPositive, IsNegative,\ + IsNotNegative + +class WatchdogManager(object): + def __init__(self, am): + self._id = 0; + self._conditionals = {} + self.am = am + self._dep = {} + + def newConditional(self, dataSource, cellrequest, conditiontype, operation): + deps = [] + if conditiontype in [ IsPositive, IsNegative, IsNotNegative, IsNotPositive ]: + lincomb = LinearCombination() + for factor, commandflow in operation: + Aid, A = self.am.newAggregator(dataSource, cellrequest, commandflow) + lincomb.addTerm(factor, A) + deps.append(Aid) + DS = conditiontype(lincomb) + self._id += 1 + self._conditionals[ self._id ] = DS + self._dep[ self._id ] = deps[:] + return self._id, DS + + def __getitem__(self, watchdogid): + try: + return self._conditionals[ watchdogid ] + except: + raise Exception("Watchdog with id %s not found" % watchdogid) + + def pop(self, watchdogid): + try: + self._conditionals.pop( watchdogid ) + deps = self._dep.pop(watchdogid) + while len(deps): + Aid = deps.pop() + self.am.pop(Aid) + except KeyError: + print "WW: Watchdog with id %s not found" % watchdogid + \ No newline at end of file diff --git a/Monitoring/MonitoringService/Service/__init__.py b/Monitoring/MonitoringService/Service/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/Monitoring/MonitoringService/Service/__init__.py diff --git a/Monitoring/MonitoringService/Service/interface.py b/Monitoring/MonitoringService/Service/interface.py new file mode 100644 index 0000000..2ba917d --- /dev/null +++ b/Monitoring/MonitoringService/Service/interface.py @@ -0,0 +1,344 @@ +''' +Created on 08.08.2011 + +@author: steger, jozsef +''' +from Service.MonitoringService import MonitoringService +import logging + +class InterfaceError(Exception): + pass + +#TODO: add and handle bindings at this level +class MSInterface(object): + ''' + @summary: Implements a thin service layer on top of the MonitoringService instance + to collect methods that need to be exported and mapped in the NOVI API. + It also provides a reference to the framework to be able to communicate with + remote MonitoringService instances. The log() method is a place holder + to sink information to be pushed in the NOVI UserFeedback service. + The emit() method is a place holder to sink signals to be pushed in the NOVI + Policy Service component installed on top of the same platform. + ''' + + def __init__(self, framework, reference, config_owl): + ''' + Constructor + @param framework: a service which provides getService() method to look up MonSrv instances of different reference + @type framework: Framework + @param reference: the name of the platform + @type reference: str + @param config_owl: platform specific configuration model + @type config_owl: str + ''' + self._framework = framework + self.platform = reference + self._ms = MonitoringService(self, config_owl) + self.logger = logging.getLogger(name = "NOVI.MSI.%s" % reference) + + @property + def service(self): + ''' + @return: the underlying monitoring service component + @rtype: MonitoringService + ''' + return self._ms + + @property + def proxy(self): + ''' + @return: a proxy service to look up the rest of the monitoring service components + @rtype: Framework + ''' + return self._framework + + def dispatchID(self, identifier): + ''' + @summary: this method finds the MonitoringService instance that is responsible for handling an identified Task or Aggregate + @param identifier: identifier of a task or aggregate, it follows the form: :: + @type identifier: string + @return: the monitoring service instance + @rtype: MonitoringService + ''' + try: + platform, _, _ = identifier.split(':') + if self.platform == platform: + return self.service + return self.framework.getService(platform) + except ValueError: + raise InterfaceError("Wrong identifier format") + + def log(self, shortmsg, message): + # overridden by the JAVA wrapper + self.logger.info("[%s] %s" % (shortmsg, message)) + + def emit(self, what): + # overridden by the JAVA wrapper + self.framework.getPolicyService(self.platform).trigger(what) + + # Test purpose function + def echo(self, platform): + ''' + @summary: An integration tester function (to be exported public) + @param platform: name of the platform + @type platform: string + @return: messages of the platforms taking part in the message flow + @rtype: string + ''' + self.logger.info("[echo] calling %s" % platform) + otherservice = self._framework.getInterface(platform).service + return "%s -> %s" % (str(self.service), str(otherservice)) + + + # Substrate monitoring function + def measure(self, credential, query): + ''' + @summary: Method to handle substrate monitoring queries (to be exported public) + @param credential: + @type credential: + @param query: an owl document containing several BundleQuery instances + @type query: string + @return: response to the query + @rtype: string + ''' + #TODO: split query and concatenate results + return self.service.measure(credential, query) + + # Slice monitoring functions + def sliceTasks(self, credential, query): + raise InterfaceError("sliceTasks() method is not implemented") + + def addTask(self, credential, query): + ''' + @summary: Method to start slice monitoring tasks (to be exported public) + @param credential: + @type credential: + @param query: an owl document containing several BundleQuery instances + @type query: string + @return: process identifier + @rtype: string + ''' + #TODO: investigate if the service instance under this interface should be the boss + return self.service.launchTasks(credential, query) + + def describeTaskData(self, credential, query): + ''' + @summary: Method to retrieve meta data of task data (to be exported public) + @param credential: + @type credential: + @param query: + @type query: string + @return: serialize the header of the data tables + @rtype: string + ''' + taskID = query + ms = self.dispatchID(identifier = taskID) + #TODO: move this in the MonitoringService + headers = map(lambda x: x.header(), ms.formatters[taskID]) + return "[%s]" % "\n,\n".join(headers) + + def fetchTaskData(self, credential, query): + ''' + @summary: Method to retrieve task data collected since last fetch or the start (to be exported public) + @param credential: + @type credential: + @param query: + @type query: string + @return: serialize the appended content of the data tables + @rtype: string + ''' + taskID = query + ms = self.dispatchID(identifier = taskID) + #TODO: move this in the MonitoringService + response = [] + try: + for f in ms.formatters[taskID]: + response.append( f.serialize() ) + except Exception, e: + print "EEE", e + pass + return "[%s]" % "\n,\n".join(response) + + def modifyTask(self, credential, query): + raise InterfaceError("modifyTask() method is not implemented") + + def removeTask(self, credential, query): + ''' + @summary: Method to remove a slice measurement task (to be exported public) + @param credential: + @type credential: + @param query: + @type query: string + ''' + taskID = query + ms = self.dispatchID(identifier = taskID) + #TODO: move this in the MonitoringService + try: + subtaskids = ms.subtaskIDs.pop(taskID) + ms.formatters.pop(taskID) + while len(subtaskids): + subtaskid = subtaskids.pop() + ms.delTask(taskidentifier = subtaskid) + except KeyError: + # the taskID does not belong to me + pass + + def enableTask(self, credential, query): + ''' + @summary: Method to enable a slice measurement task (to be exported public) + @param credential: + @type credential: + @param query: + @type query: string + ''' + taskID = query + ms = self.dispatchID(identifier = taskID) + try: + for subtaskid in ms.subtaskIDs[taskID]: + t = ms.getTask(taskidentifier = subtaskid) + t.enable() + except KeyError: + # the taskID does not belong to me + pass + + def disableTask(self, credential, query): + ''' + @summary: Method to disable a slice measurement task temporarily (to be exported public) + @param credential: + @type credential: + @param query: + @type query: string + ''' + taskID = query + ms = self.dispatchID(identifier = taskID) + try: + for subtaskid in ms.subtaskIDs[taskID]: + t = ms.getTask(taskidentifier = subtaskid) + t.disable() + except KeyError: + # the taskID does not belong to me + pass + + def getTaskStatus(self, credential, query): + ''' + @summary: Method to check the state of a slice measurement task (to be exported public) + @param credential: + @type credential: + @param query: + @type query: string + @return: True if the tasks are running + @rtype: boolean + ''' + taskID = query + ms = self.dispatchID(identifier = taskID) + try: + for subtaskid in ms.subtaskIDs[taskID]: + t = ms.getTask(taskidentifier = subtaskid) + if t.state == t.STATE_RUNNING: + return True + except KeyError: + # the taskID does not belong to me + pass + return False + + def addAggregator(self, credential, query): + ''' + @summary: Method to define new data manipulation on slice monitoring data (to be exported public) + @param credential: + @type credential: + @param query: an owl document containing several SampleManipulationQuery instances + @type query: string + @return: aggregator identifier + @rtype: string + ''' + #TODO: investigate if the service instance under this interface should be the boss + return self.service.attachAggregators(credential, query) + + def removeAggregator(self, credential, query): + ''' + @summary: Method to remove data manipulation on slice monitoring data (to be exported public) + @param credential: + @type credential: + @param query: + @type query: string + ''' + aggregatorID = query + ms = self.dispatchID(identifier = aggregatorID) + try: + aggregatorids = ms.aggregatorIDs.pop(aggregatorID) + ms.formatters.pop(aggregatorID) + while len(aggregatorids): + aggregatorid = aggregatorids.pop() + ms.delAggregator(aggregatorid) + except KeyError: + # the aggregatorID does not belong to me + pass + + def fetchAggregatorData(self, credential, query): + ''' + @summary: Method to refresh and serialize results of data manipulation on slice monitoring data (to be exported public) + @param credential: + @type credential: + @param query: + @type query: string + @return: result of aggregators + @rtype: string + ''' + aggregatorID = query + ms = self.dispatchID(identifier = aggregatorID) + response = [] + try: + for f in ms.formatters[aggregatorID]: + f.source.process() + print "ALMA", f.source + print "ALMA", f.source.source + print "ALMA", f.source.data + response.append( f.serialize() ) + except Exception, e: + print "EEE", e + pass + return "[%s]" % "\n,\n".join(response) + + def addWatchdog(self, credential, query): + ''' + @summary: + @param credential: + @type credential: + @param query: an owl document containing several SampleManipulationQuery instances + @type query: string + @return: watchdog identifier + @rtype: string + ''' + #TODO: investigate if the service instance under this interface should be the boss + return self.service.attachWatchdogs(credential, query) + + def removeCondition(self, credential, query): + ''' + @summary: Method to remove conditions bound to slice monitoring data (to be exported public) + @param credential: + @type credential: + @param query: + @type query: string + ''' + watchdogID = query + ms = self.dispatchID(identifier = watchdogID) + try: + watchdogids = ms.watchdogIDs.pop(watchdogID) + while len(watchdogids): + watchdogid = watchdogids.pop() + ms.delWatchdog(watchdogid) + except KeyError: + # the aggregatorID does not belong to me + pass + + def checkCondition(self, credential, query): + ''' + @summary: Method to examine a conditions bound to slice monitoring data (to be exported public) + @param credential: + @type credential: + @param query: + @type query: string + ''' + watchdogID = query + ms = self.dispatchID(identifier = watchdogID) + return ms.checkWatchdog(watchdogID) diff --git a/Monitoring/MonitoringService/Service/mock_framework.py b/Monitoring/MonitoringService/Service/mock_framework.py new file mode 100644 index 0000000..8a8874b --- /dev/null +++ b/Monitoring/MonitoringService/Service/mock_framework.py @@ -0,0 +1,284 @@ +''' +Created on Nov 20, 2012 + +@author: steger +''' +from Service.interface import MSInterface +import logging +from logging.handlers import TimedRotatingFileHandler +from os import path, unlink +from flask import Flask, request +from threading import Thread +from Example.Platforms import federation +from select import select +from sys import stdin +import httplib2 +from os import urandom + +#FIXME: DIRTY! +from Example.credentials import novisaCredentialIARGS +import traceback +import sys + +class FrameworkError(Exception): + pass + +class PolicyMock(object): + def __init__(self): + fn = "/tmp/ps.log" + if path.exists(fn): + unlink(fn) + hdlr = TimedRotatingFileHandler(filename = fn) + self.logger = logging.getLogger("NOVI.PS") + self.logger.setLevel(level = logging.DEBUG) + self.logger.addHandler(hdlr = hdlr) + + def trigger(self, what): + self.logger.info(what) + +class Framework(object): + ''' + This class mimics the integration framework. It helps to look up remote Monitoring Service instances + ''' + + def __init__(self): + ''' + Constructor + ''' + self._if = {} + self._pol = PolicyMock() + + def add(self, platform, config_owl): + fn = "/tmp/ms_%s.log" % platform + if path.exists(fn): + unlink(fn) + hdlr = TimedRotatingFileHandler(filename = fn) + l = logging.getLogger("NOVI.MS.%s" % platform) + l.setLevel(level = logging.DEBUG) + l.addHandler(hdlr = hdlr) + l = logging.getLogger("NOVI.MSI.%s" % platform) + l.setLevel(level = logging.DEBUG) + l.addHandler(hdlr = hdlr) + iface = MSInterface(self, platform, config_owl) + self._if[platform] = iface + return iface + + def getInterface(self, platform): + try: + return self._if[platform] + except: + print "EE: %s platform not found" % platform + raise FrameworkError + + def getPolicyService(self, platform): + return self._pol + + def serviceList(self): + return self._if.values() + +def app_launch(plif, port): + app = Flask(__name__) + app.secret_key = urandom(24) + t = Thread(target = run, args = (app, port)) + t.start() + + # these are here for test and control + @app.route("/", methods = ['GET']) + def hello(): + return "Hello world" + + @app.route('/shutdown', methods = ['POST']) + def shutdown(): + shutdown_server() + return 'Server shutting down...' + + + # these are the real service interfaces + @app.route("/echo", methods = ['POST']) + def echo(): + platform = request.form['platform'] + try: + return plif.echo(platform) + except Exception, e: + print "-"*60 + traceback.print_exc(file=sys.stdout) + print "="*60 + return "Error %s" % e + + @app.route("/measure", methods = ['POST']) + def measure(): + q = request.form['query'] + try: + return plif.measure(credential = [novisaCredentialIARGS], query = q) + except Exception, e: + print "-"*60 + traceback.print_exc(file=sys.stdout) + print "="*60 + return "Error %s" % e + + @app.route("/addTask", methods = ['POST']) + def addTask(): + try: + q = request.form['query'] + return plif.addTask(credential = [novisaCredentialIARGS], query = q) + except Exception, e: + print "-"*60 + traceback.print_exc(file=sys.stdout) + print "-"*60 + return "Error %s" % e + + @app.route("/fetchTaskData", methods = ['POST']) + def fetchTaskData(): + q = request.form['query'] + return plif.fetchTaskData(credential = [novisaCredentialIARGS], query = q) + + @app.route("/removeTask", methods = ['POST']) + def removeTask(): + try: + q = request.form['query'] + plif.removeTask(credential = [novisaCredentialIARGS], query = q) + except Exception, e: + print "Error %s" % e + return "OK" + + @app.route("/describeTaskData", methods = ['POST']) + def describeTaskData(): + q = request.form['query'] + return plif.describeTaskData(credential = [novisaCredentialIARGS], query = q) + + @app.route("/getTaskStatus", methods = ['POST']) + def getTaskStatus(): + try: + q = request.form['query'] + return str( plif.getTaskStatus(credential = [novisaCredentialIARGS], query = q) ) + except Exception, e: + print "-"*60 + traceback.print_exc(file=sys.stdout) + print "-"*60 + return "Error %s" % e + + @app.route("/enableTask", methods = ['POST']) + def enableTask(): + q = request.form['query'] + plif.enableTask(credential = [novisaCredentialIARGS], query = q) + return "OK" + + @app.route("/disableTask", methods = ['POST']) + def disableTask(): + q = request.form['query'] + plif.disableTask(credential = [novisaCredentialIARGS], query = q) + return "OK" + + @app.route("/addAggregator", methods = ['POST']) + def addAggregator(): + try: + q = request.form['query'] + return plif.addAggregator(credential = [novisaCredentialIARGS], query = q) + except Exception, e: + print "-"*60 + traceback.print_exc(file=sys.stdout) + print "-"*60 + return "Error %s" % e + + @app.route("/fetchAggregatorData", methods = ['POST']) + def fetchAggregatorData(): + q = request.form['query'] + return plif.fetchAggregatorData(credential = [novisaCredentialIARGS], query = q) + + @app.route("/removeAggregator", methods = ['POST']) + def removeAggregator(): + try: + q = request.form['query'] + plif.removeAggregator(credential = [novisaCredentialIARGS], query = q) + except Exception, e: + print "Error %s" % e + return "OK" + + @app.route("/addCondition", methods = ['POST']) + def addCondition(): + try: + q = request.form['query'] + return plif.addWatchdog(credential = [novisaCredentialIARGS], query = q) + except Exception, e: + print "-"*60 + traceback.print_exc(file=sys.stdout) + print "-"*60 + return "Error %s" % e + + @app.route("/removeCondition", methods = ['POST']) + def removeCondition(): + try: + q = request.form['query'] + plif.removeCondition(credential = [novisaCredentialIARGS], query = q) + except Exception, e: + print "Error %s" % e + return "OK" + + @app.route("/checkCondition", methods = ['POST']) + def checkCondition(): + try: + q = request.form['query'] + return plif.checkCondition(credential = [novisaCredentialIARGS], query = q) + except Exception, e: + print "-"*60 + traceback.print_exc(file=sys.stdout) + print "-"*60 + return "Error %s" % e + + + + return t + + +def run(app, port): + app.run(debug = False, port = port) + +def shutdown_server(): + func = request.environ.get('werkzeug.server.shutdown') + if func is None: + raise RuntimeError('Not running with the Werkzeug Server') + func() + +def emit(port): + try: + h = httplib2.Http(timeout = 10) + url = "http://localhost:%d/shutdown" % port + resp, _ = h.request(uri = url, method = "POST") + if resp.status != 200: + print "Service responded with status %s" % resp.status + except Exception, e: + print "Error contacting server @ localhost:%d: (%s)" % (port, e) + +def start_servers(): + fw = Framework() + t = [] + for platform, (port, config_owl) in federation.iteritems(): + plif = fw.add(platform, config_owl) + t.append( app_launch(plif, port) ) + return fw, t + +def stop_servers(t): + # POST the shutdown methods + for port, _ in federation.values(): + emit(port) + + # join threads + while len(t): + st = t.pop() + st.join() + +if __name__ == "__main__": + print "INIT" + # start services as separate threads + _, t = start_servers() + + # wait for a keyboard interrupt + print "PRESS ^C to stop" + while True: + try: + select([stdin],[],[]) + except KeyboardInterrupt: + break + + stop_servers(t) + print "OK" \ No newline at end of file diff --git a/Monitoring/MonitoringService/Service/test.py b/Monitoring/MonitoringService/Service/test.py new file mode 100644 index 0000000..1b6b78b --- /dev/null +++ b/Monitoring/MonitoringService/Service/test.py @@ -0,0 +1,442 @@ +''' +Created on Aug 10, 2011 + +@author: steger +''' +import unittest +from rdflib import Graph, Namespace, Literal +from Service.mock_framework import start_servers, stop_servers +import httplib2 +from Example.Platforms import federation +from urllib import urlencode +from time import sleep + +fw, t = start_servers() +from MonitoringService import ontology + +class Test(unittest.TestCase): + headers = {'Content-type': 'application/x-www-form-urlencoded'} + proxy = httplib2.Http(cache = "/tmp/ms_client_cache", timeout = 10) + + def setUp(self): + self.MSI_planetlab = fw.getInterface('PlanetLab') + self.PL_O = self.MSI_planetlab.service.ontology + NS = self.ns + self.S = NS('stat')['UnmodifiedExtractOfFeatureSamples'] + self.F = NS('query')['Formatter_JSON'] + sleep (1) + + def tearDown(self): + pass + + def test_echo(self): + p1 = "PlanetLab" + p2 = "FEDERICA" + + h = httplib2.Http(cache = "/tmp/ms_client_cache", timeout = 10) + url = "http://localhost:%d/echo" % federation[p1][0] + data = urlencode( {'platform': p2} ) + resp, response = h.request(uri = url, method = "POST", body = data, headers = self.headers) + self.assertTrue(resp.status == 200, "Service @%s responded with status %s" % (p1, resp.status)) + i, o = response.split("->") + got = (i.split("@")[-1].strip(), o.split("@")[-1].strip()) + expect = (p1, p2) + self.assertEquals(expect, got, "Echo reply differs from expected (%s): %s" % (expect, response)) + + data = urlencode( {'platform': p1} ) + resp, response = h.request(uri = url, method = "POST", body = data, headers = self.headers) + self.assertTrue(resp.status == 200, "Service @%s responded with status %s" % (p1, resp.status)) + i, o = response.split("->") + got = (i.split("@")[-1].strip(), o.split("@")[-1].strip()) + expect = (p1, p1) + self.assertEquals(expect, got, "Echo reply differs from expected (%s): %s" % (expect, response)) + + @staticmethod + def q_enc(q): + return urlencode( {'query': q} ) + + @property + def mns(self): + return Namespace("http://foo.bar/req.owl#") + + @property + def ns(self): + return self.PL_O.ns + + @property + def ns_type(self): + return self.ns('rdf')['type'] + + def measure(self, q, expect = 26): + url = "http://localhost:%d/measure" % federation['PlanetLab'][0] + data = self.q_enc(q) + resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers) + self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status)) + self.tlen(response, expect) + return response + + def addtask(self, q): + url = "http://localhost:%d/addTask" % federation['PlanetLab'][0] + data = self.q_enc(q) + resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers) + self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status)) + self.assertTrue(response.startswith('PlanetLab:process:'), "wrong process id %s" % response) + return response + + def fetchtaskdata(self, q, expect = 26): + url = "http://localhost:%d/fetchTaskData" % federation['PlanetLab'][0] + data = self.q_enc(q) + resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers) + self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status)) + self.tlen(response, expect) + return response + + def deltask(self, q): + url = "http://localhost:%d/removeTask" % federation['PlanetLab'][0] + data = self.q_enc(q) + resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers) + self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status)) + self.assertEqual(response, "OK", "Got: %s" % response) + + def addaggregator(self, q): + url = "http://localhost:%d/addAggregator" % federation['PlanetLab'][0] + data = self.q_enc(q) + resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers) + self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status)) + self.assertTrue(response.startswith('PlanetLab:aggregate:'), "wrong process id %s" % response) + return response + + def fetchaggregate(self, q, expect = 26): + url = "http://localhost:%d/fetchAggregatorData" % federation['PlanetLab'][0] + data = self.q_enc(q) + resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers) + self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status)) + self.tlen(response, expect) + return response + + def delaggregator(self, q): + url = "http://localhost:%d/removeAggregator" % federation['PlanetLab'][0] + data = self.q_enc(q) + resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers) + self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status)) + self.assertEqual(response, "OK", "Got: %s" % response) + + def addcondition(self, q): + url = "http://localhost:%d/addCondition" % federation['PlanetLab'][0] + data = self.q_enc(q) + resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers) + self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status)) + self.assertTrue(response.startswith('PlanetLab:watchdog:'), "wrong process id %s" % response) + return response + + def delcondition(self, q): + url = "http://localhost:%d/removeCondition" % federation['PlanetLab'][0] + data = self.q_enc(q) + resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers) + self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status)) + self.assertEqual(response, "OK", "Got: %s" % response) + + def checkcondition(self, q): + url = "http://localhost:%d/checkCondition" % federation['PlanetLab'][0] + data = self.q_enc(q) + resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers) + self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status)) + self.assertGreater(len(response.splitlines()), 2, "Got: %s" % response) + return response + + + def tlen(self, response, expect = 26): +# print response + self.assertTrue(response, "Got nothing due to former errors") + if expect > 1: + self.assertGreater(len(response.splitlines()), expect, "got empty measurement response") + else: + self.assertGreater(len(response), 2, "got empty measurement response") + return response + + + + def new_g(self): + g = Graph() + for k, (_, ns) in ontology.iteritems(): + g.bind(k, Namespace(ns)) + mns = self.mns + g.bind('q', mns) + + return g + + def save(self, fn, q): + try: + with open(fn, 'w') as f: + f.write(q) + with open("%s.ue" % fn, 'w') as f: + f.write( self.q_enc(q) ) + except: + pass + + def test_measure(self): + doc = "../../information-model/monitoring-model//monitoringQuery_example.owl" + with open(doc) as fp: + q = fp.read() + self.measure(q) + + def addnode(self, g, resname = 'smilax1', address = '150.254.160.19'): + mns = self.mns + NS = self.ns + TYPE = self.ns_type + R = mns[resname] + I1 = mns['ifin'] + I2 = mns['ifout'] + IPADDR = Literal(address) + ADDR = mns['%s_address' % resname] + g.add((R, TYPE, NS('core')['Node'])) + g.add((R, TYPE, NS('core')['Resource'])) + g.add((R, TYPE, NS('owl')['NamedIndividual'])) + + g.add((R, NS('core')['hasInboundInterface'], I1)) + g.add((R, NS('core')['hasOutboundInterface'], I1)) + g.add((I1, TYPE, NS('core')['Interface'])) + g.add((I2, TYPE, NS('core')['Interface'])) + g.add((I1, NS('core')['hasIPv4Address'], ADDR)) + g.add((I2, NS('core')['hasIPv4Address'], ADDR)) + g.add((ADDR, TYPE, NS('owl')['NamedIndividual'])) + g.add((ADDR, TYPE, NS('unit')['IPAddress'])) + g.add((ADDR, NS('unit')['hasValue'], IPADDR)) + return R + + def addPar(self, g, pname = 'partition', pval = '/', ptype = 'String', pdim = 'NameOfSomething'): + P = self.mns['par_%s' % pname] + NS = self.ns + TYPE = self.ns_type + g.add((P, TYPE, NS('owl')['NamedIndividual'])) + g.add((P, TYPE, NS('query')['QueryParameter'])) + g.add((P, NS('param')['paramName'], Literal(pname))) + g.add((P, NS('unit')['hasValue'], Literal(pval))) + g.add((P, NS('param')['hasType'], NS('param')[ptype])) + g.add((P, TYPE, NS('unit')[pdim])) + return P + + def bindNode(self, g, Q, R): + g.add((Q, self.ns('query')['hasResource'], R)) + + def bindPar(self, g, Q, P): + g.add((Q, self.ns('param')['hasParameter'], P)) + + def newQ(self, g, what = 'MemoryUtilization'): + Q = self.mns['measure_%s' % what] + NS = self.ns + TYPE = NS('rdf')['type'] + g.add((Q, TYPE, NS('owl')['NamedIndividual'])) + g.add((Q, TYPE, NS('query')['BundleQuery'])) + g.add((Q, NS('feature')['hasFeature'], NS('feature')[what])) + + g.add((Q, NS('stat')['hasSample'], self.S)) + g.add((Q, NS('query')['hasFormatter'], self.F)) + return Q + + def createaggregatorquery(self, pid, what = 'MemoryUtilization'): + g = self.new_g() + R = self.addnode(g) + Q = self.mns['aggr_%s' % what] + NS = self.ns + TYPE = NS('rdf')['type'] + g.add((Q, TYPE, NS('owl')['NamedIndividual'])) + g.add((Q, TYPE, NS('query')['SampleManipulationQuery'])) + g.add((Q, NS('feature')['hasFeature'], NS('feature')[what])) + + g.add((Q, NS('query')['hasFormatter'], self.F)) + g.add((Q, NS('query')['hasProcessid'], Literal(pid))) + g.add((Q, NS('query')['hasResource'], R)) + + P = self.mns['par_last5'] + NS = self.ns + TYPE = self.ns_type + g.add((P, TYPE, NS('owl')['NamedIndividual'])) + g.add((P, TYPE, NS('query')['SOP_tail'])) + g.add((P, NS('param')['paramName'], Literal('tail'))) + g.add((P, NS('unit')['hasValue'], Literal('5'))) + g.add((P, NS('param')['hasType'], NS('param')['Integer'])) + g.add((P, TYPE, NS('unit')['Countable'])) + + L5 = self.mns['last5'] + g.add((L5, TYPE, NS('owl')['NamedIndividual'])) + g.add((L5, TYPE, NS('stat')['Tail'])) + g.add((L5, NS('stat')['hasSample'], self.S)) + g.add((L5, NS('param')['hasParameter'], P)) + ML5 = self.mns['maxlast5'] + g.add((ML5, TYPE, NS('owl')['NamedIndividual'])) + g.add((ML5, TYPE, NS('stat')['Maximum'])) + g.add((ML5, NS('stat')['hasSample'], L5)) + g.add((Q, NS('stat')['hasSample'], ML5)) + + return g.serialize() + + + def createconditionquery(self, pid, what = 'MemoryUtilization'): + g = self.new_g() + R = self.addnode(g) + NS = self.ns + TYPE = self.ns_type + + P = self.mns['par_last'] + g.add((P, TYPE, NS('owl')['NamedIndividual'])) + g.add((P, TYPE, NS('query')['SOP_tail'])) + g.add((P, NS('param')['paramName'], Literal('tail'))) + g.add((P, NS('unit')['hasValue'], Literal('1'))) + g.add((P, NS('param')['hasType'], NS('param')['Integer'])) + g.add((P, TYPE, NS('unit')['Countable'])) + + L = self.mns['last'] + g.add((L, TYPE, NS('owl')['NamedIndividual'])) + g.add((L, TYPE, NS('stat')['Tail'])) + g.add((L, NS('stat')['hasSample'], self.S)) + g.add((L, NS('param')['hasParameter'], P)) + + MIN = self.mns['minall'] + g.add((MIN, TYPE, NS('owl')['NamedIndividual'])) + g.add((MIN, TYPE, NS('stat')['Minimum'])) + g.add((MIN, NS('stat')['hasSample'], self.S)) + + ML = self.mns['maxlast1'] + g.add((ML, TYPE, NS('owl')['NamedIndividual'])) + g.add((ML, TYPE, NS('stat')['Maximum'])) + g.add((ML, NS('stat')['hasSample'], L)) + + T1 = self.mns['lastitem_%s' % what] + g.add((T1, TYPE, NS('owl')['NamedIndividual'])) + g.add((T1, TYPE, NS('stat')['SampleTerm'])) +# g.add((T, NS('stat')['hasScale'], Literal(1))) + g.add((T1, NS('stat')['hasSample'], ML)) + + T2 = self.mns['minitem_%s' % what] + g.add((T2, TYPE, NS('owl')['NamedIndividual'])) + g.add((T2, TYPE, NS('stat')['SampleTerm'])) + g.add((T2, NS('stat')['hasScale'], Literal('-1.5'))) + g.add((T2, NS('stat')['hasSample'], MIN)) + + LCS = self.mns['lcs_%s' % what] + g.add((LCS, TYPE, NS('owl')['NamedIndividual'])) + g.add((LCS, TYPE, NS('stat')['LinearCombinedSample'])) + g.add((LCS, NS('stat')['hasTerm'], T1)) + g.add((LCS, NS('stat')['hasTerm'], T2)) + + # condition: "last measurement" > "1.5 * min(all measurements)" + C = self.mns['cond_%s' % what] + g.add((C, TYPE, NS('owl')['NamedIndividual'])) + g.add((C, TYPE, NS('stat')['IsPositive'])) + g.add((C, NS('stat')['hasSample'], LCS)) + + Q = self.mns['condq_%s' % what] + g.add((Q, TYPE, NS('owl')['NamedIndividual'])) + g.add((Q, TYPE, NS('query')['ConditionQuery'])) + g.add((Q, NS('feature')['hasFeature'], NS('feature')[what])) + g.add((Q, NS('stat')['hasCondition'], C)) + g.add((Q, NS('query')['hasProcessid'], Literal(pid))) + g.add((Q, NS('query')['hasResource'], R)) + + return g.serialize() + + + def test_genq_mem(self): + g = self.new_g() + Q = self.newQ(g) + R = self.addnode(g) + self.bindNode(g, Q, R) + query = g.serialize() + self.save(fn = "/tmp/genq_mem.owl", q = query) + self.measure(query, 20) + + def test_genq_cpu(self): + g = self.new_g() + Q = self.newQ(g, what= 'CPUUtilization') + R = self.addnode(g) + self.bindNode(g, Q, R) + query = g.serialize() + self.save(fn = "/tmp/genq_cpu.owl", q = query) + self.measure(query, 16) + + def test_genq_err(self): + g = self.new_g() + Q = self.newQ(g, what = 'AlmaFa') + R = self.addnode(g) + self.bindNode(g, Q, R) + query = g.serialize() + self.save(fn = "/tmp/genq_mem_err.owl", q = query) + response = self.measure(query, 1) + self.assertTrue("error" in response, "no error message! got %s" % response) + + def test_genq_complex(self): + g = self.new_g() + R = self.addnode(g) + P = self.addPar(g) + for feature in ['FreeMemory', 'CPULoad', 'FreeDiskSpace', 'AlmaFa']: + Q = self.newQ(g, what = feature) + self.bindNode(g, Q, R) + if feature == 'FreeDiskSpace': #FIXME: ugly + self.bindPar(g, Q, P) + + query = g.serialize() + self.save(fn = "/tmp/genq_complex.owl", q = query) + response = self.measure(query, 26) + #print response + self.assertTrue("error" in response, "no error message! got %s" % response) + + def test_genq_memslice(self): + g = self.new_g() + Q = self.newQ(g) + R = self.addnode(g) + P = self.addPar(g, pname = 'SliceName', pval = 'novi_novi') + self.bindNode(g, Q, R) + self.bindPar(g, Q, P) + query = g.serialize() + self.save(fn = "/tmp/genq_memslice.owl", q = query) + pid = self.addtask(query) + query = self.createaggregatorquery(pid) + self.save(fn = "/tmp/genq_memaggr.owl", q = query) + aid = self.addaggregator(query) + print "COLLECTING DATA WAIT FOR 10 SECS" + sleep(10) + self.fetchaggregate(q = aid, expect = 21) + print "COLLECTING SOME MORE DATA WAIT FOR 10 SECS" + sleep(10) + self.fetchaggregate(q = aid, expect = 21) + + self.delaggregator(q = aid) + self.deltask(q = pid) + + + + + + def test_condition(self): + g = self.new_g() + Q = self.newQ(g) + R = self.addnode(g) + P = self.addPar(g, pname = 'SliceName', pval = 'novi_novi') + self.bindNode(g, Q, R) + self.bindPar(g, Q, P) + query = g.serialize() + self.save(fn = "/tmp/genq_memslice_c.owl", q = query) + pid = self.addtask(query) + query = self.createconditionquery(pid) + self.save(fn = "/tmp/genq_cond.owl", q = query) + cid = self.addcondition(query) + + sleep(3) + print self.checkcondition(q = cid) + + self.delcondition(q = cid) + self.deltask(q = pid) + + + + + + + +if __name__ == "__main__": + #import sys;sys.argv = ['', 'Test.test_genq'] + try: + unittest.main() + finally: + stop_servers(t) -- cgit