Merge 9d2df2e20579a0dc62936849dbdd8ac9484dd83b into 5cfe294063c9317928d8da3387004e3eaddc991a

This commit is contained in:
syeopite 2025-10-21 13:52:38 +05:30 committed by GitHub
commit 93f5f9e9c2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 17 additions and 18 deletions

View File

@ -119,11 +119,11 @@ def create_notification_stream(env, topics, connection_channel)
end
end
spawn do
begin
heartbeat_timer = 0.seconds
loop do
event = connection.receive
select
when event = connection.receive
notification = JSON.parse(event.payload)
topic = notification["topic"].as_s
video_id = notification["videoId"].as_s
@ -139,27 +139,20 @@ def create_notification_stream(env, topics, connection_channel)
env.response.puts "id: #{id}"
env.response.puts "data: #{response.to_json}"
env.response.puts
env.response.flush
id += 1
end
rescue ex
ensure
connection_channel.send({false, connection})
end
when timeout heartbeat_timer
# Send heartbeat on every timeout
env.response.puts ":keepalive #{Time.utc.to_unix}"
end
begin
# Send heartbeat
loop do
env.response.puts ":keepalive #{Time.utc.to_unix}"
heartbeat_timer = ((20 + rand(11)).seconds)
env.response.puts
env.response.flush
sleep (20 + rand(11)).seconds
end
rescue ex
ensure
connection.close
connection_channel.send({false, connection})
end
end

View File

@ -32,7 +32,13 @@ class Invidious::Jobs::NotificationJob < Invidious::Jobs::BaseJob
def begin
connections = [] of ::Channel(PQ::Notification)
PG.connect_listen(pg_url, "notifications") { |event| connections.each(&.send(event)) }
PG.connect_listen(pg_url, "notifications") do |event|
connections.each do |channel|
channel.send(event)
rescue ::Channel::ClosedError
# Notification stream was closed.
end
end
# hash of channels to their videos (id+published) that need notifying
to_notify = Hash(String, Set(VideoNotification)).new(