Saturday 2 February 2013

Session state in Scala, 1. Immutable Session State

I recently knocked out an authorisation and session state manager in Scala for a solution I'm working on. The problem was that it looked more like Java than Scala, so I am revisiting it with an eye to making it as functional as possible.

The first piece of work has been to create a data structure for handling session keys and their expiry. I built this using an immutable queue and an immutable hash-map. I was unable to find a satisfactory implementation of an immutable heap for use as a priority queue; perhaps they are just not that practical in an immutable context. The consequence is that the state may hold expired sessions in memory for longer than necessary, though no longer than twice the expiry period.

I developed this in Scala 2.10.
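To make the queue-plus-map idea concrete before the full trait, here is a minimal sketch of my own, not the implementation from this post. The names `LazyExpirySketch`, `State`, `add`, `refresh` and `sweep` are hypothetical, and keys are fixed to `String` for brevity. The map holds the authoritative expiry, the queue holds the expiry recorded at enqueue time, and a sweep re-queues any key whose map entry has been extended in the meantime.

```scala
import scala.annotation.tailrec
import scala.collection.immutable.Queue

object LazyExpirySketch {
  // Hypothetical minimal state: FIFO of (key, expiry as recorded when queued)
  // plus a map holding the authoritative expiry per key.
  final case class State(queue: Queue[(String, Long)], expiries: Map[String, Long])

  val empty: State = State(Queue.empty, Map.empty)

  // Register a key that expires at now + ttl (all times in milliseconds).
  def add(s: State, key: String, now: Long, ttl: Long): State =
    State(s.queue.enqueue((key, now + ttl)), s.expiries + (key -> (now + ttl)))

  // Extend a live key in the map only; its queue entry is left stale on purpose.
  def refresh(s: State, key: String, now: Long, ttl: Long): State =
    if (s.expiries.get(key).exists(_ > now)) s.copy(expiries = s.expiries + (key -> (now + ttl)))
    else s

  // Pop queue heads whose recorded expiry has passed. A key that was refreshed
  // since it was queued is re-queued with its current expiry instead of dropped.
  @tailrec
  def sweep(s: State, now: Long): State = s.queue.headOption match {
    case Some((key, queuedExpiry)) if queuedExpiry < now =>
      val rest = s.queue.dequeue._2
      s.expiries.get(key) match {
        case Some(actual) if actual >= now =>
          sweep(State(rest.enqueue((key, actual)), s.expiries), now)
        case Some(_) =>
          sweep(State(rest, s.expiries - key), now)
        case None =>
          sweep(State(rest, s.expiries), now)
      }
    case _ => s
  }
}
```

Because the queue is only examined from the head, a stale entry costs nothing until a sweep reaches it, which is the trade-off against a true priority queue.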

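// Imports used by the snippets in this post
import java.util.Date
import scala.collection.immutable.Queue
import scala.util.{Try, Success, Failure}

trait SessionState[K,T] {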
  /**
   * Expiry time of a session in milliseconds
   * @return
   */
  def expiry:Int

  /**
   * Returns the corresponding tag for the sessionkey
   * @param sessionKey
   * @param datetime current datetime in milliseconds
   * @return
   */
  def getSessionTag(sessionKey:K)(implicit datetime:Long):Try[T]

  /**
   * Rejuvenates the session increasing its expiry time to the datetime + the expiry time
   * @param sessionKey
   * @param datetime current datetime in milliseconds
   * @return Success if session has not already expired, otherwise SessionExpiredException
   */
  def refreshSession(sessionKey:K)(implicit datetime:Long):Try[SessionState[K,T]]

  /**
   * Adds a session key and tag
   * @param sessionKey
   * @param tag
   * @param datetime current datetime in milliseconds
   * @return New SessionState if successful, otherwise SessionAlreadyExistsException
   */
  def addSession(sessionKey:K, tag:T)(implicit datetime:Long):Try[SessionState[K,T]]

  /**
   * Adds a session key and tag
   * @param sessionKey
   * @param tag
   * @param datetime current datetime in milliseconds
   * @return New SessionState if successful, otherwise SessionAlreadyExistsException
   */
  def +(sessionKey:K, tag:T)(implicit datetime:Long):Try[SessionState[K,T]] = addSession(sessionKey, tag)(datetime)

  /**
   * Removes the session key if found
   * @param sessionKey
   * @param datetime
   * @return New SessionState with the session key removed
   */
  def expireSession(sessionKey:K)(implicit datetime:Long):SessionState[K,T]

  /**
   * Removes the session key if found
   * @param sessionKey
   * @param datetime
   * @return New SessionState with the session key removed
   */
  def -(sessionKey:K)(implicit datetime:Long):SessionState[K,T] = expireSession(sessionKey)
}


For the implicit parameter I just used

implicit def currentTime:Long = new Date().getTime
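As a small illustration of why the time is an implicit parameter, the sketch below (my own, with the hypothetical names `ImplicitClockSketch`, `isLive`, `liveNow` and `liveAt`) shows the same method resolving the wall clock implicitly in one caller and taking a pinned time explicitly in another, which is convenient in tests.

```scala
import java.util.Date

object ImplicitClockSketch {
  // Hypothetical stand-in for any method that takes the current time implicitly.
  def isLive(expiresAt: Long)(implicit datetime: Long): Boolean = expiresAt > datetime

  // The implicit def is re-evaluated on every call, so each lookup sees a fresh timestamp.
  def liveNow(expiresAt: Long): Boolean = {
    implicit def currentTime: Long = new Date().getTime
    isLive(expiresAt)
  }

  // Passing the second parameter list explicitly pins the time to a known value.
  def liveAt(expiresAt: Long, fixedTime: Long): Boolean = isLive(expiresAt)(fixedTime)
}
```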


and for the implementation I have
object SessionState {

  def apply[K,T](expiry:Int):SessionState[K, T] = SessionStateInstance[K, T](expiry, Queue.empty, Map.empty)
}

private case class SessionStateInstance[K, T](expiry:Int, sessionQueue:Queue[(K, Long)], keysExpireAndTag:Map[K,(T,Long)]) extends SessionState[K,T] {

  def getSessionTag(sessionKey:K)(implicit datetime:Long):Try[T] =
    keysExpireAndTag.get(sessionKey) collect {
      case (tag, expiry) if (expiry > datetime) => Success(tag)
    } getOrElse(Failure(SessionExpiredException))

  def refreshSession(sessionKey:K)(implicit datetime:Long):Try[SessionState[K,T]] =
    keysExpireAndTag.get(sessionKey) collect {
      case (tag, expiry) if (expiry > datetime) => {
        val cleared = clearedExpiredSessions(datetime)
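        // The pattern variable `expiry` shadows the field, so use this.expiry to compute the new expiry time
        Success(SessionStateInstance(this.expiry, cleared.sessionQueue, cleared.keysExpireAndTag + (sessionKey -> (tag, datetime + this.expiry))))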
      }
    } getOrElse(Failure(SessionExpiredException))

  def addSession(sessionKey:K, tag:T)(implicit datetime:Long):Try[SessionState[K,T]] =
    keysExpireAndTag.get(sessionKey) collect {
      case (tag, expiry) if (expiry > datetime) => Failure(SessionAlreadyExistsException)
    } getOrElse {
      val cleared = clearedExpiredSessions(datetime)
      Success(SessionStateInstance(this.expiry, cleared.sessionQueue.enqueue((sessionKey, datetime + expiry)), cleared.keysExpireAndTag + (sessionKey -> (tag, datetime + expiry))))
    }

  def expireSession(sessionKey:K)(implicit datetime:Long):SessionState[K,T] = {
    val cleared = clearedExpiredSessions(datetime)
    if (cleared.keysExpireAndTag.contains(sessionKey)) SessionStateInstance(this.expiry, cleared.sessionQueue, cleared.keysExpireAndTag - sessionKey)
    else cleared
  }

  private def clearedExpiredSessions(datetime:Long):SessionStateInstance[K,T] = clearedExpiredSessions(datetime, sessionQueue, keysExpireAndTag)
  private def clearedExpiredSessions(datetime:Long, sessionQueue:Queue[(K, Long)], keysExpireAndTag:Map[K,(T,Long)]):SessionStateInstance[K,T] = {
    sessionQueue.headOption collect {
      case (key, expiry) if (expiry < datetime) => keysExpireAndTag.get(key) map {
        case (tag, expiry) if (expiry < datetime) => clearedExpiredSessions(datetime, sessionQueue.dequeue._2, keysExpireAndTag - key)
        case (tag, expiry)                        => clearedExpiredSessions(datetime, sessionQueue.dequeue._2.enqueue((key, expiry)), keysExpireAndTag)
      } getOrElse(clearedExpiredSessions(datetime, sessionQueue.dequeue._2, keysExpireAndTag))
    } getOrElse (SessionStateInstance(this.expiry, sessionQueue, keysExpireAndTag))
  }
}

case object SessionAlreadyExistsException extends Exception("Session key already exists")
case object SessionExpiredException extends Exception("Session key has already expired")
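Since the exceptions are case objects, a caller can match on them directly inside a `Failure`. The following is a minimal sketch under my own assumptions: `TryOutcomeSketch`, `lookup` and `describe` are hypothetical names, and `lookup` is a simplified stand-in for `getSessionTag` over a plain map rather than the `SessionState` above.

```scala
import scala.util.{Failure, Success, Try}

object TryOutcomeSketch {
  // Local copy of the exception so that the sketch compiles on its own.
  case object SessionExpiredException extends Exception("Session key has already expired")

  // Simplified lookup: a missing key and an expired key produce the same failure.
  def lookup(sessions: Map[String, (String, Long)], key: String, now: Long): Try[String] =
    sessions.get(key) match {
      case Some((tag, expiresAt)) if expiresAt > now => Success(tag)
      case _ => Failure(SessionExpiredException)
    }

  // Callers branch on the outcome by pattern matching on the Try.
  def describe(result: Try[String]): String = result match {
    case Success(tag) => "active session tagged " + tag
    case Failure(SessionExpiredException) => "expired or unknown session"
    case Failure(other) => "unexpected failure: " + other.getMessage
  }
}
```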

12 comments:

  1. I got a compile error on 2.10 and thought that it would benefit from adding a main method which let people explore how it worked as a demo. The gist is over at https://gist.github.com/simbo1905/11351545

  2. Also it's nice to write durations rather than integers with:

    import scala.concurrent.duration._
    @volatile var sessions: SessionState[String, DiscoveryInformation] = SessionState(1 minute)

    This can be done by either overloading or replacing the apply in SessionState, for example:

    object SessionState {
    def apply[K, T](duration: FiniteDuration): SessionState[K, T] = SessionStateInstance[K, T](duration.toMillis.toInt, Queue.empty, Map.empty)
    }

  3. The documentation for refresh says it refreshes any session by extending it. The implementation code returns failure unless the session has already expired. Looks like a bug.

    Replies
    1. Doh! My bad, I was scrolling around on a phone. The check is correct: it only refreshes sessions which have not yet expired.

  4. It would seem natural for a user session that regular get/set of values into the session would extend the session due to user activity. Only admin or monitoring logic would want to get or set the session without refreshing it. The current refresh only refreshes expired sessions. Normally there is no reason to do that; the user should have to log in again, and no admin command should exist to refresh it.

    Replies
    1. My remark about refresh was incorrect. Yet it would still seem natural to have a "get and refresh" as well as a "set and refresh" which were atomic. The getAndRefresh could return a Try of a tuple of the refreshed sessions and the value. For consistency the setAndRefresh could return a Try of a tuple of the refreshed sessions and the value put.

  5. Why return a Try(SessionExpiredException) when trying to get a key which was never added? Would it not be more natural to return Option[T] for any access? Then if it were not there or had expired, return None, else return Some(t). The Try(SessionAlreadyExistsException) makes sense for any update. When trying to refresh a session which has expired or was never added, a Try(SessionNotFound) makes more sense.

    Sorry for the long list of feedback but I am actually test driving the code and am making these refactors. I will blog the derived work when I have finished. Thanks!

    Replies
    1. The use of SessionExpiredException on getting a key which never existed arose from the idea that it would help limit attempts at key spoofing if you couldn't distinguish between expired and non-existent keys. Option[T] would be fine; to me the logic of the system just feels that it's intrinsically a failure to not get back an active session when a session key or token is passed.

  6. Cool, glad you're finding it useful. I have done a refactor in my production code as well. Some core decisions I have made are the use of FiniteDuration and the removal of Try. Instead I now use scalaz.Validation, as a bad key is not really an exception but rather a data issue (in fact I've pretty much ditched the use of Try anywhere).

    case class TokenExpiredFailure(tokenPart:String)

    case class TokenSupersededFailure(tokenPart:String)

  7. Thanks for your blog it was just what I needed to get me thinking about functional programming of session state.

    Here is the gist of what I finally evolved things into with a unit test spec and a command-line demo https://gist.github.com/simbo1905/11351545

    I went with having the apply method get the value, refresh the expiry if found, and expire old keys in one invocation. I think that is the most natural operation, as the session should not expire if the user keeps on accessing their data. So that method returns a Tuple2 of the revised session set and an Option[V] of the value sought.

    A refresh, or adding a duplicate session, will throw to indicate that it didn't happen; otherwise you could be losing data or think you have extended a user when they have already expired.

    The rest of the API uses Option[V] to indicate that a value wasn't there. Whether it was never there or has expired is not something that the API reveals, but I think that's not a problem: either they were never logged in or their login has expired, and the next steps are usually the same.

    Many thanks.

  8. Things sped up 20x by replacing the Queue with a Vector, which is another immutable data structure. The dequeue and enqueue of the Queue seem to be a lot slower than v.drop(1) and v :+ (a, b), so I would recommend that you upgrade to a Vector.

  9. Here is my fork of your idea http://simbo1905.wordpress.com/2014/05/03/immutable-session-state-in-scala/ thanks very much for your inspiring work!
