events/src/EventCounter.m3


                            -*- Mode: Modula-3 -*- 
 * 
 * For information about this program, contact Blair MacIntyre            
 * (bm@cs.columbia.edu) or Steven Feiner (feiner@cs.columbia.edu)         
 * at the Computer Science Dept., Columbia University,                    
 * 1214 Amsterdam Ave. Mailstop 0401, New York, NY, 10027.                
 *                                                                        
 * Copyright (C) 1995, 1996 by The Trustees of Columbia University in the 
 * City of New York.  Blair MacIntyre, Computer Science Department.       
 * See file COPYRIGHT-COLUMBIA for details.
 * 
 * Author          : Blair MacIntyre
 * Created On      : Thu May 11 11:19:21 1995
 * Last Modified By: Blair MacIntyre
 * Last Modified On: Mon Aug  4 12:30:02 1997
 * Update Count    : 119
 * 
 * $Source: /usr/cvs/cm3/m3-comm/events/src/EventCounter.m3,v $
 * $Date: 2001-12-02 00:20:37 $
 * $Author: wagner $
 * $Revision: 1.2 $
 * 
 * $Log: EventCounter.m3,v $
 * Revision 1.2  2001-12-02 00:20:37  wagner
 * add copyright notes, fix overrides for cm3, and make everything compile
 *
 * added: events/COPYRIGHT-COLUMBIA
 * added: events/src/COPYRIGHT-COLUMBIA
 * modified: events/src/Event.i3
 * modified: events/src/Event.m3
 * modified: events/src/EventConn.i3
 * modified: events/src/EventConn.m3
 * modified: events/src/EventCounter.i3
 * modified: events/src/EventCounter.m3
 * modified: events/src/EventHandle.i3
 * modified: events/src/EventIO.i3
 * modified: events/src/EventNumber.i3
 * modified: events/src/EventNumber.m3
 * modified: events/src/EventNumberF.i3
 * modified: events/src/EventPort.i3
 * modified: events/src/EventPort.m3
 * modified: events/src/EventProtocol.i3
 * modified: events/src/EventRd.i3
 * modified: events/src/EventRd.m3
 * modified: events/src/EventSpaceID.i3
 * modified: events/src/EventSpaceID.m3
 * modified: events/src/EventStubLib.i3
 * modified: events/src/EventStubLib.m3
 * modified: events/src/EventWireRep.i3
 * modified: events/src/EventWireRep.m3
 * modified: events/src/EventWr.i3
 * modified: events/src/EventWr.m3
 * modified: events/src/EventWrF.i3
 * modified: events/src/HostInfo.i3
 * modified: events/src/HostInfo.m3
 * modified: events/src/RdWrMutex.i3
 * modified: events/src/RdWrMutex.m3
 * modified: events/src/Work.i3
 * modified: events/src/WorkerPool.i3
 * modified: events/src/WorkerPool.m3
 * modified: events/src/Zombie.i3
 * modified: events/src/m3makefile
 * modified: events/src/m3overrides
 *
 * Revision 1.1.1.1  2001/12/02 00:06:45  wagner
 * Blair MacIntyre's events library
 *
 * Revision 1.3  1997/08/04 20:15:08  bm
 * Fixed BRANDs
 *
 * Revision 1.2  1997/01/23 15:26:35  bm
 * Lots of little bug fixes.
 *
 * 
 * HISTORY
 * We use a Priority Queue to hold the blocked threads.  Each thread
 * has its own entry in the queue, and its own condition variable.
 * Threads blocked because of Acquire use priority value, wait := FALSE.
 * Threads blocked because of Wait use priority value+1, wait := TRUE.
 * Equal priorities are ordered with wait before non-wait.  
 * Thus, they don't get in each others way, and are obtained in the
 * correct order.
 

MODULE EventCounter;

IMPORT Thread, EventPQ, EventPQRep, EventNumber, Fmt, Process, Text,
       RdWrMutex;
IMPORT IO;
TYPE
  ThreadWaiting = EventPQ.Elt OBJECT cv: Thread.Condition;  END;
  EventWaiting = EventPQ.Elt OBJECT handler: Handler  END;

REVEAL
  T = Public BRANDED "EventCounter.T" OBJECT
        locker : Thread.T;
        mu     : RdWrMutex.T;
        wmu    : Thread.Mutex;
        val    : EventNumWait;
        waiting: EventPQ.Default;
      OVERRIDES
        init    := Init;
        tryAcquire := TryAcquire;
        enqueueAction := EnqueueAction;
        acquire := Acquire;
        release := Release;
        wait    := Wait;
        value   := Value;
        set     := Set;
      END;

