Monday 8 April 2013

Extended Joda DateTime Formatter to format by Quarter

This is just a simple wrapper for the Joda DateTimeFormatter which allows the formatting of quarter as follows:

 Q = 2
 QQ = 02
 QQQ = Q2
 QQQQ = second quarter

I have only hard-coded the English for this, and have only ported the formatter functions I needed.


import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}
import org.joda.time.{DateTimeZone, DateTime}

object ExtendedDateTimeFormat {
  val regex =  """^([^Q]*)(|Q{1,4})([^Q]*)$""".r

  def forPattern(pattern:String) = pattern match {
    case ExtendedDateTimeFormat.regex(beginning, quarter, end) => ExtendedDateTimeFormatter(
      DateTimeZone.getDefault,
      if (beginning.isEmpty) None else Some(DateTimeFormat.forPattern(beginning)),
      if (quarter.isEmpty) None else Some(quarter.length),
      if (end.isEmpty) None else Some(DateTimeFormat.forPattern(end))
    )
    case _ => throw new Exception("Invalid dateFormat")
  }
}

case class ExtendedDateTimeFormatter(dateTimeZone:DateTimeZone, s:Option[DateTimeFormatter], q:Option[Int], e:Option[DateTimeFormatter]) {

  def print(dt:Long):String = print(new DateTime(dt, dateTimeZone))
  def print(dt:DateTime):String =
    s.map(_.print(dt)).getOrElse("") + q.map{
      case 1 => getQuarter(dt).toString
      case 2 => "0" + getQuarter(dt)
      case 3 => "Q" + getQuarter(dt)
      case 4 => getQuarter(dt) match {
        case 1 => "first quarter"
        case 2 => "second quarter"
        case 3 => "third quarter"
        case 4 => "fourth quarter"
      }
    }.getOrElse("") + e.map(_.print(dt)).getOrElse("")

  def getQuarter(dt:DateTime) = dt.withZone(dateTimeZone).getMonthOfYear / 3  + 1

  def withZone(newDateTimeZone:DateTimeZone) = ExtendedDateTimeFormatter(newDateTimeZone, s.map(_.withZone(newDateTimeZone)), q, e.map(_.withZone(newDateTimeZone)))
}

Saturday 16 March 2013

CQRS in Scala: Commands

I have been working with the CQRS pattern in C# for a while, and about a year ago have been implementing it in Scala.  Over the last year I have been simplifying down my approach. Please, any comments, suggestions or criticism are most welcome. It assumes knowledge of the principles of CQRS.  You might also find this post helpful 

The following implementation does not yet take into account events, however this will be covered.

First we have a trait to mark a command
trait Command


Individual command will implement this trait, such as

case class NewUserCommand(string:Login, string:Password) extends Command


For each command there should be a handler which will extend the following:

trait Handler[T <: Command] {
  def handle(command:T):Unit
}

The command handlers I use follow the cake pattern, and have state wired in.  For example

trait NewUserCommandHandler extends Handler[NewUserCommand] {
  this:SecurityService with PersistencyService =>

  def handle(command:NewUserCommand) { ... }
}

In this example state is a document cache required by the PersistencyService:

trait DocumentPersistencyService extends PersistencyService {
  def state:DocumentCache

  ...

I have created a registry for holding the constructor of the command handlers.  Here state is a type parameter, though I don't think that's entirely necessary. The registry is mutable only until first use, I have followed the same approach that I used here.

trait CommandRegistry[State] {
  def getHandler[T <: Command:TypeTag](implicit state:State):Handler[T]
}

trait CommandConstructorRegistry[State] extends CommandRegistry[State] {

  private val registryBuffer = new ListBuffer[(TypeSymbol, (State) => Handler[_])]
  private lazy val registry = registryBuffer.toMap

  def getHandler[T <: Command:TypeTag](implicit state:State):Handler[T] = registry(typeOf[T].typeSymbol.asType)(state).asInstanceOf[Handler[T]]

  def register[T <: Command:TypeTag](constructor:((State) => Handler[T])) {
    registryBuffer += ((typeOf[T].typeSymbol.asType, constructor))
  }
}

A command dispatcher wraps this all up, so that all commands can be issued via the dispatcher.

trait CommandDispatcher[State] {

