diff options
Diffstat (limited to 'Monitoring/MonitoringService/Service/test.py')
-rw-r--r-- | Monitoring/MonitoringService/Service/test.py | 442 |
1 files changed, 442 insertions, 0 deletions
diff --git a/Monitoring/MonitoringService/Service/test.py b/Monitoring/MonitoringService/Service/test.py new file mode 100644 index 0000000..1b6b78b --- /dev/null +++ b/Monitoring/MonitoringService/Service/test.py @@ -0,0 +1,442 @@ +''' +Created on Aug 10, 2011 + +@author: steger +''' +import unittest +from rdflib import Graph, Namespace, Literal +from Service.mock_framework import start_servers, stop_servers +import httplib2 +from Example.Platforms import federation +from urllib import urlencode +from time import sleep + +fw, t = start_servers() +from MonitoringService import ontology + +class Test(unittest.TestCase): + headers = {'Content-type': 'application/x-www-form-urlencoded'} + proxy = httplib2.Http(cache = "/tmp/ms_client_cache", timeout = 10) + + def setUp(self): + self.MSI_planetlab = fw.getInterface('PlanetLab') + self.PL_O = self.MSI_planetlab.service.ontology + NS = self.ns + self.S = NS('stat')['UnmodifiedExtractOfFeatureSamples'] + self.F = NS('query')['Formatter_JSON'] + sleep (1) + + def tearDown(self): + pass + + def test_echo(self): + p1 = "PlanetLab" + p2 = "FEDERICA" + + h = httplib2.Http(cache = "/tmp/ms_client_cache", timeout = 10) + url = "http://localhost:%d/echo" % federation[p1][0] + data = urlencode( {'platform': p2} ) + resp, response = h.request(uri = url, method = "POST", body = data, headers = self.headers) + self.assertTrue(resp.status == 200, "Service @%s responded with status %s" % (p1, resp.status)) + i, o = response.split("->") + got = (i.split("@")[-1].strip(), o.split("@")[-1].strip()) + expect = (p1, p2) + self.assertEquals(expect, got, "Echo reply differs from expected (%s): %s" % (expect, response)) + + data = urlencode( {'platform': p1} ) + resp, response = h.request(uri = url, method = "POST", body = data, headers = self.headers) + self.assertTrue(resp.status == 200, "Service @%s responded with status %s" % (p1, resp.status)) + i, o = response.split("->") + got = (i.split("@")[-1].strip(), o.split("@")[-1].strip()) + expect = (p1, p1) + self.assertEquals(expect, got, "Echo reply differs from expected (%s): %s" % (expect, response)) + + @staticmethod + def q_enc(q): + return urlencode( {'query': q} ) + + @property + def mns(self): + return Namespace("http://foo.bar/req.owl#") + + @property + def ns(self): + return self.PL_O.ns + + @property + def ns_type(self): + return self.ns('rdf')['type'] + + def measure(self, q, expect = 26): + url = "http://localhost:%d/measure" % federation['PlanetLab'][0] + data = self.q_enc(q) + resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers) + self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status)) + self.tlen(response, expect) + return response + + def addtask(self, q): + url = "http://localhost:%d/addTask" % federation['PlanetLab'][0] + data = self.q_enc(q) + resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers) + self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status)) + self.assertTrue(response.startswith('PlanetLab:process:'), "wrong process id %s" % response) + return response + + def fetchtaskdata(self, q, expect = 26): + url = "http://localhost:%d/fetchTaskData" % federation['PlanetLab'][0] + data = self.q_enc(q) + resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers) + self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status)) + self.tlen(response, expect) + return response + + def deltask(self, q): + url = "http://localhost:%d/removeTask" % federation['PlanetLab'][0] + data = self.q_enc(q) + resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers) + self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status)) + self.assertEqual(response, "OK", "Got: %s" % response) + + def addaggregator(self, q): + url = "http://localhost:%d/addAggregator" % federation['PlanetLab'][0] + data = self.q_enc(q) + resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers) + self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status)) + self.assertTrue(response.startswith('PlanetLab:aggregate:'), "wrong process id %s" % response) + return response + + def fetchaggregate(self, q, expect = 26): + url = "http://localhost:%d/fetchAggregatorData" % federation['PlanetLab'][0] + data = self.q_enc(q) + resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers) + self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status)) + self.tlen(response, expect) + return response + + def delaggregator(self, q): + url = "http://localhost:%d/removeAggregator" % federation['PlanetLab'][0] + data = self.q_enc(q) + resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers) + self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status)) + self.assertEqual(response, "OK", "Got: %s" % response) + + def addcondition(self, q): + url = "http://localhost:%d/addCondition" % federation['PlanetLab'][0] + data = self.q_enc(q) + resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers) + self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status)) + self.assertTrue(response.startswith('PlanetLab:watchdog:'), "wrong process id %s" % response) + return response + + def delcondition(self, q): + url = "http://localhost:%d/removeCondition" % federation['PlanetLab'][0] + data = self.q_enc(q) + resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers) + self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status)) + self.assertEqual(response, "OK", "Got: %s" % response) + + def checkcondition(self, q): + url = "http://localhost:%d/checkCondition" % federation['PlanetLab'][0] + data = self.q_enc(q) + resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers) + self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status)) + self.assertGreater(len(response.splitlines()), 2, "Got: %s" % response) + return response + + + def tlen(self, response, expect = 26): +# print response + self.assertTrue(response, "Got nothing due to former errors") + if expect > 1: + self.assertGreater(len(response.splitlines()), expect, "got empty measurement response") + else: + self.assertGreater(len(response), 2, "got empty measurement response") + return response + + + + def new_g(self): + g = Graph() + for k, (_, ns) in ontology.iteritems(): + g.bind(k, Namespace(ns)) + mns = self.mns + g.bind('q', mns) + + return g + + def save(self, fn, q): + try: + with open(fn, 'w') as f: + f.write(q) + with open("%s.ue" % fn, 'w') as f: + f.write( self.q_enc(q) ) + except: + pass + + def test_measure(self): + doc = "../../information-model/monitoring-model//monitoringQuery_example.owl" + with open(doc) as fp: + q = fp.read() + self.measure(q) + + def addnode(self, g, resname = 'smilax1', address = '150.254.160.19'): + mns = self.mns + NS = self.ns + TYPE = self.ns_type + R = mns[resname] + I1 = mns['ifin'] + I2 = mns['ifout'] + IPADDR = Literal(address) + ADDR = mns['%s_address' % resname] + g.add((R, TYPE, NS('core')['Node'])) + g.add((R, TYPE, NS('core')['Resource'])) + g.add((R, TYPE, NS('owl')['NamedIndividual'])) + + g.add((R, NS('core')['hasInboundInterface'], I1)) + g.add((R, NS('core')['hasOutboundInterface'], I1)) + g.add((I1, TYPE, NS('core')['Interface'])) + g.add((I2, TYPE, NS('core')['Interface'])) + g.add((I1, NS('core')['hasIPv4Address'], ADDR)) + g.add((I2, NS('core')['hasIPv4Address'], ADDR)) + g.add((ADDR, TYPE, NS('owl')['NamedIndividual'])) + g.add((ADDR, TYPE, NS('unit')['IPAddress'])) + g.add((ADDR, NS('unit')['hasValue'], IPADDR)) + return R + + def addPar(self, g, pname = 'partition', pval = '/', ptype = 'String', pdim = 'NameOfSomething'): + P = self.mns['par_%s' % pname] + NS = self.ns + TYPE = self.ns_type + g.add((P, TYPE, NS('owl')['NamedIndividual'])) + g.add((P, TYPE, NS('query')['QueryParameter'])) + g.add((P, NS('param')['paramName'], Literal(pname))) + g.add((P, NS('unit')['hasValue'], Literal(pval))) + g.add((P, NS('param')['hasType'], NS('param')[ptype])) + g.add((P, TYPE, NS('unit')[pdim])) + return P + + def bindNode(self, g, Q, R): + g.add((Q, self.ns('query')['hasResource'], R)) + + def bindPar(self, g, Q, P): + g.add((Q, self.ns('param')['hasParameter'], P)) + + def newQ(self, g, what = 'MemoryUtilization'): + Q = self.mns['measure_%s' % what] + NS = self.ns + TYPE = NS('rdf')['type'] + g.add((Q, TYPE, NS('owl')['NamedIndividual'])) + g.add((Q, TYPE, NS('query')['BundleQuery'])) + g.add((Q, NS('feature')['hasFeature'], NS('feature')[what])) + + g.add((Q, NS('stat')['hasSample'], self.S)) + g.add((Q, NS('query')['hasFormatter'], self.F)) + return Q + + def createaggregatorquery(self, pid, what = 'MemoryUtilization'): + g = self.new_g() + R = self.addnode(g) + Q = self.mns['aggr_%s' % what] + NS = self.ns + TYPE = NS('rdf')['type'] + g.add((Q, TYPE, NS('owl')['NamedIndividual'])) + g.add((Q, TYPE, NS('query')['SampleManipulationQuery'])) + g.add((Q, NS('feature')['hasFeature'], NS('feature')[what])) + + g.add((Q, NS('query')['hasFormatter'], self.F)) + g.add((Q, NS('query')['hasProcessid'], Literal(pid))) + g.add((Q, NS('query')['hasResource'], R)) + + P = self.mns['par_last5'] + NS = self.ns + TYPE = self.ns_type + g.add((P, TYPE, NS('owl')['NamedIndividual'])) + g.add((P, TYPE, NS('query')['SOP_tail'])) + g.add((P, NS('param')['paramName'], Literal('tail'))) + g.add((P, NS('unit')['hasValue'], Literal('5'))) + g.add((P, NS('param')['hasType'], NS('param')['Integer'])) + g.add((P, TYPE, NS('unit')['Countable'])) + + L5 = self.mns['last5'] + g.add((L5, TYPE, NS('owl')['NamedIndividual'])) + g.add((L5, TYPE, NS('stat')['Tail'])) + g.add((L5, NS('stat')['hasSample'], self.S)) + g.add((L5, NS('param')['hasParameter'], P)) + ML5 = self.mns['maxlast5'] + g.add((ML5, TYPE, NS('owl')['NamedIndividual'])) + g.add((ML5, TYPE, NS('stat')['Maximum'])) + g.add((ML5, NS('stat')['hasSample'], L5)) + g.add((Q, NS('stat')['hasSample'], ML5)) + + return g.serialize() + + + def createconditionquery(self, pid, what = 'MemoryUtilization'): + g = self.new_g() + R = self.addnode(g) + NS = self.ns + TYPE = self.ns_type + + P = self.mns['par_last'] + g.add((P, TYPE, NS('owl')['NamedIndividual'])) + g.add((P, TYPE, NS('query')['SOP_tail'])) + g.add((P, NS('param')['paramName'], Literal('tail'))) + g.add((P, NS('unit')['hasValue'], Literal('1'))) + g.add((P, NS('param')['hasType'], NS('param')['Integer'])) + g.add((P, TYPE, NS('unit')['Countable'])) + + L = self.mns['last'] + g.add((L, TYPE, NS('owl')['NamedIndividual'])) + g.add((L, TYPE, NS('stat')['Tail'])) + g.add((L, NS('stat')['hasSample'], self.S)) + g.add((L, NS('param')['hasParameter'], P)) + + MIN = self.mns['minall'] + g.add((MIN, TYPE, NS('owl')['NamedIndividual'])) + g.add((MIN, TYPE, NS('stat')['Minimum'])) + g.add((MIN, NS('stat')['hasSample'], self.S)) + + ML = self.mns['maxlast1'] + g.add((ML, TYPE, NS('owl')['NamedIndividual'])) + g.add((ML, TYPE, NS('stat')['Maximum'])) + g.add((ML, NS('stat')['hasSample'], L)) + + T1 = self.mns['lastitem_%s' % what] + g.add((T1, TYPE, NS('owl')['NamedIndividual'])) + g.add((T1, TYPE, NS('stat')['SampleTerm'])) +# g.add((T, NS('stat')['hasScale'], Literal(1))) + g.add((T1, NS('stat')['hasSample'], ML)) + + T2 = self.mns['minitem_%s' % what] + g.add((T2, TYPE, NS('owl')['NamedIndividual'])) + g.add((T2, TYPE, NS('stat')['SampleTerm'])) + g.add((T2, NS('stat')['hasScale'], Literal('-1.5'))) + g.add((T2, NS('stat')['hasSample'], MIN)) + + LCS = self.mns['lcs_%s' % what] + g.add((LCS, TYPE, NS('owl')['NamedIndividual'])) + g.add((LCS, TYPE, NS('stat')['LinearCombinedSample'])) + g.add((LCS, NS('stat')['hasTerm'], T1)) + g.add((LCS, NS('stat')['hasTerm'], T2)) + + # condition: "last measurement" > "1.5 * min(all measurements)" + C = self.mns['cond_%s' % what] + g.add((C, TYPE, NS('owl')['NamedIndividual'])) + g.add((C, TYPE, NS('stat')['IsPositive'])) + g.add((C, NS('stat')['hasSample'], LCS)) + + Q = self.mns['condq_%s' % what] + g.add((Q, TYPE, NS('owl')['NamedIndividual'])) + g.add((Q, TYPE, NS('query')['ConditionQuery'])) + g.add((Q, NS('feature')['hasFeature'], NS('feature')[what])) + g.add((Q, NS('stat')['hasCondition'], C)) + g.add((Q, NS('query')['hasProcessid'], Literal(pid))) + g.add((Q, NS('query')['hasResource'], R)) + + return g.serialize() + + + def test_genq_mem(self): + g = self.new_g() + Q = self.newQ(g) + R = self.addnode(g) + self.bindNode(g, Q, R) + query = g.serialize() + self.save(fn = "/tmp/genq_mem.owl", q = query) + self.measure(query, 20) + + def test_genq_cpu(self): + g = self.new_g() + Q = self.newQ(g, what= 'CPUUtilization') + R = self.addnode(g) + self.bindNode(g, Q, R) + query = g.serialize() + self.save(fn = "/tmp/genq_cpu.owl", q = query) + self.measure(query, 16) + + def test_genq_err(self): + g = self.new_g() + Q = self.newQ(g, what = 'AlmaFa') + R = self.addnode(g) + self.bindNode(g, Q, R) + query = g.serialize() + self.save(fn = "/tmp/genq_mem_err.owl", q = query) + response = self.measure(query, 1) + self.assertTrue("error" in response, "no error message! got %s" % response) + + def test_genq_complex(self): + g = self.new_g() + R = self.addnode(g) + P = self.addPar(g) + for feature in ['FreeMemory', 'CPULoad', 'FreeDiskSpace', 'AlmaFa']: + Q = self.newQ(g, what = feature) + self.bindNode(g, Q, R) + if feature == 'FreeDiskSpace': #FIXME: ugly + self.bindPar(g, Q, P) + + query = g.serialize() + self.save(fn = "/tmp/genq_complex.owl", q = query) + response = self.measure(query, 26) + #print response + self.assertTrue("error" in response, "no error message! got %s" % response) + + def test_genq_memslice(self): + g = self.new_g() + Q = self.newQ(g) + R = self.addnode(g) + P = self.addPar(g, pname = 'SliceName', pval = 'novi_novi') + self.bindNode(g, Q, R) + self.bindPar(g, Q, P) + query = g.serialize() + self.save(fn = "/tmp/genq_memslice.owl", q = query) + pid = self.addtask(query) + query = self.createaggregatorquery(pid) + self.save(fn = "/tmp/genq_memaggr.owl", q = query) + aid = self.addaggregator(query) + print "COLLECTING DATA WAIT FOR 10 SECS" + sleep(10) + self.fetchaggregate(q = aid, expect = 21) + print "COLLECTING SOME MORE DATA WAIT FOR 10 SECS" + sleep(10) + self.fetchaggregate(q = aid, expect = 21) + + self.delaggregator(q = aid) + self.deltask(q = pid) + + + + + + def test_condition(self): + g = self.new_g() + Q = self.newQ(g) + R = self.addnode(g) + P = self.addPar(g, pname = 'SliceName', pval = 'novi_novi') + self.bindNode(g, Q, R) + self.bindPar(g, Q, P) + query = g.serialize() + self.save(fn = "/tmp/genq_memslice_c.owl", q = query) + pid = self.addtask(query) + query = self.createconditionquery(pid) + self.save(fn = "/tmp/genq_cond.owl", q = query) + cid = self.addcondition(query) + + sleep(3) + print self.checkcondition(q = cid) + + self.delcondition(q = cid) + self.deltask(q = pid) + + + + + + + +if __name__ == "__main__": + #import sys;sys.argv = ['', 'Test.test_genq'] + try: + unittest.main() + finally: + stop_servers(t) |