diff --git a/src/app/api/neatqueue-webhook/route.ts b/src/app/api/neatqueue-webhook/route.ts
index c896a2d..ac09cc3 100644
--- a/src/app/api/neatqueue-webhook/route.ts
+++ b/src/app/api/neatqueue-webhook/route.ts
@@ -1,4 +1,10 @@
import crypto from 'node:crypto'
+import { globalEmitter } from '@/lib/events'
+import { syncHistory } from '@/server/api/routers/history'
+import type { PlayerState } from '@/server/api/routers/player-state'
+import { PLAYER_STATE_KEY, redis } from '@/server/redis'
+import { leaderboardService } from '@/server/services/leaderboard'
+import { RANKED_CHANNEL, VANILLA_CHANNEL } from '@/shared/constants'
import { type NextRequest, NextResponse } from 'next/server'
const EXPECTED_QUERY_SECRET = process.env.WEBHOOK_QUERY_SECRET
@@ -42,8 +48,6 @@ function verifyQuerySecret(req: NextRequest): boolean {
* Verifies query secret, logs payload, and handles actions.
*/
export async function POST(req: NextRequest) {
- let payload: any
-
try {
const isVerified = verifyQuerySecret(req)
if (!isVerified) {
@@ -54,7 +58,66 @@ export async function POST(req: NextRequest) {
)
}
- payload = await req.json()
+ const payload = await req.json()
+
+ switch (payload.action) {
+ case 'JOIN_QUEUE': {
+ const state: PlayerState = {
+ status: 'queuing',
+ queueStartTime: Date.now(),
+ }
+ const userId = payload.new_players[0].id
+ console.log('-----JOIN QUEUE-----')
+ console.dir(payload, { depth: null })
+ console.log(userId)
+ await redis.set(PLAYER_STATE_KEY(userId), JSON.stringify(state))
+ globalEmitter.emit(`state-change:${userId}`, state)
+ break
+ }
+
+ case 'MATCH_STARTED': {
+ const playerIds = payload.players.map((p: any) => p.id) as string[]
+
+ await Promise.all(
+ playerIds.map(async (id) => {
+ const state = {
+ status: 'in_game',
+ currentMatch: {
+ opponentId: playerIds.find((p) => p !== id),
+ startTime: Date.now(),
+ },
+ }
+ await redis.set(PLAYER_STATE_KEY(id), JSON.stringify(state))
+ globalEmitter.emit(`state-change:${id}`, state)
+ })
+ )
+ break
+ }
+
+ case 'MATCH_COMPLETED': {
+ const playerIds = payload.teams.map((p: any) => p[0].id) as string[]
+ console.log({ playerIds })
+ await syncHistory()
+ if ([RANKED_CHANNEL, VANILLA_CHANNEL].includes(payload.channel)) {
+ await leaderboardService.refreshLeaderboard(payload.channel)
+ }
+ await Promise.all(
+ playerIds.map(async (id) => {
+ await redis.del(PLAYER_STATE_KEY(id))
+ globalEmitter.emit(`state-change:${id}`, { status: 'idle' })
+ })
+ ).catch(console.error)
+
+ break
+ }
+
+ case 'LEAVE_QUEUE': {
+ const userId = payload.players_removed[0].id
+ await redis.del(PLAYER_STATE_KEY(userId))
+ globalEmitter.emit(`state-change:${userId}`, { status: 'idle' })
+ break
+ }
+ }
console.log(
'--- Verified Webhook Received (Query Auth) ---',
diff --git a/src/app/api/refresh-leaderboard/route.ts b/src/app/api/refresh-leaderboard/route.ts
index 278d30f..947cede 100644
--- a/src/app/api/refresh-leaderboard/route.ts
+++ b/src/app/api/refresh-leaderboard/route.ts
@@ -1,5 +1,5 @@
import { env } from '@/env'
-import { LeaderboardService } from '@/server/services/leaderboard'
+import { leaderboardService } from '@/server/services/leaderboard'
import { RANKED_CHANNEL, VANILLA_CHANNEL } from '@/shared/constants'
import { headers } from 'next/headers'
@@ -14,12 +14,10 @@ export async function POST() {
}
try {
- const service = new LeaderboardService()
-
for (const channelId of CHANNEL_IDS) {
try {
console.log(`refreshing leaderboard for ${channelId}...`)
- await service.refreshLeaderboard(channelId)
+ await leaderboardService.refreshLeaderboard(channelId)
} catch (err) {
console.error('refresh failed:', err)
return new Response('internal error', { status: 500 })
diff --git a/src/app/stream-card/[id]/_components/stream-card-client.tsx b/src/app/stream-card/[id]/_components/stream-card-client.tsx
index b4675a1..3d978b5 100644
--- a/src/app/stream-card/[id]/_components/stream-card-client.tsx
+++ b/src/app/stream-card/[id]/_components/stream-card-client.tsx
@@ -1,37 +1,18 @@
'use client'
+import { cn } from '@/lib/utils'
+import type { SelectGames } from '@/server/db/types'
+import type { LeaderboardEntry } from '@/server/services/neatqueue.service'
import { RANKED_CHANNEL } from '@/shared/constants'
import { api } from '@/trpc/react'
+import { Swords } from 'lucide-react'
import { useParams } from 'next/navigation'
-import { useEffect } from 'react'
+import { type ComponentPropsWithoutRef, useEffect, useState } from 'react'
-export function StreamCardClient() {
- const { id } = useParams()
- if (!id || typeof id !== 'string') {
- return null
- }
-
- const [gamesQueryResult, gamesQuery] =
- api.history.user_games.useSuspenseQuery({ user_id: id })
- const games = gamesQueryResult || [] // Ensure games is always an array
-
- const [rankedUserRank, rankedUserQuery] =
- api.leaderboard.get_user_rank.useSuspenseQuery({
- channel_id: RANKED_CHANNEL,
- user_id: id,
- })
-
- useEffect(() => {
- const interval = setInterval(async () => {
- await Promise.all([gamesQuery.refetch(), rankedUserQuery.refetch()])
- }, 1000 * 60)
-
- return () => clearInterval(interval)
- }, [])
-
- if (!rankedUserRank || !games?.length) {
- return null
- }
+function getPlayerData(
+ playerLeaderboardEntry: LeaderboardEntry,
+ games: SelectGames[]
+) {
const filteredGamesByLeaderboard = games.filter(
(game) => game.gameType === 'ranked'
)
@@ -53,10 +34,10 @@ export function StreamCardClient() {
}
const lastGame = filteredGamesByLeaderboard.at(0)
-
const currentName = lastGame?.playerName
const meaningful_games = games_played - ties
- const playerData = {
+
+ return {
username: currentName,
games: games_played,
meaningful_games,
@@ -67,22 +48,151 @@ export function StreamCardClient() {
meaningful_games > 0 ? Math.ceil((wins / meaningful_games) * 100) : 0,
lossRate:
meaningful_games > 0 ? Math.floor((losses / meaningful_games) * 100) : 0,
- rank: rankedUserRank.rank,
- mmr: Math.round(rankedUserRank.mmr),
- mmrChange: `${(lastGame?.mmrChange ?? 0) >= 0 ? '+' : ''}${Math.round(lastGame?.mmrChange ?? 0)}`,
- streak: rankedUserRank?.streak,
+ rank: playerLeaderboardEntry.rank,
+ mmr: Math.round(playerLeaderboardEntry.mmr),
+ mmrChange: `${(lastGame?.mmrChange ?? 0) >= 0 ? '+' : ''}${Math.round(
+ lastGame?.mmrChange ?? 0
+ )}`,
+ streak: playerLeaderboardEntry?.streak,
}
+}
+
+export function StreamCardClient() {
+ const { id } = useParams()
+ if (!id || typeof id !== 'string') {
+ return null
+ }
+
+ const [gamesQueryResult, gamesQuery] =
+ api.history.user_games.useSuspenseQuery({ user_id: id })
+ const games = gamesQueryResult || []
+
+ const [rankedUserRank, rankedUserQuery] =
+ api.leaderboard.get_user_rank.useSuspenseQuery({
+ channel_id: RANKED_CHANNEL,
+ user_id: id,
+ })
+
+ const result = api.playerState.onStateChange.useSubscription(
+ { userId: id },
+ {
+ onData: async () => {
+ await Promise.all([gamesQuery.refetch(), rankedUserQuery.refetch()])
+ },
+ }
+ )
+
+ const playerState = result.data?.data
+
+ if (!rankedUserRank || !games?.length) {
+ return null
+ }
+
+ const playerData = getPlayerData(rankedUserRank, games)
+
+ const isQueuing = playerState?.status === 'queuing'
+ const opponentId = playerState?.currentMatch?.opponentId
+ return (
+
+
+ {isQueuing && playerState.queueStartTime && (
+
+ )}
+
+ {opponentId && (
+ <>
+
+
+ {' '}
+
+ >
+ )}
+
+ )
+}
+
+export function formatDuration(ms: number): string {
+ const seconds = Math.floor(ms / 1000)
+ const minutes = Math.floor(seconds / 60)
+ const hours = Math.floor(minutes / 60)
+
+ if (hours > 0) {
+ return `${hours}h ${minutes % 60}m`
+ }
+ if (minutes > 0) {
+ return `${minutes}m ${seconds % 60}s`
+ }
+ return `${seconds}s`
+}
+
+function QueueTimer({ startTime }: { startTime: number }) {
+ const [queueTime, setQueueTime] = useState(Date.now() - startTime)
+
+ useEffect(() => {
+ const interval = setInterval(() => {
+ setQueueTime(Date.now() - startTime)
+ }, 1000)
+ return () => clearInterval(interval)
+ })
+
+ return (
+
+
Queueing for
+
+ {formatDuration(queueTime)}
+
+
+ )
+}
+
+function Opponent({ id }: { id: string }) {
+ const [gamesQueryResult] = api.history.user_games.useSuspenseQuery({
+ user_id: id,
+ })
+ const games = gamesQueryResult || []
+
+ const [rankedUserRank] = api.leaderboard.get_user_rank.useSuspenseQuery({
+ channel_id: RANKED_CHANNEL,
+ user_id: id,
+ })
+ if (!rankedUserRank || !games?.length) {
+ return null
+ }
+
+ const playerData = getPlayerData(rankedUserRank, games)
+ return
+}
+
+function PlayerInfo({
+ playerData,
+ className,
+ children,
+ isReverse = false,
+ ...rest
+}: {
+ playerData: ReturnType
+ isReverse?: boolean
+} & ComponentPropsWithoutRef<'div'>) {
return (
-
+
{playerData.rank}
{/* Player Name */}
-
+
@@ -95,7 +205,7 @@ export function StreamCardClient() {
{/* Win Rate */}
-
Win Rate:
+
Win Rate:
{playerData.winRate}%
@@ -117,10 +227,17 @@ export function StreamCardClient() {
{/* Streak */}
-
+
Streak:
{playerData.streak}
+
+ {children}
)
}
diff --git a/src/lib/events.ts b/src/lib/events.ts
new file mode 100644
index 0000000..3e6b31d
--- /dev/null
+++ b/src/lib/events.ts
@@ -0,0 +1,56 @@
+import { EventEmitter } from 'node:events'
+export const globalEmitter = new EventEmitter()
+export function createEventIterator
(
+ emitter: EventEmitter,
+ eventName: string,
+ opts?: { signal?: AbortSignal }
+): AsyncIterableIterator<[T]> {
+ const events: [T][] = []
+ let resolvePromise: (() => void) | null = null
+ let done = false
+
+ const push = (data: T) => {
+ events.push([data])
+ if (resolvePromise) {
+ resolvePromise()
+ resolvePromise = null
+ }
+ }
+
+ const cleanup = () => {
+ done = true
+ emitter.off(eventName, push)
+ if (resolvePromise) {
+ resolvePromise()
+ }
+ }
+
+ emitter.on(eventName, push)
+ opts?.signal?.addEventListener('abort', cleanup)
+
+ return {
+ [Symbol.asyncIterator]() {
+ return this
+ },
+ async next() {
+ if (events.length > 0) {
+ return { value: events.shift()!, done: false }
+ }
+ if (done) {
+ return { value: undefined, done: true }
+ }
+ await new Promise((resolve) => {
+ resolvePromise = resolve
+ })
+ return this.next()
+ },
+ async return() {
+ cleanup()
+ return { value: undefined, done: true }
+ },
+ async throw(error) {
+ cleanup()
+ throw error
+ },
+ }
+}
diff --git a/src/server/api/root.ts b/src/server/api/root.ts
index 41765fb..aa5f044 100644
--- a/src/server/api/root.ts
+++ b/src/server/api/root.ts
@@ -1,6 +1,7 @@
import { discord_router } from '@/server/api/routers/discord'
import { history_router } from '@/server/api/routers/history'
import { leaderboard_router } from '@/server/api/routers/leaderboard'
+import { playerStateRouter } from '@/server/api/routers/player-state'
import { createCallerFactory, createTRPCRouter } from '@/server/api/trpc'
/**
@@ -12,6 +13,7 @@ export const appRouter = createTRPCRouter({
history: history_router,
discord: discord_router,
leaderboard: leaderboard_router,
+ playerState: playerStateRouter,
})
// export type definition of API
diff --git a/src/server/api/routers/player-state.ts b/src/server/api/routers/player-state.ts
new file mode 100644
index 0000000..25e7dbc
--- /dev/null
+++ b/src/server/api/routers/player-state.ts
@@ -0,0 +1,50 @@
+import { createEventIterator, globalEmitter } from '@/lib/events'
+import { redis } from '@/server/redis'
+import { tracked } from '@trpc/server'
+import { z } from 'zod'
+import { createTRPCRouter, publicProcedure } from '../trpc'
+
+export type PlayerState = {
+ status: 'idle' | 'queuing' | 'in_game'
+ queueStartTime?: number
+ currentMatch?: {
+ opponentId: string
+ startTime: number
+ }
+}
+
+const PLAYER_STATE_KEY = (userId: string) => `player:${userId}:state`
+
+export const playerStateRouter = createTRPCRouter({
+ getState: publicProcedure
+ .input(z.string())
+ .query(async ({ input: userId }) => {
+ const state = await redis.get(PLAYER_STATE_KEY(userId))
+ return state ? (JSON.parse(state) as PlayerState) : null
+ }),
+ onStateChange: publicProcedure
+ .input(
+ z.object({
+ userId: z.string(),
+ lastEventId: z.string().optional(),
+ })
+ )
+ .subscription(async function* ({ input, ctx, signal }) {
+ const iterator = createEventIterator(
+ globalEmitter,
+ `state-change:${input.userId}`,
+ { signal: signal }
+ )
+
+ // get initial state
+ const initialState = await redis.get(PLAYER_STATE_KEY(input.userId))
+ if (initialState) {
+ yield tracked('initial', JSON.parse(initialState) as PlayerState)
+ }
+
+ // listen for updates
+ for await (const [state] of iterator) {
+ yield tracked(Date.now().toString(), state)
+ }
+ }),
+})
diff --git a/src/server/redis.ts b/src/server/redis.ts
index ec0024a..ddc8972 100644
--- a/src/server/redis.ts
+++ b/src/server/redis.ts
@@ -2,3 +2,5 @@ import { env } from '@/env'
import { Redis } from 'ioredis'
export const redis = new Redis(env.REDIS_URL)
+
+export const PLAYER_STATE_KEY = (userId: string) => `player:${userId}:state`
diff --git a/src/server/services/leaderboard.ts b/src/server/services/leaderboard.ts
index e54e546..d15c7d0 100644
--- a/src/server/services/leaderboard.ts
+++ b/src/server/services/leaderboard.ts
@@ -1,5 +1,5 @@
import { redis } from '../redis'
-import { neatqueue_service } from './neatqueue.service'
+import { type LeaderboardEntry, neatqueue_service } from './neatqueue.service'
export class LeaderboardService {
private getZSetKey(channel_id: string) {
@@ -56,23 +56,19 @@ export class LeaderboardService {
async getUserRank(channel_id: string, user_id: string) {
try {
- const zsetKey = this.getZSetKey(channel_id)
- const rank = await redis.zrevrank(zsetKey, user_id)
-
- if (rank === null) return null
-
const userData = await redis.hgetall(this.getUserKey(user_id, channel_id))
if (!userData) return null
return {
- rank: rank + 1,
...userData,
mmr: Number(userData.mmr),
streak: userData.streak,
- }
+ } as unknown as LeaderboardEntry
} catch (error) {
console.error('Error getting user rank:', error)
throw error
}
}
}
+
+export const leaderboardService = new LeaderboardService()
diff --git a/src/trpc/react.tsx b/src/trpc/react.tsx
index dabe85a..8aa199a 100644
--- a/src/trpc/react.tsx
+++ b/src/trpc/react.tsx
@@ -1,7 +1,12 @@
'use client'
import { type QueryClient, QueryClientProvider } from '@tanstack/react-query'
-import { httpBatchLink, httpBatchStreamLink, loggerLink } from '@trpc/client'
+import {
+ httpBatchLink,
+ httpSubscriptionLink,
+ loggerLink,
+ splitLink,
+} from '@trpc/client'
import { createTRPCReact } from '@trpc/react-query'
import type { inferRouterInputs, inferRouterOutputs } from '@trpc/server'
import { useState } from 'react'
@@ -49,14 +54,24 @@ export function TRPCReactProvider(props: { children: React.ReactNode }) {
process.env.NODE_ENV === 'development' ||
(op.direction === 'down' && op.result instanceof Error),
}),
- httpBatchLink({
- transformer: SuperJSON,
- url: `${getBaseUrl()}/api/trpc`,
- headers: () => {
- const headers = new Headers()
- headers.set('x-trpc-source', 'nextjs-react')
- return headers
+
+ splitLink({
+ condition(op) {
+ return op.type === 'subscription'
},
+ true: httpSubscriptionLink({
+ url: `${getBaseUrl()}/api/trpc`,
+ transformer: SuperJSON,
+ }),
+ false: httpBatchLink({
+ transformer: SuperJSON,
+ url: `${getBaseUrl()}/api/trpc`,
+ headers: () => {
+ const headers = new Headers()
+ headers.set('x-trpc-source', 'nextjs-react')
+ return headers
+ },
+ }),
}),
],
})