news 2026/5/26 7:15:27

社区公告推送小程序,输入公告信息,自动推送给小区用户,支持点赞和评论,解决社区公告传达不及时的问题。

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
社区公告推送小程序,输入公告信息,自动推送给小区用户,支持点赞和评论,解决社区公告传达不及时的问题。

我将为您创建一个完整的社区公告推送小程序。以下是完整的项目结构:

项目结构

community_announcement_system/

├── main.py # 主程序入口

├── models.py # 数据模型模块

├── announcement_manager.py # 公告管理核心模块

├── user_manager.py # 用户管理模块

├── notification_service.py # 推送通知服务模块

├── database.py # 数据库操作模块

├── utils.py # 工具函数模块

├── config.py # 配置文件

├── templates.py # HTML模板模块

├── static/

│ ├── style.css # 样式文件

│ └── script.js # JavaScript文件

├── data/

│ └── community_data.json # 数据存储文件

├── README.md # 使用说明

├── requirements.txt # 依赖包列表

└── demo.py # 演示程序

1. requirements.txt

flask==2.3.3

flask-sqlalchemy==3.0.5

flask-login==0.6.3

werkzeug==2.3.7

jinja2==3.1.2

wtforms==3.0.1

email-validator==2.0.0

python-dateutil==2.8.2

sqlite3

2. config.py

"""

配置文件 - 存储应用程序的配置参数

基于创新创业理论:标准化配置提升系统稳定性

"""

import os

from datetime import timedelta

class Config:

# 基础配置

SECRET_KEY = os.environ.get('SECRET_KEY') or 'dev-secret-key-change-in-production'

# 数据库配置

DATABASE_URL = os.environ.get('DATABASE_URL') or 'sqlite:///community.db'

# 应用配置

APP_NAME = "智慧社区公告系统"

APP_DESCRIPTION = "高效、便捷的社区信息推送平台"

# 推送配置

NOTIFICATION_ENABLED = True

PUSH_DELAY_SECONDS = 2 # 推送延迟时间

# 分页配置

ANNOUNCEMENTS_PER_PAGE = 10

COMMENTS_PER_PAGE = 5

# 权限配置

MAX_TITLE_LENGTH = 100

MAX_CONTENT_LENGTH = 2000

MAX_COMMENT_LENGTH = 500

# 时间配置

ANNOUNCEMENT_EXPIRY_DAYS = 30 # 公告过期天数

SESSION_TIMEOUT = timedelta(hours=24)

# 用户角色

ROLES = {

'admin': '管理员',

'resident': '居民',

'committee': '业委会成员'

}

# 公告类型

ANNOUNCEMENT_TYPES = {

'notice': '通知公告',

'activity': '活动通知',

'emergency': '紧急通知',

'service': '物业服务',

'other': '其他'

}

# 优先级等级

PRIORITY_LEVELS = {

'low': '普通',

'medium': '重要',

'high': '紧急',

'urgent': '特急'

}

class DevelopmentConfig(Config):

DEBUG = True

TESTING = True

class ProductionConfig(Config):

DEBUG = False

TESTING = False

# 配置映射

config = {

'development': DevelopmentConfig,

'production': ProductionConfig,

'default': DevelopmentConfig

}

3. models.py

"""

数据模型模块 - 定义系统的核心数据结构和关系

基于创新创业理论:数据模型是产品的骨架

"""

from flask_sqlalchemy import SQLAlchemy

from flask_login import UserMixin

from werkzeug.security import generate_password_hash, check_password_hash

from datetime import datetime

import json

db = SQLAlchemy()

class User(UserMixin, db.Model):

"""用户模型 - 存储社区居民信息"""

__tablename__ = 'users'

id = db.Column(db.Integer, primary_key=True)

username = db.Column(db.String(80), unique=True, nullable=False, comment='用户名')

email = db.Column(db.String(120), unique=True, nullable=False, comment='邮箱')

password_hash = db.Column(db.String(128), nullable=False, comment='密码哈希')

real_name = db.Column(db.String(50), nullable=False, comment='真实姓名')

phone = db.Column(db.String(20), comment='联系电话')

room_number = db.Column(db.String(20), comment='房间号')