TYPE
  EventNumWait = EventNumber.T OBJECT
    wait: BOOLEAN := FALSE;
  OVERRIDES
    compare := Compare;
    fmt := Format;
  END;

PROCEDURE Compare(self: EventNumWait; en: EventNumber.T): [-1..1] =
  VAR cmp := EventNumber.Compare(self, en);
  BEGIN
    (* IO.Put("Comparing " & self.fmt() & " and " & en.fmt() & "\n"); *)
    (* If the EventNumber part is different, they are different! *)
    IF cmp # 0 THEN
      RETURN cmp;
    END;
    TYPECASE en OF
    | EventNumWait(p2) =>
      (* "wait" is greater than non-"wait" values. *)
      IF self.wait = p2.wait THEN
        RETURN 0;
      ELSIF self.wait THEN
        RETURN -1;
      ELSE
        RETURN 1;
      END;
    ELSE
      (* "wait" is greater than normal EventNumber.T, non-"wait" is
         equal. *)
      IF self.wait THEN
        RETURN -1;
      ELSE
        RETURN 0;
      END;
    END;
  END Compare;

PROCEDURE Format(self: EventNumWait; base: Fmt.Base): Text.T =
  BEGIN
    RETURN EventNumber.T.fmt(self, base) & "." & Fmt.Bool(self.wait);
  END Format;

PROCEDURE Init (self: T; mu: RdWrMutex.T; value: EventNumber.T): T =
  BEGIN
    self.val := NEW(EventNumWait, wait := FALSE).init(value);
    self.waiting := NEW(EventPQ.Default).init();
    self.mu := mu;
    self.wmu := NEW(Thread.Mutex);
    self.locker := NIL;
    RETURN self;
  END Init;

PROCEDURE New(mu: RdWrMutex.T; t: EventNumber.T): T =
  BEGIN
    RETURN NEW(T).init(mu, t);
  END New;

