欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Play 2.6 使用Play WS调用REST API

程序员文章站 2022-05-17 20:05:00
...

使用Play WS调用REST API

https://playframework.com/documentation/2.6.x/JavaWS

又是我们需要去访问别的HTTP服务器的资源。PLAY提供了WS类库来支持异步的HTTP调用。

在使用WS API时有两个部分:发出请求,处理响应。我们首先会介绍如何发出GET和POST请求,然后展示如何通过类库来处理响应,最后介绍一下常见的用法。

Note: 在Play 2.6中WS被分为两个部分,一个是不依赖与Play的独立客户端,一个是封装了Play特殊类的封装类。另外为了最小化类库冲突,Play WS使用了 AsyncHttpClient和Netty的shaded 版本,所以Play的HTTP引擎可以使用Netty的不同版本。可以在https://playframework.com/documentation/2.6.x/WSMigration26中获取更多细节

添加依赖

libraryDependencies ++= Seq(
  ws
)

在Play WS中使用HTTP缓存

Play WS支持HTTP缓存,但是要求一个JSR-107标准的缓存实现,你可以使用ehcache

libraryDependencies += ehcache

也可以使用别的基于JSR-107标准的缓存比如Caffeine

在添加相应的依赖后,在WS Cache Configuration 章节查看详细配置

在与诸如stale-on-error and stale-while-revalidate的弹性特征进行组合是,使用HTTP缓存为后端HREST服务保存重复的请求会很有用。

创建请求

首先需要添加如下的依赖然后使用依赖注入添加一个WSClient

import javax.inject.Inject;

import play.mvc.*;
import play.libs.ws.*;
import java.util.concurrent.CompletionStage;

public class MyClient implements WSBodyReadables, WSBodyWritables {
    private final WSClient ws;

    @Inject
    public MyClient(WSClient ws) {
        this.ws = ws;
    }
    // ...
}

先使用ws.url()指明URL

WSRequest request = ws.url("http://example.com");

在返回的WSRequest中,可以添加各种HTTP选项,比如添加头部信息。可以通过调用链来构建复杂的请求

WSRequest complexRequest = request.addHeader("headerKey", "headerValue")
                                        .setRequestTimeout(Duration.of(1000, ChronoUnit.MILLIS))
                                        .addQueryParameter("paramKey", "paramValue");

最后调用你需要的HTTP方法,这会结束整个链,并使用其中定义的相关信息

CompletionStage<? extends WSResponse> responsePromise = complexRequest.get();

这会返回一个 CompletionStage, WSResponse中包含了服务器返回的数据。

Java8使用CompletionStage来管理异步代码,java WS API依赖于CompletionStage。如果你使用早期使用F.Promise的Play,https://www.playframework.com/documentation/2.5.x/JavaMigration25#Replaced-F.Promise-with-Java-8s-CompletionStage 会为你提供帮助 如果你使用了阻塞操作,包括调用java.utils.ULR.equals()等DNS服务,那你需要一个自定义的执行上下文。并且为了应对失败需要给线程池设定一个安全的大小。 如果访问一个不可靠的网络,考虑使用Futures.timeout和FialSafe这样的断路器。

添加验证信息

ws.url(url).setAuth("user", "password", WSAuthScheme.BASIC).get();

WSAuthScheme的选项有BASIC, DIGEST, KERBEROS, NTLM, SPNEGO

支持重定向

如果HTTP调用得到了302或301,可以自动进行重定向而不用再发起一次请求

ws.url(url).setFollowRedirects(true).get();

添加查询参数

ws.url(url).addQueryParameter("paramKey", "paramValue");

添加额外的头部

ws.url(url).addHeader("headerKey", "headerValue").get();

如果你的文本是一种特殊格式,可以定义内容的类型

ws.url(url).addHeader("Content-Type", "application/json").post(jsonString);
// OR
ws.url(url).setContentType("application/json").post(jsonString);

添加cookie

ws.url(url).addCookies(new WSCookieBuilder().setName("headerKey").setValue("headerValue").build()).get();

添加超时

使用setRequestTimeout来设置毫秒。 Duration.ofMillis(Long.MAX_VALUE)设置无限的timeout

ws.url(url).setRequestTimeout(Duration.of(1000, ChronoUnit.MILLIS)).get();

提交表单

ws.url(url).setContentType("application/x-www-form-urlencoded")
           .post("key1=value1&key2=value2");

提交 multipart/form

最简单方法是使用Source<Http.MultipartFormData.Part<Source<ByteString>, ?>, ?>

import play.mvc.Http.MultipartFormData.*;
ws.url(url).post(Source.single(new DataPart("hello", "world")));

为了将文件作为multipart form的一部分,需要将Http.MultipartFormData.FilePart<Source<ByteString>, ?>转为Source

Source<ByteString, ?> file = FileIO.fromFile(new File("hello.txt"));
FilePart<Source<ByteString, ?>> fp = new FilePart<>("hello", "hello.txt", "text/plain", file);
DataPart dp = new DataPart("key", "value");

ws.url(url).post(Source.from(Arrays.asList(fp, dp)));

提交JSON

import com.fasterxml.jackson.databind.JsonNode;
import play.libs.Json;

JsonNode json = Json.newObject()
                    .put("key1", "value1")
                    .put("key2", "value2");

ws.url(url).post(json);

也可以传递一个自定义的ObjectMapper

ObjectMapper objectMapper = play.libs.Json.newDefaultMapper();
ws.url(url).post(body(json, objectMapper));

提交XML

Document xml = play.libs.XML.fromString("<document></document>");
ws.url(url).post(xml);

提交流数据

可以通过Akka stream来添加流数据

CompletionStage<WSResponse> wsResponse = ws.url(url).setBody(body(largeImage)).execute("PUT");

请求过滤器

一个过滤器需要扩展play.libs.ws.WSRequestFilter特质,然后通过request.withRequestFilter(filter)添加到请求中

public CompletionStage<Result> index() {
    WSRequestFilter filter = executor -> request -> {
        logger.debug("url = " + request.getUrl());
        return executor.apply(request);
    };

    return ws.url(feedUrl)
            .setRequestFilter(filter)
            .get()
            .thenApply((WSResponse r) -> {
                String title = r.getBody(json()).findPath("title").asText();
                return ok("Feed title: " + title);
            });
}

处理响应

通过CompletionStage的thenApply和thenCompose方法来处理响应

作为JSON来处理

通过r.getBody(json())将结果转换为JsonNode,(json()为play.libs.ws.WSBodyReadables.json()的默认方法)

// implements WSBodyReadables or use WSBodyReadables.instance.json()
CompletionStage<JsonNode> jsonPromise = ws.url(url).get()
        .thenApply(r -> r.getBody(json()));

作为XML处理

// implements WSBodyReadables or use WSBodyReadables.instance.xml()
CompletionStage<Document> documentPromise = ws.url(url).get()
        .thenApply(r -> r.getBody(xml()));

处理大容量数据

调用get(), post()或execute()方法会降返回体读入到内存中,在处理大容量数据时可能会会导致内存错误。

可以使用Akka Stream提供的Sink来处理。WSRequest 的stream()方法返回一个 CompletionStage,WSResponse也含有一个返回Source

import javax.inject.Inject;

import akka.stream.Materializer;
import akka.stream.javadsl.*;
import akka.util.ByteString;

import play.mvc.*;
import play.libs.ws.*;

import scala.compat.java8.FutureConverters;

public class MyController extends Controller {

    @Inject WSClient ws;
    @Inject Materializer materializer;

    // ...
}

下面是一个计算返回byte数的例子

// Make the request
CompletionStage<WSResponse> futureResponse =
    ws.url(url).setMethod("GET").stream();

CompletionStage<Long> bytesReturned = futureResponse.thenCompose(res -> {
    Source<ByteString, ?> responseBody = res.getBodyAsSource();

    // Count the number of bytes returned
    Sink<ByteString, CompletionStage<Long>> bytesSum =
        Sink.fold(0L, (total, bytes) -> total + bytes.length());

    return responseBody.runWith(bytesSum, materializer);
});

也可以将将返回体流化到另一个目的地,比如一个文件

