#!/usr/bin/env python import datetime import enum import itertools import os import random import re import sys import time from dataclasses import KW_ONLY, dataclass, field from operator import itemgetter from time import sleep from datetime import datetime, timedelta, timezone from functools import partial import defl from defl import Assert, CLIError, Path, SignalTimeoutError, cl, log from defl._typing_ import * from defl.testing_ import Test, Tester, TestState from defl import Time, TimeDelta, TimeZone from zoneinfo import ZoneInfo tester = Tester(name=__file__) @tester.add() def timeDeltaFromStr(): tu = defl.TimeUnit Assert(tu.FromString(' 100ms')) == tu(ms=100.0) Assert(tu.FromString('.1ms')) == tu(ms=0.1) Assert(tu.FromString('.100ms')) == tu(ms=0.1) Assert(tu.FromString('1')) == tu(s=1.0) Assert(tu.FromString('1s')) == tu(s=1.0) Assert(tu.FromString('1.1')) == tu(s=1.1) Assert(tu.FromString('1:1.1')) == tu(m=1.0, s=1.1) Assert(tu.FromString('10:1:1.1')) == tu(h=10.0, m=1.0, s=1.1) Assert(tu.FromString('10:1:1.1 5ms')) == tu(h=10.0, m=1.0, s=1.1, ms=5) Assert(tu.FromString('10 1 1.1 5ms')) == tu(h=10.0, m=1.0, s=1.1, ms=5) Assert(tu.FromString('10h1m 1.1 5ms')) == tu(h=10.0, m=1.0, s=1.1, ms=5) Assert(tu.FromString('10:1:1.1s5ms')) == tu(h=10.0, m=1.0, s=1.1, ms=5) Assert(tu.FromString('10 1 1.1 5ms')) == tu(h=10.0, m=1.0, s=1.1, ms=5) Assert(tu.FromString('10h1m 1.1 5ms')) == tu(h=10.0, m=1.0, s=1.1, ms=5) Assert(tu.FromString('10:1m1.1s5ms')) == tu(h=10.0, m=1.0, s=1.1, ms=5) Assert(tu.FromString('10 1 1.1 5ms')) == tu(h=10.0, m=1.0, s=1.1, ms=5) Assert(tu.FromString('10h1m 1.1 5ms')) == tu(h=10.0, m=1.0, s=1.1, ms=5) Assert(tu.FromString('10h1m1.1s5ms')) == tu(h=10.0, m=1.0, s=1.1, ms=5) Assert(tu.FromString('10 1 1.1 5ms')) == tu(h=10.0, m=1.0, s=1.1, ms=5) Assert(tu.FromString('10h1m 1.1 5ms')) == tu(h=10.0, m=1.0, s=1.1, ms=5) Assert(tu.FromString('1.1:1.1:1.1:1.1')) == tu(s=1.1, m=1.1, h=1.1, d=1.1) Assert(tu(ms=100.0).toSec()) == 0.1 Assert(tu(ms=0.1).toSec()) == 0.0001 Assert(tu(ms=0.1).toSec()) == 0.0001 Assert(tu(s=1.0).toSec()) == 1.0 Assert(tu(s=1.0).toSec()) == 1.0 Assert(tu(s=1.1).toSec()) == 1.1 Assert(tu(m=1.0, s=1.1).toSec()) == 61.1 Assert(tu(h=10.0, m=1.0, s=1.1).toSec()) == 36061.1 Assert(tu(h=10.0, m=1.0, s=1.1, ms=5).toSec()) == 36061.105 Assert(tu(s=1.1, m=1.1, h=1.1, d=1.1).toSec()) == 99067.1 a = tu.FromString('1.1:1.1:1.1:1.1').toSec() b = tu(s=1.1, m=1.1, h=1.1, d=1.1).toSec() c = TimeDelta.FromString('1.1:1.1:1.1:1.1') Assert(a).inst(float) Assert(b).inst(float) Assert(c).inst(TimeDelta) Assert(a) == 99067.1 Assert(b) == 99067.1 Assert(c.toJson()) == {'seconds': 99067.1} @tester.add() def formatSeconds(): fs = defl.FormatSeconds Assert(fs(0).toStr()).eq('00S') Assert(fs(1).toStr()).eq('01S') Assert(fs(2).toStr()).eq('02S') Assert(fs(60).toStr()).eq('01M:00S') Assert(fs(61).toStr()).eq('01M:01S') Assert(fs(60 * 60).toStr()).eq('01H:00M:00S') Assert(fs([1, 1, 1]).toStr()).eq('01H:01M:01S') Assert(fs([1, 1, 1]).inc(-60).toStr()).eq('01H:00M:01S') Assert(fs().inc(60).toStr()).eq('01M:00S') Assert(fs().inc(1).toStr()).eq('01S') Assert(fs(60).inc().toStr()).eq('01M:00S') Assert(fs(1).inc().toStr()).eq('01S') Assert(fs(60).inc(-60).toStr()).eq('00S') @tester.add() def countDownTimer(): defl.countDownTimer(3) @tester.add() def progressPrint(): pp = defl.ProgressPrint(100, eta=None) Assert(pp.inc(0, write=false)).eq(' (0 / 100 = 0.00%) ') Assert(pp.inc(25.6, write=false)).eq(' (26 / 100 = 25.60%) ') Assert(pp.inc(100 - 25.6, write=false)).eq(' (100 / 100 = 100.00%) ') @tester.add() def eta(): eta = defl.ETA() total = 111 for i in range(1, total): res = eta.lap(i / total, secOverride=3.2) def fmt(x): return f'{x:.2f}' Assert(fmt(res.seconds)).eq(fmt(3.2 * (total - i))) # @tester.add() # def b(): # a = defl.ProgressPrint(300) # for i in range(300): # a.inc(postmsg='\n') # time.sleep(.5) @tester.add() def sectionStrFormat(): assert defl.sectionStrFormat('31=22:30') == [[31, 1350]] assert defl.sectionStrFormat('2:05,2:37=50:11') == [[0, 125], [157, 3011]] assert defl.sectionStrFormat('0=2:05,2:37=50:11') == [[0, 125], [157, 3011]] assert defl.sectionStrFormat('0=2:05,2:37=3011') == [[0, 125], [157, 3011]] assert defl.sectionStrFormat('0=125,157=50:11') == [[0, 125], [157, 3011]] assert defl.sectionStrFormat('0=125,157=3011') == [[0, 125], [157, 3011]] assert defl.sectionStrFormat('125,157=3011,3020', endDuration=3030) == [[0, 125], [157, 3011], [3020, 3030]] @tester.add() def timeoutCM(): with defl.TimeoutCM(0.5): ... @tester.add() def timeoutCM(): try: with defl.TimeoutCM(0.5): time.sleep(2) except defl.SignalTimeoutError: return assert False, 'signal not raised' @tester.add() def timeoutCM(): sig1 = defl.TimeoutCM(9999999) sig1.__enter__() sig1Num = sig1._signalNumber sig2 = defl.TimeoutCM(9999999) sig2.__enter__() sig2Num = sig2._signalNumber Assert(sig1Num).ne(sig2Num) sig1.__exit__() sig2.__exit__() @tester.add() def timeoutCM(): with defl.TimeoutCM(5): with defl.TimeoutCM(5): ... def t1t2(t1time: int, t2time: int, raisedName: str): t1 = None t2 = None try: with (t1 := defl.TimeoutCM(t1time, name='t1')): with (t2 := defl.TimeoutCM(t2time, name='t2')): time.sleep(999) except SignalTimeoutError as e: Assert(t1._exited).isEq(True) Assert(t2._exited).isEq(True) Assert(e.args[0].name).eq(raisedName) return raise Exception('Dont get here') @tester.add() def timeoutCM(): t1t2(0.5, 99999, 't1') @tester.add() def timeoutCM(): t1t2(99999, 0.5, 't2') @tester.add() def rollingAverage4(): tot = 20 avg = defl.ProgressPrint(20) for i in range(20): time.sleep(0.1) avg.inc() @tester.add() def rollingAverage4(): tot = 20 avg = defl.ProgressPrint(20, eta=defl.ETA(rollAvg=defl.RollingAverage(window=6))) for i in range(20): time.sleep(0.1) avg.inc() @tester.add() def timeClass(): utc = timezone.utc utcp1 = ZoneInfo('Asia/Baghdad') DT = datetime TD = timedelta start = partial(DT, 2001, 1, 1) ####################################### a = Time(datetime(2001, 1, 1, tzinfo=utc)).tz.offset Assert(a) == timedelta(0) t1 = datetime(2001, 2, 3, tzinfo=utc) t2 = Time(t1, autoAddTzStr=False) Assert(str(t2)) == '2001-02-03_00-00-00', t2 t3 = t2 - 1 t4 = Time(datetime(2001, 2, 2, 23, 59, 59, tzinfo=utc)) t5 = datetime(2001, 2, 2, 23, 59, 59, tzinfo=utc) t6 = t2 - timedelta(seconds=1) t7 = t2 - dict(seconds=1) for i in [t3, t4, t6, t7]: Assert(i).isType(Time) Assert(t3) == t5 Assert(t3) == t4 Assert(t4) == t5 Assert(t4) == t6 tz = Time(datetime(2000, 1, 1), tz='UTC').tz Assert(tz) == TimeZone(offset=timedelta(0)) Assert(tz) == timedelta(0) Assert(tz) != timedelta(hours=1) # assert (t := datetime(1970, 1, 1, 0, 0, 0, tzinfo=timezone.utc).strftime("%s")) == "0", t a = Assert.Me() / Time.epochFromDatetime(datetime(1970, 1, 1, tzinfo=utc)) a() == 0 a = Assert.Me() / Time(datetime(1970, 1, 1, tzinfo=utc)).epoch() a() == 0 a = Assert.Me() / Time(datetime(1970, 1, 1, tzinfo=timezone(timedelta(hours=-1)))).epoch() a() == 60**2 a = Assert.Me() / (datetime(2000, 1, 1, tzinfo=utcp1) < datetime(2000, 1, 1, tzinfo=utc)) a().iss(True) a = Assert.Me() / ( Time(datetime(2000, 1, 1, tzinfo=utc)).epoch() - Time(datetime(2000, 1, 1, tzinfo=utcp1)).epoch() ) a() == 10800.0 a = Time(datetime(2000, 1, 1, tzinfo=utcp1)).epoch() b = Time(datetime(2000, 1, 1, tzinfo=utc)).epoch() a = Assert.Me() / (a < b) a().iss(True) b = Time(datetime(2000, 1, 1, tzinfo=ZoneInfo('EST')), fmt='%Y-%m-%d_%H-%M-%S %z') c = Time(datetime(2000, 1, 1, tzinfo=ZoneInfo('UTC')), fmt='%Y-%m-%d_%H-%M-%S %z') res = b > c Assert(res).iss(True) ####################################### def tzt(zone, res): off = TimeZone.Offset(zone) (Assert.Me() / off)() == res # tzt(zone=ZoneInfo('Europe/Berlin'), res=TimeDelta(timedelta(hours=1))) tzt(zone=ZoneInfo('UTC'), res=TimeDelta(timedelta(hours=0))) TimeZone.Offset(None) # local time ####################################### def tzt(dt, res, **kargs): ttz = Time.ToTimeZone(dt, **kargs) off = TimeZone.Offset(ttz.tzinfo) dt = ttz.replace(tzinfo=None) Assert(off) == TimeZone.Offset(res.tzinfo) Assert(dt) == res.replace(tzinfo=None) tzt(dt=start(0, tzinfo=utc), dst=utcp1, isDst=True, res=start(0, tzinfo=utcp1)) tzt(dt=start(0, tzinfo=utc), dst=utcp1, isDst=False, res=start(3, tzinfo=utcp1)) tzt(dt=start(0, tzinfo=utc), dst=utcp1, isDst=False, res=start(3, tzinfo=utcp1)) tzt(dt=start(0, tzinfo=utc), dst=utc, isDst=True, res=start(0, tzinfo=utc)) tzt(dt=start(0, tzinfo=utc), dst=utc, isDst=False, res=start(0, tzinfo=utc)) tzt(dt=start(0, tzinfo=None), dst=utc, isDst=True, res=start(0, tzinfo=utc)) tzt(dt=start(0, tzinfo=None), dst=utc, isDst=False, res=start(0, tzinfo=utc) + Time.LocalTz.offset * -1) tzt(dt=start(0, tzinfo=utcp1), dst=utc, isDst=True, res=start(0, tzinfo=utc)) tzt(dt=start(0, tzinfo=utcp1), dst=utc, isDst=False, res=start(0, tzinfo=utc) - TD(hours=3)) ####################################### def tzt(dt, res, **kargs): (Assert.Me() / (t := Time(dt, **kargs)))() == res return t t = tzt(dt=start(0, tzinfo=utc), res=start(3, tzinfo=utcp1)) Assert(t.dt.tzinfo == t.tz == TimeZone(timedelta(hours=1))) t = tzt(dt=start(0, tzinfo=utcp1), res=start(0, tzinfo=utcp1)) Assert(t.dt.tzinfo == t.tz == TimeZone(timedelta(hours=2))) t = tzt(dt=start(0, tzinfo=None), res=start(0, tzinfo=utc), tz=utc) Assert(t.dt.tzinfo == t.tz == TimeZone(timedelta(hours=0))) t = tzt(dt=start(0, tzinfo=None), res=start(0, tzinfo=utcp1), tz=utcp1) Assert(t.dt.tzinfo == t.tz == TimeZone(timedelta(hours=1))) t = tzt(dt=start(0, tzinfo=utc), res=start(0, tzinfo=utc)) Assert(t.dt.tzinfo == t.tz == TimeZone(timedelta(hours=1))) t = tzt(dt=start(0, tzinfo=None), res=start(0, tzinfo=Time.LocalTz)) Assert(t.dt.tzinfo == t.tz == Time.LocalTz.offset) a = datetime(2000, 1, 1, tzinfo=ZoneInfo('EST')) b = datetime(2000, 1, 1, tzinfo=ZoneInfo('UTC')) Assert(a > b).iss(True) c = Time.epochFromDatetime(a) d = Time.epochFromDatetime(b) Assert(c > d).iss(True) c = Time(a) d = Time(b) Assert(c > d).iss(True) Assert(c < d).iss(False) Assert(c == d).iss(False) Assert(c - d).isType(TimeDelta) Assert(c - TimeDelta(0)).isType(Time) Assert(c - timedelta(0)).isType(Time) Assert(TimeDelta(0) - TimeDelta(0)).isType(TimeDelta) Assert(TimeDelta(0) - timedelta(0)).isType(TimeDelta) Assert(TimeDelta(0)).raises(func=lambda x: x - c, exception=TypeError) a = Time.FromEpoch(0).epoch() Assert(a) == 0 Assert(TimeDelta(0) < 0).isType(bool) @tester.add() def timeClass(): t1 = Time(datetime(2001, 2, 3)) t2 = t1.replace(m=1) Assert(t2.minute) == 1 now = Time() t3 = now.replace(h=1, m=1, s=1) Assert(t3.hour) == 1 Assert(t3.minute) == 1 Assert(t3.second) == 1 @tester.add() def fuzzyDateTime() -> N: t1 = Time(datetime(2001, 2, 3)) t2 = t1.fuzzyTimeDelta('1 year') Assert(int((t2 - t1).days)) == 365 t2 = t1.fuzzyTimeDelta('2 year') Assert(int((t2 - t1).days)) == 365 * 2 t2 = t1.fuzzyTimeDelta('1 day') Assert(int((t2 - t1).days)) == 1 t2 = t1.fuzzyTimeDelta('1 year ago') Assert(int((t2 - t1).days)) == -365 log.info(tester.run()) tester.exitWithStatus()