001package io.jstach.rainbowgum.rabbitmq;
002
003import java.io.IOException;
004import java.io.UncheckedIOException;
005import java.net.URI;
006import java.util.LinkedHashMap;
007import java.util.Map;
008import java.util.concurrent.TimeoutException;
009import java.util.concurrent.locks.ReentrantReadWriteLock;
010import java.util.function.Function;
011
012import org.eclipse.jdt.annotation.Nullable;
013
014import com.rabbitmq.client.AMQP.BasicProperties;
015import com.rabbitmq.client.AlreadyClosedException;
016import com.rabbitmq.client.Channel;
017import com.rabbitmq.client.Connection;
018import com.rabbitmq.client.ConnectionFactory;
019
020import io.jstach.rainbowgum.LogConfig;
021import io.jstach.rainbowgum.LogEncoder.BufferHints;
022import io.jstach.rainbowgum.LogEvent;
023import io.jstach.rainbowgum.LogOutput;
024import io.jstach.rainbowgum.LogProperties;
025import io.jstach.rainbowgum.MetaLog;
026import io.jstach.rainbowgum.annotation.GeneratedByATrustedSource;
027import io.jstach.rainbowgum.annotation.LogConfigurable;
028
029/**
030 * RabbitMQ Output that will write publish messages to a given exchange with a given
031 * routing key.
032 */
033public final class RabbitMQOutput implements LogOutput {
034
035        private final URI uri;
036
037        private final ConnectionFactory connectionFactory;
038
039        private Connection connection;
040
041        private volatile Channel channel;
042
043        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
044
045        private final @Nullable String appId;
046
047        private final String exchange;
048
049        private final Function<LogEvent, String> routingKeyFunction;
050
051        private final String connectionName;
052
053        private final boolean declareExchange;
054
055        private final String exchangeType;
056
057        /**
058         * The rabbitmq URI scheme for configuration.
059         */
060        public final static String URI_SCHEME = "amqp";
061
062        /**
063         * Default exchange.
064         */
065        public final static String DEFAULT_EXCHANGE = "logging";
066
067        /**
068         * Default exchange type for declaration.
069         */
070        public final static String DEFAULT_EXCHANGE_TYPE = "topic";
071
072        RabbitMQOutput(URI uri, ConnectionFactory connectionFactory, @Nullable String appId, String exchange,
073                        Function<LogEvent, String> routingKeyFunction, String connectionName, boolean declareExchange,
074                        String exchangeType) {
075                super();
076                this.uri = uri;
077                this.connectionFactory = connectionFactory;
078                this.appId = appId;
079                this.exchange = exchange;
080                this.routingKeyFunction = routingKeyFunction;
081                this.connectionName = connectionName;
082                this.declareExchange = declareExchange;
083                this.exchangeType = exchangeType;
084        }
085
086        /**
087         * Creates a RabbitMQOutput.
088         * @param name used to resolve config and give the output a name.
089         * @param uri passed to the rabbitmq connection factory.
090         * @param exchange exchange to send messages to.
091         * @param routingKey the logging event level will be used by default.
092         * @param declareExchange declare exchange on start. Default is false.
093         * @param host host.
094         * @param username set user name if not null outside of URI.
095         * @param password set password if not null outside of URI.
096         * @param port set port if not null.
097         * @param appId sets the message appId if not null.
098         * @param connectionName connection name if not null.
099         * @param exchangeType exchange type like "topic" covered in rabbitmq doc.
100         * @param virtualHost sets virtualhost if not null.
101         * @return output.
102         */
103        @LogConfigurable(prefix = LogProperties.OUTPUT_PREFIX)
104        static RabbitMQOutput of( //
105                        @LogConfigurable.KeyParameter String name, //
106                        @Nullable URI uri, //
107                        @LogConfigurable.DefaultParameter("DEFAULT_EXCHANGE") String exchange, //
108                        @LogConfigurable.ConvertParameter("toRoutingKeyFunction") @Nullable Function<LogEvent, String> routingKey, //
109                        @Nullable Boolean declareExchange, //
110                        @Nullable String host, //
111                        @Nullable String username, //
112                        @Nullable String password, //
113                        @Nullable Integer port, //
114                        @Nullable String appId, //
115                        @Nullable String connectionName, //
116                        @LogConfigurable.DefaultParameter("DEFAULT_EXCHANGE_TYPE") @Nullable String exchangeType, //
117                        @Nullable String virtualHost) {
118                connectionName = connectionName == null ? "rainbowgumOutput" : connectionName;
119                declareExchange = declareExchange == null ? false : declareExchange;
120                exchangeType = exchangeType == null ? DEFAULT_EXCHANGE_TYPE : exchangeType;
121                ConnectionFactory factory = new ConnectionFactory();
122                if (uri != null) {
123                        try {
124                                factory.setUri(uri);
125                        }
126                        catch (Exception e) {
127                                throw new RuntimeException(e);
128                        }
129                }
130                if (username != null) {
131                        factory.setUsername(username);
132                }
133                if (password != null) {
134                        factory.setPassword(password);
135                }
136                if (port != null) {
137                        factory.setPort(port);
138                }
139                if (host != null) {
140                        factory.setHost(host);
141                }
142                if (virtualHost != null) {
143                        factory.setVirtualHost(virtualHost);
144                }
145                Function<LogEvent, String> routingKeyFunction;
146                if (routingKey != null) {
147                        routingKeyFunction = routingKey;
148                }
149                else {
150                        routingKeyFunction = e -> e.level().name();
151                }
152                return new RabbitMQOutput(uri, factory, appId, exchange, routingKeyFunction, connectionName, declareExchange,
153                                exchangeType);
154        }
155
156        static Function<LogEvent, String> toRoutingKeyFunction(String routingKey) {
157                return e -> routingKey;
158        }
159
160        @Override
161        public void start(LogConfig config) {
162                lock.writeLock().lock();
163                try {
164                        this.connection = connectionFactory.newConnection(connectionName);
165                        if (declareExchange) {
166                                var channel = this.connection.createChannel();
167                                channel.exchangeDeclare(exchange, exchangeType);
168                        }
169                }
170                catch (IOException e) {
171                        throw new UncheckedIOException(e);
172                }
173                catch (TimeoutException e) {
174                        throw new RuntimeException(e);
175                }
176                finally {
177                        lock.writeLock().unlock();
178                }
179        }
180
181        @Override
182        public URI uri() {
183                return this.uri;
184        }
185
186        @Override
187        public void write(LogEvent event, byte[] bytes, int off, int len, ContentType contentType) {
188                // https://github.com/rabbitmq/rabbitmq-java-client/issues/422
189                byte[] copy = new byte[len];
190                System.arraycopy(bytes, off, copy, 0, len);
191                write(event, bytes, contentType);
192        }
193
194        @Override
195        public void write(LogEvent event, byte[] bytes, ContentType contentType) {
196                if (checkReentry(event)) {
197                        return;
198                }
199                BasicProperties props = properties(event, contentType);
200                byte[] body = bytes;
201                try {
202                        var c = channel();
203                        c.basicPublish(exchange, routingKeyFunction.apply(event), props, body);
204                }
205                catch (IOException e) {
206                        MetaLog.error(RabbitMQOutput.class, e);
207                        lock.writeLock().lock();
208                        try {
209                                this.channel = null;
210                        }
211                        finally {
212                                lock.writeLock().unlock();
213                        }
214                }
215        }
216
217        // This is to exclude this code from code coverage as it is not possible with current
218        // RabbitMQ client.
219        @GeneratedByATrustedSource
220        // TODO make this generic and add to MetaLog.
221        private static boolean checkReentry(LogEvent event) {
222                if (event.loggerName().startsWith("com.rabbitmq.client")) {
223                        StringBuilder sb = new StringBuilder();
224                        event.formattedMessage(sb);
225                        String docUrl = MetaLog.documentBaseUrl() + "/#appender_reentry";
226                        MetaLog.error(RabbitMQOutput.class, "RabbitMQ attempted to recursively log. File a bug. See: " + docUrl,
227                                        new Exception(sb.toString()));
228                        return true;
229                }
230                return false;
231        }
232
233        private BasicProperties properties(LogEvent event, ContentType contentType) {
234                var builder = new BasicProperties.Builder().contentType(contentType.contentType()).appId(appId);
235                var kvs = event.keyValues();
236                if (!kvs.isEmpty()) {
237                        Map<String, Object> headers = new LinkedHashMap<>(kvs.size());
238                        kvs.forEach(headers::put);
239                        builder.headers(headers);
240                }
241                if (appId != null) {
242                        builder.appId(appId);
243                }
244                return builder.build();
245        }
246
247        Channel channel() throws IOException {
248                var c = this.channel;
249                if (c == null) {
250                        lock.writeLock().lock();
251                        try {
252                                c = this.channel = connection.createChannel();
253                                if (c == null) {
254                                        throw new IOException("channel is unavailable");
255                                }
256                                return c;
257                        }
258                        finally {
259                                lock.writeLock().unlock();
260                        }
261                }
262                return c;
263        }
264
265        @Override
266        public void flush() {
267
268        }
269
270        @Override
271        public BufferHints bufferHints() {
272                return WriteMethod.BYTES;
273        }
274
275        @Override
276        public OutputType type() {
277                return OutputType.NETWORK;
278        }
279
280        @Override
281        public void close() {
282                lock.writeLock().lock();
283                try {
284                        var c = this.channel;
285                        var conn = this.connection;
286                        if (c != null) {
287                                try {
288                                        c.close();
289                                }
290                                catch (AlreadyClosedException ae) {
291                                        // do nothing.
292                                }
293                                catch (IOException | TimeoutException e) {
294                                        MetaLog.error(getClass(), e);
295                                }
296                        }
297                        if (conn != null) {
298                                try {
299                                        conn.close();
300                                }
301                                catch (AlreadyClosedException ae) {
302                                        // do nothing.
303                                }
304                                catch (IOException e) {
305                                        MetaLog.error(getClass(), e);
306                                }
307                        }
308                        this.channel = null;
309                        this.connection = null;
310                }
311                finally {
312                        lock.writeLock().unlock();
313                }
314
315        }
316
317}