degmods.com/src/controllers/relay.ts
2024-09-18 08:21:03 +05:00

759 lines
25 KiB
TypeScript

import { Event, Filter, kinds, matchFilter, nip57, Relay } from 'nostr-tools'
import {
  extractZapAmount,
  log,
  LogType,
  normalizeWebSocketURL,
  timeout
} from '../utils'
import { MetadataController, UserRelaysType } from './metadata'
/**
 * Singleton class to manage relay operations.
 *
 * Holds the shared list of connected relays and a small in-memory cache of
 * events keyed by the value of their first `a` tag. Offers helpers to publish
 * events, fetch events and subscribe to events across several relays.
 */
export class RelayController {
  /** Value passed to the `timeout` helper that races against each `relay.publish` call. */
  private static readonly PUBLISH_TIMEOUT_MS = 30000

  private static instance: RelayController

  /** Event cache keyed by `a` tag value (see `cacheEventByATag` / `getCachedEvent`). */
  private events = new Map<string, Event>()

  /**
   * Connection attempts that are currently in flight, keyed by normalized relay URL.
   * Lets concurrent `connectRelay` calls for the same relay share one attempt
   * instead of opening (and registering) the same relay twice.
   */
  private pendingConnections = new Map<string, Promise<Relay | null>>()

  private debug = true

  public connectedRelays: Relay[] = []

  private constructor() {}

  /**
   * Provides the singleton instance of RelayController.
   *
   * @returns The singleton instance of RelayController.
   */
  public static getInstance(): RelayController {
    if (!RelayController.instance) {
      RelayController.instance = new RelayController()
    }
    return RelayController.instance
  }

  /**
   * Connects to a relay, reusing an existing or in-flight connection when possible.
   *
   * @param relayUrl - The websocket URL of the relay.
   * @returns The relay on success, or `null` if the URL could not be normalized
   *          or the connection attempt failed (the error is logged).
   */
  public connectRelay = async (relayUrl: string): Promise<Relay | null> => {
    let normalizedUrl: string
    try {
      normalizedUrl = normalizeWebSocketURL(relayUrl)
    } catch (err) {
      // A URL that cannot be normalized is reported like any other connection failure.
      log(
        this.debug,
        LogType.Error,
        `❌ nostr (${relayUrl}): Connection error!`,
        err
      )
      return null
    }

    const alreadyConnected = this.connectedRelays.find(
      (relay) => normalizeWebSocketURL(relay.url) === normalizedUrl
    )
    if (alreadyConnected) {
      // already connected, skip
      return alreadyConnected
    }

    const inFlight = this.pendingConnections.get(normalizedUrl)
    if (inFlight) {
      // another caller is already connecting to this relay, share its result
      return inFlight
    }

    const connection = Relay.connect(relayUrl)
      .then((relay) => {
        log(this.debug, LogType.Info, `✅ nostr (${relayUrl}): Connected!`)
        this.connectedRelays.push(relay)
        return relay
      })
      .catch((err) => {
        log(
          this.debug,
          LogType.Error,
          `❌ nostr (${relayUrl}): Connection error!`,
          err
        )
        return null
      })
      .finally(() => {
        this.pendingConnections.delete(normalizedUrl)
      })

    this.pendingConnections.set(normalizedUrl, connection)
    return connection
  }

  /**
   * Connects to every given relay URL and returns the relays that connected.
   * Relays reachable under several of the given URLs are returned only once.
   */
  private connectRelays = async (
    relayUrls: readonly string[]
  ): Promise<Relay[]> => {
    const results = await Promise.allSettled(
      relayUrls.map((relayUrl) => this.connectRelay(relayUrl))
    )

    const relays = new Set<Relay>()
    for (const result of results) {
      if (result.status === 'fulfilled' && result.value) {
        relays.add(result.value)
      }
    }
    return Array.from(relays)
  }

  /**
   * Starts connecting to the app relay, then resolves the user's relays of the
   * given type plus the admin relays and waits for all connection attempts to settle.
   */
  private connectAppUserAndAdminRelays = async (
    hexKey: string,
    relaysType: UserRelaysType
  ): Promise<void> => {
    // Kick off the app relay connection first so it runs while user relays are looked up
    const appRelayPromise = this.connectRelay(import.meta.env.VITE_APP_RELAY)

    // TODO: Implement logic to retrieve relays using `window.nostr.getRelays()` once it becomes available in nostr-login.
    const metadataController = await MetadataController.getInstance()
    const userRelayUrls = await metadataController.findUserRelays(
      hexKey,
      relaysType
    )

    // Copy before appending so the array owned by MetadataController is not mutated
    const relayUrls: string[] = [...userRelayUrls]
    metadataController.adminRelays.forEach((url) => {
      relayUrls.push(url)
    })

    await Promise.allSettled([appRelayPromise, this.connectRelays(relayUrls)])
  }

  /**
   * Publishes the event to each given relay, racing every publish against a timeout.
   *
   * @returns URLs of the relays that accepted the event. Failures are logged, not thrown.
   */
  private publishToRelays = async (
    event: Event,
    relays: readonly Relay[]
  ): Promise<string[]> => {
    const publishedOnRelays: string[] = []

    const publishPromises = relays.map(async (relay) => {
      log(
        this.debug,
        LogType.Info,
        `⬆️ nostr (${relay.url}): Sending event:`,
        event
      )
      try {
        const res = await Promise.race([
          relay.publish(event),
          timeout(RelayController.PUBLISH_TIMEOUT_MS)
        ])
        log(
          this.debug,
          LogType.Info,
          `⬆️ nostr (${relay.url}): Publish result:`,
          res
        )
        publishedOnRelays.push(relay.url)
      } catch (err) {
        log(
          this.debug,
          LogType.Error,
          `❌ nostr (${relay.url}): Publish error!`,
          err
        )
      }
    })

    await Promise.allSettled(publishPromises)
    return publishedOnRelays
  }

  /**
   * Publishes the event to every relay currently held in `connectedRelays`.
   *
   * @returns URLs of the relays that accepted the event, or an empty array
   *          (after logging an error) when no relay is connected.
   */
  private publishToAllConnected = async (event: Event): Promise<string[]> => {
    if (this.connectedRelays.length === 0) {
      log(this.debug, LogType.Error, 'No relay is connected!')
      return []
    }
    // Snapshot the list so relays connected while publishing do not affect this run
    return this.publishToRelays(event, [...this.connectedRelays])
  }

  /** Caches the event under the value of its first `a` tag, if it has one. */
  private cacheEventByATag = (event: Event): void => {
    const aTag = event.tags.find((item) => item[0] === 'a')
    if (aTag && aTag[1]) {
      this.events.set(aTag[1], event)
    }
  }

  /**
   * Looks up the cache using the first `#a` value of the filter.
   * A cached event is only returned when it satisfies the whole filter, so an
   * event of another kind/author cached under the same `a` value is never
   * handed back for a filter it does not match.
   */
  private getCachedEvent = (filter: Filter): Event | null => {
    const aTag = filter['#a']?.[0]
    if (!aTag) return null

    const cachedEvent = this.events.get(aTag)
    if (cachedEvent && matchFilter(filter, cachedEvent)) {
      return cachedEvent
    }
    return null
  }

  /**
   * Picks the most recently created event (first one wins on ties) and, when the
   * filter specifies an `#a` value, caches it under that value.
   *
   * @returns The latest event, or `null` when `events` is empty.
   */
  private pickLatestAndCache = (
    filter: Filter,
    events: readonly Event[]
  ): Event | null => {
    if (events.length === 0) return null

    const latest = events.reduce((newest, candidate) =>
      candidate.created_at > newest.created_at ? candidate : newest
    )

    const aTag = filter['#a']?.[0]
    if (aTag) {
      this.events.set(aTag, latest)
    }
    return latest
  }

  /**
   * Subscribes to a relay and resolves once the stored events have been
   * delivered (EOSE) or the subscription is closed, whichever happens first.
   */
  private collectUntilEose = (
    relay: Relay,
    filters: Filter[],
    onEvent: (event: Event) => void
  ): Promise<void> =>
    new Promise<void>((resolve) => {
      const sub = relay.subscribe(filters, {
        onevent: onEvent,
        // Handle the End-Of-Stored-Events (EOSE) message
        oneose: () => {
          sub.close()
          resolve()
        },
        // A subscription closed before EOSE must not leave the caller waiting forever
        onclose: () => {
          resolve()
        }
      })
    })

