defl/tests.old/test_string.py

223 lines
6.1 KiB
Python
Raw Permalink Normal View History

2024-09-11 11:14:03 -04:00
#!/usr/bin/env python
import os
import platform
import subprocess
import sys
from time import sleep
import defl
from defl import Assert, cl, log
from subprocess import Popen, PIPE
from defl.testing_ import Test, Tester, TestState
from random import randrange
tester = Tester(name=__name__)
2025-03-09 09:17:53 -04:00
2024-09-11 11:14:03 -04:00
@tester.add()
def readLine():
sa = defl.StringAssembly()
2025-03-09 09:17:53 -04:00
Assert(sa.charReady(0.0001)).eq(False)
Assert(sa.lineReady(0.0001)).eq(False)
2024-09-11 11:14:03 -04:00
sa + 'a'
2025-03-09 09:17:53 -04:00
Assert(sa.charReady(0.0001)).eq(True)
Assert(sa.lineReady(0.0001)).eq(False)
2024-09-11 11:14:03 -04:00
Assert(sa.readLine()).eq(None)
sa + 'b'
Assert(sa.readLine()).eq(None)
sa + '\n'
2025-03-09 09:17:53 -04:00
Assert(sa.charReady(0.0001)).eq(True)
Assert(sa.lineReady(0.0001)).eq(True)
2024-09-11 11:14:03 -04:00
Assert(sa.readLine()).eq(b'ab')
sa + 'c\nd\re\nf'
Assert(sa.readLine()).eq(b'c')
Assert(sa.readLine()).eq(b'd')
sa + '\n\r\n'
Assert(sa.readLine()).eq(b'e')
sa + 'g'
sa + 'h\ni'
Assert(sa.readLine()).eq(b'f')
Assert(sa.readLine()).eq(b'')
Assert(sa.readLine()).eq(b'')
Assert(sa.readLine()).eq(b'gh')
Assert(sa.readLine()).eq(None)
sa + '\n'
Assert(sa.readLine()).eq(b'i')
Assert(sa.readLine()).eq(None)
Assert(sa.bytz).eq(b'')
2025-03-09 09:17:53 -04:00
Assert(sa.charReady(0.0001)).eq(False)
Assert(sa.lineReady(0.0001)).eq(False)
2024-09-11 11:14:03 -04:00
Assert(sa.eof).eq(False)
sa + ''
Assert(sa.eof).eq(True)
2025-03-09 09:17:53 -04:00
2024-09-11 11:14:03 -04:00
@tester.add()
def readChar():
sa = defl.StringAssembly()
Assert(sa.newLineIndex()).iss(None)
sa + 'ccccc\nd\re\nfff\n\r\naaaa\n'
Assert(sa.newLineIndex()).iss(5)
Assert(sa.readChar(2)).eq(b'cc')
Assert(sa.readChar(2)).eq(b'cc')
Assert(sa.readChar(10000)).eq(b'c')
Assert(sa.readChar(2)).eq(b'd')
Assert(sa.readChar(2)).eq(b'e')
Assert(sa.readChar(1)).eq(b'f')
Assert(sa.readChar(0)).eq(b'')
Assert(sa.readChar(1)).eq(b'f')
Assert(sa.readChar(1)).eq(b'f')
Assert(sa.readChar(10000)).eq(b'')
Assert(sa.readChar(10000)).eq(b'')
Assert(sa.readChar(10000)).eq(b'')
Assert(sa.readChar(10000)).eq(b'aaaa')
Assert(sa.readChar(10000)).eq(b'')
2025-03-09 09:17:53 -04:00
2024-09-11 11:14:03 -04:00
@tester.add()
def readChar():
sa = defl.StringAssembly()
sa + 'ccccc\nd\re\nfff\n\r\naaaa\n'
Assert(sa.readChar(1)).eq(b'c')
Assert(sa.readChar(1)).eq(b'c')
Assert(sa.readChar(1)).eq(b'c')
Assert(sa.readChar(1)).eq(b'c')
Assert(sa.readChar(1)).eq(b'c')
Assert(sa.readChar(1)).eq(b'')
Assert(sa.readChar(1)).eq(b'd')
Assert(sa.readChar(1)).eq(b'')
Assert(sa.readChar(1)).eq(b'e')
Assert(sa.readChar(1)).eq(b'')
Assert(sa.readChar(1)).eq(b'f')
Assert(sa.readChar(1)).eq(b'f')
Assert(sa.readChar(1)).eq(b'f')
Assert(sa.readChar(1)).eq(b'')
Assert(sa.readChar(1)).eq(b'')
Assert(sa.readChar(1)).eq(b'')
Assert(sa.readChar(1)).eq(b'a')
Assert(sa.readChar(1)).eq(b'a')
Assert(sa.readChar(1)).eq(b'a')
Assert(sa.readChar(1)).eq(b'a')
Assert(sa.readChar(1)).eq(b'')
2025-03-09 09:17:53 -04:00
2024-09-11 11:14:03 -04:00
@tester.add()
def all():
sa = defl.StringAssembly('ccccc\nd\re\nfff\n\r\naaaa\n')
Assert(sa.all()).eq(b'ccccc\nd\re\nfff\n\r\naaaa\n')
Assert(sa.all()).eq(b'')
2025-03-09 09:17:53 -04:00
2024-09-11 11:14:03 -04:00
@tester.add()
def all():
sa = defl.StringAssembly(b'__READY__\r\n')
Assert([x for x in sa.readLines()]).eq([bytearray(b'__READY__')])
2025-03-09 09:17:53 -04:00
2024-09-11 11:14:03 -04:00
@tester.add()
def autoReader():
2025-03-09 09:17:53 -04:00
slp = 0.2
2024-09-11 11:14:03 -04:00
count = 5
x = ' ; '.join([f'sleep {slp} ; echo {i}' for i in range(count)])
run = Popen(['bash', '-c', x], stdout=PIPE)
out = defl.AutoReader(run.stdout, readWhile=run, threadMode=True)
2025-03-09 09:17:53 -04:00
sleep(slp * 0.25)
2024-09-11 11:14:03 -04:00
for i in range(5):
sleep(slp)
# Assert(out.ready(1)) == True
Assert(out.assembly.bytz) == bytearray(f'{i}\n'.encode('utf8'))
Assert(out.assembly.readLine()) == bytearray(f'{i}'.encode('utf8'))
run.wait()
Assert(out.assembly.bytz) == bytearray(b'')
2025-03-09 09:17:53 -04:00
2024-09-11 11:14:03 -04:00
@tester.add()
def ready():
for i in range(100):
run = Popen(['echo', 'test'], stdout=PIPE)
out = defl.AutoReader(run.stdout, threadMode=True).wait()
Assert(out.a.bytz) == bytearray(b'test\n')
2025-03-09 09:17:53 -04:00
2024-09-11 11:14:03 -04:00
@tester.add()
def autoReader():
2025-03-09 09:17:53 -04:00
for slp in [0, 0.05, 0.1, 0.2, 0.4]:
2024-09-11 11:14:03 -04:00
maxNum = 3
print(slp, 'popen')
2025-03-09 09:17:53 -04:00
run = Popen(
['bash', '-c', f'i=0 ; while [[ $i -lt {maxNum} ]] ; do i=$((i+1)) ; sleep {slp} ; echo $i ; done'],
stdout=PIPE,
)
2024-09-11 11:14:03 -04:00
print(slp, 'readers')
out = defl.AutoReader(run.stdout, readWhile=run, threadMode=False)
i = 0
outt = []
while out.hasMore():
2025-03-09 09:17:53 -04:00
log.info(
'read pipe',
)
while not out.readDone() and out.pipeReady(0.5):
2024-09-11 11:14:03 -04:00
i += 1
log.info('pipe ready', i)
out.read()
2025-03-09 09:17:53 -04:00
log.info(
'read lines',
)
while out.a.lineReady(0.1):
2024-09-11 11:14:03 -04:00
i += 1
res = out.a.readLine()
log.info(f'line ready {res}', i)
outt.append(res)
Assert(outt).eq([bytearray(f'{x}'.encode('utf8')) for x in range(1, maxNum + 1)])
run.wait()
2025-03-09 09:17:53 -04:00
2024-09-11 11:14:03 -04:00
@tester.add()
def autoReader():
run = Popen(['bash', '-c', f'seq 5 '], stdout=PIPE)
out = defl.AutoReader(run.stdout, readWhile=run, threadMode=True)
i = 1
for line in out:
Assert(line).eq(bytearray(f'{i}', 'utf8'))
i += 1
run.wait()
2025-03-09 09:17:53 -04:00
2024-09-11 11:14:03 -04:00
@tester.add()
def dedent():
2025-03-09 09:17:53 -04:00
a = """
2024-09-11 11:14:03 -04:00
a
b
c
2025-03-09 09:17:53 -04:00
"""
2024-09-11 11:14:03 -04:00
Assert(defl.formatMultiLineStr(a, dedent=True, join=None)) == ['', 'a', ' b', ' c', '', '']
Assert(defl.formatMultiLineStr(a, dedent=True, join='')) == 'a b c'
Assert(
defl.formatMultiLineStr(a, dedent=False, join='')
) == ' a b c '
2025-03-09 09:17:53 -04:00
Assert(defl.formatMultiLineStr(a, split='a', dedent=True, join=False, filt=True)) == [
' b\n c\n\n '
]
2024-09-11 11:14:03 -04:00
@tester.add()
def trapStd():
with defl.OverrideWriterCM([sys.stdout]) as std:
try:
print('a')
print('b')
print('c')
raise ValueError()
except Exception as e:
e.add_note(std.text())
raise e
2025-03-09 09:17:53 -04:00
2024-09-11 11:14:03 -04:00
log.info(tester.run())
tester.exitWithStatus()