| 1 | # -*- coding: utf-8 -*-
|
|---|
| 2 | from __future__ import (nested_scopes, generators, division, absolute_import,
|
|---|
| 3 | with_statement, print_function, unicode_literals)
|
|---|
| 4 | import sys
|
|---|
| 5 | from multiprocessing import cpu_count, Process, Queue
|
|---|
| 6 | import time
|
|---|
| 7 | from xml.etree.ElementTree import fromstring
|
|---|
| 8 |
|
|---|
| 9 | from grass.exceptions import CalledModuleError, GrassError, ParameterError
|
|---|
| 10 | from grass.script.core import Popen, PIPE, use_temp_region, del_temp_region
|
|---|
| 11 | from grass.script.utils import encode, decode
|
|---|
| 12 | from .docstring import docstring_property
|
|---|
| 13 | from .parameter import Parameter
|
|---|
| 14 | from .flag import Flag
|
|---|
| 15 | from .typedict import TypeDict
|
|---|
| 16 | from .read import GETFROMTAG, DOC
|
|---|
| 17 | from .env import G_debug
|
|---|
| 18 |
|
|---|
| 19 | if sys.version_info[0] == 2:
|
|---|
| 20 | from itertools import izip_longest as zip_longest
|
|---|
| 21 | else:
|
|---|
| 22 | from itertools import zip_longest
|
|---|
| 23 | unicode = str
|
|---|
| 24 |
|
|---|
| 25 |
|
|---|
def _get_bash(self, *args, **kargs):
    """Return ``self.get_bash()``, ignoring any extra arguments.

    The positional and keyword arguments are accepted only so that the
    function can be called with an arbitrary signature; they are not used.

    :param self: any object that provides a ``get_bash()`` method
    :returns: whatever ``self.get_bash()`` returns
    """
    bash_repr = self.get_bash()
    return bash_repr