  /**
   * Publishes an event to multiple relays.
   *
   * Connects to the application relay (from `VITE_APP_RELAY`), the user's
   * relays obtained from the `MetadataController` and the admin relays, then
   * publishes the event to all connected relays. On success the event is cached
   * locally under its `a` tag value (if any).
   *
   * NOTE(review): the event is sent to every relay in `connectedRelays`, which
   * may include relays connected by earlier, unrelated calls.
   *
   * @param event - The event to be published.
   * @param userHexKey - The user's hexadecimal public key, used to retrieve their relays.
   *                     If not provided, the event's public key will be used.
   * @param userRelaysType - The type of relays to be retrieved (e.g., write relays).
   *                         Defaults to `UserRelaysType.Write`.
   * @returns A promise that resolves to an array of URLs of relays where the event
   *          was published, or an empty array if no relays were connected or the
   *          event could not be published.
   */
  publish = async (
    event: Event,
    userHexKey?: string,
    userRelaysType?: UserRelaysType
  ): Promise<string[]> => {
    await this.connectAppUserAndAdminRelays(
      userHexKey || event.pubkey,
      userRelaysType || UserRelaysType.Write
    )

    const publishedOnRelays = await this.publishToAllConnected(event)

    // Cache the event only if at least one relay accepted it
    if (publishedOnRelays.length > 0) {
      this.cacheEventByATag(event)
    }

    return publishedOnRelays
  }

  /**
   * Publishes an encrypted DM to receiver's read relays.
   *
   * Connects to the application relay, the receiver's read relays obtained
   * from the `MetadataController` and the admin relays, then publishes the
   * event to all connected relays.
   *
   * NOTE(review): the event is sent to every relay in `connectedRelays`, which
   * may include relays connected by earlier, unrelated calls.
   *
   * @param event - The event to be published.
   * @param receiver - The hexadecimal public key of the receiver, used to retrieve their read relays.
   * @returns A promise that resolves to an array of URLs of relays where the event was published,
   *          or an empty array if no relays were connected or the event could not be published.
   */
  publishDM = async (event: Event, receiver: string): Promise<string[]> => {
    await this.connectAppUserAndAdminRelays(receiver, UserRelaysType.Read)
    return this.publishToAllConnected(event)
  }

  /**
   * Publishes an event to the given relays plus the application relay.
   *
   * Only relays that could be connected from `relayUrls` (and the app relay)
   * receive the event. On success the event is cached locally under its `a`
   * tag value (if any).
   *
   * @param event - The event to be published.
   * @param relayUrls - The relay URLs where the event should be published. Not mutated.
   * @returns A promise that resolves to an array of URLs of relays where the event
   *          was published, or an empty array if no relays were connected or the
   *          event could not be published.
   */
  publishOnRelays = async (
    event: Event,
    relayUrls: string[]
  ): Promise<string[]> => {
    const appRelay = import.meta.env.VITE_APP_RELAY
    // Build a new array instead of appending, to avoid side effects on the caller's array
    const targetUrls = relayUrls.includes(appRelay)
      ? relayUrls
      : [...relayUrls, appRelay]

    const relays = await this.connectRelays(targetUrls)
    if (relays.length === 0) {
      log(this.debug, LogType.Error, 'No relay is connected!')
      return []
    }

    // Publish only to the requested relays, not to every relay the controller knows about
    const publishedOnRelays = await this.publishToRelays(event, relays)

    // Cache the event only if at least one relay accepted it
    if (publishedOnRelays.length > 0) {
      this.cacheEventByATag(event)
    }

    return publishedOnRelays
  }

  /**
   * Retrieves multiple events from a set of relays based on a provided filter.
   * The application relay and the admin relays are always queried in addition
   * to the relays passed in.
   *
   * @param filter - The filter criteria to find the events.
   * @param relayUrls - An optional array of relay URLs to query.
   * @returns A promise that resolves to the de-duplicated events. When the filter has a
   *          `limit`, the newest `limit` events (by `created_at`) are returned.
   * @throws Error if none of the relays could be connected.
   */
  fetchEvents = async (
    filter: Filter,
    relayUrls: string[] = []
  ): Promise<Event[]> => {
    const relaySet = new Set<string>(relayUrls)
    relaySet.add(import.meta.env.VITE_APP_RELAY)

    const metadataController = await MetadataController.getInstance()
    metadataController.adminRelays.forEach((relayUrl) => {
      relaySet.add(relayUrl)
    })

    const relays = await this.connectRelays(Array.from(relaySet))
    if (relays.length === 0) {
      throw new Error('No relay is connected to fetch events!')
    }

    const events: Event[] = []
    const eventIds = new Set<string>() // Several relays can return the same event

    await Promise.allSettled(
      relays.map((relay) =>
        this.collectUntilEose(relay, [filter], (e) => {
          if (eventIds.has(e.id)) return
          eventIds.add(e.id)
          events.push(e)
        })
      )
    )

    // Each relay applies the limit on its own, so the merged list can exceed it.
    // Keep only the newest `limit` events.
    if (filter.limit) {
      events.sort((a, b) => b.created_at - a.created_at)
      return events.slice(0, filter.limit)
    }

    return events
  }

  /**
   * Retrieves the most recent event matching the filter.
   * When the filter has an `#a` value, a cached event matching the filter is
   * returned without querying relays, and a freshly fetched event is cached.
   *
   * @param filter - The filter criteria to find the event.
   * @param relays - An optional array of relay URLs to search for the event.
   * @returns A promise that resolves to the found event or null if not found.
   * @throws Error if none of the relays could be connected.
   */
  fetchEvent = async (
    filter: Filter,
    relays: string[] = []
  ): Promise<Event | null> => {
    const cachedEvent = this.getCachedEvent(filter)
    if (cachedEvent) return cachedEvent

    const events = await this.fetchEvents(filter, relays)
    return this.pickLatestAndCache(filter, events)
  }

  /**
   * Retrieves multiple events from the user's relays based on a specified filter.
   * The user's relays are looked up first and then passed to `fetchEvents`.
   *
   * @param filter - The event filter to use when fetching the events (e.g., kinds, authors).
   * @param hexKey - The hexadecimal representation of the user's public key.
   * @param userRelaysType - The type of relays to search (e.g., write, read).
   * @returns A promise that resolves with an array of events.
   * @throws Error if none of the relays could be connected.
   */
  fetchEventsFromUserRelays = async (
    filter: Filter,
    hexKey: string,
    userRelaysType: UserRelaysType
  ): Promise<Event[]> => {
    const metadataController = await MetadataController.getInstance()
    const relayUrls = await metadataController.findUserRelays(
      hexKey,
      userRelaysType
    )

    return this.fetchEvents(filter, relayUrls)
  }

  /**
   * Fetches the most recent event matching the filter from the user's relays.
   * When the filter has an `#a` value, a cached event matching the filter is
   * returned without querying relays, and a freshly fetched event is cached.
   *
   * @param filter - The event filter to use when fetching the event (e.g., kinds, authors).
   * @param hexKey - The hexadecimal representation of the user's public key.
   * @param userRelaysType - The type of relays to search (e.g., write, read).
   * @returns A promise that resolves to the fetched event or null if none was found.
   * @throws Error if none of the relays could be connected.
   */
  fetchEventFromUserRelays = async (
    filter: Filter,
    hexKey: string,
    userRelaysType: UserRelaysType
  ): Promise<Event | null> => {
    const cachedEvent = this.getCachedEvent(filter)
    if (cachedEvent) return cachedEvent

    const events = await this.fetchEventsFromUserRelays(
      filter,
      hexKey,
      userRelaysType
    )
    return this.pickLatestAndCache(filter, events)
  }

  /**
   * Subscribes to events from multiple relays.
   *
   * Connects to the specified relay URLs (plus the app relay) and subscribes
   * using the provided filter. Each distinct event id is passed to
   * `eventHandler` once, even if several relays deliver it. The subscriptions
   * stay open; the caller is responsible for closing the returned subscriptions.
   *
   * @param filter - The filter criteria to apply when subscribing to events.
   * @param relayUrls - An optional array of relay URLs to connect to. The app relay is added automatically. Not mutated.
   * @param eventHandler - A callback function to handle incoming events. It receives an `Event` object.
   * @returns A promise that resolves to one subscription per connected relay.
   * @throws Error if none of the relays could be connected.
   */
  subscribeForEvents = async (
    filter: Filter,
    relayUrls: string[] = [],
    eventHandler: (event: Event) => void
  ) => {
    const appRelay = import.meta.env.VITE_APP_RELAY
    // Build a new array instead of appending, to avoid side effects on the caller's array
    const targetUrls = relayUrls.includes(appRelay)
      ? relayUrls
      : [...relayUrls, appRelay]

    const relays = await this.connectRelays(targetUrls)
    if (relays.length === 0) {
      throw new Error('No relay is connected to fetch events!')
    }

    const processedEventIds = new Set<string>() // Ids already handed to the handler

    return relays.map((relay) =>
      relay.subscribe([filter], {
        onevent: (e) => {
          if (processedEventIds.has(e.id)) return
          processedEventIds.add(e.id)
          eventHandler(e)
        }
      })
    )
  }

  /**
   * Sums the zap amounts found for an event (and optionally an address).
   *
   * Queries the user's read relays plus the app relay for zap receipts that
   * reference `eTag` (and `aTag`, if given). For every distinct receipt, the
   * zap request embedded in its `description` tag is validated and its amount
   * is added to the total.
   *
   * @param user - The hexadecimal public key whose read relays are queried.
   * @param eTag - The event id the zaps refer to.
   * @param aTag - Optional address (`a` tag value) the zaps refer to.
   * @param currentLoggedInUser - Optional public key compared against zap request authors.
   * @returns The accumulated zap amount and whether a zap request with a positive
   *          amount was authored by `currentLoggedInUser`.
   */
  getTotalZapAmount = async (
    user: string,
    eTag: string,
    aTag?: string,
    currentLoggedInUser?: string
  ) => {
    const metadataController = await MetadataController.getInstance()
    const userRelayUrls = await metadataController.findUserRelays(
      user,
      UserRelaysType.Read
    )

    const appRelay = import.meta.env.VITE_APP_RELAY
    // Build a new array instead of pushing, so the array owned by MetadataController is not mutated
    const relayUrls: string[] = userRelayUrls.includes(appRelay)
      ? userRelayUrls
      : [...userRelayUrls, appRelay]

    const relays = await this.connectRelays(relayUrls)

    let accumulatedZapAmount = 0
    let hasZapped = false
    const eventIds = new Set<string>() // Several relays can return the same receipt

    const filters: Filter[] = [
      {
        kinds: [kinds.Zap],
        '#e': [eTag]
      }
    ]
    if (aTag) {
      filters.push({
        kinds: [kinds.Zap],
        '#a': [aTag]
      })
    }

    const handleZapReceipt = (e: Event): void => {
      if (eventIds.has(e.id)) return
      eventIds.add(e.id)

      // The zap request is embedded as JSON in the receipt's `description` tag
      const zapRequestStr = e.tags.find((t) => t[0] === 'description')?.[1]
      if (!zapRequestStr) return

      const validationError = nip57.validateZapRequest(zapRequestStr)
      if (validationError) return

      let zapRequest: Event | null = null
      try {
        zapRequest = JSON.parse(zapRequestStr)
      } catch (err) {
        log(
          this.debug,
          LogType.Error,
          'Error occurred in parsing zap request',
          err
        )
      }
      if (!zapRequest) return

      const amount = extractZapAmount(zapRequest)
      accumulatedZapAmount += amount

      // Once a zap by the logged-in user has been seen, keep the flag set
      if (amount > 0 && !hasZapped) {
        hasZapped = zapRequest.pubkey === currentLoggedInUser
      }
    }

    await Promise.allSettled(
      relays.map((relay) =>
        this.collectUntilEose(relay, filters, handleZapReceipt)
      )
    )

    return {
      accumulatedZapAmount,
      hasZapped
    }
  }
}