Improved stream error handling

master
mgifos 8 years ago
parent 248e646137
commit d93c6866d9
  1. 12
      README.md
  2. 91
      src/main/scala/com.github.mgifos.workouts/GarminConnect.scala
  3. 2
      src/test/resources/ultra-80k-runnersworld.csv

@ -79,14 +79,14 @@ The reserved keywords of the notation are: workout, warmup, cooldown, run, repea
**`<time-duration>`** := `<minutes>:<seconds>` **`<time-duration>`** := `<minutes>:<seconds>`
**`<target`>** := `<zone-target> | <pace-target>` **`<target>`** := `<zone-target> | <pace-target>`
**`<zone-target`>** := `z[1-6]` **`<zone-target>`** := `z[1-6]`
**`<pace-target`>** := `<pace> - <pace>` **`<pace-target>`** := `<pace> - <pace>`
**`<pace`>** := `<minutes>:<seconds>` **`<pace>`** := `<minutes>:<seconds>`
**`<minutes`>** := `\d{1,2}` **`<minutes>`** := `\d{1,2}`
**`<seconds`>** := `\d{2}` **`<seconds>`** := `\d{2}`

@ -12,9 +12,10 @@ import akka.http.scaladsl.model.Uri.Query
import akka.http.scaladsl.model._ import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers._ import akka.http.scaladsl.model.headers._
import akka.pattern.ask import akka.pattern.ask
import akka.stream.{ Materializer, ThrottleMode }
import akka.stream.scaladsl.{ Flow, Sink, Source } import akka.stream.scaladsl.{ Flow, Sink, Source }
import akka.stream.{ Materializer, ThrottleMode }
import akka.util.Timeout import akka.util.Timeout
import com.github.mgifos.workouts.GarminConnect._
import com.github.mgifos.workouts.model.WorkoutDef import com.github.mgifos.workouts.model.WorkoutDef
import com.typesafe.scalalogging.Logger import com.typesafe.scalalogging.Logger
import play.api.libs.json.{ JsObject, Json } import play.api.libs.json.{ JsObject, Json }
@ -22,6 +23,7 @@ import play.api.libs.json.{ JsObject, Json }
import scala.collection.immutable.{ Map, Seq } import scala.collection.immutable.{ Map, Seq }
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContext, Future } import scala.concurrent.{ ExecutionContext, Future }
import scala.language.implicitConversions
import scala.util.Failure import scala.util.Failure
case class GarminWorkout(name: String, id: Long) case class GarminWorkout(name: String, id: Long)
@ -49,7 +51,6 @@ class GarminConnect(email: String, password: String)(implicit system: ActorSyste
.withHeaders(session.headers .withHeaders(session.headers
:+ Referer("https://connect.garmin.com/modern/workout/create/running") :+ Referer("https://connect.garmin.com/modern/workout/create/running")
:+ RawHeader("NK", "NT")) :+ RawHeader("NK", "NT"))
log.debug(s"Sending req: ${req.httpMessage}")
workout.name -> req workout.name -> req
}) })
} }
@ -58,18 +59,16 @@ class GarminConnect(email: String, password: String)(implicit system: ActorSyste
.throttle(1, 1.second, 1, ThrottleMode.shaping) .throttle(1, 1.second, 1, ThrottleMode.shaping)
.mapAsync(1) { .mapAsync(1) {
case (workout, req) => case (workout, req) =>
for { Http().singleRequest(req).flatMap { res =>
res <- Http().singleRequest(req).andThen { if (res.status == OK) {
case util.Success(ok) => res.body.map { json =>
log.debug(s"response is ok: $ok") log.info(s" $workout")
case Failure(ex) => GarminWorkout(workout, Json.parse(json).\("workoutId").as[Long])
log.error("Ups", ex) }
} else {
log.debug(s"Creation wo response: $res")
Future.failed(new Error("Cannot create workout"))
} }
if res.status == OK
json <- res.entity.toStrict(10.seconds).map(_.data.utf8String)
} yield {
log.info(s" $workout")
GarminWorkout(workout, Json.parse(json).\("workoutId").as[Long])
} }
} }
@ -88,11 +87,9 @@ class GarminConnect(email: String, password: String)(implicit system: ActorSyste
val futureRequests = for { val futureRequests = for {
session <- login() session <- login()
map <- getWorkoutsMap() map <- getWorkoutsMap()
_ = log.debug(s"MAP: $map")
pairs = workouts.flatMap { wo => pairs = workouts.flatMap { wo =>
map.filter { case (name, _) => name == wo } map.filter { case (name, _) => name == wo }
} }
_ = log.debug(s"PAIRS: $pairs")
} yield { } yield {
log.info("\nDeleting workouts:") log.info("\nDeleting workouts:")
pairs.map { pairs.map {
@ -109,11 +106,12 @@ class GarminConnect(email: String, password: String)(implicit system: ActorSyste
.throttle(1, 1.second, 1, ThrottleMode.shaping) .throttle(1, 1.second, 1, ThrottleMode.shaping)
.mapAsync(1) { .mapAsync(1) {
case (label, req) => case (label, req) =>
log.debug(s" Delete request: $req") Http().singleRequest(req).withoutBody.map { res =>
Http().singleRequest(req).map { res =>
res.discardEntityBytes()
if (res.status == NoContent) log.info(s" $label") if (res.status == NoContent) log.info(s" $label")
else log.error(s" Cannot delete workout: $label") else {
log.error(s" Cannot delete workout: $label")
log.debug(s" Response: $res")
}
} }
} }
source.runWith(Sink.seq).map(_.length) source.runWith(Sink.seq).map(_.length)
@ -125,7 +123,6 @@ class GarminConnect(email: String, password: String)(implicit system: ActorSyste
log.info("\nScheduling:") log.info("\nScheduling:")
Source(spec).map { Source(spec).map {
case (date, gw) => case (date, gw) =>
log.debug(s"Making $date -> $gw")
s"$date -> ${gw.name}" -> Post(s"https://connect.garmin.com/modern/proxy/workout-service/schedule/${gw.id}") s"$date -> ${gw.name}" -> Post(s"https://connect.garmin.com/modern/proxy/workout-service/schedule/${gw.id}")
.withHeaders(session.headers .withHeaders(session.headers
:+ Referer("https://connect.garmin.com/modern/calendar") :+ Referer("https://connect.garmin.com/modern/calendar")
@ -134,12 +131,10 @@ class GarminConnect(email: String, password: String)(implicit system: ActorSyste
}.throttle(1, 1.second, 1, ThrottleMode.shaping) }.throttle(1, 1.second, 1, ThrottleMode.shaping)
.mapAsync(1) { .mapAsync(1) {
case (label, req) => case (label, req) =>
log.debug(s" Sending $req") Http().singleRequest(req).withoutBody.map { res =>
Http().singleRequest(req).map { res =>
log.debug(s" Received $res") log.debug(s" Received $res")
if (res.status == OK) log.info(s" $label") if (res.status == OK) log.info(s" $label")
else log.error(s" Cannot schedule: $label") else log.error(s" Cannot schedule: $label")
res.discardEntityBytes()
} }
} }
}.runWith(Sink.seq).map(_.length) }.runWith(Sink.seq).map(_.length)
@ -156,13 +151,18 @@ class GarminConnect(email: String, password: String)(implicit system: ActorSyste
:+ Referer("https://connect.garmin.com/modern/workouts") :+ Referer("https://connect.garmin.com/modern/workouts")
:+ RawHeader("NK", "NT")) :+ RawHeader("NK", "NT"))
Source.fromFuture( Source.fromFuture(
for { Http().singleRequest(req).flatMap { res =>
res <- Http().singleRequest(req) if (res.status == OK)
if res.status == OK res.body.map { json =>
json <- res.entity.toStrict(2.seconds).map(_.data.utf8String) Json.parse(json).asOpt[Seq[JsObject]].map { arr =>
} yield Json.parse(json).asOpt[Seq[JsObject]].map { arr => arr.map(x => (x \ "workoutName").as[String] -> (x \ "workoutId").as[Long])
arr.map(x => (x \ "workoutName").as[String] -> (x \ "workoutId").as[Long]) }.getOrElse(Seq.empty)
}.getOrElse(Seq.empty)) }
else {
log.debug(s"Cannot retrieve workout list, response: $res")
Future.failed(new Error("Cannot retrieve workout list from Garmin Connect"))
}
})
} }
source.runWith(Sink.head) source.runWith(Sink.head)
} }
@ -208,8 +208,7 @@ class GarminConnect(email: String, password: String)(implicit system: ActorSyste
def redirectionLoop(count: Int, url: String, acc: Seq[Cookie]): Future[Seq[Cookie]] = { def redirectionLoop(count: Int, url: String, acc: Seq[Cookie]): Future[Seq[Cookie]] = {
Http().singleRequest { Http().singleRequest {
HttpRequest(uri = Uri(url)).withHeaders(acc) HttpRequest(uri = Uri(url)).withHeaders(acc)
}.flatMap { res => }.withoutBody.flatMap { res =>
res.discardEntityBytes()
val cookies = extractCookies(res) val cookies = extractCookies(res)
res.headers.find(_.name() == "Location") match { res.headers.find(_.name() == "Location") match {
case Some(header) => case Some(header) =>
@ -232,7 +231,7 @@ class GarminConnect(email: String, password: String)(implicit system: ActorSyste
"gauthHost" -> "https://sso.garmin.com/sso", "gauthHost" -> "https://sso.garmin.com/sso",
"consumeServiceTicket" -> "false") "consumeServiceTicket" -> "false")
for { for {
res1 <- Http().singleRequest(HttpRequest(uri = Uri("https://sso.garmin.com/sso/login").withQuery(Query(params)))) res1 <- Http().singleRequest(HttpRequest(uri = Uri("https://sso.garmin.com/sso/login").withQuery(Query(params)))).withoutBody
res2 <- Http().singleRequest( res2 <- Http().singleRequest(
HttpRequest( HttpRequest(
POST, POST,
@ -241,24 +240,36 @@ class GarminConnect(email: String, password: String)(implicit system: ActorSyste
"username" -> email, "username" -> email,
"password" -> password, "password" -> password,
"_eventId" -> "submit", "_eventId" -> "submit",
"embed" -> "true")).toEntity).withHeaders(extractCookies(res1))) "embed" -> "true")).toEntity).withHeaders(extractCookies(res1))).withoutBody
sessionCookies <- redirectionLoop(0, "https://connect.garmin.com/post-auth/login", extractCookies(res2)) sessionCookies <- redirectionLoop(0, "https://connect.garmin.com/post-auth/login", extractCookies(res2))
username <- getUsername(sessionCookies) username <- getUsername(sessionCookies)
} yield { } yield Session(username, sessionCookies)
res1.discardEntityBytes()
res2.discardEntityBytes()
Session(username, sessionCookies)
}
} }
private def getUsername(sessionCookies: Seq[HttpHeader]): Future[String] = { private def getUsername(sessionCookies: Seq[HttpHeader]): Future[String] = {
val req = HttpRequest(GET, Uri("https://connect.garmin.com/user/username")).withHeaders(sessionCookies) val req = HttpRequest(GET, Uri("https://connect.garmin.com/user/username")).withHeaders(sessionCookies)
Http().singleRequest(req).flatMap { res => Http().singleRequest(req).flatMap { res =>
if (res.status != StatusCodes.OK) throw new Error("Login failed!") if (res.status != StatusCodes.OK) throw new Error("Login failed!")
res.entity.toStrict(2.seconds).map(_.data.utf8String).map { json => res.body.map { json =>
(Json.parse(json) \ "username").as[String] (Json.parse(json) \ "username").as[String]
} }
} }
} }
} }
} }
object GarminConnect {
class HttpResponseWithBody(original: HttpResponse) {
def body(implicit ec: ExecutionContext, mat: Materializer): Future[String] = original.entity.toStrict(10.seconds).map(_.data.utf8String)
}
class LiteHttpFuture(original: Future[HttpResponse]) {
def withoutBody(implicit ec: ExecutionContext, mat: Materializer) = original.andThen {
case util.Success(res) => res.discardEntityBytes()
}
}
implicit def responseWithBody(original: HttpResponse): HttpResponseWithBody = new HttpResponseWithBody(original)
implicit def liteHttpFuture(original: Future[HttpResponse]): LiteHttpFuture = new LiteHttpFuture(original)
}

@ -24,7 +24,7 @@ WEEK,Monday,Tuesday,Wednesday,Thursday,Friday,Saturday,Sunday,Estimated km,,Dura
- recover: 1.4km @z2 - recover: 1.4km @z2
- run: 4km @z2 - run: 4km @z2
- cooldown: lap-button",8k jog,"11-15k, middle 5k @MP",,"workout: 2h run - cooldown: lap-button",8k jog,"11-15k, middle 5k @MP",,"workout: 2h run
- run 120:00 - run: 120:00
- cooldown: lap-button","workout: 3.5h run - cooldown: lap-button","workout: 3.5h run
- run: 210:00 - run: 210:00
- cooldown: lap-button",90.0,,0:30:00,5.0,3.1 - cooldown: lap-button",90.0,,0:30:00,5.0,3.1

1 WEEK Monday Tuesday Wednesday Thursday Friday Saturday Sunday Estimated km Duration Avg km Miles
24
25
26
27
28
29
30
Loading…
Cancel
Save