Commit 05b0cbbb authored by 王晓亮's avatar 王晓亮
Browse files

1

No related merge requests found
Showing with 3220 additions and 97 deletions
+3220 -97
No preview for this file type
......@@ -15,6 +15,9 @@
####### 华杰机器人
#######################################################################
1478002873 70q606 # hyper-v 江西华杰01
1903989249 70q606 # hyper-v 江西华杰02
1502678884 70q606 # hyper-v 江西华杰03
#######################################################################
####### 顺智机器人
......
import os
from openai import OpenAI
try:
client = OpenAI(
# 若没有配置环境变量,请用百炼API Key将下行替换为:api_key="sk-xxx",
# api_key=os.getenv("DASHSCOPE_API_KEY"),
# sk-1f30e07928a74bedb80ab38b8578eb13
api_key="sk-1f30e07928a74bedb80ab38b8578eb13",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
completion = client.chat.completions.create(
model="qwen-plus", # 模型列表:https://help.aliyun.com/zh/model-studio/getting-started/models
messages=[
{'role': 'system', 'content': 'You are a helpful assistant.'},
{'role': 'user', 'content': '你是谁?'}
]
)
print(completion.choices[0].message.content)
except Exception as e:
print(f"错误信息:{e}")
print("请参考文档:https://help.aliyun.com/zh/model-studio/developer-reference/error-code")
from utils.log_util import *
from utils.io_util import *
import sys
sys.path.append("..")
from utils.io_util import *
from utils.log_util import *
_global_dict = {}
def get_app_version():
return "1.0.0.3"
def _g_init():
global _global_dict
def set_g_value(name, value):
_global_dict[name] = value
def set_g_value(key, value):
_global_dict[key] = value
def get_g_value(name, defValue=None):
def get_g_value(key, defValue=None, saveDefaultIfEmpty=True):
try:
return _global_dict[name]
return _global_dict[key]
except KeyError:
if saveDefaultIfEmpty:
_global_dict[key] = defValue
return _global_dict[key]
return defValue
def get_g_value_i(name, defValue=None):
def get_g_value_i(key, defValue=None):
try:
return _global_dict[name]
return _global_dict[key]
except KeyError:
if defValue != None:
_global_dict[name] = defValue
_global_dict[key] = defValue
return defValue
......@@ -57,6 +64,10 @@ def init_cfg(fn):
cfg = read_yaml(fn)
if cfg is None:
log("配置文件丢失")
exit(-1)
sys.exit(-1)
set_g_value("cfg", cfg)
def is_exsists_arg(key):
return key in sys.argv
# -*- coding: utf-8 -*-
import pandas as pd
# 读取CSV文件
data = pd.read_csv('C:/Users/AegisAdmin/Documents/机器人-向日葵.csv')
# 转换为JSON格式并存储到文件中
data.to_json('robot_sunflower.json', orient='records', force_ascii=False)
\ No newline at end of file
......@@ -15,7 +15,7 @@ init_cfg("daemon-settings.yaml")
lock_pid_file = get_cfg("lock_file")
if lock_pid_file is None:
lock_pid_file = make_pid_file(os.path.abspath(__file__))
if not check_singleton(lock_pid_file):
print("already running")
sys.exit(-1)
......@@ -31,7 +31,7 @@ argv_len = len(sys.argv)
if argv_len < 2:
print(f"not enough arguments: {sys.argv}")
sys.exit(0)
task_type = sys.argv[1]
dop_flag = 1 # 默认在测试
......@@ -40,9 +40,11 @@ if get_cfg("env", "development") == "production":
set_g_value("dop_flag", dop_flag)
def _log(msg):
log(msg, CHANNEL)
tasks = get_cfg("tasks")
today_record = None
......@@ -50,7 +52,7 @@ _today = date.today()
do_log = get_cfg("do_log_flag", 0) > 0
if today_record!=_today:
if today_record != _today:
today_record = _today
task_time_records = {}
......@@ -62,6 +64,7 @@ def close_by_task(task):
except:
log(traceback.format_exc(), "daemon")
def start_by_task(task):
try:
if "pmode" in task and task["pmode"] == "runas":
......@@ -73,27 +76,56 @@ def start_by_task(task):
except:
log(traceback.format_exc(), "daemon")
tasks = get_cfg("tasks")
# print(tasks)
for tk in tasks:
task = tasks[tk]
if task_type != task["title"]:
continue
if "enabled" in task and not task["enabled"]:
continue
if "process_name" not in task:
_log("未定义监控的进程名:process_name")
continue
if "cmd" not in task:
_log("未定义监控的进程关闭时的处理程序:cmd")
continue
print(f'start task: {task["title"]}\n{task["process_name"]}\n{task["cmd"]}')
print(
f'start task: {task["title"]}\n{task["process_name"]}\n{task["cmd"]}')
if "start" == task["operation"]:
############################################################################################
# 判断监控的程序可执行文件是否存在
# task["cmd"] 一般是可执行程序的完全路径,不带参数,参数由该执行程序的配置表读入
# 如果存在更新文件
executable_file = task["cmd"]
exec_update_file = f'{executable_file}.update'
if os.path.exists(exec_update_file):
exec_prev = f'{executable_file}.previous'
try:
close_by_task(task)
tm.sleep(3)
if os.path.exists(exec_prev):
_log(
f"The file '{exec_prev}' exists. Deleting the file...")
os.remove(exec_prev)
_log(f"The file '{exec_prev}' has been deleted.")
os.rename(f'{executable_file}', exec_prev)
_log(f"Backup file '{exec_prev}' created.")
tm.sleep(1)
_log(f"The file {exec_update_file} -> {executable_file}")
os.rename(exec_update_file, executable_file)
_log(f"The file '{executable_file}' update successfully.")
tm.sleep(1)
except Exception as ex:
_log(f"{exec_update_file} 更新失败:\n{str(ex)}")
############################################################################################
if not exists_process(task["process_name"]):
start_by_task(task)
elif "restart" == task["operation"]:
......@@ -103,9 +135,9 @@ for tk in tasks:
start_by_task(task)
elif "shutdown" == task["operation"]:
close_by_task(task)
# tmp_r = input("按回车继续") # DEBUG
print(f'fin')
release_singleton(lock_pid_file)
\ No newline at end of file
release_singleton(lock_pid_file)
# -*- coding: utf-8 -*- ##设置编码方式
import os
from datetime import datetime
from datetime import timedelta
import sys
sys.path.append("..")
import random
from common.app_common import *
from bll.zhs_bll_xzbot_api import *
import sqlite3 as sql3
def init_xzbot_db():
# 初始化客户端本地db
try:
db_file_name = 'xzbot_client.db'
if os.path.exists(db_file_name):
log_bot_db(f"{db_file_name} 文件已存在,跳过初始化")
return
con = sql3.connect(db_file_name)
with con:
# 机器人表
con.execute("""
CREATE TABLE xzbot (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
name TEXT, -- 名称
robot_id TEXT, -- 机器人id
remark TEXT,
last_exec_time TIMESTAMP NULL DEFAULT (strftime('%Y-%m-%d %H:%M:%S', 'now', 'localtime')), -- 消息的最后发送时间
last_active_time TIMESTAMP NULL DEFAULT (strftime('%Y-%m-%d %H:%M:%S', 'now', 'localtime')), -- 机器人的最后活跃时间
status INTEGER
);
""")
# 消息历史
con.execute("""
CREATE TABLE xzbot_msg (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
robot_id TEXT, -- 机器人id
msg_id INTEGER, -- 消息id
cmd TEXT, -- 指令类型
customer_name TEXT,
target TEXT,
content TEXT,
error INTEGER DEFAULT 0,
err_msg TEXT,
retry INTEGER DEFAULT 0,
create_at TIMESTAMP NULL DEFAULT (strftime('%Y-%m-%d %H:%M:%S', 'now', 'localtime')), -- 消息的最后发送时间
update_at TIMESTAMP NULL DEFAULT (strftime('%Y-%m-%d %H:%M:%S', 'now', 'localtime')), -- 机器人的最后活跃时间
status INTEGER
);
""")
# CREATE INDEX index_name ON table_name(column_name...);
con.execute("""
CREATE UNIQUE INDEX idx_msg_id ON xzbot_msg(msg_id);
""")
# 聊天群记录
con.execute("""
CREATE TABLE xzbot_room (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
name TEXT,
create_at TIMESTAMP NULL DEFAULT (strftime('%Y-%m-%d %H:%M:%S', 'now', 'localtime')),
update_at TIMESTAMP NULL DEFAULT (strftime('%Y-%m-%d %H:%M:%S', 'now', 'localtime')),
status INTEGER
);
""")
# CREATE INDEX index_name ON table_name(column_name...);
con.execute("""
CREATE INDEX idx_room_name ON xzbot_room(name);
""")
except Exception as ex:
log_bot_db_err(str(ex))
# 通过 sqlacodegen 生成 sqlalchemy orm 代码:
# sqlacodegen sqlite:///<database.db>
def remove_obsolete_msgs():
msg_expire_days = get_cfg("msg_cache_days", 30)
print(f"删除超过 {msg_expire_days} 天的消息记录")
remove_msgs_before(datetime.now() + timedelta(days=-msg_expire_days), get_db_session())
def test_insert_msgs():
msgs = []
random_id = int(random.random() * 10000000)
msgs.append(XzbotMsg(
msg_id = 1765998401930000000 + random_id,
robot_id = "robot" + str(random_id),
target = "test_target",
content = "txt:" + str(random_id),
# create_at = datetime.now(),
update_at = datetime.now(),
status = XzbotMsgStatusEnum.已发送|XzbotMsgStatusEnum.发送异常
))
session_lite = create_xzbot_db_session()
for msg in msgs:
session_lite.add(msg)
session_lite.commit()
def test_get_msgs_before():
msgs = get_msgs_before(datetime.now()+timedelta(minutes=-1), create_xzbot_db_session())
if msgs is not None:
for msg in msgs:
print(msg.msg_id, msg.content)
def test_mark_msg():
msgs = []
session_lite = create_xzbot_db_session()
msgs = get_msgs_before(datetime.now()+timedelta(minutes=-15), session_lite)
mark_msgs(msgs, XzbotMsgStatusEnum.已发送, session_lite)
session_lite.commit()
def test_remove_msgs_before():
session_lite = create_xzbot_db_session()
remove_msgs_before(datetime.now()+timedelta(minutes=-15), session_lite)
session_lite.commit()
def test_add_qywx_room(title):
session_lite = create_xzbot_db_session()
qr = test_get_qywx_room(title, session_lite)
if qr.count() < 1:
session_lite.add(XzbotRoom(
name = title,
create_at = datetime.now(),
update_at = datetime.now(),
status = 0
))
else:
# qr.update({"update_at":datetime.now()})
qr.update({XzbotRoom.update_at:datetime.now()})
session_lite.commit()
def test_get_qywx_room(title, session = None):
if session is None:
session = create_xzbot_db_session()
qr = session.query(XzbotRoom)\
.filter(XzbotRoom.name == title)
# .filter(and_(XzbotRoom.create_at.__lt__(date))
print(f'room num: {qr.count()}')
for x in qr:
print(x.name, x.create_at, x.update_at)
return qr
def test_remove_qywx_room_by_title(title, session = None):
if session is None:
session = create_xzbot_db_session()
qr = session.query(XzbotRoom)\
.filter(XzbotRoom.name == title).delete()
if qr > 0:
print(f'删除成功{qr}条记录')
session.commit()
return qr
\ No newline at end of file
......@@ -151,7 +151,7 @@ class EcsQywxMsg(Base):
__table_args__ = {'comment': '企业外部通信消息'}
id = Column(BIGINT(20), primary_key=True, comment='消息id')
type = Column(INTEGER(4), server_default=text("'0'"), comment='类型 0 员工向企业微信发送 ')
type = Column(INTEGER(4), server_default=text("'0'"), comment='类型\\r\\n0 普通消息\\r\\n1 强制重发消息\\r\\n999999 系统通知机器人客户端')
channel_id = Column(BIGINT(20), nullable=False, index=True, server_default=text("'0'"), comment='信道id')
robot_id = Column(String(50), nullable=False, index=True, server_default=text("'0'"), comment='机器人id')
customer_id = Column(String(50), nullable=False, index=True, server_default=text("'0'"), comment='客户id')
......@@ -160,8 +160,9 @@ class EcsQywxMsg(Base):
msg = Column(LONGTEXT, comment='消息')
json = Column(JSON, comment='消息体')
status = Column(INTEGER(4), server_default=text("'0'"), comment='状态 0 正常 1 已发送 2 已确认发送 3 客户端发送异常 4 需要重发 5 次日再发 6 信道停用暂停发送 99 异常')
error_code = Column(BIGINT(20), nullable=False, index=True, server_default=text("'0'"), comment='错误编码')
error_msg = Column(VARCHAR(200), index=True, comment='错误消息')
error_code = Column(BIGINT(20), nullable=False, index=True, server_default=text("'0'"), comment='错误码\\r\\n100001 消息超过4000字符\\r\\n101010 夜晚发送的消息,需要次日发送\\r\\n201102 群名错误,或其他原因搜索不到群\\r\\n201103 群名不可修改\\r\\n400400 反馈状态时接口调用失败\\r\\n400401 重发反馈成功')
error_msg = Column(VARCHAR(200), index=True, comment='错误提示')
retry = Column(Integer, server_default=text("'0'"), comment='重试次数')
create_at = Column(DateTime, index=True, comment='创建时间')
creator_id = Column(VARCHAR(50), index=True, comment='创建人id')
creator_name = Column(VARCHAR(200), comment='创建人')
......@@ -187,6 +188,26 @@ class EcsQywxQa(Base):
update_by = Column(String(50), comment='最后更新人ID')
class EcsServant(Base):
__tablename__ = 'ecs_servant'
__table_args__ = {'comment': '企业微信发送错误通知人'}
id = Column(BigInteger, primary_key=True, comment='企业外部通信渠道id')
user_id = Column(VARCHAR(50), comment='助记符')
user_name = Column(VARCHAR(20), comment='企业外部通信渠道名称')
channel_id = Column(BigInteger, comment='发送通知的机器人信道id')
qywx_account = Column(VARCHAR(200), comment='分类名')
wx_account = Column(VARCHAR(200), comment='机器人id')
target = Column(VARCHAR(200), comment='发送话术的目标,通常是聊天群名称')
remark = Column(VARCHAR(1000), comment='备注')
status = Column(Integer, server_default=text("'0'"), comment='状态\\r\\n0 正常;\\r\\n1 启用;\\r\\n2 停用;\\r\\n99 异常;')
create_at = Column(DateTime)
creator_id = Column(VARCHAR(50))
creator_name = Column(VARCHAR(200))
update_at = Column(DateTime)
updater_id = Column(VARCHAR(50))
updater_name = Column(VARCHAR(200))
class ZhsMsg(Base):
__tablename__ = 'zhs_msg'
__table_args__ = {'comment': '站内消息'}
......
# coding: utf-8
from sqlalchemy import Column, Integer, TIMESTAMP, Table, Text, text
from sqlalchemy.sql.sqltypes import NullType
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
metadata = Base.metadata
t_sqlite_sequence = Table(
'sqlite_sequence', metadata,
Column('name', NullType),
Column('seq', NullType)
)
class Xzbot(Base):
__tablename__ = 'xzbot'
id = Column(Integer, primary_key=True)
name = Column(Text)
robot_id = Column(Text)
remark = Column(Text)
last_exec_time = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP"))
last_active_time = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP"))
status = Column(Integer)
class XzbotMsgTypeEnum():
普通 = 0
强制重新执行 = 1
系统指令 = 999999
class XzbotMsgStatusEnum():
正常 = 0
已发送 = 1
发送异常 = 2
执行成功 = 4
class XzbotMsg(Base):
__tablename__ = 'xzbot_msg'
id = Column(Integer, primary_key=True)
robot_id = Column(Text)
msg_id = Column(Integer)
cmd = Column(Text)
customer_name = Column(Text)
target = Column(Text)
content = Column(Text)
error = Column(Integer)
err_msg = Column(Text)
retry = Column(Integer)
create_at = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP"))
update_at = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP"))
status = Column(Integer)
class XzbotRoom(Base):
__tablename__ = 'xzbot_room'
id = Column(Integer, primary_key=True)
name = Column(Text)
create_at = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP"))
update_at = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP"))
status = Column(Integer)
\ No newline at end of file
......@@ -16,9 +16,10 @@ sqlacodegen postgres://develop:123456@112.51.249.247:5432/am_test?charset=utf8 >
sqlacodegen postgres://develop:123456@112.51.249.247:5432/crm2_unified_test > zhs_dal_crm.py
## ECS
sqlacodegen mysql://zhsRoot:ZhsBookman@2021@36.137.119.36:5672/zhs_ecs_db?charset=utf8 > zhs_dal_ecs.py
sqlacodegen mysql://zhsRoot:ZhsBookman%402021@36.212.131.92:8672/zhs_ecs_db_test?charset=utf8 > zhs_dal_ecs_test.py
# 客户端使用
sqlacodegen sqlite:///xzbot_client.db --outfile=zhs_dal_xzbot.py
## pgsql
......
[{"机器码":"VM","通讯软件类型":"qywx","向日葵":452953059,"长期验证码":"dv4487","备注":"小E 福清"},{"机器码":"VM","通讯软件类型":"qywx","向日葵":826875855,"长期验证码":"78exoy","备注":"小F 泉州\/石狮"},{"机器码":"VM","通讯软件类型":"qywx","向日葵":412310538,"长期验证码":"cx3ha1","备注":"小G 漳州"},{"机器码":"VM","通讯软件类型":"qywx","向日葵":1148192350,"长期验证码":"787hr6","备注":"小C 福州"},{"机器码":"VM","通讯软件类型":"qywx","向日葵":1407541418,"长期验证码":"l391v2","备注":"小D 中控"},{"机器码":"VM","通讯软件类型":"qywx","向日葵":1366325110,"长期验证码":"70q606","备注":"小H 三明"},{"机器码":"VM","通讯软件类型":"qywx","向日葵":1393496433,"长期验证码":"70q606","备注":"满钇漳州"},{"机器码":"VM","通讯软件类型":"qywx","向日葵":1538700248,"长期验证码":"70q606","备注":"满钇泉州"},{"机器码":"VM","通讯软件类型":"qywx","向日葵":1186005787,"长期验证码":"70q606","备注":"满钇三明"},{"机器码":"VM","通讯软件类型":"wx","向日葵":1409646745,"长期验证码":"l391v2","备注":"连江快点办"},{"机器码":"VM","通讯软件类型":"qywx","向日葵":1478002873,"长期验证码":"70q606","备注":"华杰01"},{"机器码":"VM","通讯软件类型":"qywx","向日葵":1903989249,"长期验证码":"70q606","备注":"华杰02"},{"机器码":"VM","通讯软件类型":"qywx","向日葵":1502678884,"长期验证码":"70q606","备注":"华杰03"},{"机器码":"VM","通讯软件类型":"qywx","向日葵":1296993460,"长期验证码":"70q606","备注":"顺智01"},{"机器码":"VM","通讯软件类型":"qywx","向日葵":1874792478,"长期验证码":"70q606","备注":"顺智02"},{"机器码":"VM","通讯软件类型":"qywx","向日葵":434361822,"长期验证码":"2d4ya7","备注":"永安众事达"},{"机器码":"VM","通讯软件类型":"qywx","向日葵":1394646693,"长期验证码":"bg79j3","备注":"满钇石狮"}]
\ No newline at end of file
......@@ -6,7 +6,7 @@ pyinstaller -F --noconsole .\task_scheduler.py
#coding=utf-8
# import pymysql
import os
# import sys
import sys
import traceback
import time as tm
# from zhs_mysql_common import *
......
lock_file: "c:\\task_scheduler_lock.pid"
tasks:
# 每日客户属性查询计划
task_cpqp_plan:
title: "TaskCpqpGen"
time_begin: "00:00:00"
time_end: "01:59:59"
cmd: "D:\\workspace\\zhs\\行事历任务定时执行\\TaskConsole\\ZHS.Task.Console.exe task_cpqp_gen"
# 晓智保活
task_start_xzbot:
title: "TaskXZBotKeepAlive"
time_begin: "07:00:00"
time_end: "19:59:59"
cmd: "C:\\qywx\\xzbot\\daemon__tool.exe xzbot_start"
at_once: True
interp: 10
interp: 120
# 凌晨客户属性查询计划执行
task_cpqp_exec_morning:
title: "TaskCpqpExecMorning"
time_begin: "02:00:00"
time_end: "07:59:59"
cmd: "D:\\workspace\\zhs\\行事历任务定时执行\\TaskConsole\\ZHS.Task.Console.exe task_cpqp_exec"
# 每日重启晓智
task_restart_xzbot:
title: "TaskXZBotRestartEveryDay"
time_begin: "07:00:00"
time_end: "07:30:00"
cmd: "C:\\qywx\\xzbot\\daemon__tool.exe xzbot_restart"
at_once: True
interp: 15
# 午间客户属性查询计划执行
task_cpqp_exec_noon:
title: "TaskCpqpExecNoon"
time_begin: "12:00:00"
time_end: "12:59:59"
cmd: "D:\\workspace\\zhs\\行事历任务定时执行\\TaskConsole\\ZHS.Task.Console.exe task_cpqp_exec"
at_once: True
interp: 15
# 夜间客户属性查询计划执行
task_cpqp_exec_night:
title: "TaskCpqpExecNight"
time_begin: "19:00:00"
time_end: "23:59:59"
cmd: "D:\\workspace\\zhs\\行事历任务定时执行\\TaskConsole\\ZHS.Task.Console.exe task_cpqp_exec"
at_once: True
interp: 15
# 每日日常任务查询计划生成
task_routine_plan_gen:
title: "TaskRoutinePlanGen"
time_begin: "00:00:00"
time_end: "3:59:59"
cmd: "D:\\workspace\\zhs\\行事历任务定时执行\\TaskConsole\\ZHS.Task.Console.exe task_routine_plan_gen"
at_once: True
interp: 10
# 每日日常任务查询计划执行
task_routine_plan_exec:
title: "TaskRoutinePlanExec"
time_begin: "04:00:00"
time_end: "23:59:59"
cmd: "D:\\workspace\\zhs\\行事历任务定时执行\\TaskConsole\\ZHS.Task.Console.exe task_routine_plan_exec"
at_once: True
interp: 30
# 每日日常任务生成
task_mutation:
title: "TaskMutation"
time_begin: "00:00:00"
time_end: "23:59:59"
cmd: "D:\\workspace\\zhs\\行事历任务定时执行\\TaskConsole\\ZHS.Task.Console.exe task_mutation"
at_once: True
interp: 60
\ No newline at end of file
interp: 3600
......@@ -3,13 +3,11 @@ import sys
sys.path.append("..")
import os
import psutil
def close_program(p_name):
os.system('%s%s' % ("taskkill /F /IM ", p_name))
import psutil
def print_sys_usage():
# 打印本机的内存信息
......@@ -20,3 +18,32 @@ def print_sys_usage():
#本机cpu的总占用率
print ('cpu使用率: \t'+(str)(psutil.cpu_percent(0))+'%')
def close_program(p_name):
os.system('%s%s' % ("taskkill /F /IM ", p_name))
def make_pid_file(f_n):
return os.path.split(f_n)[0] + "/" + os.path.split(f_n)[-1].split(".py")[0] + ".pid"
def check_singleton(pid_file):
print(pid_file)
if os.path.exists(pid_file):
pid = ""
with open(pid_file, 'r') as fp:
pid = int(fp.readlines()[0])
if psutil.pid_exists(pid):
print(pid_file, "进程在运行中")
return False
else:
# 可能异常退出
os.remove(pid_file)
with open(pid_file, 'w') as fp:
fp.write(str(os.getpid()))
return True
def release_singleton(pid_file):
os.remove(pid_file)
import asyncio
import websockets
import simplejson as json
from dataclasses import dataclass
import uuid
@dataclass
class Message:
type: str
content: str
def to_json(self):
return json.dumps(self.__dict__)
async def send_message():
uri = "ws://localhost:8765"
async with websockets.connect(uri) as websocket:
# Send authentication message
auth_message = json.dumps({"username": "aaa", "password": "456"})
await websocket.send(auth_message)
auth_response = await websocket.recv()
auth_data = json.loads(auth_response)
if auth_data["status"] == "success":
print("Authentication successful")
greeting_message = Message(type="greeting", content="Server")
await websocket.send(greeting_message.to_json())
response = await websocket.recv()
print(f"Received from server: {response}")
echo_message = Message(type="echo", content="lalala")
await websocket.send(echo_message.to_json())
response = await websocket.recv()
print(f"Received from server: {response}")
farewell_message = Message(type="farewell", content="Server")
await websocket.send(farewell_message.to_json())
response = await websocket.recv()
print(f"Received from server: {response}")
else:
print(f"Authentication failed: {auth_data['reason']}")
asyncio.get_event_loop().run_until_complete(send_message())
import asyncio
import websockets
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import simplejson as json
import uuid
@dataclass
class Message:
type: str
content: str
def to_json(self):
return json.dumps(self.__dict__)
@staticmethod
def from_json(json_str):
data = json.loads(json_str)
return Message(**data)
connected_clients = {}
# Dummy user database for authentication
user_db = {
"abc": "123",
"aaa": "456"
}
async def handle_message(websocket, message, session):
if message.type == "greeting":
response = f'Hello, {session["user_name"]} From Server.'
elif message.type == "farewell":
response = f'Goodbye, {session["user_name"]} From Server.'
elif message.type == "echo":
response = f'{session["user_name"]} says: {message.content}'
else:
response = "Unknown message type"
await websocket.send(response)
async def authenticate(websocket, session):
try:
auth_message = await websocket.recv()
auth_data = json.loads(auth_message)
username = auth_data.get("username")
password = auth_data.get("password")
session["user_name"] = username
if user_db.get(username) == password:
await websocket.send(json.dumps({"status": "success", "uuid": session["client_id"]}))
return True
else:
await websocket.send(json.dumps({"status": "failure", "reason": "Invalid credentials"}))
return False
except Exception as e:
await websocket.send(json.dumps({"status": "failure", "reason": str(e)}))
return False
async def handler(websocket):
# Assign a unique client ID
client_id = str(uuid.uuid4())
print(f"Client {client_id} connected")
tmp_session = {"client_id": client_id, "websocket": websocket,
"last_active": datetime.now(), "auth_time": datetime.now()}
if not await authenticate(websocket, tmp_session):
await websocket.close()
print(f"Client {client_id} failed authentication and disconnected")
return
connected_clients[client_id] = tmp_session
print(f"Client {client_id} registered")
try:
while True:
try:
# 15 minutes timeout
message = await asyncio.wait_for(websocket.recv(), timeout=900)
connected_clients[client_id]["last_active"] = datetime.now()
message_obj = Message.from_json(message)
await handle_message(websocket, message_obj, connected_clients[client_id])
except asyncio.TimeoutError:
print(
f"Client {client_id} did not send any message in 15 minutes, disconnecting")
await websocket.close()
break
now = datetime.now()
if (now - connected_clients[client_id]["auth_time"]) > timedelta(hours=1):
print(
f"Client {client_id} authentication expired, requiring re-authentication")
await websocket.send(json.dumps({"status": "expired"}))
if not await authenticate(websocket, connected_clients[client_id]):
await websocket.close()
break
connected_clients[client_id]["auth_time"] = now
except websockets.ConnectionClosed:
print(f"Client {client_id} disconnected")
finally:
# Remove the client from the dictionary when they disconnect
del connected_clients[client_id]
print(f"Client {client_id} removed from connected clients")
async def main():
async def register(websocket):
client_id = str(uuid.uuid4())
if client_id in connected_clients:
# Disconnect the old connection
old_websocket = connected_clients[client_id]["websocket"]
await old_websocket.close()
print(f"Old connection for client {client_id} closed")
connected_clients[client_id] = {
"websocket": websocket, "last_active": datetime.now(), "auth_time": datetime.now()}
print(f"Client {client_id} registered")
try:
await handler(websocket)
finally:
del connected_clients[client_id]
print(f"Client {client_id} unregistered")
start_server = websockets.serve(register, "localhost", 8765)
await start_server
await asyncio.Future() # run forever
asyncio.run(main())
......@@ -8,18 +8,25 @@ SELECT * FROM ecs_qywx_msg
-- SELECT * FROM ecs_qywx_msg where create_at > '2024-12-14' AND STATUS = 3 AND customer_name = '福州隆民物业管理有限公司'
-- SELECT * FROM ecs_qywx_msg where create_at > '2024-12-14' AND creator_name = '王美琴'
-- UPDATE ecs_qywx_msg SET STATUS = 0, TYPE = 1
where channel_id = 409106743313351 and create_at >= '2025-01-10' AND STATUS > 1
-- UPDATE ecs_qywx_msg SET STATUS = 0, TYPE = 1
WHERE
channel_id = 409106743313351 and create_at >= '2025-01-14 12' AND STATUS > 1
SELECT COUNT(*) FROM ecs_qywx_msg
-- SELECT * FROM ecs_qywx_msg
where
1=1
-- and TYPE = 1
-- and TYPE = 0
and create_at >= '2025-01-02'
-- AND STATUS = 0
-- and STATUS = 3
AND customer_name <> 'check' AND customer_name <> 'warn'
AND creator_name = ' 黄琰琰'
-- AND creator_name LIKE '%晓清%'
-- AND creator_name = ' 黄琰琰'
-- AND creator_name LIKE '%晓清%'
-- AND target LIKE '%翔兴%'
-- AND creator_name LIKE '%李艳群%'
\ No newline at end of file
AND creator_name LIKE '%艳%'
\ No newline at end of file
......@@ -45,13 +45,11 @@ def get_cfg(key, defVal=None):
return cfg[key]
return defVal
def safe_property(obj, key, defVal=None):
if key in obj:
return obj[key]
return defVal
def get_tmp_dir():
return ".\\Tmp\\"
......@@ -66,7 +64,7 @@ def init_cfg(fn):
cfg = read_yaml(fn)
if cfg is None:
log("配置文件丢失")
exit(-1)
sys.exit(-1)
set_g_value("cfg", cfg)
......
......@@ -207,6 +207,7 @@ class EcsServant(Base):
update_at = Column(DateTime)
updater_id = Column(VARCHAR(50))
updater_name = Column(VARCHAR(200))
class ZhsMsg(Base):
__tablename__ = 'zhs_msg'
__table_args__ = {'comment': '站内消息'}
......
This diff is collapsed.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment