1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 import os
18
19 from twisted.internet import defer, utils, reactor, threads
20 from twisted.python import log, failure
21 from buildbot.buildslave import AbstractBuildSlave, AbstractLatentBuildSlave
22 from buildbot import config
23
24 try:
25 import libvirt
26 libvirt = libvirt
27 except ImportError:
28 libvirt = None
32 """
33 I am a class that turns parallel access into serial access.
34
35 I exist because we want to run libvirt access in threads as we don't
36 trust calls not to block, but under load libvirt doesnt seem to like
37 this kind of threaded use.
38 """
39
42
44 log.msg("Looking to start a piece of work now...")
45
46
47 if not self.queue:
48 log.msg("_process called when there is no work")
49 return
50
51
52
53 d, next_operation, args, kwargs = self.queue[0]
54
55
56 try:
57 d2 = next_operation(*args, **kwargs)
58 except:
59 d2 = defer.fail()
60
61
62
63 def _work_done(res):
64 log.msg("Completed a piece of work")
65 self.queue.pop(0)
66 if self.queue:
67 log.msg("Preparing next piece of work")
68 reactor.callLater(0, self._process)
69 return res
70 d2.addBoth(_work_done)
71
72
73 d2.chainDeferred(d)
74
75 - def execute(self, cb, *args, **kwargs):
76 kickstart_processing = not self.queue
77 d = defer.Deferred()
78 self.queue.append((d, cb, args, kwargs))
79 if kickstart_processing:
80 self._process()
81 return d
82
84 return self.execute(threads.deferToThread, cb, *args, **kwargs)
85
86
87
88 queue = WorkQueue()
89
90
91 -class Domain(object):
92
93 """
94 I am a wrapper around a libvirt Domain object
95 """
96
97 - def __init__(self, connection, domain):
98 self.connection = connection
99 self.domain = domain
100
102 return queue.executeInThread(self.domain.name)
103
105 return queue.executeInThread(self.domain.create)
106
107 - def shutdown(self):
109
112
115
116 """
117 I am a wrapper around a libvirt Connection object.
118 """
119
120 DomainClass = Domain
121
123 self.uri = uri
124 self.connection = libvirt.open(uri)
125
126 @defer.inlineCallbacks
131
132 @defer.inlineCallbacks
134 """ I take libvirt XML and start a new VM """
135 res = yield queue.executeInThread(self.connection.createXML, xml, 0)
136 defer.returnValue(self.DomainClass(self, res))
137
138 @defer.inlineCallbacks
140 domains = []
141 domain_ids = yield queue.executeInThread(self.connection.listDomainsID)
142
143 for did in domain_ids:
144 domain = yield queue.executeInThread(self.connection.lookupByID, did)
145 domains.append(self.DomainClass(self, domain))
146
147 defer.returnValue(domains)
148
151
152 - def __init__(self, name, password, connection, hd_image, base_image = None, xml=None, max_builds=None, notify_on_missing=[],
153 missing_timeout=60*20, build_wait_timeout=60*10, properties={}, locks=None):
154 AbstractLatentBuildSlave.__init__(self, name, password, max_builds, notify_on_missing,
155 missing_timeout, build_wait_timeout, properties, locks)
156
157 if not libvirt:
158 config.error("The python module 'libvirt' is needed to use a LibVirtSlave")
159
160 self.name = name
161 self.connection = connection
162 self.image = hd_image
163 self.base_image = base_image
164 self.xml = xml
165
166 self.cheap_copy = True
167 self.graceful_shutdown = False
168
169 self.domain = None
170
171 self.ready = False
172 self._find_existing_deferred = self._find_existing_instance()
173
174 @defer.inlineCallbacks
176 """
177 I find existing VMs that are already running that might be orphaned instances of this slave.
178 """
179 if not self.connection:
180 defer.returnValue(None)
181
182 domains = yield self.connection.all()
183 for d in domains:
184 name = yield d.name()
185 if name.startswith(self.name):
186 self.domain = d
187 self.substantiated = True
188 break
189
190 self.ready = True
191
193 if not self.ready:
194 log.msg("Not accepting builds as existing domains not iterated")
195 return False
196
197 if self.domain and not self.isConnected():
198 log.msg("Not accepting builds as existing domain but slave not connected")
199 return False
200
201 return AbstractLatentBuildSlave.canStartBuild(self)
202
204 """
205 I am a private method for creating (possibly cheap) copies of a
206 base_image for start_instance to boot.
207 """
208 if not self.base_image:
209 return defer.succeed(True)
210
211 if self.cheap_copy:
212 clone_cmd = "qemu-img"
213 clone_args = "create -b %(base)s -f qcow2 %(image)s"
214 else:
215 clone_cmd = "cp"
216 clone_args = "%(base)s %(image)s"
217
218 clone_args = clone_args % {
219 "base": self.base_image,
220 "image": self.image,
221 }
222
223 log.msg("Cloning base image: %s %s'" % (clone_cmd, clone_args))
224
225 def _log_result(res):
226 log.msg("Cloning exit code was: %d" % res)
227 return res
228
229 d = utils.getProcessValue(clone_cmd, clone_args.split())
230 d.addBoth(_log_result)
231 return d
232
233 @defer.inlineCallbacks
235 """
236 I start a new instance of a VM.
237
238 If a base_image is specified, I will make a clone of that otherwise i will
239 use image directly.
240
241 If i'm not given libvirt domain definition XML, I will look for my name
242 in the list of defined virtual machines and start that.
243 """
244 if self.domain is not None:
245 log.msg("Cannot start_instance '%s' as already active" % self.name)
246 defer.returnValue(False)
247
248 yield self._prepare_base_image()
249
250 try:
251 if self.xml:
252 self.domain = yield self.connection.create(self.xml)
253 else:
254 self.domain = yield self.connection.lookupByName(self.name)
255 yield self.domain.create()
256 except:
257 log.err(failure.Failure(),
258 "Cannot start a VM (%s), failing gracefully and triggering"
259 "a new build check" % self.name)
260 self.domain = None
261 defer.returnValue(False)
262
263 defer.returnValue(True)
264
266 """
267 I attempt to stop a running VM.
268 I make sure any connection to the slave is removed.
269 If the VM was using a cloned image, I remove the clone
270 When everything is tidied up, I ask that bbot looks for work to do
271 """
272 log.msg("Attempting to stop '%s'" % self.name)
273 if self.domain is None:
274 log.msg("I don't think that domain is even running, aborting")
275 return defer.succeed(None)
276
277 domain = self.domain
278 self.domain = None
279
280 if self.graceful_shutdown and not fast:
281 log.msg("Graceful shutdown chosen for %s" % self.name)
282 d = domain.shutdown()
283 else:
284 d = domain.destroy()
285
286 def _disconnect(res):
287 log.msg("VM destroyed (%s): Forcing its connection closed." % self.name)
288 return AbstractBuildSlave.disconnect(self)
289 d.addCallback(_disconnect)
290
291 def _disconnected(res):
292 log.msg("We forced disconnection (%s), cleaning up and triggering new build" % self.name)
293 if self.base_image:
294 os.remove(self.image)
295 self.botmaster.maybeStartBuildsForSlave(self.name)
296 return res
297 d.addBoth(_disconnected)
298
299 return d
300