LLMをつかったチャットボットで、内部知識にない場合ネットに検索にいき、表示するデモをつくりました。
言語モデルはSLMに相当するものを使いました。
コードは、ClaudeCodeをつかってつくっていますが、いくつかある手法から何を選んで実装するのか、という点を考えながら実装しました。
できたコードをみて学べることが多いです。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 |
#!/usr/bin/env python3 """ LangChain AgentとToolsを使ったチャットボット MCPサーバーをLangChain Toolとして統合 """ import asyncio import json import sys from typing import Optional, List, Dict, Any import torch from transformers import pipeline from langchain.llms.base import LLM from langchain.callbacks.manager import CallbackManagerForLLMRun from langchain.agents import Tool, AgentExecutor, create_react_agent from langchain.prompts import PromptTemplate from langchain.schema import BaseMessage, HumanMessage, AIMessage class TransformersLLM(LLM): """Transformers pipeline用のカスタムLLMラッパー""" pipeline: Any = None max_new_tokens: int = 512 temperature: float = 0.7 @property def _llm_type(self) -> str: return "transformers" def _call( self, prompt: str, stop: Optional[List[str]] = None, run_manager: Optional[CallbackManagerForLLMRun] = None, **kwargs: Any, ) -> str: """Transformers pipelineを呼び出してレスポンスを取得""" try: if self.pipeline is None: return "エラー: LLMパイプラインが初期化されていません" # テキスト生成 result = self.pipeline( prompt, max_new_tokens=self.max_new_tokens, temperature=self.temperature, do_sample=True, top_p=0.9, return_full_text=False ) if result and len(result) > 0: return result[0]["generated_text"].strip() else: return "エラー: 応答を生成できませんでした" except Exception as e: return f"エラー: LLM実行中にエラーが発生しました: {str(e)}" class MCPSearchTool: """MCP検索サーバーをLangChain Toolとして提供""" def __init__(self, server_path: str): self.server_path = server_path self.process: Optional[asyncio.subprocess.Process] = None self.request_id = 0 async def start(self): """MCPサーバープロセスを開始""" self.process = await asyncio.create_subprocess_exec( sys.executable, self.server_path, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) await asyncio.sleep(0.5) # 初期化 await self._send_request("initialize", { "protocolVersion": "2024-11-05", "clientInfo": {"name": "chatbot-client", "version": "1.0.0"} }) async def _send_request(self, method: str, params: Optional[Dict] = None) -> Dict: """MCPサーバーにリクエストを送信""" if not self.process: raise RuntimeError("MCPサーバーが起動していません") self.request_id += 1 request = { "jsonrpc": "2.0", "id": self.request_id, "method": method, "params": params or {} } request_json = json.dumps(request) + "\n" self.process.stdin.write(request_json.encode()) await self.process.stdin.drain() response_line = await self.process.stdout.readline() if not response_line: stderr = await self.process.stderr.read() error_msg = stderr.decode() if stderr else "不明なエラー" raise RuntimeError(f"MCPサーバーエラー: {error_msg}") return json.loads(response_line.decode()) async def search(self, query: str) -> str: """ウェブ検索を実行してテキストで結果を返す""" try: response = await self._send_request("tools/call", { "name": "search_web", "arguments": {"query": query, "num_results": 3} }) if "result" in response: content = response["result"]["content"][0]["text"] results = json.loads(content) # 検索結果を整形 formatted_results = [] for i, result in enumerate(results, 1): if "error" in result: return f"検索エラー: {result['error']}" formatted_results.append( f"[結果{i}] {result.get('title', 'N/A')}\n" f"{result.get('snippet', 'N/A')}\n" f"URL: {result.get('url', 'N/A')}" ) return "\n\n".join(formatted_results) else: return f"エラー: {response.get('error', {}).get('message', '不明')}" except Exception as e: return f"検索実行エラー: {str(e)}" async def close(self): """MCPサーバープロセスを終了""" if self.process: self.process.terminate() await self.process.wait() class SimpleChatbot: """シンプルなチャットボット(MCP検索統合)""" def __init__(self, llm_pipeline, mcp_tool: MCPSearchTool): self.llm = TransformersLLM(pipeline=llm_pipeline) self.mcp_tool = mcp_tool self.chat_history: List[BaseMessage] = [] # 回答用プロンプト self.answer_template = """以下の検索結果を参考にして、質問に答えてください。 検索結果: {context} 質問: {question} 回答: """ # 不明応答パターン self.unknown_patterns = [ "わかりません", "分かりません", "知りません", "申し訳ありません", "できません", "リアルタイム", "最新", "アクセスできません", "情報がありません", "データがありません", "don't know", "cannot", "unable to", ] async def _is_unknown_response(self, response: str) -> bool: """LLMの応答が「分からない」系かチェック""" response_lower = response.lower() for pattern in self.unknown_patterns: if pattern in response_lower: return True return False async def chat(self, user_input: str) -> str: """チャット処理""" self.chat_history.append(HumanMessage(content=user_input)) # まずLLMに直接聞いてみる print("[システム] 内部知識で回答を試みています...", file=sys.stderr) prompt = f"質問: {user_input}\n\n回答: " response = self.llm(prompt) # 「分からない」応答なら検索を実行 if await self._is_unknown_response(response): print("[システム] 内部知識では不十分です。ウェブ検索を実行します...", file=sys.stderr) search_results = await self.mcp_tool.search(user_input) # 検索結果を使って再度回答 prompt = self.answer_template.format( context=search_results, question=user_input ) response = self.llm(prompt) self.chat_history.append(AIMessage(content=response)) return response async def main(): """メイン処理""" print("=" * 60) print("チャットボット起動中...") print("=" * 60) # LLMのロード print("1. LLMをロード中...") print(f" モデル: google/gemma-2-2b-jpn-it") print(f" デバイス: {'CUDA' if torch.cuda.is_available() else 'CPU'}") llm_pipe = pipeline( "text-generation", model="google/gemma-2-2b-jpn-it", model_kwargs={"dtype": torch.bfloat16}, device="cuda" if torch.cuda.is_available() else "cpu", ) print(" ✓ LLMロード完了") # MCPツール初期化 print("2. MCP検索ツール起動中...") mcp_tool = MCPSearchTool("mcp_search_server.py") await mcp_tool.start() print(" ✓ MCP検索ツール起動完了") print() print("終了: 'quit' または 'exit'") print("=" * 60) # チャットボット初期化 chatbot = SimpleChatbot(llm_pipe, mcp_tool) try: while True: user_input = input("\nあなた: ").strip() if user_input.lower() in ["quit", "exit", "終了"]: print("終了します") break if not user_input: continue response = await chatbot.chat(user_input) print(f"\nボット: {response}") except KeyboardInterrupt: print("\n\n終了します") finally: await mcp_tool.close() if __name__ == "__main__": asyncio.run(main()) |
mcp_search_server.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 |
#!/usr/bin/env python3 """ MCP Server for Web Search (Stdio) ウェブ検索機能を提供するMCPサーバー """ import asyncio import json import sys from typing import Any, Dict, List import httpx from bs4 import BeautifulSoup class MCPSearchServer: """MCP準拠の検索サーバー""" def __init__(self): self.name = "search-server" self.version = "1.0.0" async def search_web(self, query: str, num_results: int = 5) -> List[Dict[str, Any]]: """ ウェブ検索を実行 DuckDuckGoのHTMLスクレイピングを使用 """ try: async with httpx.AsyncClient(timeout=10.0) as client: # DuckDuckGo検索 url = "https://html.duckduckgo.com/html/" data = {"q": query} headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" } response = await client.post(url, data=data, headers=headers, follow_redirects=True) if response.status_code == 200: soup = BeautifulSoup(response.text, 'html.parser') results = [] # 検索結果を抽出 for result in soup.select('.result')[:num_results]: title_elem = result.select_one('.result__title') snippet_elem = result.select_one('.result__snippet') link_elem = result.select_one('.result__url') if title_elem and link_elem: title = title_elem.get_text(strip=True) snippet = snippet_elem.get_text(strip=True) if snippet_elem else "" link = link_elem.get('href', '') results.append({ "title": title, "snippet": snippet, "url": link }) return results else: return [{"error": f"検索エラー: ステータスコード {response.status_code}"}] except Exception as e: return [{"error": f"検索中にエラーが発生: {str(e)}"}] async def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]: """MCPリクエストを処理""" method = request.get("method") params = request.get("params", {}) request_id = request.get("id") response = { "jsonrpc": "2.0", "id": request_id } try: if method == "initialize": response["result"] = { "protocolVersion": "2024-11-05", "serverInfo": { "name": self.name, "version": self.version }, "capabilities": { "tools": {} } } elif method == "tools/list": response["result"] = { "tools": [ { "name": "search_web", "description": "ウェブ検索を実行して最新情報を取得します", "inputSchema": { "type": "object", "properties": { "query": { "type": "string", "description": "検索クエリ" }, "num_results": { "type": "integer", "description": "取得する結果の数", "default": 5 } }, "required": ["query"] } } ] } elif method == "tools/call": tool_name = params.get("name") arguments = params.get("arguments", {}) if tool_name == "search_web": query = arguments.get("query") num_results = arguments.get("num_results", 5) results = await self.search_web(query, num_results) response["result"] = { "content": [ { "type": "text", "text": json.dumps(results, ensure_ascii=False, indent=2) } ] } else: response["error"] = { "code": -32601, "message": f"Unknown tool: {tool_name}" } else: response["error"] = { "code": -32601, "message": f"Unknown method: {method}" } except Exception as e: response["error"] = { "code": -32603, "message": f"Internal error: {str(e)}" } return response async def run(self): """Stdioでサーバーを実行""" # 標準エラー出力にログを出力 print("MCP Search Server started", file=sys.stderr) while True: try: # 標準入力から1行読み取り line = await asyncio.get_event_loop().run_in_executor( None, sys.stdin.readline ) if not line: break # JSONリクエストをパース request = json.loads(line.strip()) # リクエストを処理 response = await self.handle_request(request) # 標準出力にJSONレスポンスを書き込み print(json.dumps(response, ensure_ascii=False), flush=True) except json.JSONDecodeError as e: print(f"JSON parse error: {e}", file=sys.stderr) except Exception as e: print(f"Error: {e}", file=sys.stderr) async def main(): server = MCPSearchServer() await server.run() if __name__ == "__main__": asyncio.run(main()) |