Hej there, yes it’s me, I’m still alive and doing things, such as neo4j-jdbc, with automatic SQL to Cypher translations or building things like this for my own enjoyment. However, with the tons of meaningless posts and stuff around “Hey ma, look how I do AI with $tool”, I felt a bit out of time these days, just enjoying artisanal coding, requirements analysis and architectural work and didn’t have much incentive to write more prose. And giving the rest out for AI to swallow and regurgitate it, well… 🤷
In the last days however a rookie mistake I made gave me some food for thoughts and I write this post for anyone new in the Java ecosystem or new to programming in general: Regardless how much years you have acquired or how good you think you are, you always will make some stupid mistakes such as the following. Hopefully, you’re lucky as me and find yourself surrounded by colleagues who help you rolling a fix out. And yes, the blast radius of something that runs in $cloud is bigger than a bug in a client-side library, especially with the slow update cycles in many industries.
Enough talking, let’s go. I was refactoring some code to not use Java’s HttpURLConnection
of old anymore. Here’s basically how it looked:
import java.net.HttpURLConnection; import java.net.URI; import java.nio.charset.StandardCharsets; class S1 { public static void main(String... a) throws Exception { var uri = URI.create("http://randomapi.io"); var body = "Hello"; // Nice I get an output stream I just can write too, yeah… var connection = (HttpURLConnection) uri.toURL().openConnection(); connection.setDoOutput(true); connection.setRequestMethod("POST"); try (var out = connection.getOutputStream()) { // Reality: objectMapper.writeValue(out, Map.of("some", "content")); out.write(body.getBytes(StandardCharsets.UTF_8)); } connection.connect(); } } |
This works, but the URL connection is not a great API to work with. Also, it does not support HTTP/2 out of the box (or even not at all). The recommended replacement is the Java HttpClient. The same behaviour like above but with the HttpClient looks like this, defaulting to HTTP/2 by default:
import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpRequest.BodyPublishers; import java.net.http.HttpResponse.BodyHandlers; import java.util.concurrent.Executors; class S2 { public static void main(String... a) throws Exception { var uri = URI.create("http://randomapi.io"); var body = "Hello"; try (var client = HttpClient.newBuilder().executor(Executors.newVirtualThreadPerTaskExecutor()).build()) { var response = client .send( HttpRequest.newBuilder(uri).POST(BodyPublishers.ofString(body)).build(), BodyHandlers.ofString()) .body(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } |
The request body is prepared by a BodyPublishers.ofString(body)
. There are quite a few others, such as from a byte array, files, reactive streams and more. What does not exist is a way to just get an output stream from the HTTP client one can write directly to, like the above example using URL connection. So assuming I am a Jackson user and I want to write some rather large JSON as request body, I would need to materialise this as a byte array in memory or deal with the shenanigans of ByteBuffer
, which I really didn’t feel like. Instead, I thought, hey, there’s the pair of PipedInputStream
and PipedOutputStream
, let’s use a body publisher from an input stream supplier like the following:
var publisher = HttpRequest.BodyPublishers.ofInputStream(() -> { var in = new PipedInputStream(); try (var out = new PipedOutputStream(in)) { outputStreamConsumer.accept(out); } catch (IOException e) { throw new UncheckedIOException(e); } return in; }); |
Easy, right? While the pair is made exactly for the above purpose, the usage is 100% wrong and the JavaDoc is quite explicit with this:
Attempting to use both objects from a single thread is not recommended, as it may deadlock the thread. The piped input stream contains a buffer, decoupling read operations from write operations, within limits.
I did actually read this, set this up like above nevertheless, tested it and it “worked”. I thought, yeah, the HttpClient is asynchronous under the hood anyway, let’s go. Stupid me, as I did use its blocking API and ofc, reading and writing did occur on the same thread of course. Why did it not show up in tests? Rookie mistake number two, not testing proper extreme, like 0, -1, Integer.MAX_VALUE, a lizard and null
. Within limits means: There’s a default buffer of 1024 bytes, any payload bigger than coming from anywhere into the request body would blow up.
Do this often enough, and you have a procedure that happily drains all the available threads from a system by deadlocking them, congratulations.
What would have prevented that? Taking a step down from mount stupid to begin with. A second pair of eyes. Better testing. Would AI have helped me here? I am unsure. Maybe in generating test input. On API usage: Most likely not. It’s all correct, and Java has no way in the language to make me aware. Maybe an IDE with a proper base model for semantic analysis can spot it, too. IntelliJ does not see it.
How did I solve it in the end? I was not happy using a byte[]
array, so I really wanted to have that pipe and a working solution looks like this:
import java.io.IOException; import java.io.PipedInputStream; import java.io.PipedOutputStream; import java.io.UncheckedIOException; import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpRequest.BodyPublishers; import java.net.http.HttpResponse.BodyHandlers; import java.nio.charset.StandardCharsets; class S3 { public static void main(String... a) throws Exception { var uri = URI.create("http://randomapi.io"); var body = "Hello"; try (var client = HttpClient.newBuilder().build()) { var bodyPublisher = BodyPublishers.ofInputStream(() -> { var in = new PipedInputStream(); var out = new PipedOutputStream(); try { out.connect(in); } catch (IOException e) { throw new UncheckedIOException(e); } Thread.ofVirtual().start(() -> { try (out) { // Here the stream can be passed to Jackson or whatever you have // that's generating output out.write(body.getBytes(StandardCharsets.UTF_8)); } catch (IOException e) { throw new UncheckedIOException(e); } }); return in; }); var response = client .send( HttpRequest.newBuilder(uri).POST(bodyPublisher).build(), BodyHandlers.ofString() ).body(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } |
The important pieces here are:
- Create the input and output streams independently
- Connect them outside both the reading and writing thread
- Kick of the writing thread, and since we are on modern Java, just use a virtual thread for it. In the end, they are exactly for that purpose: To be used when there’s some potentially block things happening
- Just return the input stream to the HTTP Client, it will take care of using and later closing it.
- Close the output stream inside the writing thread (note the try-with-resources statement that “imports” the stream)
A runnable solution is below. I am using an explicitly declared main class here, so that I can have a proper Java script without the boiler plate of old. It brings up an HTTP server for testing as well, that just mirrors its input. It than uses both methods described above to POST to that server, get the response and asserts it. Use Java 24 to run it with java -ea --enable-preview Complete.java
.
Fun was had the last days.
import java.io.BufferedInputStream; import java.io.IOException; import java.io.PipedInputStream; import java.io.PipedOutputStream; import java.io.UncheckedIOException; import java.net.HttpURLConnection; import java.net.InetSocketAddress; import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpRequest.BodyPublishers; import java.net.http.HttpResponse.BodyHandlers; import java.nio.charset.StandardCharsets; import java.security.SecureRandom; import java.util.Base64; import java.util.concurrent.Executors; import com.sun.net.httpserver.HttpServer; void main() throws IOException { var port = 4711; var server = HttpServer.create(new InetSocketAddress(port), 0); try { server.createContext("/mirror", exchange -> { byte[] response; try (var in = new BufferedInputStream(exchange.getRequestBody())) { response = in.readAllBytes(); } exchange.sendResponseHeaders(HttpURLConnection.HTTP_OK, response.length); var outputStream = exchange.getResponseBody(); outputStream.write(response); outputStream.flush(); outputStream.close(); }); server.setExecutor(null); server.start(); var secureRandom = new SecureRandom(); var buffer = new byte[(int) Math.pow(2, 16)]; secureRandom.nextBytes(buffer); var requestBody = Base64.getUrlEncoder().withoutPadding().encodeToString(buffer); var uri = URI.create("http://localhost:%d/mirror".formatted(port)); var responseBody = useUrlConnection(uri, requestBody); assert responseBody.equals(requestBody); responseBody = useHttpClient(uri, requestBody); assert responseBody.equals(requestBody); } finally { server.stop(0); } } private static String useUrlConnection(URI uri, String body) throws IOException { var urlConnection = (HttpURLConnection) uri.toURL().openConnection(); urlConnection.setDoOutput(true); urlConnection.setRequestMethod("POST"); try (var out = urlConnection.getOutputStream()) { out.write(body.getBytes(StandardCharsets.UTF_8)); } urlConnection.connect(); try (var in = new BufferedInputStream(urlConnection.getInputStream())) { return new String(in.readAllBytes(), StandardCharsets.UTF_8); } } private static String useHttpClient(URI uri, String body) throws IOException { try (var client = HttpClient.newBuilder().executor(Executors.newVirtualThreadPerTaskExecutor()).build()) { return client.send(HttpRequest.newBuilder(uri).POST( BodyPublishers.ofInputStream(() -> { var in = new PipedInputStream(); //noinspection resource var out = new PipedOutputStream(); try { out.connect(in); } catch (IOException e) { throw new UncheckedIOException(e); } Thread.ofVirtual().start(() -> { try (out) { out.write(body.getBytes(StandardCharsets.UTF_8)); } catch (IOException e) { throw new UncheckedIOException(e); } }); return in; })).build(), BodyHandlers.ofString()).body(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IOException(e.getMessage()); } } |
No comments yet
Post a Comment