|
|---|
| 28 |
|
|---|
| 29 |
|
|---|
class ParallelModuleQueue(object):
    """This class is designed to run an arbitrary number of pygrass Module or
    MultiModule processes in parallel.

    Objects of type grass.pygrass.modules.Module or
    grass.pygrass.modules.MultiModule can be put into the
    queue using the put() method. When the queue is full with the maximum
    number of parallel processes it will wait for all processes to finish,
    sets the stdout and stderr of the Module object and removes it
    from the queue when it is finished.

    To finish the queue before the maximum number of parallel
    processes was reached call wait() .

    This class will raise a GrassError in case a Module process exits
    with a return code other than 0.

    Processes that were run asynchronously with the MultiModule class
    will not raise a GrassError in case of failure. This must be manually
    checked by accessing finished modules by calling get_finished_modules().

    Usage:

    Check with a queue size of 3 and 5 processes

    >>> import copy
    >>> from grass.pygrass.modules import Module, MultiModule, ParallelModuleQueue
    >>> mapcalc_list = []

    Setting run_ to False is important, otherwise a parallel processing is not possible

    >>> mapcalc = Module("r.mapcalc", overwrite=True, run_=False)
    >>> queue = ParallelModuleQueue(nprocs=3)
    >>> for i in range(5):
    ...     new_mapcalc = copy.deepcopy(mapcalc)
    ...     mapcalc_list.append(new_mapcalc)
    ...     m = new_mapcalc(expression="test_pygrass_%i = %i"%(i, i))
    ...     queue.put(m)
    >>> queue.wait()
    >>> mapcalc_list = queue.get_finished_modules()
    >>> queue.get_num_run_procs()
    0
    >>> queue.get_max_num_procs()
    3
    >>> for mapcalc in mapcalc_list:
    ...     print(mapcalc.popen.returncode)
    0
    0
    0
    0
    0

    Check with a queue size of 8 and 5 processes

    >>> queue = ParallelModuleQueue(nprocs=8)
    >>> mapcalc_list = []
    >>> for i in range(5):
    ...     new_mapcalc = copy.deepcopy(mapcalc)
    ...     mapcalc_list.append(new_mapcalc)
    ...     m = new_mapcalc(expression="test_pygrass_%i = %i"%(i, i))
    ...     queue.put(m)
    >>> queue.wait()
    >>> mapcalc_list = queue.get_finished_modules()
    >>> queue.get_num_run_procs()
    0
    >>> queue.get_max_num_procs()
    8
    >>> for mapcalc in mapcalc_list:
    ...     print(mapcalc.popen.returncode)
    0
    0
    0
    0
    0

    Check MultiModule approach with three by two processes running in a background process

    >>> gregion = Module("g.region", flags="p", run_=False)
    >>> queue = ParallelModuleQueue(nprocs=3)
    >>> proc_list = []
    >>> for i in range(3):
    ...     new_gregion = copy.deepcopy(gregion)
    ...     proc_list.append(new_gregion)
    ...     new_mapcalc = copy.deepcopy(mapcalc)
    ...     m = new_mapcalc(expression="test_pygrass_%i = %i"%(i, i))
    ...     proc_list.append(new_mapcalc)
    ...     mm = MultiModule(module_list=[new_gregion, new_mapcalc], sync=False, set_temp_region=True)
    ...     queue.put(mm)
    >>> queue.wait()
    >>> proc_list = queue.get_finished_modules()
    >>> queue.get_num_run_procs()
    0
    >>> queue.get_max_num_procs()
    3
    >>> for proc in proc_list:
    ...     print(proc.popen.returncode)
    0
    0
    0
    0
    0
    0

    Check with a queue size of 8 and 4 processes

    >>> queue = ParallelModuleQueue(nprocs=8)
    >>> mapcalc_list = []
    >>> new_mapcalc = copy.deepcopy(mapcalc)
    >>> mapcalc_list.append(new_mapcalc)
    >>> m = new_mapcalc(expression="test_pygrass_1 =1")
    >>> queue.put(m)
    >>> queue.get_num_run_procs()
    1
    >>> new_mapcalc = copy.deepcopy(mapcalc)
    >>> mapcalc_list.append(new_mapcalc)
    >>> m = new_mapcalc(expression="test_pygrass_2 =2")
    >>> queue.put(m)
    >>> queue.get_num_run_procs()
    2
    >>> new_mapcalc = copy.deepcopy(mapcalc)
    >>> mapcalc_list.append(new_mapcalc)
    >>> m = new_mapcalc(expression="test_pygrass_3 =3")
    >>> queue.put(m)
    >>> queue.get_num_run_procs()
    3
    >>> new_mapcalc = copy.deepcopy(mapcalc)
    >>> mapcalc_list.append(new_mapcalc)
    >>> m = new_mapcalc(expression="test_pygrass_4 =4")
    >>> queue.put(m)
    >>> queue.get_num_run_procs()
    4
    >>> queue.wait()
    >>> mapcalc_list = queue.get_finished_modules()
    >>> queue.get_num_run_procs()
    0
    >>> queue.get_max_num_procs()
    8
    >>> for mapcalc in mapcalc_list:
    ...     print(mapcalc.popen.returncode)
    0
    0
    0
    0

    Check with a queue size of 3 and 4 processes

    >>> queue = ParallelModuleQueue(nprocs=3)
    >>> mapcalc_list = []
    >>> new_mapcalc = copy.deepcopy(mapcalc)
    >>> mapcalc_list.append(new_mapcalc)
    >>> m = new_mapcalc(expression="test_pygrass_1 =1")
    >>> queue.put(m)
    >>> queue.get_num_run_procs()
    1
    >>> new_mapcalc = copy.deepcopy(mapcalc)
    >>> mapcalc_list.append(new_mapcalc)
    >>> m = new_mapcalc(expression="test_pygrass_2 =2")
    >>> queue.put(m)
    >>> queue.get_num_run_procs()
    2
    >>> new_mapcalc = copy.deepcopy(mapcalc)
    >>> mapcalc_list.append(new_mapcalc)
    >>> m = new_mapcalc(expression="test_pygrass_3 =3")
    >>> queue.put(m) # Now it will wait until all procs finish and set the counter back to 0
    >>> queue.get_num_run_procs()
    0
    >>> new_mapcalc = copy.deepcopy(mapcalc)
    >>> mapcalc_list.append(new_mapcalc)
    >>> m = new_mapcalc(expression="test_pygrass_%i = %i"%(i, i))
    >>> queue.put(m)
    >>> queue.get_num_run_procs()
    1
    >>> queue.wait()
    >>> mapcalc_list = queue.get_finished_modules()
    >>> queue.get_num_run_procs()
    0
    >>> queue.get_max_num_procs()
    3
    >>> for mapcalc in mapcalc_list:
    ...     print(mapcalc.popen.returncode)
    0
    0
    0
    0

    """
    def __init__(self, nprocs=1):
        """Constructor

        :param nprocs: The maximum number of Module processes that
                       can be run in parallel, default is 1, if None
                       (or any other false value) then use all the
                       available CPUs.
        :type nprocs: int
        """
        nprocs = int(nprocs) if nprocs else cpu_count()
        self._num_procs = nprocs
        # One slot per process that may run at the same time
        self._list = nprocs * [None]
        # Number of slots currently occupied
        self._proc_count = 0
        # Store all processed modules in a list
        self._finished_modules = []

    def put(self, module):
        """Put the next Module or MultiModule object in the queue

        The object is started immediately by calling its run() method. As
        soon as the queue holds the maximum number of processes, wait() is
        called, so this method blocks until all queued processes finished.

        To run the Module objects in parallel the ``run_`` and ``finish_``
        options of the Module must be set to False.

        :param module: a preconfigured Module or MultiModule object that was
                       configured with ``run_`` and ``finish_`` set to False
        :type module: Module or MultiModule object
        """
        slot = self._proc_count
        self._list[slot] = module
        # Force that finish is False, otherwise the execution
        # will not be parallel
        module.finish_ = False
        module.run()
        self._proc_count = slot + 1

        if self._proc_count == self._num_procs:
            self.wait()

    def get(self, num):
        """Get a Module object or list of Module objects from the queue

        :param num: the number of the object in queue
        :type num: int
        :returns: the object stored in slot ``num`` (None for an empty slot),
                  or None if num is not smaller than the maximum number of
                  processes
        """
        if num < self._num_procs:
            return self._list[num]
        return None

    def get_num_run_procs(self):
        """Get the number of Module processes that are in the queue running
        or finished

        :returns: the number of Module processes running/finished in the queue
        """
        return self._proc_count

    def get_max_num_procs(self):
        """Return the maximum number of parallel Module processes

        :returns: the maximum number of parallel Module processes
        """
        return self._num_procs

    def set_max_num_procs(self, nprocs):
        """Set the maximum number of Module processes that should run
        in parallel

        All processes that are currently in the queue are waited for before
        the queue is resized.

        :param nprocs: The maximum number of Module processes that can be
                       run in parallel
        :type nprocs: int
        """
        self._num_procs = int(nprocs)
        self.wait()

    def get_finished_modules(self):
        """Return all finished processes that were run by this queue

        :return: A list of Module objects
        """
        return self._finished_modules

    def wait(self):
        """Wait for all Module processes that are in the list to finish
        and set the modules stdout and stderr output options

        Every queued process is waited for, even if one of them fails. The
        objects returned by the successful wait() calls are appended to the
        list that get_finished_modules() returns. The queue is always emptied
        and the process counter is reset to 0, also in case of an error, so
        that the queue stays usable.

        :return: None
        :raises CalledModuleError: the first error raised by the wait() call
                                   of a queued process, re-raised after all
                                   processes were waited for
        """
        first_error = None
        try:
            for proc in self._list:
                if not proc:
                    continue
                try:
                    finished = proc.wait()
                except CalledModuleError as error:
                    # Keep waiting for the remaining processes so that none
                    # of them is left behind; report the first failure later
                    if first_error is None:
                        first_error = error
                    continue
                if isinstance(proc, Module):
                    self._finished_modules.append(finished)
                else:
                    # Anything else is expected to return an iterable of
                    # modules from wait()
                    self._finished_modules.extend(finished)
        finally:
            # Reset the queue even if waiting failed, otherwise a full queue
            # would stay full and the next put() would index out of range
            self._list = self._num_procs * [None]
            self._proc_count = 0

        if first_error is not None:
            raise first_error
|
|---|
| 309 |
|
|---|
| 310 |
|
|---|
| 311 |
|
|---|
| 312 | class Module(object):
|
|---|
| 313 | """This class is designed to wrap/run/interact with the GRASS modules.
|
|---|
| 314 |
|
|---|
| 315 | During the init phase the class reads the XML description generated by
|
|---|
| 316 | the ``--interface-description`` flag in order to understand which parameters
|
|---|
| 317 | are required and which are optional. ::
|
|---|
| 318 |
|
|---|
| 319 | >>> from grass.pygrass.modules import Module
|
|---|
| 320 | >>> from subprocess import PIPE
|
|---|
| 321 | >>> import copy
|
|---|
| 322 |
|
|---|
| 323 | >>> region = Module("g.region")
|
|---|
| 324 | >>> region.flags.p = True # set flags
|
|---|
| 325 | >>> region.flags.u = True
|
|---|
| 326 | >>> region.flags["3"].value = True # set numeric flags
|
|---|
| 327 | >>> region.get_bash()
|
|---|
| 328 | u'g.region -p -3 -u'
|
|---|
| 329 | >>> new_region = copy.deepcopy(region)
|
|---|
| 330 | >>> new_region.inputs.res = "10"
|
|---|
| 331 | >>> new_region.get_bash()
|
|---|
| 332 | u'g.region res=10 -p -3 -u'
|
|---|
| 333 |
|
|---|
| 334 | >>> neighbors = Module("r.neighbors")
|
|---|
| 335 | >>> neighbors.inputs.input = "mapA"
|
|---|
| 336 | >>> neighbors.outputs.output = "mapB"
|
|---|
| 337 | >>> neighbors.inputs.size = 5
|
|---|
| 338 | >>> neighbors.inputs.quantile = 0.5
|
|---|
| 339 | >>> neighbors.get_bash()
|
|---|
| 340 | u'r.neighbors input=mapA method=average size=5 quantile=0.5 output=mapB'
|
|---|
| 341 |
|
|---|
| 342 | >>> new_neighbors1 = copy.deepcopy(neighbors)
|
|---|
| 343 | >>> new_neighbors1.inputs.input = "mapD"
|
|---|
| 344 | >>> new_neighbors1.inputs.size = 3
|
|---|
| 345 | >>> new_neighbors1.inputs.quantile = 0.5
|
|---|
| 346 | >>> new_neighbors1.get_bash()
|
|---|
| 347 | u'r.neighbors input=mapD method=average size=3 quantile=0.5 output=mapB'
|
|---|
| 348 |
|
|---|
| 349 | >>> new_neighbors2 = copy.deepcopy(neighbors)
|
|---|
| 350 | >>> new_neighbors2(input="mapD", size=3, run_=False)
|
|---|
| 351 | Module('r.neighbors')
|
|---|
| 352 | >>> new_neighbors2.get_bash()
|
|---|
| 353 | u'r.neighbors input=mapD method=average size=3 quantile=0.5 output=mapB'
|
|---|
| 354 |
|
|---|
| 355 | >>> neighbors = Module("r.neighbors")
|
|---|
| 356 | >>> neighbors.get_bash()
|
|---|
| 357 | u'r.neighbors method=average size=3'
|
|---|
| 358 |
|
|---|
| 359 | >>> new_neighbors3 = copy.deepcopy(neighbors)
|
|---|
| 360 | >>> new_neighbors3(input="mapA", size=3, output="mapB", run_=False)
|
|---|
| 361 | Module('r.neighbors')
|
|---|
| 362 | >>> new_neighbors3.get_bash()
|
|---|
| 363 | u'r.neighbors input=mapA method=average size=3 output=mapB'
|
|---|
| 364 |
|
|---|
| 365 | >>> mapcalc = Module("r.mapcalc", expression="test_a = 1",
|
|---|
| 366 | ... overwrite=True, run_=False)
|
|---|
| 367 | >>> mapcalc.run()
|
|---|
| 368 | Module('r.mapcalc')
|
|---|
| 369 | >>> mapcalc.popen.returncode
|
|---|
| 370 | 0
|
|---|
| 371 |
|
|---|
| 372 | >>> mapcalc = Module("r.mapcalc", expression="test_a = 1",
|
|---|
| 373 | ... overwrite=True, run_=False, finish_=False)
|
|---|
| 374 | >>> mapcalc.run()
|
|---|
| 375 | Module('r.mapcalc')
|
|---|
| 376 | >>> p = mapcalc.wait()
|
|---|
| 377 | >>> p.popen.returncode
|
|---|
| 378 | 0
|
|---|
| 379 | >>> mapcalc.run()
|
|---|
| 380 | Module('r.mapcalc')
|
|---|
| 381 | >>> p = mapcalc.wait()
|
|---|
| 382 | >>> p.popen.returncode
|
|---|
| 383 | 0
|
|---|
| 384 |
|
|---|
| 385 | >>> colors = Module("r.colors", map="test_a", rules="-",
|
|---|
| 386 | ... run_=False, stdout_=PIPE,
|
|---|
| 387 | ... stderr_=PIPE, stdin_="1 red")
|
|---|
| 388 | >>> colors.run()
|
|---|
| 389 | Module('r.colors')
|
|---|
| 390 | >>> p = mapcalc.wait()
|
|---|
| 391 | >>> p.popen.returncode
|
|---|
| 392 | 0
|
|---|
| 393 | >>> colors.inputs["stdin"].value
|
|---|
| 394 | u'1 red'
|
|---|
| 395 | >>> colors.outputs["stdout"].value
|
|---|
| 396 | u''
|
|---|
| 397 | >>> colors.outputs["stderr"].value.strip()
|
|---|
| 398 | u"Color table for raster map <test_a> set to 'rules'"
|
|---|
| 399 |
|
|---|
| 400 | >>> colors = Module("r.colors", map="test_a", rules="-",
|
|---|
| 401 | ... run_=False, finish_=False, stdin_=PIPE)
|
|---|
| 402 | >>> colors.run()
|
|---|
| 403 | Module('r.colors')
|
|---|
| 404 | >>> stdout, stderr = colors.popen.communicate(input="1 red")
|
|---|
| 405 | >>> colors.popen.returncode
|
|---|
| 406 | 0
|
|---|
| 407 | >>> stdout
|
|---|
| 408 | >>> stderr
|
|---|
| 409 |
|
|---|
| 410 | >>> colors = Module("r.colors", map="test_a", rules="-",
|
|---|
| 411 | ... run_=False, finish_=False,
|
|---|
| 412 | ... stdin_=PIPE, stderr_=PIPE)
|
|---|
| 413 | >>> colors.run()
|
|---|
| 414 | Module('r.colors')
|
|---|
| 415 | >>> stdout, stderr = colors.popen.communicate(input="1 red")
|
|---|
| 416 | >>> colors.popen.returncode
|
|---|
| 417 | 0
|
|---|
| 418 | >>> stdout
|
|---|
| 419 | >>> stderr.strip()
|
|---|
| 420 | "Color table for raster map <test_a> set to 'rules'"
|
|---|
| 421 |
|
|---|
| 422 | Run a second time
|
|---|
| 423 |
|
|---|
| 424 | >>> colors.run()
|
|---|
| 425 | Module('r.colors')
|
|---|
| 426 | >>> stdout, stderr = colors.popen.communicate(input="1 blue")
|
|---|
| 427 | >>> colors.popen.returncode
|
|---|
| 428 | 0
|
|---|
| 429 | >>> stdout
|
|---|
| 430 | >>> stderr.strip()
|
|---|
| 431 | "Color table for raster map <test_a> set to 'rules'"
|
|---|
| 432 |
|
|---|
| 433 | Multiple run test
|
|---|
| 434 |
|
|---|
| 435 | >>> colors = Module("r.colors", map="test_a",
|
|---|
| 436 | ... color="ryb", run_=False)
|
|---|
| 437 | >>> colors.get_bash()
|
|---|
| 438 | u'r.colors map=test_a color=ryb'
|
|---|
| 439 | >>> colors.run()
|
|---|
| 440 | Module('r.colors')
|
|---|
| 441 | >>> colors(color="gyr")
|
|---|
| 442 | Module('r.colors')
|
|---|
| 443 | >>> colors.run()
|
|---|
| 444 | Module('r.colors')
|
|---|
| 445 | >>> colors(color="ryg")
|
|---|
| 446 | Module('r.colors')
|
|---|
| 447 | >>> colors(stderr_=PIPE)
|
|---|
| 448 | Module('r.colors')
|
|---|
| 449 | >>> colors.run()
|
|---|
| 450 | Module('r.colors')
|
|---|
| 451 | >>> print(colors.outputs["stderr"].value.strip())
|
|---|
| 452 | Color table for raster map <test_a> set to 'ryg'
|
|---|
| 453 | >>> colors(color="byg")
|
|---|
| 454 | Module('r.colors')
|
|---|
| 455 | >>> colors(stdout_=PIPE)
|
|---|
| 456 | Module('r.colors')
|
|---|
| 457 | >>> colors.run()
|
|---|
| 458 | Module('r.colors')
|
|---|
| 459 | >>> print(colors.outputs["stderr"].value.strip())
|
|---|
| 460 | Color table for raster map <test_a> set to 'byg'
|
|---|
| 461 |
|
|---|
| 462 | Often in the Module class you can find ``*args`` and ``kwargs`` annotation
|
|---|
| 463 | in methods, like in the __call__ method.
|
|---|
| 464 | Python allow developers to not specify all the arguments and
|
|---|
| 465 | keyword arguments of a method or function. ::
|
|---|
| 466 |
|
|---|
| 467 | def f(*args):
|
|---|
| 468 | for arg in args:
|
|---|
| 469 | print arg
|
|---|
| 470 |
|
|---|
| 471 | therefore if we call the function like:
|
|---|
| 472 |
|
|---|
| 473 | >>> f('grass', 'gis', 'modules') # doctest: +SKIP
|
|---|
| 474 | grass
|
|---|
| 475 | gis
|
|---|
| 476 | modules
|
|---|
| 477 |
|
|---|
| 478 | or we can define a new list:
|
|---|
| 479 |
|
|---|
| 480 | >>> words = ['grass', 'gis', 'modules'] # doctest: +SKIP
|
|---|
| 481 | >>> f(*words) # doctest: +SKIP
|
|---|
| 482 | grass
|
|---|
| 483 | gis
|
|---|
| 484 | modules
|
|---|
| 485 |
|
|---|
| 486 | we can do the same with keyword arguments, rewrite the above function: ::
|
|---|
| 487 |
|
|---|
| 488 | def f(*args, **kargs):
|
|---|
| 489 | for arg in args:
|
|---|
| 490 | print arg
|
|---|
| 491 | for key, value in kargs.items():
|
|---|
| 492 | print "%s = %r" % (key, value)
|
|---|
| 493 |
|
|---|
| 494 | now we can use the new function, with:
|
|---|
| 495 |
|
|---|
| 496 | >>> f('grass', 'gis', 'modules', os = 'linux', language = 'python')
|
|---|
| 497 | ... # doctest: +SKIP
|
|---|
| 498 | grass
|
|---|
| 499 | gis
|
|---|
| 500 | modules
|
|---|
| 501 | os = 'linux'
|
|---|
| 502 | language = 'python'
|
|---|
| 503 |
|
|---|
| 504 | or, as before we can, define a dictionary and give the dictionary to
|
|---|
| 505 | the function, like:
|
|---|
| 506 |
|
|---|
| 507 | >>> keywords = {'os' : 'linux', 'language' : 'python'} # doctest: +SKIP
|
|---|
| 508 | >>> f(*words, **keywords) # doctest: +SKIP
|
|---|
| 509 | grass
|
|---|
| 510 | gis
|
|---|
| 511 | modules
|
|---|
| 512 | os = 'linux'
|
|---|
| 513 | language = 'python'
|
|---|
| 514 |
|
|---|
| 515 | In the Module class we heavily use this language feature to pass arguments
|
|---|
| 516 | and keyword arguments to the grass module.
|
|---|
| 517 | """
|
|---|
def __init__(self, cmd, *args, **kargs):
    """Build the module interface from its XML interface description.

    The command is executed with ``--interface-description`` and the XML
    that it prints is parsed into parameters (split into ``inputs`` and
    ``outputs``), the list of ``required`` parameter names and ``flags``.
    Every other child element of the XML root is converted with the
    matching GETFROMTAG entry and stored as an attribute named after its
    tag. If positional or keyword arguments are given, they are forwarded
    to ``__call__``.

    :param cmd: the name of the module to wrap
    :type cmd: str
    :raises GrassError: if cmd is not a string or the command cannot be
                        started
    """
    if not isinstance(cmd, (unicode, str)):
        raise GrassError("Problem initializing the module {s}".format(s=cmd))
    self.name = str(cmd) if isinstance(cmd, unicode) else cmd

    try:
        # call the command with --interface-description
        xml_proc = Popen([cmd, "--interface-description"], stdout=PIPE)
    except OSError as e:
        print("OSError error({0}): {1}".format(e.errno, e.strerror))
        str_err = "Error running: `%s --interface-description`."
        raise GrassError(str_err % self.name)

    # get the xml of the module
    self.xml = xml_proc.communicate()[0]
    # transform and parse the xml into an Element class:
    # http://docs.python.org/library/xml.etree.elementtree.html
    tree = fromstring(self.xml)

    # everything that is neither a parameter nor a flag becomes a plain
    # attribute named after the xml tag
    for element in tree:
        if element.tag in ('parameter', 'flag'):
            continue
        self.__setattr__(element.tag, GETFROMTAG[element.tag](element))

    #
    # extract parameters from the xml
    #
    self.params_list = [Parameter(p) for p in tree.findall("parameter")]
    self.inputs = TypeDict(Parameter)
    self.outputs = TypeDict(Parameter)
    self.required = []

    # Insert parameters into input/output and required
    for par in self.params_list:
        target = self.inputs if par.input else self.outputs
        target[par.name] = par
        if par.required:
            self.required.append(par.name)

    #
    # extract flags from the xml
    #
    parsed_flags = [Flag(f) for f in tree.findall("flag")]
    self.flags = TypeDict(Flag)
    for parsed in parsed_flags:
        self.flags[parsed.name] = parsed

    #
    # Add new attributes to the class
    #
    self.run_ = True
    self.finish_ = True
    self.check_ = True
    self.env_ = None
    self.stdin_ = None
    self.stdin = None
    self.stdout_ = None
    self.stderr_ = None

    # stdin, stdout and stderr are handled like parameters; the same
    # description dictionary is reused and only its name is changed
    stream = {'name': 'stdin', 'required': False,
              'multiple': False, 'type': 'all',
              'value': None}
    self.inputs['stdin'] = Parameter(diz=stream)
    for stream_name in ('stdout', 'stderr'):
        stream['name'] = stream_name
        self.outputs[stream_name] = Parameter(diz=stream)

    self.popen = None
    self.time = None
    # This variable will be set in the run() function
    self.start_time = None
    # This variable is set True if wait() was successfully called
    self._finished = False

    if args or kargs:
        self.__call__(*args, **kargs)
    # expose the generated module documentation on __call__ as well
    self.__call__.__func__.__doc__ = self.__doc__
|
|---|
| 594 |
|
|---|
def __call__(self, *args, **kargs):
    """Set module parameters to the class and, if run_ is True execute the
    module, therefore valid parameters are all the module parameters
    plus some extra parameters that are: run_, stdin_, stdout_, stderr_,
    env_, finish_, check_ and flags.

    Called without any argument the module is simply run. Positional
    arguments are assigned to the parameters in the order of params_list.
    Leading and trailing underscores are stripped from keyword names
    before they are looked up in the inputs, outputs and flags.

    :returns: the return value of run() if the module was executed with
              arguments, otherwise a reference to this object
    :raises ParameterError: if a keyword is neither an input, an output
                            nor a flag of the module
    """
    if not (args or kargs):
        self.run()
        return self

    #
    # check for extra kargs, set attribute and remove from dictionary
    #
    if 'flags' in kargs:
        for flag_name in kargs.pop('flags'):
            self.flags[flag_name].value = True

    # set attributes
    for option in ('run_', 'env_', 'finish_', 'stdout_', 'stderr_', 'check_'):
        if option in kargs:
            setattr(self, option, kargs.pop(option))

    # set inputs
    if 'stdin_' in kargs:
        self.inputs['stdin'].value = kargs.pop('stdin_')

    #
    # set/update args
    #
    for param, arg in zip(self.params_list, args):
        param.value = arg

    for raw_key, val in kargs.items():
        key = raw_key.strip('_')
        # flags are looked up too, because some parameters (overwrite,
        # verbose and quiet) work like parameters
        for group in (self.inputs, self.outputs, self.flags):
            if key in group:
                group[key].value = val
                break
        else:
            raise ParameterError('%s is not a valid parameter.' % key)

    #
    # check if execute
    #
    if not self.run_:
        return self
    # check required parameters
    if self.check_:
        self.check()
    return self.run()
|
|---|
| 652 |
|
|---|
def get_bash(self):
    """Return a BASH representation of the Module.

    :returns: the items created by make_cmd() joined by single spaces
    """
    tokens = self.make_cmd()
    return ' '.join(tokens)
|
|---|
| 656 |
|
|---|
def get_python(self):
    """Return a Python representation of the Module.

    The text has the form ``prefix.name(arguments)``: the prefix is the
    part of the module name before the first dot and the remaining parts
    are joined by underscores. The arguments are, in this order, the
    parameters with a non-empty Python representation, the non-special
    flags concatenated into one ``flags=...`` argument and the special
    flags. Only the arguments that are present are joined, so the
    argument list never starts with a stray comma.

    :returns: the Python call as a string
    """
    name_parts = self.name.split('.')
    prefix = name_parts[0]
    name = '_'.join(name_parts[1:])

    # parameters that are not set have an empty representation
    pieces = [text
              for text in (par.get_python() for par in self.params_list)
              if text != '']

    plain = []    # letters of the ordinary flags, merged into flags='...'
    special = []  # special flags, passed as separate arguments
    for flg in self.flags.values():
        text = flg.get_python()
        if text == '':
            continue
        if flg.special:
            special.append(text)
        else:
            plain.append(text)

    # pre name par flg special
    if plain:
        pieces.append("flags=%r" % ''.join(plain))
    pieces.extend(special)
    return "%s.%s(%s)" % (prefix, name, ', '.join(pieces))
|
|---|
| 679 |
|
|---|
def __str__(self):
    """Return the command string that can be executed in a shell

    :returns: the items created by make_cmd() joined by single spaces
    """
    command = self.make_cmd()
    return ' '.join(command)
|
|---|
| 683 |
|
|---|
def __repr__(self):
    """Return ``Module(<name>)`` with the repr of the module name."""
    module_name = self.name
    return "Module(%r)" % (module_name,)
|
|---|
| 686 |
|
|---|
@docstring_property(__doc__)
def __doc__(self):
    """{cmd_name}({cmd_params})
    """
    # Build the signature line: the parameters are grouped three per row
    # (zip_longest pads the last row with None, which is dropped again)
    rows = zip_longest(*[iter(self.params_list)] * 3)
    row_texts = [', '.join([str(param) for param in row if param is not None])
                 for row in rows]
    # every further row starts on a new line, indented so that it lines
    # up under the first parameter after "name("
    row_separator = '\n' + (' ' * (len(self.name) + 1))
    head = DOC['head'].format(cmd_name=self.name,
                              cmd_params=row_separator.join(row_texts))

    # one documentation entry per parameter, followed by the flags
    params = '\n'.join([par.__doc__ for par in self.params_list])
    flags = self.flags.__doc__
    sections = [head, params, DOC['flag_head'], flags, DOC['foot']]
    return '\n'.join(sections)
|
|---|
| 702 |
|
|---|
def check(self):
    """Check the correctness of the provided parameters

    The check is skipped completely if any flag that evaluates to true
    has its ``suppress_required`` attribute set. Otherwise every name in
    ``self.required`` must have a value other than None in the inputs or
    outputs it belongs to.

    :raises ParameterError: if a required parameter is not set
    """
    # presumably a flag evaluates to true when it is switched on;
    # verify against the Flag class
    suppressed = any(flg and flg.suppress_required
                     for flg in self.flags.values())
    if suppressed:
        return

    for k in self.required:
        for group in (self.inputs, self.outputs):
            if k in group and group[k].value is None:
                msg = "Required parameter <%s> not set."
                raise ParameterError(msg % k)