  def dispatch[T <: Command:TypeTag](command:T)(implicit state:State):Unit
}


trait DefaultCommandDispatcher[State] extends CommandDispatcher[State] {
  this:CommandRegistry[State] =>
  def dispatch[T <: Command:TypeTag](command:T)(implicit state:State) {
    val handler = getHandler[T]
    handler.handle(command)
  }
}

So tying it all together looks something like this:

val commandDispatcher = new DefaultCommandDispatcher[DocumentCache] with CommandConstructorRegistry[DocumentCache] {
  register(s => new NewUserCommandHandler with DefaultSecurityService with DocumentPersistencyService { val state = s })
}

implicit val state = new DocumentCache()
val command = NewUserCommand("login","password")
commandDispatcher.dispatch(command)

You could also enrich the dispatcher through the use of mixins, say if you wanted to log the commands content:

trait CommandLogger[State] extends CommandDispatcher[State] {
  
  abstract override def dispatch[T <: Command:TypeTag](command:T)(implicit state:State) {
    println(command)
    super.dispatch(command)
  }
}

Thursday 7 March 2013

Scala 2.10 Simple DI Container

I have been finding the use of a container object still quite handy, despite use of the cake pattern.  Mostly I have used it to return singletons and dependency injection.  Using Scala 2.10 reflection, I have developed a simple container for my requirements. This has been handy in getting the basics of the new reflection libraries, in particular, generating an item from its constructor method.  My other goal was to make the container functional, hence transparent and immutable, I believe I pretty much achieved that, though am open to criticism.
So first the container trait:

trait Container {
  def get[T:TypeTag]:T

  def get[T:TypeTag](args:Any*):T
}

This methods return an instance of Type T, the second of which allows external dependencies to be injected.
Next is a trait for binding classes. The idea is that a list of types or instances will be registered for the containers use. The bind methods will attach all base classes of the registered type or instance as well, excluding Any or Object. If the same class or base class has already been registered, the latest bind will dominate. Bind occurs in the constructor, making the result effectively immutable.
By using the function bindAt, only the specified, or instance level class will be registered against the instance. So a binding trait extension looks something like this:

trait BoundElements extends Bindings {

  bind[ClassA]
  bind[ClassB]
  bind(new InstanceA()]
  bindAt[BaseClassB](new InstanceB()]
}

This is achieved by storing the bindings in private list buffers which are then access through a lazy evaluated immutables map. The bindings trait is as follows:

trait Bindings {
  private val bindingsBuffer = new ListBuffer[(TypeSymbol, (ClassSymbol, MethodSymbol))]
  private val instanceBuffer = new ListBuffer[(TypeSymbol, Any)]
  private val excluded:Set[Symbol] = Set(typeOf[Any].typeSymbol, typeOf[Object].typeSymbol)

  protected lazy val registry:Map[TypeSymbol, (ClassSymbol, MethodSymbol)] = bindingsBuffer.toMap
  protected lazy val instanceRegistry:Map[TypeSymbol, Any] = instanceBuffer.toMap

  protected def bind[T:TypeTag]() {
    val types = getBaseTypes[T]
    val ctor = getConstructor[T]
    bindingsBuffer ++= types.map((_ -> (typeOf[T].typeSymbol.asClass, ctor)))
  }

  protected def bind[T:TypeTag](instance:T) {
    val types = getBaseTypes[T]
    instanceBuffer ++= types.map((_ -> instance))
  }
  protected def bindAt[T:TypeTag](instance:T) {
    instanceBuffer += (typeOf[T].typeSymbol.asType -> instance)
  }

  private def getBaseTypes[T:TypeTag] = {
    val base = typeOf[T].baseClasses.map(_.asType)
    base.filter(!excluded.contains(_))
  }

  private def getConstructor[T:TypeTag] = typeOf[T].members.filter(_.isMethod).map(_.asMethod).filter(_.isConstructor).head
}

The next trait is the dependency injector, it requires bindings and extends the container trait.  In this case it will only use the first listed constructor. The Dependency injector constructs the required classes, or if they are already instances, provides them.

trait DependencyInjector extends Container {
  this:Bindings =>

