Designing tapir’s WebSockets support

Adam Warski
SoftwareMill Tech Blog
8 min read · Oct 26, 2020


Tapir is a library for describing HTTP endpoints as immutable values, using a type-safe, composable API. Until recently, tapir’s main functionalities included the ability to describe “regular” REST-like endpoints, and interpret these descriptions as a server, client or OpenAPI documentation.

One of the most-voted feature requests was web socket support, and here it is, in version 0.17! It is now possible to describe a web socket endpoint, expose it as an akka-http or http4s server, interpret it as an sttp client, or generate AsyncAPI documentation!

Let’s look at the design of the feature in detail.


Describing a web socket

In tapir, we already have a couple of ways of describing the body, e.g. stringBody, jsonBody[User] or streamBody[AkkaStreams]. Hence, it seems natural to add one more possibility: webSocketBody. However, unlike the descriptions mentioned before, this one can only be used as an output of an endpoint: we cannot send a websocket to a server, but a server can respond with an established websocket when an appropriate request is received.

A very rough sketch of describing a web socket endpoint would be (with many details omitted for now):

val webSocketBody: EndpointOutput[WS] = ...

// I - type of inputs
// E - type of error outputs
// WS - web socket output type
val webSocketEndpoint: Endpoint[I, E, WS] =
  endpoint.in(...).out(webSocketBody)

This way, we have a new possible output for an endpoint. What type should be assigned to a web socket output, then? Let’s look at what happens when we try to interpret such an endpoint:

  • when an endpoint is interpreted as a server, it must be combined with a server logic function, which takes the endpoint’s input parameters (what is extracted from the request), and returns output parameters (which are translated to a response): I => Either[E, WS]. In case of web sockets, the server logic will have to return a value encapsulating the logic of the web socket, that is what actions to take when a web socket frame is received, and what frames to send back
  • when an endpoint is interpreted as a client, we get a function I => Either[E, WS], which, when applied to input parameters (which are translated to a request), sends the request. It then receives the response and extracts the appropriate values to output parameters. Hence for web sockets, as part of the output, we need to return a value which somehow captures the server-side behaviour, from the perspective of the client.

Note that both of these cases have to be covered by a single type, which will be the type of the web socket output! Luckily, we’ve got well-developed streaming abstractions, which will be very useful here.

Streams

The basic type of a web socket is a Pipe[WebSocketFrame, WebSocketFrame]. A Pipe[A, B] is an effectful streaming transformation of values of type A into values of type B. Note that this is much more powerful than a function A => B. Upon receiving an A, arbitrary effects might happen, and an arbitrary number of Bs can be produced (including none). Moreover, Bs can also be emitted without any As being received!

To make things more concrete, in akka streams, the pipe type is a Flow[A, B, Any]. In fs2, there is a Pipe[F, A, B] type alias, which stands for a function Stream[F, A] => Stream[F, B].
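To see the difference between a function and a pipe without pulling in akka-streams or fs2, here is a minimal sketch that models a pipe as a transformation of a lazy list. The Pipe alias, the PipeSketch object and the wordsPipe value are assumptions made for illustration only; they are not the Flow or fs2 types, and they ignore effects entirely.

```scala
object PipeSketch {
  // Simplified stand-in for a streaming pipe: a transformation of a whole
  // (lazy) stream. This is NOT akka's Flow or fs2's Pipe, and has no effects.
  type Pipe[A, B] = LazyList[A] => LazyList[B]

  // A plain function A => B always yields exactly one B for each A.
  val shout: String => String = _.toUpperCase

  // A pipe can emit a B before any A arrives ("welcome"), emit nothing for
  // some As (empty lines), and emit several Bs for a single A (one per word).
  val wordsPipe: Pipe[String, String] = in =>
    "welcome" #:: in.flatMap(line => line.split(" ").toList.filter(_.nonEmpty))

  val out: List[String] =
    wordsPipe(LazyList("hello world", "", "bye")).toList
}
```

Three inputs go in and four outputs come out, one of which does not correspond to any input; a function String => String could not express that.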

A WebSocketFrame is a data type with a couple of implementations:

  • Text and Binary for data frames
  • Ping, Pong and Close for control frames

Hence, paraphrasing the akka-streams Flow scaladocs, our representation of a web socket will be a series of stream processing steps, which accepts WebSocketFrames as input, and produces WebSocketFrames as output.
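As a rough illustration of a frame-level pipe, the sketch below defines a reduced frame hierarchy and a raw echo pipe over it. The Frame type and its cases are hypothetical simplifications (the real WebSocketFrame carries more fields), and the pipe is again modelled as a function on lazy lists rather than a Flow.

```scala
object FrameSketch {
  // Hypothetical, simplified frame model; not the library's WebSocketFrame.
  sealed trait Frame
  final case class Text(payload: String) extends Frame
  final case class Binary(payload: Vector[Byte]) extends Frame
  final case class Ping(payload: Vector[Byte]) extends Frame
  final case class Pong(payload: Vector[Byte]) extends Frame
  case object Close extends Frame

  type Pipe[A, B] = LazyList[A] => LazyList[B]

  // Raw echo: text is echoed back, pings are answered with pongs, other
  // frames are dropped, and nothing is processed once Close is seen.
  val rawEcho: Pipe[Frame, Frame] = in =>
    in.takeWhile(_ != Close).collect {
      case Text(p) => Text(s"echo: $p")
      case Ping(p) => Pong(p)
    }

  val replies: List[Frame] = rawEcho(
    LazyList(
      Text("a"),
      Ping(Vector[Byte](1)),
      Binary(Vector[Byte](2)),
      Close,
      Text("late")
    )
  ).toList
}
```

Even this tiny example has to deal with pings and close frames by hand, which is the kind of bookkeeping that is tedious to repeat in every endpoint.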

Let’s look again at the possible server/client interpretations:

  • when interpreting as a server, the provided server logic should return a Pipe, which describes how the server behaves
  • when interpreting as a client, after the input parameters are supplied, we obtain a Pipe encapsulating the server behaviour, that is, the Pipe contains the effects of sending frames to the server and receiving them from the server

Which means that so far we’ve got the following type for our output:

def webSocketBody: EndpointOutput[
Pipe[WebSocketFrame, WebSocketFrame]] = ...

More structure

Operating on raw web socket frames probably isn’t what you are looking forward to. It would be useful if we could add more structure and deal with high-level types. Combining fragmented frames and dealing with ping-pongs and close frames should be handled by the library, unless we have some special low-level needs.

Luckily, we’re already able to capture the structure of a type using a Codec[L, H, CF], where L is the low-level representation (e.g. String), H is the type that is being described (e.g. User), and CF is the format to which the type is encoded/decoded (e.g. CodecFormat.Json). The codecs are used when describing how to handle bodies, query parameters, headers etc. They contain both encoding/decoding functions, as well as the schema and other type meta-data. Let’s use the same for web sockets!

The first thing to note is that we’ll in fact need two types and two codecs: one for messages that are sent to the server, and one for messages that are received. These might be distinct!
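The idea of a separate codec per direction can be sketched with a much-reduced codec type. MiniCodec below is a hypothetical stand-in: tapir's Codec[L, H, CF] additionally carries the format, schema and other meta-data, and a real response codec would use JSON rather than the hand-rolled "seqNo|valid|data" text format assumed here.

```scala
object CodecSketch {
  // Hypothetical, reduced codec: only the decode/encode pair of functions.
  final case class MiniCodec[L, H](
      decode: L => Either[String, H],
      encode: H => L
  )

  final case class Response(seqNo: Int, data: String, valid: Boolean)

  // Messages sent to the server: plain text, so decoding cannot fail.
  val requestCodec: MiniCodec[String, String] =
    MiniCodec[String, String]((s: String) => Right(s), (s: String) => s)

  // Messages sent by the server: a made-up "seqNo|valid|data" format,
  // standing in for JSON to keep the sketch dependency-free.
  val responseCodec: MiniCodec[String, Response] =
    MiniCodec[String, Response](
      decode = (s: String) =>
        s.split("\\|", 3) match {
          case Array(n, v, d) if n.toIntOption.nonEmpty && v.toBooleanOption.nonEmpty =>
            Right(Response(n.toInt, d, v.toBoolean))
          case _ => Left(s"cannot decode: $s")
        },
      encode = (r: Response) => s"${r.seqNo}|${r.valid}|${r.data}"
    )
}
```

The two codecs have different high-level types (String and Response), which is why a single codec would not be enough to describe the web socket.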

We still want to be able to describe “raw” web socket bodies, where we want to interact with WebSocketFrames directly, hence we’ll end up with two alternate web socket body descriptions:

def webSocketBody[
REQ, REQ_CF <: CodecFormat,
RESP, RESP_CF <: CodecFormat]: EndpointOutput[Pipe[REQ, RESP]]

def webSocketBodyRaw: EndpointOutput[
Pipe[WebSocketFrame, WebSocketFrame]]

Two variants, one implementation

While we have two alternate ways of describing a web socket body, in fact there’s a single implementation of the web socket body output. The single implementation contains codecs between the high-level request/response types and the low-level type, which in this case is a WebSocketFrame. These codecs might be identity ones (for the raw variant), or they might contain some logic.

There’s also configuration specifying how to handle control frames. In summary, a WebSocketBodyOutput contains:

  • a Codec[WebSocketFrame, REQ, REQ_CF] and Codec[WebSocketFrame, RESP, RESP_CF] for mapping request/response frames into higher-level types
  • a Codec[Pipe[REQ, RESP], T, CF] for further mapping the complete pipe into another type
  • endpoint meta-data (e.g. description)
  • the concatenateFragmentedFrames flag: whether the codecs can handle fragmented frames, or whether they should be concatenated beforehand
  • the ignorePong, autoPongOnPing, decodeCloseRequests, decodeCloseResponses and autoPing settings, which configure control frame handling
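To make the role of these settings more tangible, here is a small sketch of how a frame-level handler could be derived from high-level logic. It only models two of the settings, concatenateFragmentedFrames and autoPongOnPing, and everything else (the Frame types, concatFragments, run) is a hypothetical simplification working on plain lists; it is not how tapir's interpreters are implemented.

```scala
object ControlFramesSketch {
  // Hypothetical, simplified frames (payloads are plain strings here).
  sealed trait Frame
  final case class Text(payload: String, finalFragment: Boolean = true) extends Frame
  final case class Ping(payload: String) extends Frame
  final case class Pong(payload: String) extends Frame
  case object Close extends Frame

  // Joins consecutive text fragments into one complete text frame.
  def concatFragments(in: List[Frame]): List[Frame] = {
    val (frames, _) = in.foldLeft((Vector.empty[Frame], "")) {
      case ((acc, buf), Text(p, false)) => (acc, buf + p)
      case ((acc, buf), Text(p, true))  => (acc :+ Text(buf + p), "")
      case ((acc, buf), other)          => (acc :+ other, buf)
    }
    frames.toList
  }

  // Runs high-level logic on text payloads; control frames never reach it.
  def run(
      in: List[Frame],
      logic: String => String,
      concatenateFragmentedFrames: Boolean = true,
      autoPongOnPing: Boolean = true
  ): List[Frame] = {
    val frames = if (concatenateFragmentedFrames) concatFragments(in) else in
    frames.takeWhile(_ != Close).flatMap {
      case Text(p, _)                => List(Text(logic(p)))
      case Ping(p) if autoPongOnPing => List(Pong(p))
      case _                         => Nil // other frames are dropped in this sketch
    }
  }

  val sample: List[Frame] =
    List(Text("hel", finalFragment = false), Text("lo"), Ping("p"), Text("x"), Close, Text("late"))

  val handled: List[Frame] = run(sample, _.toUpperCase)
}
```

With concatenation switched on, the logic sees "hello" as a single message; with it switched off, it would be called once per fragment.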

Text/JSON example

Let’s look at an example! Here’s a description of a web socket endpoint, where:

  • the incoming data is expected to be text; any fragmented frames are concatenated
  • outgoing data should correspond to the JSON representation of the Response class
  • Ping-Pong frames are automatically handled
  • the pipe is closed when a Close frame is received
  • the streams implementation used is akka-streams, as witnessed by the AkkaStreams parameter. Hence the pipe implementation is a Flow.

case class Response(seqNo: Int, data: String, valid: Boolean)

val wsEndpoint: Endpoint[
  Unit,
  Unit,
  Flow[String, Response, Any],
  AkkaStreams with WebSockets] = endpoint
  .get
  .in("ping")
  .out(webSocketBody[
    String, CodecFormat.TextPlain,
    Response, CodecFormat.Json](AkkaStreams))

With this description, if a client or server attempts to send a binary frame, or a response sent by the server isn’t in the proper JSON format, a decoding error will be reported and the web socket will close.

Two more technical details

There are two more technical details that should be noted.

First, you might have noticed the additional parameter passed to webSocketBody: in our example, AkkaStreams. This specifies the streaming implementation that we’ll be using, and defines the concrete type of Pipe. The object is very simple: it only contains two type aliases, for the types of binary streams and pipes. Binary streams are used in sttp client, as well as when describing streaming bodies in tapir.

The object comes in a dependency that is independent of any of the client or server interpreters, so the endpoint descriptions and server logic implementations can still be kept in separate subprojects.

Second, the 4th type parameter of endpoint defines the required capabilities, which any interpreter that will be used must meet. The capabilities describe any non-standard requirements on the interpreter, and currently three are supported: Streams[S], WebSockets and Effect[F].

Here we’ve got an endpoint which requires that the interpreter supports akka streams and web sockets. Such an interpreter is e.g. the akka http one, which is included as a tapir module. Another choice would be Fs2Streams[F] with WebSockets, which could be used with http4s and F effects.

That way, if we’ve got an interpreter which doesn’t support websockets or the given non-blocking streams, the compiler won’t allow us to interpret an endpoint with such requirements.
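The compile-time check can be illustrated with a contravariant phantom type parameter. The sketch below is an assumption about the general technique, not tapir's actual definitions: MiniEndpoint, the marker traits and the two interpreter functions are all hypothetical names.

```scala
object CapabilitiesSketch {
  // Hypothetical marker types standing in for capabilities.
  trait WebSockets
  trait AkkaStreams

  // R lists the capabilities the endpoint requires. It is contravariant,
  // so an endpoint requiring less can be used where more is supported.
  final case class MiniEndpoint[-R](path: String)

  val plain: MiniEndpoint[Any] = MiniEndpoint[Any]("hello")
  val ws: MiniEndpoint[AkkaStreams with WebSockets] =
    MiniEndpoint[AkkaStreams with WebSockets]("ping")

  // An interpreter that supports akka streams and web sockets.
  def akkaInterpreter(e: MiniEndpoint[AkkaStreams with WebSockets]): String =
    s"akka route for /${e.path}"

  // An interpreter without any extra capabilities.
  def basicInterpreter(e: MiniEndpoint[Any]): String =
    s"basic route for /${e.path}"

  val routes: List[String] = List(
    akkaInterpreter(plain), // fine: no requirements at all
    akkaInterpreter(ws),    // fine: requirements are met
    basicInterpreter(plain) // fine: no requirements at all
  )

  // basicInterpreter(ws) // does not compile: WebSockets is not supported
}
```

Uncommenting the last line yields a type mismatch, since MiniEndpoint[AkkaStreams with WebSockets] is not a subtype of MiniEndpoint[Any].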

As a server

Let’s expose our endpoint as a server using akka http. The server logic here will be trivial, a simple echo-style web socket, but returning responses in JSON:

val wsRoute: Route = wsEndpoint.toRoute(_ => 
Future.successful(Right(Flow.fromFunction(
(in: String) => Response(0, in, true)))))

Here, we’ve interpreted the endpoint description as an akka http route. We can now combine this with any other routes that we have in our application, and expose it using:

Http()
.newServerAt("localhost", 8080)
.bindFlow(wsRoute)

The tapir source code contains two runnable examples:

  • one using akka-http, which implements logic as described above
  • another using http4s, which implements a slightly more complex web socket, which sends a message every second with the number of received text frames

As documentation

Because we are dealing with descriptions, we can interpret the endpoint as documentation! OpenAPI doesn’t support web sockets, but instead we’ve got the AsyncAPI specification, which allows documenting event-driven systems. It supports protocols such as AMQP, Kafka, JMS and WebSockets (among others).

The format is very similar to OpenAPI, and there’s a lot of common ground between the two specifications. Not everything is the same of course, as the nature of the communication which both of these formats describe is quite different.

Tapir has a module which allows interpreting web socket endpoints as AsyncAPI documentation:

val apiDocs: String = wsEndpoint.toAsyncAPI(
"JSON echo",
"1.0",
List("dev" -> Server("localhost:8080", "ws"))).toYaml

This will produce the AsyncAPI documentation in YAML format. We can then take the YAML and use it in AsyncAPI’s playground, getting a nice rendering of the information contained there.

Summing up

Web sockets complement tapir’s existing functionality, providing a way to describe the structure of the expected web socket communication in a type-safe way. At the same time, we’re keeping the possibility to work with raw web socket frames.

Please test the web socket implementation, and let us know what works, what doesn’t, and what use cases aren’t covered well! More details on how to use the functionality can be found in the docs, and we’re always happy to answer your questions via GitHub issues or Gitter.


Software engineer, Functional Programming and Scala enthusiast, SoftwareMill co-founder