Tuesday 5 February 2013

Scala Expiration Map

Following from designing a functional session state, I have generalised it down to a simple map. It doesn't inherit the Map trait, but possibly could be extended in that way if required. Again, it uses an immutable queue rather than a heap, to keep it functional. This does mean if key values are not added in chronological order, the memory releasing will be less efficient
import collection.immutable.Queue

trait ExpirationMap[A, +B] {

  def expirationInterval:Long

  def apply(key:A)(implicit current:Long):B

  def get(key: A)(implicit current:Long): Option[B]

  def iterator(implicit current:Long): Iterator[(A, B)]

  def toMap(implicit current:Long):Map[A, B]

  def +[B1 >: B](kv: (A, B1))(implicit current:Long): ExpirationMap[A, B1]

  def -(key: A)(implicit current:Long): ExpirationMap[A, B]

  def mapValues[C](f: (B) ⇒ C)(implicit current:Long): ExpirationMap[A, C]
}

object ExpirationMap {

  def apply[A, B](expirationInterval:Long):ExpirationMap[A, B] = ExpirationMapImpl[A, B](expirationInterval, Queue.empty, Map.empty)
}

private case class ExpirationMapImpl[A, +B](expirationInterval:Long, expirationQueue:Queue[(A, Long)], keyValueExpiry:Map[A,(B,Long)]) extends ExpirationMap[A, B] {

  def apply(key:A)(implicit current:Long):B =
    get(key).get

  def get(key:A)(implicit datetime:Long):Option[B] =
    keyValueExpiry.get(key) collect {
      case (value, expiry) if (expiry > datetime) => value
    }

  def iterator(implicit current:Long): Iterator[(A, B)] = keyValueExpiry.iterator.filter(_._2._2 > current).map(p => (p._1, p._2._1))

  def toMap(implicit current:Long):Map[A, B] = keyValueExpiry.filter(_._2._2 > current).mapValues(_._1)

  def +[B1 >: B](kv: (A, B1))(implicit current:Long): ExpirationMap[A, B1] = {
    val cleared = clearedExpired(current)
    val newQueue =
      if (cleared.keyValueExpiry.contains(kv._1)) cleared.expirationQueue
      else cleared.expirationQueue.enqueue((kv._1, current + expirationInterval))
    val newMap = cleared.keyValueExpiry + (kv._1 -> (kv._2, current + expirationInterval))
    ExpirationMapImpl(this.expirationInterval, newQueue, newMap)
  }

  def -(key: A)(implicit current:Long): ExpirationMap[A, B] = {
    val cleared = clearedExpired(current)
    if (cleared.keyValueExpiry.contains(key)) ExpirationMapImpl(this.expirationInterval, cleared.expirationQueue, cleared.keyValueExpiry - key)
    else cleared
  }

  def mapValues[C](f: (B) ⇒ C)(implicit current:Long): ExpirationMap[A, C] = {
    val cleared = clearedExpired(current)
    ExpirationMapImpl(this.expirationInterval, cleared.expirationQueue, cleared.keyValueExpiry.mapValues(v => (f(v._1), v._2)))
  }

  private def clearedExpired(current:Long):ExpirationMapImpl[A, B] = clearedExpired(current, expirationQueue, keyValueExpiry)
  private def clearedExpired[C >: B](current:Long, expirationQueue:Queue[(A, Long)], keyValueExpiry:Map[A,(C, Long)]):ExpirationMapImpl[A, C] = {
    expirationQueue.headOption collect {
      case (key, expiry) if (expiry < current) => keyValueExpiry.get(key) map {
        case (_, expiry) if (expiry < current)  => clearedExpired(current, expirationQueue.dequeue._2, keyValueExpiry - key)
        case _                                  => clearedExpired(current, expirationQueue.dequeue._2.enqueue((key, expiry)), keyValueExpiry)
      } getOrElse(clearedExpired(current, expirationQueue.dequeue._2, keyValueExpiry))
    } getOrElse (ExpirationMapImpl(this.expirationInterval, expirationQueue, keyValueExpiry))
  }
}
It also requires something like this for the implicit time variable
implicit def currentTime:Long = new Date().getTime

Saturday 2 February 2013

Session state in Scala,1. Immutable Session State

I had recently knocked out an authorisation and session state manager in Scala for a solution I'm working on.   The problem was that it looked more like Java then Scala, so I am revisiting it with an eye to making it as functional as possible.

The first piece of work has been to create a data structure for handling session keys and their expiry. I built this using an immutable queue and an immutable hash-map. I was unable to find a satisfactory implementation of an immutable heap for use as a priority queue, perhaps they are just not that practical for use in an immutable context. The consequence is that the state may hold expired sessions in memory for longer than necessary, however no longer then twice the expiry time period.

I have developed this in Scala 2.10

trait SessionState[K,T] {
  /**
   * Expiry time of a session in milliseconds
   * @return
   */
  def expiry:Int