  def get[T:TypeTag]:T = getInstance(typeOf[T].typeSymbol.asType).asInstanceOf[T]

  def get[T:TypeTag](args:Any*):T = getInstance(typeOf[T].typeSymbol.asType, args.toSet).asInstanceOf[T]

  protected def getInstance(typeSymbol:TypeSymbol, args:Set[Any] = Set.empty):Any =
    args.find(a => currentMirror.reflect(a).symbol.toType <:< typeSymbol.toType).getOrElse{
      instanceRegistry.getOrElse(typeSymbol, {
        val (clazz, ctor) = registry(typeSymbol)
        val ctorArgs = ctor.asMethod.paramss.head map (p => getInstance(p.typeSignature.typeSymbol.asType, args))
        currentMirror.reflectClass(clazz).reflectConstructor(ctor).apply(ctorArgs:_*)
      })
    }
}

Now all the bits are in place, you can create a container by mixing in bindings into the dependency injector. You can mix in any number of bindings, however latter bindings will override any previously registered classes if found. I have been told this goes against the spirit of mixins, which I kind of agree with, but I think it's still a nice way of doing the bindings.

So using the BoundElements trait you could create your container like:

object MyContainer extends DependencyInjector with BoundElements

And you would use it like

MyContainer.get[ClassA]
MyContainer.get[ClassB](new Date(), new InstanceC())

I am also using it to hold traits where I have used the cake pattern.  However this assumes the cake layers don't require any additional variable wiring, or if they do, it will be on a singleton instance.

trait BindCake extends Bindings {
  bind(new DefaultService with DefaultSecurity with DefaultLogging)
}

MyCakeContainer.get[Service]

It would be nice for it to be able to assemble the cake objects itself, but the only way I have seen that might be possible is using twitters eval. I'm open to any ideas.
Also, please feel free to suggest any better ways of handling the mutability of the bindings.

Another thing to note is that the container instance will only exist in the class library with created it, not, say a core library which holds the primary traits and is shared amongst all other libraries.

Edit:
I have found cases where an a get might not work if there isn't a clear inheritance line.  For example, if I registered say Map("value1" -> 123, "value2" -> "result"), and I performed a get with Map[String,Any], I would not get the map object returned.  This is where I would need to do a check against something like:
a => classTag[U].runtimeClass.isAssignableFrom(a.getClass)  

Tuesday 5 February 2013

Scala Expiration Map

Following from designing a functional session state, I have generalised it down to a simple map. It doesn't inherit the Map trait, but possibly could be extended in that way if required. Again, it uses an immutable queue rather than a heap, to keep it functional. This does mean if key values are not added in chronological order, the memory releasing will be less efficient
import collection.immutable.Queue

trait ExpirationMap[A, +B] {

  def expirationInterval:Long

  def apply(key:A)(implicit current:Long):B

  def get(key: A)(implicit current:Long): Option[B]

  def iterator(implicit current:Long): Iterator[(A, B)]

  def toMap(implicit current:Long):Map[A, B]

  def +[B1 >: B](kv: (A, B1))(implicit current:Long): ExpirationMap[A, B1]

  def -(key: A)(implicit current:Long): ExpirationMap[A, B]

  def mapValues[C](f: (B) ⇒ C)(implicit current:Long): ExpirationMap[A, C]
}

object ExpirationMap {

