介绍

GraphQL订阅模式支持服务端主动向客户端推送数据通知,避免客户端轮训

DGS Subscriptions

新增依赖

此处使用的是SpringBoot Web组件,故引入websockets依赖

如果使用的是WebFlux则要引入对应的dgs-webflux

		  
            com.netflix.graphql.dgs
            graphql-dgs-subscriptions-websockets-autoconfigure
        

异步通知解析

此处定义为当新增一个Actor就给所有订阅者发异步通知

@Slf4j
@DgsComponent
@RequiredArgsConstructor
public class ActorDataFetcher {

	private final ActorAssembler actorAssembler;

	private FluxSink actorStream;

	private ConnectableFlux actorPublisher;

	@PostConstruct
	private void createActorPublisher() {
		Flux publisher = Flux.create(emitter -> {
			actorStream = emitter;
		});

		actorPublisher = publisher.publish();
		actorPublisher.connect();
	}

	@DgsMutation
	public Actor addActor(@InputArgument SubmitActor actor) {
		Actor actorEntity = actorAssembler.convert(actor);
		actorEntity.setActorId(10);
		actorEntity.setLastUpdate(DateUtil.format(new Date(), DatePattern.NORM_DATETIME_PATTERN));
		actorStream.next(actorEntity);
		log.info("服务端创建演员:{}", actor);
		return actorEntity;
	}

	/**
	 * 异步通知
	 *
	 * @return
	 */
	@DgsSubscription
	public Publisher actorAdded() {
		return actorPublisher;
	}
}

单元测试

由于异步通知无法用浏览器自带的控制台测试,只能通过GraphQL客户端订阅后接收异步通知,此处参考官方最佳实践,使用单元测试模拟

@Slf4j
@SpringBootTest
class ActorDataFetcherTest {

	@Autowired
	private DgsQueryExecutor dgsQueryExecutor;

	@Test
	void addActor() {

		GraphQLQueryRequest graphQLQueryRequest = new GraphQLQueryRequest(
				AddActorGraphQLQuery.newRequest()
						.actor(
								SubmitActor.newBuilder()
										.firstName("fu")
										.lastName("jiayang")
										.build())
						.build(),
				new AddActorProjectionRoot()
						.actorId()
						.firstName()
						.lastUpdate());
		dgsQueryExecutor.execute(graphQLQueryRequest.serialize());
	}

	@Test
	void actorAdded() {
		ExecutionResult executionResult = dgsQueryExecutor.execute("""
				subscription {
				  actorAdded {
				    actorId
				    firstName
				    lastName
				    lastUpdate
				  }
				}
				""");
		Publisher reviewPublisher = executionResult.getData();
		List actors = new CopyOnWriteArrayList<>();

		reviewPublisher.subscribe(new Subscriber<>() {
			@Override
			public void onSubscribe(Subscription s) {
				s.request(2);
			}

			@Override
			public void onNext(ExecutionResult executionResult) {
				if (executionResult.getErrors().size() > 0) {
					executionResult.getErrors().forEach(error -> log.error(error.toString()));
				}
				Map actorResult = executionResult.getData();
				Actor actor = new ObjectMapper().convertValue(actorResult.get("actorAdded"), Actor.class);
             log.info("客户端监听到演员新增:{}",actor);
				actors.add(actor);
			}

			@Override
			public void onError(Throwable t) {
				t.printStackTrace();
			}

			@Override
			public void onComplete() {
			}
		});

		addActor();
		addActor();

		assertThat(actors.size()).isEqualTo(2);
	}
}

从日志输出可以看到客户端成功接收异步通知

image-20221028214837298

总结

订阅模式属于GraphQL特色功能,类似消息队列实现方式,减少客户端对普通查询接口轮训性能开销

最后修改:2022 年 10 月 28 日
如果觉得我的文章对你有用,请随意赞赏