  /**
   * Returns the corresponding tag for the sessionkey
   * @param sessionKey
   * @param datetime current datetime in milliseconds
   * @return
   */
  def getSessionTag(sessionKey:K)(implicit datetime:Long):Try[T]

  /**
   * Rejuvenates the session increasing its expiry time to the datetime + the expiry time
   * @param sessionKey
   * @param datetime current datetime in milliseconds
   * @return Success if session has not already expired, otherwise SessionExpiredException
   */
  def refreshSession(sessionKey:K)(implicit datetime:Long):Try[SessionState[K,T]]

  /**
   * Adds a session key and tag
   * @param sessionKey
   * @param tag
   * @param datetime current datetime in milliseconds
   * @return New SessionState if successfull, otherwise SessionAlreadyExistsException
   */
  def addSession(sessionKey:K, tag:T)(implicit datetime:Long):Try[SessionState[K,T]]

  /**
   * Adds a session key and tag
   * @param sessionKey
   * @param tag
   * @param datetime current datetime in milliseconds
   * @return New SessionState if successfull, otherwise SessionAlreadyExistsException
   */
  def +(sessionKey:K, tag:T)(implicit datetime:Long):Try[SessionState[K,T]] = addSession(sessionKey, tag, datetime)

  /**
   * Removes the session key if found
   * @param sessionKey
   * @param datetime
   * @return New SessionState with the session key removed
   */
  def expireSession(sessionKey:K)(implicit datetime:Long):SessionState[K,T]

  /**
   * Removes the session key if found
   * @param sessionKey
   * @param datetime
   * @return New SessionState with the session key removed
   */
  def -(sessionKey:K)(implicit datetime:Long):SessionState[K,T] = expireSession(sessionKey)
}


For the implicit parameter I just used

implicit def currentTime:Long = new Date().getTime


and for the implementation I have
object SessionState {

  def apply[K,T](expiry:Int):SessionState[K, T] = SessionStateInstance[K, T](expiry, Queue.empty, Map.empty)
}

private case class SessionStateInstance[K, T](expiry:Int, sessionQueue:Queue[(K, Long)], keysExpireAndTag:Map[K,(T,Long)]) extends SessionState[K,T] {

  def getSessionTag(sessionKey:K)(implicit datetime:Long):Try[T] =
    keysExpireAndTag.get(sessionKey) collect {
      case (tag, expiry) if (expiry > datetime) => Success(tag)
    } getOrElse(Failure(SessionExpiredException))

  def refreshSession(sessionKey:K)(implicit datetime:Long):Try[SessionState[K,T]] =
    keysExpireAndTag.get(sessionKey) collect {
      case (tag, expiry) if (expiry > datetime) => {
        val cleared = clearedExpiredSessions(datetime)
        Success(SessionStateInstance(this.expiry, cleared.sessionQueue, cleared.keysExpireAndTag + (sessionKey -> (tag, expiry))))
      }
    } getOrElse(Failure(SessionExpiredException))

  def addSession(sessionKey:K, tag:T)(implicit datetime:Long):Try[SessionState[K,T]] =
    keysExpireAndTag.get(sessionKey) collect {
      case (tag, expiry) if (expiry > datetime) => Failure(SessionAlreadyExistsException)
    } getOrElse {
      val cleared = clearedExpiredSessions(datetime)
      Success(SessionStateInstance(this.expiry, cleared.sessionQueue.enqueue((sessionKey, datetime + expiry)), cleared.keysExpireAndTag + (sessionKey -> (tag, datetime + expiry))))
    }

  def expireSession(sessionKey:K)(implicit datetime:Long):SessionState[K,T] = {
    val cleared = clearedExpiredSessions(datetime)
    if (cleared.keysExpireAndTag.contains(sessionKey)) SessionStateInstance(this.expiry, cleared.sessionQueue, cleared.keysExpireAndTag - sessionKey)
    else cleared
  }

  private def clearedExpiredSessions(datetime:Long):SessionStateInstance[K,T] = clearedExpiredSessions(datetime, sessionQueue, keysExpireAndTag)
  private def clearedExpiredSessions(datetime:Long, sessionQueue:Queue[(K, Long)], keysExpireAndTag:Map[K,(T,Long)]):SessionStateInstance[K,T] = {
    sessionQueue.headOption collect {
      case (key, expiry) if (expiry < datetime) => keysExpireAndTag.get(key) map {
        case (tag, expiry) if (expiry < datetime) => clearedExpiredSessions(datetime, sessionQueue.dequeue._2, keysExpireAndTag - key)
        case (tag, expiry)                        => clearedExpiredSessions(datetime, sessionQueue.dequeue._2.enqueue((key, expiry)), keysExpireAndTag)
      } getOrElse(clearedExpiredSessions(datetime, sessionQueue.dequeue._2, keysExpireAndTag))
    } getOrElse (SessionStateInstance(this.expiry, sessionQueue, keysExpireAndTag))
  }
}

case object SessionAlreadyExistsException extends Exception("Session key already exists")
case object SessionExpiredException extends Exception("Session key has already expired")