  def apply[A, B](expirationInterval:Long):ExpirationMap[A, B] = ExpirationMapImpl[A, B](expirationInterval, Queue.empty, Map.empty)
}

private case class ExpirationMapImpl[A, +B](expirationInterval:Long, expirationQueue:Queue[(A, Long)], keyValueExpiry:Map[A,(B,Long)]) extends ExpirationMap[A, B] {

  def apply(key:A)(implicit current:Long):B =
    get(key).get

  def get(key:A)(implicit datetime:Long):Option[B] =
    keyValueExpiry.get(key) collect {
      case (value, expiry) if (expiry > datetime) => value
    }

  def iterator(implicit current:Long): Iterator[(A, B)] = keyValueExpiry.iterator.filter(_._2._2 > current).map(p => (p._1, p._2._1))

  def toMap(implicit current:Long):Map[A, B] = keyValueExpiry.filter(_._2._2 > current).mapValues(_._1)

  def +[B1 >: B](kv: (A, B1))(implicit current:Long): ExpirationMap[A, B1] = {
    val cleared = clearedExpired(current)
    val newQueue =
      if (cleared.keyValueExpiry.contains(kv._1)) cleared.expirationQueue
      else cleared.expirationQueue.enqueue((kv._1, current + expirationInterval))
    val newMap = cleared.keyValueExpiry + (kv._1 -> (kv._2, current + expirationInterval))
    ExpirationMapImpl(this.expirationInterval, newQueue, newMap)
  }

  def -(key: A)(implicit current:Long): ExpirationMap[A, B] = {
    val cleared = clearedExpired(current)
    if (cleared.keyValueExpiry.contains(key)) ExpirationMapImpl(this.expirationInterval, cleared.expirationQueue, cleared.keyValueExpiry - key)
    else cleared
  }

  def mapValues[C](f: (B) ⇒ C)(implicit current:Long): ExpirationMap[A, C] = {
    val cleared = clearedExpired(current)
    ExpirationMapImpl(this.expirationInterval, cleared.expirationQueue, cleared.keyValueExpiry.mapValues(v => (f(v._1), v._2)))
  }

  private def clearedExpired(current:Long):ExpirationMapImpl[A, B] = clearedExpired(current, expirationQueue, keyValueExpiry)
  private def clearedExpired[C >: B](current:Long, expirationQueue:Queue[(A, Long)], keyValueExpiry:Map[A,(C, Long)]):ExpirationMapImpl[A, C] = {
    expirationQueue.headOption collect {
      case (key, expiry) if (expiry < current) => keyValueExpiry.get(key) map {
        case (_, expiry) if (expiry < current)  => clearedExpired(current, expirationQueue.dequeue._2, keyValueExpiry - key)
        case _                                  => clearedExpired(current, expirationQueue.dequeue._2.enqueue((key, expiry)), keyValueExpiry)
      } getOrElse(clearedExpired(current, expirationQueue.dequeue._2, keyValueExpiry))
    } getOrElse (ExpirationMapImpl(this.expirationInterval, expirationQueue, keyValueExpiry))
  }
}
It also requires something like this for the implicit time variable
implicit def currentTime:Long = new Date().getTime

Saturday 2 February 2013

Session state in Scala,1. Immutable Session State

I had recently knocked out an authorisation and session state manager in Scala for a solution I'm working on.   The problem was that it looked more like Java then Scala, so I am revisiting it with an eye to making it as functional as possible.

The first piece of work has been to create a data structure for handling session keys and their expiry. I built this using an immutable queue and an immutable hash-map. I was unable to find a satisfactory implementation of an immutable heap for use as a priority queue, perhaps they are just not that practical for use in an immutable context. The consequence is that the state may hold expired sessions in memory for longer than necessary, however no longer then twice the expiry time period.

I have developed this in Scala 2.10

trait SessionState[K,T] {
  /**
   * Expiry time of a session in milliseconds
   * @return
   */
  def expiry:Int

  /**
   * Returns the corresponding tag for the sessionkey
   * @param sessionKey
   * @param datetime current datetime in milliseconds
   * @return
   */
  def getSessionTag(sessionKey:K)(implicit datetime:Long):Try[T]

  /**
   * Rejuvenates the session increasing its expiry time to the datetime + the expiry time
   * @param sessionKey
   * @param datetime current datetime in milliseconds
   * @return Success if session has not already expired, otherwise SessionExpiredException
   */
  def refreshSession(sessionKey:K)(implicit datetime:Long):Try[SessionState[K,T]]

  /**
   * Adds a session key and tag
   * @param sessionKey
   * @param tag
   * @param datetime current datetime in milliseconds
   * @return New SessionState if successfull, otherwise SessionAlreadyExistsException
   */
  def addSession(sessionKey:K, tag:T)(implicit datetime:Long):Try[SessionState[K,T]]

