跳到主要内容

项目作品集

这里展示我在后端开发和 AI 工程领域的核心项目,涵盖高并发系统、分布式架构、AI Agent 等技术方向。


🏢 企业级项目

1. 电商高并发订单系统

项目背景:支撑千万级用户的电商平台核心交易系统

技术架构

┌─────────────────────────────────────────────────────────────┐
│ 订单系统架构图 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 用户请求 ──▶ 网关 ──▶ 订单服务 ──▶ 库存服务 │
│ │ │ │ │
│ │ ▼ ▼ │
│ │ Redis缓存 消息队列 │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ 限流组件 订单数据库 异步处理 │
│ │
└─────────────────────────────────────────────────────────────┘

核心技术

  • 语言:Go + Gin 框架
  • 数据库:MySQL 主从 + Redis 集群 + MongoDB
  • 消息队列:Kafka + RocketMQ
  • 缓存策略:多级缓存 + 缓存预热
  • 限流熔断:令牌桶 + 滑动窗口

关键实现

// 库存扣减 - Redis Lua 原子操作
const stockDeductScript = `
local stock_key = KEYS[1]
local deduct_num = tonumber(ARGV[1])
local current_stock = tonumber(redis.call('get', stock_key) or 0)

if current_stock >= deduct_num then
redis.call('decrby', stock_key, deduct_num)
return current_stock - deduct_num
else
return -1
end
`

func (s *StockService) DeductStock(ctx context.Context, sku string, num int) error {
result, err := s.redis.Eval(ctx, stockDeductScript, []string{
fmt.Sprintf("stock:%s", sku),
}, num).Result()

if err != nil {
return err
}

if result.(int64) < 0 {
return errors.New("库存不足")
}

return nil
}

性能指标

  • QPS:峰值 50,000+
  • RT:P99 < 100ms,P95 < 50ms
  • 可用性:99.99%(年故障时间 < 1 小时)
  • 错误率:< 0.01%

业务成果

  • 支撑日订单量从 1 万提升到 100 万+
  • 双 11 峰值流量无故障运行
  • 系统容量提升 10 倍,成本降低 30%

2. AI Agent 智能平台

项目背景:企业级 AI 助手平台,支持多场景智能化任务处理

系统架构

┌─────────────────────────────────────────────────────────────┐
│ AI Agent 平台架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 用户界面 ──▶ API网关 ──▶ Agent调度器 ──▶ LLM服务 │
│ │ │ │ │
│ │ ▼ ▼ │
│ │ 工具管理器 向量数据库 │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ 权限控制 执行引擎 知识库 │
│ │
└─────────────────────────────────────────────────────────────┘

核心技术

  • 后端:Python + FastAPI + Celery
  • AI 框架:LangChain + OpenAI API + 本地模型
  • 向量数据库:Pinecone + Chroma
  • 工具集成:RESTful API + 数据库连接器
  • 部署:Docker + Kubernetes

Agent 设计模式

class ReActAgent:
"""ReAct (Reasoning + Acting) Agent 实现"""

def __init__(self, llm, tools, memory):
self.llm = llm
self.tools = {tool.name: tool for tool in tools}
self.memory = memory
self.max_iterations = 10

async def run(self, task: str) -> str:
"""执行任务的主循环"""
self.memory.add_message("user", task)

for i in range(self.max_iterations):
# Reasoning: 分析当前状态,决定下一步行动
prompt = self._build_react_prompt(task)
response = await self.llm.agenerate(prompt)

# 解析 LLM 响应
thought, action, action_input = self._parse_response(response)

if action == "Final Answer":
return action_input

# Acting: 执行工具调用
if action in self.tools:
try:
result = await self.tools[action].run(action_input)
self.memory.add_observation(result)
except Exception as e:
self.memory.add_observation(f"Error: {str(e)}")
else:
self.memory.add_observation(f"Unknown action: {action}")

return "任务执行超时,请重试"

def _build_react_prompt(self, task: str) -> str:
"""构建 ReAct 提示词"""
return f"""
你是一个智能助手,需要通过推理和行动来完成任务。

可用工具:
{self._format_tools()}

请按照以下格式思考和行动:
Thought: 分析当前情况,思考下一步
Action: 选择要使用的工具
Action Input: 工具的输入参数
Observation: 工具执行结果

任务: {task}

历史记录:
{self.memory.get_history()}

开始思考:
"""

工具生态

# 数据库查询工具
class DatabaseTool:
name = "database_query"
description = "执行 SQL 查询获取数据"

async def run(self, query: str) -> str:
# 安全检查:防止 SQL 注入
if not self._is_safe_query(query):
return "查询包含不安全操作,已拒绝执行"

result = await self.db.execute(query)
return self._format_result(result)

# API 调用工具
class APITool:
name = "api_call"
description = "调用外部 API 获取数据"

async def run(self, config: dict) -> str:
url = config.get("url")
method = config.get("method", "GET")

async with httpx.AsyncClient() as client:
response = await client.request(method, url, **config)
return response.json()

