The default implementation of Samsara CORE doesn’t do any processing besides adding an id to every event.
To implement your own custom processing, follow these steps. First, create a new project from the Samsara template:
lein new samsara my-streaming-app
This will create a project with the default structure:
my-streaming-app/
├── CHANGELOG.md
├── LICENSE
├── README.md
├── config
│   └── config.edn
├── doc
│   └── intro.md
├── docker-compose.yml
├── project.clj
├── resources
├── src
│   └── my_streaming_app
│       ├── core.clj
│       └── main.clj
└── test
    └── my_streaming_app
        └── core_test.clj
The project.clj should look like the following:
(defproject my-streaming-app "0.1.0-SNAPSHOT"
  :description "FIXME: write description"
  :url "http://example.com/FIXME"
  :license {:name "Eclipse Public License"
            :url "http://www.eclipse.org/legal/epl-v10.html"}
  :dependencies [[org.clojure/clojure "1.8.0"]
                 [samsara/samsara-core "0.5.5.0"]]
  :main my-streaming-app.main
  :profiles {:uberjar {:aot :all}
             :dev {:dependencies [[midje "1.7.0"]]
                   :plugins [[lein-midje "3.1.3"]
                             [lein-binplus "0.4.1"]]}})
Please note the Samsara dependency; you can find all available versions on Clojars.
Create a main.clj file which will start the stream processing:
(ns my-streaming-app.main
  (:require [my-streaming-app.core :refer :all]
            [samsara-core.main :as sam])
  (:gen-class))

(defn -main [config-file]
  (println "Starting streaming processing.")
  (sam/start-processing! (sam/init! config-file)))
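Since project.clj sets :main to my-streaming-app.main, during development you should also be able to start the job directly from Leiningen, passing the configuration file as the argument:
lein run config/config.edn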
The config file config/config.edn looks as follows:
{:streams
 [{:id           :ingestion
   :input-topic  "ingestion"
   :state        :partitioned
   :output-topic "events"
   :processor    "my-streaming-app.core/make-processor"}]

 :job
 {:job-name "my-streaming-app"
  ;; a CSV list of hosts and ports (and optional path)
  :zookeepers "127.0.0.1:2181"
  ;; a CSV list of host and ports of kafka brokers
  :brokers "127.0.0.1:9092"
  :offset :smallest
  ;; this is useful only for local development
  :samza-overrides {:task.checkpoint.replication.factor 1}}

 :tracking {:enabled true :type :console
            :reporting-frequency-seconds 600}}
The :streams section contains the list of streams to be consumed. The default input comes from a topic called ingestion and the processed output goes to a topic called events. You should be able to consume additional topics by adding more entries to this vector, as sketched below.
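For illustration only, consuming a second topic could look like the hypothetical entry below (the purchases topic and :purchases id are made up; the keys are the same ones shown above):
:streams
[{:id           :ingestion
  :input-topic  "ingestion"
  :state        :partitioned
  :output-topic "events"
  :processor    "my-streaming-app.core/make-processor"}
 ;; hypothetical additional stream
 {:id           :purchases
  :input-topic  "purchases"
  :state        :partitioned
  :output-topic "events"
  :processor    "my-streaming-app.core/make-processor"}]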
The development endpoints for :zookeepers and the Kafka :brokers point to your local machine. The :samza-overrides entry is required only for local development.
Similarly, :tracking can be enabled on the console for development purposes, but in a production environment you might want to publish all metrics to the monitoring machine. More detail on this is available in Samsara’s TRACKit! project.
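For illustration only, a production-oriented :tracking entry might look like the sketch below; the option names other than :enabled and :reporting-frequency-seconds are assumptions, so check TRACKit!’s documentation for the reporters and keys it actually supports.
;; Hypothetical production tracking config (key names are assumptions,
;; verify against Samsara's TRACKit! documentation)
:tracking {:enabled true
           :type :riemann               ; publish metrics to the monitoring host
           :host "monitoring.local"     ; hypothetical monitoring machine
           :port 5555
           :reporting-frequency-seconds 60}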
Now let’s create the processing functions:
(ns my-streaming-app.core
  (:require [samsara-core.core :as sam] ; default pipeline
            [moebius.core :refer :all]  ; processing functions
            [moebius.kv :as kv]))       ; state management
We require Samsara CORE and Moebius for the processing functions. Now we can create our enrichment, filtering and correlation functions:
;;
;; Enrichment example.
;; You can compute any additional field and inject it
;; directly into the event.
;;
(defenrich game-name
  [event]
  (assoc event :game-name "Apocalypse Now"))

;;
;; Filtering example.
;; You can tell the pipeline to discard events
;; which match a particular condition.
;;
(deffilter no-ads [{:keys [eventName]}]
  (not= eventName "game.ad.displayed"))

;;
;; Correlation example.
;; Based on the events you are processing
;; you can produce new events
;;
(defcorrelate new-player
  [{:keys [eventName level timestamp sourceId] :as event}]
  (when (and (= eventName "game.started")
             (= level 1))
    [{:timestamp timestamp :sourceId sourceId :eventName "game.new.player"}]))
At this point we can compose our functions into a pipeline as follows:
;;
;; Pipelines.
;; Finally you can compose your pipelines,
;; chaining your processing functions in the order
;; you wish to process them.
;;
(def my-pipeline
  (pipeline
   (sam/make-samsara-pipeline {})
   game-name
   no-ads
   new-player))
;;
;; Finally you can produce a moebius
;; function which is used by Samsara-CORE
;; to process incoming events.
;;
(defn make-processor [config]
  (moebius my-pipeline))
That’s all we need to create our custom processing functions.
To learn more about the processing functions, read the Develop your stream processing pipelines page.
Testing
If you wish to test your functions you can use any of the standard Clojure testing frameworks, such as Midje.
Here is an example of testing:
(ns my-streaming-app.core-test
  (:require [my-streaming-app.core :refer :all]
            [midje.sweet :refer :all]))

(fact "ENRICHMENT: the game-name must be injected into all events"

  (game-name
   {:eventName "game.started"
    :timestamp 1430760258401
    :sourceId  "device1"
    :level     1})
  => (contains {:game-name "Apocalypse Now"}))

(fact "FILTERING: filtering should drop game.ad.displayed"

  ;; no surprise here, the predicates work like in the filter function
  (no-ads {:eventName "game.level.completed"
           :timestamp 1430760258403
           :sourceId  "device1"
           :levelCompleted 1})
  => true

  ;; when it doesn't match, `false` or `nil` is returned
  (no-ads {:eventName "game.ad.displayed"
           :timestamp 1430760258402
           :sourceId  "device1"})
  => false)

(fact "CORRELATION: when a game.started event is found with level=1,
       we can infer that a new player has started playing our game."

  (new-player {:eventName "game.started"
               :timestamp 1430760258401
               :sourceId  "device1"
               :level     1})
  => [{:timestamp 1430760258401 :sourceId "device1" :eventName "game.new.player"}]

  (new-player {:eventName "game.started"
               :timestamp 1430760258401
               :sourceId  "device1"
               :level     5})
  => nil)
How to build, test and run
Build with:
lein do clean, midje, bin
This will build an executable jar using the lein-binplus plugin.
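Optionally, the generated executable can be customised with a :bin entry in project.clj. The snippet below is only a sketch based on lein-binplus’s documented options; verify the exact keys against the plugin version you use.
;; Optional: customise the lein-binplus output
;; (check the lein-binplus README for the supported options)
:bin {:name     "my-streaming-app"   ; name of the generated executable
      :bin-path "target"}            ; where to place it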
The easiest way to run it locally is to create a local development cluster with docker-compose. Here is a sample file:
NOTE: in ADV_IP you must put your machine’s local IP address, not localhost (127.0.0.1), to enable your processing job to communicate with Zookeeper and Kafka.
#
# Zookeeper
#
zookeeper:
  image: samsara/zookeeper:snapshot
  ports:
    - "2181:2181"
    - "15001:15000"
  environment:
    ZK_SERVER_ID: 1
    # Your ip but NOT 127.0.0.1
    ADV_IP: "192.168.0.2"
  volumes:
    - /tmp/logs/zk1:/logs
    - /tmp/data/zk1:/data

#
# Kafka
#
kafka:
  image: samsara/kafka:snapshot
  ports:
    - "9092:9092"
    - "15002:15000"
  links:
    - zookeeper:zookeeper
  environment:
    KAFKA_BROKER_ID: 1
    # Your ip but NOT 127.0.0.1
    ADV_IP: "192.168.0.2"
  volumes:
    - /tmp/logs/kafka1:/logs
    - /tmp/data/kafka1:/data

#
# Samsara Ingestion API
#
ingestion:
  image: samsara/ingestion-api:snapshot
  links:
    - kafka:kafka
    - monitoring:riemann
  ports:
    - "9000:9000"
    - "15003:15000"
  environment:
    OUTPUT_TOPIC: "ingestion"
    TRACKING_ENABLED: "true"
  volumes:
    - /tmp/logs/ingestion-api:/logs

#
# Samsara CORE
#
#core:
#  image: myuser/my-streaming-app
#  links:
#    - kafka:kafka
#    - zookeeper:zookeeper
#    - monitoring:riemann
#  ports:
#    - "15010:15000"
#  environment:
#    TRACKING_ENABLED: "true"
#    SINGLE_BROKER: "true"
#  volumes:
#    - /tmp/logs/core:/logs

#
# ElasticSearch
#
elasticsearch:
  image: samsara/elasticsearch:snapshot
  links:
    - zookeeper:zookeeper
  ports:
    - "9200:9200"
    - "9300:9300"
    - "15004:15000"
  volumes:
    - /tmp/logs/els:/logs
    - /tmp/data/els:/data

#
# Kibana
#
kibana:
  image: samsara/kibana:snapshot
  links:
    - elasticsearch:elasticsearch
  ports:
    - "8000:8000"
    - "15005:15000"
  volumes:
    - /tmp/logs/kibana:/logs

#
# Samsara Qanal
#
qanal:
  image: samsara/qanal:snapshot
  links:
    - zookeeper:zookeeper
    - elasticsearch:els
    - monitoring:riemann
  ports:
    - "15006:15000"
  environment:
    TRACKING_ENABLED: "true"
    KAFKA_TOPICS_SPEC: |
      { :topic "events" :partitions :all :type :plain
        :indexing {:strategy :simple :index "events" :doc-type "events" :id-field "id"}}
  volumes:
    - /tmp/logs/qanal1:/logs

#
# Monitoring
#
monitoring:
  image: samsara/monitoring:snapshot
  ports:
    - "15000:80"
    - "5555:5555"
    - "5556:5556"
    - "8083:8083"
    - "8086:8086"
  environment:
    HTTP_USER: admin
    HTTP_PASS: samsara
  volumes:
    - /tmp/logs/monitoring:/logs
    - /tmp/data/monitoring:/data

#
# Bootstrap
#
bootstrap:
  image: samsara/kafka:snapshot
  links:
    - zookeeper:zookeeper
    - kafka:kafka
    - elasticsearch:elasticsearch
  command: bash -c "curl -sSL 'https://raw.githubusercontent.com/samsara/samsara/master/docker-images/bootstrap/bootstrap.sh' | bash"
  volumes:
    - /tmp/logs/bootstrap:/logs
Please NOTE: you have to replace ADV_IP with your local machine’s IP address.
Then run:
docker-compose up
and wait until you see the following message appear on the console:
bootstrap_1 |
bootstrap_1 | ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
bootstrap_1 | ;; ;;
bootstrap_1 | ;; ---==| S A M S A R A I S R E A D Y F O R A C T I O N |==---- ;;
bootstrap_1 | ;; ;;
bootstrap_1 | ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
bootstrap_1 |
Then you can start your stream processing core:
./target/my-streaming-app-0.1.0-SNAPSHOT config/config.edn
This should start up and connect to Kafka and Zookeeper. When you see:
INFO [samsara-core.main] - Processed 0 events at 0/s
INFO [samsara-core.main] - Processed 0 events at 0/s
INFO [samsara-core.main] - Processed 0 events at 0/s
INFO [samsara-core.main] - Processed 0 events at 0/s
INFO [samsara-core.main] - Processed 0 events at 0/s
you are ready to publish events to the ingestion-api, for example:
cat <<EOF | curl -i -H "Content-Type: application/json" \
                 -H "X-Samsara-publishedTimestamp: $(date +%s999)" \
                 -XPOST "http://127.0.0.1:9000/v1/events" -d @-
[
  {
    "timestamp": $(date +%s000),
    "sourceId": "test-device",
    "eventName": "game.started",
    "level": 1,
    "levelScore": $RANDOM
  }
]
EOF
Finally, you should be able to see the events via http://127.0.0.1:8000. The first time you’ll need to set it up as described in the Quick start guide.