223 lines
6.1 KiB
Python
Executable File
223 lines
6.1 KiB
Python
Executable File
#!/usr/bin/env python
|
|
|
|
import os
|
|
import platform
|
|
import subprocess
|
|
import sys
|
|
from time import sleep
|
|
import defl
|
|
from defl import Assert, cl, log
|
|
from subprocess import Popen, PIPE
|
|
from defl.testing_ import Test, Tester, TestState
|
|
from random import randrange
|
|
|
|
# Module-wide test registry: the functions below attach themselves with
# @tester.add(), and the end of the file calls tester.run() followed by
# tester.exitWithStatus().
tester = Tester(name=__name__)
|
|
|
|
|
|
@tester.add()
def readLine():
    """Feed a StringAssembly piece by piece and check line extraction.

    The expectations cover: no line is returned until a terminator has been
    appended, both ``\\n`` and ``\\r`` end a line, consecutive terminators
    produce empty lines, and ``eof`` only becomes true after an empty string
    is appended.
    """
    poll = 0.0001  # timeout handed to every readiness probe
    sa = defl.StringAssembly()

    def expectReady(charState, lineState):
        # Probe character readiness first, then line readiness.
        Assert(sa.charReady(poll)).eq(charState)
        Assert(sa.lineReady(poll)).eq(lineState)

    def expectLines(*wanted):
        # One readLine() call per expected value, in the given order.
        for want in wanted:
            Assert(sa.readLine()).eq(want)

    expectReady(False, False)
    # `sa + text` is used purely for its effect on `sa`; the result is discarded.
    sa + 'a'
    expectReady(True, False)

    expectLines(None)
    sa + 'b'
    expectLines(None)
    sa + '\n'
    expectReady(True, True)
    expectLines(b'ab')
    sa + 'c\nd\re\nf'
    expectLines(b'c', b'd')
    sa + '\n\r\n'
    expectLines(b'e')
    sa + 'g'
    sa + 'h\ni'
    expectLines(b'f', b'', b'', b'gh', None)
    sa + '\n'
    expectLines(b'i', None)
    Assert(sa.bytz).eq(b'')
    expectReady(False, False)
    Assert(sa.eof).eq(False)
    sa + ''
    Assert(sa.eof).eq(True)
|
|
|
|
|
|
@tester.add()
def readChar():
    """Check newLineIndex() and variable-sized readChar() requests.

    The expected values show that a read is not expected to return bytes
    beyond the next line terminator, whatever count is requested, and that
    a zero-length request is expected to give ``b''``.
    """
    sa = defl.StringAssembly()
    Assert(sa.newLineIndex()).iss(None)
    sa + 'ccccc\nd\re\nfff\n\r\naaaa\n'
    Assert(sa.newLineIndex()).iss(5)

    # (requested count, expected bytes) -- consumed strictly in this order.
    steps = (
        (2, b'cc'),
        (2, b'cc'),
        (10000, b'c'),
        (2, b'd'),
        (2, b'e'),
        (1, b'f'),
        (0, b''),
        (1, b'f'),
        (1, b'f'),
        (10000, b''),
        (10000, b''),
        (10000, b''),
        (10000, b'aaaa'),
        (10000, b''),
    )
    for amount, want in steps:
        Assert(sa.readChar(amount)).eq(want)
|
|
|
|
|
|
@tester.add()
def readChar():
    """Drain a mixed-line-ending buffer with readChar(1) only.

    Every call asks for a single byte; the expected sequence contains
    ``b''`` entries at the positions where the input has line terminators.
    """
    sa = defl.StringAssembly()
    sa + 'ccccc\nd\re\nfff\n\r\naaaa\n'

    # Expected result of each successive readChar(1) call.
    wanted = (
        b'c', b'c', b'c', b'c', b'c',
        b'',
        b'd',
        b'',
        b'e',
        b'',
        b'f', b'f', b'f',
        b'', b'', b'',
        b'a', b'a', b'a', b'a',
        b'',
    )
    for want in wanted:
        Assert(sa.readChar(1)).eq(want)
|
|
|
|
|
|
@tester.add()
def all():
    """Check that all() hands back the whole buffer once and b'' afterwards."""
    # NOTE(review): this name shadows the builtin `all` at module level.
    payload = 'ccccc\nd\re\nfff\n\r\naaaa\n'
    sa = defl.StringAssembly(payload)
    # The first call is expected to return every byte, terminators included.
    Assert(sa.all()).eq(payload.encode('utf8'))
    # The second call is expected to find nothing left.
    Assert(sa.all()).eq(b'')
|
|
|
|
|
|
@tester.add()
def all():
    """Check that readLines() yields one line for a CRLF-terminated buffer."""
    sa = defl.StringAssembly(b'__READY__\r\n')
    collected = list(sa.readLines())
    # Only the text is expected; neither '\r' nor '\n' appears in the result.
    Assert(collected).eq([bytearray(b'__READY__')])
|
|
|
|
|
|
@tester.add()
def autoReader():
    """Check that a threaded AutoReader delivers child output line by line.

    A bash child prints ``0 .. count-1``, sleeping ``slp`` seconds before
    each ``echo``. The test samples the reader's buffer roughly a quarter
    period after each expected ``echo`` and expects exactly one new line
    to be pending every time.
    """
    slp = 0.2
    count = 5
    script = ' ; '.join([f'sleep {slp} ; echo {i}' for i in range(count)])
    run = Popen(['bash', '-c', script], stdout=PIPE)
    out = defl.AutoReader(run.stdout, readWhile=run, threadMode=True)

    # Offset the sampling points so they fall between two consecutive echos.
    sleep(slp * 0.25)
    # Iterate over `count` rather than a literal so the loop always matches
    # the number of lines the script emits.
    for i in range(count):
        sleep(slp)
        # NOTE(review): presumably Assert overloads `==` to perform the
        # check, as other tests in this file use `.eq()` -- confirm.
        Assert(out.assembly.bytz) == bytearray(f'{i}\n'.encode('utf8'))
        Assert(out.assembly.readLine()) == bytearray(f'{i}'.encode('utf8'))
    run.wait()
    # Nothing may be left in the buffer after the last line was consumed.
    Assert(out.assembly.bytz) == bytearray(b'')
