This commit is contained in:
zerone 2025-02-24 20:43:53 +08:00
parent f7ddee5eb4
commit 6a978c4275
5 changed files with 205 additions and 48 deletions

BIN
.DS_Store vendored Normal file

Binary file not shown.

View File

@ -206,3 +206,21 @@ MIT License - See LICENSE file for details
- [CCXT](https://github.com/ccxt/ccxt) for exchange integrations - [CCXT](https://github.com/ccxt/ccxt) for exchange integrations
- [Model Context Protocol](https://modelcontextprotocol.io) for the MCP specification - [Model Context Protocol](https://modelcontextprotocol.io) for the MCP specification
- The cryptocurrency exchanges for providing market data APIs - The cryptocurrency exchanges for providing market data APIs
"crypto": {
"command": "/opt/anaconda3/envs/mcp/bin/python",
"args": ["/Users/zerone/code/RAG/mcp-server-ccxt/src/server.py"]
},
"mcp-crypto-price": {
"command": "/Users/zerone/.nvm/versions/node/v20.18.3/bin/node",
"args": ["/Users/zerone/code/RAG/mcp-crypto-price/dist/index.js"],
"env": {
"COINCAP_API_KEY": "fc14196f237776db1cdc070e76f6223f9360457148b1cc1fb488fb0da5cd025b"
}
}

3
requirements.txt Normal file
View File

@ -0,0 +1,3 @@
ccxt>=4.2.15
mcp-server>=0.1.0
python-dateutil>=2.8.2

View File

@ -6,33 +6,48 @@ from mcp.server import Server, NotificationOptions
from mcp.server.models import InitializationOptions from mcp.server.models import InitializationOptions
import mcp.server.stdio import mcp.server.stdio
from datetime import datetime, timedelta from datetime import datetime, timedelta
import sys
import platform
import signal
# Initialize server # 初始化服务器
server = Server("crypto-server") server = Server("crypto-server")
# Define supported exchanges and their instances # 定义支持的交易所及其实例
# 这里列出了所有支持的加密货币交易所使用ccxt库提供的接口
SUPPORTED_EXCHANGES = { SUPPORTED_EXCHANGES = {
'binance': ccxt.binance, 'binance': ccxt.binance, # 币安
'coinbase': ccxt.coinbase, 'coinbase': ccxt.coinbase, # 比特币基地
'kraken': ccxt.kraken, 'kraken': ccxt.kraken, # 北海巨妖
'kucoin': ccxt.kucoin, 'kucoin': ccxt.kucoin, # 库币
'hyperliquid': ccxt.hyperliquid, 'hyperliquid': ccxt.hyperliquid, # Hyperliquid
'huobi': ccxt.huobi, 'huobi': ccxt.huobi, # 火币
'bitfinex': ccxt.bitfinex, 'bitfinex': ccxt.bitfinex, # Bitfinex
'bybit': ccxt.bybit, 'bybit': ccxt.bybit, # Bybit
'okx': ccxt.okx, 'okx': ccxt.okx, # OKX
'mexc': ccxt.mexc 'mexc': ccxt.mexc # MEXC
} }
# Exchange instances cache # 交易所实例缓存,用于存储已创建的交易所实例
exchange_instances = {} exchange_instances = {}
async def get_exchange(exchange_id: str) -> ccxt.Exchange: async def get_exchange(exchange_id: str) -> ccxt.Exchange:
"""Get or create an exchange instance.""" """
获取或创建一个交易所实例
Args:
exchange_id: 交易所ID
Returns:
ccxt.Exchange: 交易所实例
Raises:
ValueError: 当提供了不支持的交易所ID时
"""
exchange_id = exchange_id.lower() exchange_id = exchange_id.lower()
if exchange_id not in SUPPORTED_EXCHANGES: if exchange_id not in SUPPORTED_EXCHANGES:
raise ValueError(f"Unsupported exchange: {exchange_id}") raise ValueError(f"不支持的交易所: {exchange_id}")
if exchange_id not in exchange_instances: if exchange_id not in exchange_instances:
exchange_class = SUPPORTED_EXCHANGES[exchange_id] exchange_class = SUPPORTED_EXCHANGES[exchange_id]
@ -42,25 +57,39 @@ async def get_exchange(exchange_id: str) -> ccxt.Exchange:
async def format_ticker(ticker: Dict[str, Any], exchange_id: str) -> str: async def format_ticker(ticker: Dict[str, Any], exchange_id: str) -> str:
"""Format ticker data into a readable string.""" """
将ticker数据格式化为可读字符串
Args:
ticker: 交易对的ticker数据
exchange_id: 交易所ID
Returns:
str: 格式化后的ticker信息
"""
return ( return (
f"Exchange: {exchange_id.upper()}\n" f"交易所: {exchange_id.upper()}\n"
f"Symbol: {ticker.get('symbol')}\n" f"交易对: {ticker.get('symbol')}\n"
f"Last Price: {ticker.get('last', 'N/A')}\n" f"最新价格: {ticker.get('last', 'N/A')}\n"
f"24h High: {ticker.get('high', 'N/A')}\n" f"24小时最高: {ticker.get('high', 'N/A')}\n"
f"24h Low: {ticker.get('low', 'N/A')}\n" f"24小时最低: {ticker.get('low', 'N/A')}\n"
f"24h Volume: {ticker.get('baseVolume', 'N/A')}\n" f"24小时成交量: {ticker.get('baseVolume', 'N/A')}\n"
f"Bid: {ticker.get('bid', 'N/A')}\n" f"买一价: {ticker.get('bid', 'N/A')}\n"
f"Ask: {ticker.get('ask', 'N/A')}\n" f"卖一价: {ticker.get('ask', 'N/A')}\n"
"---" "---"
) )
def get_exchange_schema() -> Dict[str, Any]: def get_exchange_schema() -> Dict[str, Any]:
"""Get the JSON schema for exchange selection.""" """
获取交易所选择的JSON模式
Returns:
Dict[str, Any]: 包含交易所选择选项的JSON模式
"""
return { return {
"type": "string", "type": "string",
"description": f"Exchange to use (supported: {', '.join(SUPPORTED_EXCHANGES.keys())})", "description": f"要使用的交易所(支持的交易所:{', '.join(SUPPORTED_EXCHANGES.keys())}",
"enum": list(SUPPORTED_EXCHANGES.keys()), "enum": list(SUPPORTED_EXCHANGES.keys()),
"default": "binance" "default": "binance"
} }
@ -99,18 +128,23 @@ def format_ohlcv_data(ohlcv_data: List[List], timeframe: str) -> str:
@server.list_tools() @server.list_tools()
async def handle_list_tools() -> List[types.Tool]: async def handle_list_tools() -> List[types.Tool]:
"""List available cryptocurrency tools.""" """
列出可用的加密货币工具
Returns:
List[types.Tool]: 可用工具列表
"""
return [ return [
# Market Data Tools # 市场数据工具
types.Tool( types.Tool(
name="get-price", name="get-price",
description="Get current price of a cryptocurrency pair from a specific exchange", description="从指定交易所获取加密货币交易对的当前价格",
inputSchema={ inputSchema={
"type": "object", "type": "object",
"properties": { "properties": {
"symbol": { "symbol": {
"type": "string", "type": "string",
"description": "Trading pair symbol (e.g., BTC/USDT, ETH/USDT)", "description": "交易对符号例如BTC/USDT, ETH/USDT",
}, },
"exchange": get_exchange_schema() "exchange": get_exchange_schema()
}, },
@ -119,13 +153,13 @@ async def handle_list_tools() -> List[types.Tool]:
), ),
types.Tool( types.Tool(
name="get-market-summary", name="get-market-summary",
description="Get detailed market summary for a cryptocurrency pair from a specific exchange", description="从指定交易所获取加密货币交易对的详细市场摘要",
inputSchema={ inputSchema={
"type": "object", "type": "object",
"properties": { "properties": {
"symbol": { "symbol": {
"type": "string", "type": "string",
"description": "Trading pair symbol (e.g., BTC/USDT, ETH/USDT)", "description": "交易对符号例如BTC/USDT, ETH/USDT",
}, },
"exchange": get_exchange_schema() "exchange": get_exchange_schema()
}, },
@ -134,13 +168,13 @@ async def handle_list_tools() -> List[types.Tool]:
), ),
types.Tool( types.Tool(
name="get-top-volumes", name="get-top-volumes",
description="Get top cryptocurrencies by trading volume from a specific exchange", description="从指定交易所获取按交易量排名的热门加密货币",
inputSchema={ inputSchema={
"type": "object", "type": "object",
"properties": { "properties": {
"limit": { "limit": {
"type": "number", "type": "number",
"description": "Number of pairs to return (default: 5)", "description": "返回的交易对数量默认5",
}, },
"exchange": get_exchange_schema() "exchange": get_exchange_schema()
} }
@ -148,32 +182,32 @@ async def handle_list_tools() -> List[types.Tool]:
), ),
types.Tool( types.Tool(
name="list-exchanges", name="list-exchanges",
description="List all supported cryptocurrency exchanges", description="列出所有支持的加密货币交易所",
inputSchema={ inputSchema={
"type": "object", "type": "object",
"properties": {} "properties": {}
}, },
), ),
# Historical Data Tools # 历史数据工具
types.Tool( types.Tool(
name="get-historical-ohlcv", name="get-historical-ohlcv",
description="Get historical OHLCV (candlestick) data for a trading pair", description="获取交易对的历史OHLCVK线图数据",
inputSchema={ inputSchema={
"type": "object", "type": "object",
"properties": { "properties": {
"symbol": { "symbol": {
"type": "string", "type": "string",
"description": "Trading pair symbol (e.g., BTC/USDT, ETH/USDT)", "description": "交易对符号例如BTC/USDT, ETH/USDT",
}, },
"timeframe": { "timeframe": {
"type": "string", "type": "string",
"description": "Timeframe for candlesticks (e.g., 1m, 5m, 15m, 1h, 4h, 1d)", "description": "K线图的时间周期例如1m, 5m, 15m, 1h, 4h, 1d",
"enum": ["1m", "5m", "15m", "1h", "4h", "1d"], "enum": ["1m", "5m", "15m", "1h", "4h", "1d"],
"default": "1h" "default": "1h"
}, },
"days_back": { "days_back": {
"type": "number", "type": "number",
"description": "Number of days of historical data to fetch (default: 7, max: 30)", "description": "获取历史数据的天数默认7最大30",
"default": 7, "default": 7,
"maximum": 30 "maximum": 30
}, },
@ -184,13 +218,13 @@ async def handle_list_tools() -> List[types.Tool]:
), ),
types.Tool( types.Tool(
name="get-price-change", name="get-price-change",
description="Get price change statistics over different time periods", description="获取不同时间段的价格变化统计",
inputSchema={ inputSchema={
"type": "object", "type": "object",
"properties": { "properties": {
"symbol": { "symbol": {
"type": "string", "type": "string",
"description": "Trading pair symbol (e.g., BTC/USDT, ETH/USDT)", "description": "交易对符号例如BTC/USDT, ETH/USDT",
}, },
"exchange": get_exchange_schema() "exchange": get_exchange_schema()
}, },
@ -199,17 +233,17 @@ async def handle_list_tools() -> List[types.Tool]:
), ),
types.Tool( types.Tool(
name="get-volume-history", name="get-volume-history",
description="Get trading volume history over time", description="获取一段时间内的交易量历史",
inputSchema={ inputSchema={
"type": "object", "type": "object",
"properties": { "properties": {
"symbol": { "symbol": {
"type": "string", "type": "string",
"description": "Trading pair symbol (e.g., BTC/USDT, ETH/USDT)", "description": "交易对符号例如BTC/USDT, ETH/USDT",
}, },
"days": { "days": {
"type": "number", "type": "number",
"description": "Number of days of volume history (default: 7, max: 30)", "description": "获取交易量历史的天数默认7最大30",
"default": 7, "default": 7,
"maximum": 30 "maximum": 30
}, },
@ -391,10 +425,48 @@ async def main():
) )
def handle_sigint():
"""Handle keyboard interrupt"""
for instance in exchange_instances.values():
asyncio.create_task(instance.close())
sys.exit(0)
def configure_asyncio_event_loop():
"""Configure the appropriate event loop based on the platform"""
if platform.system() == 'Windows':
# Windows specific configuration
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
else:
# Unix-like systems configuration
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Set up signal handlers
try:
if platform.system() != 'Windows':
# Unix-like systems support these signals
loop.add_signal_handler(signal.SIGINT, handle_sigint)
loop.add_signal_handler(signal.SIGTERM, handle_sigint)
else:
# Windows alternative
signal.signal(signal.SIGINT, lambda x, y: handle_sigint())
except NotImplementedError:
# Fallback for environments where signals are not supported
pass
return loop
def run_server(): def run_server():
"""Wrapper to run the async main function""" """Wrapper to run the async main function with proper event loop configuration"""
asyncio.run(main()) try:
loop = configure_asyncio_event_loop()
loop.run_until_complete(main())
except KeyboardInterrupt:
handle_sigint()
finally:
loop.close()
if __name__ == "__main__": if __name__ == "__main__":
asyncio.run(main()) run_server()

64
src/test_crypto.py Normal file
View File

@ -0,0 +1,64 @@
import asyncio
from server import handle_list_tools, handle_call_tool
async def test_crypto_server():
"""
测试加密货币服务器的各项功能
"""
try:
# 1. 测试获取所有可用工具
print("=== 测试获取所有工具 ===")
tools = await handle_list_tools()
print(f"可用工具数量: {len(tools)}")
for tool in tools:
print(f"\n工具名称: {tool.name}")
print(f"描述: {tool.description}")
# 2. 测试获取币安BTC/USDT的价格
print("\n=== 测试获取BTC价格 ===")
price_result = await handle_call_tool("get-price", {
"symbol": "BTC/USDT",
"exchange": "binance"
})
print("BTC价格信息:")
for content in price_result:
print(content.text)
# 3. 测试获取市场摘要
print("\n=== 测试获取市场摘要 ===")
summary_result = await handle_call_tool("get-market-summary", {
"symbol": "ETH/USDT",
"exchange": "binance"
})
print("ETH市场摘要:")
for content in summary_result:
print(content.text)
# 4. 测试获取交易量排名
print("\n=== 测试获取交易量排名 ===")
volume_result = await handle_call_tool("get-top-volumes", {
"limit": 3,
"exchange": "binance"
})
print("交易量前3的交易对:")
for content in volume_result:
print(content.text)
# 5. 测试获取历史K线数据
print("\n=== 测试获取历史K线数据 ===")
ohlcv_result = await handle_call_tool("get-historical-ohlcv", {
"symbol": "BTC/USDT",
"exchange": "binance",
"timeframe": "1h",
"days_back": 1
})
print("BTC/USDT的历史K线数据:")
for content in ohlcv_result:
print(content.text)
except Exception as e:
print(f"测试过程中发生错误: {str(e)}")
if __name__ == "__main__":
# 运行测试
asyncio.run(test_crypto_server())