|
|---|
| 715 |
|
|---|
def get_dict(self):
    """Return a dictionary that includes the name, all valid
    inputs, outputs and flags

    A parameter is listed if its value is neither None nor the empty
    string. This is the same rule that make_cmd() applies, so a parameter
    that is set to a false value like ``0`` is reported here as well. A
    flag is listed if its value is true.

    :returns: a dictionary with the keys ``name`` (the module name),
              ``inputs`` and ``outputs`` (lists of ``(key, value)``
              tuples) and ``flags`` (a list of flag names)
    """
    def is_set(value):
        # same test as in make_cmd(): 0 and False are valid values
        return value is not None and value != ''

    dic = {}
    dic['name'] = self.name
    dic['inputs'] = [(k, v.value) for k, v in self.inputs.items()
                     if is_set(v.value)]
    dic['outputs'] = [(k, v.value) for k, v in self.outputs.items()
                      if is_set(v.value)]
    dic['flags'] = [flg for flg in self.flags if self.flags[flg].value]
    return dic
|
|---|
| 728 |
|
|---|
def make_cmd(self):
    """Create the command as a list of arguments

    The list starts with the module name, followed by the BASH
    representation of every input and then every output whose value is
    neither None nor the empty string (stdin, stdout and stderr are
    skipped), followed by the string representation of every flag whose
    value is true.

    :returns: the command as a list of strings
    """
    reserved = ['stdin', 'stdout', 'stderr']
    command = [self.name, ]
    for group in (self.inputs, self.outputs):
        for key in group:
            if key in reserved:
                continue
            value = group[key].value
            if value is not None and value != '':
                command.append(group[key].get_bash())
    command.extend([str(self.flags[flg])
                    for flg in self.flags if self.flags[flg].value])
    return command
|
|---|
| 746 |
|
|---|
def run(self):
    """Launch the module as a subprocess.

    When ``finish_`` is ``True`` the call blocks until wait() has
    collected stdout and stderr of the terminated process. Otherwise it
    returns right after the process was started and wait() has to be
    called to finish it.

    :return: A reference to this object
    """
    G_debug(1, self.get_bash())
    self._finished = False

    stdin_data = self.inputs['stdin'].value
    if stdin_data:
        # Keep the data for wait(), which hands it to the process, and
        # open a pipe for it.
        self.stdin = stdin_data
        self.stdin_ = PIPE

    command = self.make_cmd()
    self.start_time = time.time()
    self.popen = Popen(command, stdin=self.stdin_, stdout=self.stdout_,
                       stderr=self.stderr_, env=self.env_)

    if self.finish_ is True:
        self.wait()
    return self
|
|---|
| 774 |
|
|---|
def wait(self):
    """Wait for the started module to finish and collect its results.

    Call this method if run() was performed with ``finish_`` set to
    ``False``. The captured stdout and stderr are stored decoded (or as
    an empty string when nothing was captured) in ``outputs['stdout']``
    and ``outputs['stderr']``, and the time elapsed since run() in
    ``time``. Calling it on an already finished module does nothing.

    :return: A reference to this object
    :raises CalledModuleError: if the process ended with a non-zero
                               return code
    """
    if self._finished is not False:
        return self

    if self.stdin:
        self.stdin = encode(self.stdin)
    raw_out, raw_err = self.popen.communicate(input=self.stdin)
    self.outputs['stdout'].value = decode(raw_out) if raw_out else ''
    self.outputs['stderr'].value = decode(raw_err) if raw_err else ''
    self.time = time.time() - self.start_time

    # Mark as finished before a failure is reported, so a later wait()
    # does not talk to the terminated process again.
    self._finished = True

    if self.popen.poll():
        raise CalledModuleError(returncode=self.popen.returncode,
                                code=self.get_bash(),
                                module=self.name, errors=raw_err)
    return self
