想要将dify接入钉钉机器人中进行问答,首先需要创建应用,添加企业机器人,创建消息卡片,开发机器人回复消息的代码,最后接入dify,使用dify的工作流进行回答。
一、机器人准备
1、创建应用:创建应用 - 钉钉开放平台
2、添加应用能力:添加应用能力 - 钉钉开放平台
3、配置企业机器人:配置企业机器人 - 钉钉开放平台
4、新建卡片模板:打字机效果流式 AI 卡片 - 钉钉开放平台
二、开发代码
1、安装官方SDK
pip install dingtalk_stream loguru
2、修改模板代码
以下为官方模板代码,根据自己的需求修改消息接收和回复逻辑
"""DingTalk Stream chatbot sample that answers with a streaming AI card.

Flow: receive a chatbot message over the DingTalk Stream connection, deliver
an AI card to the conversation, then keep updating that card while the model
response is being streamed (typewriter effect).
"""
import os
import logging
import asyncio
import argparse
from loguru import logger
from dingtalk_stream import AckMessage
import dingtalk_stream

from http import HTTPStatus
from dashscope import Generation

from typing import Any, Awaitable, Callable


def define_options():
    """Parse the command-line options of the bot.

    Both options fall back to environment variables
    (``DINGTALK_APP_CLIENT_ID`` / ``DINGTALK_APP_CLIENT_SECRET``).

    Returns:
        The ``argparse.Namespace`` with ``client_id`` and ``client_secret``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--client_id",
        dest="client_id",
        default=os.getenv("DINGTALK_APP_CLIENT_ID"),
        help="app_key or suite_key from https://open-dev.dingtalk.com",
    )
    parser.add_argument(
        "--client_secret",
        dest="client_secret",
        default=os.getenv("DINGTALK_APP_CLIENT_SECRET"),
        help="app_secret or suite_secret from https://open-dev.dingtalk.com",
    )
    return parser.parse_args()


async def call_with_stream(
    request_content: str, callback: Callable[[str], Awaitable[Any]]
):
    """Ask the model a question and push the growing answer to ``callback``.

    Args:
        request_content: The user's question, sent as a single user message.
        callback: Awaitable callable that receives the full answer
            accumulated so far (not just the newest chunk).

    Returns:
        The complete answer text.

    Raises:
        Exception: If any streamed response has a non-OK status code.
    """
    messages = [{"role": "user", "content": request_content}]
    # NOTE(review): the streaming response is consumed with a plain ``for``
    # inside a coroutine, so waiting for the next chunk presumably blocks the
    # event loop -- confirm, and consider moving it to a worker thread.
    responses = Generation.call(
        Generation.Models.qwen_turbo,
        messages=messages,
        result_format="message",  # set the result to be "message" format.
        stream=True,  # set stream output.
        incremental_output=True,  # get streaming output incrementally.
    )
    full_content = ""  # incremental output has to be merged by the caller.
    length = 0  # length of the content at the last card update
    for response in responses:
        if response.status_code != HTTPStatus.OK:
            raise Exception(
                f"Request id: {response.request_id}, Status code: {response.status_code}, error code: {response.code}, error message: {response.message}"
            )
        full_content += response.output.choices[0]["message"]["content"]
        full_content_length = len(full_content)
        # Throttle: only update the card once more than 20 new characters
        # have accumulated since the previous update.
        if full_content_length - length > 20:
            await callback(full_content)
            logger.info(
                f"调用流式更新接口更新内容:current_length: {length}, next_length: {full_content_length}"
            )
            length = full_content_length
    # Final update so the tail that stayed below the threshold is delivered.
    await callback(full_content)
    logger.info(
        f"Request Content: {request_content}\nFull response: {full_content}\nFull response length: {len(full_content)}"
    )
    return full_content


async def handle_reply_and_update_card(
    self: dingtalk_stream.ChatbotHandler,
    incoming_message: dingtalk_stream.ChatbotMessage,
):
    """Deliver an AI card for ``incoming_message`` and stream the answer into it.

    Args:
        self: The chatbot handler; its ``dingtalk_client`` and ``logger``
            attributes are used.
        incoming_message: The chatbot message being answered; its
            ``text.content`` is used as the question.
    """
    # Card template ID. This template is for testing only; for production
    # use, import the card template JSON into your own application.
    card_template_id = "8aebdfb9-28f4-4a98-98f5-396c3dde41a0.schema"
    content_key = "content"
    card_data = {content_key: ""}
    card_instance = dingtalk_stream.AICardReplier(
        self.dingtalk_client, incoming_message
    )
    # First deliver the card:
    # https://open.dingtalk.com/document/orgapp/create-and-deliver-cards
    card_instance_id = await card_instance.async_create_and_deliver_card(
        card_template_id, card_data
    )

    # Then stream updates into the card:
    # https://open.dingtalk.com/document/isvapp/api-streamingupdate
    async def callback(content_value: str):
        return await card_instance.async_streaming(
            card_instance_id,
            content_key=content_key,
            content_value=content_value,
            append=False,
            finished=False,
            failed=False,
        )

    try:
        full_content_value = await call_with_stream(
            incoming_message.text.content, callback
        )
        # Mark the card as finished with the complete answer.
        await card_instance.async_streaming(
            card_instance_id,
            content_key=content_key,
            content_value=full_content_value,
            append=False,
            finished=True,
            failed=False,
        )
    except Exception as e:
        self.logger.exception(e)
        # Flag the card as failed so it does not stay in the streaming state.
        await card_instance.async_streaming(
            card_instance_id,
            content_key=content_key,
            content_value="",
            append=False,
            finished=False,
            failed=True,
        )


class CardBotHandler(dingtalk_stream.ChatbotHandler):
    """Chatbot callback handler that answers text messages with an AI card."""

    def __init__(self, logger: logging.Logger = logger):
        # Zero-argument super() runs the direct parent's initializer; the
        # previous two-argument form started the lookup after ChatbotHandler.
        super().__init__()
        if logger:
            self.logger = logger
        # The event loop only keeps weak references to tasks, so running
        # reply tasks are held here until they complete.
        self._background_tasks = set()

    async def process(self, callback: dingtalk_stream.CallbackMessage):
        """Acknowledge an incoming message and answer it in the background.

        Non-text messages get a fixed text reply. Text messages are answered
        by a background task so the acknowledgement is returned immediately.

        Returns:
            A ``(AckMessage.STATUS_OK, "OK")`` tuple.
        """
        incoming_message = dingtalk_stream.ChatbotMessage.from_dict(callback.data)
        self.logger.info(f"收到消息:{incoming_message}")

        if incoming_message.message_type != "text":
            self.reply_text("俺只看得懂文字喔~", incoming_message)
            return AckMessage.STATUS_OK, "OK"

        task = asyncio.create_task(
            handle_reply_and_update_card(self, incoming_message)
        )
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return AckMessage.STATUS_OK, "OK"


def main():
    """Start the stream client with the card bot handler registered."""
    options = define_options()

    credential = dingtalk_stream.Credential(options.client_id, options.client_secret)
    client = dingtalk_stream.DingTalkStreamClient(credential)
    client.register_callback_handler(
        dingtalk_stream.ChatbotMessage.TOPIC, CardBotHandler()
    )
    client.start_forever()


if __name__ == "__main__":
    main()

# 3、接入dify
根据dify创建的工作流的API文档进行调用,如果想进行多轮对话可以保存返回的conversation_id
full_content = "" length = 0 timeout = aiohttp.ClientTimeout(total=60) async with aiohttp.ClientSession(timeout=timeout) as session: try: async with session.post(DIFY_CHAT_URL, headers=headers, json=payload) as resp: if resp.status != 200: error_text = await resp.text() raise Exception(f"Dify API error: {resp.status}, {error_text}") async for line in resp.content: line = line.decode('utf-8').strip() if not line or not line.startswith("data:"): continue data_str = line[5:].strip() # 去掉 "data:" if data_str == "[DONE]": break try: data = json.loads(data_str) if 'conversation_id' in data and not conv_id: # 首次创建对话,保存 ID update_conversation_id(user_id, data['conversation_id']) except json.JSONDecodeError: continue if 'answer' in data: chunk = data['answer'] full_content += chunk # 策略1:每次收到 chunk 都回调 # await callback(full_content) # 策略2:累积一定长度再回调 if len(full_content) - length >= 20: await callback(full_content) logger.info( f"调用流式更新接口更新内容:current_length: {length}, next_length: {len(full_content)}" ) length = len(full_content) # 最终回调(确保完整内容被处理) if full_content and len(full_content) > length: await callback(full_content) logger.info( f"Request Content: {request_content}\nFull response: {full_content}\nFull response length: {len(full_content)}" ) return full_content except Exception as e: logger.error(f"调用 Dify 流式接口出错: {e}") raise4、拓展
继续开发后,机器人不仅可以进行文字对话,还能够进行图片问答:先调用self.get_image_download_url方法获取用户在钉钉中发送的图片的下载地址并下载图片,再调用dify的/v1/files/upload接口,即可将图片上传到dify。