File file = File.createTempFile("stream-to-file-", ".txt");
OutputStream outputStream = java.nio.file.Files.newOutputStream(file.toPath());

// Make the request
CompletionStage<WSResponse> futureResponse =
    ws.url(url).setMethod("GET").stream();

CompletionStage<File> downloadedFile = futureResponse.thenCompose(res -> {
    Source<ByteString, ?> responseBody = res.getBodyAsSource();

    // The sink that writes to the output stream
    Sink<ByteString, CompletionStage<akka.Done>> outputWriter =
        Sink.<ByteString>foreach(bytes -> outputStream.write(bytes.toArray()));

    // materialize and run the stream
    CompletionStage<File> result = responseBody.runWith(outputWriter, materializer)
        .whenComplete((value, error) -> {
            // Close the output stream whether there was an error or not
            try { outputStream.close(); }
            catch(IOException e) {}
        })
        .thenApply(v -> file);
    return result;
});

另外一种是将结果返回到Action中

// Make the request
CompletionStage<WSResponse> futureResponse = ws.url(url).setMethod("GET").stream();

CompletionStage<Result> result = futureResponse.thenApply(response -> {
    Source<ByteString, ?> body = response.getBodyAsSource();
    // Check that the response was successful
    if (response.getStatus() == 200) {
        // Get the content type
        String contentType =
                Optional.ofNullable(response.getHeaders().get("Content-Type"))
                        .map(contentTypes -> contentTypes.get(0))
                        .orElse("application/octet-stream");

        // If there's a content length, send that, otherwise return the body chunked
        Optional<String> contentLength = Optional.ofNullable(response.getHeaders()
                .get("Content-Length"))
                .map(contentLengths -> contentLengths.get(0));
        if (contentLength.isPresent()) {
            return ok().sendEntity(new HttpEntity.Streamed(
                    body,
                    Optional.of(Long.parseLong(contentLength.get())),
                    Optional.of(contentType)
            ));
        } else {
            return ok().chunked(body).as(contentType);
        }
    } else {
        return new Result(Status.BAD_GATEWAY);
    }
});

你也许注意到了,在调用stream()方法前要天通过setMethod(String)来设置HTTP方法,下面是一个使用PUT的例子

CompletionStage<WSResponse> futureResponse  =
    ws.url(url).setMethod("PUT").setBody(body("some body")).stream();

所以合法的HTTP动作都是可以的

常见用法

WS调用链

通过 thenCompose实现调用链

final CompletionStage<WSResponse> responseThreePromise = ws.url(urlOne).get()
        .thenCompose(responseOne -> ws.url(responseOne.getBody()).get())
        .thenCompose(responseTwo -> ws.url(responseTwo.getBody()).get());

异常恢复

如果需要恢复调用中的异常,可以使用handle或exceptionally来取代response

CompletionStage<WSResponse> responsePromise = ws.url("http://example.com").get();
responsePromise.handle((result, error) -> {
    if (error != null) {
        return ws.url("http://backup.example.com").get();
    } else {
        return CompletableFuture.completedFuture(result);
    }
});

在controller中使用

通过在异步处理结果中定义的异步action模型,Play可以直接将CompletionStage映射为CompletionStage

public CompletionStage<Result> index() {
    return ws.url(feedUrl).get().thenApply(response ->
        ok("Feed title: " + response.asJson().findPath("title").asText())
    );
}

在WSClien中使用超时Futuret

如果一个调用链没能及时结束,将结果封装进一个timeout block中会很有用,在链超时时会返回一个失败的Future,这比使用withRequestTimeout更加通用,withRequestTimeout只适用于单个请求。

最好的方式是使用Play的非阻塞超时feature,通过Futures.timeoutCustomExecutionContext 来保证相应的解决方案

