100 lines
2.3 KiB
Python
Executable File
100 lines
2.3 KiB
Python
Executable File
#!/usr/bin/env python
|
|
|
|
import enum
|
|
import itertools
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
from dataclasses import KW_ONLY, dataclass, field
|
|
from functools import partial, partialmethod
|
|
from operator import itemgetter
|
|
from time import sleep
|
|
|
|
import defl
|
|
from defl import CLIError, DotDict, Null, Path, Undefined, cl, log
|
|
from defl._run_ import *
|
|
from defl._assert_ import Assert
|
|
from defl._typing_ import *
|
|
from defl.testing_ import Tester
|
|
from defl.threadPool_ import ThreadPool
|
|
|
|
|
|
tester = Tester(name=__file__)
|
|
|
|
|
|
def run(i) -> tuple:
|
|
r = Run(['sleep', i]).run(T, T).assSuc()
|
|
j = Run(['echo', defl.jdumps({'a': 2, 'b': 2}, compact=T)]).run(T, T).assSuc()
|
|
return j.json()
|
|
|
|
|
|
class TestThrowError(Exception):
|
|
__slot__ = ()
|
|
|
|
|
|
def throw():
|
|
raise TestThrowError()
|
|
|
|
|
|
@tester.add()
|
|
def test():
|
|
with ThreadPool().add(partial(run, i=x / 5) for x in [1, 2, 3]) as tp:
|
|
tp.add(partial(run, i=x / 10) for x in range(4, 10))
|
|
for fut, res in tp.resultsInfinite():
|
|
Assert(fut.raised) == F
|
|
assert res == {'a': 2, 'b': 2}
|
|
|
|
|
|
@tester.add()
|
|
def test():
|
|
threads = ThreadPool().add(partial(run, i=x / 5) for x in [1, 2, 3])
|
|
results = [x for x in threads.gather()]
|
|
ids = [hash(x[0]) for x in results]
|
|
Assert(len(ids)) == len(set(ids))
|
|
print(results)
|
|
Assert(str(threads._state)) == 'done'
|
|
Assert(len(results)) == 3
|
|
for res in results:
|
|
fut, res = (res[0], res[1])
|
|
assert res == {'a': 2, 'b': 2}
|
|
|
|
|
|
@tester.add()
|
|
def test():
|
|
threads = ThreadPool().add(throw)
|
|
try:
|
|
for x in threads.gather():
|
|
...
|
|
raise NotImplementedError('')
|
|
except TestThrowError as e:
|
|
...
|
|
|
|
|
|
@tester.add()
|
|
def test():
|
|
threads = ThreadPool().add(partial(run, i=x / 5) for x in [1, 2, 3]).add(throw).add(throw)
|
|
i = -1
|
|
for fut, res in threads.gather(throw=F):
|
|
print(fut, res)
|
|
i += 1
|
|
if i in [0, 1, 2]:
|
|
Assert(fut.raised) == F
|
|
elif i in [3, 4]:
|
|
Assert(fut.raised) == T
|
|
try:
|
|
raise res
|
|
raise NotImplementedError('')
|
|
except TestThrowError:
|
|
...
|
|
else:
|
|
raise NotImplementedError('')
|
|
|
|
|
|
log.info(tester.run())
|
|
tester.exitWithStatus()
|
|
|
|
# for i in ch:
|
|
# print(defl.printTable([[y for y in x] for x in ch], squareFill=T))
|