fix(douyu): compact long audience trends adaptively
This commit is contained in:
@@ -889,25 +889,43 @@ class DouyuPlugin(MessagePluginInterface):
|
|||||||
self._danmu_recorders[room_id] = recorder
|
self._danmu_recorders[room_id] = recorder
|
||||||
return recorder
|
return recorder
|
||||||
|
|
||||||
@staticmethod
|
@classmethod
|
||||||
def _normalize_audience_points(points: List[Dict[str, Any]], limit: int = 720) -> List[Dict[str, Any]]:
|
def _normalize_audience_points(cls, points: List[Dict[str, Any]], limit: int = 720) -> List[Dict[str, Any]]:
|
||||||
normalized_map: Dict[str, Dict[str, Any]] = {}
|
minute_map: Dict[str, Dict[str, Any]] = {}
|
||||||
for item in points or []:
|
for item in points or []:
|
||||||
timestamp = str(item.get("timestamp") or "").strip()
|
timestamp = str(item.get("timestamp") or "").strip()
|
||||||
if not timestamp:
|
point_dt = cls._parse_session_time(timestamp)
|
||||||
|
if not point_dt:
|
||||||
continue
|
continue
|
||||||
minute_key = timestamp[:16]
|
minute_key = point_dt.strftime("%Y-%m-%d %H:%M")
|
||||||
normalized_map[minute_key] = {
|
minute_map[minute_key] = {
|
||||||
"timestamp": timestamp,
|
"timestamp": timestamp,
|
||||||
"vip_count": int(item.get("vip_count", 0) or 0),
|
"vip_count": int(item.get("vip_count", 0) or 0),
|
||||||
"diamond_count": int(item.get("diamond_count", 0) or 0),
|
"diamond_count": int(item.get("diamond_count", 0) or 0),
|
||||||
}
|
}
|
||||||
normalized = list(normalized_map.values())
|
|
||||||
|
normalized = list(minute_map.values())
|
||||||
normalized.sort(key=lambda row: row.get("timestamp", ""))
|
normalized.sort(key=lambda row: row.get("timestamp", ""))
|
||||||
if len(normalized) > limit:
|
if len(normalized) <= limit:
|
||||||
normalized = normalized[-limit:]
|
|
||||||
return normalized
|
return normalized
|
||||||
|
|
||||||
|
bucket_size_minutes = max((len(normalized) + limit - 1) // limit, 1)
|
||||||
|
bucket_map: Dict[str, Dict[str, Any]] = {}
|
||||||
|
for item in normalized:
|
||||||
|
point_dt = cls._parse_session_time(str(item.get("timestamp") or ""))
|
||||||
|
if not point_dt:
|
||||||
|
continue
|
||||||
|
total_minutes = int(point_dt.timestamp() // 60)
|
||||||
|
bucket_start_minutes = total_minutes - (total_minutes % bucket_size_minutes)
|
||||||
|
bucket_key = str(bucket_start_minutes)
|
||||||
|
bucket_map[bucket_key] = item
|
||||||
|
|
||||||
|
compressed = list(bucket_map.values())
|
||||||
|
compressed.sort(key=lambda row: row.get("timestamp", ""))
|
||||||
|
if len(compressed) > limit:
|
||||||
|
compressed = compressed[-limit:]
|
||||||
|
return compressed
|
||||||
|
|
||||||
def _record_room_audience_point(self, room_id: str, point: Dict[str, Any]) -> None:
|
def _record_room_audience_point(self, room_id: str, point: Dict[str, Any]) -> None:
|
||||||
if not self.redis_manager or not room_id:
|
if not self.redis_manager or not room_id:
|
||||||
return
|
return
|
||||||
|
|||||||
Reference in New Issue
Block a user