getUser: User
getUsers: List[User]
hasNext: Boolean
next: User
val adults = users.filter(_.age > 17).sortBy(p => (p.lName, p.fName)).take(10)
+-----------------------------------------------------------------------------+
| execute typical instruction 1 nanosec (1 billionth/second)|
| fetch from L1 cache memory 0.5 nanosec |
| branch misprediction 5 nanosec |
| fetch from L2 cache memory 7 nanosec |
| Mutex lock/unlock 25 nanosec |
| fetch from main memory 100 nanosec |
+-----------------------------------------------------------------------------+
| send 2K bytes over 1Gbps network 20,000 nanosec |
| read 1MB sequentially from memory 250,000 nanosec |
| fetch from new disk location (seek) 8,000,000 nanosec |
| read 1MB sequentially from disk 20,000,000 nanosec |
| send packet US to Europe and back 150,000,000 nanosec (150 milliseconds) |
+-----------------------------------------------------------------------------+
source: http://norvig.com/21-days.html#answers
+-----------------------------------------------------------------------------+
| execute typical instruction 1 seconds |
| fetch from L1 cache memory 0.5 seconds |
| branch misprediction 5 seconds |
| fetch from L2 cache memory 7 seconds |
| Mutex lock/unlock 30 seconds |
| fetch from main memory 1.5 minutes |
+-----------------------------------------------------------------------------+
| send 2K bytes over 1Gbps network 5.5 hours |
| read 1MB sequentially from memory 3 days |
| fetch from new disk location (seek) 13 weeks |
| read 1MB sequentially from disk 6.5 months |
| send packet US to Europe and back 5 years |
+-----------------------------------------------------------------------------+
source: https://class.coursera.org/reactive-001/lecture/51
getTweetsWithUrlsFor("tednaleid", function(err, tweets) {
if (err) console.log("Error getting tweets for: " + err);
else {
var mostRecentShortUrl = parseTweetsForUrls(tweets[0]);
httpGet(mostRecentShortUrl, function(err, responseBody) {
if (err) console.log("Error retrieving tweet");
else {
var targetUrl = extractUrlFromTweet(responseBody);
httpGet(targetUrl, function(err, responseBody) {
if (err) console.log("Error retrieving tweet");
else console.log("most recent linked text: " + responseBody);
});
}
});
}
});
getTweetsWithUrlsFor("tednaleid").then(function(tweets) {
var mostRecentShortUrl = parseTweetsForUrls(tweets[0]);
return httpGet(mostRecentShortUrl);
}).then(function(responseBody) {
var targetUrl = extractUrlFromTweet(responseBody);
return httpGet(targetUrl);
}).then(
function(responseBody) {
console.log("most recent linked text: " + responseBody);
},
function(error) {
console.error("Problem getting most recent text: " + error);
}
);
getUser: Future[User]
getUsers: Future[List[Users]]
var path: Path = null
val moveObserver = { (event: MouseEvent) =>
path.lineTo(event.position)
draw(path)
}
control.addMouseDownObserver { event =>
path = new Path(event.position)
control.addMouseMoveObserver(moveObserver)
}
control.addMouseUpObserver { event =>
control.removeMouseMoveObserver(moveObserver)
path.close()
draw(path)
}
var path: Path = null
val moveObserver = { (event: MouseEvent) => // <- Observer #1
path.lineTo(event.position)
draw(path)
}
control.addMouseDownObserver { event => // <- Observer #2
path = new Path(event.position)
control.addMouseMoveObserver(moveObserver)
}
control.addMouseUpObserver { event => // <- Observer #3
control.removeMouseMoveObserver(moveObserver)
path.close()
draw(path)
}
var path: Path = null // <- Global State shared by all Observers
val moveObserver = { (event: MouseEvent) =>
path.lineTo(event.position) // mutate state
draw(path)
}
control.addMouseDownObserver { event =>
path = new Path(event.position) // mutate state
control.addMouseMoveObserver(moveObserver)
}
control.addMouseUpObserver { event =>
control.removeMouseMoveObserver(moveObserver)
path.close() // mutate state
draw(path)
}
var path: Path = null
val moveObserver = { (event: MouseEvent) =>
path.lineTo(event.position)
draw(path)
}
control.addMouseDownObserver { event =>
path = new Path(event.position) // <- Resource Instantiation
control.addMouseMoveObserver(moveObserver)
}
control.addMouseUpObserver { event =>
control.removeMouseMoveObserver(moveObserver)
path.close() // <- Resource Cleanup
draw(path)
}
Iterator (pull) Observable (push)
+-------------------+--------------------+
retrieve data | T next() | onNext(T) |
+-------------------+--------------------+
error occurs | throws Exception | onError(Exception) |
+-------------------+--------------------+
end of collection | return | onCompleted() |
+-------------------+--------------------+
getUsers: Observable[User]
They are also composable, as we will see
val numbers = Observable(1, 2, 3, 4, 5)
numbers.subscribe(
(x: Int) => println(x), // onNext
(e: Throwable) => println("Error! " + e), // onError
() => println("Done!") // onSuccess
)
prints
1
2
3
4
5
Done!
val numbers = Observable(1, 2, 3, 4, 5)
numbers.map(x => x * 2).subscribe(
(x: Int) => println(x), // onNext
(e: Throwable) => println("Error! " + e), // onError
() => println("Done!") // onSuccess
)
prints
2
4
6
8
10
Done!
val numbers = Observable(1, 2, 3, 4, 5)
numbers.reduce((a, b) => a + b).subscribe(
(x: Int) => println(x), // onNext
(e: Throwable) => println("Error! " + e), // onError
() => println("Done!") // onSuccess
)
prints
15
Done!
val moreNumbers = Observable(1, 2, 3, 4, 5, 6, 7, 8, 9)
moreNumbers.filter(x => x % 2 == 0).subscribe(
(x: Int) => println(x), // onNext
(e: Throwable) => println("Error! " + e), // onError
() => println("Done!") // onSuccess
)
prints
2
4
6
8
Done!
val odds = Observable(1, 3, 5)
val evens = Observable(2, 4, 6)
odds.merge(evens).subscribe(
(x: Int) => println(x), // onNext
(e: Throwable) => println("Error! " + e), // onError
() => println("Done!") // onSuccess
)
prints
1
3
5
2
4
6
Done!
(order isn't normally deterministic)
val odds = Observable(1, 3, 5)
val evens = Observable(2, 4, 6)
odds.zip(evens).subscribe(
pair => println(s"odd: ${pair._1}, even: ${pair._2}"), // onNext
(e: Throwable) => println("Error! " + e), // onError
() => println("Done!") // onSuccess
)
prints
odd: 1, even: 2
odd: 3, even: 4
odd: 5, even: 6
Done!
in Scala zip
emits a tuple by default
val numbers = Observable(1, 2, 3, 4, 5)
numbers.take(3).flatMap(x => Observable(x, x * 2)).subscribe(println(_))
prints
1
2
2
4
3
6
Observable
is a monad)def getVideos(userId: Long): Observable[Map[String, Any]] =
videoService.getVideos(userId) // async Observable
// take the first 10, then auto-unsubscribe
.take(10)
.flatMap(video => {
val metadata = video.getMetaData(video) // async Map
val bookmark = videoService.getBookmark(video, userId) // async Map
val rating = videoService.getRating(video, userId) // async Map
// zip the data from those 3 services into an Observable
Observable.zip(Observable(List(metadata, bookmark, rating): _*)).map {
case m :: b :: r :: Nil => Map("id" -> video.id) ++ m ++ b ++ r
}
})
That method composes 4 separate async calls into a List of 10 Maps
Map(id -> 1, rating -> *****, pos -> 1:33, length -> 2h, title -> Gravity)
…
One Many
+------------+---------------+
Synchronous/Blocking | T | Iterable[T] |
+------------+---------------+
Asynchronous/Non-Blocking | Future[T] | Observable[T] |
+------------+---------------+