  /**
   * Adds a session key and tag
   * @param sessionKey
   * @param tag
   * @param datetime current datetime in milliseconds
   * @return New SessionState if successfull, otherwise SessionAlreadyExistsException
   */
  def +(sessionKey:K, tag:T)(implicit datetime:Long):Try[SessionState[K,T]] = addSession(sessionKey, tag, datetime)

  /**
   * Removes the session key if found
   * @param sessionKey
   * @param datetime
   * @return New SessionState with the session key removed
   */
  def expireSession(sessionKey:K)(implicit datetime:Long):SessionState[K,T]

  /**
   * Removes the session key if found
   * @param sessionKey
   * @param datetime
   * @return New SessionState with the session key removed
   */
  def -(sessionKey:K)(implicit datetime:Long):SessionState[K,T] = expireSession(sessionKey)
}


For the implicit parameter I just used

implicit def currentTime:Long = new Date().getTime


and for the implementation I have
object SessionState {

  def apply[K,T](expiry:Int):SessionState[K, T] = SessionStateInstance[K, T](expiry, Queue.empty, Map.empty)
}

private case class SessionStateInstance[K, T](expiry:Int, sessionQueue:Queue[(K, Long)], keysExpireAndTag:Map[K,(T,Long)]) extends SessionState[K,T] {

  def getSessionTag(sessionKey:K)(implicit datetime:Long):Try[T] =
    keysExpireAndTag.get(sessionKey) collect {
      case (tag, expiry) if (expiry > datetime) => Success(tag)
    } getOrElse(Failure(SessionExpiredException))

  def refreshSession(sessionKey:K)(implicit datetime:Long):Try[SessionState[K,T]] =
    keysExpireAndTag.get(sessionKey) collect {
      case (tag, expiry) if (expiry > datetime) => {
        val cleared = clearedExpiredSessions(datetime)
        Success(SessionStateInstance(this.expiry, cleared.sessionQueue, cleared.keysExpireAndTag + (sessionKey -> (tag, expiry))))
      }
    } getOrElse(Failure(SessionExpiredException))

  def addSession(sessionKey:K, tag:T)(implicit datetime:Long):Try[SessionState[K,T]] =
    keysExpireAndTag.get(sessionKey) collect {
      case (tag, expiry) if (expiry > datetime) => Failure(SessionAlreadyExistsException)
    } getOrElse {
      val cleared = clearedExpiredSessions(datetime)
      Success(SessionStateInstance(this.expiry, cleared.sessionQueue.enqueue((sessionKey, datetime + expiry)), cleared.keysExpireAndTag + (sessionKey -> (tag, datetime + expiry))))
    }

  def expireSession(sessionKey:K)(implicit datetime:Long):SessionState[K,T] = {
    val cleared = clearedExpiredSessions(datetime)
    if (cleared.keysExpireAndTag.contains(sessionKey)) SessionStateInstance(this.expiry, cleared.sessionQueue, cleared.keysExpireAndTag - sessionKey)
    else cleared
  }

  private def clearedExpiredSessions(datetime:Long):SessionStateInstance[K,T] = clearedExpiredSessions(datetime, sessionQueue, keysExpireAndTag)
  private def clearedExpiredSessions(datetime:Long, sessionQueue:Queue[(K, Long)], keysExpireAndTag:Map[K,(T,Long)]):SessionStateInstance[K,T] = {
    sessionQueue.headOption collect {
      case (key, expiry) if (expiry < datetime) => keysExpireAndTag.get(key) map {
        case (tag, expiry) if (expiry < datetime) => clearedExpiredSessions(datetime, sessionQueue.dequeue._2, keysExpireAndTag - key)
        case (tag, expiry)                        => clearedExpiredSessions(datetime, sessionQueue.dequeue._2.enqueue((key, expiry)), keysExpireAndTag)
      } getOrElse(clearedExpiredSessions(datetime, sessionQueue.dequeue._2, keysExpireAndTag))
    } getOrElse (SessionStateInstance(this.expiry, sessionQueue, keysExpireAndTag))
  }
}

case object SessionAlreadyExistsException extends Exception("Session key already exists")
case object SessionExpiredException extends Exception("Session key has already expired")