package io.jstach.rainbowgum.rabbitmq;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

import org.eclipse.jdt.annotation.Nullable;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import io.jstach.rainbowgum.LogConfig;
import io.jstach.rainbowgum.LogEncoder.BufferHints;
import io.jstach.rainbowgum.LogEvent;
import io.jstach.rainbowgum.LogOutput;
import io.jstach.rainbowgum.LogProperties;
import io.jstach.rainbowgum.MetaLog;
import io.jstach.rainbowgum.annotation.GeneratedByATrustedSource;
import io.jstach.rainbowgum.annotation.LogConfigurable;

/**
 * RabbitMQ Output that will publish messages to a given exchange with a given routing
 * key.
 * <p>
 * The connection is opened in {@link #start(LogConfig)}; the publishing channel is
 * created lazily on first write (or during start when the exchange is declared) and is
 * discarded and recreated after a failed publish.
 */
public final class RabbitMQOutput implements LogOutput {

    private final URI uri;

    private final ConnectionFactory connectionFactory;

    // Only read or written while holding the write lock.
    private Connection connection;

    // Read without the lock on the publish path; written only under the write lock.
    private volatile Channel channel;

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    private final @Nullable String appId;

    private final String exchange;

    private final Function<LogEvent, String> routingKeyFunction;

    private final String connectionName;

    private final boolean declareExchange;

    private final String exchangeType;

    /**
     * The rabbitmq URI scheme for configuration.
     */
    public final static String URI_SCHEME = "amqp";

    /**
     * Default exchange.
     */
    public final static String DEFAULT_EXCHANGE = "logging";

    /**
     * Default exchange type for declaration.
     */
    public final static String DEFAULT_EXCHANGE_TYPE = "topic";

    RabbitMQOutput(URI uri, ConnectionFactory connectionFactory, @Nullable String appId, String exchange,
            Function<LogEvent, String> routingKeyFunction, String connectionName, boolean declareExchange,
            String exchangeType) {
        super();
        this.uri = uri;
        this.connectionFactory = connectionFactory;
        this.appId = appId;
        this.exchange = exchange;
        this.routingKeyFunction = routingKeyFunction;
        this.connectionName = connectionName;
        this.declareExchange = declareExchange;
        this.exchangeType = exchangeType;
    }