|
|---|
| 797 |
|
|---|
| 798 |
|
|---|
class MultiModule(object):
    """This class is designed to run a list of modules in serial in the provided order,
    optionally within a temporary region environment.

    Modules can be run in serial synchronously or asynchronously.

    - Synchronously: When calling run() all modules will run in serial order
      until they are finished. The run() method will not return until all modules finished.
      The module objects can be accessed by calling get_modules() to check their return
      values.
    - Asynchronously: When calling run() all modules will run in serial order in a background process.
      Method run() will return after starting the modules without waiting for them to finish.
      The user must call the wait() method to wait for the modules to finish.
      Asynchronously called modules can be optionally run in a temporary region
      environment, hence invoking g.region will not alter the current
      region or the region of other MultiModule runs.

      Note:

          Modules run in asynchronous mode can only be accessed via the wait() method.
          The wait() method will return all finished module objects as list.

    Objects of this class can be passed to the ParallelModuleQueue to run serial stacks
    of modules in parallel. This is meaningful if region settings must be applied
    to each parallel module run.

    >>> from grass.pygrass.modules import Module
    >>> from grass.pygrass.modules import MultiModule
    >>> from multiprocessing import Process
    >>> import copy

    Synchronous module run

    >>> region_1 = Module("g.region", run_=False)
    >>> region_1.flags.p = True
    >>> region_2 = copy.deepcopy(region_1)
    >>> region_2.flags.p = True
    >>> mm = MultiModule(module_list=[region_1, region_2])
    >>> mm.run()
    >>> m_list = mm.get_modules()
    >>> m_list[0].popen.returncode
    0
    >>> m_list[1].popen.returncode
    0

    Asynchronous module run, setting finish = False

    >>> region_1 = Module("g.region", run_=False)
    >>> region_1.flags.p = True
    >>> region_2 = copy.deepcopy(region_1)
    >>> region_2.flags.p = True
    >>> region_3 = copy.deepcopy(region_1)
    >>> region_3.flags.p = True
    >>> region_4 = copy.deepcopy(region_1)
    >>> region_4.flags.p = True
    >>> region_5 = copy.deepcopy(region_1)
    >>> region_5.flags.p = True
    >>> mm = MultiModule(module_list=[region_1, region_2, region_3, region_4, region_5],
    ...                  sync=False)
    >>> t = mm.run()
    >>> isinstance(t, Process)
    True
    >>> m_list = mm.wait()
    >>> m_list[0].popen.returncode
    0
    >>> m_list[1].popen.returncode
    0
    >>> m_list[2].popen.returncode
    0
    >>> m_list[3].popen.returncode
    0
    >>> m_list[4].popen.returncode
    0

    Asynchronous module run, setting finish = False and using temporary region

    >>> mm = MultiModule(module_list=[region_1, region_2, region_3, region_4, region_5],
    ...                  sync=False, set_temp_region=True)
    >>> str(mm)
    'g.region -p ; g.region -p ; g.region -p ; g.region -p ; g.region -p'
    >>> t = mm.run()
    >>> isinstance(t, Process)
    True
    >>> m_list = mm.wait()
    >>> m_list[0].popen.returncode
    0
    >>> m_list[1].popen.returncode
    0
    >>> m_list[2].popen.returncode
    0
    >>> m_list[3].popen.returncode
    0
    >>> m_list[4].popen.returncode
    0

    """

    def __init__(self, module_list, sync=True, set_temp_region=False):
        """Constructor of the multi module class

        :param module_list: A list of pre-configured Module objects that should be run
        :param sync: If set True the run() method will wait for all processes to finish -> synchronously run.
                     If set False, the run() method will return after starting the processes -> asynchronously run.
                     The wait() method must be called to finish the modules.
        :param set_temp_region: Set a temporary region in which the modules should be run, hence
                                region settings in the process list will not affect the current
                                computation region.

                                Note:

                                    This flag is only available in asynchronous mode!
        """
        self.module_list = module_list
        self.set_temp_region = set_temp_region
        self.finish_ = sync  # We use the same variable name as Module
        self.p = None
        self.q = Queue()
        # Bookkeeping that makes wait() safe to call more than once:
        # whether a started background run still has to be collected, and
        # the module list handed back by the last collected run.
        self._pending = False
        self._finished_modules = None

    def __str__(self):
        """Return the command string that can be executed in a shell"""
        return ' ; '.join(str(module) for module in self.module_list)

    def get_modules(self):
        """Return the list of modules that have been run in synchronous mode

        Note: Asynchronously run modules can only be accessed via the wait() method.

        :return: The list of modules
        """
        return self.module_list

    def run(self):
        """Start the modules in the list. If self.finish_ is set True
        this method will return after all processes finished.

        If self.finish_ is set False, this method will return
        after the process list was started for execution.
        In a background process, the processes in the list will
        be run one after another.

        :return: None in case of self.finish_ is True,
                 otherwise a multiprocessing.Process object that invokes the modules
        """
        # A result cached by an earlier wait() belongs to an earlier run
        self._finished_modules = None

        if self.finish_ is True:
            for module in self.module_list:
                module.finish_ = True
                module.run()
            return None

        if self.set_temp_region is True:
            target = run_modules_in_temp_region
        else:
            target = run_modules
        self.p = Process(target=target, args=[self.module_list, self.q])
        self.p.start()
        self._pending = True

        return self.p

    def wait(self):
        """Wait for all processes to finish. Call this method
        in asynchronous mode, hence if finish_ was set False.

        The result of a background run is fetched from the queue only once
        and remembered, so calling wait() again returns the same list
        instead of blocking on the already emptied queue.

        :return: The process list with finished processes to check their return states,
                 or None if no asynchronous run has been collected
        """
        if self.p and self._pending:
            # Read the queue before joining the process
            self._finished_modules = self.q.get()
            self.p.join()
            self._pending = False

        return self._finished_modules
|
|---|
| 971 |
|
|---|
| 972 |
|
|---|
def run_modules_in_temp_region(module_list, q):
    """Run the modules in a temporary region environment

    This function is the argument for multiprocessing.Process class
    in the MultiModule asynchronous execution.

    The temporary region is set up before the first module is started
    and removed again when the function is left, whether the modules
    succeeded or not. An exception raised by a module stops the run and
    is propagated after the list was put into the queue.

    :param module_list: The list of modules to run in serial
    :param q: The process queue to put the finished process list
    """
    use_temp_region()
    try:
        try:
            for proc in module_list:
                proc.run()
                proc.wait()
        finally:
            # Hand the list back even after a failure, so that a caller
            # blocked in q.get() is released.
            q.put(module_list)
    finally:
        # Outer finally: the temporary region is removed even if putting
        # the list into the queue fails.
        del_temp_region()
|
|---|
| 992 |
|
|---|
| 993 |
|
|---|
def run_modules(module_list, q):
    """Run the modules

    This function is the argument for multiprocessing.Process class
    in the MultiModule asynchronous execution.

    Each module is started with run() and finished with wait() before
    the next one is started. An exception raised by a module stops the
    run and is propagated after the list was put into the queue.

    :param module_list: The list of modules to run in serial
    :param q: The process queue to put the finished process list
    """
    try:
        for proc in module_list:
            proc.run()
            proc.wait()
    finally:
        # Hand the list back even after a failure, so that a caller
        # blocked in q.get() is released.
        q.put(module_list)
|
|---|
| 1011 |
|
|---|
| 1012 | ###############################################################################
|
|---|
| 1013 |
|
|---|
if __name__ == "__main__":
    # Executed as a script: run the doctests embedded in the docstrings
    # of this module.
    import doctest
    doctest.testmod()
|
|---|