Создание API в функциональном стиле с Kysely и H3: современный подход к серверной разработке

Ларцев Дмитрий

В современной веб-разработке все больше внимания уделяется функциональному программированию и типобезопасности. Комбинация Kysely (типобезопасный SQL-билдер) и H3 (легковесный HTTP-сервер для Node.js) предоставляет мощный инструментарий для создания чистых, предсказуемых и масштабируемых API. В этой статье мы рассмотрим, как эти технологии работают вместе и как их можно использовать для построения элегантных серверных приложений.

Создание API в функциональном стиле с Kysely и H3: современный подход к серверной разработке

Введение в технологии

Ниже — не просто обзор, а рабочие паттерны и исправления острых углов, через которые обычно проезжают в продакшене.

Kysely: типобезопасный SQL-билдер

Kysely — современный SQL-билдер для TypeScript с очень сильной типовой системой. В отличие от тяжелых ORM (где SQL прячут под ковёр), Kysely оставляет SQL на виду, но гарантирует типовую корректность на этапе компиляции. Это значит, что:

  • Ренейм колонки — и TypeScript сразу покажет все места, где вы забыли адаптироваться.
  • Джоины и алиасы проверяются по типам, меньше “немых” багов.
  • Удобно строить динамические запросы без боли типов.

Практика:

  • Для больших схем — добавляйте kysely-codegen (или генерируйте типы от вашей БД), чтобы гарантировать актуальность типов.
  • Используйте CamelCasePlugin для маппинга snake_case колонок к camelCase в коде.
  • Для сложных кейсов пригодится sqlraw и кастомные выражения.

H3: минималистичный HTTP-сервер

H3 — унифицированный HTTP-слой, который одинаково комфортно чувствует себя в Node.js, Bun, Deno и Edge-средах (Cloudflare Workers). В реальных проектах он хорош, потому что:

  • Чёткий, предсказуемый API обработчиков.
  • Отлично сочетается с функциональным стилем (middleware — обычные функции).
  • Легкий рантайм и совместимость с Nitro.

В продакшене уделяйте внимание:

  • Ограничениям body (чтобы не класть процесс большими загрузками).
  • Потоковой передаче (стриминг ответов).
  • Контексту запроса (event.context) — туда кладём user, requestId и всё, что нужно дальше по цепочке.

Архитектурные принципы функционального API

Идея простая: каждая функция делает одну вещь, композиция собирает поведение. Это снижает количество “магии” и облегчает тестирование.

1. Разделение ответственности

// Слой базы данных
export const findUserById = async (id: string) => {
  const db = useDatabase()
  return await db
    .selectFrom('users')
    .where('id', '=', id)
    .selectAll()
    .executeTakeFirst()
}

// Слой валидации
export const validateUserData = (data: unknown) => {
  return userSchema.parse(data)
}

// Слой бизнес-логики
export const processUserUpdate = async (id: string, data: UserUpdateData) => {
  const validatedData = validateUserData(data)
  return await updateUser(id, validatedData)
}

Практика:

  • Бизнес-правила не должны знать о HTTP. Это делает логику переносимой и простой для unit-тестов.
  • Репозитории не должны знать о схемах валидации — пусть принимают уже валидные данные.

2. Композиция функций

const withAuth = (handler: Function) => (event: H3Event) => {
  const token = getHeader(event, 'authorization')
  if (!token) {
    throw createError({ statusCode: 401, statusMessage: 'Unauthorized' })
  }
  return handler(event)
}

const withValidation = (schema: ZodSchema) => (handler: Function) => 
  async (event: H3Event) => {
    const body = await readBody(event)
    const validatedData = schema.parse(body)
    return handler(event, validatedData)
  }

export default defineEventHandler(
  withAuth(
    withValidation(userUpdateSchema)(
      async (event: H3Event, data: UserUpdateData) => {
        return await processUserUpdate(getRouterParam(event, 'id'), data)
      }
    )
  )
)