role = db.Column(db.String(20), default='resident', comment='用户角色')

# 关联关系

announcements = db.relationship('Announcement', backref='author', lazy=True)

comments = db.relationship('Comment', backref='user', lazy=True)

likes = db.relationship('Like', backref='user', lazy=True)

# 时间戳

created_at = db.Column(db.DateTime, default=datetime.utcnow, comment='创建时间')

updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, comment='更新时间')

def set_password(self, password):

"""设置密码"""

self.password_hash = generate_password_hash(password)

def check_password(self, password):

"""验证密码"""

return check_password_hash(self.password_hash, password)

def get_avatar_url(self):

"""获取头像URL"""

return f"/static/avatars/{self.role}.png"

def to_dict(self, include_private=False):

"""转换为字典格式"""

data = {

'id': self.id,

'username': self.username,

'real_name': self.real_name,

'role': self.role,

'room_number': self.room_number,

'created_at': self.created_at.isoformat() if self.created_at else None

}

if include_private:

data.update({

'email': self.email,

'phone': self.phone

})

return data

def __repr__(self):

return f'<User {self.username}>'

class Announcement(db.Model):

"""公告模型 - 存储社区公告信息"""

__tablename__ = 'announcements'

id = db.Column(db.Integer, primary_key=True)

title = db.Column(db.String(200), nullable=False, comment='标题')

content = db.Column(db.Text, nullable=False, comment='内容')

type = db.Column(db.String(20), default='notice', comment='类型')

priority = db.Column(db.String(20), default='medium', comment='优先级')

# 作者信息

author_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)

# 推送状态

is_published = db.Column(db.Boolean, default=False, comment='是否已发布')

is_pinned = db.Column(db.Boolean, default=False, comment='是否置顶')

push_status = db.Column(db.String(20), default='draft', comment='推送状态')

# 统计数据

view_count = db.Column(db.Integer, default=0, comment='浏览次数')

like_count = db.Column(db.Integer, default=0, comment='点赞数')

comment_count = db.Column(db.Integer, default=0, comment='评论数')

# 时间信息

publish_time = db.Column(db.DateTime, comment='发布时间')

expiry_date = db.Column(db.DateTime, comment='过期时间')

created_at = db.Column(db.DateTime, default=datetime.utcnow, comment='创建时间')

updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, comment='更新时间')

# 关联关系

comments = db.relationship('Comment', backref='announcement', lazy=True, cascade='all, delete-orphan')

likes = db.relationship('Like', backref='announcement', lazy=True, cascade='all, delete-orphan')

def get_type_display(self):

"""获取类型显示名称"""

from config import Config

return Config.ANNOUNCEMENT_TYPES.get(self.type, '未知')

def get_priority_display(self):

"""获取优先级显示名称"""

from config import Config

return Config.PRIORITY_LEVELS.get(self.priority, '普通')

def get_author_name(self):

"""获取作者姓名"""

return self.author.real_name if self.author else '未知'

def is_expired(self):

"""判断是否过期"""

if not self.expiry_date:

return False

return datetime.utcnow() > self.expiry_date

def increment_view_count(self):

"""增加浏览次数"""

self.view_count += 1

db.session.commit()

def to_dict(self, include_content=True):

"""转换为字典格式"""

data = {

'id': self.id,

'title': self.title,

'type': self.type,

'type_display': self.get_type_display(),

'priority': self.priority,

'priority_display': self.get_priority_display(),

'author_name': self.get_author_name(),

'is_published': self.is_published,

'is_pinned': self.is_pinned,

'push_status': self.push_status,

'view_count': self.view_count,

'like_count': self.like_count,

'comment_count': self.comment_count,

'publish_time': self.publish_time.isoformat() if self.publish_time else None,

'expiry_date': self.expiry_date.isoformat() if self.expiry_date else None,

'created_at': self.created_at.isoformat() if self.created_at else None

}

if include_content:

data['content'] = self.content

return data

def __repr__(self):

return f'<Announcement {self.title}>'

class Comment(db.Model):

"""评论模型 - 存储用户对公告的评论"""

__tablename__ = 'comments'

