Skip to content

Instantly share code, notes, and snippets.

@Elucidation
Created June 24, 2023 00:23
Show Gist options
  • Save Elucidation/739b2e3b29b74d2155ea716256730f0f to your computer and use it in GitHub Desktop.
Save Elucidation/739b2e3b29b74d2155ea716256730f0f to your computer and use it in GitHub Desktop.
robot allocator line_profiler results
Timer unit: 1e-06 s
Total time: 0.922614 s
File: C:\Users\sam\Documents\GitHub\mapf-multiagent-robot-planning\dev\robot_allocator.py
Function: __init__ at line 49
Line # Hits Time Per Hit % Time Line Contents
==============================================================
49 @profile
50 def __init__(self, logger, redis_con: redis.Redis, wdb: WorldDatabaseManager,
51 world_info: WorldInfo,
52 heuristic_dict: dict[Position, 'np.ndarray']) -> None:
53 1 2.3 2.3 0.0 self.logger = logger
54
55 # Connect to redis database
56 1 20.9 20.9 0.0 self.wdb = wdb
57
58 # Tracks Order / Task, notifies item adds etc.
59 1 1.2 1.2 0.0 self.redis_db = redis_con
60
61 # Load grid positions all in x,y coordinates
62 1 2.9 2.9 0.0 self.world_grid = world_info.world_grid
63 1 2.2 2.2 0.0 self.robot_home_zones = world_info.robot_home_zones
64 1 3.0 3.0 0.0 self.item_load_zones = world_info.item_load_zones
65 1 1.9 1.9 0.0 self.station_zones = world_info.station_zones
66
67 1 0.9 0.9 0.0 self.heuristic_dict = heuristic_dict
68
69 # locks for zones
70 1 83.2 83.2 0.0 self.item_locks: dict[Position, Optional[JobId]] = {
71 1 0.3 0.3 0.0 pos: None for pos in self.item_load_zones}
72 1 30.1 30.1 0.0 self.station_locks: dict[Position, Optional[JobId]] = {
73 1 0.5 0.5 0.0 pos: None for pos in self.station_zones}
74
75 1 2.0 2.0 0.0 self.max_steps = MAX_PATH_STEPS # hard-coded search tile limit for pathing
76
77 # Keep track of all jobs, even completed
78 1 3.9 3.9 0.0 self.job_id_counter: JobId = JobId(0)
79 1 0.7 0.7 0.0 self.jobs: dict[JobId, Job] = {}
80
81 # Get delta time step used by world sim
82 1 1934.4 1934.4 0.2 self.dt_sec = self.wdb.get_dt_sec()
83
84 # Using world info set up static dynamic obstacles
85 1 74.0 74.0 0.0 self.static_obstacles = self.get_all_static_obstacles()
86
87 # Try and wait for world state update to get data and reset robots
88 1 768543.7 768543.7 83.3 response = self.redis_db.xread(
89 1 1.0 1.0 0.0 {'world:state': '$'}, block=1000, count=1)
90 1 0.8 0.8 0.0 if response:
91 # Parse robot data from update message
92 1 1.4 1.4 0.0 timestamp, data = response[0][1][0]
93 1 4.5 4.5 0.0 self.world_sim_t = int(data['t'])
94 1 7419.5 7419.5 0.8 self.robots = [Robot.from_json(json_data)
95 1 277.4 277.4 0.0 for json_data in json.loads(data['robots'])]
96 1 691.9 691.9 0.1 self.logger.info(
97 1 0.7 0.7 0.0 'RA restart Step start T=%d timestamp=%s %s',
98 1 0.8 0.8 0.0 self.world_sim_t, timestamp, '-'*100)
99 else:
100 # Get all robots regardless of state
101 # assume no robots will be added or removed for duration of this instance
102 self.robots = self.wdb.get_robots()
103
104 # Track robot allocations as allocations[robot_id] = job_id
105 1 51.6 51.6 0.0 self.allocations: dict[RobotId, Optional[JobId]] = {
106 1 1.4 1.4 0.0 robot.robot_id: None for robot in self.robots}
107
108 # Latest dynamic obstacles, reset on update(), updated with every new path in that cycle
109 1 0.9 0.9 0.0 self.latest_dynamic_obstacles = None
110
111 # Try to find paths for robots to go home, since robots no longer are pathing, no
112 # issue with this taking more than one time step.
113 100 134.6 1.3 0.0 for idx, robot in enumerate(self.robots):
114 # On RA reset we assume any existing tasks/jobs are lost for the robot,
115 # so drop item and task, assign it a job to go home if it's not already there.
116 100 74.4 0.7 0.0 robot.held_item_id = None
117 100 58.3 0.6 0.0 robot.task_key = None
118 100 98416.8 984.2 10.7 print(robot, idx, self.robot_home_zones)
119 57 326.9 5.7 0.0 if robot.pos == self.robot_home_zones[idx]:
120 43 306.5 7.1 0.0 robot.set_path([]) # Clear any paths
121 43 54.4 1.3 0.0 robot.state_description = 'Waiting for task'
122 43 107.9 2.5 0.0 robot.state = RobotStatus.AVAILABLE
123 43 21.9 0.5 0.0 continue
124
125 # Assign a job to go home if it's not already there.
126 57 3461.3 60.7 0.4 job = self.make_and_assign_taskless_return_home_job(robot)
127 57 22955.1 402.7 2.5 self.logger.warning(
128 57 881.0 15.5 0.1 f'{robot.robot_id} starting outside of home, assigning taskless job home {job}')
129 57 94.6 1.7 0.0 self.jobs[job.job_id] = job
130 57 87.3 1.5 0.0 self.allocations[robot.robot_id] = job.job_id
131 1 13203.5 13203.5 1.4 self.wdb.update_robots(self.robots) # Update new robot state
132
133 # Move all in progress tasks back to head of new
134 1 1896.2 1896.2 0.2 task_keys = self.redis_db.smembers('tasks:inprogress')
135 1 15.6 15.6 0.0 pipeline = self.redis_db.pipeline()
136 1 0.5 0.5 0.0 if task_keys:
137 1 21.5 21.5 0.0 pipeline.lpush('tasks:new', *task_keys)
138 1 5.8 5.8 0.0 pipeline.delete('tasks:inprogress')
139 1 1330.2 1330.2 0.1 pipeline.execute()
Total time: 5.17e-05 s
File: C:\Users\sam\Documents\GitHub\mapf-multiagent-robot-planning\dev\robot_allocator.py
Function: get_all_static_obstacles at line 141
Line # Hits Time Per Hit % Time Line Contents
==============================================================
141 @profile
142 def get_all_static_obstacles(self):
143 """Get all static obstacles"""
144 1 2.0 2.0 3.9 static_obstacles: set[tuple[int, int]] = set() # set{(row,col), ...}
145 # Maeke all zones static obstacles (lifted later for individual robots)
146 1 32.9 32.9 63.6 static_obstacles.update(self.robot_home_zones)
147 1 9.6 9.6 18.6 static_obstacles.update(self.item_load_zones)
148 1 7.0 7.0 13.5 static_obstacles.update(self.station_zones)
149 1 0.2 0.2 0.4 return static_obstacles
Total time: 0.0071125 s
File: C:\Users\sam\Documents\GitHub\mapf-multiagent-robot-planning\dev\robot_allocator.py
Function: make_job at line 151
Line # Hits Time Per Hit % Time Line Contents
==============================================================
151 @profile
152 def make_job(self, task_key: str, robot: Robot) -> Job:
153 """Create a job for a given robot and task"""
154 192 167.3 0.9 2.4 assert robot.state == RobotStatus.AVAILABLE
155
156 192 2644.1 13.8 37.2 task_ids = parse_task_key_to_ids(task_key)
157
158 # Get positions along the route
159 192 224.1 1.2 3.2 robot_home = self.robot_home_zones[robot.robot_id]
160 192 268.6 1.4 3.8 item_zone = self.item_load_zones[task_ids.item_id]
161 # Note: Hacky, off-by-one because indexing starts at 1, not zero
162 192 175.9 0.9 2.5 station_zone = self.station_zones[task_ids.station_id - 1]
163
164 # Create job and increment counter
165 192 242.0 1.3 3.4 job_data = {
166 192 54.5 0.3 0.8 'task_key': task_key,
167 192 77.1 0.4 1.1 'station_id': task_ids.station_id,
168 192 87.4 0.5 1.2 'order_id': task_ids.order_id,
169 192 68.5 0.4 1.0 'item_id': task_ids.item_id,
170 192 119.2 0.6 1.7 'idx': task_ids.idx,
171 192 72.0 0.4 1.0 'robot_id': robot.robot_id,
172 192 113.6 0.6 1.6 'robot_start_pos': robot.pos,
173 192 54.8 0.3 0.8 'robot_home': robot_home,
174 192 66.8 0.3 0.9 'item_zone': item_zone,
175 192 73.3 0.4 1.0 'station_zone': station_zone,
176 }
177 192 92.6 0.5 1.3 job_id = self.job_id_counter
178 192 356.4 1.9 5.0 self.job_id_counter = JobId(self.job_id_counter + 1)
179 192 1731.2 9.0 24.3 job = Job(job_id, job_data)
180 # Set robot state
181 192 157.9 0.8 2.2 robot.state = RobotStatus.IN_PROGRESS
182 192 124.6 0.6 1.8 robot.state_description = 'Assigned new task'
183 192 87.3 0.5 1.2 robot.task_key = job.task_key
184 192 53.3 0.3 0.7 return job
Total time: 0.0019242 s
File: C:\Users\sam\Documents\GitHub\mapf-multiagent-robot-planning\dev\robot_allocator.py
Function: make_and_assign_taskless_return_home_job at line 186
Line # Hits Time Per Hit % Time Line Contents
==============================================================
186 @profile
187 def make_and_assign_taskless_return_home_job(self, robot: Robot) -> Job:
188 """Create and assign a taskless job for a given robot to go home."""
189 57 72.9 1.3 3.8 robot_home = self.robot_home_zones[robot.robot_id]
190
191 # Create job and increment counter
192 57 112.4 2.0 5.8 job_data = {
193 57 53.9 0.9 2.8 'task_key': '',
194 57 22.9 0.4 1.2 'station_id': None,
195 57 17.4 0.3 0.9 'order_id': None,
196 57 22.4 0.4 1.2 'item_id': None,
197 57 18.0 0.3 0.9 'idx': None,
198 57 24.1 0.4 1.3 'robot_id': robot.robot_id,
199 57 37.9 0.7 2.0 'robot_start_pos': robot.pos,
200 57 16.9 0.3 0.9 'robot_home': robot_home,
201 57 16.0 0.3 0.8 'item_zone': None,
202 57 21.1 0.4 1.1 'station_zone': None,
203 }
204 57 31.3 0.5 1.6 job_id = self.job_id_counter
205 57 218.5 3.8 11.4 self.job_id_counter = JobId(self.job_id_counter + 1)
206 57 768.0 13.5 39.9 job = Job(job_id, job_data)
207 57 45.3 0.8 2.4 job.state = JobState.RESTART_MGR_GO_HOME # Next stop is returning home
208 # Set robot state
209 57 44.2 0.8 2.3 robot.state = RobotStatus.IN_PROGRESS
210 57 42.5 0.7 2.2 robot.state_description = 'Allocator restart, trying to return home'
211 57 24.5 0.4 1.3 robot.task_key = None
212 # Set current pos or end pos locks if they end in zones
213 37 50.8 1.4 2.6 if robot.pos in self.station_locks:
214 20 23.1 1.2 1.2 self.station_locks[robot.pos] = job_id
215 37 100.4 2.7 5.2 elif robot.pos in self.item_locks:
216 37 43.5 1.2 2.3 self.item_locks[robot.pos] = job_id
217 57 77.7 1.4 4.0 if robot.future_path:
218 end_pos = robot.future_path[-1]
219 if end_pos in self.station_locks:
220 self.station_locks[end_pos] = job_id
221 elif end_pos in self.item_locks:
222 self.item_locks[end_pos] = job_id
223 57 18.5 0.3 1.0 return job
Total time: 0.635725 s
File: C:\Users\sam\Documents\GitHub\mapf-multiagent-robot-planning\dev\robot_allocator.py
Function: get_all_current_dynamic_obstacles at line 225
Line # Hits Time Per Hit % Time Line Contents
==============================================================
225 @profile
226 def get_all_current_dynamic_obstacles(self) -> set[tuple[int, int, int]]:
227 """Return existing robot future paths as dynamic obstacles
228
229 Returns:
230 set[tuple[int, int, int]]: set of (row, col, t) dynamic obstacles with current t=-1
231
232 """
233 # For dynamic obstacles, assume current moment is t=-1, next moment is t=0
234 182 102.3 0.6 0.0 dynamic_obstacles: set[tuple[int, int, int]
235 182 412.6 2.3 0.1 ] = set() # set{(row,col,t), ...}
236
237 18200 7283.1 0.4 1.1 for robot in self.robots:
238 11877 6070.4 0.5 1.0 if not robot.future_path:
239 # Robot will be added as a static obstacle separately
240 6323 1690.1 0.3 0.3 continue
241 # Add all positions along future path
242 190865 83542.4 0.4 13.1 for t_step, pos in enumerate(robot.future_path):
243 190865 150995.7 0.8 23.8 dynamic_obstacles.add((pos[0], pos[1], t_step))
244 # Also add it at future and past time to keep robots from slipping nearby
245 190865 148058.7 0.8 23.3 dynamic_obstacles.add((pos[0], pos[1], t_step+1))
246 # TODO : This is hacky, get more precise on when robots go along paths.
247 190865 164502.7 0.9 25.9 dynamic_obstacles.add((pos[0], pos[1], t_step-1))
248
249 11877 5573.2 0.5 0.9 path_len_t = len(robot.future_path)
250 # Add final position a few times to give space for robot to move
251 11877 5371.5 0.5 0.8 last_pos = robot.future_path[path_len_t-1]
252 59385 23082.1 0.4 3.6 for t_step in range(path_len_t, path_len_t+5):
253 59385 38945.9 0.7 6.1 dynamic_obstacles.add((last_pos[0], last_pos[1], t_step))
254
255 182 94.3 0.5 0.0 return dynamic_obstacles
Total time: 0.0379767 s
File: C:\Users\sam\Documents\GitHub\mapf-multiagent-robot-planning\dev\robot_allocator.py
Function: get_current_static_obstacles at line 257
Line # Hits Time Per Hit % Time Line Contents
==============================================================
257 @profile
258 def get_current_static_obstacles(self) -> set[Position]:
259 """Return static obstacles with stationary robots too
260
261 Returns a set[Position]: set of (row, col) static obstacles
262 """
263 # Add stationary robots to static obstacles
264 481 3869.1 8.0 10.2 static_obstacles = self.static_obstacles.copy()
265 481 33824.3 70.3 89.1 static_obstacles.update(
266 481 146.9 0.3 0.4 robot.pos for robot in self.robots if not robot.future_path)
267 481 136.4 0.3 0.4 return static_obstacles
Total time: 0.377753 s
File: C:\Users\sam\Documents\GitHub\mapf-multiagent-robot-planning\dev\robot_allocator.py
Function: get_robot at line 269
Line # Hits Time Per Hit % Time Line Contents
==============================================================
269 @profile
270 def get_robot(self, robot_id: RobotId) -> Robot:
271 """Get robot by id from stored list of robots"""
272 # TODO : Replace this with dict[robot_id] -> robot
273 647775 168156.4 0.3 44.5 for robot in self.robots:
274 634837 204737.4 0.3 54.2 if robot.robot_id == robot_id:
275 12938 4858.8 0.4 1.3 return robot
276 raise ValueError(f'get_robot called with invalid robot id {robot_id}')
Total time: 0.0083703 s
File: C:\Users\sam\Documents\GitHub\mapf-multiagent-robot-planning\dev\robot_allocator.py
Function: robot_pick_item at line 278
Line # Hits Time Per Hit % Time Line Contents
==============================================================
278 @profile
279 def robot_pick_item(self, robot_id: RobotId,
280 item_id: ItemId) -> Tuple[bool, Optional[ItemId]]:
281 """Robot by id pick the given item if possible, return held item."""
282 # Return fail if already holding an item
283 151 7882.5 52.2 94.2 robot = self.get_robot(robot_id)
284 151 416.5 2.8 5.0 if not robot.hold_item(item_id):
285 return (False, robot.held_item_id)
286
287 151 71.3 0.5 0.9 return (True, robot.held_item_id)
Total time: 0.0063956 s
File: C:\Users\sam\Documents\GitHub\mapf-multiagent-robot-planning\dev\robot_allocator.py
Function: robot_drop_item at line 289
Line # Hits Time Per Hit % Time Line Contents
==============================================================
289 @profile
290 def robot_drop_item(self, robot_id: RobotId) -> Tuple[bool, Optional[ItemId]]:
291 """Robot by id drop held item, or false if none"""
292 111 5937.1 53.5 92.8 robot = self.get_robot(robot_id)
293 111 356.0 3.2 5.6 item_id = robot.drop_item()
294 111 61.7 0.6 1.0 if item_id is None:
295 return (False, item_id)
296
297 # Return success and the item dropped
298 111 40.8 0.4 0.6 return (True, item_id)
Total time: 0.0072711 s
File: C:\Users\sam\Documents\GitHub\mapf-multiagent-robot-planning\dev\robot_allocator.py
Function: get_available_robots at line 300
Line # Hits Time Per Hit % Time Line Contents
==============================================================
300 @profile
301 def get_available_robots(self) -> list[Robot]:
302 """Finds all robots currently available"""
303 182 7271.1 40.0 100.0 return [robot for robot in self.robots if robot.state == RobotStatus.AVAILABLE]
Total time: 0 s
File: C:\Users\sam\Documents\GitHub\mapf-multiagent-robot-planning\dev\robot_allocator.py
Function: get_first_available_robot at line 305
Line # Hits Time Per Hit % Time Line Contents
==============================================================
305 @profile
306 def get_first_available_robot(self) -> Optional[Robot]:
307 """Get first available robot."""
308 for robot in self.robots:
309 if robot.state == RobotStatus.AVAILABLE:
310 return robot
311 return None
Total time: 0 s
File: C:\Users\sam\Documents\GitHub\mapf-multiagent-robot-planning\dev\robot_allocator.py
Function: set_robot_error at line 313
Line # Hits Time Per Hit % Time Line Contents
==============================================================
313 @profile
314 def set_robot_error(self, robot_id: RobotId):
315 """Set robot state to error"""
316 robot = self.get_robot(robot_id)
317 robot.state = RobotStatus.ERROR
Total time: 0 s
File: C:\Users\sam\Documents\GitHub\mapf-multiagent-robot-planning\dev\robot_allocator.py
Function: find_and_assign_task_to_robot at line 319
Line # Hits Time Per Hit % Time Line Contents
==============================================================
319 @profile
320 def find_and_assign_task_to_robot(self) -> Optional[Job]:
321 """ Find and assign available robot an available task if they exist.
322 If yes, create a job for that robot, update states and return the job. """
323 robot = self.get_first_available_robot()
324 if not robot:
325 return None # No available robots
326
327 # Pop task and push into inprogress
328 if not self.redis_db.exists('tasks:new') or self.redis_db.llen('tasks:new') == 0:
329 return None # No available tasks
330 task_key = self.redis_db.lpop()
331 if not task_key:
332 return None
333 return self.assign_task_to_robot(task_key, robot)
Total time: 0.0949133 s
File: C:\Users\sam\Documents\GitHub\mapf-multiagent-robot-planning\dev\robot_allocator.py
Function: assign_task_to_robot at line 335
Line # Hits Time Per Hit % Time Line Contents
==============================================================
335 @profile
336 def assign_task_to_robot(self, task_key: str, robot: Robot) -> Job:
337 """Given available robot and task, create job and assign robot to it"""
338 192 104.6 0.5 0.1 assert task_key is not None
339 192 512.5 2.7 0.5 if robot.state != RobotStatus.AVAILABLE:
340 raise ValueError(
341 f'{robot} not available for a new job: ({repr(robot.state)} == '
342 f'{repr(RobotStatus.AVAILABLE)}) = {robot.state == RobotStatus.AVAILABLE}')
343
344 # Returns task_key to tasks:new if it fails
345 192 9998.5 52.1 10.5 job = self.make_job(task_key, robot)
346 192 84203.7 438.6 88.7 self.logger.info(f'Created Job: {job}')
347 192 94.0 0.5 0.1 return job
Total time: 11.0341 s
File: C:\Users\sam\Documents\GitHub\mapf-multiagent-robot-planning\dev\robot_allocator.py
Function: update at line 349
Line # Hits Time Per Hit % Time Line Contents
==============================================================
349 @profile
350 def update(self, robots=None, time_read=None):
351 """Process jobs and assign robots tasks.
352 If update takes longer than a threshold, the step is skipped and redis is not updated."""
353 182 262.1 1.4 0.0 t_start = time.perf_counter() if time_read is None else time_read
354 182 20919.8 114.9 0.2 self.logger.debug('update start')
355
356 # 1 - Update to latest robots from WDB if not passed in
357 182 306.1 1.7 0.0 t_load_robots = time.perf_counter()
358 182 28552.3 156.9 0.3 self.robots = self.wdb.get_robots() if robots is None else robots
359
360 182 148.1 0.8 0.0 @profile
361 182 19387.0 106.5 0.2 def update_too_long():
362 """Check if time since t_start > MAX_UPDATE_TIME_SEC"""
363 return (time.perf_counter() - t_start) > MAX_UPDATE_TIME_SEC
364 182 1994.9 11.0 0.0 if update_too_long():
365 self.logger.warning('update started too late at %.2f ms > %.2f ms threshold, skipping',
366 (time.perf_counter() - t_start)*1000, MAX_UPDATE_TIME_SEC * 1000)
367 return
368 182 261.8 1.4 0.0 t_load_robots = (time.perf_counter() - t_load_robots)*1000
369
370 # 2 - Check and update any jobs
371 182 110.8 0.6 0.0 t_update_jobs = time.perf_counter()
372 182 1092.4 6.0 0.0 job_keys = list(self.jobs)
373 # TODO : Replace this with round-robin
374 182 73412.5 403.4 0.7 shuffled_job_keys = random.sample(job_keys, len(job_keys))
375
376 # Get the dynamic obstacles for this timestep
377 182 924110.0 5077.5 8.4 self.latest_dynamic_obstacles = self.get_all_current_dynamic_obstacles()
378 # Only process jobs for up to MAX_TIME_CHECK_JOB_SEC locally and MAX_UPDATE_TIME_SEC total
379 182 138.6 0.8 0.0 jobs_processed = 0
380 182 141.1 0.8 0.0 processed_jobs: list[Job] = []
381 17573 7284.3 0.4 0.1 for job_key in shuffled_job_keys:
382 # Create a copy of the job to be changed
383 17573 246275.3 14.0 2.2 job = self.jobs[job_key].copy()
384 17573 6016957.7 342.4 54.5 self.check_and_update_job(job)
385 17573 7124.4 0.4 0.1 jobs_processed += 1
386 17573 10969.0 0.6 0.1 processed_jobs.append(job)
387 17564 90203.9 5.1 0.8 if ((time.perf_counter() - t_start) > MAX_TIME_CHECK_JOB_SEC) or update_too_long():
388 9 7.5 0.8 0.0 break
389 182 153.5 0.8 0.0 t_update_jobs = (time.perf_counter() - t_update_jobs)*1000
390
391 # 3 - Now check for any available robots and tasks for
392 # up to MAX_TIME_ASSIGN_JOB_SEC locally and MAX_UPDATE_TIME_SEC total
393 182 77.4 0.4 0.0 t_assign = time.perf_counter()
394 182 78.6 0.4 0.0 new_jobs: list[Job] = []
395 # Get available robots
396 182 57.4 0.3 0.0 robots_assigned = 0
397 182 8296.8 45.6 0.1 available_robots = self.get_available_robots()
398 182 195.3 1.1 0.0 available_robots_count = len(available_robots)
399 # Get available new tasks
400 182 257945.9 1417.3 2.3 all_new_tasks_count = self.redis_db.llen('tasks:new')
401 182 195363.8 1073.4 1.8 new_tasks = self.redis_db.lrange(
402 182 76.5 0.4 0.0 'tasks:new', 0, available_robots_count)
403 182 223.9 1.2 0.0 new_tasks_count = len(new_tasks)
404 # For each available pair of robot and tasks
405 192 427.5 2.2 0.0 for idx in range(min(available_robots_count, new_tasks_count)):
406 192 351.6 1.8 0.0 if (time.perf_counter() - t_assign) > MAX_TIME_ASSIGN_JOB_SEC:
407 break
408 192 1794.4 9.3 0.0 if update_too_long():
409 break
410 # Assign new task to available robot, creating a new job
411 192 98.7 0.5 0.0 task_key = new_tasks[idx]
412 192 106.9 0.6 0.0 robot = available_robots[idx]
413 192 97182.1 506.2 0.9 new_job = self.assign_task_to_robot(task_key, robot)
414 192 185.6 1.0 0.0 new_jobs.append(new_job)
415 192 117.2 0.6 0.0 robots_assigned += 1
416 182 373.9 2.1 0.0 t_assign = (time.perf_counter() - t_assign)*1000
417
418 # Revert changes if at this point update took too long
419 # Expectation: No redis writes were done up to this point.
420 182 1835.1 10.1 0.0 if update_too_long():
421 update_duration_ms = (time.perf_counter() - t_start)*1000
422 self.logger.error(
423 f'update end, reverted due to over threshold, '
424 f'took {update_duration_ms:.3f} ms > {MAX_UPDATE_TIME_SEC*1000} ms threshold, '
425 f'reverting processed {jobs_processed}/{len(shuffled_job_keys)} jobs, '
426 f'reverting assigned {robots_assigned}/{available_robots_count} available robots '
427 f'to {new_tasks_count}/{all_new_tasks_count} available tasks')
428 return
429
430 # 4 - Batch update robots, jobs, tasks now
431 182 121.9 0.7 0.0 t_update_all = time.perf_counter()
432 182 2850.3 15.7 0.0 pipeline = self.redis_db.pipeline()
433 182 1165658.0 6404.7 10.6 self.wdb.update_robots(self.robots, pipeline=pipeline)
434
435 # replace stored jobs that were processed with the chaanged ones
436 17573 6876.2 0.4 0.1 for job in processed_jobs:
437 # Either replace job in progress, or pop completed ones
438 17424 10215.2 0.6 0.1 if job.state == JobState.COMPLETE:
439 # Remove allocation and job on completion
440 149 86.6 0.6 0.0 self.allocations[job.robot_id] = None
441 149 393.6 2.6 0.0 self.jobs.pop(job.job_id, None)
442 149 49.7 0.3 0.0 continue
443
444 17313 8390.6 0.5 0.1 if job.state == JobState.ITEM_DROPPED:
445 # Notify task complete (Order Proc adds item to station)
446 111 760.7 6.9 0.0 pipeline.srem('tasks:inprogress', job.task_key)
447 111 458.2 4.1 0.0 pipeline.lpush('tasks:processed', job.task_key)
448 111 52497.6 473.0 0.5 self.logger.info(f'Task {job.task_key} complete, '
449 111 39.4 0.4 0.0 f'Robot {job.robot_id} successfully dropped item')
450
451 17424 22850.9 1.3 0.2 self.jobs[job.job_id] = job
452 17424 8562.9 0.5 0.1 self.allocations[job.robot_id] = job.job_id
453
454 # For newly created jobs, track them and make their tasks inprogress in redis
455 192 84.8 0.4 0.0 for job in new_jobs:
456 192 123.6 0.6 0.0 self.jobs[job.job_id] = job # Track job
457 # Move task keys associated with new jobs from new -> inprogress
458 98 66.4 0.7 0.0 if new_jobs:
459 98 346.2 3.5 0.0 task_keys = [job.task_key for job in new_jobs]
460 98 776.5 7.9 0.0 pipeline.lpop('tasks:new', len(task_keys))
461 # Set tasks in progress
462 98 497.6 5.1 0.0 pipeline.sadd('tasks:inprogress', *task_keys)
463
464 # Execute transactions on redis
465 182 1633635.4 8976.0 14.8 pipeline.execute()
466 182 650.1 3.6 0.0 t_update_all = (time.perf_counter() - t_update_all)*1000
467
468 182 163.3 0.9 0.0 update_duration_ms = (time.perf_counter() - t_start)*1000
469 182 101979.4 560.3 0.9 self.logger.info(
470 728 576.4 0.8 0.0 f'update end, took {update_duration_ms:.3f} ms, '
471 364 211.4 0.6 0.0 f'processed {jobs_processed}/{len(shuffled_job_keys)} jobs, '
472 364 126.8 0.3 0.0 f'assigned {robots_assigned}/{available_robots_count} available robots '
473 364 125.4 0.3 0.0 f'to {new_tasks_count}/{all_new_tasks_count} available tasks '
474 728 319.8 0.4 0.0 f'[{t_load_robots:.3f}, {t_update_jobs:.3f}, {t_assign:.3f}, {t_update_all:.3f}] ms')
Total time: 6.57e-05 s
File: C:\Users\sam\Documents\GitHub\mapf-multiagent-robot-planning\dev\robot_allocator.py
Function: update_too_long at line 360
Line # Hits Time Per Hit % Time Line Contents
==============================================================
360 @profile
361 def update_too_long():
362 """Check if time since t_start > MAX_UPDATE_TIME_SEC"""
363 103 65.7 0.6 100.0 return (time.perf_counter() - t_start) > MAX_UPDATE_TIME_SEC
Total time: 3.67122 s
File: C:\Users\sam\Documents\GitHub\mapf-multiagent-robot-planning\dev\robot_allocator.py
Function: generate_path at line 480
Line # Hits Time Per Hit % Time Line Contents
==============================================================
480 @profile
481 def generate_path(self, pos_a: Position, pos_b: Position,
482 dynamic_obstacles, static_obstacles) -> Path:
483 """Generate a path from a to b avoiding existing robots"""
484 481 417.3 0.9 0.0 t_start = time.perf_counter()
485 481 518.5 1.1 0.0 stats = {
486 481 177.9 0.4 0.0 'pos_a': pos_a,
487 481 127.5 0.3 0.0 'pos_b': pos_b,
488 481 424.5 0.9 0.0 'count_dynamic_obstacles': len(dynamic_obstacles),
489 481 251.4 0.5 0.0 'count_static_obstacles': len(static_obstacles)
490 }
491
492 481 832.3 1.7 0.0 true_dists = self.heuristic_dict[pos_b]
493
494 481 816.0 1.7 0.0 def true_heuristic(pos_a: Position) -> float:
495 """Returns A* shortest path between any two points based on world_grid"""
496 return true_dists[pos_a]
497 481 3395334.2 7058.9 92.5 path = pf.st_astar(
498 481 260.4 0.5 0.0 self.world_grid, pos_a, pos_b, dynamic_obstacles, static_obstacles=static_obstacles,
499 481 271.7 0.6 0.0 end_fast=True, max_time=self.max_steps, heuristic=true_heuristic, stats=stats,
500 481 134.3 0.3 0.0 validate_ends=False)
501 481 262678.3 546.1 7.2 self.logger.info(
502 481 8790.2 18.3 0.2 f'generate_path took {(time.perf_counter() - t_start)*1000:.3f} ms - {stats}')
503 481 185.2 0.4 0.0 return path
Total time: 0.0479977 s
File: C:\Users\sam\Documents\GitHub\mapf-multiagent-robot-planning\dev\robot_allocator.py
Function: set_robot_path at line 505
Line # Hits Time Per Hit % Time Line Contents
==============================================================
505 @profile
506 def set_robot_path(self, robot: Robot, path: Path):
507 """Sets robot path, and also updates latest dynamic obstacles with this"""
508 481 3613.7 7.5 7.5 robot.set_path(path)
509 481 457.7 1.0 1.0 if self.latest_dynamic_obstacles is not None:
510 # TODO : Avoid repeating this.
511 # Add all positions along future path
512 13345 5637.3 0.4 11.7 for t_step, pos in enumerate(path):
513 13345 12441.3 0.9 25.9 self.latest_dynamic_obstacles.add((pos[0], pos[1], t_step))
514 # Also add it at future and past time to keep robots from slipping nearby
515 13345 10348.1 0.8 21.6 self.latest_dynamic_obstacles.add((pos[0], pos[1], t_step+1))
516 # TODO : This is hacky, get more precise on when robots go along paths.
517 13345 10516.5 0.8 21.9 self.latest_dynamic_obstacles.add((pos[0], pos[1], t_step-1))
518
519 481 401.3 0.8 0.8 path_len_t = len(path)
520 # Add final position a few times to give space for robot to move
521 481 286.2 0.6 0.6 last_pos = path[path_len_t-1]
522 2405 1966.6 0.8 4.1 for t_step in range(path_len_t, path_len_t+5):
523 2405 1330.6 0.6 2.8 self.latest_dynamic_obstacles.add(
524 2405 998.4 0.4 2.1 (last_pos[0], last_pos[1], t_step))
Total time: 1.27803 s
File: C:\Users\sam\Documents\GitHub\mapf-multiagent-robot-planning\dev\robot_allocator.py
Function: job_start at line 526
Line # Hits Time Per Hit % Time Line Contents
==============================================================
526 @profile
527 def job_start(self, job: Job) -> bool:
528 """Start job, pathing robot to item zone, or home."""
529 # Check if item zone lock available
530 3431 4604.7 1.3 0.4 if self.item_locks[job.item_zone] and self.item_locks[job.item_zone] != job.job_id:
531 # Item zone in use
532 3431 1160.9 0.3 0.1 return False
533 177 126.9 0.7 0.0 self.item_locks[job.item_zone] = job.job_id # take lock on item zone
534
535 177 9800.8 55.4 0.8 robot = self.get_robot(job.robot_id)
536 177 107.1 0.6 0.0 current_pos = robot.pos
537 # Try to generate new path for robot
538 177 14944.2 84.4 1.2 static_obstacles = self.get_current_static_obstacles()
539 177 1127449.8 6369.8 88.2 job.path_robot_to_item = self.generate_path(
540 177 119.4 0.7 0.0 current_pos, job.item_zone, self.latest_dynamic_obstacles, static_obstacles)
541 177 142.5 0.8 0.0 if not job.path_robot_to_item:
542 self.logger.warning(f'Robot {job.robot_id} no path to item zone')
543
544 if job.robot_home == current_pos:
545 return False
546 # Not at home, so try going home instead for now
547 path_to_home = self.generate_path(
548 current_pos, job.robot_home, self.latest_dynamic_obstacles, static_obstacles)
549 self.logger.warning(
550 f'Trying to go home {current_pos} -> {job.robot_home}')
551 if path_to_home:
552 self.set_robot_path(robot, path_to_home)
553 robot.state_description = 'Pathing home, waiting till item zone available'
554 return False # Did not start job as no path existed yet
555
556 177 30079.9 169.9 2.4 self.set_robot_path(robot, job.path_robot_to_item)
557 177 224.1 1.3 0.0 robot.state_description = 'Pathing to item zone'
558 177 1284.5 7.3 0.1 job.start()
559 177 87857.3 496.4 6.9 self.logger.info(f'Started {job}')
560 177 128.2 0.7 0.0 return True
Total time: 0.621937 s
File: C:\Users\sam\Documents\GitHub\mapf-multiagent-robot-planning\dev\robot_allocator.py
Function: job_try_pick_item at line 562
Line # Hits Time Per Hit % Time Line Contents
==============================================================
562 @profile
563 def job_try_pick_item(self, job: Job) -> bool:
564 """Have robot try and pick the item for the job."""
565 # Check that robot is at item zone or has a path
566 4372 238964.5 54.7 38.4 robot = self.get_robot(job.robot_id)
567 4221 7130.1 1.7 1.1 if robot.pos != job.item_zone:
568 4221 2287.1 0.5 0.4 if not robot.future_path:
569 self.logger.error(
570 f'Robot {robot.robot_id} path diverged from job pick item, reset state')
571 job.started = False
572 return False
573 4221 276803.4 65.6 44.5 self.logger.debug(
574 4221 17789.4 4.2 2.9 f'Robot {job.robot_id} not yet to item zone {robot.pos} -> {job.item_zone}')
575 4221 1307.6 0.3 0.2 return False
576
577 # Add item to held items for robot
578 151 9300.8 61.6 1.5 success, item_in_hand = self.robot_pick_item(
579 151 65.7 0.4 0.0 job.robot_id, job.item_id)
580 151 58.3 0.4 0.0 if not success:
581 self.logger.error(
582 f'Robot {job.robot_id} could not pick item for '
583 f'job {job}, already holding {item_in_hand}')
584 job.error = True
585 return False
586 151 568.5 3.8 0.1 job.pick_item()
587
588 151 67215.4 445.1 10.8 self.logger.info(f'Robot {job.robot_id} item picked {item_in_hand}')
589 151 392.0 2.6 0.1 robot.state_description = f'Picked item {item_in_hand}'
590 151 53.8 0.4 0.0 return True
Total time: 1.2635 s
File: C:\Users\sam\Documents\GitHub\mapf-multiagent-robot-planning\dev\robot_allocator.py
Function: job_go_to_station at line 592
Line # Hits Time Per Hit % Time Line Contents
==============================================================
592 @profile
593 def job_go_to_station(self, job: Job) -> bool:
594 """Have robot go to station."""
595 # Check if station zone lock available
596 1615 1754.7 1.1 0.1 if (self.station_locks[job.station_zone] and
597 1615 930.3 0.6 0.1 self.station_locks[job.station_zone] != job.job_id):
598 1615 495.4 0.3 0.0 return False # Item zone in use
599 # take lock on station zone
600 137 89.7 0.7 0.0 self.station_locks[job.station_zone] = job.job_id
601
602 137 7724.3 56.4 0.6 robot = self.get_robot(job.robot_id)
603 137 60.2 0.4 0.0 current_pos = robot.pos
604 # Try to generate new path for robot
605 137 11374.3 83.0 0.9 static_obstacles = self.get_current_static_obstacles()
606 137 1158528.9 8456.4 91.7 job.path_item_to_station = self.generate_path(
607 137 78.0 0.6 0.0 current_pos, job.station_zone, self.latest_dynamic_obstacles, static_obstacles)
608 137 96.6 0.7 0.0 if not job.path_item_to_station:
609 self.logger.warning('No path to station for %s', job)
610 # Try going home instead to leave space at station
611 path_to_home = self.generate_path(
612 current_pos, job.robot_home, self.latest_dynamic_obstacles, static_obstacles)
613 if path_to_home:
614 self.set_robot_path(robot, path_to_home)
615 robot.state_description = 'Pathing home, waiting till station available'
616 return False # Did not start job as no path existed yet
617
618 # unlock item zone since we're moving
619 137 329.2 2.4 0.0 self.item_locks[job.item_zone] = None
620
621 # Send robot to station
622 137 19773.9 144.3 1.6 self.set_robot_path(robot, job.path_item_to_station)
623 137 150.2 1.1 0.0 robot.state_description = 'Pathing to station'
624 137 933.2 6.8 0.1 job.going_to_station()
625 137 60851.7 444.2 4.8 self.logger.info(
626 137 257.0 1.9 0.0 f'Sending robot {job.robot_id} to station for task {job.task_key}')
627 137 68.9 0.5 0.0 return True
Total time: 0.446135 s
File: C:\Users\sam\Documents\GitHub\mapf-multiagent-robot-planning\dev\robot_allocator.py
Function: job_drop_item_at_station at line 629
Line # Hits Time Per Hit % Time Line Contents
==============================================================
629 @profile
630 def job_drop_item_at_station(self, job: Job) -> bool:
631 """Have robot drop item at station if possible."""
632 # Check that robot is at station zone
633 3530 192495.7 54.5 43.1 robot = self.get_robot(job.robot_id)
634 3419 5496.0 1.6 1.2 if robot.pos != job.station_zone:
635 3419 1668.7 0.5 0.4 if not robot.future_path:
636 self.logger.error(
637 f'Robot {robot.robot_id} path diverged from job drop item, reset state')
638 job.going_to_station = False
639 return False
640 3419 223074.3 65.2 50.0 self.logger.debug(
641 3419 14519.6 4.2 3.3 f'Robot {job.robot_id} not yet to station zone {robot.pos} -> {job.station_zone}')
642 3419 1072.7 0.3 0.2 return False
643
644 # Add Item to station (this finishes the task too)
645 # Drop item from held items for robot
646 111 7132.1 64.3 1.6 drop_success, item_id = self.robot_drop_item(job.robot_id)
647 111 37.1 0.3 0.0 if not drop_success:
648 self.logger.error(
649 f"Robot {job.robot_id} didn't have item to drop: {job.robot_id} held {item_id}")
650 self.set_robot_error(job.robot_id)
651 job.error = True
652 return False
653 111 49.7 0.4 0.0 if item_id != job.item_id:
654 self.logger.error(
655 f"Robot {job.robot_id} was holding the wrong "
656 f"item: {item_id}, needed {job.item_id}")
657 job.error = True
658 return False
659
660 111 372.0 3.4 0.1 job.drop_item()
661 111 186.6 1.7 0.0 robot.state_description = f'Finished task: Added item {job.item_id} to station'
662 111 30.9 0.3 0.0 return True
Total time: 0.855077 s
File: C:\Users\sam\Documents\GitHub\mapf-multiagent-robot-planning\dev\robot_allocator.py
Function: job_return_home at line 664
Line # Hits Time Per Hit % Time Line Contents
==============================================================
664 @profile
665 def job_return_home(self, job: Job) -> bool:
666 """Transition to return home state, having robot path home."""
667 # Try to generate new path for robot
668 110 5987.3 54.4 0.7 robot = self.get_robot(job.robot_id)
669 110 53.4 0.5 0.0 current_pos = robot.pos
670 110 8957.1 81.4 1.0 static_obstacles = self.get_current_static_obstacles()
671 110 775573.7 7050.7 90.7 job.path_station_to_home = self.generate_path(
672 110 63.5 0.6 0.0 current_pos, job.robot_home, self.latest_dynamic_obstacles, static_obstacles)
673 110 89.7 0.8 0.0 if not job.path_station_to_home:
674 self.logger.warning(f'Robot {job.robot_id} no path back home')
675 return False # Did not start job as no path existed yet
676 # Send robot home
677 110 14445.8 131.3 1.7 self.set_robot_path(robot, job.path_station_to_home)
678 110 133.4 1.2 0.0 robot.state_description = 'Finished task, returning home'
679 110 690.5 6.3 0.1 job.return_home()
680 # Release lock on station
681 110 199.6 1.8 0.0 self.station_locks[job.station_zone] = None
682 110 48608.0 441.9 5.7 self.logger.info(
683 110 194.2 1.8 0.0 f'Sending Robot {job.robot_id} back home for {job.task_key}')
684 110 80.8 0.7 0.0 return True
Total time: 0.607424 s
File: C:\Users\sam\Documents\GitHub\mapf-multiagent-robot-planning\dev\robot_allocator.py
Function: job_arrive_home at line 686
Line # Hits Time Per Hit % Time Line Contents
==============================================================
686 @profile
687 def job_arrive_home(self, job: Job) -> bool:
688 """If robot at home, finish the job."""
689 # Check that robot is at home zone
690 4144 236606.9 57.1 39.0 robot = self.get_robot(job.robot_id)
691 3995 6924.6 1.7 1.1 if robot.pos != job.robot_home:
692 3995 2073.2 0.5 0.3 if not robot.future_path:
693 self.logger.error(
694 f'Robot {robot.robot_id} path diverged from job arrive home, reset state')
695 return False
696 3995 264159.4 66.1 43.5 self.logger.debug(
697 3995 17200.1 4.3 2.8 f'Robot {job.robot_id} not yet to robot home {robot.pos} -> {job.robot_home}')
698 3995 1212.4 0.3 0.2 return False
699 149 66397.5 445.6 10.9 self.logger.info(
700 149 180.4 1.2 0.0 f'Robot {job.robot_id} returned home, job complete for task {job.task_key}')
701 149 12670.0 85.0 2.1 return self.job_complete(job)
Total time: 0.0109427 s
File: C:\Users\sam\Documents\GitHub\mapf-multiagent-robot-planning\dev\robot_allocator.py
Function: job_complete at line 703
Line # Hits Time Per Hit % Time Line Contents
==============================================================
703 @profile
704 def job_complete(self, job: Job) -> bool:
705 """Complete job and make robot available"""
706 # Normally this will be called in line with arriving home.
707 149 807.6 5.4 7.4 job.complete()
708 # Make robot available
709 149 9659.0 64.8 88.3 robot = self.get_robot(job.robot_id)
710 149 198.9 1.3 1.8 robot.state = RobotStatus.AVAILABLE
711 149 123.5 0.8 1.1 robot.task_key = None
712 149 114.8 0.8 1.0 robot.state_description = 'Waiting for task'
713 # Expect update to remove allocations and completed jobs separately
714 149 38.9 0.3 0.4 return True
Total time: 0 s
File: C:\Users\sam\Documents\GitHub\mapf-multiagent-robot-planning\dev\robot_allocator.py
Function: job_restart at line 716
Line # Hits Time Per Hit % Time Line Contents
==============================================================
716 @profile
717 def job_restart(self, job: Job):
718 """If something went wrong, reset job state to start, same robot/task"""
719 self.logger.error(
720 f'{job} in error for, resetting job and Robot {job.robot_id} etc.')
721
722 # Make robot available and drop any held items
723 robot = self.get_robot(job.robot_id)
724 robot.state = RobotStatus.AVAILABLE
725 robot.held_item_id = None
726
727 # Reset the job
728 job.reset()
729 # Clear any locks it may have had
730 if robot.pos in self.item_locks and self.item_locks[robot.pos] == job.job_id:
731 self.item_locks[robot.pos] = None
732 if robot.pos in self.station_locks and self.station_locks[robot.pos] == job.job_id:
733 self.station_locks[robot.pos] = None
734
735 # Try going home instead to leave space at station
736 robot = self.get_robot(job.robot_id)
737 current_pos = robot.pos
738 static_obstacles = self.get_current_static_obstacles()
739 path_to_home = self.generate_path(
740 current_pos, job.robot_home, self.latest_dynamic_obstacles, static_obstacles)
741 if path_to_home:
742 self.set_robot_path(robot, path_to_home)
743 # Set robots start pos to where it is currently
744 # job.robot_start_pos = robot.pos
745 return False
Total time: 0.665317 s
File: C:\Users\sam\Documents\GitHub\mapf-multiagent-robot-planning\dev\robot_allocator.py
Function: job_restart_manager_return_home at line 747
Line # Hits Time Per Hit % Time Line Contents
==============================================================
747 @profile
748 def job_restart_manager_return_home(self, job: Job) -> bool:
749 """Transition to restart manager return home state, wait for path and then path home."""
750 57 3665.4 64.3 0.6 robot = self.get_robot(job.robot_id)
751 # Wait for existing path to finish
752 57 52.2 0.9 0.0 if robot.future_path:
753 self.logger.warning(
754 f'{job} - Robot {job.robot_id} still finishing path')
755 return False # Did not generate path home yet since still have path
756 57 28.9 0.5 0.0 current_pos = robot.pos
757 # If path finished at home, we're done here
758 57 176.8 3.1 0.0 if current_pos == job.robot_home:
759 job.return_home()
760 return self.job_arrive_home(job)
761 # Try to find path to home
762 57 6831.6 119.9 1.0 static_obstacles = self.get_current_static_obstacles()
763 57 623307.8 10935.2 93.7 job.path_station_to_home = self.generate_path(
764 57 34.0 0.6 0.0 current_pos, job.robot_home, self.latest_dynamic_obstacles, static_obstacles)
765 57 48.5 0.9 0.0 if not job.path_station_to_home:
766 self.logger.warning(f'Robot {job.robot_id} no path back home')
767 return False # Did not start job as no path existed yet
768 # Send robot home
769 57 8041.4 141.1 1.2 self.set_robot_path(robot, job.path_station_to_home)
770 57 70.2 1.2 0.0 robot.state_description = 'Returning home'
771 57 470.3 8.3 0.1 job.return_home()
772 # Release lock on any zones if pos was in them
773 37 44.3 1.2 0.0 if current_pos in self.station_locks:
774 20 23.9 1.2 0.0 self.station_locks[current_pos] = None
775 37 207.5 5.6 0.0 elif current_pos in self.item_locks:
776 37 83.3 2.3 0.0 self.item_locks[current_pos] = None
777 57 22104.3 387.8 3.3 self.logger.info(
778 57 99.4 1.7 0.0 f'Sending Robot {job.robot_id} back home for {job.task_key}')
779 57 27.1 0.5 0.0 return True
Total time: 5.92265 s
File: C:\Users\sam\Documents\GitHub\mapf-multiagent-robot-planning\dev\robot_allocator.py
Function: check_and_update_job at line 794
Line # Hits Time Per Hit % Time Line Contents
==============================================================
794 @profile
795 def check_and_update_job(self, job: Job) -> bool:
796 """Process a job based on the current state. """
797
798 # Get the method for the current state
799 17573 32952.6 1.9 0.6 method_name = self.state_methods.get(job.state)
800 # print(f'check update for {job.state} : {method_name}')
801 17573 6441.2 0.4 0.1 if method_name is None:
802 return False
803 17573 10489.8 0.6 0.2 method = getattr(self, method_name)
804
805 17573 5985.7 0.3 0.1 if method is None:
806 return False
807
808 17573 5866776.1 333.9 99.1 return method(job)
Total time: 12.173 s
File: C:\Users\sam\Documents\GitHub\mapf-multiagent-robot-planning\dev\robot_allocator.py
Function: step at line 810
Line # Hits Time Per Hit % Time Line Contents
==============================================================
810 @profile
811 def step(self):
812 """Main update step for robot allocator, runs after world update."""
813 # Wait for world state update
814 183 976.8 5.3 0.0 response = self.redis_db.xread(
815 183 163.6 0.9 0.0 {'world:state': '$'}, block=1000, count=1)
816 182 554.9 3.0 0.0 time_read = time.perf_counter()
817 182 92.2 0.5 0.0 if not response:
818 return
819 # Parse robot data from update message
820 182 195.1 1.1 0.0 timestamp, data = response[0][1][0]
821 182 579.6 3.2 0.0 world_sim_t = int(data['t'])
822 182 356.3 2.0 0.0 if self.world_sim_t and world_sim_t <= self.world_sim_t:
823 # Error out of RA since WS restarted, let RA container restart
824 # TODO : Consider just resetting/restarting RA allocator in main loop.
825 raise ValueError(
826 'World time step state earlier than previous, world sim restarted.')
827 182 224.8 1.2 0.0 self.world_sim_t = world_sim_t
828 182 892170.8 4902.0 7.3 robots = [Robot.from_json(json_data)
829 182 47627.3 261.7 0.4 for json_data in json.loads(data['robots'])]
830
831 182 103085.0 566.4 0.8 self.logger.info(
832 182 102.5 0.6 0.0 'Step start T=%d timestamp=%s ---------------------------------'
833 182 98.2 0.5 0.0 '--------------------------------------------------------', self.world_sim_t, timestamp)
834 182 11107757.0 61031.6 91.2 self.update(robots, time_read=time_read)
835 182 19018.9 104.5 0.2 self.logger.debug('Step end')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment