Moebius contains a set of functions which help you to perform tranformations on a stream of events. Moebius processes Samsara’s events which are defined as follow.
{:timestamp 1430760258405 ; milliseconds from EPOCH
:eventName "any.meaningful.name" ; a descriptive name for an event (typically dotted)
:sourceId "any-identifier"} ; an identifier of the who is sending the event (such as: userId, deviceId, clientId etc)
This is the minimal event which could be sent to Samsara. However you can specify additional properties as part of the event such as:
{:timestamp 1430760258405
:eventName "any.meaningful.name"
:sourceId "any-identifier"
;; it supports any additional k/v pairs
:color "red"
:temperature 45 }
How Moebius works is better explained with an example.
Let’s assume we have an imaginary game called “Apocalypse Now” which sends to Samsara the following events:
(def events
[{:eventName "game.started" :timestamp 1430760258401 :sourceId "device1" :level 1}
{:eventName "game.ad.displayed" :timestamp 1430760258402 :sourceId "device1"}
{:eventName "game.level.completed" :timestamp 1430760258403 :sourceId "device1" :levelCompleted 1}
{:eventName "game.level.completed" :timestamp 1430760258404 :sourceId "device1" :levelCompleted 2}
{:eventName "game.ad.displayed" :timestamp 1430760258405 :sourceId "device1"}
{:eventName "game.level.completed" :timestamp 1430760258406 :sourceId "device1" :levelCompleted 3}
{:eventName "game.stopped" :timestamp 1430760258407 :sourceId "device1" :level 4}])
Enrichment
Let’s say that in our server we manage multiple games, and we want to be able to distinguish
the events coming from this game from those of other games. To do so we can enrich
every
incoming event from a specific endpoint with an attribute :game-name
.
To do so we use an enrichment function which takes an event as argument and injects the game name.
(defenrich game-name
[event]
(assoc event :game-name "Apocalypse Now"))
This function will add the :game-name
attribute with the value
“Apocalypse Now” to every event in the stream. Let’s test it out:
(game-name {:eventName "game.started"
:timestamp 1430760258401
:sourceId "device1"
:level 1})
;;=> {:eventName "game.started"
;; :timestamp 1430760258401
;; :sourceId "device1"
;; :level 1
;; :game-name "Apocalypse Now"}
Easy enough. Next thing we would like to do is to inject the current level to all “game.level.completed” events. By doing so we simplify queries such as “Average current level over time”.
(defenrich current-level
[{:keys [levelCompleted] :as event}]
(when-event-name-is event "game.level.completed"
(inject-as event :level (inc levelCompleted))))
This is another enrichment example as we enrich
a particular set of
events with additional information. when-event-name-is
compares the eventName
to a given name. inject-as
ingests the :level
property into the event,
as long as the value is not nil
. Now every “game.level.completed” event
will be enriched with this new property. All others will be left unchanged.
Let’s try it out.
(current-level {:eventName "game.level.completed"
:timestamp 1430760258403
:sourceId "device1"
:levelCompleted 1})
;;=> {:eventName "game.level.completed"
;; :timestamp 1430760258403
;; :sourceId "device1"
;; :levelCompleted 1
;; :level 2}
(current-level {:eventName "game.ad.displayed"
:timestamp 1430760258402
:sourceId "device1"})
;;=> nil
When applied to a non-matching event, the when
-like clause will
return nil
and the pipeline processor will interpret this as if you
don’t want to change the event. This is just a little simplification
to avoid having to return the original event when you don’t want to
change it. In case you want to discard the event, then you can use
the deffilter
macro to define a filter.
Filtering
Sometimes you want to filter out some of the events you receive. Although this is not very frequent it might still happen. Usually is better to store everything as you never know if in the future you will need these events.
So if you want to filter some events you can create a filter with deffilter
.
Let’s assume that we want to remove all events called “game.ad.displayed”:
(deffilter no-ads [{:keys [eventName]}]
(not= eventName "game.ad.displayed"))
;; no surprise here the predicates work like in filter function
(no-ads {:eventName "game.level.completed"
:timestamp 1430760258403
:sourceId "device1"
:levelCompleted 1})
;;=> true
;; when it doesn't match `false` or `nil` is returned
(no-ads {:eventName "game.ad.displayed"
:timestamp 1430760258402
:sourceId "device1"})
;;=> false
Correlation
Another interesting capability of a stream processing system is to generate/derive new events from a given event. The capability to generate new events is very important in order to keep the client small and send only a minimal number of significant events and do the hard work on the server side.
In our example let’s assume that every time a user starts from the
level 1
it means that a new user is starting the game. Obviously most
of the time there are better ways to find if there are new users playing
with your new game, but for the sake of this example let’s assume that to
derive this information in this way it make sense. So let’s write a
correlation function.
(defcorrelate new-player
[{:keys [eventName level timestamp sourceId] :as event}]
(when (and (= eventName "game.started")
(= level 1))
[{:timestamp timestamp :sourceId sourceId :eventName "game.new.player"}]))
A correlation function can return nil
, []
, or 1 or more events.
Every new event generated here will be processed by the same
Moebius pipeline as if was send by the client.
In this case when the event matches the criteria selected,
a new event is returned. Let’s try it into the REPL.
(new-player {:eventName "game.started"
:timestamp 1430760258401
:sourceId "device1"
:level 1})
;;=>[{:timestamp 1430760258401
;; :sourceId "device1"
;; :eventName "game.new.player"}]
Compose your processing
Now let’s put all the things together into a single streaming function.
;; With the `moebius` function you can combine all the streaming
;; processing functions pretty much in the same way as `comp` does
;; ** with the important difference that the function listed will
;; be executed in the same order they apper (left-to-right)**
(def mf (moebius
game-name
current-level
no-ads
new-player))
moebius
returns a function which will apply all composed functions to
all given events. It takes an initial state as well, however if
all processing functions are stateless, the state will be returned
unchanged. We will explore more about the stateful processing later.
(mf nil events)
;;=>[nil ;; <-- state
;; [{:eventName "game.started", :game-name "Apocalypse Now", :level 1, :sourceId "device1", :timestamp 1430760258401}
;; {:game-name "Apocalypse Now", :timestamp 1430760258401, :sourceId "device1", :eventName "game.new.player"}
;; {:levelCompleted 1, :eventName "game.level.completed", :game-name "Apocalypse Now", :level 2, :sourceId "device1", :timestamp 1430760258403}
;; {:levelCompleted 2, :eventName "game.level.completed", :game-name "Apocalypse Now", :level 3, :sourceId "device1", :timestamp 1430760258404}
;; {:levelCompleted 3, :eventName "game.level.completed", :game-name "Apocalypse Now", :level 4, :sourceId "device1", :timestamp 1430760258406}
;; {:eventName "game.stopped", :game-name "Apocalypse Now", :level 4, :sourceId "device1", :timestamp 1430760258407}]]
A few things need to be noted here.
- The
game-name
has been applied to all events, even the event generated by the correlation function. - The
current-level
has been injected in allgame.level.completed
- The
game.ad.displayed
are now present in the result. The functions afterno-ads
wouldn’t receive the event at all, while the functions which appear before will receive it and they could do some processing on it before it get discarded. - Finally the
game.new.player
is generated and it went through the full processing. Infact thegame-name
has been injected even if the function appears before the corellation function.
The function generated by moebius
can be used to process events in
batches of any size.
Pattern matching
Sometimes it is easier to express complicated conditions in terms of
pattern matching. For this purpose we integrate
core.match. The macro
when-event-match
matches an event based on its
properties. Patterns are processed in order of appearance so if you have
multiple pattern matches the the associated expression of the
first one appearing in the list is will be executed.
If nothing matches, then the event is return unchanged.
To support this, an implicit :else
statement is included in the match,
so you can’t use it in your match expressions.
(let [event {:eventName "game.started" :level 8}]
(when-event-match event
[{:eventName "game.started" :level 0}] (assoc event :new-player true)
[{:eventName _ :level (_ :guard even?)}] (assoc event :start :even-level)
[{:eventName _ :level (_ :guard #(= 0 (mod % 11)))}] (assoc event :level-type :extra-challenge)
[{:eventName "game.new.level" :level _}] (assoc event :level-type :normal)))
In this case the event {:eventName "game.started" :level 8}
is going to match
the second expression.
Glob matching
The eventName
can be any string, however we recommend you to use a
series of words separated by dots, such as:
<segment>.<segment>.<...>.<segment>
. By doing so you will be able to
leverage the glob matching facilities provided by the framework.
The glob matching works by matching *
to any single segment and
matching **
to multiple segments.
Here are some examples of glob matching.
(match-glob "game.*.started" "game.level.started") ;;=> truthy
(match-glob "game.*.started" "game.level.2.started") ;;=> false
(match-glob "game.**.started" "game.level.2.started") ;;=> truthy
(match-glob "game.**" "game.level.5.stopped") ;;=> truthy
(match-glob "game.**" "game.anything.else") ;;=> truthy
(match-glob "game.**.ended" "game.1.2.3.ended") ;;=> truthy
To use, you simply use it in a condition statement:
(defenrich current-level
[{:keys [levelCompleted eventName] :as event}]
(when (match-glob "game.**.completed" eventName)
(inject-as event :level (inc levelCompleted))))