id = db.Column(db.Integer, primary_key=True)

content = db.Column(db.Text, nullable=False, comment='评论内容')

is_reply = db.Column(db.Boolean, default=False, comment='是否为回复')

parent_id = db.Column(db.Integer, db.ForeignKey('comments.id'), comment='父评论ID')

# 关联信息

announcement_id = db.Column(db.Integer, db.ForeignKey('announcements.id'), nullable=False)

user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)

# 时间信息

created_at = db.Column(db.DateTime, default=datetime.utcnow, comment='创建时间')

updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, comment='更新时间')

# 自关联关系

replies = db.relationship('Comment', backref=db.backref('parent', remote_side=[id]), lazy=True)

def get_user_name(self):

"""获取用户姓名"""

return self.user.real_name if self.user else '匿名用户'

def to_dict(self):

"""转换为字典格式"""

return {

'id': self.id,

'content': self.content,

'is_reply': self.is_reply,

'parent_id': self.parent_id,

'user_name': self.get_user_name(),

'user_id': self.user_id,

'announcement_id': self.announcement_id,

'created_at': self.created_at.isoformat() if self.created_at else None,

'replies': [reply.to_dict() for reply in self.replies] if self.replies else []

}

def __repr__(self):

return f'<Comment {self.content[:20]}>'

class Like(db.Model):

"""点赞模型 - 存储用户对公告的点赞记录"""

__tablename__ = 'likes'

id = db.Column(db.Integer, primary_key=True)

user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)

announcement_id = db.Column(db.Integer, db.ForeignKey('announcements.id'), nullable=False)

# 时间信息

created_at = db.Column(db.DateTime, default=datetime.utcnow, comment='创建时间')

# 唯一约束

__table_args__ = (db.UniqueConstraint('user_id', 'announcement_id', name='unique_user_announcement_like'),)

def to_dict(self):

"""转换为字典格式"""

return {

'id': self.id,

'user_id': self.user_id,

'announcement_id': self.announcement_id,

'created_at': self.created_at.isoformat() if self.created_at else None

}

def __repr__(self):

return f'<Like user:{self.user_id} announcement:{self.announcement_id}>'

class PushRecord(db.Model):

"""推送记录模型 - 记录公告推送历史"""

__tablename__ = 'push_records'

id = db.Column(db.Integer, primary_key=True)

announcement_id = db.Column(db.Integer, db.ForeignKey('announcements.id'), nullable=False)

recipient_count = db.Column(db.Integer, default=0, comment='接收人数')

success_count = db.Column(db.Integer, default=0, comment='成功推送数')

failure_count = db.Column(db.Integer, default=0, comment='推送失败数')

# 时间信息

push_time = db.Column(db.DateTime, default=datetime.utcnow, comment='推送时间')

created_at = db.Column(db.DateTime, default=datetime.utcnow, comment='创建时间')

# 关联关系

announcement = db.relationship('Announcement', backref='push_records')

def to_dict(self):

"""转换为字典格式"""

return {

'id': self.id,

'announcement_id': self.announcement_id,

'recipient_count': self.recipient_count,

'success_count': self.success_count,

'failure_count': self.failure_count,

'push_time': self.push_time.isoformat() if self.push_time else None,

'created_at': self.created_at.isoformat() if self.created_at else None

}

def __repr__(self):

return f'<PushRecord announcement:{self.announcement_id}>'

4. database.py

"""

数据库操作模块 - 封装所有数据库相关的操作

基于创新创业理论:数据层抽象提升系统可维护性

"""

from sqlalchemy import create_engine, desc, asc, func, and_, or_

from sqlalchemy.orm import sessionmaker, scoped_session

from sqlalchemy.exc import SQLAlchemyError

import logging

from datetime import datetime, timedelta

import json

from models import db, User, Announcement, Comment, Like, PushRecord

from config import Config

class DatabaseManager:

"""数据库管理器 - 统一管理所有数据库操作"""

def __init__(self, app=None):

self.app = app

self.engine = None

self.Session = None

if app:

self.init_app(app)

def init_app(self, app):

"""初始化数据库连接"""

self.app = app

self.engine = create_engine(

app.config['SQLALCHEMY_DATABASE_URI'],

echo=app.config.get('SQLALCHEMY_ECHO', False)

)

self.Session = scoped_session(sessionmaker(bind=self.engine))

db.init_app(app)

def get_session(self):

"""获取数据库会话"""

return self.Session()

def close_session(self, session):

"""关闭数据库会话"""

if session:

session.close()

# 用户相关操作

def create_user(self, user_data):

"""创建新用户"""

session = self.get_session()

try:

# 检查用户名和邮箱是否已存在

existing_user = session.query(User).filter(

or_(

User.username == user_data['username'],

User.email == user_data['email']

)

).first()

if existing_user:

return None, "用户名或邮箱已存在"

user = User(

username=user_data['username'],

email=user_data['email'],

real_name=user_data['real_name'],

phone=user_data.get('phone', ''),

room_number=user_data.get('room_number', ''),

role=user_data.get('role', 'resident')

)

user.set_password(user_data['password'])

session.add(user)

session.commit()

return user, "用户创建成功"

except SQLAlchemyError as e:

session.rollback()

logging.error(f"创建用户失败: {e}")

return None, f"数据库错误: {str(e)}"

finally:

self.close_session(session)

def get_user_by_username(self, username):

"""根据用户名获取用户"""

session = self.get_session()

try:

return session.query(User).filter(User.username == username).first()

except SQLAlchemyError as e:

logging.error(f"查询用户失败: {e}")

return None

finally:

self.close_session(session)

def get_user_by_id(self, user_id):

"""根据用户ID获取用户"""

session = self.get_session()

try:

return session.query(User).filter(User.id == user_id).first()

except SQLAlchemyError as e:

logging.error(f"查询用户失败: {e}")

return None

finally:

self.close_session(session)

def get_users_paginated(self, page=1, per_page=20, filters=None):

"""分页获取用户列表"""

session = self.get_session()

try:

query = session.query(User)

# 应用过滤条件

if filters:

if filters.get('role'):

query = query.filter(User.role == filters['role'])

if filters.get('keyword'):

keyword = f"%{filters['keyword']}%"

query = query.filter(

or_(

User.username.like(keyword),

User.real_name.like(keyword),

User.room_number.like(keyword)

)

)

# 排序和分页

users = query.order_by(desc(User.created_at)).paginate(

page=page, per_page=per_page, error_out=False

)

return users

except SQLAlchemyError as e:

logging.error(f"获取用户列表失败: {e}")

return None

finally:

self.close_session(session)

# 公告相关操作

def create_announcement(self, announcement_data, author_id):

"""创建新公告"""

session = self.get_session()

try:

# 计算过期时间

expiry_days = announcement_data.get('expiry_days', 30)

expiry_date = datetime.utcnow() + timedelta(days=expiry_days)

announcement = Announcement(

title=announcement_data['title'],

content=announcement_data['content'],

type=announcement_data.get('type', 'notice'),

priority=announcement_data.get('priority', 'medium'),

author_id=author_id,

is_pinned=announcement_data.get('is_pinned', False),

expiry_date=expiry_date

)

session.add(announcement)

session.commit()

return announcement, "公告创建成功"

except SQLAlchemyError as e:

session.rollback()

logging.error(f"创建公告失败: {e}")

return None, f"数据库错误: {str(e)}"

finally:

self.close_session(session)

def get_announcement_by_id(self, announcement_id):

"""根据ID获取公告"""

session = self.get_session()

try:

return session.query(Announcement).filter(

Announcement.id == announcement_id

).first()

except SQLAlchemyError as e:

logging.error(f"查询公告失败: {e}")

return None

finally:

self.close_session(session)

def get_announcements_paginated(self, page=1, per_page=10, filters=None, user_id=None):

"""分页获取公告列表"""

session = self.get_session()

try:

query = session.query(Announcement).filter(

Announcement.is_published == True

)

# 排除过期的公告

query = query.filter(

or_(

Announcement.expiry_date.is_(None),

Announcement.expiry_date > datetime.utcnow()

)

)

# 应用过滤条件

if filters:

if filters.get('type'):

query = query.filter(Announcement.type == filters['type'])

if filters.get('priority'):

query = query.filter(Announcement.priority == filters['priority'])

if filters.get('author_id'):

query = query.filter(Announcement.author_id == filters['author_id'])

if filters.get('keyword'):

keyword = f"%{filters['keyword']}%"

query = query.filter(

or_(

Announcement.title.like(keyword),

Announcement.content.like(keyword)

)

)

# 排序:置顶的在前,然后按发布时间倒序

query = query.order_by(

desc(Announcement.is_pinned),

desc(Announcement.publish_time)

)

announcements = query.paginate(

page=page, per_page=per_page, error_out=False

)

# 如果用户已登录,标记已读状态

if user_id:

for announcement in announcements.items:

# 这里可以添加已读状态的逻辑

pass

return announcements

except SQLAlchemyError as e:

logging.error(f"获取公告列表失败: {e}")

return None

finally:

self.close_session(session)

def publish_announcement(self, announcement_id):

"""发布公告"""

session = self.get_session()

try:

announcement = session.query(Announcement).filter(

Announcement.id == announcement_id

).first()

if not announcement:

return None, "公告不存在"

announcement.is_published = True

announcement.publish_time = datetime.utcnow()

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/23 8:19:31

各省市GDP最低城市,其中有你的城市吗?

一张地图&#xff0c;直观揭示了我国区域经济的另一个剖面&#xff1a;在省域内部&#xff0c;发展同样存在不均衡。这些“省内经济洼地”的规模与成因各异&#xff0c;共同构成了中国经济的基底图谱。 数据显示&#xff0c;各省最低城市的经济体量差距巨大。 江苏的“末位”…

作者头像 李华
网站建设 2026/5/3 20:36:38

Paperzz 毕业论文:把 “毕业劫” 拆成 “四步走” 的无痛毕业指南

Paperzz-AI官网免费论文查重复率AIGC检测/开题报告/文献综述/论文初稿 paperzz - 毕业论文-AIGC论文检测-AI智能降重-ai智能写作https://www.paperzz.cc/dissertation “开题被打回 3 次&#xff0c;文献格式改到眼瞎&#xff0c;数据跑不出结果只能熬夜凑字数”—— 这届毕业…

作者头像 李华
网站建设 2026/5/24 3:46:26

自考必备!8个AI论文工具助你轻松搞定格式规范!

自考必备&#xff01;8个AI论文工具助你轻松搞定格式规范&#xff01; 自考路上的得力助手&#xff1a;AI 工具如何助你轻松应对论文挑战 在自考过程中&#xff0c;论文写作往往是许多考生面临的最大难题之一。无论是格式规范、内容逻辑&#xff0c;还是语言表达&#xff0c;都…

作者头像 李华
网站建设 2026/5/23 15:27:52

学云计算需要用什么配置的电脑?

学习云计算时&#xff0c;电脑是我们必不可少的工具之一&#xff0c;但并非所有的电脑都可以满足专业学习和实践操作的需要&#xff0c;所以选择电脑也需要多方面考虑&#xff0c;那么学云计算需要用什么配置的电脑?请看下文。1、处理器云计算涉及虚拟机的运行、编程开发和数据…

作者头像 李华
网站建设 2026/5/9 13:03:35

基于spring boot的民宿预约管理系统的设计与实现

三、系统分析 进行民宿预约管理系统的开发&#xff0c;首先需要进行系统需求分析。对用户需求进行调研&#xff0c;接着设计系统的体系构造和数据库表构造&#xff0c;确定使用的开发工具和后台数据库。 系统分析的重点是对用户和系统的需求进行相关分析&#xff0c;包括对系统…

作者头像 李华
网站建设 2026/5/22 2:30:54

python基于Vue的四川火锅餐饮地方美食文化点评网站的设计与实现_django Flask pycharm项目

目录已开发项目效果实现截图关于博主开发技术路线相关技术介绍核心代码参考示例结论源码lw获取/同行可拿货,招校园代理 &#xff1a;文章底部获取博主联系方式&#xff01;已开发项目效果实现截图 同行可拿货,招校园代理 ,本人源头供货商 python基于Vue的四川火锅餐饮地方美食…

作者头像 李华