|
|
|
|
|
|
@tester.add()
def ready():
    """Repeat a tiny echo/read cycle 100 times to expose flaky reads.

    Each round starts ``echo test``, lets a threaded AutoReader run until
    its ``wait()`` returns, and expects the buffer to hold exactly
    ``b'test\\n'``.
    """
    for _ in range(100):
        run = Popen(['echo', 'test'], stdout=PIPE)
        out = defl.AutoReader(run.stdout, threadMode=True).wait()
        Assert(out.a.bytz) == bytearray(b'test\n')
        # Reap the child like the other Popen-based tests in this file do;
        # without this the loop leaves up to 100 unreaped processes behind.
        run.wait()
|
|
|
|
|
|
@tester.add()
def autoReader():
    """Drive a non-threaded AutoReader by hand for several child delays.

    For each delay a bash loop prints ``1 .. maxNum`` with a sleep before
    every ``echo``. The test alternates between pulling from the pipe and
    collecting complete lines until the reader reports nothing more, then
    compares the collected lines with the expected sequence.
    """
    maxNum = 3
    for slp in [0, 0.05, 0.1, 0.2, 0.4]:
        print(slp, 'popen')
        script = f'i=0 ; while [[ $i -lt {maxNum} ]] ; do i=$((i+1)) ; sleep {slp} ; echo $i ; done'
        run = Popen(['bash', '-c', script], stdout=PIPE)
        print(slp, 'readers')
        out = defl.AutoReader(run.stdout, readWhile=run, threadMode=False)

        step = 0  # running counter, only used in the log output
        lines = []
        while out.hasMore():
            log.info('read pipe')
            # Pull data for as long as the pipe reports readiness within 0.5.
            while not out.readDone() and out.pipeReady(0.5):
                step += 1
                log.info('pipe ready', step)
                out.read()
            log.info('read lines')
            # Move every complete line out of the assembly.
            while out.a.lineReady(0.1):
                step += 1
                res = out.a.readLine()
                log.info(f'line ready {res}', step)
                lines.append(res)

        expected = [bytearray(str(n).encode('utf8')) for n in range(1, maxNum + 1)]
        Assert(lines).eq(expected)
        run.wait()
|
|
|
|
|
|
@tester.add()
def autoReader():
    """Iterate a threaded AutoReader directly and compare each line to ``seq 5``.

    NOTE(review): the loop only checks the lines that are actually yielded;
    it does not assert how many there are.
    """
    run = Popen(['bash', '-c', 'seq 5 '], stdout=PIPE)
    out = defl.AutoReader(run.stdout, readWhile=run, threadMode=True)

    # `seq` counts from 1, so the n-th yielded line should read str(n).
    for number, line in enumerate(out, start=1):
        Assert(line).eq(bytearray(f'{number}', 'utf8'))
    run.wait()
|
|
|
|
|
|
# Test for defl.formatMultiLineStr: one triple-quoted literal `a` is passed
# through four argument combinations and each result is compared with `==`:
#   1. dedent=True,  join=None  -> compared against a list of lines
#   2. dedent=True,  join=''    -> compared against a single string
#   3. dedent=False, join=''    -> compared against a single string
#   4. split='a', dedent=True, join=False, filt=True -> compared against a
#      one-element list
# NOTE(review): the results depend on the exact whitespace inside the literal
# and inside the expected strings; that whitespace may not be shown
# faithfully in this copy -- confirm against the original before editing.
# NOTE(review): presumably Assert overloads `==` to perform the check, as
# other tests in this file use `.eq()` -- confirm.
@tester.add()
|
|
def dedent():
|
|
a = """
|
|
a
|
|
b
|
|
c
|
|
|
|
"""
|
|
Assert(defl.formatMultiLineStr(a, dedent=True, join=None)) == ['', 'a', ' b', ' c', '', '']
|
|
Assert(defl.formatMultiLineStr(a, dedent=True, join='')) == 'a b c'
|
|
Assert(
|
|
defl.formatMultiLineStr(a, dedent=False, join='')
|
|
) == ' a b c '
|
|
Assert(defl.formatMultiLineStr(a, split='a', dedent=True, join=False, filt=True)) == [
|
|
' b\n c\n\n '
|
|
]
|
|
|
|
|
|
@tester.add()
def trapStd():
    """Raise inside an OverrideWriterCM block and attach its text to the error.

    Three lines are printed while ``sys.stdout`` is handed to
    ``defl.OverrideWriterCM``; a ``ValueError`` is then raised, annotated
    with ``text()`` from the context manager via ``add_note`` and re-raised.

    NOTE(review): the exception leaves this function; how Tester treats a
    raising test is not visible here.
    """
    with defl.OverrideWriterCM([sys.stdout]) as captured:
        try:
            for letter in ('a', 'b', 'c'):
                print(letter)
            raise ValueError()
        except Exception as err:
            # Attach the manager's text to the exception before it propagates.
            err.add_note(captured.text())
            raise err
|
|
|
|
|
|
# Run every registered test, log what run() returns, then let the tester
# end the process via exitWithStatus().
runResult = tester.run()
log.info(runResult)
tester.exitWithStatus()
|