Skip to content

Commit 739cc3f

Browse files
committed
Upgrade to cats-effect, fs2 3.0.0-M3 builds
1 parent 1d47de0 commit 739cc3f

13 files changed

Lines changed: 229 additions & 312 deletions

File tree

.github/workflows/ci.yml

Lines changed: 21 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -32,48 +32,21 @@ jobs:
3232
fetch-depth: 0
3333

3434
- name: Setup Java and Scala
35-
uses: olafurpg/setup-scala@v5
35+
uses: olafurpg/setup-scala@v10
3636
with:
3737
java-version: ${{ matrix.java }}
3838

39-
- name: Cache ivy2
40-
uses: actions/cache@v1
41-
with:
42-
path: ~/.ivy2/cache
43-
key: ${{ runner.os }}-sbt-ivy-cache-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}
44-
45-
- name: Cache coursier (generic)
46-
uses: actions/cache@v1
47-
with:
48-
path: ~/.coursier/cache/v1
49-
key: ${{ runner.os }}-generic-sbt-coursier-cache-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}
50-
51-
- name: Cache coursier (linux)
52-
if: contains(runner.os, 'linux')
53-
uses: actions/cache@v1
54-
with:
55-
path: ~/.cache/coursier/v1
56-
key: ${{ runner.os }}-sbt-coursier-cache-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}
57-
58-
- name: Cache coursier (macOS)
59-
if: contains(runner.os, 'macos')
60-
uses: actions/cache@v1
61-
with:
62-
path: ~/Library/Caches/Coursier/v1
63-
key: ${{ runner.os }}-sbt-coursier-cache-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}
64-
65-
- name: Cache coursier (windows)
66-
if: contains(runner.os, 'windows')
67-
uses: actions/cache@v1
68-
with:
69-
path: ~/AppData/Local/Coursier/Cache/v1
70-
key: ${{ runner.os }}-sbt-coursier-cache-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}
71-
7239
- name: Cache sbt
73-
uses: actions/cache@v1
40+
uses: actions/cache@v2
7441
with:
75-
path: ~/.sbt
76-
key: ${{ runner.os }}-sbt-cache-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}
42+
path: |
43+
~/.sbt
44+
~/.ivy2/cache
45+
~/.coursier/cache/v1
46+
~/.cache/coursier/v1
47+
~/AppData/Local/Coursier/Cache/v1
48+
~/Library/Caches/Coursier/v1
49+
key: ${{ runner.os }}-sbt-cache-v2-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}
7750

7851
- name: Check that workflows are up to date
7952
run: sbt ++${{ matrix.scala }} githubWorkflowCheck
@@ -107,48 +80,21 @@ jobs:
10780
fetch-depth: 0
10881

10982
- name: Setup Java and Scala
110-
uses: olafurpg/setup-scala@v5
83+
uses: olafurpg/setup-scala@v10
11184
with:
11285
java-version: ${{ matrix.java }}
11386

114-
- name: Cache ivy2
115-
uses: actions/cache@v1
116-
with:
117-
path: ~/.ivy2/cache
118-
key: ${{ runner.os }}-sbt-ivy-cache-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}
119-
120-
- name: Cache coursier (generic)
121-
uses: actions/cache@v1
122-
with:
123-
path: ~/.coursier/cache/v1
124-
key: ${{ runner.os }}-generic-sbt-coursier-cache-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}
125-
126-
- name: Cache coursier (linux)
127-
if: contains(runner.os, 'linux')
128-
uses: actions/cache@v1
129-
with:
130-
path: ~/.cache/coursier/v1
131-
key: ${{ runner.os }}-sbt-coursier-cache-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}
132-
133-
- name: Cache coursier (macOS)
134-
if: contains(runner.os, 'macos')
135-
uses: actions/cache@v1
136-
with:
137-
path: ~/Library/Caches/Coursier/v1
138-
key: ${{ runner.os }}-sbt-coursier-cache-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}
139-
140-
- name: Cache coursier (windows)
141-
if: contains(runner.os, 'windows')
142-
uses: actions/cache@v1
143-
with:
144-
path: ~/AppData/Local/Coursier/Cache/v1
145-
key: ${{ runner.os }}-sbt-coursier-cache-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}
146-
14787
- name: Cache sbt
148-
uses: actions/cache@v1
149-
with:
150-
path: ~/.sbt
151-
key: ${{ runner.os }}-sbt-cache-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}
88+
uses: actions/cache@v2
89+
with:
90+
path: |
91+
~/.sbt
92+
~/.ivy2/cache
93+
~/.coursier/cache/v1
94+
~/.cache/coursier/v1
95+
~/AppData/Local/Coursier/Cache/v1
96+
~/Library/Caches/Coursier/v1
97+
key: ${{ runner.os }}-sbt-cache-v2-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}
15298

