#!/usr/bin/env -- python """This module implements a Timer class for managing periodic events called TimerEvents. There is a Clock abstraction which could use time.time(). TimerEvents default to repeating periodicly forver, but you can specify a count to restrict the number of times the event will ring. You can also specify a scale (like 'minute', 'hour', 'day') and the period will be measured relative to those units.""" __author__ = 'Chuck Swiger ' __copyright__ = 'Copyright (c) 2005 Charles Swiger' __version__ = '$Id: Timer.py,v 1.15 2005/12/14 22:57:14 chuck Exp $' import sys,time __all__ = ['Clock', 'Timer', 'TimerEvent', 'seconds4time'] """The list of known timescale strings which seconds4time() understands.""" TimeScaleList = ['second', 'minute', 'hour', 'day', 'week', 'year', 'eon'] def seconds4time(time, scale='second'): """Compute the number of seconds from a value and scale as string.""" ch1 = scale[0].lower() if ch1 == 's': return time if ch1 == 'm': return time * 60 if ch1 == 'h': return time * 3600 if ch1 == 'd': return time * 3600 * 24 if ch1 == 'w': return time * 3600 * 24 * 7 if ch1 == 'y': year = int(3600 * 24 * 365.25) return time * year if ch1 == 'e': eon = int(3600 * 24 * 365.25 * 1000) return time * eon print "TIMER.seconds4time(): unknown timescale string:", scale return time class Clock: """The Clock class implements different types of clocks used by the Timer. This includes your standard realtime clock (aka time.time()), an elapsed clock (measuring realtime since the Clock was initialized), and even a dummy or "null" clock which only ticks when you call Clock.tick(). The elapsed and null clocks also supports operation at a ratio, which could include a value of 0, meaning time is 'paused' for this Clock.""" __slots__ = ('clocktype', 'function', 'now', 'origin', 'ratio') def __init__(self, clocktype='RT', ratio=1, origin=0): """The default options create a "realtime" clock, using time.time(). You may also create an "elapsed" clock (aka 'EL'), which behaves much like a stopwatch, and has self.origin set to now (aka time.time()), or pass a callable function of your own in which returns a number. You can also pass a clocktype of "NULL" or None, which keeps track of a fake timer, which you or the run() method update via Clock.tick().""" # lets recognize these common cases as being the realtime clock if clocktype is time.time or clocktype is time.clock: clocktype = 'RT' # Note that we keep the clock function relative to the origin set for # that clock, which can be changed (in order to accomodate pausing). # It turns out to be better to simply always subtract the origin, # rather than testing whether the origin == 0. # if the user passed a callable function in directly, use that if callable(clocktype): self.origin = origin self.function = lambda: clocktype() - self.origin clocktype = 'User' elif clocktype == 'realtime' or clocktype == 'RT': self.origin = origin self.function = lambda: time.time() - self.origin elif clocktype == 'elapsed' or clocktype == 'EL': self.origin = time.time() self.function = lambda: time.time() - self.origin else: clocktype = 'NULL' self.origin = origin self.now = self.origin self.function = lambda: self.now self.now = self.function() self.clocktype = clocktype self.ratio = ratio def __call__(self): """Return current time by calling the underlying clock function.""" if self.ratio == 0: # we are paused, no need to call the clock function, just return return self.now if self.ratio == 1: # normal case, time passing with ratio == 1 self.now = self.function() return self.now # We are running in variable ratio time, however the above tests should # be optional, the code below should handle ratio == 0 and ratio == 1 # to give the same results. last = self.now self.now = self.function() delta = self.now - last self.origin -= delta * (self.ratio - 1) self.now = last + delta * self.ratio return self.now def __repr__(self): """Returns a description of the Clock.""" return '<%s Clock now=%s>' % (self.clocktype, self.now) def pause(self): """Pause the timer.""" if 0: print "Clock.pause() called at %0.3f" % self() Clock.setRatio(self, 0) def unpause(self, ratio=1): """Unpause the timer and revert to normal speed.""" if 0: print "Clock.unpause() called at %0.3f" % self() Clock.setRatio(self, ratio) def tick(self, amount=1, scale=None): """Advance a "null" clock by 1 second or as specified.""" if self.clocktype != 'NULL': raise AssertionError("Can't call Timer.tick() except on a NULL clock!") if self.ratio == 0: return if scale == None: delta = amount else: delta = seconds4time(amount,scale) self.now += delta def setRatio(self, ratio=0): """Change the ratio to the specified value, adjusting the clock origin so as to return the right value for now after the ratio is applied. Note that changing the ratio calls the underlying clock function.""" if self.ratio == 0: if ratio == 0: return # we were paused, but are now unpausing last = self.now self.now = self.function() delta = self.now - last self.origin += delta else: self.now = self.function() self.ratio = ratio class TimerEvent: """This is an event which gets invoked by the timer periodicly. TimerEvents default to repeating periodicly forever, but you can specify a count to restrict the number of times the event will ring. You can also specify a scale (like 'minute', 'hour', 'day') from Timer.TimeScaleList and the period will be measured in those units. Non-decimal and sub-second timescales and offsets are supported. In practice, with your system quantum HZ=1000, you can schedule events to millisecond precision and get reasonable accuracy.""" def __init__(self, name, function, count=0, period=1, scale=None,offset=0): """Create a new instance with the given name and function. Optionally, one may specify a repeat count (count=0 means do-forever), a period, the scale of the period ('minute', 'hour', etc), and an offset, which is measured in the same scale as the period.""" self.name = name self.function = function self.count = count self.TODO = [] self.resync_to_realtime = 1 self.last_ring = None self.times_rung = 0 self.debug = 0 # decide whether to ring immediately when the timer first starts self.zeroring = False # deal with time scales of other than seconds if scale == None: self.scale = 'second' else: # seconds4time() is looking for lowercase time interval names # (it converts it, but lets be safe anyway) self.scale = scale.lower() # determine if period is a multiple of largest common timescale self.period = seconds4time(period, self.scale) self.offset = seconds4time(offset, self.scale) for unit in TimeScaleList: secs = seconds4time(1, unit) if (self.period % secs) == 0: self.scale = unit def ring(self): """This is called from the Timer when the event should ring.""" if self.debug > 1: print "TIMER.ring(): %0.12s #%d, period = %d" % \ (self.name, self.times_rung, self.period) if self.function is None: # if the function is None and there are no child events, retire if len(self.TODO) == 0: if self.debug: print "TIMER.ring(): retiring timer:", self self.parent.remove(self) self.timer.rescale() return else: self.function(self) for child in self.TODO: self.timer.testEvent(child) self.times_rung += 1 # handle timers which happen a limited count of times if self.count > 0: self.count -= 1 if self.count == 0: self.function = None def remove(self, event): self.TODO.remove(event) def __repr__(self): return '' % (self.name, self.period) class Timer: """Timer instances have a clock and list of periodic events to run. An initial list can be provided via TODO, or events can be added or removed at will. By default, a Timer will use the realtime clock [time.time()], but you can call it with Timer(clock=None) and use a simulated "null" clock which gets updated by calling Timer.tick(). If one uses a simulated clock, one can adjust Timer.origin to some appropriate value, otherwise the origin of the timer will be set to the current time as an integer, so that a one-minute timer fires at 60.000 seconds, and not 60.735. However, you can provide any other timer you want: all it needs to do is be callable and return a number, and you can reset the origin if you like before adding an event in order to offset events by whatever amount you wish (0.12 seconds or 15 hours). The Timer will run periodic events forever, but it also supports do-once or do-COUNT-times events, which get removed after they have rung the specified # of times. If until is set to a non-zero value, Timer.run() will exit once Timer.clock() > until. epsilon is the minimum granularity the Timer considers significant. It defaults to one millisecond, but check your kernel's HZ setting.""" def __init__(self, TODO=[], clock='RT', scale='second', origin=0, until=0): self.TODO = TODO # make sure all events can see this Timer for i in self.TODO: i.timer = self self.scale = scale self.period = seconds4time(1, self.scale) # by default, we will use the system realtime clock if clock == None: self.clock = Clock('NULL', origin=origin) elif isinstance(clock, Clock): self.clock = clock else: self.clock = Clock(clock, origin=origin) if self.clock.clocktype == 'NULL': self.hasclock = False else: self.hasclock = True self.now = self.clock() self.until = self.now + until self.etr = self.clock.origin self.delay = 0 self.debug = 0 self.maxdelta = 0.1 self.epsilon = 0.001 def __repr__(self): """Returns a description of the Timer.""" if self.hasclock: self.now = self.clock() return '' % (self.clock.clocktype, self.now, self.TODO) def add(self, event): """Adds a new timer item to the TODO list.""" event.timer = self if self.period > event.period: if self.debug > 1: print "TIMER.add(): WARNING: event period too small for clock period, shrinking clock period", self, event self.scale = event.scale self.period = seconds4time(1, self.scale) event.parent = self if event.scale != 'second': # if the event.period is a multiple of another event, coalesce for ev in self.TODO: if ev.scale == event.scale and (event.period % ev.period) == 0: event.parent = ev if self.debug: print "TIMER.add() merging %s with %s" % \ (event, event.parent) event.parent.TODO.append(event) def find(self, name): """Return the first event in the TODO list matching the name.""" if name == None: return self.TODO[0] for e in self.TODO: if e.name == name: return e for ev in e.TODO: if ev.name == name: return ev return None def remove(self, event): """Delete event from the TODO list if specified, else TODO.pop(0).""" if event == None: self.TODO.pop(0) else: try: self.TODO.remove(event) except ValueError: for ev in self.TODO: try: ev.remove(event) except: pass def rescale(self): """Reset Timer scale as needed based on existing TODO list.""" self.scale = 'eon' for ev in self.TODO: if seconds4time(1, self.scale) > ev.period: self.scale = ev.scale self.period = seconds4time(1, self.scale) if self.debug: print 'TIMER.rescale: ', self.scale, self.period def ring(self, TODOlist=None): """ring() should be called whenever time changes, to check whether any timer events have fired. Returns the interval until the next event should be rung in seconds.""" self.now = self.clock() # Check whether we woke up too soon... delta = self.etr - self.now if delta > 0: # however, delta within a millisecond is good enough... if delta < self.epsilon / 2: pass else: # if delta is larger, return and try again later... if delta > 0.9 and self.debug > 1: print "TIMER.ring(): woke up early! now = %.3f, ETR = %.3f" % (self.now, self.etr) return delta # go through each timer item in the list and ring the events as needed if TODOlist is None: TODOlist = self.TODO for ev in TODOlist: self.testEvent(ev) # it may take a while to test and run all events, so if using a real # clock, recheck the time afterwards to better estimate next ring if self.hasclock: finishtime = self.clock() delta = finishtime - self.now self.now = finishtime if self.maxdelta and delta > self.maxdelta: print 'WARNING: events took %.3f seconds to run, re-ringing' % delta return self.ring() later = self.etr - self.now if later < 0: if abs(later) < self.epsilon: return 0 print "Timer.ring(): subzero later, delta=%0.5f" % later return self.epsilon if self.debug > 0: print 'TIMER.ring(): now = %.3f, ETR = %0.3f, later = %.3f' % (self.now, self.etr, later) return later def run(self, until=0): """Run the Timer while there are events left to do or time expires.""" if len(self.TODO) == 0: print "Timer.run: called when TODO list is empty!" return # see if the user wants to only run the timer loop for a while if until > 0: self.until = self.now + until else: self.until = 0 while self.TODO: later = self.ring() if self.until and (self.now > self.until): return if self.hasclock: if self.debug > 1: print "TIMER.run: sleeping for %.3f seconds" % later time.sleep(later) else: if self.debug > 1: print "TIMER.run: next event in %.3f seconds" % later # set this to delay ringing of simulated clock events if self.delay > 0: time.sleep(self.delay) self.tick(later) if self.debug: print "TIMER.run: completed @ %s", self.now def setEventLR(self, event): """Compute the last_ring time the event should have had to be sync'ed with the realtime (if desired), the first time the event is seen.""" self.now = self.clock() if event.resync_to_realtime: if self.hasclock: # figure out the last time the event was in sync with localtime lt = time.localtime() if event.period > 86400: delta = lt[5] + lt[4] * 60 + lt[3] * 3600 + lt[7] * 86400 elif event.period > 3600: delta = lt[5] + lt[4] * 60 + lt[3] * 3600 elif event.period > 60: delta = lt[5] + lt[4] * 60 else: delta = lt[5] else: delta = self.now % event.period if delta > event.period: delta = delta % event.period if event.period > 1: event.last_ring = int(self.now - delta) else: ratio = 1.0 / event.period event.last_ring = (int((self.now - delta) * ratio)) / ratio event.last_ring += event.offset if event.debug > 1: print "TIMER.setEventLR():", event.name, event.period, event.last_ring, delta, self.now else: # we are not resync'ing to real time, just start clock immediately if self.now > 0: event.last_ring = self.now else: event.last_ring = 0 def testEvent(self, ev): """Test whether a TimerEvent has rung.""" # if the event has never been called, initialize ev.last_ring if ev.last_ring == None: self.setEventLR(ev) if ev.zeroring: ev.ring() delta = ev.last_ring + ev.period - self.now if delta > 0: next_ring = ev.last_ring + ev.period else: # time to ring! ev.last_ring += ev.period if ev.debug > 1: print "TIMER: %s called: %.3, last_ring = %d, period = %d" % (ev.name, self.now, ev.last_ring, ev.period) ev.ring() if ev.period + delta > 0: # right now is only slightly after we should have rung pass else: #print 'TIMER: delta between rings greater than event period!' targettime = self.now - (abs(delta) % ev.period) while targettime > ev.last_ring: ev.last_ring += ev.period if ev.debug: print "TIMER: missed ring for %s: %.3f, last_ring = %d, period = %d" % (ev.name, self.now, ev.last_ring, ev.period) ev.ring() next_ring = ev.last_ring + ev.period # update timer's ETR to the next soonest event to ring... if self.etr <= self.now: self.etr = next_ring else: if self.etr > next_ring: self.etr = next_ring def tick(self, amount=1, scale=None): """Advance a "null" clock by 1 second or as specified.""" self.clock.tick(amount=amount, scale=scale) self.now = self.clock() #### # Some simple test functions to display data #### def generic_h(ev): """Generic timer handler.""" print "%15s handler: %5d last_ring=%.3f @ %.3f" % \ (ev.name, ev.times_rung, ev.last_ring, ev.timer.now) def hour_h(ev): if ev.times_rung < 5 or (ev.times_rung % 100) == 0: print " hour handler: %7d last_ring=%.3f" % \ (ev.times_rung, ev.last_ring) def day_h(ev): if (ev.times_rung % 10) == 0: print " day handler: %7d last_ring=%.3f" % \ (ev.times_rung, ev.last_ring) ev.timer.add(TimerEvent('20-min', generic_h, count=3, period=1200)) def year_h(ev): print " year handler: %7d last_ring=%.3f" % \ (ev.times_rung, ev.last_ring) def main(): global t if 1: # The default case is a realtime clock. Hit control-C to end unless # you actually want to wait for the three years to expire. :-) t = Timer() else: # This is a simulated clock, pipe the output to a file). t = Timer(clock=None, origin=543210) # set this say 0.3 to delay some between events firing t.delay = 0.0 t.add(TimerEvent('frac-second', generic_h, count=256, period=0.005)) t.add(TimerEvent('second', generic_h, count=400)) t.add(TimerEvent('10-second', generic_h, count=20,period=10,offset=5)) t.add(TimerEvent('1-minute', generic_h, count=240,period=1,scale='minute')) t.add(TimerEvent('5-minute', generic_h, count=20,period=5,scale='minute',offset=0)) t.add(TimerEvent('hour', hour_h, count=10000, scale='hour')) t.run(10000) print t if 0: t.add(TimerEvent('day', day_h, count=30, scale='day')) t.add(TimerEvent('year', year_h, count=3, scale='year')) t.run(100000000) if __name__ == '__main__': if 1: try: main() except KeyboardInterrupt: sys.stderr.write("\n ...Control-C seen, quitting program.\n") else: # set the above to 0 to run under the python profiler, see: # http://docs.python.org/lib/profile-instant.html import profile pr = profile.Profile(bias=2.0e-5) profile.run('main()', 'timer.profile')