Практика:

  • Задайте точные типы обработчикам: eventHandler вместо Function. Kysely и Zod так лучше “схлопывают” типы.
  • Добавьте requestId в контекст и логируйте этапы пайплайна — в проде это экономит часы поиска багов. Полезно связать с AsyncLocalStorage.

Настройка проекта

Ниже — конкретные улучшения кода, которые в продакшене экономят нервы.

Конфигурация базы данных (важно: соединения и MySQL RETURNING)

В оригинальном коде есть два острых момента:

  • useDatabase() каждый раз создаёт новый пул — дорого и может “съесть” соединения.
  • .returningAll() не поддерживается MySQL-диалектом Kysely — упадёте в рантайме.

Исправляем.

// database/types.ts
export interface Database {
  users: {
    id: string
    email: string
    name: string
    created_at: Date
    updated_at: Date
  }
  posts: {
    id: string
    title: string
    content: string
    author_id: string
    published: boolean
    created_at: Date
    updated_at: Date
  }
  tags: {
    id: string
    name: string
    slug: string
  }
  post_tags: {
    post_id: string
    tag_id: string
  }
}
// database/connection.ts
import { createPool } from 'mysql2'
import { Kysely, MysqlDialect } from 'kysely'
import type { Database } from './types'

let _db: Kysely<Database> | null = null

export const createDatabase = () => {
  const config = useRuntimeConfig()

  const dialect = new MysqlDialect({
    pool: createPool({
      database: config.dbName,
      host: config.dbHost,
      user: config.dbUser,
      password: config.dbPassword,
      connectionLimit: 10,
      // Практика:
      // timezone: 'Z', // храните всё в UTC
      // dateStrings: true, // если хотите строки вместо Date
    }),
  })

  return new Kysely<Database>({ dialect })
}

export const useDatabase = () => {
  if (!_db) _db = createDatabase()
  return _db
}

Практика:

  • Делайте singleton для пула. Для per-request-транзакций — передавайте trx явно.
  • Стандартизируйте time zone (UTC) и типы дат. Разные TZ на сервере и в БД — классическая причина “поехавших” дат.
  • Для PostgreSQL можно включать .returning(), но для MySQL — нет, нужно возвращать данные вручную.

Создание схем валидации

Zod — ок. Добавьте строгую схему и нормализацию.

// schemas/user.ts
import { z } from 'zod'

export const userCreateSchema = z.object({
  email: z.string().email().transform((s) => s.toLowerCase().trim()),
  name: z.string().min(2).max(100).trim(),
}).strict()

export const userUpdateSchema = z.object({
  email: z.string().email().transform((s) => s.toLowerCase().trim()).optional(),
  name: z.string().min(2).max(100).trim().optional(),
}).strict()

export type UserCreateData = z.infer<typeof userCreateSchema>
export type UserUpdateData = z.infer<typeof userUpdateSchema>

Создание репозиториев (исправляем MySQL returning и даём “боевые” детали)

Базовый репозиторий

// repositories/base.repository.ts
import type { Kysely } from 'kysely'
import type { Database } from '../database/types'

export abstract class BaseRepository<TTable extends keyof Database> {
  constructor(protected db: Kysely<Database>) {}

  protected abstract getTableName(): TTable

  async findById(id: string) {
    return await this.db
      .selectFrom(this.getTableName())
      .where('id', '=', id)
      .selectAll()
      .executeTakeFirst()
  }

  async findAll(limit = 10, offset = 0) {
    return await this.db
      .selectFrom(this.getTableName())
      .selectAll()
      .limit(limit)
      .offset(offset)
      .execute()
  }

  async create(
    data: Omit<Database[TTable], 'id' | 'created_at' | 'updated_at'>
  ) {
    const id = crypto.randomUUID()
    const now = new Date()

    // В MySQL нет RETURNING — возвращаем то, что знаем, либо делаем follow-up SELECT
    await this.db
      .insertInto(this.getTableName())
      .values({
        ...(data as any),
        id,
        created_at: now,
        updated_at: now,
      } as any)
      .executeTakeFirst()

    // Follow-up SELECT — если нужно вернуть точные поля из БД (триггеры, дефолты)
    const row = await this.findById(id)
    return row ?? { id, ...(data as any), created_at: now, updated_at: now }
  }

  async update(id: string, data: Partial<Database[TTable]>) {
    await this.db
      .updateTable(this.getTableName())
      .set({
        ...(data as any),
        updated_at: new Date(),
      })
      .where('id', '=', id)
      .executeTakeFirst()

    return await this.findById(id)
  }

  async delete(id: string) {
    await this.db
      .deleteFrom(this.getTableName())
      .where('id', '=', id)
      .execute()
    return { id }
  }
}

Практика:

  • Для уникальных ключей всегда ставьте индекс на уровне БД и ловите ошибки дублей (в MySQL — ER_DUP_ENTRY). Проверки “в приложении” без уникального индекса гонку не решают.
  • Если нужно массово апдейтить — подумайте о compiled queries и батчинге.

Специализированные репозитории

Без изменений по сути, но проверьте индексы: posts(author_id, published), tags(name), post_tags(post_id, tag_id).

// repositories/user.repository.ts
import { BaseRepository } from './base.repository'
import type { Database } from '../database/types'

export class UserRepository extends BaseRepository<'users'> {
  protected getTableName() { return 'users' as const }

  async findByEmail(email: string) {
    return await this.db
      .selectFrom('users')
      .where('email', '=', email)
      .selectAll()
      .executeTakeFirst()
  }

  async findWithPosts(userId: string) {
    return await this.db
      .selectFrom('users')
      .leftJoin('posts', 'posts.author_id', 'users.id')
      .where('users.id', '=', userId)
      .select([
        'users.id',
        'users.email',
        'users.name',
        'users.created_at',
        'users.updated_at',
        'posts.id as post_id',
        'posts.title as post_title',
        'posts.content as post_content',
        'posts.published as post_published',
      ])
      .execute()
  }
}
// repositories/post.repository.ts
import { BaseRepository } from './base.repository'

export class PostRepository extends BaseRepository<'posts'> {
  protected getTableName() { return 'posts' as const }

  async findByAuthor(authorId: string, published = true) {
    let query = this.db
      .selectFrom('posts')
      .where('author_id', '=', authorId)

    if (published) {
      query = query.where('published', '=', true)
    }

    return await query.selectAll().execute()
  }

  async findWithTags(postId: string) {
    return await this.db
      .selectFrom('posts')
      .leftJoin('post_tags', 'post_tags.post_id', 'posts.id')
      .leftJoin('tags', 'tags.id', 'post_tags.tag_id')
      .where('posts.id', '=', postId)
      .select([
        'posts.id',
        'posts.title',
        'posts.content',
        'posts.author_id',
        'posts.published',
        'posts.created_at',
        'tags.id as tag_id',
        'tags.name as tag_name',
        'tags.slug as tag_slug',
      ])
      .execute()
  }

  async findByTag(tagName: string) {
    return await this.db
      .selectFrom('posts')
      .innerJoin('post_tags', 'post_tags.post_id', 'posts.id')
      .innerJoin('tags', 'tags.id', 'post_tags.tag_id')
      .where('tags.name', '=', tagName)
      .where('posts.published', '=', true)
      .selectAll('posts')
      .execute()
  }
}

Создание сервисов (устраняем гонки, добавляем DI)

Оригинальный сервис создаёт репозиторий внутри себя — это усложняет тестирование. Лучше инъецировать зависимости. И критично: проверка уникальности email должна полагаться на уникальный индекс.

// services/user.service.ts
import { UserRepository } from '../repositories/user.repository'
import type { UserCreateData, UserUpdateData } from '../schemas/user'

export class UserService {
  constructor(private userRepository: UserRepository) {}

  async createUser(data: UserCreateData) {
    try {
      // Полагайтесь на UNIQUE(email) в БД
      return await this.userRepository.create(data as any)
    } catch (err: any) {
      // MySQL
      if (err?.code === 'ER_DUP_ENTRY') {
        throw new Error('User with this email already exists')
      }
      // Postgres: if (err?.code === '23505') ...
      throw err
    }
  }

  async updateUser(id: string, data: UserUpdateData) {
    const user = await this.userRepository.findById(id)
    if (!user) throw new Error('User not found')

    // Если email меняется — конфликт поймаем на уровне БД
    return await this.userRepository.update(id, data as any)
  }

  async getUserWithPosts(id: string) {
    const user = await this.userRepository.findById(id)
    if (!user) throw new Error('User not found')
    return await this.userRepository.findWithPosts(id)
  }

  async deleteUser(id: string) {
    const user = await this.userRepository.findById(id)
    if (!user) throw new Error('User not found')
    return await this.userRepository.delete(id)
  }
}

Композиция зависимостей на уровне обработчика:

// composition root (пример)
import { useDatabase } from '../database/connection'
import { UserRepository } from '../repositories/user.repository'
import { UserService } from '../services/user.service'

export const makeUserService = () => {
  const db = useDatabase()
  const repo = new UserRepository(db)
  return new UserService(repo)
}

Создание API-обработчиков (типизация, ошибки, контекст)

Добавим типы и нормализуем ошибки.

// utils/error-handler.ts
export const handleServiceError = (error: unknown) => {
  if (error instanceof Error) {
    if (error.message.includes('not found')) {
      return createError({ statusCode: 404, statusMessage: error.message })
    }
    if (error.message.includes('already exists')) {
      return createError({ statusCode: 409, statusMessage: error.message })
    }
  }
  return createError({ statusCode: 500, statusMessage: 'Internal Server Error' })
}
// utils/validation.ts
import { z, ZodError, ZodSchema } from 'zod'
import type { H3Event } from 'h3'

export const withValidation = <T>(schema: ZodSchema<T>) => {
  return async (event: H3Event): Promise<T> => {
    try {
      const body = await readBody(event)
      return schema.parse(body)
    } catch (error) {
      if (error instanceof ZodError) {
        throw createError({
          statusCode: 400,
          statusMessage: 'Validation Error',
          data: error.errors,
        })
      }
      throw error
    }
  }
}
// api/users/index.get.ts
import { makeUserService } from '../../composition'
import { handleServiceError } from '../../utils/error-handler'

export default defineEventHandler(async (event) => {
  try {
    const service = makeUserService()
    const query = getQuery(event)
    const limit = Math.min(Number(query.limit) || 10, 100)
    const offset = Number(query.offset) || 0
    return await service['userRepository'].findAll(limit, offset)
  } catch (error) {
    throw handleServiceError(error)
  }
})
// api/users/index.post.ts
import { makeUserService } from '../../composition'
import { userCreateSchema } from '../../schemas/user'
import { withValidation } from '../../utils/validation'
import { handleServiceError } from '../../utils/error-handler'

export default defineEventHandler(async (event) => {
  try {
    const service = makeUserService()
    const data = await withValidation(userCreateSchema)(event)
    return await service.createUser(data)
  } catch (error) {
    throw handleServiceError(error)
  }
})
// api/users/[id].get.ts
import { makeUserService } from '../../composition'
import { handleServiceError } from '../../utils/error-handler'

export default defineEventHandler(async (event) => {
  try {
    const service = makeUserService()
    const id = getRouterParam(event, 'id')
    if (!id) throw createError({ statusCode: 400, statusMessage: 'User ID is required' })
    return await service.getUserWithPosts(id)
  } catch (error) {
    throw handleServiceError(error)
  }
})
// api/users/[id].put.ts
import { makeUserService } from '../../composition'
import { userUpdateSchema } from '../../schemas/user'
import { withValidation } from '../../utils/validation'
import { handleServiceError } from '../../utils/error-handler'

export default defineEventHandler(async (event) => {
  try {
    const service = makeUserService()
    const id = getRouterParam(event, 'id')
    if (!id) throw createError({ statusCode: 400, statusMessage: 'User ID is required' })
    const data = await withValidation(userUpdateSchema)(event)
    return await service.updateUser(id, data)
  } catch (error) {
    throw handleServiceError(error)
  }
})

Практика:

  • Ограничьте limit и валидируйте offset.
  • Сразу включите pino-логгер: время, метод, путь, requestId. На больших нагрузках без этого — как без фар.

Продвинутые паттерны

Аутентификация и контекст

Расширяем типы для event.context, чтобы TypeScript знал про user:

// types/h3.d.ts
import 'h3'

declare module 'h3' {
  interface H3EventContext {
    user?: { id: string; email: string; roles: string[] }
    requestId?: string
  }
}
// middleware/auth.ts
export const withAuth = (handler: (event: H3Event) => Promise<any>) => {
  return async (event: H3Event) => {
    const token = getHeader(event, 'authorization')?.replace('Bearer ', '')
    if (!token) {
      throw createError({ statusCode: 401, statusMessage: 'Authorization token required' })
    }
    try {
      const user = await verifyToken(token) // реализуйте по своему
      event.context.user = user
      return handler(event)
    } catch {
      throw createError({ statusCode: 401, statusMessage: 'Invalid token' })
    }
  }
}

Практика:

  • Добавьте AsyncLocalStorage для requestId и прокидывайте в логи, запросы к БД и во внешние сервисы — корреляция спасает в инцидентах.

Кэширование: от Map к Redis

In-memory Map полезен для локальной разработки, но в продакшене используйте Redis (ioredis) или KeyDB. Добавьте lock от “стада” (dogpile) и инвалидацию.

// utils/cache.ts (эскиз под Redis)
import { redis } from '../infra/redis'

export const withCache = <T>(
  key: string,
  ttlSec: number,
  fetcher: () => Promise<T>
) => {
  return async (): Promise<T> => {
    const cached = await redis.get(key)
    if (cached) return JSON.parse(cached)
    const lockKey = `${key}:lock`
    const acquired = await redis.set(lockKey, '1', 'NX', 'EX', 5)
    if (!acquired) {
      // Ждём, потом читаем снова
      await new Promise(r => setTimeout(r, 100))
      const after = await redis.get(key)
      if (after) return JSON.parse(after)
    }
    const data = await fetcher()
    await redis.set(key, JSON.stringify(data), 'EX', ttlSec)
    await redis.del(lockKey)
    return data
  }
}

Также подумайте о ETag/If-None-Match для GET — это уменьшит трафик и нагрузку.

Транзакции

Хорошо, что вы используете trx и передаёте его вниз. Добавим нюанс: строго передавайте trx в репозитории — это делает код очевидным.

// utils/transaction.ts
import type { Kysely } from 'kysely'
import type { Database } from '../database/types'
import { useDatabase } from '../database/connection'

export const withTransaction = async <T>(
  callback: (trx: Kysely<Database>) => Promise<T>
): Promise<T> => {
  const db = useDatabase()
  return await db.transaction().execute(async (trx) => callback(trx))
}
// services/post.service.ts
import { PostRepository } from '../repositories/post.repository'
import { withTransaction } from '../utils/transaction'
import type { Kysely } from 'kysely'
import type { Database } from '../database/types'

export class PostService {
  constructor(private makeRepo: (db: Kysely<Database>) => PostRepository) {}

  async createPostWithTags(postData: any, tagIds: string[]) {
    return withTransaction(async (trx) => {
      const postRepo = this.makeRepo(trx)
      const post = await postRepo.create(postData)
      if (tagIds.length) {
        await trx
          .insertInto('post_tags')
          .values(tagIds.map((tagId) => ({ post_id: post!.id, tag_id: tagId })))
          .execute()
      }
      return post
    })
  }
}

Практика:

  • Для MySQL учитывайте изоляцию (InnoDB, REPEATABLE READ по умолчанию). Сложные отчёты — на уровне снапшотов/версий.
  • Если нужен outbox-паттерн (события в Kafka/Rabbit) — пишите в таблицу outbox в той же транзакции, и выносите отправку воркером (BullMQ + Redis).

Производительность и гигиена кода

  • Пулы соединений: следите за connectionLimit и тайм-аутами. На пиках нехватка соединений — частая причина деградации.
  • Компилированные запросы: для “горячих” эндпоинтов снизят накладные на парсинг SQL.
  • Индексы: EXPLAIN — ваш друг. На реальном трафике неправильный индекс превращает сервер в тыкву.
  • Логи и метрики: pino (лог), prom-client (метрики) + Grafana/Prometheus. Ошибки с кодами и временем — минимум.

Тестирование (DI вместо new внутри сервиса)

Ваш пример теста подразумевает моки, но сервис создаёт репозиторий сам. С DI это реально.

// tests/services/user.service.test.ts
import { describe, it, expect, beforeEach, vi } from 'vitest'
import { UserService } from '../../services/user.service'

describe('UserService', () => {
  let userService: UserService
  let mockRepo: any

  beforeEach(() => {
    mockRepo = {
      create: vi.fn(),
      findById: vi.fn(),
      findByEmail: vi.fn(),
      update: vi.fn(),
    }
    userService = new UserService(mockRepo)
  })

  it('should create user successfully', async () => {
    const userData = { email: 'test@example.com', name: 'Test User' }
    mockRepo.create.mockResolvedValue({ id: '1', ...userData })
    const result = await userService.createUser(userData as any)
    expect(mockRepo.create).toHaveBeenCalled()
    expect(result).toEqual({ id: '1', ...userData })
  })

  it('should throw error on duplicate email', async () => {
    const userData = { email: 'test@example.com', name: 'Test User' }
    mockRepo.create.mockRejectedValue({ code: 'ER_DUP_ENTRY' })
    await expect(userService.createUser(userData as any))
      .rejects.toThrow('User with this email already exists')
  })
})

Если хотите интеграционные тесты — используйте Testcontainers (поднимает реальную БД в Docker), это сильно повышает доверие к тестам.

История из практики

Когда мы запустили первый релиз с проверкой уникальности email “на приложении”, под пиковой нагрузкой начали прилетать дублёры — классическая гонка между SELECT и INSERT. Решение: уникальный индекс на уровне БД + ловля конфликтов (ER_DUP_ENTRY), а затем аккуратный маппинг ошибки в 409. После этого инцидентов не видели. Это не “как в Express, где можно накидать middleware и молиться” — тут вам помогают строгие типы и чёткая композиция функций, но инварианты должны оставаться в БД.

Когда стоит подумать о NestJS, Fastify, Prisma и др.

  • NestJS: если нужна строгая модульная архитектура, DI “из коробки”, гварды, интерцепторы, пайпы.
  • Fastify: если хочется максимальной производительности HTTP-слоя и экосистемы плагинов, но H3 уже достаточно проворен.
  • Prisma: удобные миграции, DX на высоте, но абстракции ORM могут быть тяжелее. В связке с Kysely иногда используют Prisma только для схем/миграций.
  • Redis + BullMQ: джобы, ретраи, rate limiting, отложенная обработка.
  • TypeORM: можно, но Kysely обычно даёт чище типы и более прозрачный SQL.

Заключение

Комбинация Kysely + H3 — это:

  • Типобезопасность на уровне компиляции и прозрачный SQL.
  • Чистая функциональная композиция без “магического” фреймворка.
  • Производительность без навесного жирка.

Ключевые практики, чтобы не стрелять себе в ногу:

  • Singleton-пул БД и отказ от .returning() в MySQL (делайте follow-up SELECT).
  • Уникальные индексы и обработка ошибок дублей вместо “проверок на глаз”.
  • DI для тестируемости.
  • Кэш в Redis, а не в памяти.
  • Логи, метрики, requestId через AsyncLocalStorage.

Этот подход хорошо масштабируется и на проектах с высокими нагрузками ведёт себя предсказуемо. А если когда-нибудь потребуется “корпоративная” обвязка — многое из написанного здесь безболезненно переезжает в NestJS или Fastify, потому что изначально построено на простых и чистых функциях.