#!/usr/bin/env python
"""Test script for ``defl.threadPool_.ThreadPool``.

Registers four test cases on a ``Tester`` instance, runs them when the file
is executed, logs the outcome and exits through ``tester.exitWithStatus()``.

NOTE(review): ``T``, ``F`` and ``Run`` are not defined in this file; they are
presumably provided by the ``defl`` star-imports below (``T``/``F`` look like
aliases for ``True``/``False``) -- confirm against the ``defl`` package.
"""

import enum
import itertools
import json
import os
import re
import subprocess
import sys
from dataclasses import KW_ONLY, dataclass, field
from functools import partial, partialmethod
from operator import itemgetter
from time import sleep

import defl
from defl import CLIError, DotDict, Null, Path, Undefined, cl, log
from defl._run_ import *
from defl._assert_ import Assert
from defl._typing_ import *
from defl.testing_ import Tester
from defl.threadPool_ import ThreadPool

tester = Tester(name=__file__)


def run(i) -> dict:
    """Job body submitted to the pool by the tests.

    Runs ``sleep i`` and then ``echo`` of a compact JSON document, asserting
    success of each command via ``assSuc()``.

    Args:
        i: Argument handed to the ``sleep`` command.

    Returns:
        The value of ``.json()`` on the ``echo`` run; every caller in this
        file compares it to ``{'a': 2, 'b': 2}``.
    """
    # Only the success check matters for the sleep; its result is not used.
    Run(['sleep', i]).run(T, T).assSuc()
    echoed = Run(['echo', defl.jdumps({'a': 2, 'b': 2}, compact=T)]).run(T, T).assSuc()
    return echoed.json()


class TestThrowError(Exception):
    """Marker exception raised on purpose by :func:`throw`."""

    __slots__ = ()


def throw():
    """Job body that always fails with :class:`TestThrowError`."""
    raise TestThrowError()


@tester.add()
def test():
    """Add jobs before and after entering the pool context, then check every result."""
    with ThreadPool().add(partial(run, i=x / 5) for x in [1, 2, 3]) as tp:
        # A second batch is queued while the pool context is already open.
        tp.add(partial(run, i=x / 10) for x in range(4, 10))
        for fut, res in tp.resultsInfinite():
            Assert(fut.raised) == F
            assert res == {'a': 2, 'b': 2}


@tester.add()
def test():
    """Collect three jobs with ``gather()`` and check count, uniqueness and payloads."""
    threads = ThreadPool().add(partial(run, i=x / 5) for x in [1, 2, 3])
    results = list(threads.gather())
    # The first element of each gathered item must hash differently per job.
    ids = [hash(item[0]) for item in results]
    Assert(len(ids)) == len(set(ids))
    print(results)
    Assert(str(threads._state)) == 'done'
    Assert(len(results)) == 3
    for item in results:
        # item[0] is checked above; item[1] is the job's return value.
        assert item[1] == {'a': 2, 'b': 2}


@tester.add()
def test():
    """``gather()`` with default arguments must surface a job's ``TestThrowError``."""
    threads = ThreadPool().add(throw)
    try:
        for _ in threads.gather():
            ...
        # Reaching this line means the failure was swallowed.
        raise NotImplementedError('')
    except TestThrowError:
        ...


@tester.add()
def test():
    """``gather(throw=F)`` must yield failures as values instead of raising.

    Three ``run`` jobs are added first and two ``throw`` jobs last; the test
    expects results in that order, with the failed items carrying the
    exception object as their result.
    """
    threads = ThreadPool().add(partial(run, i=x / 5) for x in [1, 2, 3]).add(throw).add(throw)
    for i, (fut, res) in enumerate(threads.gather(throw=F)):
        print(fut, res)
        if i < 3:
            Assert(fut.raised) == F
        elif i < 5:
            Assert(fut.raised) == T
            # Re-raising the result proves it is a TestThrowError instance;
            # anything else propagates and fails the test.
            try:
                raise res
            except TestThrowError:
                ...
        else:
            # More than five results were produced for five submitted jobs.
            raise NotImplementedError('')


log.info(tester.run())
tester.exitWithStatus()

# for i in ch:
#     print(defl.printTable([[y for y in x] for x in ch], squareFill=T))