PROCEDURE TryAcquire (self: T; value: EventNumber.T): BOOLEAN
  RAISES {Duplicate} =
  BEGIN
    LOCK self.wmu DO
      (* Make sure we don't try to recursively grab the counter. *)
      IF self.locker = Thread.Self() THEN
        Process.Crash(
          "Thread #" (* & Fmt.Int(ThreadF.MyId()) *)
            & " is trying to acquire an event counter it already holds.\n");
      END;
      (* If the counter has already passed this value, raise Duplicate. *)
      CASE value.compare(self.val) OF
      | -1 => RAISE Duplicate;
      |  1 => RETURN FALSE;
      ELSE (* drop through *)
      END;
    END;

    self.mu.acquireWrite();
    Thread.Acquire(self.wmu);
    (* Need to check again after acquiring the lock.
       If the counter has already passed this value, raise Duplicate. *)
    IF value.compare(self.val) = -1 THEN
      Thread.Release(self.wmu);
      self.mu.releaseWrite();
      RAISE Duplicate;
    END;
    self.locker := Thread.Self();
    Thread.Release(self.wmu);
    RETURN TRUE;
  END TryAcquire;

PROCEDURE Acquire (self: T; value: EventNumber.T) RAISES {Duplicate} =
  BEGIN
    LOCK self.wmu DO
      (* Make sure we don't try to recursively grab the counter. *)
      IF self.locker = Thread.Self() THEN
        Process.Crash(
          "Thread #" (* & Fmt.Int(ThreadF.MyId()) *)
            & " is trying to acquire an event counter it already holds.\n");
      END;
      (* If the counter has already passed this value, raise Duplicate. *)
      IF value.compare(self.val) = -1 THEN
        RAISE Duplicate;
      END;
    END;

    self.mu.acquireWrite();
    Thread.Acquire(self.wmu);
    (* Need to check again after acquiring the lock.
       If the counter has already passed this value, raise Duplicate. *)
    IF value.compare(self.val) = -1 THEN
      Thread.Release(self.wmu);
      self.mu.releaseWrite();
      RAISE Duplicate;
    ELSIF value.compare(self.val) = 1 THEN
      WITH newSleeper = NEW(ThreadWaiting, cv := NEW(Thread.Condition),
                            priority := NEW(EventNumWait,
                                            wait := FALSE).init(value)) DO
        self.waiting.insert(newSleeper);
        self.mu.wait(self.wmu, newSleeper.cv);

        (* If multiple threads block trying to acquire the same value, one
           of them will return from wait first and eventually release the
           counter (incrementing it).  All others will see self.val >
           value when they eventually get to run.  These release the lock
           and raise Duplicate. *)
        IF value.compare(self.val) = 0 THEN
          self.locker := Thread.Self();
          Thread.Release(self.wmu);
          RETURN;
        END;
        Thread.Release(self.wmu);
        self.mu.releaseWrite();
        RAISE Duplicate;
      END;
    END;
    self.locker := Thread.Self();
    Thread.Release(self.wmu);
  END Acquire;
The algorithm is this: Acquire self.wmu. Now, check the eventnumber. - If we could run right now, release the lock, try to Acquire the counter, execute the update, Release the counter and return. - If not, enqueue it.
PROCEDURE EnqueueAction(self: T; value: EventNumber.T;
                        handler: Handler) RAISES {Duplicate} =
  BEGIN
    LOCK self.wmu DO
      IF self.locker = Thread.Self() THEN
        Process.Crash("Thread #" (* & Fmt.Int(ThreadF.MyId()) *)
            & " is trying to wait for an event counter it already holds.\n");
      END;
      CASE value.compare(self.val) OF
      | -1 => RAISE Duplicate;
      |  1 =>
        (* we really can't execute this action yet, so enqueue it *)
        WITH newEvent = NEW(EventWaiting, handler := handler,
                            priority := NEW(EventNumWait,
                                            wait := FALSE).init(value)) DO
          self.waiting.insert(newEvent);
        END;
        RETURN;
      ELSE (* drop through *)
      END;
    END;

    (* We are actually next in line, so just try to acquire the lock
       and execute the event *)
    Acquire(self, value);
    handler.handle();
    Release(self);
  END EnqueueAction;

PROCEDURE Wait (self: T; value: EventNumber.T) =
  BEGIN
    LOCK self.wmu DO
      (* Make sure only the owner releases the counter. *)
      IF self.locker = Thread.Self() THEN
        Process.Crash(
          "Thread #" (* & Fmt.Int(ThreadF.MyId()) *)
            & " is trying to wait for an event counter it already holds.\n");
      END;
      IF value.compare(self.val) >= 0 THEN
        WITH newSleeper = NEW(ThreadWaiting, cv := NEW(Thread.Condition),
                              priority := NEW(EventNumWait,
                                              wait := TRUE).init(value)) DO
          (* Increment this so it comes after the blocked sleepers trying
             to acquire the lock. *)
          TRY
            newSleeper.priority.inc();
          EXCEPT
          | EventNumber.Overflow =>
            Process.Crash("EventCounter overflowed on wait().");
          END;
          self.waiting.insert(newSleeper);
          (* IO.Put("EC.Wait(" & Fmt.Int(value) & ") blocked\n"); *)
          Thread.Wait(self.wmu, newSleeper.cv);
        END;
      END;
    END;
  END Wait;
Advance as far through the queue as possible, waking up waiters and anyone attempting to acquire the lock, and executing actions.
PROCEDURE AdvanceQueue(self: T): ThreadWaiting =
  VAR
    sleeper   : ThreadWaiting := NIL;
  BEGIN
    LOOP
      IF self.waiting.size() = 0 THEN RETURN sleeper END;

      WITH minElt = NARROW(self.waiting.min(), EventPQ.Elt) DO
        IF minElt.priority.compare(self.val) > 0 THEN
          RETURN sleeper;
        END;

        TYPECASE self.waiting.deleteMin() OF
        | ThreadWaiting(minSleeper) =>
          IF sleeper = NIL THEN
            (* Wake this one up after we leave loop. *)
            sleeper := minSleeper;
          ELSE
            (* Wake up any more. *)
            Thread.Signal(minSleeper.cv);
          END;
        | EventWaiting(minEvent) =>
          (* If this event is the current one we are waiting for,
             process it.  If we've already hit this number, then
             call the duplicate method. *)
          IF minEvent.priority.compare(self.val) = 0 THEN
            (* an event we can process! *)
            minEvent.handler.handle();

            (* after processing, increment our counter *)
            TRY
              self.val.inc();
            EXCEPT
            | EventNumber.Overflow =>
              Process.Crash("EventCounter overflowed on release().");
            END;

            (* If we had a sleeper, wake it since it's now going to
               just raise Duplicate when it's turn comes, and we may
               want to give someone else a chance to be the sleeper *)
            IF sleeper # NIL THEN
              Thread.Signal(sleeper.cv);
              sleeper := NIL;
            END;
          ELSE
            minEvent.handler.duplicate();
          END;
        ELSE
          <*ASSERT FALSE*>
        END;
      END;
    END;
  END AdvanceQueue;
These should never happen.
<* FATAL EventPQ.Empty *>

PROCEDURE Release (self: T) =
  VAR
    sleeper   : ThreadWaiting := NIL;
  BEGIN
    LOCK self.wmu DO
      (* Make sure only the owner releases the counter. *)
      IF self.locker = NIL THEN
        Process.Crash(
          "Thread #" (* & Fmt.Int(ThreadF.MyId()) *)
            & " is trying to release an event counter that is not held.\n");
      END;

      (* Increment the value of the event counter. *)
      TRY
        self.val.inc();
      EXCEPT
      | EventNumber.Overflow =>
        Process.Crash("EventCounter overflowed on release().");
      END;
      sleeper := AdvanceQueue(self);

      self.locker := NIL;
    END;

    (* Release the lock, signal a sleeper if one should wake. *)
    self.mu.releaseWrite();
    IF sleeper # NIL THEN Thread.Signal(sleeper.cv); END;
  END Release;

PROCEDURE Set (self: T; val: EventNumber.T) RAISES {Invalid} =
  BEGIN
    LOCK self.wmu DO
      (* Make sure only the owner releases the counter. *)
      IF self.locker = Thread.Self() THEN
        Process.Crash(
          "Thread #" (* & Fmt.Int(ThreadF.MyId()) *)
            & " is trying to set an event counter it already holds.\n");
      END;
    END;
    self.mu.acquireWrite();
    Thread.Acquire(self.wmu);
    (* If the counter is at, or has already passed, this value, raise
       Invalid. *)
    IF val.compare(self.val) <= 0 THEN
      Thread.Release(self.wmu);
      self.mu.releaseWrite();
      RAISE Invalid;
    END;

    (* Set the value of the event counter minus 1, so release will
       leave it at the correct value. *)
    TRY
      self.val := NEW(EventNumWait, wait := FALSE).init(val);
      self.val.dec();
    EXCEPT
    | EventNumber.Overflow => RAISE Invalid;
    END;

    self.locker := Thread.Self();
    Thread.Release(self.wmu);

    (*  we used to lock, set and then unlock.  We now treat set
        similarly to acquire, and require that the programmer call
        release as well.  This stuff is no longer needed:
    sleeper := AdvanceQueue(self);

    Thread.Release(self.wmu);
    self.mu.releaseWrite();
    IF sleeper # NIL THEN Thread.Signal(sleeper.cv); END;
    *)
  END Set;

PROCEDURE Value(self: T): EventNumber.T =
  BEGIN
    LOCK self.wmu DO
      RETURN NEW(EventNumber.T).init(self.val);
    END;
  END Value;

PROCEDURE DefaultHandlerHandle(<*UNUSED*>self: Handler) =
  BEGIN
  END DefaultHandlerHandle;

PROCEDURE DefaultHandlerDuplicate(<*UNUSED*>self: Handler) =
  BEGIN
  END DefaultHandlerDuplicate;

PROCEDURE ToText(self: T): TEXT =
  VAR t := "{";
  BEGIN
    LOCK self.wmu DO
      IF self.locker # NIL THEN t := t & "(locked)" END;
      t := t & "curr=" & self.val.fmt(10) & ",";
      IF self.waiting.size() = 0 THEN
        t := t & "<none blocked>";
      ELSE
        WITH minElt = NARROW(self.waiting.min(), EventPQ.Elt) DO
          t := t & "queue=" & minElt.priority.fmt(10) & "+" &
                   Fmt.Int(self.waiting.size()) & "[";
          WITH arr = self.waiting.heap DO
            FOR i := 1 TO self.waiting.sz DO
              TYPECASE arr[i] OF
              | ThreadWaiting(tw) =>
                t := t & "waiting(" & tw.priority.fmt(10) & ")";
              | EventWaiting(tw) =>
                t := t & "event(" & tw.priority.fmt(10) & ")";
              ELSE <*ASSERT FALSE*>
              END;
            END;
          END;
        END;
      END;
    END;
    RETURN t & "}";
  END ToText;

BEGIN
END EventCounter.