    /**
     * Creates a RabbitMQOutput.
     * @param name used to resolve config and give the output a name.
     * @param uri passed to the rabbitmq connection factory.
     * @param exchange exchange to send messages to.
     * @param routingKey function producing the routing key for an event. If null the
     * name of the logging event level is used.
     * @param declareExchange declare exchange on start. Default is false.
     * @param host host.
     * @param username set user name if not null outside of URI.
     * @param password set password if not null outside of URI.
     * @param port set port if not null.
     * @param appId sets the message appId if not null.
     * @param connectionName connection name if not null.
     * @param exchangeType exchange type like "topic" covered in rabbitmq doc.
     * @param virtualHost sets virtualhost if not null.
     * @return output.
     */
    @LogConfigurable(prefix = LogProperties.OUTPUT_PREFIX)
    static RabbitMQOutput of( //
            @LogConfigurable.KeyParameter String name, //
            @Nullable URI uri, //
            @LogConfigurable.DefaultParameter("DEFAULT_EXCHANGE") String exchange, //
            @LogConfigurable.ConvertParameter("toRoutingKeyFunction") @Nullable Function<LogEvent, String> routingKey, //
            @Nullable Boolean declareExchange, //
            @Nullable String host, //
            @Nullable String username, //
            @Nullable String password, //
            @Nullable Integer port, //
            @Nullable String appId, //
            @Nullable String connectionName, //
            @LogConfigurable.DefaultParameter("DEFAULT_EXCHANGE_TYPE") @Nullable String exchangeType, //
            @Nullable String virtualHost) {
        connectionName = connectionName == null ? "rainbowgumOutput" : connectionName;
        declareExchange = declareExchange == null ? false : declareExchange;
        exchangeType = exchangeType == null ? DEFAULT_EXCHANGE_TYPE : exchangeType;
        ConnectionFactory factory = new ConnectionFactory();
        // The URI is applied first so that the explicit settings below override
        // whatever it carried.
        if (uri != null) {
            try {
                factory.setUri(uri);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        if (username != null) {
            factory.setUsername(username);
        }
        if (password != null) {
            factory.setPassword(password);
        }
        if (port != null) {
            factory.setPort(port);
        }
        if (host != null) {
            factory.setHost(host);
        }
        if (virtualHost != null) {
            factory.setVirtualHost(virtualHost);
        }
        Function<LogEvent, String> routingKeyFunction;
        if (routingKey != null) {
            routingKeyFunction = routingKey;
        }
        else {
            routingKeyFunction = e -> e.level().name();
        }
        // NOTE(review): uri may be null here, in which case uri() returns null -
        // confirm whether callers of LogOutput.uri() tolerate that.
        return new RabbitMQOutput(uri, factory, appId, exchange, routingKeyFunction, connectionName, declareExchange,
                exchangeType);
    }

    /**
     * Converts a configured routing key into a function that returns that same key for
     * every event.
     * @param routingKey fixed routing key.
     * @return function ignoring the event and returning the routing key.
     */
    static Function<LogEvent, String> toRoutingKeyFunction(String routingKey) {
        return e -> routingKey;
    }

    /**
     * Opens the connection and, if configured, declares the exchange.
     * @param config not used by this output.
     * @throws UncheckedIOException if connecting, creating the channel or declaring the
     * exchange fails with an IOException.
     * @throws RuntimeException wrapping a TimeoutException raised while connecting.
     */
    @Override
    public void start(LogConfig config) {
        lock.writeLock().lock();
        try {
            this.connection = connectionFactory.newConnection(connectionName);
            if (declareExchange) {
                var declaring = this.connection.createChannel();
                if (declaring == null) {
                    throw new IOException("channel is unavailable");
                }
                declaring.exchangeDeclare(exchange, exchangeType);
                // Keep the channel for publishing instead of leaving it open and
                // unreferenced. It is only stored after a successful declare.
                this.channel = declaring;
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        catch (TimeoutException e) {
            throw new RuntimeException(e);
        }
        finally {
            lock.writeLock().unlock();
        }
    }

    @Override
    public URI uri() {
        return this.uri;
    }

    /**
     * Publishes the given slice of the buffer.
     * @param event event being written.
     * @param bytes buffer holding the encoded event.
     * @param off offset of the first byte to publish.
     * @param len number of bytes to publish.
     * @param contentType content type of the encoded event.
     */
    @Override
    public void write(LogEvent event, byte[] bytes, int off, int len, ContentType contentType) {
        // https://github.com/rabbitmq/rabbitmq-java-client/issues/422
        byte[] copy = new byte[len];
        System.arraycopy(bytes, off, copy, 0, len);
        // Publish the copied slice, not the caller's whole buffer.
        write(event, copy, contentType);
    }

    /**
     * Publishes the bytes as a message body to the configured exchange. Failures are
     * reported to {@link MetaLog} and the cached channel is dropped so that the next
     * write creates a new one.
     * @param event event being written, used for routing key and headers.
     * @param bytes complete message body.
     * @param contentType content type of the encoded event.
     */
    @Override
    public void write(LogEvent event, byte[] bytes, ContentType contentType) {
        if (checkReentry(event)) {
            return;
        }
        BasicProperties props = properties(event, contentType);
        Channel c;
        try {
            c = channel();
        }
        catch (IOException | AlreadyClosedException e) {
            // Nothing was cached so there is nothing to discard.
            MetaLog.error(RabbitMQOutput.class, e);
            return;
        }
        try {
            c.basicPublish(exchange, routingKeyFunction.apply(event), props, bytes);
        }
        catch (IOException | AlreadyClosedException e) {
            // AlreadyClosedException is unchecked and is what the client raises
            // for a closed channel, so it has to be caught explicitly.
            MetaLog.error(RabbitMQOutput.class, e);
            discardChannel(c);
        }
    }

    /**
     * Clears the cached channel if it is still the one that failed, so that a
     * replacement created in the meantime by another thread is not thrown away.
     */
    private void discardChannel(Channel failed) {
        lock.writeLock().lock();
        try {
            if (this.channel == failed) {
                this.channel = null;
            }
        }
        finally {
            lock.writeLock().unlock();
        }
    }

    // This is to exclude this code from code coverage as it is not possible with current
    // RabbitMQ client.
    @GeneratedByATrustedSource
    // TODO make this generic and add to MetaLog.
    private static boolean checkReentry(LogEvent event) {
        // Events logged by the RabbitMQ client itself are reported and skipped
        // rather than being published through that same client.
        if (event.loggerName().startsWith("com.rabbitmq.client")) {
            StringBuilder sb = new StringBuilder();
            event.formattedMessage(sb);
            String docUrl = MetaLog.documentBaseUrl() + "/#appender_reentry";
            MetaLog.error(RabbitMQOutput.class, "RabbitMQ attempted to recursively log. File a bug. See: " + docUrl,
                    new Exception(sb.toString()));
            return true;
        }
        return false;
    }

    /**
     * Builds the message properties: content type, the event key values as headers
     * (when there are any) and the appId (when configured).
     */
    private BasicProperties properties(LogEvent event, ContentType contentType) {
        var builder = new BasicProperties.Builder().contentType(contentType.contentType());
        var kvs = event.keyValues();
        if (!kvs.isEmpty()) {
            Map<String, Object> headers = new LinkedHashMap<>(kvs.size());
            kvs.forEach(headers::put);
            builder.headers(headers);
        }
        if (appId != null) {
            builder.appId(appId);
        }
        return builder.build();
    }

    /**
     * Returns the cached channel, creating it on first use.
     * @return channel to publish on.
     * @throws IOException if there is no connection (not started or already closed) or
     * the connection cannot provide a channel.
     */
    Channel channel() throws IOException {
        var c = this.channel;
        if (c != null) {
            return c;
        }
        lock.writeLock().lock();
        try {
            // Re-check under the lock: another thread may have created the channel
            // while this one was waiting.
            c = this.channel;
            if (c != null) {
                return c;
            }
            var conn = this.connection;
            if (conn == null) {
                throw new IOException("connection is unavailable");
            }
            c = conn.createChannel();
            if (c == null) {
                throw new IOException("channel is unavailable");
            }
            this.channel = c;
            return c;
        }
        finally {
            lock.writeLock().unlock();
        }
    }

    @Override
    public void flush() {

    }

    @Override
    public BufferHints bufferHints() {
        return WriteMethod.BYTES;
    }

    @Override
    public OutputType type() {
        return OutputType.NETWORK;
    }

    /**
     * Closes the channel and then the connection. Close failures are reported to
     * {@link MetaLog} and do not propagate; both references are cleared afterwards.
     */
    @Override
    public void close() {
        lock.writeLock().lock();
        try {
            var c = this.channel;
            var conn = this.connection;
            if (c != null) {
                try {
                    c.close();
                }
                catch (AlreadyClosedException ae) {
                    // do nothing.
                }
                catch (IOException | TimeoutException e) {
                    MetaLog.error(getClass(), e);
                }
            }
            if (conn != null) {
                try {
                    conn.close();
                }
                catch (AlreadyClosedException ae) {
                    // do nothing.
                }
                catch (IOException e) {
                    MetaLog.error(getClass(), e);
                }
            }
            this.channel = null;
            this.connection = null;
        }
        finally {
            lock.writeLock().unlock();
        }

    }

}