15399
- name: Download target directories (2.12.12)
154100
uses: actions/download-artifact@v2

project/Version.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
object Version {
22
val Akka = "2.6.9"
33
val Camel = "2.20.4"
4-
val CatsEffect = "2.2.0"
54
val Config = "1.4.0"
6-
val Fs2 = "2.4.4"
5+
val CatsEffect = "3.0.0-M3"
6+
val Fs2 = "3.0.0-M3"
77
val JUnitInterface = "0.11"
88
val Log4j = "2.13.0"
99
val ScalaCollectionCompat = "2.2.0"

project/build.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
sbt.version=1.4.0
1+
sbt.version=1.4.3

project/plugins.sbt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.6.0")
44

55
addSbtPlugin("org.scalariform" % "sbt-scalariform" % "1.8.3")
66

7-
addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.1.13")
7+
addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.1.15")
88

9-
addSbtPlugin("com.codecommit" % "sbt-github-actions" % "0.9.3")
9+
addSbtPlugin("com.codecommit" % "sbt-github-actions" % "0.9.5")
1010

1111
addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.9.21")
1212

streamz-camel-fs2/README.md

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,15 @@ Camel DSL for FS2
22
-----------------
33

44
[Apache Camel endpoints](http://camel.apache.org/components.html) can be integrated into [FS2](https://github.com/functional-streams-for-scala/fs2) applications with a [DSL](#dsl).
5-
5+
66
### Dependencies
77

88
The DSL is provided by the `streamz-camel-fs2` artifact which is available for Scala 2.11 and 2.12:
99

1010
resolvers += Resolver.bintrayRepo("krasserm", "maven")
1111

1212
libraryDependencies += "com.github.krasserm" %% "streamz-camel-fs2" % "0.10-M2"
13-
13+
1414
### Configuration
1515

1616
The consumer receive timeout on Camel endpoints defaults to 500 ms. If you need to change that, you can do so in `application.conf`:
@@ -31,11 +31,11 @@ Its usage requires an implicit [`StreamContext`](http://krasserm.github.io/strea
3131
```scala
3232
import streamz.camel.StreamContext
3333

34-
// contains an internally managed CamelContext
34+
// contains an internally managed CamelContext
3535
implicit val streamContext: StreamContext = StreamContext()
3636
```
3737

38-
Applications that want to re-use an existing, externally managed `CamelContext` should create a `StreamContext` with `StreamContext(camelContext: CamelContext)`:
38+
Applications that want to re-use an existing, externally managed `CamelContext` should create a `StreamContext` with `StreamContext(camelContext: CamelContext)`:
3939

4040
```scala
4141
import org.apache.camel.CamelContext
@@ -49,7 +49,7 @@ implicit val streamContext: StreamContext = StreamContext(camelContext)
4949
```
5050
A `StreamContext` internally manages an `executorService` for running blocking endpoint operations. Applications can configure a custom executor service by providing an `executorServiceFactory` during `StreamContext` creation. See [API docs](http://krasserm.github.io/streamz/scala-2.12/unidoc/streamz/camel/StreamContext$.html) for details.
5151

52-
After usage, a `StreamContext` should be stopped with `streamContext.stop()`.
52+
After usage, a `StreamContext` should be stopped with `streamContext.stop()`.
5353

5454
#### Receiving in-only message exchanges from an endpoint
5555

@@ -58,8 +58,8 @@ An FS2 stream that emits messages consumed from a Camel endpoint can be created
5858
```scala
5959
import cats.effect.IO
6060
import fs2.Stream
61-
import streamz.camel.StreamContext
62-
import streamz.camel.StreamMessage
61+
import streamz.camel.StreamContext
62+
import streamz.camel.StreamMessage
6363
import streamz.camel.fs2.dsl._
6464

6565
val s1: Stream[IO, StreamMessage[String]] = receive[IO, String]("seda:q1")
@@ -73,7 +73,7 @@ val s1b: Stream[IO, String] = receiveBody[IO, String]("seda:q1")
7373

7474
This is equivalent to `receive[IO, String]("seda:q1").map(_.body)`.
7575

76-
`receive` and `receiveBody` can only be used with endpoints that create [in-only message exchanges](http://camel.apache.org/exchange-pattern.html).
76+
`receive` and `receiveBody` can only be used with endpoints that create [in-only message exchanges](http://camel.apache.org/exchange-pattern.html).
7777

7878
#### Receiving in-out message exchanges from an endpoint
7979

@@ -87,9 +87,7 @@ For sending a `StreamMessage` to a Camel endpoint, the `send` combinator should
8787
val s2: Stream[IO, StreamMessage[String]] = s1.send("seda:q2")
8888
```
8989

90-
This initiates an in-only message [exchange](http://camel.apache.org/exchange.html) with an endpoint and continues the stream with the sent `StreamMessage`.
91-
92-
The `send` combinator is not only available for streams of type `Stream[IO, StreamMessage[A]]` but more generally for any `Stream[F, A]` where `F: ContextShift: Async`.
90+
This initiates an in-only message [exchange](http://camel.apache.org/exchange.html) with an endpoint and continues the stream with the sent `StreamMessage`.
9391

9492
```scala
9593
val s2b: Stream[IO, String] = s1b.send("seda:q2")
@@ -105,9 +103,7 @@ For sending a request `StreamMessage` to an endpoint and obtaining a reply, the
105103
val s3: Stream[IO, StreamMessage[Int]] = s2.sendRequest[Int]("bean:service?method=weight")
106104
```
107105

108-
This initiates an in-out message exchange with the endpoint and continues the stream with the output `StreamMessage`. Here, a [Bean endpoint](https://camel.apache.org/bean.html) is used to call the `weight(String): Int` method on an object that is registered in the `CamelContext` under the name `service`. The input message body is used as `weight` call argument, the output message body is assigned the return value. The `sendRequest` type parameter (`Int`) specifies the expected output value type. The output message body can also be converted to another type provided that an appropriate Camel type converter is available (`Double`, for example).
109-
110-
The `sendRequest` combinator is not only available for streams of type `Stream[IO, StreamMessage[A]]` but more generally for any `Stream[F, A]` where `F: ContextShift: Async`.
106+
This initiates an in-out message exchange with the endpoint and continues the stream with the output `StreamMessage`. Here, a [Bean endpoint](https://camel.apache.org/bean.html) is used to call the `weight(String): Int` method on an object that is registered in the `CamelContext` under the name `service`. The input message body is used as `weight` call argument, the output message body is assigned the return value. The `sendRequest` type parameter (`Int`) specifies the expected output value type. The output message body can also be converted to another type provided that an appropriate Camel type converter is available (`Double`, for example).
111107

112108
```scala
113109
val s3b: Stream[IO, Int] = s2b.sendRequest[Int]("bean:service?method=weight")

streamz-camel-fs2/src/main/scala/streamz/camel/fs2/dsl/package.scala

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@ package streamz.camel.fs2
1818

1919
import java.util.concurrent.TimeUnit
2020

21-
import cats.effect.{ Async, ContextShift }
22-
import cats.implicits._
21+
import cats.effect.Async
2322
import fs2._
2423
import org.apache.camel.spi.Synchronization
2524
import org.apache.camel.{ Exchange, ExchangePattern }
@@ -33,7 +32,7 @@ package object dsl {
3332
/**
3433
* Camel endpoint combinators for [[StreamMessage]] streams of type `Stream[F, StreamMessage[A]]`.
3534
*/
36-
implicit class SendDsl[F[_]: ContextShift: Async, A](self: Stream[F, StreamMessage[A]]) {
35+
implicit class SendDsl[F[_]: Async, A](self: Stream[F, StreamMessage[A]]) {
3736
/**
3837
* @see [[dsl.send]]
3938
*/
@@ -50,7 +49,7 @@ package object dsl {
5049
/**
5150
* Camel endpoint combinators for [[StreamMessage]] body streams of type `Stream[F, A]`.
5251
*/
53-
implicit class SendBodyDsl[F[_]: ContextShift: Async, A](self: Stream[F, A]) {
52+
implicit class SendBodyDsl[F[_]: Async, A](self: Stream[F, A]) {
5453
/**
5554
* @see [[dsl.sendBody]]
5655
*/
@@ -71,13 +70,13 @@ package object dsl {
7170
/**
7271
* @see [[dsl.send]]
7372
*/
74-
def send[F[_]](uri: String)(implicit context: StreamContext, contextShift: ContextShift[F], async: Async[F]): Stream[F, StreamMessage[A]] =
73+
def send[F[_]](uri: String)(implicit context: StreamContext, async: Async[F]): Stream[F, StreamMessage[A]] =
7574
new SendDsl[F, A](self.covary[F]).send(uri)
7675

7776
/**
7877
* @see [[dsl.sendRequest()]]
7978
*/
80-
def sendRequest[F[_], B](uri: String)(implicit context: StreamContext, tag: ClassTag[B], contextShift: ContextShift[F], async: Async[F]): Stream[F, StreamMessage[B]] =
79+
def sendRequest[F[_], B](uri: String)(implicit context: StreamContext, tag: ClassTag[B], async: Async[F]): Stream[F, StreamMessage[B]] =
8180
new SendDsl[F, A](self.covary[F]).sendRequest(uri)
8281
}
8382

@@ -88,13 +87,13 @@ package object dsl {
8887
/**
8988
* @see [[dsl.sendBody]]
9089
*/
91-
def send[F[_]: ContextShift: Async](uri: String)(implicit context: StreamContext): Stream[F, A] =
90+
def send[F[_]: Async](uri: String)(implicit context: StreamContext): Stream[F, A] =
9291
new SendBodyDsl[F, A](self.covary[F]).send(uri)
9392

9493
/**
9594
* @see [[dsl.sendRequestBody]]
9695
*/
97-
def sendRequest[F[_]: ContextShift: Async, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Stream[F, B] =
96+
def sendRequest[F[_]: Async, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Stream[F, B] =
9897
new SendBodyDsl[F, A](self.covary[F]).sendRequest(uri)
9998
}
10099

@@ -110,7 +109,7 @@ package object dsl {
110109
* @param uri Camel endpoint URI.
111110
* @throws org.apache.camel.TypeConversionException if type conversion fails.
112111
*/
113-
def receive[F[_]: ContextShift: Async, A](uri: String)(implicit context: StreamContext, tag: ClassTag[A]): Stream[F, StreamMessage[A]] = {
112+
def receive[F[_]: Async, A](uri: String)(implicit context: StreamContext, tag: ClassTag[A]): Stream[F, StreamMessage[A]] = {
114113
consume(uri).filter(_ != null)
115114
}
116115

@@ -126,7 +125,7 @@ package object dsl {
126125
* @param uri Camel endpoint URI.
127126
* @throws org.apache.camel.TypeConversionException if type conversion fails.
128127
*/
129-
def receiveBody[F[_]: ContextShift: Async, A](uri: String)(implicit context: StreamContext, tag: ClassTag[A]): Stream[F, A] =
128+
def receiveBody[F[_]: Async, A](uri: String)(implicit context: StreamContext, tag: ClassTag[A]): Stream[F, A] =
130129
receive(uri).map(_.body)
131130

132131
/**
@@ -136,7 +135,7 @@ package object dsl {
136135
*
137136
* @param uri Camel endpoint URI.
138137
*/
139-
def send[F[_]: ContextShift: Async, A](uri: String)(implicit context: StreamContext): Pipe[F, StreamMessage[A], StreamMessage[A]] =
138+
def send[F[_]: Async, A](uri: String)(implicit context: StreamContext): Pipe[F, StreamMessage[A], StreamMessage[A]] =
140139
produce[F, A, A](uri, ExchangePattern.InOnly, (message, _) => message)
141140

142141
/**
@@ -146,7 +145,7 @@ package object dsl {
146145
*
147146
* @param uri Camel endpoint URI.
148147
*/
149-
def sendBody[F[_]: ContextShift: Async, A](uri: String)(implicit context: StreamContext): Pipe[F, A, A] =
148+
def sendBody[F[_]: Async, A](uri: String)(implicit context: StreamContext): Pipe[F, A, A] =
150149
s => s.map(StreamMessage(_)).through(send(uri)).map(_.body)
151150

152151
/**
@@ -158,7 +157,7 @@ package object dsl {
158157
* @param uri Camel endpoint URI.
159158
* @throws org.apache.camel.TypeConversionException if type conversion fails.
160159
*/
161-
def sendRequest[F[_]: ContextShift: Async, A, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Pipe[F, StreamMessage[A], StreamMessage[B]] =
160+
def sendRequest[F[_]: Async, A, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Pipe[F, StreamMessage[A], StreamMessage[B]] =
162161
produce[F, A, B](uri, ExchangePattern.InOut, (_, exchange) => StreamMessage.from[B](exchange.getOut))
163162

164163
/**
@@ -170,13 +169,13 @@ package object dsl {
170169
* @param uri Camel endpoint URI.
171170
* @throws org.apache.camel.TypeConversionException if type conversion fails.
172171
*/
173-
def sendRequestBody[F[_]: ContextShift: Async, A, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Pipe[F, A, B] =
172+
def sendRequestBody[F[_]: Async, A, B](uri: String)(implicit context: StreamContext, tag: ClassTag[B]): Pipe[F, A, B] =
174173
s => s.map(StreamMessage(_)).through(sendRequest[F, A, B](uri)).map(_.body)
175174

176-
private def consume[F[_], A](uri: String)(implicit context: StreamContext, tag: ClassTag[A], contextShift: ContextShift[F], F: Async[F]): Stream[F, StreamMessage[A]] = {
175+
private def consume[F[_], A](uri: String)(implicit context: StreamContext, tag: ClassTag[A], F: Async[F]): Stream[F, StreamMessage[A]] = {
177176
val timeout = context.config.getDuration("streamz.camel.consumer.receive.timeout", TimeUnit.MILLISECONDS)
178177
Stream.repeatEval {
179-
contextShift.shift >> F.async[StreamMessage[A]] { callback =>
178+
F.async_[StreamMessage[A]] { callback =>
180179
Try(context.consumerTemplate.receive(uri, timeout)) match {
181180
case Success(null) =>
182181
callback(Right(null))
@@ -199,10 +198,10 @@ package object dsl {
199198
}
200199
}
201200

202-
private def produce[F[_], A, B](uri: String, pattern: ExchangePattern, result: (StreamMessage[A], Exchange) => StreamMessage[B])(implicit context: StreamContext, contextShift: ContextShift[F], F: Async[F]): Pipe[F, StreamMessage[A], StreamMessage[B]] = { s =>
201+
private def produce[F[_], A, B](uri: String, pattern: ExchangePattern, result: (StreamMessage[A], Exchange) => StreamMessage[B])(implicit context: StreamContext, F: Async[F]): Pipe[F, StreamMessage[A], StreamMessage[B]] = { s =>
203202
s.flatMap { message =>
204203
Stream.eval {
205-
contextShift.shift >> F.async[StreamMessage[B]] { callback =>
204+
F.async_[StreamMessage[B]] { callback =>
206205
context.producerTemplate.asyncCallback(uri, context.createExchange(message, pattern), new Synchronization {
207206
override def onFailure(exchange: Exchange): Unit =
208207
callback(Left(exchange.getException))

streamz-camel-fs2/src/test/scala/streamz/camel/fs2/dsl/DslSpec.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,15 @@ import org.scalatest.matchers.should.Matchers
3030
import org.scalatest.wordspec.AnyWordSpec
3131

3232
class DslSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll {
33+
import cats.effect.unsafe.implicits.global
34+
3335
val camelRegistry = new SimpleRegistry
3436
val camelContext = new DefaultCamelContext()
3537

3638
camelContext.setRegistry(camelRegistry)
3739
camelRegistry.put("service", new Service)
3840

3941
implicit val streamContext = new StreamContext(camelContext)
40-
implicit val contextShift = IO.contextShift(scala.concurrent.ExecutionContext.global)
4142

4243
import streamContext._
4344

streamz-converter/build.sbt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ name := "streamz-converter"
22

33
libraryDependencies ++= Seq(
44
"co.fs2" %% "fs2-core" % Version.Fs2,
5-
"org.typelevel" %% "cats-effect" % Version.CatsEffect,
5+
"org.typelevel" %% "cats-effect-std" % Version.CatsEffect,
6+
"org.typelevel" %% "cats-effect" % Version.CatsEffect % "test",
67
"com.typesafe.akka" %% "akka-stream" % Version.Akka,
78
"com.typesafe.akka" %% "akka-stream-testkit" % Version.Akka % "test",
89
"com.typesafe.akka" %% "akka-testkit" % Version.Akka % "test",

0 commit comments

Comments
 (0)