2024-11-25 16:53:40 -06:00

143 lines
4.7 KiB
Java

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package websocket.chat;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
import jakarta.websocket.OnClose;
import jakarta.websocket.OnError;
import jakarta.websocket.OnMessage;
import jakarta.websocket.OnOpen;
import jakarta.websocket.Session;
import jakarta.websocket.server.ServerEndpoint;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import util.HTMLFilter;
/**
 * Annotated WebSocket endpoint, published at {@code /websocket/chat}, that relays text messages between all
 * registered connections.
 * <p>
 * Each instance holds one {@link Session}, receives a generated nickname of the form {@code Guest<n>} and registers
 * itself in a shared, static set once it has been opened. Incoming text is passed through {@link HTMLFilter} before
 * being relayed.
 */
@ServerEndpoint(value = "/websocket/chat")
public class ChatAnnotation {

    private static final Log log = LogFactory.getLog(ChatAnnotation.class);

    private static final String GUEST_PREFIX = "Guest";
    private static final AtomicInteger connectionIds = new AtomicInteger(0);
    private static final Set<ChatAnnotation> connections = new CopyOnWriteArraySet<>();

    private final String nickname;
    private Session session;

    /*
     * The queue of messages that may build up while another message is being sent. The thread that sends a message is
     * responsible for clearing any queue that builds up while that message is being sent.
     *
     * Both fields below are only read or written while holding the monitor of this instance.
     */
    private final Queue<String> messageBacklog = new ArrayDeque<>();
    private boolean messageInProgress = false;

    /**
     * Creates an endpoint instance and assigns it the next free guest nickname.
     */
    public ChatAnnotation() {
        nickname = GUEST_PREFIX + connectionIds.getAndIncrement();
    }

    /**
     * Stores the session, registers this instance and announces the new participant to every registered connection
     * (including this one).
     *
     * @param session the session this instance will send messages through
     */
    @OnOpen
    public void start(Session session) {
        this.session = session;
        connections.add(this);
        String message = String.format("* %s %s", nickname, "has joined.");
        broadcast(message);
    }

    /**
     * Unregisters this instance and announces the departure to the remaining connections.
     * <p>
     * The announcement is only made when this call actually removed the instance. If {@link #broadcast(String)} has
     * already evicted it after a failed send, that path has announced the departure itself and a second message
     * would be a duplicate.
     */
    @OnClose
    public void end() {
        if (connections.remove(this)) {
            String message = String.format("* %s %s", nickname, "has disconnected.");
            broadcast(message);
        }
    }

    /**
     * Relays a text message from this connection to every registered connection, prefixed with the sender's nickname.
     *
     * @param message the raw text received from the client
     */
    @OnMessage
    public void incoming(String message) {
        // Never trust the client
        String filteredMessage = String.format("%s: %s", nickname, HTMLFilter.filter(message));
        broadcast(filteredMessage);
    }

    /**
     * Logs an error reported for this endpoint. The error is not re-thrown.
     *
     * @param t the reported error
     *
     * @throws Throwable declared for signature compatibility; this implementation never throws
     */
    @OnError
    public void onError(Throwable t) throws Throwable {
        log.error("Chat Error: " + t.toString(), t);
    }

    /*
     * synchronized blocks are limited to operations that are expected to be quick. More specifically, messages are not
     * sent from within a synchronized block.
     *
     * If another send is already in progress for this instance, the message is appended to the backlog and this
     * method returns immediately; the thread that is currently sending drains the backlog. If sendText() throws, the
     * exception propagates with messageInProgress still set, so any later call only appends to the backlog. The
     * caller is expected to drop this connection when that happens.
     */
    private void sendMessage(String msg) throws IOException {
        synchronized (this) {
            if (messageInProgress) {
                messageBacklog.add(msg);
                return;
            } else {
                messageInProgress = true;
            }
        }

        boolean queueHasMessagesToBeSent = true;
        String messageToSend = msg;
        do {
            session.getBasicRemote().sendText(messageToSend);
            synchronized (this) {
                messageToSend = messageBacklog.poll();
                if (messageToSend == null) {
                    // Release the "sender" role in the same critical section that observed the empty backlog so no
                    // message can be queued and then left behind.
                    messageInProgress = false;
                    queueHasMessagesToBeSent = false;
                }
            }
        } while (queueHasMessagesToBeSent);
    }

    /**
     * Sends a message to every registered connection.
     * <p>
     * A connection whose send fails is removed from the registry, its session is closed and the remaining
     * connections are told that it has been disconnected. The failure of one connection never prevents delivery to
     * the others.
     *
     * @param msg the text to deliver
     */
    private static void broadcast(String msg) {
        for (ChatAnnotation client : connections) {
            try {
                client.sendMessage(msg);
            } catch (IOException | IllegalStateException e) {
                // IllegalStateException is what a Session reports when it is used after it has been closed, which
                // can happen if the peer goes away while this loop is running. Treat it like any other send failure
                // rather than letting it abort delivery to the remaining clients.
                log.debug("Chat Error: Failed to send message to client", e);
                // Only the thread that actually removes the client cleans up and announces the disconnect.
                if (connections.remove(client)) {
                    try {
                        client.session.close();
                    } catch (IOException ignored) {
                        // Best effort: the connection is already being discarded.
                    }
                    String message = String.format("* %s %s", client.nickname, "has been disconnected.");
                    broadcast(message);
                }
            }
        }
    }
}