defl/tests/test_run.py
2024-09-11 11:14:03 -04:00

134 lines
3.5 KiB
Python
Executable File

#!/usr/bin/env python
import enum
import itertools
import json
import os
import re
import subprocess
import sys
from dataclasses import KW_ONLY, dataclass, field
from functools import partial, partialmethod
from operator import itemgetter
from time import sleep
import defl
from defl import CLIError, DotDict, Null, Path, Undefined, cl, log
from defl._run_ import *
from defl._assert_ import Assert
from defl._typing_ import *
from defl.testing_ import Tester
tester = Tester(name=__file__)
@tester.add()
def run():
a = Run('true && true')
Assert(a.run(T, F)) == a
Assert(a.c) == ['bash', '-c', 'true && true']
Run('true').setThreadRead().run(F, F).wait().assSuc()
try:
Run('false').run(F, F).wait().assSuc()
raise Exception()
except RunReturnCodeError:
...
a = Run(['seq', '10']).run(T, F)
Assert(a._out)
Assert(not a._err)
a.wait().assSuc()
Assert([x for x in a.out]) == [str(x).encode() for x in range(1, 11)]
a = Run(['lsblk', '-O', '-J']).run(T, F)
a.wait().assSuc()
Assert('blockdevices' in a.json())
try:
Run(['false']).assSuc()
except NotRunYetError:
...
try:
Run(['false']).run(F, F).assSuc()
except RunReturnCodeError:
...
jj = {'a': 1, 'b': [1, 2, {"c": 5}]}
a = Run(['echo', json.dumps(jj)]).run(T, F)
a.wait().assSuc()
Assert(a.json()) == jj
a = Run('cat')
a.run(T, T, T).sendIn('')
@tester.add()
def stdin():
a = Run(['cat']).run(T, F, T)
Assert(a.p.stdout)
Assert(not a.p.stderr)
Assert(a.p.stdin)
txt = [defl.randomString() for x in range(10)]
a.sendIn('\n'.join(txt)).wait().assSuc()
Assert(a.outStr()) == txt
@tester.add()
def stdin():
for i in ['test', b'test', bytearray('test', encoding='utf8')]:
log.info('i', lambda r: r.i)
res = Run(['cat']).run(T, F, T)
Assert(res._popen.stdout)
Assert(res._popen.stdin)
Assert(not res._popen.stderr)
res.sendIn(i).wait().assSuc()
Assert(res.out.all()) == b'test'
Assert(Run(['cat']).run(T, F, T).sendIn(i).wait().assSuc().outStr()) == ['test']
@tester.add()
def state():
a = Run(['sleep', '.3'])
Assert(a.ran) == F
Assert(a.rc) == None
Assert(a.success) == None
Assert(a.running) == F
Assert(a.done) == F
a.run(T, F)
Assert(a.ran) == T
Assert(a.rc) == None
Assert(a.success) == None
Assert(a.running) == T
Assert(a.done) == F
a.wait()
Assert(a.ran) == T
Assert(a.rc) == 0
Assert(a.success) == T
Assert(a.running) == F
Assert(a.done) == T
@tester.add()
def env():
r = Run(['env']).envSet({'a': 'true'}).run(T, F).wait().assSuc().outStr()
Assert(r) == ['a=true']
r = Run(['env']).envSet().run(T, F).wait().assSuc().outStr()
Assert(r) == []
r = Run(['env']).envSet().envUpdate({'a': 'true'}).run(T, F).wait().assSuc().outStr()
Assert(r) == ['a=true']
r = Run(['env']).envUpdate({}).run(T, F).wait().assSuc().outStr()
assert len(r) > 2
@tester.add()
def stringFile():
res = Run(['bash', '-c', 'echo 1 ; echo 2 >&2 ; false']).run(T, 'out').wait().outStr()
Assert(res) == ['1', '2']
try:
Run(['bash', '-c', 'echo 1 ; echo 2 >&2 ; false']).run(T, 'out').wait().assSuc()
raise ValueError()
except Exception as e:
Assert(e.msg) == [bytearray(b'1'), bytearray(b'2')]
log.info(tester.run())
tester.exitWithStatus()
# for i in ch:
# print(defl.printTable([[y for y in x] for x in ch], squareFill=T))