public CompletionStage<Result> index() {
    CompletionStage<Result> f = futures.timeout(ws.url("http://playframework.com").get().thenApplyAsync(result -> {
        try {
            Thread.sleep(10000L);
            return Results.ok();
        } catch (InterruptedException e) {
            return Results.status(SERVICE_UNAVAILABLE);
        }
    }, customExecutionContext), 1L, TimeUnit.SECONDS);

    return f.handleAsync((result, e) -> {
        if (e != null) {
            if (e instanceof CompletionException) {
                Throwable completionException = e.getCause();
                if (completionException instanceof TimeoutException) {
                    return Results.status(SERVICE_UNAVAILABLE, "Service has timed out");
                } else {
                    return internalServerError(e.getMessage());
                }
            } else {
                logger.error("Unknown exception " + e.getMessage(), e);
                return internalServerError(e.getMessage());
            }
        } else {
            return result;
        }
    });
}

直接创建WSClient

我们推荐使用DI的方式来获取WSClient实例。通过注入获取的实例会在应用启动的时候自动创建并在服务停止时自动清理。

你也可以在代码中创建实例来发送请求或者进行AsyncHttpClient配置

Note: 如果你手动创建WSClient那么一定要调用client.close()方法进去清理。每个clinet会创建自己定线程池,如果你没有进行管理或者创建了过多的客户端,你会耗尽线程或文件句柄——最终会得到类似“Unable to create new native thread” 或 “too many open files” 的错误。

下面是一个例子

import akka.stream.Materializer;
import akka.stream.javadsl.*;
import akka.util.ByteString;
import play.mvc.Results;

// Set up the client config (you can also use a parser here):
// play.api.Configuration configuration = ... // injection
// play.Environment environment = ... // injection

WSClient customWSClient = play.libs.ws.ahc.AhcWSClient.create(
        play.libs.ws.ahc.AhcWSClientConfigFactory.forConfig(
                configuration.underlying(),
                environment.classLoader()),
                null, // no HTTP caching
                materializer);

也可以在功能测试中使用play.test.WSTestClient.newClient来创建一个实例,在这里获取更多信息。

或者你可以不依赖Play应用和配置独立运行WSClient

import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.ActorMaterializerSettings;
import org.junit.Test;
import play.shaded.ahc.org.asynchttpclient.*;
import play.libs.ws.*;
import play.libs.ws.ahc.*;

// Set up Akka
String name = "wsclient";
ActorSystem system = ActorSystem.create(name);
ActorMaterializerSettings settings = ActorMaterializerSettings.create(system);
ActorMaterializer materializer = ActorMaterializer.create(settings, system, name);

// Set up AsyncHttpClient directly from config
AsyncHttpClientConfig asyncHttpClientConfig = new DefaultAsyncHttpClientConfig.Builder()
        .setMaxRequestRetry(0)
        .setShutdownQuietPeriod(0)
        .setShutdownTimeout(0).build();
AsyncHttpClient asyncHttpClient = new DefaultAsyncHttpClient(asyncHttpClientConfig);

// Set up WSClient instance directly from asynchttpclient.
WSClient client = new AhcWSClient(
    asyncHttpClient,
    materializer
);

// Call out to a remote system and then and close the client and akka.
client.url("http://www.google.com").get().whenComplete((r, e) -> {
    Optional.ofNullable(r).ifPresent(response -> {
        String statusText = response.getStatusText();
        System.out.println("Got a response " + statusText);
    });
}).thenRun(() -> {
    try {
        client.close();
    } catch (Exception e) {
        e.printStackTrace();
    }
}).thenRun(system::terminate);

如果你想独立的运行WSClient,但是依然使用Play的配置(包括SSL),你可以使用一个配置解析器

// Set up Akka
String name = "wsclient";
ActorSystem system = ActorSystem.create(name);
ActorMaterializerSettings settings = ActorMaterializerSettings.create(system);
ActorMaterializer materializer = ActorMaterializer.create(settings, system, name);

// Read in config file from application.conf
Config conf = ConfigFactory.load();
WSConfigParser parser = new WSConfigParser(conf, ClassLoader.getSystemClassLoader());
AhcWSClientConfig clientConf = AhcWSClientConfigFactory.forClientConfig(parser.parse());

// Start up asynchttpclient
final DefaultAsyncHttpClientConfig asyncHttpClientConfig = new AhcConfigBuilder(clientConf).configure().build();
final DefaultAsyncHttpClient asyncHttpClient = new DefaultAsyncHttpClient(asyncHttpClientConfig);

