summaryrefslogtreecommitdiffstats
path: root/Monitoring/MonitoringService/Service/test.py
diff options
context:
space:
mode:
authorpikusa <pikusa@man.poznan.pl>2013-04-03 13:18:17 (GMT)
committer pikusa <pikusa@man.poznan.pl>2013-04-03 13:18:17 (GMT)
commit2f2a3a129c91de540e66c3bfbe30b0df1942cd4b (patch)
tree2d313cdf0068af368d4de6067d676be16f6a6464 /Monitoring/MonitoringService/Service/test.py
parentff8aa232b071a9b54dff833714a870fd0aec0b30 (diff)
downloadnovi-public-2f2a3a129c91de540e66c3bfbe30b0df1942cd4b.zip
novi-public-2f2a3a129c91de540e66c3bfbe30b0df1942cd4b.tar.gz
novi-public-2f2a3a129c91de540e66c3bfbe30b0df1942cd4b.tar.bz2
project commit and dir tree change
Diffstat (limited to 'Monitoring/MonitoringService/Service/test.py')
-rw-r--r--Monitoring/MonitoringService/Service/test.py442
1 files changed, 442 insertions, 0 deletions
diff --git a/Monitoring/MonitoringService/Service/test.py b/Monitoring/MonitoringService/Service/test.py
new file mode 100644
index 0000000..1b6b78b
--- /dev/null
+++ b/Monitoring/MonitoringService/Service/test.py
@@ -0,0 +1,442 @@
+'''
+Created on Aug 10, 2011
+
+@author: steger
+'''
+import unittest
+from rdflib import Graph, Namespace, Literal
+from Service.mock_framework import start_servers, stop_servers
+import httplib2
+from Example.Platforms import federation
+from urllib import urlencode
+from time import sleep
+
+fw, t = start_servers()
+from MonitoringService import ontology
+
+class Test(unittest.TestCase):
+ headers = {'Content-type': 'application/x-www-form-urlencoded'}
+ proxy = httplib2.Http(cache = "/tmp/ms_client_cache", timeout = 10)
+
+ def setUp(self):
+ self.MSI_planetlab = fw.getInterface('PlanetLab')
+ self.PL_O = self.MSI_planetlab.service.ontology
+ NS = self.ns
+ self.S = NS('stat')['UnmodifiedExtractOfFeatureSamples']
+ self.F = NS('query')['Formatter_JSON']
+ sleep (1)
+
+ def tearDown(self):
+ pass
+
+ def test_echo(self):
+ p1 = "PlanetLab"
+ p2 = "FEDERICA"
+
+ h = httplib2.Http(cache = "/tmp/ms_client_cache", timeout = 10)
+ url = "http://localhost:%d/echo" % federation[p1][0]
+ data = urlencode( {'platform': p2} )
+ resp, response = h.request(uri = url, method = "POST", body = data, headers = self.headers)
+ self.assertTrue(resp.status == 200, "Service @%s responded with status %s" % (p1, resp.status))
+ i, o = response.split("->")
+ got = (i.split("@")[-1].strip(), o.split("@")[-1].strip())
+ expect = (p1, p2)
+ self.assertEquals(expect, got, "Echo reply differs from expected (%s): %s" % (expect, response))
+
+ data = urlencode( {'platform': p1} )
+ resp, response = h.request(uri = url, method = "POST", body = data, headers = self.headers)
+ self.assertTrue(resp.status == 200, "Service @%s responded with status %s" % (p1, resp.status))
+ i, o = response.split("->")
+ got = (i.split("@")[-1].strip(), o.split("@")[-1].strip())
+ expect = (p1, p1)
+ self.assertEquals(expect, got, "Echo reply differs from expected (%s): %s" % (expect, response))
+
+ @staticmethod
+ def q_enc(q):
+ return urlencode( {'query': q} )
+
+ @property
+ def mns(self):
+ return Namespace("http://foo.bar/req.owl#")
+
+ @property
+ def ns(self):
+ return self.PL_O.ns
+
+ @property
+ def ns_type(self):
+ return self.ns('rdf')['type']
+
+ def measure(self, q, expect = 26):
+ url = "http://localhost:%d/measure" % federation['PlanetLab'][0]
+ data = self.q_enc(q)
+ resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers)
+ self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status))
+ self.tlen(response, expect)
+ return response
+
+ def addtask(self, q):
+ url = "http://localhost:%d/addTask" % federation['PlanetLab'][0]
+ data = self.q_enc(q)
+ resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers)
+ self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status))
+ self.assertTrue(response.startswith('PlanetLab:process:'), "wrong process id %s" % response)
+ return response
+
+ def fetchtaskdata(self, q, expect = 26):
+ url = "http://localhost:%d/fetchTaskData" % federation['PlanetLab'][0]
+ data = self.q_enc(q)
+ resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers)
+ self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status))
+ self.tlen(response, expect)
+ return response
+
+ def deltask(self, q):
+ url = "http://localhost:%d/removeTask" % federation['PlanetLab'][0]
+ data = self.q_enc(q)
+ resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers)
+ self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status))
+ self.assertEqual(response, "OK", "Got: %s" % response)
+
+ def addaggregator(self, q):
+ url = "http://localhost:%d/addAggregator" % federation['PlanetLab'][0]
+ data = self.q_enc(q)
+ resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers)
+ self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status))
+ self.assertTrue(response.startswith('PlanetLab:aggregate:'), "wrong process id %s" % response)
+ return response
+
+ def fetchaggregate(self, q, expect = 26):
+ url = "http://localhost:%d/fetchAggregatorData" % federation['PlanetLab'][0]
+ data = self.q_enc(q)
+ resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers)
+ self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status))
+ self.tlen(response, expect)
+ return response
+
+ def delaggregator(self, q):
+ url = "http://localhost:%d/removeAggregator" % federation['PlanetLab'][0]
+ data = self.q_enc(q)
+ resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers)
+ self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status))
+ self.assertEqual(response, "OK", "Got: %s" % response)
+
+ def addcondition(self, q):
+ url = "http://localhost:%d/addCondition" % federation['PlanetLab'][0]
+ data = self.q_enc(q)
+ resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers)
+ self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status))
+ self.assertTrue(response.startswith('PlanetLab:watchdog:'), "wrong process id %s" % response)
+ return response
+
+ def delcondition(self, q):
+ url = "http://localhost:%d/removeCondition" % federation['PlanetLab'][0]
+ data = self.q_enc(q)
+ resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers)
+ self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status))
+ self.assertEqual(response, "OK", "Got: %s" % response)
+
+ def checkcondition(self, q):
+ url = "http://localhost:%d/checkCondition" % federation['PlanetLab'][0]
+ data = self.q_enc(q)
+ resp, response = self.proxy.request(uri = url, method = "POST", body = data, headers = self.headers)
+ self.assertTrue(resp.status == 200, "Service responded with status %s" % (resp.status))
+ self.assertGreater(len(response.splitlines()), 2, "Got: %s" % response)
+ return response
+
+
+ def tlen(self, response, expect = 26):
+# print response
+ self.assertTrue(response, "Got nothing due to former errors")
+ if expect > 1:
+ self.assertGreater(len(response.splitlines()), expect, "got empty measurement response")
+ else:
+ self.assertGreater(len(response), 2, "got empty measurement response")
+ return response
+
+
+
+ def new_g(self):
+ g = Graph()
+ for k, (_, ns) in ontology.iteritems():
+ g.bind(k, Namespace(ns))
+ mns = self.mns
+ g.bind('q', mns)
+
+ return g
+
+ def save(self, fn, q):
+ try:
+ with open(fn, 'w') as f:
+ f.write(q)
+ with open("%s.ue" % fn, 'w') as f:
+ f.write( self.q_enc(q) )
+ except:
+ pass
+
+ def test_measure(self):
+ doc = "../../information-model/monitoring-model//monitoringQuery_example.owl"
+ with open(doc) as fp:
+ q = fp.read()
+ self.measure(q)
+
+ def addnode(self, g, resname = 'smilax1', address = '150.254.160.19'):
+ mns = self.mns
+ NS = self.ns
+ TYPE = self.ns_type
+ R = mns[resname]
+ I1 = mns['ifin']
+ I2 = mns['ifout']
+ IPADDR = Literal(address)
+ ADDR = mns['%s_address' % resname]
+ g.add((R, TYPE, NS('core')['Node']))
+ g.add((R, TYPE, NS('core')['Resource']))
+ g.add((R, TYPE, NS('owl')['NamedIndividual']))
+
+ g.add((R, NS('core')['hasInboundInterface'], I1))
+ g.add((R, NS('core')['hasOutboundInterface'], I1))
+ g.add((I1, TYPE, NS('core')['Interface']))
+ g.add((I2, TYPE, NS('core')['Interface']))
+ g.add((I1, NS('core')['hasIPv4Address'], ADDR))
+ g.add((I2, NS('core')['hasIPv4Address'], ADDR))
+ g.add((ADDR, TYPE, NS('owl')['NamedIndividual']))
+ g.add((ADDR, TYPE, NS('unit')['IPAddress']))
+ g.add((ADDR, NS('unit')['hasValue'], IPADDR))
+ return R
+
+ def addPar(self, g, pname = 'partition', pval = '/', ptype = 'String', pdim = 'NameOfSomething'):
+ P = self.mns['par_%s' % pname]
+ NS = self.ns
+ TYPE = self.ns_type
+ g.add((P, TYPE, NS('owl')['NamedIndividual']))
+ g.add((P, TYPE, NS('query')['QueryParameter']))
+ g.add((P, NS('param')['paramName'], Literal(pname)))
+ g.add((P, NS('unit')['hasValue'], Literal(pval)))
+ g.add((P, NS('param')['hasType'], NS('param')[ptype]))
+ g.add((P, TYPE, NS('unit')[pdim]))
+ return P
+
+ def bindNode(self, g, Q, R):
+ g.add((Q, self.ns('query')['hasResource'], R))
+
+ def bindPar(self, g, Q, P):
+ g.add((Q, self.ns('param')['hasParameter'], P))
+
+ def newQ(self, g, what = 'MemoryUtilization'):
+ Q = self.mns['measure_%s' % what]
+ NS = self.ns
+ TYPE = NS('rdf')['type']
+ g.add((Q, TYPE, NS('owl')['NamedIndividual']))
+ g.add((Q, TYPE, NS('query')['BundleQuery']))
+ g.add((Q, NS('feature')['hasFeature'], NS('feature')[what]))
+
+ g.add((Q, NS('stat')['hasSample'], self.S))
+ g.add((Q, NS('query')['hasFormatter'], self.F))
+ return Q
+
+ def createaggregatorquery(self, pid, what = 'MemoryUtilization'):
+ g = self.new_g()
+ R = self.addnode(g)
+ Q = self.mns['aggr_%s' % what]
+ NS = self.ns
+ TYPE = NS('rdf')['type']
+ g.add((Q, TYPE, NS('owl')['NamedIndividual']))
+ g.add((Q, TYPE, NS('query')['SampleManipulationQuery']))
+ g.add((Q, NS('feature')['hasFeature'], NS('feature')[what]))
+
+ g.add((Q, NS('query')['hasFormatter'], self.F))
+ g.add((Q, NS('query')['hasProcessid'], Literal(pid)))
+ g.add((Q, NS('query')['hasResource'], R))
+
+ P = self.mns['par_last5']
+ NS = self.ns
+ TYPE = self.ns_type
+ g.add((P, TYPE, NS('owl')['NamedIndividual']))
+ g.add((P, TYPE, NS('query')['SOP_tail']))
+ g.add((P, NS('param')['paramName'], Literal('tail')))
+ g.add((P, NS('unit')['hasValue'], Literal('5')))
+ g.add((P, NS('param')['hasType'], NS('param')['Integer']))
+ g.add((P, TYPE, NS('unit')['Countable']))
+
+ L5 = self.mns['last5']
+ g.add((L5, TYPE, NS('owl')['NamedIndividual']))
+ g.add((L5, TYPE, NS('stat')['Tail']))
+ g.add((L5, NS('stat')['hasSample'], self.S))
+ g.add((L5, NS('param')['hasParameter'], P))
+ ML5 = self.mns['maxlast5']
+ g.add((ML5, TYPE, NS('owl')['NamedIndividual']))
+ g.add((ML5, TYPE, NS('stat')['Maximum']))
+ g.add((ML5, NS('stat')['hasSample'], L5))
+ g.add((Q, NS('stat')['hasSample'], ML5))
+
+ return g.serialize()
+
+
+ def createconditionquery(self, pid, what = 'MemoryUtilization'):
+ g = self.new_g()
+ R = self.addnode(g)
+ NS = self.ns
+ TYPE = self.ns_type
+
+ P = self.mns['par_last']
+ g.add((P, TYPE, NS('owl')['NamedIndividual']))
+ g.add((P, TYPE, NS('query')['SOP_tail']))
+ g.add((P, NS('param')['paramName'], Literal('tail')))
+ g.add((P, NS('unit')['hasValue'], Literal('1')))
+ g.add((P, NS('param')['hasType'], NS('param')['Integer']))
+ g.add((P, TYPE, NS('unit')['Countable']))
+
+ L = self.mns['last']
+ g.add((L, TYPE, NS('owl')['NamedIndividual']))
+ g.add((L, TYPE, NS('stat')['Tail']))
+ g.add((L, NS('stat')['hasSample'], self.S))
+ g.add((L, NS('param')['hasParameter'], P))
+
+ MIN = self.mns['minall']
+ g.add((MIN, TYPE, NS('owl')['NamedIndividual']))
+ g.add((MIN, TYPE, NS('stat')['Minimum']))
+ g.add((MIN, NS('stat')['hasSample'], self.S))
+
+ ML = self.mns['maxlast1']
+ g.add((ML, TYPE, NS('owl')['NamedIndividual']))
+ g.add((ML, TYPE, NS('stat')['Maximum']))
+ g.add((ML, NS('stat')['hasSample'], L))
+
+ T1 = self.mns['lastitem_%s' % what]
+ g.add((T1, TYPE, NS('owl')['NamedIndividual']))
+ g.add((T1, TYPE, NS('stat')['SampleTerm']))
+# g.add((T, NS('stat')['hasScale'], Literal(1)))
+ g.add((T1, NS('stat')['hasSample'], ML))
+
+ T2 = self.mns['minitem_%s' % what]
+ g.add((T2, TYPE, NS('owl')['NamedIndividual']))
+ g.add((T2, TYPE, NS('stat')['SampleTerm']))
+ g.add((T2, NS('stat')['hasScale'], Literal('-1.5')))
+ g.add((T2, NS('stat')['hasSample'], MIN))
+
+ LCS = self.mns['lcs_%s' % what]
+ g.add((LCS, TYPE, NS('owl')['NamedIndividual']))
+ g.add((LCS, TYPE, NS('stat')['LinearCombinedSample']))
+ g.add((LCS, NS('stat')['hasTerm'], T1))
+ g.add((LCS, NS('stat')['hasTerm'], T2))
+
+ # condition: "last measurement" > "1.5 * min(all measurements)"
+ C = self.mns['cond_%s' % what]
+ g.add((C, TYPE, NS('owl')['NamedIndividual']))
+ g.add((C, TYPE, NS('stat')['IsPositive']))
+ g.add((C, NS('stat')['hasSample'], LCS))
+
+ Q = self.mns['condq_%s' % what]
+ g.add((Q, TYPE, NS('owl')['NamedIndividual']))
+ g.add((Q, TYPE, NS('query')['ConditionQuery']))
+ g.add((Q, NS('feature')['hasFeature'], NS('feature')[what]))
+ g.add((Q, NS('stat')['hasCondition'], C))
+ g.add((Q, NS('query')['hasProcessid'], Literal(pid)))
+ g.add((Q, NS('query')['hasResource'], R))
+
+ return g.serialize()
+
+
+ def test_genq_mem(self):
+ g = self.new_g()
+ Q = self.newQ(g)
+ R = self.addnode(g)
+ self.bindNode(g, Q, R)
+ query = g.serialize()
+ self.save(fn = "/tmp/genq_mem.owl", q = query)
+ self.measure(query, 20)
+
+ def test_genq_cpu(self):
+ g = self.new_g()
+ Q = self.newQ(g, what= 'CPUUtilization')
+ R = self.addnode(g)
+ self.bindNode(g, Q, R)
+ query = g.serialize()
+ self.save(fn = "/tmp/genq_cpu.owl", q = query)
+ self.measure(query, 16)
+
+ def test_genq_err(self):
+ g = self.new_g()
+ Q = self.newQ(g, what = 'AlmaFa')
+ R = self.addnode(g)
+ self.bindNode(g, Q, R)
+ query = g.serialize()
+ self.save(fn = "/tmp/genq_mem_err.owl", q = query)
+ response = self.measure(query, 1)
+ self.assertTrue("error" in response, "no error message! got %s" % response)
+
+ def test_genq_complex(self):
+ g = self.new_g()
+ R = self.addnode(g)
+ P = self.addPar(g)
+ for feature in ['FreeMemory', 'CPULoad', 'FreeDiskSpace', 'AlmaFa']:
+ Q = self.newQ(g, what = feature)
+ self.bindNode(g, Q, R)
+ if feature == 'FreeDiskSpace': #FIXME: ugly
+ self.bindPar(g, Q, P)
+
+ query = g.serialize()
+ self.save(fn = "/tmp/genq_complex.owl", q = query)
+ response = self.measure(query, 26)
+ #print response
+ self.assertTrue("error" in response, "no error message! got %s" % response)
+
+ def test_genq_memslice(self):
+ g = self.new_g()
+ Q = self.newQ(g)
+ R = self.addnode(g)
+ P = self.addPar(g, pname = 'SliceName', pval = 'novi_novi')
+ self.bindNode(g, Q, R)
+ self.bindPar(g, Q, P)
+ query = g.serialize()
+ self.save(fn = "/tmp/genq_memslice.owl", q = query)
+ pid = self.addtask(query)
+ query = self.createaggregatorquery(pid)
+ self.save(fn = "/tmp/genq_memaggr.owl", q = query)
+ aid = self.addaggregator(query)
+ print "COLLECTING DATA WAIT FOR 10 SECS"
+ sleep(10)
+ self.fetchaggregate(q = aid, expect = 21)
+ print "COLLECTING SOME MORE DATA WAIT FOR 10 SECS"
+ sleep(10)
+ self.fetchaggregate(q = aid, expect = 21)
+
+ self.delaggregator(q = aid)
+ self.deltask(q = pid)
+
+
+
+
+
+ def test_condition(self):
+ g = self.new_g()
+ Q = self.newQ(g)
+ R = self.addnode(g)
+ P = self.addPar(g, pname = 'SliceName', pval = 'novi_novi')
+ self.bindNode(g, Q, R)
+ self.bindPar(g, Q, P)
+ query = g.serialize()
+ self.save(fn = "/tmp/genq_memslice_c.owl", q = query)
+ pid = self.addtask(query)
+ query = self.createconditionquery(pid)
+ self.save(fn = "/tmp/genq_cond.owl", q = query)
+ cid = self.addcondition(query)
+
+ sleep(3)
+ print self.checkcondition(q = cid)
+
+ self.delcondition(q = cid)
+ self.deltask(q = pid)
+
+
+
+
+
+
+
+if __name__ == "__main__":
+ #import sys;sys.argv = ['', 'Test.test_genq']
+ try:
+ unittest.main()
+ finally:
+ stop_servers(t)