From 26ad443faf32ec205cc55746c953e49070459011 Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Mon, 11 Jan 2021 15:58:18 +0100 Subject: [PATCH 1/5] Add payload and headers capture for spring webflux Signed-off-by: Pavol Loffay --- .../NettyChannelPipelineInstrumentation.java | 19 +- .../netty/v4_1/server/DataCaptureUtils.java | 32 +++- .../HttpServerRequestTracingHandler.java | 13 +- .../HttpServerResponseTracingHandler.java | 7 +- .../v4_1/server/HttpServerTracingHandler.java | 5 +- .../Netty41ServerInstrumentationTest.java | 8 +- .../spring-webflux-5.0/build.gradle.kts | 19 ++ .../spring/webflux/EchoHandler.java | 35 ++++ .../hypertrace/spring/webflux/FooModel.java | 32 ++++ .../webflux/SpringWebFluxTestApplication.java | 114 +++++++++++ .../webflux/SpringWebfluxServerTest.java | 178 ++++++++++++++++++ .../spring/webflux/TestController.java | 78 ++++++++ .../src/test/resources/logback.xml | 20 ++ settings.gradle.kts | 2 + 14 files changed, 535 insertions(+), 27 deletions(-) create mode 100644 instrumentation/spring/spring-webflux-5.0/build.gradle.kts create mode 100644 instrumentation/spring/spring-webflux-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/spring/webflux/EchoHandler.java create mode 100644 instrumentation/spring/spring-webflux-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/spring/webflux/FooModel.java create mode 100644 instrumentation/spring/spring-webflux-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/spring/webflux/SpringWebFluxTestApplication.java create mode 100644 instrumentation/spring/spring-webflux-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/spring/webflux/SpringWebfluxServerTest.java create mode 100644 instrumentation/spring/spring-webflux-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/spring/webflux/TestController.java create mode 100644 instrumentation/spring/spring-webflux-5.0/src/test/resources/logback.xml diff --git a/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/NettyChannelPipelineInstrumentation.java b/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/NettyChannelPipelineInstrumentation.java index 8efe168a9..71b44dce1 100644 --- a/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/NettyChannelPipelineInstrumentation.java +++ b/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/NettyChannelPipelineInstrumentation.java @@ -21,7 +21,6 @@ import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith; import static net.bytebuddy.matcher.ElementMatchers.named; -import static net.bytebuddy.matcher.ElementMatchers.not; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import io.netty.channel.ChannelHandler; @@ -46,8 +45,7 @@ public class NettyChannelPipelineInstrumentation implements TypeInstrumentation @Override public ElementMatcher classLoaderOptimization() { - return hasClassesNamed("io.netty.channel.ChannelPipeline") - .and(not(hasClassesNamed("org.springframework.web.reactive.HandlerAdapter"))); + return hasClassesNamed("io.netty.channel.ChannelPipeline"); } @Override @@ -106,14 +104,25 @@ public static void addHandler( .class .getName(), HttpServerTracingHandler.class.getName(), - new HttpServerTracingHandler()); + new HttpServerTracingHandler( + new io.opentelemetry.javaagent.instrumentation.netty.v4_1.server + .HttpServerRequestTracingHandler())); + + pipeline.addBefore( + HttpServerTracingHandler.class.getName(), + io.opentelemetry.javaagent.instrumentation.netty.v4_1.server + .HttpServerRequestTracingHandler.class + .getName(), + new io.opentelemetry.javaagent.instrumentation.netty.v4_1.server + .HttpServerRequestTracingHandler()); + pipeline.addLast( HttpServerBlockingRequestHandler.class.getName(), new HttpServerBlockingRequestHandler()); } else if (handler instanceof HttpRequestDecoder) { pipeline.addLast( HttpServerRequestTracingHandler.class.getName(), - new HttpServerRequestTracingHandler()); + new HttpServerRequestTracingHandler(null)); } else if (handler instanceof HttpResponseEncoder) { pipeline.replace( io.opentelemetry.javaagent.instrumentation.netty.v4_1.server diff --git a/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/DataCaptureUtils.java b/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/DataCaptureUtils.java index ae9736dd4..b412d29b6 100644 --- a/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/DataCaptureUtils.java +++ b/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/DataCaptureUtils.java @@ -16,6 +16,7 @@ package io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.server; +import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpMessage; @@ -35,7 +36,7 @@ public static void captureBody( Span span, Channel channel, AttributeKey attributeKey, - HttpContent httpContent) { + Object httpContentOrBuffer) { Attribute bufferAttr = channel.attr(attributeKey); BoundedByteArrayOutputStream buffer = bufferAttr.get(); @@ -44,16 +45,17 @@ public static void captureBody( return; } - final ByteArrayOutputStream finalBuffer = buffer; - httpContent - .content() - .forEachByte( - value -> { - finalBuffer.write(value); - return true; - }); + ByteBuf content = castToBuf(httpContentOrBuffer); + if (content != null && content.isReadable()) { + final ByteArrayOutputStream finalBuffer = buffer; + content.forEachByte( + value -> { + finalBuffer.write(value); + return true; + }); + } - if (httpContent instanceof LastHttpContent) { + if (httpContentOrBuffer instanceof LastHttpContent) { bufferAttr.remove(); try { span.setAttribute(attributeKey.name(), buffer.toStringWithSuppliedCharset()); @@ -63,6 +65,16 @@ public static void captureBody( } } + private static ByteBuf castToBuf(Object msg) { + if (msg instanceof ByteBuf) { + return (ByteBuf) msg; + } else if (msg instanceof HttpContent) { + HttpContent httpContent = (HttpContent) msg; + return httpContent.content(); + } + return null; + } + // see io.netty.handler.codec.http.HttpUtil public static CharSequence getContentType(HttpMessage message) { return message.headers().get("content-type"); diff --git a/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/HttpServerRequestTracingHandler.java b/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/HttpServerRequestTracingHandler.java index 8dd627480..a83e4bad6 100644 --- a/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/HttpServerRequestTracingHandler.java +++ b/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/HttpServerRequestTracingHandler.java @@ -16,6 +16,7 @@ package io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.server; +import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -44,6 +45,12 @@ public class HttpServerRequestTracingHandler extends ChannelInboundHandlerAdapte private final AgentConfig agentConfig = HypertraceConfig.get(); + public final ChannelInboundHandlerAdapter runBefore; + + public HttpServerRequestTracingHandler(ChannelInboundHandlerAdapter runBefore) { + this.runBefore = runBefore; + } + @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { Channel channel = ctx.channel(); @@ -84,9 +91,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { } if (msg instanceof HttpContent - && agentConfig.getDataCapture().getHttpBody().getRequest().getValue()) { - DataCaptureUtils.captureBody( - span, channel, AttributeKeys.REQUEST_BODY_BUFFER, (HttpContent) msg); + || msg instanceof ByteBuf + && agentConfig.getDataCapture().getHttpBody().getRequest().getValue()) { + DataCaptureUtils.captureBody(span, channel, AttributeKeys.REQUEST_BODY_BUFFER, msg); } ctx.fireChannelRead(msg); diff --git a/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/HttpServerResponseTracingHandler.java b/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/HttpServerResponseTracingHandler.java index d7fab4004..18641a67e 100644 --- a/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/HttpServerResponseTracingHandler.java +++ b/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/HttpServerResponseTracingHandler.java @@ -16,6 +16,7 @@ package io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.server; +import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; @@ -82,9 +83,9 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise prm) { } if (msg instanceof HttpContent - && agentConfig.getDataCapture().getHttpBody().getResponse().getValue()) { - DataCaptureUtils.captureBody( - span, ctx.channel(), AttributeKeys.RESPONSE_BODY_BUFFER, (HttpContent) msg); + || msg instanceof ByteBuf + && agentConfig.getDataCapture().getHttpBody().getResponse().getValue()) { + DataCaptureUtils.captureBody(span, ctx.channel(), AttributeKeys.RESPONSE_BODY_BUFFER, msg); } try (Scope ignored = context.makeCurrent()) { diff --git a/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/HttpServerTracingHandler.java b/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/HttpServerTracingHandler.java index 9c5d1cef0..78b1a695d 100644 --- a/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/HttpServerTracingHandler.java +++ b/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/HttpServerTracingHandler.java @@ -16,13 +16,14 @@ package io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.server; +import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.CombinedChannelDuplexHandler; public class HttpServerTracingHandler extends CombinedChannelDuplexHandler< HttpServerRequestTracingHandler, HttpServerResponseTracingHandler> { - public HttpServerTracingHandler() { - super(new HttpServerRequestTracingHandler(), new HttpServerResponseTracingHandler()); + public HttpServerTracingHandler(ChannelInboundHandlerAdapter runBefore) { + super(new HttpServerRequestTracingHandler(runBefore), new HttpServerResponseTracingHandler()); } } diff --git a/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/Netty41ServerInstrumentationTest.java b/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/Netty41ServerInstrumentationTest.java index 935139dfa..f2722ece1 100644 --- a/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/Netty41ServerInstrumentationTest.java +++ b/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/Netty41ServerInstrumentationTest.java @@ -32,10 +32,9 @@ import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.HttpRequest; -import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponse; -import io.netty.handler.codec.http.HttpResponseEncoder; import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; @@ -87,8 +86,9 @@ protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addFirst("logger", LOGGING_HANDLER); - pipeline.addLast(new HttpRequestDecoder()); - pipeline.addLast(new HttpResponseEncoder()); + pipeline.addLast(new HttpServerCodec()); + // pipeline.addLast(new HttpRequestDecoder()); + // pipeline.addLast(new HttpResponseEncoder()); pipeline.addLast( new SimpleChannelInboundHandler() { diff --git a/instrumentation/spring/spring-webflux-5.0/build.gradle.kts b/instrumentation/spring/spring-webflux-5.0/build.gradle.kts new file mode 100644 index 000000000..8154f19fb --- /dev/null +++ b/instrumentation/spring/spring-webflux-5.0/build.gradle.kts @@ -0,0 +1,19 @@ +plugins { + `java-library` +} + +val versions: Map by extra + +dependencies { + testImplementation(project(":testing-common")) + testImplementation(project(":instrumentation:netty:netty-4.1")) + + testImplementation("io.opentelemetry.javaagent.instrumentation:opentelemetry-javaagent-netty-4.1:${versions["opentelemetry_java_agent"]}") + + testImplementation("org.springframework:spring-webflux:5.0.0.RELEASE") + testImplementation("io.projectreactor.ipc:reactor-netty:0.7.0.RELEASE") + + testImplementation("org.springframework.boot:spring-boot-starter-webflux:2.0.0.RELEASE") + testImplementation("org.springframework.boot:spring-boot-starter-test:2.0.0.RELEASE") + testImplementation("org.springframework.boot:spring-boot-starter-reactor-netty:2.0.0.RELEASE") +} diff --git a/instrumentation/spring/spring-webflux-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/spring/webflux/EchoHandler.java b/instrumentation/spring/spring-webflux-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/spring/webflux/EchoHandler.java new file mode 100644 index 000000000..effc95498 --- /dev/null +++ b/instrumentation/spring/spring-webflux-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/spring/webflux/EchoHandler.java @@ -0,0 +1,35 @@ +/* + * Copyright The Hypertrace Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opentelemetry.javaagent.instrumentation.hypertrace.spring.webflux; + +import org.springframework.http.MediaType; +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.server.ServerRequest; +import org.springframework.web.reactive.function.server.ServerResponse; +import reactor.core.publisher.Mono; + +@Component +class EchoHandler { + Mono echo(ServerRequest request) { + return ServerResponse.accepted() + .contentType(MediaType.APPLICATION_JSON) + .header( + SpringWebFluxTestApplication.RESPONSE_HEADER_NAME, + SpringWebFluxTestApplication.RESPONSE_HEADER_VALUE) + .body(request.bodyToMono(String.class), String.class); + } +} diff --git a/instrumentation/spring/spring-webflux-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/spring/webflux/FooModel.java b/instrumentation/spring/spring-webflux-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/spring/webflux/FooModel.java new file mode 100644 index 000000000..aae818b69 --- /dev/null +++ b/instrumentation/spring/spring-webflux-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/spring/webflux/FooModel.java @@ -0,0 +1,32 @@ +/* + * Copyright The Hypertrace Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opentelemetry.javaagent.instrumentation.hypertrace.spring.webflux; + +class FooModel { + public String id; + public String name; + + FooModel(String id, String name) { + this.id = id; + this.name = name; + } + + @Override + public String toString() { + return "{\"id\":" + id + ",\"name\":\"" + name + "\"}"; + } +} diff --git a/instrumentation/spring/spring-webflux-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/spring/webflux/SpringWebFluxTestApplication.java b/instrumentation/spring/spring-webflux-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/spring/webflux/SpringWebFluxTestApplication.java new file mode 100644 index 000000000..de3d8f34d --- /dev/null +++ b/instrumentation/spring/spring-webflux-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/spring/webflux/SpringWebFluxTestApplication.java @@ -0,0 +1,114 @@ +/* + * Copyright The Hypertrace Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opentelemetry.javaagent.instrumentation.hypertrace.spring.webflux; + +import static org.springframework.web.reactive.function.server.ServerResponse.ok; + +import java.time.Duration; +import java.util.Arrays; +import java.util.stream.Stream; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.http.MediaType; +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.BodyInserters; +import org.springframework.web.reactive.function.server.HandlerFunction; +import org.springframework.web.reactive.function.server.RouterFunction; +import org.springframework.web.reactive.function.server.ServerRequest; +import org.springframework.web.reactive.function.server.ServerResponse; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@SpringBootApplication +class SpringWebFluxTestApplication { + + static final String RESPONSE_HEADER_NAME = "resphadername"; + static final String RESPONSE_HEADER_VALUE = "respheaderval"; + + @Bean + RouterFunction echoRouterFunction(EchoHandler echoHandler) { + return org.springframework.web.reactive.function.server.RouterFunctions.route( + org.springframework.web.reactive.function.server.RequestPredicates.POST("/post"), + new EchoHandlerFunction(echoHandler)); + } + + class EchoHandlerFunction implements HandlerFunction { + private final EchoHandler echoHandler; + + EchoHandlerFunction(EchoHandler echoHandler) { + this.echoHandler = echoHandler; + } + + @Override + public Mono handle(ServerRequest request) { + return echoHandler.echo(request); + } + } + + @Bean + RouterFunction greetRouterFunction(GreetingHandler greetingHandler) { + return org.springframework.web.reactive.function.server.RouterFunctions.route( + org.springframework.web.reactive.function.server.RequestPredicates.GET("/get"), + new HandlerFunction() { + @Override + public Mono handle(ServerRequest request) { + return greetingHandler.defaultGreet(); + } + }); + } + + @Bean + RouterFunction finiteStream() { + return org.springframework.web.reactive.function.server.RouterFunctions.route( + org.springframework.web.reactive.function.server.RequestPredicates.GET("/stream"), + new HandlerFunction() { + @Override + public Mono handle(ServerRequest request) { + return finiteStream(request); + } + }); + } + + public Mono finiteStream(ServerRequest req) { + String[] array = {"a", "b", "c", "d", "e"}; + + // Arrays.stream + Stream stream = Arrays.stream(array); + + Flux mapFlux = + Flux.fromStream(stream) + .zipWith(Flux.interval(Duration.ofSeconds(1))) + .map( + i -> { + return new FooModel(i.getT1(), "name"); + }); + + return ok().contentType(MediaType.APPLICATION_STREAM_JSON).body(mapFlux, FooModel.class); + } + + @Component + static class GreetingHandler { + static final String DEFAULT_RESPONSE = "HELLO"; + + Mono defaultGreet() { + return ServerResponse.ok() + .contentType(MediaType.TEXT_PLAIN) + .header(RESPONSE_HEADER_NAME, RESPONSE_HEADER_VALUE) + .body(BodyInserters.fromObject(DEFAULT_RESPONSE)); + } + } +} diff --git a/instrumentation/spring/spring-webflux-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/spring/webflux/SpringWebfluxServerTest.java b/instrumentation/spring/spring-webflux-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/spring/webflux/SpringWebfluxServerTest.java new file mode 100644 index 000000000..8644b6dbe --- /dev/null +++ b/instrumentation/spring/spring-webflux-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/spring/webflux/SpringWebfluxServerTest.java @@ -0,0 +1,178 @@ +/* + * Copyright The Hypertrace Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opentelemetry.javaagent.instrumentation.hypertrace.spring.webflux; + +import io.opentelemetry.javaagent.instrumentation.hypertrace.spring.webflux.SpringWebFluxTestApplication.GreetingHandler; +import io.opentelemetry.javaagent.instrumentation.hypertrace.spring.webflux.SpringWebfluxServerTest.ForceNettyAutoConfiguration; +import io.opentelemetry.sdk.trace.data.SpanData; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeoutException; +import okhttp3.MediaType; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; +import org.hypertrace.agent.core.instrumentation.HypertraceSemanticAttributes; +import org.hypertrace.agent.testing.AbstractInstrumenterTest; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory; +import org.springframework.boot.web.server.LocalServerPort; +import org.springframework.context.annotation.Bean; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +@ExtendWith(SpringExtension.class) // enables junit5 +@SpringBootTest( + webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, + classes = {SpringWebFluxTestApplication.class, ForceNettyAutoConfiguration.class}) +public class SpringWebfluxServerTest extends AbstractInstrumenterTest { + + static final String REQUEST_HEADER_NAME = "reqheader"; + static final String REQUEST_HEADER_VALUE = "reqheadervalue"; + static final String REQUEST_BODY = "foobar"; + + @TestConfiguration + static class ForceNettyAutoConfiguration { + @Bean + NettyReactiveWebServerFactory nettyFactory() { + return new NettyReactiveWebServerFactory(); + } + } + + @LocalServerPort private int port; + + @Test + public void get() throws IOException, TimeoutException, InterruptedException { + Request request = + new Request.Builder() + .url(String.format("http://localhost:%d/get", port)) + .header(REQUEST_HEADER_NAME, REQUEST_HEADER_VALUE) + .get() + .build(); + + try (Response response = httpClient.newCall(request).execute()) { + Assertions.assertEquals(200, response.code()); + } + + List> traces = TEST_WRITER.getTraces(); + TEST_WRITER.waitForTraces(1); + Assertions.assertEquals(1, traces.size()); + List trace = traces.get(0); + Assertions.assertEquals(1, trace.size()); + SpanData spanData = trace.get(0); + + Assertions.assertEquals( + REQUEST_HEADER_VALUE, + spanData + .getAttributes() + .get(HypertraceSemanticAttributes.httpRequestHeader(REQUEST_HEADER_NAME))); + Assertions.assertEquals( + SpringWebFluxTestApplication.RESPONSE_HEADER_VALUE, + spanData + .getAttributes() + .get( + HypertraceSemanticAttributes.httpResponseHeader( + SpringWebFluxTestApplication.RESPONSE_HEADER_NAME))); + Assertions.assertNull( + spanData.getAttributes().get(HypertraceSemanticAttributes.HTTP_REQUEST_BODY)); + Assertions.assertNull( + spanData.getAttributes().get(HypertraceSemanticAttributes.HTTP_RESPONSE_BODY)); + } + + @Test + public void post() throws IOException, TimeoutException, InterruptedException { + Request request = + new Request.Builder() + .url(String.format("http://localhost:%d/post", port)) + .header(REQUEST_HEADER_NAME, REQUEST_HEADER_VALUE) + .post(RequestBody.create(REQUEST_BODY, MediaType.parse("application/json"))) + .build(); + + try (Response response = httpClient.newCall(request).execute()) { + Assertions.assertEquals(202, response.code()); + Assertions.assertEquals(REQUEST_BODY, response.body().string()); + } + + List> traces = TEST_WRITER.getTraces(); + TEST_WRITER.waitForTraces(1); + Assertions.assertEquals(1, traces.size()); + List trace = traces.get(0); + Assertions.assertEquals(1, trace.size()); + SpanData spanData = trace.get(0); + + Assertions.assertEquals( + REQUEST_HEADER_VALUE, + spanData + .getAttributes() + .get(HypertraceSemanticAttributes.httpRequestHeader(REQUEST_HEADER_NAME))); + Assertions.assertEquals( + SpringWebFluxTestApplication.RESPONSE_HEADER_VALUE, + spanData + .getAttributes() + .get( + HypertraceSemanticAttributes.httpResponseHeader( + SpringWebFluxTestApplication.RESPONSE_HEADER_NAME))); + Assertions.assertEquals( + REQUEST_BODY, spanData.getAttributes().get(HypertraceSemanticAttributes.HTTP_REQUEST_BODY)); + Assertions.assertEquals( + REQUEST_BODY, + spanData.getAttributes().get(HypertraceSemanticAttributes.HTTP_RESPONSE_BODY)); + } + + @Test + public void blocking() throws IOException, TimeoutException, InterruptedException { + Request request = + new Request.Builder() + .url(String.format("http://localhost:%d/get", port)) + .header(REQUEST_HEADER_NAME, REQUEST_HEADER_VALUE) + .header("mockblock", "true") + .get() + .build(); + + try (Response response = httpClient.newCall(request).execute()) { + Assertions.assertEquals(403, response.code()); + Assertions.assertTrue(response.body().string().isEmpty()); + } + + List> traces = TEST_WRITER.getTraces(); + TEST_WRITER.waitForTraces(1); + Assertions.assertEquals(1, traces.size()); + List trace = traces.get(0); + Assertions.assertEquals(1, trace.size()); + SpanData spanData = trace.get(0); + + Assertions.assertEquals( + REQUEST_HEADER_VALUE, + spanData + .getAttributes() + .get(HypertraceSemanticAttributes.httpRequestHeader(REQUEST_HEADER_NAME))); + Assertions.assertNull( + spanData + .getAttributes() + .get( + HypertraceSemanticAttributes.httpResponseHeader( + SpringWebFluxTestApplication.RESPONSE_HEADER_NAME))); + Assertions.assertNull( + spanData + .getAttributes() + .get( + HypertraceSemanticAttributes.httpResponseHeader(GreetingHandler.DEFAULT_RESPONSE))); + } +} diff --git a/instrumentation/spring/spring-webflux-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/spring/webflux/TestController.java b/instrumentation/spring/spring-webflux-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/spring/webflux/TestController.java new file mode 100644 index 000000000..20a6ed90f --- /dev/null +++ b/instrumentation/spring/spring-webflux-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/spring/webflux/TestController.java @@ -0,0 +1,78 @@ +/* + * Copyright The Hypertrace Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opentelemetry.javaagent.instrumentation.hypertrace.spring.webflux; + +import java.time.Duration; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RestController; +import reactor.core.publisher.Mono; + +@RestController +class TestController { + + @GetMapping("/foo") + Mono getFooModel() { + return Mono.just(new FooModel("0", "DEFAULT")); + } + + @GetMapping("/foo/{id}") + Mono getFooModel(@PathVariable("id") String id) { + return Mono.just(new FooModel(id, "pass")); + } + + @GetMapping("/foo/{id}/{name}") + Mono getFooModel(@PathVariable("id") String id, @PathVariable("name") String name) { + return Mono.just(new FooModel(id, name)); + } + + @GetMapping("/foo-delayed") + Mono getFooDelayed() { + return Mono.just(new FooModel("3", "delayed")).delayElement(Duration.ofMillis(100)); + } + + @GetMapping("/foo-failfast/{id}") + Mono getFooFailFast(@PathVariable("id") long id) { + throw new RuntimeException("bad things happen"); + } + + @GetMapping("/foo-failmono/{id}") + Mono getFooFailMono(@PathVariable("id") long id) { + return Mono.error(new RuntimeException("bad things happen")); + } + + @GetMapping("/foo-traced-method/{id}") + Mono getTracedMethod(@PathVariable("id") String id) { + return Mono.just(tracedMethod(id)); + } + + @GetMapping("/foo-mono-from-callable/{id}") + Mono getMonoFromCallable(@PathVariable("id") String id) { + return Mono.fromCallable(() -> tracedMethod(id)); + } + + @GetMapping("/foo-delayed-mono/{id}") + Mono getFooDelayedMono(@PathVariable("id") long id) { + return Mono.just(id) + .delayElement(Duration.ofMillis(100)) + .map(i -> tracedMethod(String.valueOf(i))); + } + + private FooModel tracedMethod(String id) { + return new FooModel(id, "tracedMethod"); + } +} diff --git a/instrumentation/spring/spring-webflux-5.0/src/test/resources/logback.xml b/instrumentation/spring/spring-webflux-5.0/src/test/resources/logback.xml new file mode 100644 index 000000000..7f2406629 --- /dev/null +++ b/instrumentation/spring/spring-webflux-5.0/src/test/resources/logback.xml @@ -0,0 +1,20 @@ + + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + + + diff --git a/settings.gradle.kts b/settings.gradle.kts index ae6d5bb06..66eb8415a 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -51,3 +51,5 @@ include("instrumentation:netty:netty-4.0") findProject(":instrumentation:netty:netty-4.0")?.name = "netty-4.0" include("instrumentation:netty:netty-4.1") findProject(":instrumentation:netty:netty-4.1")?.name = "netty-4.1" +include("instrumentation:spring:spring-webflux-5.0") +findProject(":instrumentation:spring:spring-webflux-5.0")?.name = "spring-webflux-5.0" From 6bb31c8ea005fb0d7f1b85c79fadafbab0e41e89 Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Mon, 11 Jan 2021 16:00:13 +0100 Subject: [PATCH 2/5] docs Signed-off-by: Pavol Loffay --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index cbca981ba..f49966136 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,7 @@ List of supported frameworks with additional capabilities: | [OkHttp](https://github.com/square/okhttp/) | 3.0+ | | [Servlet](https://javaee.github.io/javaee-spec/javadocs/javax/servlet/package-summary.html) | 2.3+ | | [Spark Web Framework](https://github.com/perwendel/spark) | 2.3+ | +| [Spring Webflux](https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/web/reactive/package-summary.html) (only server) | 5.0+ | | [Vert.x](https://vertx.io) (only server) | 3.0+ | ### Adding custom filter implementation From b8aa3fcce2d41edb1cb2aa24f9f4556365e39969 Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Mon, 11 Jan 2021 17:53:23 +0100 Subject: [PATCH 3/5] Fix format Signed-off-by: Pavol Loffay --- .../hypertrace/spring/webflux/FooModel.java | 2 +- .../webflux/SpringWebFluxTestApplication.java | 29 ++++++-------- .../webflux/SpringWebfluxServerTest.java | 39 +++++++++++++++++++ 3 files changed, 52 insertions(+), 18 deletions(-) diff --git a/instrumentation/spring/spring-webflux-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/spring/webflux/FooModel.java b/instrumentation/spring/spring-webflux-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/spring/webflux/FooModel.java index aae818b69..e7e455411 100644 --- a/instrumentation/spring/spring-webflux-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/spring/webflux/FooModel.java +++ b/instrumentation/spring/spring-webflux-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/spring/webflux/FooModel.java @@ -27,6 +27,6 @@ class FooModel { @Override public String toString() { - return "{\"id\":" + id + ",\"name\":\"" + name + "\"}"; + return "{\"id\":\"" + id + "\",\"name\":\"" + name + "\"}"; } } diff --git a/instrumentation/spring/spring-webflux-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/spring/webflux/SpringWebFluxTestApplication.java b/instrumentation/spring/spring-webflux-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/spring/webflux/SpringWebFluxTestApplication.java index de3d8f34d..c5e944342 100644 --- a/instrumentation/spring/spring-webflux-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/spring/webflux/SpringWebFluxTestApplication.java +++ b/instrumentation/spring/spring-webflux-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/spring/webflux/SpringWebFluxTestApplication.java @@ -16,8 +16,6 @@ package io.opentelemetry.javaagent.instrumentation.hypertrace.spring.webflux; -import static org.springframework.web.reactive.function.server.ServerResponse.ok; - import java.time.Duration; import java.util.Arrays; import java.util.stream.Stream; @@ -72,32 +70,29 @@ public Mono handle(ServerRequest request) { } @Bean - RouterFunction finiteStream() { + RouterFunction finiteStreamRouterFunction() { return org.springframework.web.reactive.function.server.RouterFunctions.route( org.springframework.web.reactive.function.server.RequestPredicates.GET("/stream"), new HandlerFunction() { @Override public Mono handle(ServerRequest request) { - return finiteStream(request); + return finiteStreamResponse(); } }); } - public Mono finiteStream(ServerRequest req) { - String[] array = {"a", "b", "c", "d", "e"}; + public static Mono finiteStreamResponse() { + return ServerResponse.ok() + .contentType(MediaType.APPLICATION_STREAM_JSON) + .body(finiteStream(), FooModel.class); + } - // Arrays.stream + public static Flux finiteStream() { + String[] array = {"a", "b", "c", "d", "e"}; Stream stream = Arrays.stream(array); - - Flux mapFlux = - Flux.fromStream(stream) - .zipWith(Flux.interval(Duration.ofSeconds(1))) - .map( - i -> { - return new FooModel(i.getT1(), "name"); - }); - - return ok().contentType(MediaType.APPLICATION_STREAM_JSON).body(mapFlux, FooModel.class); + return Flux.fromStream(stream) + .zipWith(Flux.interval(Duration.ofSeconds(1))) + .map(i -> new FooModel(i.getT1(), "name")); } @Component diff --git a/instrumentation/spring/spring-webflux-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/spring/webflux/SpringWebfluxServerTest.java b/instrumentation/spring/spring-webflux-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/spring/webflux/SpringWebfluxServerTest.java index 8644b6dbe..9e72548b2 100644 --- a/instrumentation/spring/spring-webflux-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/spring/webflux/SpringWebfluxServerTest.java +++ b/instrumentation/spring/spring-webflux-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/spring/webflux/SpringWebfluxServerTest.java @@ -37,6 +37,8 @@ import org.springframework.boot.web.server.LocalServerPort; import org.springframework.context.annotation.Bean; import org.springframework.test.context.junit.jupiter.SpringExtension; +import org.springframework.web.reactive.function.server.ServerResponse; +import reactor.core.publisher.Flux; @ExtendWith(SpringExtension.class) // enables junit5 @SpringBootTest( @@ -96,6 +98,43 @@ public void get() throws IOException, TimeoutException, InterruptedException { spanData.getAttributes().get(HypertraceSemanticAttributes.HTTP_RESPONSE_BODY)); } + @Test + public void getStream() throws IOException, TimeoutException, InterruptedException { + Flux modelFlux = SpringWebFluxTestApplication.finiteStream(); + StringBuilder responseBodyStr = new StringBuilder(); + modelFlux + .flatMap( + fooModel -> { + responseBodyStr.append(fooModel.toString()).append("\n"); + return Flux.empty(); + }) + .then() + .block(); + + Request request = + new Request.Builder() + .url(String.format("http://localhost:%d/stream", port)) + .header(REQUEST_HEADER_NAME, REQUEST_HEADER_VALUE) + .get() + .build(); + + ServerResponse serverResponse = SpringWebFluxTestApplication.finiteStreamResponse().block(); + try (Response response = httpClient.newCall(request).execute()) { + Assertions.assertEquals(200, response.code()); + Assertions.assertEquals(responseBodyStr.toString(), response.body().string()); + } + + List> traces = TEST_WRITER.getTraces(); + TEST_WRITER.waitForTraces(1); + Assertions.assertEquals(1, traces.size()); + List trace = traces.get(0); + Assertions.assertEquals(1, trace.size()); + SpanData spanData = trace.get(0); + Assertions.assertEquals( + responseBodyStr.toString(), + spanData.getAttributes().get(HypertraceSemanticAttributes.HTTP_RESPONSE_BODY)); + } + @Test public void post() throws IOException, TimeoutException, InterruptedException { Request request = From 50d720480a28986acc67bf45b2acbe72e874e58f Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Mon, 11 Jan 2021 18:27:54 +0100 Subject: [PATCH 4/5] Add tests Signed-off-by: Pavol Loffay --- ...1ServerDuplexCodecInstrumentationTest.java | 24 ++++ .../Netty41ServerInstrumentationTest.java | 94 ++------------ .../netty/v4_1/NettyTestServer.java | 120 ++++++++++++++++++ .../spring/webflux/TestController.java | 78 ------------ 4 files changed, 157 insertions(+), 159 deletions(-) create mode 100644 instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/Netty41ServerDuplexCodecInstrumentationTest.java create mode 100644 instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/NettyTestServer.java delete mode 100644 instrumentation/spring/spring-webflux-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/spring/webflux/TestController.java diff --git a/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/Netty41ServerDuplexCodecInstrumentationTest.java b/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/Netty41ServerDuplexCodecInstrumentationTest.java new file mode 100644 index 000000000..79fdb5777 --- /dev/null +++ b/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/Netty41ServerDuplexCodecInstrumentationTest.java @@ -0,0 +1,24 @@ +package io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1; + +import io.netty.handler.codec.http.HttpServerCodec; +import java.io.IOException; +import java.util.Arrays; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; + +public class Netty41ServerDuplexCodecInstrumentationTest extends Netty41ServerInstrumentationTest { + + private static int port; + private static NettyTestServer nettyTestServer; + + @BeforeAll + private static void startServer() throws IOException, InterruptedException { + nettyTestServer = new NettyTestServer(); + port = nettyTestServer.create(Arrays.asList(HttpServerCodec.class)); + } + + @AfterAll + private static void stopServer() { + nettyTestServer.stopServer(); + } +} diff --git a/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/Netty41ServerInstrumentationTest.java b/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/Netty41ServerInstrumentationTest.java index f2722ece1..8b6363744 100644 --- a/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/Netty41ServerInstrumentationTest.java +++ b/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/Netty41ServerInstrumentationTest.java @@ -18,6 +18,9 @@ import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH; import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; +import static io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.NettyTestServer.RESPONSE_BODY; +import static io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.NettyTestServer.RESPONSE_HEADER_NAME; +import static io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.NettyTestServer.RESPONSE_HEADER_VALUE; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; @@ -53,100 +56,29 @@ import org.hypertrace.agent.core.instrumentation.HypertraceSemanticAttributes; import org.hypertrace.agent.testing.AbstractInstrumenterTest; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; public class Netty41ServerInstrumentationTest extends AbstractInstrumenterTest { - private static final LoggingHandler LOGGING_HANDLER = - new LoggingHandler(Netty41ServerInstrumentationTest.class, LogLevel.DEBUG); - public static final String REQUEST_HEADER_NAME = "reqheader"; public static final String REQUEST_HEADER_VALUE = "reqheadervalue"; - public static final String RESPONSE_HEADER_NAME = "respheader"; - public static final String RESPONSE_HEADER_VALUE = "respheadervalue"; - private static final String RESPONSE_BODY = "{\"foo\": \"bar\"}"; - private static EventLoopGroup eventLoopGroup; private static int port; + private static NettyTestServer nettyTestServer; - @BeforeAll - private static void startServer() throws IOException, InterruptedException { - eventLoopGroup = new NioEventLoopGroup(); - - ServerBootstrap serverBootstrap = new ServerBootstrap(); - serverBootstrap.group(eventLoopGroup); - serverBootstrap - .handler(LOGGING_HANDLER) - .childHandler( - new ChannelInitializer() { - @Override - protected void initChannel(Channel ch) throws Exception { - ChannelPipeline pipeline = ch.pipeline(); - pipeline.addFirst("logger", LOGGING_HANDLER); - - pipeline.addLast(new HttpServerCodec()); - // pipeline.addLast(new HttpRequestDecoder()); - // pipeline.addLast(new HttpResponseEncoder()); - - pipeline.addLast( - new SimpleChannelInboundHandler() { - - HttpRequest httpRequest; - - @Override - protected void channelRead0(ChannelHandlerContext ctx, Object msg) { - - if (msg instanceof HttpRequest) { - this.httpRequest = (HttpRequest) msg; - } - - // write response after all content has been received otherwise - // server span is closed before request payload is captured - if (msg instanceof LastHttpContent) { - if (httpRequest.getUri().contains("get_no_content")) { - HttpResponse response = - new DefaultFullHttpResponse( - HTTP_1_1, HttpResponseStatus.valueOf(204)); - response.headers().add(RESPONSE_HEADER_NAME, RESPONSE_HEADER_VALUE); - response.headers().set(CONTENT_LENGTH, 0); - ctx.write(response); - } else if (httpRequest.getUri().contains("post")) { - ByteBuf responseBody = Unpooled.wrappedBuffer(RESPONSE_BODY.getBytes()); - HttpResponse response = - new DefaultFullHttpResponse( - HTTP_1_1, HttpResponseStatus.valueOf(200), responseBody); - response.headers().add(RESPONSE_HEADER_NAME, RESPONSE_HEADER_VALUE); - response.headers().add("Content-Type", "application-json"); - response.headers().set(CONTENT_LENGTH, responseBody.readableBytes()); - ctx.write(response); - } - } - } - - @Override - public void channelReadComplete(ChannelHandlerContext ctx) { - ctx.flush(); - } - }); - } - }) - .channel(NioServerSocketChannel.class); - - ServerSocket socket; - socket = new ServerSocket(0); - port = socket.getLocalPort(); - socket.close(); - - serverBootstrap.bind(port).sync(); + @BeforeEach + private void startServer() throws IOException, InterruptedException { + nettyTestServer = new NettyTestServer(); + port = nettyTestServer.create(Arrays.asList(HttpServerCodec.class)); } - @AfterAll - private static void stopServer() { - if (eventLoopGroup != null) { - eventLoopGroup.shutdownGracefully(); - } + @AfterEach + private void stopServer() { + nettyTestServer.stopServer(); } @Test diff --git a/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/NettyTestServer.java b/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/NettyTestServer.java new file mode 100644 index 000000000..6f15544b9 --- /dev/null +++ b/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/NettyTestServer.java @@ -0,0 +1,120 @@ +package io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1; + +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import java.io.IOException; +import java.net.ServerSocket; +import java.util.List; + +public class NettyTestServer { + + static final String RESPONSE_HEADER_NAME = "respheader"; + static final String RESPONSE_HEADER_VALUE = "respheadervalue"; + static final String RESPONSE_BODY = "{\"foo\": \"bar\"}"; + + private static final LoggingHandler LOGGING_HANDLER = + new LoggingHandler(Netty41ServerInstrumentationTest.class, LogLevel.DEBUG); + + private EventLoopGroup eventLoopGroup; + + public int create(List> handlerList) throws IOException, InterruptedException { + eventLoopGroup = new NioEventLoopGroup(); + + ServerBootstrap serverBootstrap = new ServerBootstrap(); + serverBootstrap.group(eventLoopGroup); + serverBootstrap + .handler(LOGGING_HANDLER) + .childHandler( + new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addFirst("logger", LOGGING_HANDLER); + + for (Class channelHandlerClass: handlerList) { + ChannelHandler channelHandler = channelHandlerClass.newInstance(); + pipeline.addLast(channelHandler); + } +// pipeline.addLast(handlerList.toArray(new ChannelHandler[0])); +// pipeline.addLast(new HttpServerCodec()); + // pipeline.addLast(new HttpRequestDecoder()); + // pipeline.addLast(new HttpResponseEncoder()); + + pipeline.addLast( + new SimpleChannelInboundHandler() { + + HttpRequest httpRequest; + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Object msg) { + + if (msg instanceof HttpRequest) { + this.httpRequest = (HttpRequest) msg; + } + + // write response after all content has been received otherwise + // server span is closed before request payload is captured + if (msg instanceof LastHttpContent) { + if (httpRequest.getUri().contains("get_no_content")) { + HttpResponse response = + new DefaultFullHttpResponse( + HTTP_1_1, HttpResponseStatus.valueOf(204)); + response.headers().add(RESPONSE_HEADER_NAME, RESPONSE_HEADER_VALUE); + response.headers().set(CONTENT_LENGTH, 0); + ctx.write(response); + } else if (httpRequest.getUri().contains("post")) { + ByteBuf responseBody = Unpooled.wrappedBuffer(RESPONSE_BODY.getBytes()); + HttpResponse response = + new DefaultFullHttpResponse( + HTTP_1_1, HttpResponseStatus.valueOf(200), responseBody); + response.headers().add(RESPONSE_HEADER_NAME, RESPONSE_HEADER_VALUE); + response.headers().add("Content-Type", "application-json"); + response.headers().set(CONTENT_LENGTH, responseBody.readableBytes()); + ctx.write(response); + } + } + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + ctx.flush(); + } + }); + } + }) + .channel(NioServerSocketChannel.class); + + ServerSocket socket; + socket = new ServerSocket(0); + int port = socket.getLocalPort(); + socket.close(); + + serverBootstrap.bind(port).sync(); + return port; + } + + public void stopServer() { + eventLoopGroup.shutdownGracefully(); + } +} diff --git a/instrumentation/spring/spring-webflux-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/spring/webflux/TestController.java b/instrumentation/spring/spring-webflux-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/spring/webflux/TestController.java deleted file mode 100644 index 20a6ed90f..000000000 --- a/instrumentation/spring/spring-webflux-5.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/spring/webflux/TestController.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright The Hypertrace Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.opentelemetry.javaagent.instrumentation.hypertrace.spring.webflux; - -import java.time.Duration; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.RestController; -import reactor.core.publisher.Mono; - -@RestController -class TestController { - - @GetMapping("/foo") - Mono getFooModel() { - return Mono.just(new FooModel("0", "DEFAULT")); - } - - @GetMapping("/foo/{id}") - Mono getFooModel(@PathVariable("id") String id) { - return Mono.just(new FooModel(id, "pass")); - } - - @GetMapping("/foo/{id}/{name}") - Mono getFooModel(@PathVariable("id") String id, @PathVariable("name") String name) { - return Mono.just(new FooModel(id, name)); - } - - @GetMapping("/foo-delayed") - Mono getFooDelayed() { - return Mono.just(new FooModel("3", "delayed")).delayElement(Duration.ofMillis(100)); - } - - @GetMapping("/foo-failfast/{id}") - Mono getFooFailFast(@PathVariable("id") long id) { - throw new RuntimeException("bad things happen"); - } - - @GetMapping("/foo-failmono/{id}") - Mono getFooFailMono(@PathVariable("id") long id) { - return Mono.error(new RuntimeException("bad things happen")); - } - - @GetMapping("/foo-traced-method/{id}") - Mono getTracedMethod(@PathVariable("id") String id) { - return Mono.just(tracedMethod(id)); - } - - @GetMapping("/foo-mono-from-callable/{id}") - Mono getMonoFromCallable(@PathVariable("id") String id) { - return Mono.fromCallable(() -> tracedMethod(id)); - } - - @GetMapping("/foo-delayed-mono/{id}") - Mono getFooDelayedMono(@PathVariable("id") long id) { - return Mono.just(id) - .delayElement(Duration.ofMillis(100)) - .map(i -> tracedMethod(String.valueOf(i))); - } - - private FooModel tracedMethod(String id) { - return new FooModel(id, "tracedMethod"); - } -} From 9fe6679fa132138212f8115dd95288f6fafa416c Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Mon, 11 Jan 2021 18:50:18 +0100 Subject: [PATCH 5/5] More tests Signed-off-by: Pavol Loffay --- .../NettyChannelPipelineInstrumentation.java | 10 + ...tractNetty40ServerInstrumentationTest.java | 202 +++++++++++++ ...y40HttpServerCodecInstrumentationTest.java | 29 ++ .../Netty40ServerInstrumentationTest.java | 270 +----------------- .../netty/v4_0/NettyTestServer.java | 142 +++++++++ .../NettyChannelPipelineInstrumentation.java | 6 +- .../HttpServerRequestTracingHandler.java | 6 - .../v4_1/server/HttpServerTracingHandler.java | 6 +- ...tractNetty41ServerInstrumentationTest.java | 202 +++++++++++++ ...y41HttpServerCodecInstrumentationTest.java | 29 ++ ...1ServerDuplexCodecInstrumentationTest.java | 24 -- .../Netty41ServerInstrumentationTest.java | 206 +------------ .../netty/v4_1/NettyTestServer.java | 40 ++- 13 files changed, 659 insertions(+), 513 deletions(-) create mode 100644 instrumentation/netty/netty-4.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/AbstractNetty40ServerInstrumentationTest.java create mode 100644 instrumentation/netty/netty-4.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/Netty40HttpServerCodecInstrumentationTest.java create mode 100644 instrumentation/netty/netty-4.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/NettyTestServer.java create mode 100644 instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/AbstractNetty41ServerInstrumentationTest.java create mode 100644 instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/Netty41HttpServerCodecInstrumentationTest.java delete mode 100644 instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/Netty41ServerDuplexCodecInstrumentationTest.java diff --git a/instrumentation/netty/netty-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/NettyChannelPipelineInstrumentation.java b/instrumentation/netty/netty-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/NettyChannelPipelineInstrumentation.java index 67f8834d0..4311cc67b 100644 --- a/instrumentation/netty/netty-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/NettyChannelPipelineInstrumentation.java +++ b/instrumentation/netty/netty-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/NettyChannelPipelineInstrumentation.java @@ -98,6 +98,16 @@ public static void addHandler( .getName(), HttpServerTracingHandler.class.getName(), new HttpServerTracingHandler()); + + // add OTEL request handler to start spans + pipeline.addBefore( + HttpServerTracingHandler.class.getName(), + io.opentelemetry.javaagent.instrumentation.netty.v4_0.server + .HttpServerRequestTracingHandler.class + .getName(), + new io.opentelemetry.javaagent.instrumentation.netty.v4_0.server + .HttpServerRequestTracingHandler()); + pipeline.addLast( HttpServerBlockingRequestHandler.class.getName(), new HttpServerBlockingRequestHandler()); diff --git a/instrumentation/netty/netty-4.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/AbstractNetty40ServerInstrumentationTest.java b/instrumentation/netty/netty-4.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/AbstractNetty40ServerInstrumentationTest.java new file mode 100644 index 000000000..fe9d0923e --- /dev/null +++ b/instrumentation/netty/netty-4.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/AbstractNetty40ServerInstrumentationTest.java @@ -0,0 +1,202 @@ +/* + * Copyright The Hypertrace Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0; + +import static io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.NettyTestServer.RESPONSE_BODY; +import static io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.NettyTestServer.RESPONSE_HEADER_NAME; +import static io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.NettyTestServer.RESPONSE_HEADER_VALUE; + +import io.opentelemetry.sdk.trace.data.SpanData; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import okhttp3.MediaType; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; +import okio.Buffer; +import okio.BufferedSink; +import org.hypertrace.agent.core.instrumentation.HypertraceSemanticAttributes; +import org.hypertrace.agent.testing.AbstractInstrumenterTest; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public abstract class AbstractNetty40ServerInstrumentationTest extends AbstractInstrumenterTest { + + public static final String REQUEST_HEADER_NAME = "reqheader"; + public static final String REQUEST_HEADER_VALUE = "reqheadervalue"; + + private static int port; + private static NettyTestServer nettyTestServer; + + @BeforeEach + private void startServer() throws IOException, InterruptedException { + nettyTestServer = createNetty(); + port = nettyTestServer.create(); + } + + @AfterEach + private void stopServer() throws ExecutionException, InterruptedException { + nettyTestServer.stopServer(); + } + + protected abstract NettyTestServer createNetty(); + + @Test + public void get() throws IOException, TimeoutException, InterruptedException { + Request request = + new Request.Builder() + .url(String.format("http://localhost:%d/get_no_content", port)) + .header(REQUEST_HEADER_NAME, REQUEST_HEADER_VALUE) + .get() + .build(); + + try (Response response = httpClient.newCall(request).execute()) { + Assertions.assertEquals(204, response.code()); + } + + List> traces = TEST_WRITER.getTraces(); + TEST_WRITER.waitForTraces(1); + Assertions.assertEquals(1, traces.size()); + List trace = traces.get(0); + Assertions.assertEquals(1, trace.size()); + SpanData spanData = trace.get(0); + + Assertions.assertEquals( + REQUEST_HEADER_VALUE, + spanData + .getAttributes() + .get(HypertraceSemanticAttributes.httpRequestHeader(REQUEST_HEADER_NAME))); + Assertions.assertEquals( + RESPONSE_HEADER_VALUE, + spanData + .getAttributes() + .get(HypertraceSemanticAttributes.httpResponseHeader(RESPONSE_HEADER_NAME))); + Assertions.assertNull( + spanData.getAttributes().get(HypertraceSemanticAttributes.HTTP_REQUEST_BODY)); + Assertions.assertNull( + spanData.getAttributes().get(HypertraceSemanticAttributes.HTTP_RESPONSE_BODY)); + } + + @Test + public void postJson() throws IOException, TimeoutException, InterruptedException { + RequestBody requestBody = requestBody(true, 3000, 75); + Request request = + new Request.Builder() + .url(String.format("http://localhost:%d/post", port)) + .header(REQUEST_HEADER_NAME, REQUEST_HEADER_VALUE) + .header("Transfer-Encoding", "chunked") + .post(requestBody) + .build(); + + try (Response response = httpClient.newCall(request).execute()) { + Assertions.assertEquals(200, response.code()); + Assertions.assertEquals(RESPONSE_BODY, response.body().string()); + } + + List> traces = TEST_WRITER.getTraces(); + TEST_WRITER.waitForTraces(1); + Assertions.assertEquals(1, traces.size()); + List trace = traces.get(0); + Assertions.assertEquals(1, trace.size()); + SpanData spanData = trace.get(0); + + Assertions.assertEquals( + REQUEST_HEADER_VALUE, + spanData + .getAttributes() + .get(HypertraceSemanticAttributes.httpRequestHeader(REQUEST_HEADER_NAME))); + Assertions.assertEquals( + RESPONSE_HEADER_VALUE, + spanData + .getAttributes() + .get(HypertraceSemanticAttributes.httpResponseHeader(RESPONSE_HEADER_NAME))); + Buffer requestBodyBuffer = new Buffer(); + requestBody.writeTo(requestBodyBuffer); + Assertions.assertEquals( + new String(requestBodyBuffer.readByteArray()), + spanData.getAttributes().get(HypertraceSemanticAttributes.HTTP_REQUEST_BODY)); + Assertions.assertEquals( + RESPONSE_BODY, + spanData.getAttributes().get(HypertraceSemanticAttributes.HTTP_RESPONSE_BODY)); + } + + @Test + public void blocking() throws IOException, TimeoutException, InterruptedException { + Request request = + new Request.Builder() + .url(String.format("http://localhost:%d/post", port)) + .header(REQUEST_HEADER_NAME, REQUEST_HEADER_VALUE) + .header("mockblock", "true") + .get() + .build(); + + try (Response response = httpClient.newCall(request).execute()) { + Assertions.assertEquals(403, response.code()); + Assertions.assertTrue(response.body().string().isEmpty()); + } + + List> traces = TEST_WRITER.getTraces(); + TEST_WRITER.waitForTraces(1); + Assertions.assertEquals(1, traces.size()); + List trace = traces.get(0); + Assertions.assertEquals(1, trace.size()); + SpanData spanData = trace.get(0); + + Assertions.assertEquals( + REQUEST_HEADER_VALUE, + spanData + .getAttributes() + .get(HypertraceSemanticAttributes.httpRequestHeader(REQUEST_HEADER_NAME))); + Assertions.assertNull( + spanData + .getAttributes() + .get(HypertraceSemanticAttributes.httpResponseHeader(RESPONSE_HEADER_NAME))); + Assertions.assertNull( + spanData + .getAttributes() + .get(HypertraceSemanticAttributes.httpResponseHeader(RESPONSE_BODY))); + } + + private RequestBody requestBody(final boolean chunked, final long size, final int writeSize) { + final byte[] buffer = new byte[writeSize]; + Arrays.fill(buffer, (byte) 'x'); + + return new RequestBody() { + @Override + public MediaType contentType() { + return MediaType.get("application/json; charset=utf-8"); + } + + @Override + public long contentLength() throws IOException { + return chunked ? -1L : size; + } + + @Override + public void writeTo(BufferedSink sink) throws IOException { + for (int count = 0; count < size; count += writeSize) { + sink.write(buffer, 0, (int) Math.min(size - count, writeSize)); + } + } + }; + } +} diff --git a/instrumentation/netty/netty-4.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/Netty40HttpServerCodecInstrumentationTest.java b/instrumentation/netty/netty-4.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/Netty40HttpServerCodecInstrumentationTest.java new file mode 100644 index 000000000..92f863a17 --- /dev/null +++ b/instrumentation/netty/netty-4.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/Netty40HttpServerCodecInstrumentationTest.java @@ -0,0 +1,29 @@ +/* + * Copyright The Hypertrace Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0; + +import io.netty.handler.codec.http.HttpServerCodec; +import java.util.Arrays; + +public class Netty40HttpServerCodecInstrumentationTest + extends AbstractNetty40ServerInstrumentationTest { + + @Override + protected NettyTestServer createNetty() { + return new NettyTestServer(Arrays.asList(HttpServerCodec.class)); + } +} diff --git a/instrumentation/netty/netty-4.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/Netty40ServerInstrumentationTest.java b/instrumentation/netty/netty-4.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/Netty40ServerInstrumentationTest.java index 04d49b5ad..b42b48783 100644 --- a/instrumentation/netty/netty-4.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/Netty40ServerInstrumentationTest.java +++ b/instrumentation/netty/netty-4.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/Netty40ServerInstrumentationTest.java @@ -16,276 +16,14 @@ package io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0; -import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH; -import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; - -import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.handler.codec.http.DefaultFullHttpResponse; -import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpRequestDecoder; -import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponseEncoder; -import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.handler.codec.http.LastHttpContent; -import io.netty.handler.logging.LogLevel; -import io.netty.handler.logging.LoggingHandler; -import io.opentelemetry.sdk.trace.data.SpanData; -import java.io.IOException; -import java.net.ServerSocket; import java.util.Arrays; -import java.util.List; -import java.util.concurrent.TimeoutException; -import okhttp3.MediaType; -import okhttp3.Request; -import okhttp3.RequestBody; -import okhttp3.Response; -import okio.Buffer; -import okio.BufferedSink; -import org.hypertrace.agent.core.instrumentation.HypertraceSemanticAttributes; -import org.hypertrace.agent.testing.AbstractInstrumenterTest; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; - -public class Netty40ServerInstrumentationTest extends AbstractInstrumenterTest { - - private static final LoggingHandler LOGGING_HANDLER = - new LoggingHandler(Netty40ServerInstrumentationTest.class, LogLevel.DEBUG); - - public static final String REQUEST_HEADER_NAME = "reqheader"; - public static final String REQUEST_HEADER_VALUE = "reqheadervalue"; - public static final String RESPONSE_HEADER_NAME = "respheader"; - public static final String RESPONSE_HEADER_VALUE = "respheadervalue"; - private static final String RESPONSE_BODY = "{\"foo\": \"bar\"}"; - - private static EventLoopGroup eventLoopGroup; - private static int port; - - @BeforeAll - private static void startServer() throws IOException, InterruptedException { - eventLoopGroup = new NioEventLoopGroup(); - - ServerBootstrap serverBootstrap = new ServerBootstrap(); - serverBootstrap.group(eventLoopGroup); - serverBootstrap - .handler(LOGGING_HANDLER) - .childHandler( - new ChannelInitializer() { - @Override - protected void initChannel(Channel ch) throws Exception { - ChannelPipeline pipeline = ch.pipeline(); - pipeline.addFirst("logger", LOGGING_HANDLER); - - pipeline.addLast(new HttpRequestDecoder()); - pipeline.addLast(new HttpResponseEncoder()); - - pipeline.addLast( - new SimpleChannelInboundHandler() { - - HttpRequest httpRequest; - - @Override - protected void channelRead0(ChannelHandlerContext ctx, Object msg) { - - if (msg instanceof HttpRequest) { - this.httpRequest = (HttpRequest) msg; - } - - // write response after all content has been received otherwise - // server span is closed before request payload is captured - if (msg instanceof LastHttpContent) { - if (httpRequest.getUri().contains("get_no_content")) { - HttpResponse response = - new DefaultFullHttpResponse( - HTTP_1_1, HttpResponseStatus.valueOf(204)); - response.headers().add(RESPONSE_HEADER_NAME, RESPONSE_HEADER_VALUE); - response.headers().set(CONTENT_LENGTH, 0); - ctx.write(response); - } else if (httpRequest.getUri().contains("post")) { - ByteBuf responseBody = Unpooled.wrappedBuffer(RESPONSE_BODY.getBytes()); - HttpResponse response = - new DefaultFullHttpResponse( - HTTP_1_1, HttpResponseStatus.valueOf(200), responseBody); - response.headers().add(RESPONSE_HEADER_NAME, RESPONSE_HEADER_VALUE); - response.headers().add("Content-Type", "application-json"); - response.headers().set(CONTENT_LENGTH, responseBody.readableBytes()); - ctx.write(response); - } - } - } - - @Override - public void channelReadComplete(ChannelHandlerContext ctx) { - ctx.flush(); - } - }); - } - }) - .channel(NioServerSocketChannel.class); - - ServerSocket socket; - socket = new ServerSocket(0); - port = socket.getLocalPort(); - socket.close(); - - serverBootstrap.bind(port).sync(); - } - - @AfterAll - private static void stopServer() { - if (eventLoopGroup != null) { - eventLoopGroup.shutdownGracefully(); - } - } - - @Test - public void get() throws IOException, TimeoutException, InterruptedException { - Request request = - new Request.Builder() - .url(String.format("http://localhost:%d/get_no_content", port)) - .header(REQUEST_HEADER_NAME, REQUEST_HEADER_VALUE) - .get() - .build(); - - try (Response response = httpClient.newCall(request).execute()) { - Assertions.assertEquals(204, response.code()); - } - - List> traces = TEST_WRITER.getTraces(); - TEST_WRITER.waitForTraces(1); - Assertions.assertEquals(1, traces.size()); - List trace = traces.get(0); - Assertions.assertEquals(1, trace.size()); - SpanData spanData = trace.get(0); - - Assertions.assertEquals( - REQUEST_HEADER_VALUE, - spanData - .getAttributes() - .get(HypertraceSemanticAttributes.httpRequestHeader(REQUEST_HEADER_NAME))); - Assertions.assertEquals( - RESPONSE_HEADER_VALUE, - spanData - .getAttributes() - .get(HypertraceSemanticAttributes.httpResponseHeader(RESPONSE_HEADER_NAME))); - Assertions.assertNull( - spanData.getAttributes().get(HypertraceSemanticAttributes.HTTP_REQUEST_BODY)); - Assertions.assertNull( - spanData.getAttributes().get(HypertraceSemanticAttributes.HTTP_RESPONSE_BODY)); - } - - @Test - public void postJson() throws IOException, TimeoutException, InterruptedException { - RequestBody requestBody = requestBody(true, 3000, 75); - Request request = - new Request.Builder() - .url(String.format("http://localhost:%d/post", port)) - .header(REQUEST_HEADER_NAME, REQUEST_HEADER_VALUE) - .header("Transfer-Encoding", "chunked") - .post(requestBody) - .build(); - - try (Response response = httpClient.newCall(request).execute()) { - Assertions.assertEquals(200, response.code()); - Assertions.assertEquals(RESPONSE_BODY, response.body().string()); - } - - List> traces = TEST_WRITER.getTraces(); - TEST_WRITER.waitForTraces(1); - Assertions.assertEquals(1, traces.size()); - List trace = traces.get(0); - Assertions.assertEquals(1, trace.size()); - SpanData spanData = trace.get(0); - - Assertions.assertEquals( - REQUEST_HEADER_VALUE, - spanData - .getAttributes() - .get(HypertraceSemanticAttributes.httpRequestHeader(REQUEST_HEADER_NAME))); - Assertions.assertEquals( - RESPONSE_HEADER_VALUE, - spanData - .getAttributes() - .get(HypertraceSemanticAttributes.httpResponseHeader(RESPONSE_HEADER_NAME))); - Buffer requestBodyBuffer = new Buffer(); - requestBody.writeTo(requestBodyBuffer); - Assertions.assertEquals( - new String(requestBodyBuffer.readByteArray()), - spanData.getAttributes().get(HypertraceSemanticAttributes.HTTP_REQUEST_BODY)); - Assertions.assertEquals( - RESPONSE_BODY, - spanData.getAttributes().get(HypertraceSemanticAttributes.HTTP_RESPONSE_BODY)); - } - - @Test - public void blocking() throws IOException, TimeoutException, InterruptedException { - Request request = - new Request.Builder() - .url(String.format("http://localhost:%d/post", port)) - .header(REQUEST_HEADER_NAME, REQUEST_HEADER_VALUE) - .header("mockblock", "true") - .get() - .build(); - - try (Response response = httpClient.newCall(request).execute()) { - Assertions.assertEquals(403, response.code()); - Assertions.assertTrue(response.body().string().isEmpty()); - } - - List> traces = TEST_WRITER.getTraces(); - TEST_WRITER.waitForTraces(1); - Assertions.assertEquals(1, traces.size()); - List trace = traces.get(0); - Assertions.assertEquals(1, trace.size()); - SpanData spanData = trace.get(0); - - Assertions.assertEquals( - REQUEST_HEADER_VALUE, - spanData - .getAttributes() - .get(HypertraceSemanticAttributes.httpRequestHeader(REQUEST_HEADER_NAME))); - Assertions.assertNull( - spanData - .getAttributes() - .get(HypertraceSemanticAttributes.httpResponseHeader(RESPONSE_HEADER_NAME))); - Assertions.assertNull( - spanData - .getAttributes() - .get(HypertraceSemanticAttributes.httpResponseHeader(RESPONSE_BODY))); - } - - private RequestBody requestBody(final boolean chunked, final long size, final int writeSize) { - final byte[] buffer = new byte[writeSize]; - Arrays.fill(buffer, (byte) 'x'); - - return new RequestBody() { - @Override - public MediaType contentType() { - return MediaType.get("application/json; charset=utf-8"); - } - @Override - public long contentLength() throws IOException { - return chunked ? -1L : size; - } +public class Netty40ServerInstrumentationTest extends AbstractNetty40ServerInstrumentationTest { - @Override - public void writeTo(BufferedSink sink) throws IOException { - for (int count = 0; count < size; count += writeSize) { - sink.write(buffer, 0, (int) Math.min(size - count, writeSize)); - } - } - }; + @Override + protected NettyTestServer createNetty() { + return new NettyTestServer(Arrays.asList(HttpRequestDecoder.class, HttpResponseEncoder.class)); } } diff --git a/instrumentation/netty/netty-4.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/NettyTestServer.java b/instrumentation/netty/netty-4.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/NettyTestServer.java new file mode 100644 index 000000000..d5c97ce5d --- /dev/null +++ b/instrumentation/netty/netty-4.0/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/NettyTestServer.java @@ -0,0 +1,142 @@ +/* + * Copyright The Hypertrace Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0; + +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import java.io.IOException; +import java.net.ServerSocket; +import java.util.List; +import java.util.concurrent.ExecutionException; + +public class NettyTestServer { + + static final String RESPONSE_HEADER_NAME = "respheader"; + static final String RESPONSE_HEADER_VALUE = "respheadervalue"; + static final String RESPONSE_BODY = "{\"foo\": \"bar\"}"; + + private static final LoggingHandler LOGGING_HANDLER = + new LoggingHandler(NettyTestServer.class, LogLevel.DEBUG); + + private final List> handlers; + + NettyTestServer(List> handlers) { + this.handlers = handlers; + } + + private EventLoopGroup eventLoopGroup; + + public int create() throws IOException, InterruptedException { + eventLoopGroup = new NioEventLoopGroup(); + + ServerBootstrap serverBootstrap = new ServerBootstrap(); + serverBootstrap.group(eventLoopGroup); + serverBootstrap + .handler(LOGGING_HANDLER) + .childHandler( + new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addFirst("logger", LOGGING_HANDLER); + + for (Class channelHandlerClass : handlers) { + ChannelHandler channelHandler = channelHandlerClass.newInstance(); + pipeline.addLast(channelHandler); + } + // pipeline.addLast(handlerList.toArray(new ChannelHandler[0])); + // pipeline.addLast(new HttpServerCodec()); + // pipeline.addLast(new HttpRequestDecoder()); + // pipeline.addLast(new HttpResponseEncoder()); + + pipeline.addLast( + new SimpleChannelInboundHandler() { + + HttpRequest httpRequest; + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Object msg) { + + if (msg instanceof HttpRequest) { + this.httpRequest = (HttpRequest) msg; + } + + // write response after all content has been received otherwise + // server span is closed before request payload is captured + if (msg instanceof LastHttpContent && httpRequest != null) { + if (httpRequest.getUri().contains("get_no_content")) { + HttpResponse response = + new DefaultFullHttpResponse( + HTTP_1_1, HttpResponseStatus.valueOf(204)); + response.headers().add(RESPONSE_HEADER_NAME, RESPONSE_HEADER_VALUE); + response.headers().set(CONTENT_LENGTH, 0); + ctx.write(response); + } else if (httpRequest.getUri().contains("post")) { + ByteBuf responseBody = Unpooled.wrappedBuffer(RESPONSE_BODY.getBytes()); + HttpResponse response = + new DefaultFullHttpResponse( + HTTP_1_1, HttpResponseStatus.valueOf(200), responseBody); + response.headers().add(RESPONSE_HEADER_NAME, RESPONSE_HEADER_VALUE); + response.headers().add("Content-Type", "application-json"); + response.headers().set(CONTENT_LENGTH, responseBody.readableBytes()); + ctx.write(response); + } + } + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + ctx.flush(); + } + }); + } + }) + .channel(NioServerSocketChannel.class); + + ServerSocket socket; + socket = new ServerSocket(0); + int port = socket.getLocalPort(); + socket.close(); + + serverBootstrap.bind(port).sync(); + return port; + } + + public void stopServer() throws ExecutionException, InterruptedException { + eventLoopGroup.shutdownGracefully().get(); + } +} diff --git a/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/NettyChannelPipelineInstrumentation.java b/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/NettyChannelPipelineInstrumentation.java index 71b44dce1..9e18a7eb4 100644 --- a/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/NettyChannelPipelineInstrumentation.java +++ b/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/NettyChannelPipelineInstrumentation.java @@ -104,9 +104,7 @@ public static void addHandler( .class .getName(), HttpServerTracingHandler.class.getName(), - new HttpServerTracingHandler( - new io.opentelemetry.javaagent.instrumentation.netty.v4_1.server - .HttpServerRequestTracingHandler())); + new HttpServerTracingHandler()); pipeline.addBefore( HttpServerTracingHandler.class.getName(), @@ -122,7 +120,7 @@ public static void addHandler( } else if (handler instanceof HttpRequestDecoder) { pipeline.addLast( HttpServerRequestTracingHandler.class.getName(), - new HttpServerRequestTracingHandler(null)); + new HttpServerRequestTracingHandler()); } else if (handler instanceof HttpResponseEncoder) { pipeline.replace( io.opentelemetry.javaagent.instrumentation.netty.v4_1.server diff --git a/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/HttpServerRequestTracingHandler.java b/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/HttpServerRequestTracingHandler.java index a83e4bad6..f077ee7c3 100644 --- a/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/HttpServerRequestTracingHandler.java +++ b/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/HttpServerRequestTracingHandler.java @@ -45,12 +45,6 @@ public class HttpServerRequestTracingHandler extends ChannelInboundHandlerAdapte private final AgentConfig agentConfig = HypertraceConfig.get(); - public final ChannelInboundHandlerAdapter runBefore; - - public HttpServerRequestTracingHandler(ChannelInboundHandlerAdapter runBefore) { - this.runBefore = runBefore; - } - @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { Channel channel = ctx.channel(); diff --git a/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/HttpServerTracingHandler.java b/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/HttpServerTracingHandler.java index 78b1a695d..43fc5fda1 100644 --- a/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/HttpServerTracingHandler.java +++ b/instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/HttpServerTracingHandler.java @@ -16,14 +16,12 @@ package io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.server; -import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.CombinedChannelDuplexHandler; public class HttpServerTracingHandler extends CombinedChannelDuplexHandler< HttpServerRequestTracingHandler, HttpServerResponseTracingHandler> { - - public HttpServerTracingHandler(ChannelInboundHandlerAdapter runBefore) { - super(new HttpServerRequestTracingHandler(runBefore), new HttpServerResponseTracingHandler()); + public HttpServerTracingHandler() { + super(new HttpServerRequestTracingHandler(), new HttpServerResponseTracingHandler()); } } diff --git a/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/AbstractNetty41ServerInstrumentationTest.java b/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/AbstractNetty41ServerInstrumentationTest.java new file mode 100644 index 000000000..dd8787722 --- /dev/null +++ b/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/AbstractNetty41ServerInstrumentationTest.java @@ -0,0 +1,202 @@ +/* + * Copyright The Hypertrace Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1; + +import static io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.NettyTestServer.RESPONSE_BODY; +import static io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.NettyTestServer.RESPONSE_HEADER_NAME; +import static io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.NettyTestServer.RESPONSE_HEADER_VALUE; + +import io.opentelemetry.sdk.trace.data.SpanData; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import okhttp3.MediaType; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; +import okio.Buffer; +import okio.BufferedSink; +import org.hypertrace.agent.core.instrumentation.HypertraceSemanticAttributes; +import org.hypertrace.agent.testing.AbstractInstrumenterTest; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public abstract class AbstractNetty41ServerInstrumentationTest extends AbstractInstrumenterTest { + + public static final String REQUEST_HEADER_NAME = "reqheader"; + public static final String REQUEST_HEADER_VALUE = "reqheadervalue"; + + private static int port; + private static NettyTestServer nettyTestServer; + + @BeforeEach + private void startServer() throws IOException, InterruptedException { + nettyTestServer = createNetty(); + port = nettyTestServer.create(); + } + + @AfterEach + private void stopServer() throws ExecutionException, InterruptedException { + nettyTestServer.stopServer(); + } + + protected abstract NettyTestServer createNetty(); + + @Test + public void get() throws IOException, TimeoutException, InterruptedException { + Request request = + new Request.Builder() + .url(String.format("http://localhost:%d/get_no_content", port)) + .header(REQUEST_HEADER_NAME, REQUEST_HEADER_VALUE) + .get() + .build(); + + try (Response response = httpClient.newCall(request).execute()) { + Assertions.assertEquals(204, response.code()); + } + + List> traces = TEST_WRITER.getTraces(); + TEST_WRITER.waitForTraces(1); + Assertions.assertEquals(1, traces.size()); + List trace = traces.get(0); + Assertions.assertEquals(1, trace.size()); + SpanData spanData = trace.get(0); + + Assertions.assertEquals( + REQUEST_HEADER_VALUE, + spanData + .getAttributes() + .get(HypertraceSemanticAttributes.httpRequestHeader(REQUEST_HEADER_NAME))); + Assertions.assertEquals( + RESPONSE_HEADER_VALUE, + spanData + .getAttributes() + .get(HypertraceSemanticAttributes.httpResponseHeader(RESPONSE_HEADER_NAME))); + Assertions.assertNull( + spanData.getAttributes().get(HypertraceSemanticAttributes.HTTP_REQUEST_BODY)); + Assertions.assertNull( + spanData.getAttributes().get(HypertraceSemanticAttributes.HTTP_RESPONSE_BODY)); + } + + @Test + public void postJson() throws IOException, TimeoutException, InterruptedException { + RequestBody requestBody = requestBody(true, 3000, 75); + Request request = + new Request.Builder() + .url(String.format("http://localhost:%d/post", port)) + .header(REQUEST_HEADER_NAME, REQUEST_HEADER_VALUE) + .header("Transfer-Encoding", "chunked") + .post(requestBody) + .build(); + + try (Response response = httpClient.newCall(request).execute()) { + Assertions.assertEquals(200, response.code()); + Assertions.assertEquals(RESPONSE_BODY, response.body().string()); + } + + List> traces = TEST_WRITER.getTraces(); + TEST_WRITER.waitForTraces(1); + Assertions.assertEquals(1, traces.size()); + List trace = traces.get(0); + Assertions.assertEquals(1, trace.size()); + SpanData spanData = trace.get(0); + + Assertions.assertEquals( + REQUEST_HEADER_VALUE, + spanData + .getAttributes() + .get(HypertraceSemanticAttributes.httpRequestHeader(REQUEST_HEADER_NAME))); + Assertions.assertEquals( + RESPONSE_HEADER_VALUE, + spanData + .getAttributes() + .get(HypertraceSemanticAttributes.httpResponseHeader(RESPONSE_HEADER_NAME))); + Buffer requestBodyBuffer = new Buffer(); + requestBody.writeTo(requestBodyBuffer); + Assertions.assertEquals( + new String(requestBodyBuffer.readByteArray()), + spanData.getAttributes().get(HypertraceSemanticAttributes.HTTP_REQUEST_BODY)); + Assertions.assertEquals( + RESPONSE_BODY, + spanData.getAttributes().get(HypertraceSemanticAttributes.HTTP_RESPONSE_BODY)); + } + + @Test + public void blocking() throws IOException, TimeoutException, InterruptedException { + Request request = + new Request.Builder() + .url(String.format("http://localhost:%d/post", port)) + .header(REQUEST_HEADER_NAME, REQUEST_HEADER_VALUE) + .header("mockblock", "true") + .get() + .build(); + + try (Response response = httpClient.newCall(request).execute()) { + Assertions.assertEquals(403, response.code()); + Assertions.assertTrue(response.body().string().isEmpty()); + } + + List> traces = TEST_WRITER.getTraces(); + TEST_WRITER.waitForTraces(1); + Assertions.assertEquals(1, traces.size()); + List trace = traces.get(0); + Assertions.assertEquals(1, trace.size()); + SpanData spanData = trace.get(0); + + Assertions.assertEquals( + REQUEST_HEADER_VALUE, + spanData + .getAttributes() + .get(HypertraceSemanticAttributes.httpRequestHeader(REQUEST_HEADER_NAME))); + Assertions.assertNull( + spanData + .getAttributes() + .get(HypertraceSemanticAttributes.httpResponseHeader(RESPONSE_HEADER_NAME))); + Assertions.assertNull( + spanData + .getAttributes() + .get(HypertraceSemanticAttributes.httpResponseHeader(RESPONSE_BODY))); + } + + private RequestBody requestBody(final boolean chunked, final long size, final int writeSize) { + final byte[] buffer = new byte[writeSize]; + Arrays.fill(buffer, (byte) 'x'); + + return new RequestBody() { + @Override + public MediaType contentType() { + return MediaType.get("application/json; charset=utf-8"); + } + + @Override + public long contentLength() throws IOException { + return chunked ? -1L : size; + } + + @Override + public void writeTo(BufferedSink sink) throws IOException { + for (int count = 0; count < size; count += writeSize) { + sink.write(buffer, 0, (int) Math.min(size - count, writeSize)); + } + } + }; + } +} diff --git a/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/Netty41HttpServerCodecInstrumentationTest.java b/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/Netty41HttpServerCodecInstrumentationTest.java new file mode 100644 index 000000000..70e1549d6 --- /dev/null +++ b/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/Netty41HttpServerCodecInstrumentationTest.java @@ -0,0 +1,29 @@ +/* + * Copyright The Hypertrace Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1; + +import io.netty.handler.codec.http.HttpServerCodec; +import java.util.Arrays; + +public class Netty41HttpServerCodecInstrumentationTest + extends AbstractNetty41ServerInstrumentationTest { + + @Override + protected NettyTestServer createNetty() { + return new NettyTestServer(Arrays.asList(HttpServerCodec.class)); + } +} diff --git a/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/Netty41ServerDuplexCodecInstrumentationTest.java b/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/Netty41ServerDuplexCodecInstrumentationTest.java deleted file mode 100644 index 79fdb5777..000000000 --- a/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/Netty41ServerDuplexCodecInstrumentationTest.java +++ /dev/null @@ -1,24 +0,0 @@ -package io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1; - -import io.netty.handler.codec.http.HttpServerCodec; -import java.io.IOException; -import java.util.Arrays; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; - -public class Netty41ServerDuplexCodecInstrumentationTest extends Netty41ServerInstrumentationTest { - - private static int port; - private static NettyTestServer nettyTestServer; - - @BeforeAll - private static void startServer() throws IOException, InterruptedException { - nettyTestServer = new NettyTestServer(); - port = nettyTestServer.create(Arrays.asList(HttpServerCodec.class)); - } - - @AfterAll - private static void stopServer() { - nettyTestServer.stopServer(); - } -} diff --git a/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/Netty41ServerInstrumentationTest.java b/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/Netty41ServerInstrumentationTest.java index 8b6363744..b66fede27 100644 --- a/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/Netty41ServerInstrumentationTest.java +++ b/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/Netty41ServerInstrumentationTest.java @@ -16,208 +16,14 @@ package io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1; -import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH; -import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; -import static io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.NettyTestServer.RESPONSE_BODY; -import static io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.NettyTestServer.RESPONSE_HEADER_NAME; -import static io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1.NettyTestServer.RESPONSE_HEADER_VALUE; - -import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.handler.codec.http.DefaultFullHttpResponse; -import io.netty.handler.codec.http.HttpRequest; -import io.netty.handler.codec.http.HttpResponse; -import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.handler.codec.http.HttpServerCodec; -import io.netty.handler.codec.http.LastHttpContent; -import io.netty.handler.logging.LogLevel; -import io.netty.handler.logging.LoggingHandler; -import io.opentelemetry.sdk.trace.data.SpanData; -import java.io.IOException; -import java.net.ServerSocket; +import io.netty.handler.codec.http.HttpRequestDecoder; +import io.netty.handler.codec.http.HttpResponseEncoder; import java.util.Arrays; -import java.util.List; -import java.util.concurrent.TimeoutException; -import okhttp3.MediaType; -import okhttp3.Request; -import okhttp3.RequestBody; -import okhttp3.Response; -import okio.Buffer; -import okio.BufferedSink; -import org.hypertrace.agent.core.instrumentation.HypertraceSemanticAttributes; -import org.hypertrace.agent.testing.AbstractInstrumenterTest; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -public class Netty41ServerInstrumentationTest extends AbstractInstrumenterTest { - - public static final String REQUEST_HEADER_NAME = "reqheader"; - public static final String REQUEST_HEADER_VALUE = "reqheadervalue"; - - private static int port; - private static NettyTestServer nettyTestServer; - - @BeforeEach - private void startServer() throws IOException, InterruptedException { - nettyTestServer = new NettyTestServer(); - port = nettyTestServer.create(Arrays.asList(HttpServerCodec.class)); - } - - @AfterEach - private void stopServer() { - nettyTestServer.stopServer(); - } - - @Test - public void get() throws IOException, TimeoutException, InterruptedException { - Request request = - new Request.Builder() - .url(String.format("http://localhost:%d/get_no_content", port)) - .header(REQUEST_HEADER_NAME, REQUEST_HEADER_VALUE) - .get() - .build(); - - try (Response response = httpClient.newCall(request).execute()) { - Assertions.assertEquals(204, response.code()); - } - - List> traces = TEST_WRITER.getTraces(); - TEST_WRITER.waitForTraces(1); - Assertions.assertEquals(1, traces.size()); - List trace = traces.get(0); - Assertions.assertEquals(1, trace.size()); - SpanData spanData = trace.get(0); - - Assertions.assertEquals( - REQUEST_HEADER_VALUE, - spanData - .getAttributes() - .get(HypertraceSemanticAttributes.httpRequestHeader(REQUEST_HEADER_NAME))); - Assertions.assertEquals( - RESPONSE_HEADER_VALUE, - spanData - .getAttributes() - .get(HypertraceSemanticAttributes.httpResponseHeader(RESPONSE_HEADER_NAME))); - Assertions.assertNull( - spanData.getAttributes().get(HypertraceSemanticAttributes.HTTP_REQUEST_BODY)); - Assertions.assertNull( - spanData.getAttributes().get(HypertraceSemanticAttributes.HTTP_RESPONSE_BODY)); - } - - @Test - public void postJson() throws IOException, TimeoutException, InterruptedException { - RequestBody requestBody = requestBody(true, 3000, 75); - Request request = - new Request.Builder() - .url(String.format("http://localhost:%d/post", port)) - .header(REQUEST_HEADER_NAME, REQUEST_HEADER_VALUE) - .header("Transfer-Encoding", "chunked") - .post(requestBody) - .build(); - - try (Response response = httpClient.newCall(request).execute()) { - Assertions.assertEquals(200, response.code()); - Assertions.assertEquals(RESPONSE_BODY, response.body().string()); - } - - List> traces = TEST_WRITER.getTraces(); - TEST_WRITER.waitForTraces(1); - Assertions.assertEquals(1, traces.size()); - List trace = traces.get(0); - Assertions.assertEquals(1, trace.size()); - SpanData spanData = trace.get(0); - - Assertions.assertEquals( - REQUEST_HEADER_VALUE, - spanData - .getAttributes() - .get(HypertraceSemanticAttributes.httpRequestHeader(REQUEST_HEADER_NAME))); - Assertions.assertEquals( - RESPONSE_HEADER_VALUE, - spanData - .getAttributes() - .get(HypertraceSemanticAttributes.httpResponseHeader(RESPONSE_HEADER_NAME))); - Buffer requestBodyBuffer = new Buffer(); - requestBody.writeTo(requestBodyBuffer); - Assertions.assertEquals( - new String(requestBodyBuffer.readByteArray()), - spanData.getAttributes().get(HypertraceSemanticAttributes.HTTP_REQUEST_BODY)); - Assertions.assertEquals( - RESPONSE_BODY, - spanData.getAttributes().get(HypertraceSemanticAttributes.HTTP_RESPONSE_BODY)); - } - - @Test - public void blocking() throws IOException, TimeoutException, InterruptedException { - Request request = - new Request.Builder() - .url(String.format("http://localhost:%d/post", port)) - .header(REQUEST_HEADER_NAME, REQUEST_HEADER_VALUE) - .header("mockblock", "true") - .get() - .build(); - - try (Response response = httpClient.newCall(request).execute()) { - Assertions.assertEquals(403, response.code()); - Assertions.assertTrue(response.body().string().isEmpty()); - } - - List> traces = TEST_WRITER.getTraces(); - TEST_WRITER.waitForTraces(1); - Assertions.assertEquals(1, traces.size()); - List trace = traces.get(0); - Assertions.assertEquals(1, trace.size()); - SpanData spanData = trace.get(0); - - Assertions.assertEquals( - REQUEST_HEADER_VALUE, - spanData - .getAttributes() - .get(HypertraceSemanticAttributes.httpRequestHeader(REQUEST_HEADER_NAME))); - Assertions.assertNull( - spanData - .getAttributes() - .get(HypertraceSemanticAttributes.httpResponseHeader(RESPONSE_HEADER_NAME))); - Assertions.assertNull( - spanData - .getAttributes() - .get(HypertraceSemanticAttributes.httpResponseHeader(RESPONSE_BODY))); - } - - private RequestBody requestBody(final boolean chunked, final long size, final int writeSize) { - final byte[] buffer = new byte[writeSize]; - Arrays.fill(buffer, (byte) 'x'); - - return new RequestBody() { - @Override - public MediaType contentType() { - return MediaType.get("application/json; charset=utf-8"); - } - @Override - public long contentLength() throws IOException { - return chunked ? -1L : size; - } +public class Netty41ServerInstrumentationTest extends AbstractNetty41ServerInstrumentationTest { - @Override - public void writeTo(BufferedSink sink) throws IOException { - for (int count = 0; count < size; count += writeSize) { - sink.write(buffer, 0, (int) Math.min(size - count, writeSize)); - } - } - }; + @Override + protected NettyTestServer createNetty() { + return new NettyTestServer(Arrays.asList(HttpRequestDecoder.class, HttpResponseEncoder.class)); } } diff --git a/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/NettyTestServer.java b/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/NettyTestServer.java index 6f15544b9..8110db4bd 100644 --- a/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/NettyTestServer.java +++ b/instrumentation/netty/netty-4.1/src/test/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/NettyTestServer.java @@ -1,3 +1,19 @@ +/* + * Copyright The Hypertrace Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_1; import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH; @@ -19,13 +35,13 @@ import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import java.io.IOException; import java.net.ServerSocket; import java.util.List; +import java.util.concurrent.ExecutionException; public class NettyTestServer { @@ -34,11 +50,17 @@ public class NettyTestServer { static final String RESPONSE_BODY = "{\"foo\": \"bar\"}"; private static final LoggingHandler LOGGING_HANDLER = - new LoggingHandler(Netty41ServerInstrumentationTest.class, LogLevel.DEBUG); + new LoggingHandler(NettyTestServer.class, LogLevel.DEBUG); + + private final List> handlers; + + NettyTestServer(List> handlers) { + this.handlers = handlers; + } private EventLoopGroup eventLoopGroup; - public int create(List> handlerList) throws IOException, InterruptedException { + public int create() throws IOException, InterruptedException { eventLoopGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); @@ -52,12 +74,12 @@ protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addFirst("logger", LOGGING_HANDLER); - for (Class channelHandlerClass: handlerList) { + for (Class channelHandlerClass : handlers) { ChannelHandler channelHandler = channelHandlerClass.newInstance(); pipeline.addLast(channelHandler); } -// pipeline.addLast(handlerList.toArray(new ChannelHandler[0])); -// pipeline.addLast(new HttpServerCodec()); + // pipeline.addLast(handlerList.toArray(new ChannelHandler[0])); + // pipeline.addLast(new HttpServerCodec()); // pipeline.addLast(new HttpRequestDecoder()); // pipeline.addLast(new HttpResponseEncoder()); @@ -75,7 +97,7 @@ protected void channelRead0(ChannelHandlerContext ctx, Object msg) { // write response after all content has been received otherwise // server span is closed before request payload is captured - if (msg instanceof LastHttpContent) { + if (msg instanceof LastHttpContent && httpRequest != null) { if (httpRequest.getUri().contains("get_no_content")) { HttpResponse response = new DefaultFullHttpResponse( @@ -114,7 +136,7 @@ public void channelReadComplete(ChannelHandlerContext ctx) { return port; } - public void stopServer() { - eventLoopGroup.shutdownGracefully(); + public void stopServer() throws ExecutionException, InterruptedException { + eventLoopGroup.shutdownGracefully().get(); } }