// Create a new WS client, and then close the client.
WSClient client = new AhcWSClient(asyncHttpClient, materializer);
client.close();
system.terminate();

最后不要忘记关闭客户端

try {
    customWSClient.close();
} catch (IOException e) {
    logger.error(e.getMessage(), e);
}

只有在所有的请求都完成之后才能关闭客户端。不能使用try-with-resources来自动关闭客户端实例,因为WSClient逻辑是异步的,而try-with-resources只支持同步代码。

自定义的BodyReadables和BodyWritables

Play WS对body提供了丰富的支持。play.libs.ws.WSBodyWritables提供方法将WSRequest中的JsonNode或XML这样的输入转化为ByteString or Source,相应的play.libs.ws.WSBodyReadables从WSResponse中读取 ByteString or Source[ByteString, _]然后返回一个合适的类型(JsValue或者XML)。WSRequest和WSResponse中提供了默认的实现供你使用,但是你也可以根据需要通过response.getBody(myReadable())request.post(myWritable(data))使用自定义的类型。在使用自定义的类库时会非常使用i.e.你也许想通过STaX API将XML流化。

创建一个Readable

public interface URLBodyReadables {
    default BodyReadable<java.net.URL> url() {
        return response -> {
            try {
                String s = response.getBody();
                return java.net.URI.create(s).toURL();
            } catch (MalformedURLException e) {
                throw new RuntimeException(e);
            }
        };
    }
}

创建一个BodyWritable

使用InMemoryBodyWritable来创建一个body writable,如果是流的形式,使用ourceBodyWritable

public interface URLBodyWritables {
    default InMemoryBodyWritable body(java.net.URL url) {
        try {
            String s = url.toURI().toString();
            ByteString byteString = ByteString.fromString(s);
            return new InMemoryBodyWritable(byteString, "text/plain");
        } catch (URISyntaxException e) {
            throw new RuntimeException(e);
        }
    }
}

独立WS

如果逆向在Play之外调用WS,可以使用Play WS的独立版本,这不依赖于任何Play类库。

libraryDependencies += "com.typesafe.play" %% "play-ahc-ws-standalone" % playWSStandalone

可以参考https://github.com/playframework/play-wshttps://playframework.com/documentation/2.6.x/WSMigration26获取更多信息。

获取AsyncHttpClient

可以从WSClient中获取隐含的AsyncHttpClient

play.shaded.ahc.org.asynchttpclient.AsyncHttpClient underlyingClient =
    (play.shaded.ahc.org.asynchttpclient.AsyncHttpClient) ws.getUnderlying();

配置WS

application.conf中有以下参数可以配置
- play.ws.followRedirects: 配置客户端跟随301或302的重定向 (默认是 true).
- play.ws.useProxyProperties: 使用系统http代理(http.proxyHost, http.proxyPort) (default is true).
- play.ws.useragent: 配置User-Agent header
- play.ws.compressionEnabled: 设置为true使用gzip/deflater编码 (default is false).

超时

WS中有三种超时
- play.ws.timeout.connection: 建立连接时的最大等待时间 (default is 120 seconds).
- play.ws.timeout.idle: request保持空闲的最大时间 (连接已经建立但是等待更多数据) (default is 120 seconds).
- play.ws.timeout.request: The request能够使用的最大时间(在远程主机仍在发送数据时也会中断) (default is 120 seconds).
对于一个特殊的连接可以使用setTimeout()覆盖超时。

SSL

详见
https://playframework.com/documentation/2.6.x/WsSSL

缓存

HTTP缓存详见
https://playframework.com/documentation/2.6.x/WsCache

AsyncClientConfig

下面列出了AsyncHttpClientConfig的高级配置,详见 AsyncHttpClientConfig Documentation
- play.ws.ahc.keepAlive
- play.ws.ahc.maxConnectionsPerHost
- play.ws.ahc.maxConnectionsTotal
- play.ws.ahc.maxConnectionLifetime
- play.ws.ahc.idleConnectionInPoolTimeout
- play.ws.ahc.maxNumberOfRedirects
- play.ws.ahc.maxRequestRetry
- play.ws.ahc.disableUrlEncoding