# 代码执行工具
class CodeExecutor:
name = "code_execution"
description = "在安全沙箱中执行代码"

async def run(self, code: str, language: str = "python") -> str:
# 在 Docker 容器中安全执行
result = await self._execute_in_sandbox(code, language)
return result

RAG 增强

class RAGService:
"""检索增强生成服务"""

def __init__(self, vector_db, embedding_model):
self.vector_db = vector_db
self.embedding_model = embedding_model

async def retrieve_context(self, query: str, top_k: int = 5) -> List[str]:
"""检索相关上下文"""
# 1. 查询向量化
query_embedding = await self.embedding_model.embed(query)

# 2. 向量检索
results = await self.vector_db.similarity_search(
query_embedding,
top_k=top_k
)

# 3. 重排序(可选)
reranked_results = await self._rerank(query, results)

return [doc.content for doc in reranked_results]

async def generate_with_context(self, query: str, context: List[str]) -> str:
"""基于上下文生成回答"""
prompt = f"""
基于以下上下文信息回答问题:

上下文:
{chr(10).join(context)}

问题:{query}

请基于上下文提供准确的回答,如果上下文中没有相关信息,请明确说明。
"""

return await self.llm.agenerate(prompt)

性能指标

  • 响应时间:平均 < 3 秒,P95 < 8 秒
  • 成功率:85%+(复杂任务)
  • 并发支持:1000+ 并发用户
  • 工具覆盖:20+ 集成工具

应用场景

  • 数据分析:自动生成报表、数据可视化
  • 代码助手:代码生成、Bug 修复、重构建议
  • 客服机器人:智能问答、工单处理
  • 运维助手:日志分析、故障诊断

3. 分布式锁治理平台

项目背景:解决微服务架构下的并发控制和资源竞争问题

技术方案对比

方案优势劣势适用场景
Redis 锁性能高、实现简单可能丢锁、时钟依赖高性能场景
etcd 锁强一致性、自动续期性能相对较低强一致性要求
数据库锁事务保证、易理解性能瓶颈简单场景

Redis 分布式锁实现

type RedisDistributedLock struct {
client *redis.Client
key string
value string
ttl time.Duration
watchdog *time.Ticker
ctx context.Context
cancel context.CancelFunc
}

func NewRedisLock(client *redis.Client, key string, ttl time.Duration) *RedisDistributedLock {
ctx, cancel := context.WithCancel(context.Background())
return &RedisDistributedLock{
client: client,
key: key,
value: generateUniqueID(),
ttl: ttl,
ctx: ctx,
cancel: cancel,
}
}

func (l *RedisDistributedLock) TryLock(ctx context.Context) error {
// Lua 脚本保证原子性
script := `
if redis.call('set', KEYS[1], ARGV[1], 'NX', 'PX', ARGV[2]) then
return 1
else
return 0
end
`

result, err := l.client.Eval(ctx, script, []string{l.key}, l.value, l.ttl.Milliseconds()).Result()
if err != nil {
return err
}

if result.(int64) == 1 {
// 启动看门狗自动续期
l.startWatchdog()
return nil
}

return errors.New("获取锁失败")
}

func (l *RedisDistributedLock) Unlock(ctx context.Context) error {
// 停止看门狗
l.cancel()

// Lua 脚本确保只能删除自己的锁
script := `
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
`

_, err := l.client.Eval(ctx, script, []string{l.key}, l.value).Result()
return err
}

func (l *RedisDistributedLock) startWatchdog() {
// 看门狗:定期续期防止锁过期
l.watchdog = time.NewTicker(l.ttl / 3)

go func() {
defer l.watchdog.Stop()

for {
select {
case <-l.ctx.Done():
return
case <-l.watchdog.C:
l.renewLock()
}
}
}()
}

func (l *RedisDistributedLock) renewLock() {
script := `
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('pexpire', KEYS[1], ARGV[2])
else
return 0
end
`

l.client.Eval(l.ctx, script, []string{l.key}, l.value, l.ttl.Milliseconds())
}

etcd 分布式锁实现

type EtcdDistributedLock struct {
client *clientv3.Client
session *concurrency.Session
mutex *concurrency.Mutex
key string
}

func NewEtcdLock(client *clientv3.Client, key string, ttl int) (*EtcdDistributedLock, error) {
session, err := concurrency.NewSession(client, concurrency.WithTTL(ttl))
if err != nil {
return nil, err
}

mutex := concurrency.NewMutex(session, key)

return &EtcdDistributedLock{
client: client,
session: session,
mutex: mutex,
key: key,
}, nil
}

func (l *EtcdDistributedLock) Lock(ctx context.Context) error {
return l.mutex.Lock(ctx)
}

func (l *EtcdDistributedLock) Unlock(ctx context.Context) error {
err := l.mutex.Unlock(ctx)
if err != nil {
return err
}
return l.session.Close()
}

统一锁接口

// 分布式锁接口
type DistributedLock interface {
TryLock(ctx context.Context) error
Lock(ctx context.Context) error
Unlock(ctx context.Context) error
IsLocked() bool
}

