Skip to content

Commit e29bf8c

Browse files
authored
Merge pull request #189 from fabric-testbed/slice_defer
changes for improving create response time
2 parents ce74db8 + cb97fd6 commit e29bf8c

File tree

6 files changed

+216
-51
lines changed

6 files changed

+216
-51
lines changed

fabric_cf/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__VERSION__ = "1.2.3"
1+
__VERSION__ = "1.2.4"

fabric_cf/actor/core/common/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@ class Constants:
249249
CLAIMS_SUB = "sub"
250250
CLAIMS_EMAIL = "email"
251251
CLAIMS_PROJECTS = "projects"
252+
PROJECT_ID = "project_id"
252253

253254
PROPERTY_EXCEPTION_MESSAGE = "exception.message"
254255
PROPERTY_TARGET_NAME = "target.name"

fabric_cf/actor/core/kernel/reservation_client.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -316,8 +316,10 @@ def accept_lease_update(self, *, incoming: ABCReservationMixin, update_data: Upd
316316
# Alternative: could transition to (state, None) to allow retry of the
317317
# redeem/extend by a higher level.
318318
if update_data.failed:
319-
self.transition(prefix="failed lease update", state=ReservationStates.Failed,
320-
pending=ReservationPendingStates.None_)
319+
self.fail(message=f"failed lease update- {update_data.get_message()}",
320+
sliver=incoming.get_resources().get_sliver())
321+
#self.transition(prefix="failed lease update", state=ReservationStates.Failed,
322+
# pending=ReservationPendingStates.None_)
321323
else:
322324
try:
323325
self.lease_update_satisfies(incoming=incoming, update_data=update_data)
@@ -1648,9 +1650,12 @@ def recover(self):
16481650
elif self.state == ReservationStates.Failed:
16491651
self.logger.warning("Reservation #{} has failed".format(self.get_reservation_id()))
16501652

1651-
def fail(self, *, message: str, exception: Exception = None):
1653+
def fail(self, *, message: str, exception: Exception = None, sliver: BaseSliver = None):
16521654
super().fail(message=message, exception=exception)
1653-
if self.requested_resources is not None and self.requested_resources.sliver is not None:
1655+
if sliver is None and self.requested_resources is not None and self.requested_resources.sliver is not None:
1656+
sliver = self.requested_resources.sliver
1657+
1658+
if sliver is not None:
16541659
self.update_slice_graph(sliver=self.requested_resources.sliver)
16551660

16561661
def update_slice_graph(self, *, sliver: BaseSliver):

fabric_cf/orchestrator/core/orchestrator_handler.py

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ def create_slice(self, *, token: str, slice_name: str, slice_graph: str, ssh_key
235235

236236
slice_id = None
237237
controller = None
238-
orchestrator_slice = None
238+
new_slice_object = None
239239
asm_graph = None
240240
try:
241241
end_time = self.__validate_lease_end_time(lease_end_time=lease_end_time)
@@ -274,7 +274,8 @@ def create_slice(self, *, token: str, slice_name: str, slice_graph: str, ssh_key
274274
slice_obj.set_client_slice(True)
275275
slice_obj.set_description("Description")
276276
slice_obj.graph_id = asm_graph.get_graph_id()
277-
slice_obj.set_config_properties(value={Constants.USER_SSH_KEY: ssh_key})
277+
slice_obj.set_config_properties(value={Constants.USER_SSH_KEY: ssh_key,
278+
Constants.PROJECT_ID: project})
278279
slice_obj.set_lease_end(lease_end=end_time)
279280
auth = AuthAvro()
280281
auth.oidc_sub_claim = fabric_token.get_subject()
@@ -292,25 +293,19 @@ def create_slice(self, *, token: str, slice_name: str, slice_graph: str, ssh_key
292293
self.logger.debug(f"Slice {slice_name}/{slice_id} added successfully")
293294

294295
slice_obj.set_slice_id(slice_id=str(slice_id))
295-
orchestrator_slice = OrchestratorSliceWrapper(controller=controller, broker=broker,
296-
slice_obj=slice_obj, logger=self.logger)
296+
new_slice_object = OrchestratorSliceWrapper(controller=controller, broker=broker,
297+
slice_obj=slice_obj, logger=self.logger)
297298

298-
orchestrator_slice.lock()
299+
new_slice_object.lock()
299300

300301
# Create Slivers from Slice Graph; Compute Reservations from Slivers;
301302
# Add Reservations to relational database;
302-
computed_reservations = orchestrator_slice.create(slice_graph=asm_graph)
303+
computed_reservations = new_slice_object.create(slice_graph=asm_graph)
303304

304-
# Process the Slice i.e. Demand the computed reservations i.e. Add them to the policy
305-
# Once added to the policy; Actor Tick Handler will do following asynchronously:
306-
# 1. Ticket message exchange with broker and
307-
# 2. Redeem message exchange with AM once ticket is granted by Broker
308-
self.controller_state.demand_slice(controller_slice=orchestrator_slice)
309-
310-
for r in orchestrator_slice.computed_l3_reservations:
311-
res_status_update = ReservationStatusUpdate(logger=self.logger)
312-
self.controller_state.get_sut().add_active_status_watch(watch=ID(uid=r.get_reservation_id()),
313-
callback=res_status_update)
305+
# Enqueue the slice on the demand thread
306+
# Demand thread is responsible for demanding the reservations
307+
# Helps improve the create response time
308+
self.controller_state.get_demand_thread().queue_slice(controller_slice=new_slice_object)
314309

315310
return ResponseBuilder.get_reservation_summary(res_list=computed_reservations)
316311
except Exception as e:
@@ -321,8 +316,8 @@ def create_slice(self, *, token: str, slice_name: str, slice_graph: str, ssh_key
321316
self.logger.error(f"Exception occurred processing create_slice e: {e}")
322317
raise e
323318
finally:
324-
if orchestrator_slice is not None:
325-
orchestrator_slice.unlock()
319+
if new_slice_object is not None:
320+
new_slice_object.unlock()
326321

327322
def get_slivers(self, *, token: str, slice_id: str, sliver_id: str = None, include_notices: bool = True) -> dict:
328323
"""

fabric_cf/orchestrator/core/orchestrator_kernel.py

Lines changed: 10 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,17 @@
2424
#
2525
# Author: Komal Thareja ([email protected])
2626
import threading
27-
import traceback
2827

2928
from fim.user import GraphFormat
3029

3130
from fabric_cf.actor.core.apis.abc_mgmt_controller_mixin import ABCMgmtControllerMixin
3231
from fabric_cf.actor.core.common.constants import Constants
33-
from fabric_cf.actor.core.kernel.reservation_states import ReservationStates
3432
from fabric_cf.actor.core.manage.management_utils import ManagementUtils
3533
from fabric_cf.actor.core.util.id import ID
3634
from fabric_cf.orchestrator.core.bqm_wrapper import BqmWrapper
3735
from fabric_cf.orchestrator.core.exceptions import OrchestratorException
38-
from fabric_cf.orchestrator.core.orchestrator_slice_wrapper import OrchestratorSliceWrapper
3936
from fabric_cf.orchestrator.core.reservation_status_update_thread import ReservationStatusUpdateThread
37+
from fabric_cf.orchestrator.core.slice_demand_thread import SliceDemandThread
4038

4139

4240
class OrchestratorKernel:
@@ -46,6 +44,7 @@ class OrchestratorKernel:
4644

4745
def __init__(self):
4846
self.lock = threading.Lock()
47+
self.demand_thread = None
4948
self.sut = None
5049
self.broker = None
5150
self.logger = None
@@ -79,31 +78,6 @@ def save_bqm(self, *, bqm: str, graph_format: GraphFormat):
7978
finally:
8079
self.lock.release()
8180

82-
def demand_slice(self, *, controller_slice: OrchestratorSliceWrapper):
83-
"""
84-
Demand slice reservations.
85-
:param controller_slice:
86-
"""
87-
computed_reservations = controller_slice.get_computed_reservations()
88-
89-
try:
90-
self.lock.acquire()
91-
for reservation in computed_reservations:
92-
self.get_logger().debug(f"Issuing demand for reservation: {reservation.get_reservation_id()}")
93-
94-
if reservation.get_state() != ReservationStates.Unknown.value:
95-
self.get_logger().debug(f"Reservation not in {reservation.get_state()} state, ignoring it")
96-
continue
97-
98-
if not self.controller.demand_reservation(reservation=reservation):
99-
raise OrchestratorException(f"Could not demand resources: {self.controller.get_last_error()}")
100-
self.get_logger().debug(f"Reservation #{reservation.get_reservation_id()} demanded successfully")
101-
except Exception as e:
102-
self.get_logger().error(traceback.format_exc())
103-
self.get_logger().error("Unable to get orchestrator or demand reservation: {}".format(e))
104-
finally:
105-
self.lock.release()
106-
10781
def set_broker(self, *, broker: ID):
10882
"""
10983
Set Broker
@@ -119,6 +93,9 @@ def get_broker(self) -> ID:
11993
"""
12094
return self.broker
12195

96+
def get_demand_thread(self) -> SliceDemandThread:
97+
return self.demand_thread
98+
12299
def get_sut(self) -> ReservationStatusUpdateThread:
123100
"""
124101
Get SUT thread
@@ -150,6 +127,8 @@ def stop_threads(self):
150127
Stop threads
151128
:return:
152129
"""
130+
if self.demand_thread is not None:
131+
self.demand_thread.stop()
153132
if self.sut is not None:
154133
self.sut.stop()
155134

@@ -158,6 +137,9 @@ def start_threads(self):
158137
Start threads
159138
:return:
160139
"""
140+
self.get_logger().debug("Starting SliceDemandThread")
141+
self.demand_thread = SliceDemandThread(kernel=self)
142+
self.demand_thread.start()
161143
self.get_logger().debug("Starting ReservationStatusUpdateThread")
162144
self.sut = ReservationStatusUpdateThread()
163145
self.sut.start()
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
#!/usr/bin/env python3
2+
# MIT License
3+
#
4+
# Copyright (c) 2020 FABRIC Testbed
5+
#
6+
# Permission is hereby granted, free of charge, to any person obtaining a copy
7+
# of this software and associated documentation files (the "Software"), to deal
8+
# in the Software without restriction, including without limitation the rights
9+
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
# copies of the Software, and to permit persons to whom the Software is
11+
# furnished to do so, subject to the following conditions:
12+
#
13+
# The above copyright notice and this permission notice shall be included in all
14+
# copies or substantial portions of the Software.
15+
#
16+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
# SOFTWARE.
23+
#
24+
#
25+
# Author: Komal Thareja ([email protected])
26+
import queue
27+
import threading
28+
import traceback
29+
30+
from fabric_cf.actor.core.kernel.reservation_states import ReservationStates
31+
from fabric_cf.actor.core.util.id import ID
32+
from fabric_cf.actor.core.util.iterable_queue import IterableQueue
33+
from fabric_cf.orchestrator.core.exceptions import OrchestratorException
34+
from fabric_cf.orchestrator.core.orchestrator_slice_wrapper import OrchestratorSliceWrapper
35+
from fabric_cf.orchestrator.core.reservation_status_update import ReservationStatusUpdate
36+
37+
38+
class SliceDemandThread:
39+
"""
40+
This runs as a standalone thread started by Orchestrator and deals with issuing demand for the slivers for
41+
the newly created slices. The purpose of this thread is to help orchestrator respond back to the create
42+
without waiting for the slivers to be demanded
43+
"""
44+
45+
def __init__(self, *, kernel):
46+
self.slice_queue = queue.Queue()
47+
self.slice_avail_condition = threading.Condition()
48+
self.thread_lock = threading.Lock()
49+
self.thread = None
50+
self.stopped = False
51+
from fabric_cf.actor.core.container.globals import GlobalsSingleton
52+
self.logger = GlobalsSingleton.get().get_logger()
53+
self.mgmt_actor = kernel.get_management_actor()
54+
self.sut = kernel.get_sut()
55+
56+
def queue_slice(self, *, controller_slice: OrchestratorSliceWrapper):
57+
"""
58+
Queue a slice
59+
:param controller_slice:
60+
:return:
61+
"""
62+
with self.slice_avail_condition:
63+
self.slice_queue.put_nowait(controller_slice)
64+
self.logger.debug(f"Added slice to slices queue {controller_slice.get_slice_id()}")
65+
self.slice_avail_condition.notify_all()
66+
67+
def start(self):
68+
"""
69+
Start thread
70+
:return:
71+
"""
72+
try:
73+
self.thread_lock.acquire()
74+
if self.thread is not None:
75+
raise OrchestratorException("This SliceDemandThread has already been started")
76+
77+
self.thread = threading.Thread(target=self.run)
78+
self.thread.setName(self.__class__.__name__)
79+
self.thread.setDaemon(True)
80+
self.thread.start()
81+
82+
finally:
83+
self.thread_lock.release()
84+
85+
def stop(self):
86+
"""
87+
Stop thread
88+
:return:
89+
"""
90+
self.stopped = True
91+
try:
92+
self.thread_lock.acquire()
93+
temp = self.thread
94+
self.thread = None
95+
if temp is not None:
96+
self.logger.warning("It seems that the SliceDemandThread is running. Interrupting it")
97+
try:
98+
# TODO find equivalent of interrupt
99+
with self.slice_avail_condition:
100+
self.slice_avail_condition.notify_all()
101+
temp.join()
102+
except Exception as e:
103+
self.logger.error(f"Could not join SliceDemandThread thread {e}")
104+
finally:
105+
self.thread_lock.release()
106+
finally:
107+
if self.thread_lock is not None and self.thread_lock.locked():
108+
self.thread_lock.release()
109+
110+
def run(self):
111+
"""
112+
Thread main loop
113+
:return:
114+
"""
115+
self.logger.debug("SliceDemandThread started")
116+
while True:
117+
slices = []
118+
with self.slice_avail_condition:
119+
120+
while self.slice_queue.empty() and not self.stopped:
121+
try:
122+
self.slice_avail_condition.wait()
123+
except InterruptedError as e:
124+
self.logger.info("Slice Demand thread interrupted. Exiting")
125+
return
126+
127+
if self.stopped:
128+
self.logger.info("Slice Demand Thread exiting")
129+
return
130+
131+
if not self.slice_queue.empty():
132+
try:
133+
for s in IterableQueue(source_queue=self.slice_queue):
134+
slices.append(s)
135+
except Exception as e:
136+
self.logger.error(f"Error while adding slice to slice queue! e: {e}")
137+
self.logger.error(traceback.format_exc())
138+
139+
self.slice_avail_condition.notify_all()
140+
141+
if len(slices) > 0:
142+
self.logger.debug(f"Processing {len(slices)} slices")
143+
for s in slices:
144+
try:
145+
# Process the Slice i.e. Demand the computed reservations i.e. Add them to the policy
146+
# Once added to the policy; Actor Tick Handler will do following asynchronously:
147+
# 1. Ticket message exchange with broker and
148+
# 2. Redeem message exchange with AM once ticket is granted by Broker
149+
self.demand_slice(controller_slice=s)
150+
except Exception as e:
151+
self.logger.error(f"Error while processing slice {type(s)}, {e}")
152+
self.logger.error(traceback.format_exc())
153+
154+
def demand_slice(self, *, controller_slice: OrchestratorSliceWrapper):
155+
"""
156+
Demand slice reservations.
157+
:param controller_slice:
158+
"""
159+
computed_reservations = controller_slice.get_computed_reservations()
160+
161+
try:
162+
controller_slice.lock()
163+
for reservation in computed_reservations:
164+
self.logger.debug(f"Issuing demand for reservation: {reservation.get_reservation_id()}")
165+
166+
if reservation.get_state() != ReservationStates.Unknown.value:
167+
self.logger.debug(f"Reservation not in {reservation.get_state()} state, ignoring it")
168+
continue
169+
170+
if not self.mgmt_actor.demand_reservation(reservation=reservation):
171+
raise OrchestratorException(f"Could not demand resources: {self.mgmt_actor.get_last_error()}")
172+
self.logger.debug(f"Reservation #{reservation.get_reservation_id()} demanded successfully")
173+
174+
for r in controller_slice.computed_l3_reservations:
175+
res_status_update = ReservationStatusUpdate(logger=self.logger)
176+
self.sut.add_active_status_watch(watch=ID(uid=r.get_reservation_id()),
177+
callback=res_status_update)
178+
except Exception as e:
179+
self.logger.error(traceback.format_exc())
180+
self.logger.error("Unable to get orchestrator or demand reservation: {}".format(e))
181+
finally:
182+
controller_slice.unlock()

0 commit comments

Comments
 (0)