// 锁管理器
type LockManager struct {
locks map[string]DistributedLock
mu sync.RWMutex
}

func (m *LockManager) GetLock(key string, lockType LockType) DistributedLock {
m.mu.RLock()
if lock, exists := m.locks[key]; exists {
m.mu.RUnlock()
return lock
}
m.mu.RUnlock()

m.mu.Lock()
defer m.mu.Unlock()

// 双重检查
if lock, exists := m.locks[key]; exists {
return lock
}

var lock DistributedLock
switch lockType {
case RedisLock:
lock = NewRedisLock(m.redisClient, key, 30*time.Second)
case EtcdLock:
lock, _ = NewEtcdLock(m.etcdClient, key, 30)
case DatabaseLock:
lock = NewDatabaseLock(m.db, key)
}

m.locks[key] = lock
return lock
}

性能测试结果

Redis 锁:
获锁 QPS: 10000+
平均延迟: 1-2ms
P99 延迟: 5ms

etcd 锁:
获锁 QPS: 1000+
平均延迟: 5-10ms
P99 延迟: 20ms

数据库锁:
获锁 QPS: 500+
平均延迟: 10-20ms
P99 延迟: 50ms

应用效果

  • 库存一致性:解决超卖问题,准确率 99.99%+
  • 接口幂等:重复请求处理,成功率 100%
  • 资源竞争:消除并发冲突,错误率 < 0.01%

🚀 开源项目

1. Go 高性能 Web 框架

项目地址github.com/kanelli/fast-gin

项目特色

  • 基于 Gin 优化的高性能 Web 框架
  • 内置限流、熔断、链路追踪
  • 支持优雅关闭、健康检查
  • 集成 Prometheus 监控

核心代码

// 框架初始化
func NewFastGin(opts ...Option) *FastGin {
app := &FastGin{
Engine: gin.New(),
config: defaultConfig(),
}

// 应用配置
for _, opt := range opts {
opt(app)
}

// 注册中间件
app.setupMiddlewares()

return app
}

// 限流中间件
func RateLimitMiddleware(limiter *rate.Limiter) gin.HandlerFunc {
return func(c *gin.Context) {
if !limiter.Allow() {
c.JSON(http.StatusTooManyRequests, gin.H{
"error": "请求过于频繁",
})
c.Abort()
return
}
c.Next()
}
}

// 熔断中间件
func CircuitBreakerMiddleware(cb *breaker.CircuitBreaker) gin.HandlerFunc {
return func(c *gin.Context) {
err := cb.Execute(func() error {
c.Next()
if c.Writer.Status() >= 500 {
return errors.New("server error")
}
return nil
})

if err != nil {
c.JSON(http.StatusServiceUnavailable, gin.H{
"error": "服务暂时不可用",
})
c.Abort()
}
}
}

性能指标

  • QPS:比原生 Gin 提升 20%
  • 内存占用:降低 15%
  • 响应时间:P95 < 10ms

2. AI 工具链

项目地址github.com/kanelli/ai-toolchain

功能特性

  • LLM 统一接口适配器
  • Prompt 模板管理
  • 向量数据库抽象层
  • Agent 工作流引擎

使用示例

from ai_toolchain import LLMAdapter, PromptTemplate, VectorStore

# LLM 适配器
llm = LLMAdapter.create("openai", api_key="xxx")

# Prompt 模板
template = PromptTemplate.load("code_review.yaml")
prompt = template.render(code=source_code, language="python")

# 生成回答
response = await llm.generate(prompt)
print(response.content)

📊 技术影响力

技术文章

  • 掘金:10+ 技术文章,累计阅读 50万+
  • 博客:分享架构设计和性能优化经验
  • GitHub:开源项目获得 1000+ Star

技术分享

  • 公司内训:微服务架构、性能调优专题
  • 技术会议:分享高并发系统设计经验
  • 代码 Review:指导团队技术方案设计

团队贡献

  • 技术选型:主导团队技术栈升级
  • 最佳实践:建立代码规范和开发流程
  • 知识传承:培养 5+ 高级工程师

🎯 未来规划

技术方向

  • AI Native:深入 LLM 应用开发和 Agent 设计
  • 云原生:Kubernetes、Service Mesh 实践
  • 性能工程:极致性能优化和稳定性保障

开源计划

  • 开源高性能微服务框架
  • 发布 AI Agent 开发工具包
  • 贡献云原生生态项目

学习目标

  • 深入学习分布式系统理论
  • 掌握 AI 大模型训练和推理
  • 提升系统架构设计能力

通过这些项目,我积累了丰富的大规模系统设计和 AI 工程实践经验。每个项目都经历了从 0 到 1 的完整过程,包括需求分析、架构设计、技术选型、开发实现、性能优化、运维监控等各个环节。

我相信技术的价值在于解决实际问题,创造业务价值。未来我将继续在后端架构和 AI 工程领域深耕,为构建更高效、更智能的系统贡献力量。