| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762777277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924292529262927292829292930293129322933293429352936293729382939294029412942294329442945294629472948294929502951295229532954295529562957295829592960296129622963296429652966296729682969297029712972297329742975297629772978297929802981298229832984298529862987298829892990299129922993299429952996299729982999300030013002300330043005300630073008300930103011301230133014301530163017301830193020302130223023302430253026302730283029303030313032303330343035303630373038303930403041304230433044304530463047304830493050305130523053305430553056305730583059306030613062306330643065306630673068306930703071307230733074307530763077307830793080308130823083308430853086308730883089309030913092309330943095309630973098309931003101310231033104310531063107310831093110311131123113311431153116311731183119312031213122312331243125312631273128312931303131313231333134313531363137313831393140314131423143314431453146314731483149315031513152315331543155315631573158315931603161316231633164316531663167316831693170317131723173317431753176317731783179318031813182318331843185318631873188318931903191319231933194319531963197319831993200320132023203320432053206320732083209321032113212321332143215321632173218321932203221322232233224322532263227322832293230323132323233323432353236323732383239324032413242324332443245324632473248324932503251325232533254325532563257325832593260326132623263326432653266326732683269327032713272327332743275327632773278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377737783779378037813782378337843785378637873788378937903791379237933794379537963797379837993800380138023803380438053806380738083809381038113812381338143815381638173818381938203821382238233824382538263827382838293830383138323833383438353836383738383839384038413842384338443845384638473848384938503851385238533854385538563857385838593860386138623863386438653866386738683869387038713872387338743875387638773878387938803881388238833884388538863887388838893890389138923893389438953896389738983899390039013902390339043905390639073908390939103911391239133914391539163917391839193920392139223923392439253926392739283929393039313932393339343935393639373938393939403941394239433944394539463947394839493950395139523953395439553956395739583959396039613962396339643965396639673968396939703971397239733974397539763977397839793980398139823983398439853986398739883989399039913992399339943995399639973998399940004001400240034004400540064007400840094010401140124013401440154016401740184019402040214022402340244025402640274028402940304031403240334034403540364037403840394040404140424043404440454046404740484049405040514052405340544055405640574058405940604061406240634064406540664067406840694070407140724073407440754076407740784079408040814082408340844085408640874088408940904091409240934094409540964097409840994100410141024103410441054106410741084109411041114112411341144115411641174118411941204121412241234124412541264127412841294130413141324133413441354136413741384139414041414142414341444145414641474148414941504151415241534154415541564157415841594160416141624163416441654166416741684169417041714172417341744175417641774178417941804181418241834184418541864187418841894190419141924193419441954196419741984199420042014202420342044205420642074208420942104211421242134214421542164217421842194220422142224223422442254226422742284229423042314232423342344235423642374238423942404241424242434244424542464247424842494250425142524253425442554256425742584259426042614262426342644265426642674268426942704271427242734274427542764277427842794280428142824283428442854286428742884289429042914292429342944295429642974298429943004301430243034304430543064307430843094310431143124313431443154316431743184319432043214322432343244325432643274328432943304331433243334334433543364337433843394340434143424343434443454346434743484349435043514352435343544355435643574358435943604361436243634364436543664367436843694370437143724373437443754376437743784379438043814382438343844385438643874388438943904391439243934394439543964397439843994400440144024403440444054406440744084409441044114412441344144415441644174418441944204421442244234424442544264427442844294430443144324433443444354436443744384439444044414442444344444445444644474448444944504451445244534454445544564457445844594460446144624463446444654466446744684469447044714472447344744475447644774478447944804481448244834484448544864487448844894490449144924493449444954496449744984499450045014502450345044505450645074508450945104511451245134514451545164517451845194520452145224523452445254526452745284529453045314532453345344535453645374538453945404541454245434544454545464547454845494550455145524553455445554556455745584559456045614562456345644565456645674568456945704571457245734574457545764577457845794580458145824583458445854586458745884589459045914592459345944595459645974598459946004601460246034604460546064607460846094610461146124613461446154616461746184619462046214622462346244625462646274628462946304631463246334634463546364637463846394640464146424643464446454646464746484649465046514652465346544655465646574658465946604661466246634664466546664667466846694670467146724673467446754676 |
- # coding=utf-8
- import json
- import os
- import random
- import re
- import time
- import webbrowser
- import smtplib
- from email.mime.text import MIMEText
- from email.mime.multipart import MIMEMultipart
- from email.header import Header
- from email.utils import formataddr, formatdate, make_msgid
- from datetime import datetime
- from pathlib import Path
- from typing import Dict, List, Tuple, Optional, Union
- import pytz
- import requests
- import yaml
- VERSION = "3.2.0"
- # === SMTP邮件配置 ===
- SMTP_CONFIGS = {
- # Gmail(使用 STARTTLS)
- "gmail.com": {"server": "smtp.gmail.com", "port": 587, "encryption": "TLS"},
- # QQ邮箱(使用 SSL,更稳定)
- "qq.com": {"server": "smtp.qq.com", "port": 465, "encryption": "SSL"},
- # Outlook(使用 STARTTLS)
- "outlook.com": {
- "server": "smtp-mail.outlook.com",
- "port": 587,
- "encryption": "TLS",
- },
- "hotmail.com": {
- "server": "smtp-mail.outlook.com",
- "port": 587,
- "encryption": "TLS",
- },
- "live.com": {"server": "smtp-mail.outlook.com", "port": 587, "encryption": "TLS"},
- # 网易邮箱(使用 SSL,更稳定)
- "163.com": {"server": "smtp.163.com", "port": 465, "encryption": "SSL"},
- "126.com": {"server": "smtp.126.com", "port": 465, "encryption": "SSL"},
- # 新浪邮箱(使用 SSL)
- "sina.com": {"server": "smtp.sina.com", "port": 465, "encryption": "SSL"},
- # 搜狐邮箱(使用 SSL)
- "sohu.com": {"server": "smtp.sohu.com", "port": 465, "encryption": "SSL"},
- # 天翼邮箱(使用 SSL)
- "189.cn": {"server": "smtp.189.cn", "port": 465, "encryption": "SSL"},
- }
- # === 配置管理 ===
- def load_config():
- """加载配置文件"""
- config_path = os.environ.get("CONFIG_PATH", "config/config.yaml")
- if not Path(config_path).exists():
- raise FileNotFoundError(f"配置文件 {config_path} 不存在")
- with open(config_path, "r", encoding="utf-8") as f:
- config_data = yaml.safe_load(f)
- print(f"配置文件加载成功: {config_path}")
- # 构建配置
- config = {
- "VERSION_CHECK_URL": config_data["app"]["version_check_url"],
- "SHOW_VERSION_UPDATE": config_data["app"]["show_version_update"],
- "REQUEST_INTERVAL": config_data["crawler"]["request_interval"],
- "REPORT_MODE": os.environ.get("REPORT_MODE", "").strip()
- or config_data["report"]["mode"],
- "RANK_THRESHOLD": config_data["report"]["rank_threshold"],
- "SORT_BY_POSITION_FIRST": os.environ.get("SORT_BY_POSITION_FIRST", "").strip().lower()
- in ("true", "1")
- if os.environ.get("SORT_BY_POSITION_FIRST", "").strip()
- else config_data["report"].get("sort_by_position_first", False),
- "MAX_NEWS_PER_KEYWORD": int(
- os.environ.get("MAX_NEWS_PER_KEYWORD", "").strip() or "0"
- )
- or config_data["report"].get("max_news_per_keyword", 0),
- "USE_PROXY": config_data["crawler"]["use_proxy"],
- "DEFAULT_PROXY": config_data["crawler"]["default_proxy"],
- "ENABLE_CRAWLER": os.environ.get("ENABLE_CRAWLER", "").strip().lower()
- in ("true", "1")
- if os.environ.get("ENABLE_CRAWLER", "").strip()
- else config_data["crawler"]["enable_crawler"],
- "ENABLE_NOTIFICATION": os.environ.get("ENABLE_NOTIFICATION", "").strip().lower()
- in ("true", "1")
- if os.environ.get("ENABLE_NOTIFICATION", "").strip()
- else config_data["notification"]["enable_notification"],
- "MESSAGE_BATCH_SIZE": config_data["notification"]["message_batch_size"],
- "DINGTALK_BATCH_SIZE": config_data["notification"].get(
- "dingtalk_batch_size", 20000
- ),
- "FEISHU_BATCH_SIZE": config_data["notification"].get("feishu_batch_size", 29000),
- "BATCH_SEND_INTERVAL": config_data["notification"]["batch_send_interval"],
- "FEISHU_MESSAGE_SEPARATOR": config_data["notification"][
- "feishu_message_separator"
- ],
- "PUSH_WINDOW": {
- "ENABLED": os.environ.get("PUSH_WINDOW_ENABLED", "").strip().lower()
- in ("true", "1")
- if os.environ.get("PUSH_WINDOW_ENABLED", "").strip()
- else config_data["notification"]
- .get("push_window", {})
- .get("enabled", False),
- "TIME_RANGE": {
- "START": os.environ.get("PUSH_WINDOW_START", "").strip()
- or config_data["notification"]
- .get("push_window", {})
- .get("time_range", {})
- .get("start", "08:00"),
- "END": os.environ.get("PUSH_WINDOW_END", "").strip()
- or config_data["notification"]
- .get("push_window", {})
- .get("time_range", {})
- .get("end", "22:00"),
- },
- "ONCE_PER_DAY": os.environ.get("PUSH_WINDOW_ONCE_PER_DAY", "").strip().lower()
- in ("true", "1")
- if os.environ.get("PUSH_WINDOW_ONCE_PER_DAY", "").strip()
- else config_data["notification"]
- .get("push_window", {})
- .get("once_per_day", True),
- "RECORD_RETENTION_DAYS": int(
- os.environ.get("PUSH_WINDOW_RETENTION_DAYS", "").strip() or "0"
- )
- or config_data["notification"]
- .get("push_window", {})
- .get("push_record_retention_days", 7),
- },
- "WEIGHT_CONFIG": {
- "RANK_WEIGHT": config_data["weight"]["rank_weight"],
- "FREQUENCY_WEIGHT": config_data["weight"]["frequency_weight"],
- "HOTNESS_WEIGHT": config_data["weight"]["hotness_weight"],
- },
- "PLATFORMS": config_data["platforms"],
- }
- # 通知渠道配置(环境变量优先)
- notification = config_data.get("notification", {})
- webhooks = notification.get("webhooks", {})
- config["FEISHU_WEBHOOK_URL"] = os.environ.get(
- "FEISHU_WEBHOOK_URL", ""
- ).strip() or webhooks.get("feishu_url", "")
- config["DINGTALK_WEBHOOK_URL"] = os.environ.get(
- "DINGTALK_WEBHOOK_URL", ""
- ).strip() or webhooks.get("dingtalk_url", "")
- config["WEWORK_WEBHOOK_URL"] = os.environ.get(
- "WEWORK_WEBHOOK_URL", ""
- ).strip() or webhooks.get("wework_url", "")
- config["WEWORK_MSG_TYPE"] = os.environ.get(
- "WEWORK_MSG_TYPE", ""
- ).strip() or webhooks.get("wework_msg_type", "markdown")
- config["TELEGRAM_BOT_TOKEN"] = os.environ.get(
- "TELEGRAM_BOT_TOKEN", ""
- ).strip() or webhooks.get("telegram_bot_token", "")
- config["TELEGRAM_CHAT_ID"] = os.environ.get(
- "TELEGRAM_CHAT_ID", ""
- ).strip() or webhooks.get("telegram_chat_id", "")
- # 邮件配置
- config["EMAIL_FROM"] = os.environ.get("EMAIL_FROM", "").strip() or webhooks.get(
- "email_from", ""
- )
- config["EMAIL_PASSWORD"] = os.environ.get(
- "EMAIL_PASSWORD", ""
- ).strip() or webhooks.get("email_password", "")
- config["EMAIL_TO"] = os.environ.get("EMAIL_TO", "").strip() or webhooks.get(
- "email_to", ""
- )
- config["EMAIL_SMTP_SERVER"] = os.environ.get(
- "EMAIL_SMTP_SERVER", ""
- ).strip() or webhooks.get("email_smtp_server", "")
- config["EMAIL_SMTP_PORT"] = os.environ.get(
- "EMAIL_SMTP_PORT", ""
- ).strip() or webhooks.get("email_smtp_port", "")
- # ntfy配置
- config["NTFY_SERVER_URL"] = os.environ.get(
- "NTFY_SERVER_URL", "https://ntfy.sh"
- ).strip() or webhooks.get("ntfy_server_url", "https://ntfy.sh")
- config["NTFY_TOPIC"] = os.environ.get("NTFY_TOPIC", "").strip() or webhooks.get(
- "ntfy_topic", ""
- )
- config["NTFY_TOKEN"] = os.environ.get("NTFY_TOKEN", "").strip() or webhooks.get(
- "ntfy_token", ""
- )
- # 输出配置来源信息
- notification_sources = []
- if config["FEISHU_WEBHOOK_URL"]:
- source = "环境变量" if os.environ.get("FEISHU_WEBHOOK_URL") else "配置文件"
- notification_sources.append(f"飞书({source})")
- if config["DINGTALK_WEBHOOK_URL"]:
- source = "环境变量" if os.environ.get("DINGTALK_WEBHOOK_URL") else "配置文件"
- notification_sources.append(f"钉钉({source})")
- if config["WEWORK_WEBHOOK_URL"]:
- source = "环境变量" if os.environ.get("WEWORK_WEBHOOK_URL") else "配置文件"
- notification_sources.append(f"企业微信({source})")
- if config["TELEGRAM_BOT_TOKEN"] and config["TELEGRAM_CHAT_ID"]:
- token_source = (
- "环境变量" if os.environ.get("TELEGRAM_BOT_TOKEN") else "配置文件"
- )
- chat_source = "环境变量" if os.environ.get("TELEGRAM_CHAT_ID") else "配置文件"
- notification_sources.append(f"Telegram({token_source}/{chat_source})")
- if config["EMAIL_FROM"] and config["EMAIL_PASSWORD"] and config["EMAIL_TO"]:
- from_source = "环境变量" if os.environ.get("EMAIL_FROM") else "配置文件"
- notification_sources.append(f"邮件({from_source})")
- if config["NTFY_SERVER_URL"] and config["NTFY_TOPIC"]:
- server_source = "环境变量" if os.environ.get("NTFY_SERVER_URL") else "配置文件"
- notification_sources.append(f"ntfy({server_source})")
- if notification_sources:
- print(f"通知渠道配置来源: {', '.join(notification_sources)}")
- else:
- print("未配置任何通知渠道")
- return config
- print("正在加载配置...")
- CONFIG = load_config()
- print(f"TrendRadar v{VERSION} 配置加载完成")
- print(f"监控平台数量: {len(CONFIG['PLATFORMS'])}")
- # === 工具函数 ===
- def get_beijing_time():
- """获取北京时间"""
- return datetime.now(pytz.timezone("Asia/Shanghai"))
- def format_date_folder():
- """格式化日期文件夹"""
- return get_beijing_time().strftime("%Y年%m月%d日")
- def format_time_filename():
- """格式化时间文件名"""
- return get_beijing_time().strftime("%H时%M分")
- def clean_title(title: str) -> str:
- """清理标题中的特殊字符"""
- if not isinstance(title, str):
- title = str(title)
- cleaned_title = title.replace("\n", " ").replace("\r", " ")
- cleaned_title = re.sub(r"\s+", " ", cleaned_title)
- cleaned_title = cleaned_title.strip()
- return cleaned_title
- def ensure_directory_exists(directory: str):
- """确保目录存在"""
- Path(directory).mkdir(parents=True, exist_ok=True)
- def get_output_path(subfolder: str, filename: str) -> str:
- """获取输出路径"""
- date_folder = format_date_folder()
- output_dir = Path("output") / date_folder / subfolder
- ensure_directory_exists(str(output_dir))
- return str(output_dir / filename)
- def check_version_update(
- current_version: str, version_url: str, proxy_url: Optional[str] = None
- ) -> Tuple[bool, Optional[str]]:
- """检查版本更新"""
- try:
- proxies = None
- if proxy_url:
- proxies = {"http": proxy_url, "https": proxy_url}
- headers = {
- "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
- "Accept": "text/plain, */*",
- "Cache-Control": "no-cache",
- }
- response = requests.get(
- version_url, proxies=proxies, headers=headers, timeout=10
- )
- response.raise_for_status()
- remote_version = response.text.strip()
- print(f"当前版本: {current_version}, 远程版本: {remote_version}")
- # 比较版本
- def parse_version(version_str):
- try:
- parts = version_str.strip().split(".")
- if len(parts) != 3:
- raise ValueError("版本号格式不正确")
- return int(parts[0]), int(parts[1]), int(parts[2])
- except:
- return 0, 0, 0
- current_tuple = parse_version(current_version)
- remote_tuple = parse_version(remote_version)
- need_update = current_tuple < remote_tuple
- return need_update, remote_version if need_update else None
- except Exception as e:
- print(f"版本检查失败: {e}")
- return False, None
- def is_first_crawl_today() -> bool:
- """检测是否是当天第一次爬取"""
- date_folder = format_date_folder()
- txt_dir = Path("output") / date_folder / "txt"
- if not txt_dir.exists():
- return True
- files = sorted([f for f in txt_dir.iterdir() if f.suffix == ".txt"])
- return len(files) <= 1
- def html_escape(text: str) -> str:
- """HTML转义"""
- if not isinstance(text, str):
- text = str(text)
- return (
- text.replace("&", "&")
- .replace("<", "<")
- .replace(">", ">")
- .replace('"', """)
- .replace("'", "'")
- )
- # === 推送记录管理 ===
- class PushRecordManager:
- """推送记录管理器"""
- def __init__(self):
- self.record_dir = Path("output") / ".push_records"
- self.ensure_record_dir()
- self.cleanup_old_records()
- def ensure_record_dir(self):
- """确保记录目录存在"""
- self.record_dir.mkdir(parents=True, exist_ok=True)
- def get_today_record_file(self) -> Path:
- """获取今天的记录文件路径"""
- today = get_beijing_time().strftime("%Y%m%d")
- return self.record_dir / f"push_record_{today}.json"
- def cleanup_old_records(self):
- """清理过期的推送记录"""
- retention_days = CONFIG["PUSH_WINDOW"]["RECORD_RETENTION_DAYS"]
- current_time = get_beijing_time()
- for record_file in self.record_dir.glob("push_record_*.json"):
- try:
- date_str = record_file.stem.replace("push_record_", "")
- file_date = datetime.strptime(date_str, "%Y%m%d")
- file_date = pytz.timezone("Asia/Shanghai").localize(file_date)
- if (current_time - file_date).days > retention_days:
- record_file.unlink()
- print(f"清理过期推送记录: {record_file.name}")
- except Exception as e:
- print(f"清理记录文件失败 {record_file}: {e}")
- def has_pushed_today(self) -> bool:
- """检查今天是否已经推送过"""
- record_file = self.get_today_record_file()
- if not record_file.exists():
- return False
- try:
- with open(record_file, "r", encoding="utf-8") as f:
- record = json.load(f)
- return record.get("pushed", False)
- except Exception as e:
- print(f"读取推送记录失败: {e}")
- return False
- def record_push(self, report_type: str):
- """记录推送"""
- record_file = self.get_today_record_file()
- now = get_beijing_time()
- record = {
- "pushed": True,
- "push_time": now.strftime("%Y-%m-%d %H:%M:%S"),
- "report_type": report_type,
- }
- try:
- with open(record_file, "w", encoding="utf-8") as f:
- json.dump(record, f, ensure_ascii=False, indent=2)
- print(f"推送记录已保存: {report_type} at {now.strftime('%H:%M:%S')}")
- except Exception as e:
- print(f"保存推送记录失败: {e}")
- def is_in_time_range(self, start_time: str, end_time: str) -> bool:
- """检查当前时间是否在指定时间范围内"""
- now = get_beijing_time()
- current_time = now.strftime("%H:%M")
-
- def normalize_time(time_str: str) -> str:
- """将时间字符串标准化为 HH:MM 格式"""
- try:
- parts = time_str.strip().split(":")
- if len(parts) != 2:
- raise ValueError(f"时间格式错误: {time_str}")
-
- hour = int(parts[0])
- minute = int(parts[1])
-
- if not (0 <= hour <= 23 and 0 <= minute <= 59):
- raise ValueError(f"时间范围错误: {time_str}")
-
- return f"{hour:02d}:{minute:02d}"
- except Exception as e:
- print(f"时间格式化错误 '{time_str}': {e}")
- return time_str
-
- normalized_start = normalize_time(start_time)
- normalized_end = normalize_time(end_time)
- normalized_current = normalize_time(current_time)
-
- result = normalized_start <= normalized_current <= normalized_end
-
- if not result:
- print(f"时间窗口判断:当前 {normalized_current},窗口 {normalized_start}-{normalized_end}")
-
- return result
- # === 数据获取 ===
- class DataFetcher:
- """数据获取器"""
- def __init__(self, proxy_url: Optional[str] = None):
- self.proxy_url = proxy_url
- def fetch_data(
- self,
- id_info: Union[str, Tuple[str, str]],
- max_retries: int = 2,
- min_retry_wait: int = 3,
- max_retry_wait: int = 5,
- ) -> Tuple[Optional[str], str, str]:
- """获取指定ID数据,支持重试"""
- if isinstance(id_info, tuple):
- id_value, alias = id_info
- else:
- id_value = id_info
- alias = id_value
- url = f"https://newsnow.busiyi.world/api/s?id={id_value}&latest"
- proxies = None
- if self.proxy_url:
- proxies = {"http": self.proxy_url, "https": self.proxy_url}
- headers = {
- "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
- "Accept": "application/json, text/plain, */*",
- "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
- "Connection": "keep-alive",
- "Cache-Control": "no-cache",
- }
- retries = 0
- while retries <= max_retries:
- try:
- response = requests.get(
- url, proxies=proxies, headers=headers, timeout=10
- )
- response.raise_for_status()
- data_text = response.text
- data_json = json.loads(data_text)
- status = data_json.get("status", "未知")
- if status not in ["success", "cache"]:
- raise ValueError(f"响应状态异常: {status}")
- status_info = "最新数据" if status == "success" else "缓存数据"
- print(f"获取 {id_value} 成功({status_info})")
- return data_text, id_value, alias
- except Exception as e:
- retries += 1
- if retries <= max_retries:
- base_wait = random.uniform(min_retry_wait, max_retry_wait)
- additional_wait = (retries - 1) * random.uniform(1, 2)
- wait_time = base_wait + additional_wait
- print(f"请求 {id_value} 失败: {e}. {wait_time:.2f}秒后重试...")
- time.sleep(wait_time)
- else:
- print(f"请求 {id_value} 失败: {e}")
- return None, id_value, alias
- return None, id_value, alias
- def crawl_websites(
- self,
- ids_list: List[Union[str, Tuple[str, str]]],
- request_interval: int = CONFIG["REQUEST_INTERVAL"],
- ) -> Tuple[Dict, Dict, List]:
- """爬取多个网站数据"""
- results = {}
- id_to_name = {}
- failed_ids = []
- for i, id_info in enumerate(ids_list):
- if isinstance(id_info, tuple):
- id_value, name = id_info
- else:
- id_value = id_info
- name = id_value
- id_to_name[id_value] = name
- response, _, _ = self.fetch_data(id_info)
- if response:
- try:
- data = json.loads(response)
- results[id_value] = {}
- for index, item in enumerate(data.get("items", []), 1):
- title = item.get("title")
- # 跳过无效标题(None、float、空字符串)
- if title is None or isinstance(title, float) or not str(title).strip():
- continue
- title = str(title).strip()
- url = item.get("url", "")
- mobile_url = item.get("mobileUrl", "")
- if title in results[id_value]:
- results[id_value][title]["ranks"].append(index)
- else:
- results[id_value][title] = {
- "ranks": [index],
- "url": url,
- "mobileUrl": mobile_url,
- }
- except json.JSONDecodeError:
- print(f"解析 {id_value} 响应失败")
- failed_ids.append(id_value)
- except Exception as e:
- print(f"处理 {id_value} 数据出错: {e}")
- failed_ids.append(id_value)
- else:
- failed_ids.append(id_value)
- if i < len(ids_list) - 1:
- actual_interval = request_interval + random.randint(-10, 20)
- actual_interval = max(50, actual_interval)
- time.sleep(actual_interval / 1000)
- print(f"成功: {list(results.keys())}, 失败: {failed_ids}")
- return results, id_to_name, failed_ids
- # === 数据处理 ===
- def save_titles_to_file(results: Dict, id_to_name: Dict, failed_ids: List) -> str:
- """保存标题到文件"""
- file_path = get_output_path("txt", f"{format_time_filename()}.txt")
- with open(file_path, "w", encoding="utf-8") as f:
- for id_value, title_data in results.items():
- # id | name 或 id
- name = id_to_name.get(id_value)
- if name and name != id_value:
- f.write(f"{id_value} | {name}\n")
- else:
- f.write(f"{id_value}\n")
- # 按排名排序标题
- sorted_titles = []
- for title, info in title_data.items():
- cleaned_title = clean_title(title)
- if isinstance(info, dict):
- ranks = info.get("ranks", [])
- url = info.get("url", "")
- mobile_url = info.get("mobileUrl", "")
- else:
- ranks = info if isinstance(info, list) else []
- url = ""
- mobile_url = ""
- rank = ranks[0] if ranks else 1
- sorted_titles.append((rank, cleaned_title, url, mobile_url))
- sorted_titles.sort(key=lambda x: x[0])
- for rank, cleaned_title, url, mobile_url in sorted_titles:
- line = f"{rank}. {cleaned_title}"
- if url:
- line += f" [URL:{url}]"
- if mobile_url:
- line += f" [MOBILE:{mobile_url}]"
- f.write(line + "\n")
- f.write("\n")
- if failed_ids:
- f.write("==== 以下ID请求失败 ====\n")
- for id_value in failed_ids:
- f.write(f"{id_value}\n")
- return file_path
- def load_frequency_words(
- frequency_file: Optional[str] = None,
- ) -> Tuple[List[Dict], List[str]]:
- """加载频率词配置"""
- if frequency_file is None:
- frequency_file = os.environ.get(
- "FREQUENCY_WORDS_PATH", "config/frequency_words.txt"
- )
- frequency_path = Path(frequency_file)
- if not frequency_path.exists():
- raise FileNotFoundError(f"频率词文件 {frequency_file} 不存在")
- with open(frequency_path, "r", encoding="utf-8") as f:
- content = f.read()
- word_groups = [group.strip() for group in content.split("\n\n") if group.strip()]
- processed_groups = []
- filter_words = []
- for group in word_groups:
- words = [word.strip() for word in group.split("\n") if word.strip()]
- group_required_words = []
- group_normal_words = []
- group_filter_words = []
- group_max_count = 0 # 默认不限制
- for word in words:
- if word.startswith("@"):
- # 解析最大显示数量(只接受正整数)
- try:
- count = int(word[1:])
- if count > 0:
- group_max_count = count
- except (ValueError, IndexError):
- pass # 忽略无效的@数字格式
- elif word.startswith("!"):
- filter_words.append(word[1:])
- group_filter_words.append(word[1:])
- elif word.startswith("+"):
- group_required_words.append(word[1:])
- else:
- group_normal_words.append(word)
- if group_required_words or group_normal_words:
- if group_normal_words:
- group_key = " ".join(group_normal_words)
- else:
- group_key = " ".join(group_required_words)
- processed_groups.append(
- {
- "required": group_required_words,
- "normal": group_normal_words,
- "group_key": group_key,
- "max_count": group_max_count, # 新增字段
- }
- )
- return processed_groups, filter_words
- def parse_file_titles(file_path: Path) -> Tuple[Dict, Dict]:
- """解析单个txt文件的标题数据,返回(titles_by_id, id_to_name)"""
- titles_by_id = {}
- id_to_name = {}
- with open(file_path, "r", encoding="utf-8") as f:
- content = f.read()
- sections = content.split("\n\n")
- for section in sections:
- if not section.strip() or "==== 以下ID请求失败 ====" in section:
- continue
- lines = section.strip().split("\n")
- if len(lines) < 2:
- continue
- # id | name 或 id
- header_line = lines[0].strip()
- if " | " in header_line:
- parts = header_line.split(" | ", 1)
- source_id = parts[0].strip()
- name = parts[1].strip()
- id_to_name[source_id] = name
- else:
- source_id = header_line
- id_to_name[source_id] = source_id
- titles_by_id[source_id] = {}
- for line in lines[1:]:
- if line.strip():
- try:
- title_part = line.strip()
- rank = None
- # 提取排名
- if ". " in title_part and title_part.split(". ")[0].isdigit():
- rank_str, title_part = title_part.split(". ", 1)
- rank = int(rank_str)
- # 提取 MOBILE URL
- mobile_url = ""
- if " [MOBILE:" in title_part:
- title_part, mobile_part = title_part.rsplit(" [MOBILE:", 1)
- if mobile_part.endswith("]"):
- mobile_url = mobile_part[:-1]
- # 提取 URL
- url = ""
- if " [URL:" in title_part:
- title_part, url_part = title_part.rsplit(" [URL:", 1)
- if url_part.endswith("]"):
- url = url_part[:-1]
- title = clean_title(title_part.strip())
- ranks = [rank] if rank is not None else [1]
- titles_by_id[source_id][title] = {
- "ranks": ranks,
- "url": url,
- "mobileUrl": mobile_url,
- }
- except Exception as e:
- print(f"解析标题行出错: {line}, 错误: {e}")
- return titles_by_id, id_to_name
- def read_all_today_titles(
- current_platform_ids: Optional[List[str]] = None,
- ) -> Tuple[Dict, Dict, Dict]:
- """读取当天所有标题文件,支持按当前监控平台过滤"""
- date_folder = format_date_folder()
- txt_dir = Path("output") / date_folder / "txt"
- if not txt_dir.exists():
- return {}, {}, {}
- all_results = {}
- final_id_to_name = {}
- title_info = {}
- files = sorted([f for f in txt_dir.iterdir() if f.suffix == ".txt"])
- for file_path in files:
- time_info = file_path.stem
- titles_by_id, file_id_to_name = parse_file_titles(file_path)
- if current_platform_ids is not None:
- filtered_titles_by_id = {}
- filtered_id_to_name = {}
- for source_id, title_data in titles_by_id.items():
- if source_id in current_platform_ids:
- filtered_titles_by_id[source_id] = title_data
- if source_id in file_id_to_name:
- filtered_id_to_name[source_id] = file_id_to_name[source_id]
- titles_by_id = filtered_titles_by_id
- file_id_to_name = filtered_id_to_name
- final_id_to_name.update(file_id_to_name)
- for source_id, title_data in titles_by_id.items():
- process_source_data(
- source_id, title_data, time_info, all_results, title_info
- )
- return all_results, final_id_to_name, title_info
- def process_source_data(
- source_id: str,
- title_data: Dict,
- time_info: str,
- all_results: Dict,
- title_info: Dict,
- ) -> None:
- """处理来源数据,合并重复标题"""
- if source_id not in all_results:
- all_results[source_id] = title_data
- if source_id not in title_info:
- title_info[source_id] = {}
- for title, data in title_data.items():
- ranks = data.get("ranks", [])
- url = data.get("url", "")
- mobile_url = data.get("mobileUrl", "")
- title_info[source_id][title] = {
- "first_time": time_info,
- "last_time": time_info,
- "count": 1,
- "ranks": ranks,
- "url": url,
- "mobileUrl": mobile_url,
- }
- else:
- for title, data in title_data.items():
- ranks = data.get("ranks", [])
- url = data.get("url", "")
- mobile_url = data.get("mobileUrl", "")
- if title not in all_results[source_id]:
- all_results[source_id][title] = {
- "ranks": ranks,
- "url": url,
- "mobileUrl": mobile_url,
- }
- title_info[source_id][title] = {
- "first_time": time_info,
- "last_time": time_info,
- "count": 1,
- "ranks": ranks,
- "url": url,
- "mobileUrl": mobile_url,
- }
- else:
- existing_data = all_results[source_id][title]
- existing_ranks = existing_data.get("ranks", [])
- existing_url = existing_data.get("url", "")
- existing_mobile_url = existing_data.get("mobileUrl", "")
- merged_ranks = existing_ranks.copy()
- for rank in ranks:
- if rank not in merged_ranks:
- merged_ranks.append(rank)
- all_results[source_id][title] = {
- "ranks": merged_ranks,
- "url": existing_url or url,
- "mobileUrl": existing_mobile_url or mobile_url,
- }
- title_info[source_id][title]["last_time"] = time_info
- title_info[source_id][title]["ranks"] = merged_ranks
- title_info[source_id][title]["count"] += 1
- if not title_info[source_id][title].get("url"):
- title_info[source_id][title]["url"] = url
- if not title_info[source_id][title].get("mobileUrl"):
- title_info[source_id][title]["mobileUrl"] = mobile_url
- def detect_latest_new_titles(current_platform_ids: Optional[List[str]] = None) -> Dict:
- """检测当日最新批次的新增标题,支持按当前监控平台过滤"""
- date_folder = format_date_folder()
- txt_dir = Path("output") / date_folder / "txt"
- if not txt_dir.exists():
- return {}
- files = sorted([f for f in txt_dir.iterdir() if f.suffix == ".txt"])
- if len(files) < 2:
- return {}
- # 解析最新文件
- latest_file = files[-1]
- latest_titles, _ = parse_file_titles(latest_file)
- # 如果指定了当前平台列表,过滤最新文件数据
- if current_platform_ids is not None:
- filtered_latest_titles = {}
- for source_id, title_data in latest_titles.items():
- if source_id in current_platform_ids:
- filtered_latest_titles[source_id] = title_data
- latest_titles = filtered_latest_titles
- # 汇总历史标题(按平台过滤)
- historical_titles = {}
- for file_path in files[:-1]:
- historical_data, _ = parse_file_titles(file_path)
- # 过滤历史数据
- if current_platform_ids is not None:
- filtered_historical_data = {}
- for source_id, title_data in historical_data.items():
- if source_id in current_platform_ids:
- filtered_historical_data[source_id] = title_data
- historical_data = filtered_historical_data
- for source_id, titles_data in historical_data.items():
- if source_id not in historical_titles:
- historical_titles[source_id] = set()
- for title in titles_data.keys():
- historical_titles[source_id].add(title)
- # 找出新增标题
- new_titles = {}
- for source_id, latest_source_titles in latest_titles.items():
- historical_set = historical_titles.get(source_id, set())
- source_new_titles = {}
- for title, title_data in latest_source_titles.items():
- if title not in historical_set:
- source_new_titles[title] = title_data
- if source_new_titles:
- new_titles[source_id] = source_new_titles
- return new_titles
- # === 统计和分析 ===
- def calculate_news_weight(
- title_data: Dict, rank_threshold: int = CONFIG["RANK_THRESHOLD"]
- ) -> float:
- """计算新闻权重,用于排序"""
- ranks = title_data.get("ranks", [])
- if not ranks:
- return 0.0
- count = title_data.get("count", len(ranks))
- weight_config = CONFIG["WEIGHT_CONFIG"]
- # 排名权重:Σ(11 - min(rank, 10)) / 出现次数
- rank_scores = []
- for rank in ranks:
- score = 11 - min(rank, 10)
- rank_scores.append(score)
- rank_weight = sum(rank_scores) / len(ranks) if ranks else 0
- # 频次权重:min(出现次数, 10) × 10
- frequency_weight = min(count, 10) * 10
- # 热度加成:高排名次数 / 总出现次数 × 100
- high_rank_count = sum(1 for rank in ranks if rank <= rank_threshold)
- hotness_ratio = high_rank_count / len(ranks) if ranks else 0
- hotness_weight = hotness_ratio * 100
- total_weight = (
- rank_weight * weight_config["RANK_WEIGHT"]
- + frequency_weight * weight_config["FREQUENCY_WEIGHT"]
- + hotness_weight * weight_config["HOTNESS_WEIGHT"]
- )
- return total_weight
- def matches_word_groups(
- title: str, word_groups: List[Dict], filter_words: List[str]
- ) -> bool:
- """检查标题是否匹配词组规则"""
- # 防御性类型检查:确保 title 是有效字符串
- if not isinstance(title, str):
- title = str(title) if title is not None else ""
- if not title.strip():
- return False
- # 如果没有配置词组,则匹配所有标题(支持显示全部新闻)
- if not word_groups:
- return True
- title_lower = title.lower()
- # 过滤词检查
- if any(filter_word.lower() in title_lower for filter_word in filter_words):
- return False
- # 词组匹配检查
- for group in word_groups:
- required_words = group["required"]
- normal_words = group["normal"]
- # 必须词检查
- if required_words:
- all_required_present = all(
- req_word.lower() in title_lower for req_word in required_words
- )
- if not all_required_present:
- continue
- # 普通词检查
- if normal_words:
- any_normal_present = any(
- normal_word.lower() in title_lower for normal_word in normal_words
- )
- if not any_normal_present:
- continue
- return True
- return False
- def format_time_display(first_time: str, last_time: str) -> str:
- """格式化时间显示"""
- if not first_time:
- return ""
- if first_time == last_time or not last_time:
- return first_time
- else:
- return f"[{first_time} ~ {last_time}]"
- def format_rank_display(ranks: List[int], rank_threshold: int, format_type: str) -> str:
- """统一的排名格式化方法"""
- if not ranks:
- return ""
- unique_ranks = sorted(set(ranks))
- min_rank = unique_ranks[0]
- max_rank = unique_ranks[-1]
- if format_type == "html":
- highlight_start = "<font color='red'><strong>"
- highlight_end = "</strong></font>"
- elif format_type == "feishu":
- highlight_start = "<font color='red'>**"
- highlight_end = "**</font>"
- elif format_type == "dingtalk":
- highlight_start = "**"
- highlight_end = "**"
- elif format_type == "wework":
- highlight_start = "**"
- highlight_end = "**"
- elif format_type == "telegram":
- highlight_start = "<b>"
- highlight_end = "</b>"
- else:
- highlight_start = "**"
- highlight_end = "**"
- if min_rank <= rank_threshold:
- if min_rank == max_rank:
- return f"{highlight_start}[{min_rank}]{highlight_end}"
- else:
- return f"{highlight_start}[{min_rank} - {max_rank}]{highlight_end}"
- else:
- if min_rank == max_rank:
- return f"[{min_rank}]"
- else:
- return f"[{min_rank} - {max_rank}]"
- def count_word_frequency(
- results: Dict,
- word_groups: List[Dict],
- filter_words: List[str],
- id_to_name: Dict,
- title_info: Optional[Dict] = None,
- rank_threshold: int = CONFIG["RANK_THRESHOLD"],
- new_titles: Optional[Dict] = None,
- mode: str = "daily",
- ) -> Tuple[List[Dict], int]:
- """统计词频,支持必须词、频率词、过滤词,并标记新增标题"""
- # 如果没有配置词组,创建一个包含所有新闻的虚拟词组
- if not word_groups:
- print("频率词配置为空,将显示所有新闻")
- word_groups = [{"required": [], "normal": [], "group_key": "全部新闻"}]
- filter_words = [] # 清空过滤词,显示所有新闻
- is_first_today = is_first_crawl_today()
- # 确定处理的数据源和新增标记逻辑
- if mode == "incremental":
- if is_first_today:
- # 增量模式 + 当天第一次:处理所有新闻,都标记为新增
- results_to_process = results
- all_news_are_new = True
- else:
- # 增量模式 + 当天非第一次:只处理新增的新闻
- results_to_process = new_titles if new_titles else {}
- all_news_are_new = True
- elif mode == "current":
- # current 模式:只处理当前时间批次的新闻,但统计信息来自全部历史
- if title_info:
- latest_time = None
- for source_titles in title_info.values():
- for title_data in source_titles.values():
- last_time = title_data.get("last_time", "")
- if last_time:
- if latest_time is None or last_time > latest_time:
- latest_time = last_time
- # 只处理 last_time 等于最新时间的新闻
- if latest_time:
- results_to_process = {}
- for source_id, source_titles in results.items():
- if source_id in title_info:
- filtered_titles = {}
- for title, title_data in source_titles.items():
- if title in title_info[source_id]:
- info = title_info[source_id][title]
- if info.get("last_time") == latest_time:
- filtered_titles[title] = title_data
- if filtered_titles:
- results_to_process[source_id] = filtered_titles
- print(
- f"当前榜单模式:最新时间 {latest_time},筛选出 {sum(len(titles) for titles in results_to_process.values())} 条当前榜单新闻"
- )
- else:
- results_to_process = results
- else:
- results_to_process = results
- all_news_are_new = False
- else:
- # 当日汇总模式:处理所有新闻
- results_to_process = results
- all_news_are_new = False
- total_input_news = sum(len(titles) for titles in results.values())
- filter_status = (
- "全部显示"
- if len(word_groups) == 1 and word_groups[0]["group_key"] == "全部新闻"
- else "频率词过滤"
- )
- print(f"当日汇总模式:处理 {total_input_news} 条新闻,模式:{filter_status}")
- word_stats = {}
- total_titles = 0
- processed_titles = {}
- matched_new_count = 0
- if title_info is None:
- title_info = {}
- if new_titles is None:
- new_titles = {}
- for group in word_groups:
- group_key = group["group_key"]
- word_stats[group_key] = {"count": 0, "titles": {}}
- for source_id, titles_data in results_to_process.items():
- total_titles += len(titles_data)
- if source_id not in processed_titles:
- processed_titles[source_id] = {}
- for title, title_data in titles_data.items():
- if title in processed_titles.get(source_id, {}):
- continue
- # 使用统一的匹配逻辑
- matches_frequency_words = matches_word_groups(
- title, word_groups, filter_words
- )
- if not matches_frequency_words:
- continue
- # 如果是增量模式或 current 模式第一次,统计匹配的新增新闻数量
- if (mode == "incremental" and all_news_are_new) or (
- mode == "current" and is_first_today
- ):
- matched_new_count += 1
- source_ranks = title_data.get("ranks", [])
- source_url = title_data.get("url", "")
- source_mobile_url = title_data.get("mobileUrl", "")
- # 找到匹配的词组(防御性转换确保类型安全)
- title_lower = str(title).lower() if not isinstance(title, str) else title.lower()
- for group in word_groups:
- required_words = group["required"]
- normal_words = group["normal"]
- # 如果是"全部新闻"模式,所有标题都匹配第一个(唯一的)词组
- if len(word_groups) == 1 and word_groups[0]["group_key"] == "全部新闻":
- group_key = group["group_key"]
- word_stats[group_key]["count"] += 1
- if source_id not in word_stats[group_key]["titles"]:
- word_stats[group_key]["titles"][source_id] = []
- else:
- # 原有的匹配逻辑
- if required_words:
- all_required_present = all(
- req_word.lower() in title_lower
- for req_word in required_words
- )
- if not all_required_present:
- continue
- if normal_words:
- any_normal_present = any(
- normal_word.lower() in title_lower
- for normal_word in normal_words
- )
- if not any_normal_present:
- continue
- group_key = group["group_key"]
- word_stats[group_key]["count"] += 1
- if source_id not in word_stats[group_key]["titles"]:
- word_stats[group_key]["titles"][source_id] = []
- first_time = ""
- last_time = ""
- count_info = 1
- ranks = source_ranks if source_ranks else []
- url = source_url
- mobile_url = source_mobile_url
- # 对于 current 模式,从历史统计信息中获取完整数据
- if (
- mode == "current"
- and title_info
- and source_id in title_info
- and title in title_info[source_id]
- ):
- info = title_info[source_id][title]
- first_time = info.get("first_time", "")
- last_time = info.get("last_time", "")
- count_info = info.get("count", 1)
- if "ranks" in info and info["ranks"]:
- ranks = info["ranks"]
- url = info.get("url", source_url)
- mobile_url = info.get("mobileUrl", source_mobile_url)
- elif (
- title_info
- and source_id in title_info
- and title in title_info[source_id]
- ):
- info = title_info[source_id][title]
- first_time = info.get("first_time", "")
- last_time = info.get("last_time", "")
- count_info = info.get("count", 1)
- if "ranks" in info and info["ranks"]:
- ranks = info["ranks"]
- url = info.get("url", source_url)
- mobile_url = info.get("mobileUrl", source_mobile_url)
- if not ranks:
- ranks = [99]
- time_display = format_time_display(first_time, last_time)
- source_name = id_to_name.get(source_id, source_id)
- # 判断是否为新增
- is_new = False
- if all_news_are_new:
- # 增量模式下所有处理的新闻都是新增,或者当天第一次的所有新闻都是新增
- is_new = True
- elif new_titles and source_id in new_titles:
- # 检查是否在新增列表中
- new_titles_for_source = new_titles[source_id]
- is_new = title in new_titles_for_source
- word_stats[group_key]["titles"][source_id].append(
- {
- "title": title,
- "source_name": source_name,
- "first_time": first_time,
- "last_time": last_time,
- "time_display": time_display,
- "count": count_info,
- "ranks": ranks,
- "rank_threshold": rank_threshold,
- "url": url,
- "mobileUrl": mobile_url,
- "is_new": is_new,
- }
- )
- if source_id not in processed_titles:
- processed_titles[source_id] = {}
- processed_titles[source_id][title] = True
- break
- # 最后统一打印汇总信息
- if mode == "incremental":
- if is_first_today:
- total_input_news = sum(len(titles) for titles in results.values())
- filter_status = (
- "全部显示"
- if len(word_groups) == 1 and word_groups[0]["group_key"] == "全部新闻"
- else "频率词匹配"
- )
- print(
- f"增量模式:当天第一次爬取,{total_input_news} 条新闻中有 {matched_new_count} 条{filter_status}"
- )
- else:
- if new_titles:
- total_new_count = sum(len(titles) for titles in new_titles.values())
- filter_status = (
- "全部显示"
- if len(word_groups) == 1
- and word_groups[0]["group_key"] == "全部新闻"
- else "匹配频率词"
- )
- print(
- f"增量模式:{total_new_count} 条新增新闻中,有 {matched_new_count} 条{filter_status}"
- )
- if matched_new_count == 0 and len(word_groups) > 1:
- print("增量模式:没有新增新闻匹配频率词,将不会发送通知")
- else:
- print("增量模式:未检测到新增新闻")
- elif mode == "current":
- total_input_news = sum(len(titles) for titles in results_to_process.values())
- if is_first_today:
- filter_status = (
- "全部显示"
- if len(word_groups) == 1 and word_groups[0]["group_key"] == "全部新闻"
- else "频率词匹配"
- )
- print(
- f"当前榜单模式:当天第一次爬取,{total_input_news} 条当前榜单新闻中有 {matched_new_count} 条{filter_status}"
- )
- else:
- matched_count = sum(stat["count"] for stat in word_stats.values())
- filter_status = (
- "全部显示"
- if len(word_groups) == 1 and word_groups[0]["group_key"] == "全部新闻"
- else "频率词匹配"
- )
- print(
- f"当前榜单模式:{total_input_news} 条当前榜单新闻中有 {matched_count} 条{filter_status}"
- )
- stats = []
- # 创建 group_key 到位置和最大数量的映射
- group_key_to_position = {
- group["group_key"]: idx for idx, group in enumerate(word_groups)
- }
- group_key_to_max_count = {
- group["group_key"]: group.get("max_count", 0) for group in word_groups
- }
- for group_key, data in word_stats.items():
- all_titles = []
- for source_id, title_list in data["titles"].items():
- all_titles.extend(title_list)
- # 按权重排序
- sorted_titles = sorted(
- all_titles,
- key=lambda x: (
- -calculate_news_weight(x, rank_threshold),
- min(x["ranks"]) if x["ranks"] else 999,
- -x["count"],
- ),
- )
- # 应用最大显示数量限制(优先级:单独配置 > 全局配置)
- group_max_count = group_key_to_max_count.get(group_key, 0)
- if group_max_count == 0:
- # 使用全局配置
- group_max_count = CONFIG.get("MAX_NEWS_PER_KEYWORD", 0)
- if group_max_count > 0:
- sorted_titles = sorted_titles[:group_max_count]
- stats.append(
- {
- "word": group_key,
- "count": data["count"],
- "position": group_key_to_position.get(group_key, 999),
- "titles": sorted_titles,
- "percentage": (
- round(data["count"] / total_titles * 100, 2)
- if total_titles > 0
- else 0
- ),
- }
- )
- # 根据配置选择排序优先级
- if CONFIG.get("SORT_BY_POSITION_FIRST", False):
- # 先按配置位置,再按热点条数
- stats.sort(key=lambda x: (x["position"], -x["count"]))
- else:
- # 先按热点条数,再按配置位置(原逻辑)
- stats.sort(key=lambda x: (-x["count"], x["position"]))
- return stats, total_titles
- # === 报告生成 ===
- def prepare_report_data(
- stats: List[Dict],
- failed_ids: Optional[List] = None,
- new_titles: Optional[Dict] = None,
- id_to_name: Optional[Dict] = None,
- mode: str = "daily",
- ) -> Dict:
- """准备报告数据"""
- processed_new_titles = []
- # 在增量模式下隐藏新增新闻区域
- hide_new_section = mode == "incremental"
- # 只有在非隐藏模式下才处理新增新闻部分
- if not hide_new_section:
- filtered_new_titles = {}
- if new_titles and id_to_name:
- word_groups, filter_words = load_frequency_words()
- for source_id, titles_data in new_titles.items():
- filtered_titles = {}
- for title, title_data in titles_data.items():
- if matches_word_groups(title, word_groups, filter_words):
- filtered_titles[title] = title_data
- if filtered_titles:
- filtered_new_titles[source_id] = filtered_titles
- if filtered_new_titles and id_to_name:
- for source_id, titles_data in filtered_new_titles.items():
- source_name = id_to_name.get(source_id, source_id)
- source_titles = []
- for title, title_data in titles_data.items():
- url = title_data.get("url", "")
- mobile_url = title_data.get("mobileUrl", "")
- ranks = title_data.get("ranks", [])
- processed_title = {
- "title": title,
- "source_name": source_name,
- "time_display": "",
- "count": 1,
- "ranks": ranks,
- "rank_threshold": CONFIG["RANK_THRESHOLD"],
- "url": url,
- "mobile_url": mobile_url,
- "is_new": True,
- }
- source_titles.append(processed_title)
- if source_titles:
- processed_new_titles.append(
- {
- "source_id": source_id,
- "source_name": source_name,
- "titles": source_titles,
- }
- )
- processed_stats = []
- for stat in stats:
- if stat["count"] <= 0:
- continue
- processed_titles = []
- for title_data in stat["titles"]:
- processed_title = {
- "title": title_data["title"],
- "source_name": title_data["source_name"],
- "time_display": title_data["time_display"],
- "count": title_data["count"],
- "ranks": title_data["ranks"],
- "rank_threshold": title_data["rank_threshold"],
- "url": title_data.get("url", ""),
- "mobile_url": title_data.get("mobileUrl", ""),
- "is_new": title_data.get("is_new", False),
- }
- processed_titles.append(processed_title)
- processed_stats.append(
- {
- "word": stat["word"],
- "count": stat["count"],
- "percentage": stat.get("percentage", 0),
- "titles": processed_titles,
- }
- )
- return {
- "stats": processed_stats,
- "new_titles": processed_new_titles,
- "failed_ids": failed_ids or [],
- "total_new_count": sum(
- len(source["titles"]) for source in processed_new_titles
- ),
- }
- def format_title_for_platform(
- platform: str, title_data: Dict, show_source: bool = True
- ) -> str:
- """统一的标题格式化方法"""
- rank_display = format_rank_display(
- title_data["ranks"], title_data["rank_threshold"], platform
- )
- link_url = title_data["mobile_url"] or title_data["url"]
- cleaned_title = clean_title(title_data["title"])
- if platform == "feishu":
- if link_url:
- formatted_title = f"[{cleaned_title}]({link_url})"
- else:
- formatted_title = cleaned_title
- title_prefix = "🆕 " if title_data.get("is_new") else ""
- if show_source:
- result = f"<font color='grey'>[{title_data['source_name']}]</font> {title_prefix}{formatted_title}"
- else:
- result = f"{title_prefix}{formatted_title}"
- if rank_display:
- result += f" {rank_display}"
- if title_data["time_display"]:
- result += f" <font color='grey'>- {title_data['time_display']}</font>"
- if title_data["count"] > 1:
- result += f" <font color='green'>({title_data['count']}次)</font>"
- return result
- elif platform == "dingtalk":
- if link_url:
- formatted_title = f"[{cleaned_title}]({link_url})"
- else:
- formatted_title = cleaned_title
- title_prefix = "🆕 " if title_data.get("is_new") else ""
- if show_source:
- result = f"[{title_data['source_name']}] {title_prefix}{formatted_title}"
- else:
- result = f"{title_prefix}{formatted_title}"
- if rank_display:
- result += f" {rank_display}"
- if title_data["time_display"]:
- result += f" - {title_data['time_display']}"
- if title_data["count"] > 1:
- result += f" ({title_data['count']}次)"
- return result
- elif platform == "wework":
- if link_url:
- formatted_title = f"[{cleaned_title}]({link_url})"
- else:
- formatted_title = cleaned_title
- title_prefix = "🆕 " if title_data.get("is_new") else ""
- if show_source:
- result = f"[{title_data['source_name']}] {title_prefix}{formatted_title}"
- else:
- result = f"{title_prefix}{formatted_title}"
- if rank_display:
- result += f" {rank_display}"
- if title_data["time_display"]:
- result += f" - {title_data['time_display']}"
- if title_data["count"] > 1:
- result += f" ({title_data['count']}次)"
- return result
- elif platform == "telegram":
- if link_url:
- formatted_title = f'<a href="{link_url}">{html_escape(cleaned_title)}</a>'
- else:
- formatted_title = cleaned_title
- title_prefix = "🆕 " if title_data.get("is_new") else ""
- if show_source:
- result = f"[{title_data['source_name']}] {title_prefix}{formatted_title}"
- else:
- result = f"{title_prefix}{formatted_title}"
- if rank_display:
- result += f" {rank_display}"
- if title_data["time_display"]:
- result += f" <code>- {title_data['time_display']}</code>"
- if title_data["count"] > 1:
- result += f" <code>({title_data['count']}次)</code>"
- return result
- elif platform == "ntfy":
- if link_url:
- formatted_title = f"[{cleaned_title}]({link_url})"
- else:
- formatted_title = cleaned_title
- title_prefix = "🆕 " if title_data.get("is_new") else ""
- if show_source:
- result = f"[{title_data['source_name']}] {title_prefix}{formatted_title}"
- else:
- result = f"{title_prefix}{formatted_title}"
- if rank_display:
- result += f" {rank_display}"
- if title_data["time_display"]:
- result += f" `- {title_data['time_display']}`"
- if title_data["count"] > 1:
- result += f" `({title_data['count']}次)`"
- return result
- elif platform == "html":
- rank_display = format_rank_display(
- title_data["ranks"], title_data["rank_threshold"], "html"
- )
- link_url = title_data["mobile_url"] or title_data["url"]
- escaped_title = html_escape(cleaned_title)
- escaped_source_name = html_escape(title_data["source_name"])
- if link_url:
- escaped_url = html_escape(link_url)
- formatted_title = f'[{escaped_source_name}] <a href="{escaped_url}" target="_blank" class="news-link">{escaped_title}</a>'
- else:
- formatted_title = (
- f'[{escaped_source_name}] <span class="no-link">{escaped_title}</span>'
- )
- if rank_display:
- formatted_title += f" {rank_display}"
- if title_data["time_display"]:
- escaped_time = html_escape(title_data["time_display"])
- formatted_title += f" <font color='grey'>- {escaped_time}</font>"
- if title_data["count"] > 1:
- formatted_title += f" <font color='green'>({title_data['count']}次)</font>"
- if title_data.get("is_new"):
- formatted_title = f"<div class='new-title'>🆕 {formatted_title}</div>"
- return formatted_title
- else:
- return cleaned_title
- def generate_html_report(
- stats: List[Dict],
- total_titles: int,
- failed_ids: Optional[List] = None,
- new_titles: Optional[Dict] = None,
- id_to_name: Optional[Dict] = None,
- mode: str = "daily",
- is_daily_summary: bool = False,
- update_info: Optional[Dict] = None,
- ) -> str:
- """生成HTML报告"""
- if is_daily_summary:
- if mode == "current":
- filename = "当前榜单汇总.html"
- elif mode == "incremental":
- filename = "当日增量.html"
- else:
- filename = "当日汇总.html"
- else:
- filename = f"{format_time_filename()}.html"
- file_path = get_output_path("html", filename)
- report_data = prepare_report_data(stats, failed_ids, new_titles, id_to_name, mode)
- html_content = render_html_content(
- report_data, total_titles, is_daily_summary, mode, update_info
- )
- with open(file_path, "w", encoding="utf-8") as f:
- f.write(html_content)
- if is_daily_summary:
- root_file_path = Path("index.html")
- with open(root_file_path, "w", encoding="utf-8") as f:
- f.write(html_content)
- return file_path
- def render_html_content(
- report_data: Dict,
- total_titles: int,
- is_daily_summary: bool = False,
- mode: str = "daily",
- update_info: Optional[Dict] = None,
- ) -> str:
- """渲染HTML内容"""
- html = """
- <!DOCTYPE html>
- <html>
- <head>
- <meta charset="UTF-8">
- <meta name="viewport" content="width=device-width, initial-scale=1.0">
- <title>热点新闻分析</title>
- <script src="https://cdnjs.cloudflare.com/ajax/libs/html2canvas/1.4.1/html2canvas.min.js" integrity="sha512-BNaRQnYJYiPSqHHDb58B0yaPfCu+Wgds8Gp/gU33kqBtgNS4tSPHuGibyoeqMV/TJlSKda6FXzoEyYGjTe+vXA==" crossorigin="anonymous" referrerpolicy="no-referrer"></script>
- <style>
- * { box-sizing: border-box; }
- body {
- font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', system-ui, sans-serif;
- margin: 0;
- padding: 16px;
- background: #fafafa;
- color: #333;
- line-height: 1.5;
- }
-
- .container {
- max-width: 600px;
- margin: 0 auto;
- background: white;
- border-radius: 12px;
- overflow: hidden;
- box-shadow: 0 2px 16px rgba(0,0,0,0.06);
- }
-
- .header {
- background: linear-gradient(135deg, #4f46e5 0%, #7c3aed 100%);
- color: white;
- padding: 32px 24px;
- text-align: center;
- position: relative;
- }
-
- .save-buttons {
- position: absolute;
- top: 16px;
- right: 16px;
- display: flex;
- gap: 8px;
- }
-
- .save-btn {
- background: rgba(255, 255, 255, 0.2);
- border: 1px solid rgba(255, 255, 255, 0.3);
- color: white;
- padding: 8px 16px;
- border-radius: 6px;
- cursor: pointer;
- font-size: 13px;
- font-weight: 500;
- transition: all 0.2s ease;
- backdrop-filter: blur(10px);
- white-space: nowrap;
- }
-
- .save-btn:hover {
- background: rgba(255, 255, 255, 0.3);
- border-color: rgba(255, 255, 255, 0.5);
- transform: translateY(-1px);
- }
-
- .save-btn:active {
- transform: translateY(0);
- }
-
- .save-btn:disabled {
- opacity: 0.6;
- cursor: not-allowed;
- }
-
- .header-title {
- font-size: 22px;
- font-weight: 700;
- margin: 0 0 20px 0;
- }
-
- .header-info {
- display: grid;
- grid-template-columns: 1fr 1fr;
- gap: 16px;
- font-size: 14px;
- opacity: 0.95;
- }
-
- .info-item {
- text-align: center;
- }
-
- .info-label {
- display: block;
- font-size: 12px;
- opacity: 0.8;
- margin-bottom: 4px;
- }
-
- .info-value {
- font-weight: 600;
- font-size: 16px;
- }
-
- .content {
- padding: 24px;
- }
-
- .word-group {
- margin-bottom: 40px;
- }
-
- .word-group:first-child {
- margin-top: 0;
- }
-
- .word-header {
- display: flex;
- align-items: center;
- justify-content: space-between;
- margin-bottom: 20px;
- padding-bottom: 8px;
- border-bottom: 1px solid #f0f0f0;
- }
-
- .word-info {
- display: flex;
- align-items: center;
- gap: 12px;
- }
-
- .word-name {
- font-size: 17px;
- font-weight: 600;
- color: #1a1a1a;
- }
-
- .word-count {
- color: #666;
- font-size: 13px;
- font-weight: 500;
- }
-
- .word-count.hot { color: #dc2626; font-weight: 600; }
- .word-count.warm { color: #ea580c; font-weight: 600; }
-
- .word-index {
- color: #999;
- font-size: 12px;
- }
-
- .news-item {
- margin-bottom: 20px;
- padding: 16px 0;
- border-bottom: 1px solid #f5f5f5;
- position: relative;
- display: flex;
- gap: 12px;
- align-items: center;
- }
-
- .news-item:last-child {
- border-bottom: none;
- }
-
- .news-item.new::after {
- content: "NEW";
- position: absolute;
- top: 12px;
- right: 0;
- background: #fbbf24;
- color: #92400e;
- font-size: 9px;
- font-weight: 700;
- padding: 3px 6px;
- border-radius: 4px;
- letter-spacing: 0.5px;
- }
-
- .news-number {
- color: #999;
- font-size: 13px;
- font-weight: 600;
- min-width: 20px;
- text-align: center;
- flex-shrink: 0;
- background: #f8f9fa;
- border-radius: 50%;
- width: 24px;
- height: 24px;
- display: flex;
- align-items: center;
- justify-content: center;
- align-self: flex-start;
- margin-top: 8px;
- }
-
- .news-content {
- flex: 1;
- min-width: 0;
- padding-right: 40px;
- }
-
- .news-item.new .news-content {
- padding-right: 50px;
- }
-
- .news-header {
- display: flex;
- align-items: center;
- gap: 8px;
- margin-bottom: 8px;
- flex-wrap: wrap;
- }
-
- .source-name {
- color: #666;
- font-size: 12px;
- font-weight: 500;
- }
-
- .rank-num {
- color: #fff;
- background: #6b7280;
- font-size: 10px;
- font-weight: 700;
- padding: 2px 6px;
- border-radius: 10px;
- min-width: 18px;
- text-align: center;
- }
-
- .rank-num.top { background: #dc2626; }
- .rank-num.high { background: #ea580c; }
-
- .time-info {
- color: #999;
- font-size: 11px;
- }
-
- .count-info {
- color: #059669;
- font-size: 11px;
- font-weight: 500;
- }
-
- .news-title {
- font-size: 15px;
- line-height: 1.4;
- color: #1a1a1a;
- margin: 0;
- }
-
- .news-link {
- color: #2563eb;
- text-decoration: none;
- }
-
- .news-link:hover {
- text-decoration: underline;
- }
-
- .news-link:visited {
- color: #7c3aed;
- }
-
- .new-section {
- margin-top: 40px;
- padding-top: 24px;
- border-top: 2px solid #f0f0f0;
- }
-
- .new-section-title {
- color: #1a1a1a;
- font-size: 16px;
- font-weight: 600;
- margin: 0 0 20px 0;
- }
-
- .new-source-group {
- margin-bottom: 24px;
- }
-
- .new-source-title {
- color: #666;
- font-size: 13px;
- font-weight: 500;
- margin: 0 0 12px 0;
- padding-bottom: 6px;
- border-bottom: 1px solid #f5f5f5;
- }
-
- .new-item {
- display: flex;
- align-items: center;
- gap: 12px;
- padding: 8px 0;
- border-bottom: 1px solid #f9f9f9;
- }
-
- .new-item:last-child {
- border-bottom: none;
- }
-
- .new-item-number {
- color: #999;
- font-size: 12px;
- font-weight: 600;
- min-width: 18px;
- text-align: center;
- flex-shrink: 0;
- background: #f8f9fa;
- border-radius: 50%;
- width: 20px;
- height: 20px;
- display: flex;
- align-items: center;
- justify-content: center;
- }
-
- .new-item-rank {
- color: #fff;
- background: #6b7280;
- font-size: 10px;
- font-weight: 700;
- padding: 3px 6px;
- border-radius: 8px;
- min-width: 20px;
- text-align: center;
- flex-shrink: 0;
- }
-
- .new-item-rank.top { background: #dc2626; }
- .new-item-rank.high { background: #ea580c; }
-
- .new-item-content {
- flex: 1;
- min-width: 0;
- }
-
- .new-item-title {
- font-size: 14px;
- line-height: 1.4;
- color: #1a1a1a;
- margin: 0;
- }
-
- .error-section {
- background: #fef2f2;
- border: 1px solid #fecaca;
- border-radius: 8px;
- padding: 16px;
- margin-bottom: 24px;
- }
-
- .error-title {
- color: #dc2626;
- font-size: 14px;
- font-weight: 600;
- margin: 0 0 8px 0;
- }
-
- .error-list {
- list-style: none;
- padding: 0;
- margin: 0;
- }
-
- .error-item {
- color: #991b1b;
- font-size: 13px;
- padding: 2px 0;
- font-family: 'SF Mono', Consolas, monospace;
- }
-
- .footer {
- margin-top: 32px;
- padding: 20px 24px;
- background: #f8f9fa;
- border-top: 1px solid #e5e7eb;
- text-align: center;
- }
-
- .footer-content {
- font-size: 13px;
- color: #6b7280;
- line-height: 1.6;
- }
-
- .footer-link {
- color: #4f46e5;
- text-decoration: none;
- font-weight: 500;
- transition: color 0.2s ease;
- }
-
- .footer-link:hover {
- color: #7c3aed;
- text-decoration: underline;
- }
-
- .project-name {
- font-weight: 600;
- color: #374151;
- }
-
- @media (max-width: 480px) {
- body { padding: 12px; }
- .header { padding: 24px 20px; }
- .content { padding: 20px; }
- .footer { padding: 16px 20px; }
- .header-info { grid-template-columns: 1fr; gap: 12px; }
- .news-header { gap: 6px; }
- .news-content { padding-right: 45px; }
- .news-item { gap: 8px; }
- .new-item { gap: 8px; }
- .news-number { width: 20px; height: 20px; font-size: 12px; }
- .save-buttons {
- position: static;
- margin-bottom: 16px;
- display: flex;
- gap: 8px;
- justify-content: center;
- flex-direction: column;
- width: 100%;
- }
- .save-btn {
- width: 100%;
- }
- }
- </style>
- </head>
- <body>
- <div class="container">
- <div class="header">
- <div class="save-buttons">
- <button class="save-btn" onclick="saveAsImage()">保存为图片</button>
- <button class="save-btn" onclick="saveAsMultipleImages()">分段保存</button>
- </div>
- <div class="header-title">热点新闻分析</div>
- <div class="header-info">
- <div class="info-item">
- <span class="info-label">报告类型</span>
- <span class="info-value">"""
- # 处理报告类型显示
- if is_daily_summary:
- if mode == "current":
- html += "当前榜单"
- elif mode == "incremental":
- html += "增量模式"
- else:
- html += "当日汇总"
- else:
- html += "实时分析"
- html += """</span>
- </div>
- <div class="info-item">
- <span class="info-label">新闻总数</span>
- <span class="info-value">"""
- html += f"{total_titles} 条"
- # 计算筛选后的热点新闻数量
- hot_news_count = sum(len(stat["titles"]) for stat in report_data["stats"])
- html += """</span>
- </div>
- <div class="info-item">
- <span class="info-label">热点新闻</span>
- <span class="info-value">"""
- html += f"{hot_news_count} 条"
- html += """</span>
- </div>
- <div class="info-item">
- <span class="info-label">生成时间</span>
- <span class="info-value">"""
- now = get_beijing_time()
- html += now.strftime("%m-%d %H:%M")
- html += """</span>
- </div>
- </div>
- </div>
-
- <div class="content">"""
- # 处理失败ID错误信息
- if report_data["failed_ids"]:
- html += """
- <div class="error-section">
- <div class="error-title">⚠️ 请求失败的平台</div>
- <ul class="error-list">"""
- for id_value in report_data["failed_ids"]:
- html += f'<li class="error-item">{html_escape(id_value)}</li>'
- html += """
- </ul>
- </div>"""
- # 处理主要统计数据
- if report_data["stats"]:
- total_count = len(report_data["stats"])
- for i, stat in enumerate(report_data["stats"], 1):
- count = stat["count"]
- # 确定热度等级
- if count >= 10:
- count_class = "hot"
- elif count >= 5:
- count_class = "warm"
- else:
- count_class = ""
- escaped_word = html_escape(stat["word"])
- html += f"""
- <div class="word-group">
- <div class="word-header">
- <div class="word-info">
- <div class="word-name">{escaped_word}</div>
- <div class="word-count {count_class}">{count} 条</div>
- </div>
- <div class="word-index">{i}/{total_count}</div>
- </div>"""
- # 处理每个词组下的新闻标题,给每条新闻标上序号
- for j, title_data in enumerate(stat["titles"], 1):
- is_new = title_data.get("is_new", False)
- new_class = "new" if is_new else ""
- html += f"""
- <div class="news-item {new_class}">
- <div class="news-number">{j}</div>
- <div class="news-content">
- <div class="news-header">
- <span class="source-name">{html_escape(title_data["source_name"])}</span>"""
- # 处理排名显示
- ranks = title_data.get("ranks", [])
- if ranks:
- min_rank = min(ranks)
- max_rank = max(ranks)
- rank_threshold = title_data.get("rank_threshold", 10)
- # 确定排名等级
- if min_rank <= 3:
- rank_class = "top"
- elif min_rank <= rank_threshold:
- rank_class = "high"
- else:
- rank_class = ""
- if min_rank == max_rank:
- rank_text = str(min_rank)
- else:
- rank_text = f"{min_rank}-{max_rank}"
- html += f'<span class="rank-num {rank_class}">{rank_text}</span>'
- # 处理时间显示
- time_display = title_data.get("time_display", "")
- if time_display:
- # 简化时间显示格式,将波浪线替换为~
- simplified_time = (
- time_display.replace(" ~ ", "~")
- .replace("[", "")
- .replace("]", "")
- )
- html += (
- f'<span class="time-info">{html_escape(simplified_time)}</span>'
- )
- # 处理出现次数
- count_info = title_data.get("count", 1)
- if count_info > 1:
- html += f'<span class="count-info">{count_info}次</span>'
- html += """
- </div>
- <div class="news-title">"""
- # 处理标题和链接
- escaped_title = html_escape(title_data["title"])
- link_url = title_data.get("mobile_url") or title_data.get("url", "")
- if link_url:
- escaped_url = html_escape(link_url)
- html += f'<a href="{escaped_url}" target="_blank" class="news-link">{escaped_title}</a>'
- else:
- html += escaped_title
- html += """
- </div>
- </div>
- </div>"""
- html += """
- </div>"""
- # 处理新增新闻区域
- if report_data["new_titles"]:
- html += f"""
- <div class="new-section">
- <div class="new-section-title">本次新增热点 (共 {report_data['total_new_count']} 条)</div>"""
- for source_data in report_data["new_titles"]:
- escaped_source = html_escape(source_data["source_name"])
- titles_count = len(source_data["titles"])
- html += f"""
- <div class="new-source-group">
- <div class="new-source-title">{escaped_source} · {titles_count}条</div>"""
- # 为新增新闻也添加序号
- for idx, title_data in enumerate(source_data["titles"], 1):
- ranks = title_data.get("ranks", [])
- # 处理新增新闻的排名显示
- rank_class = ""
- if ranks:
- min_rank = min(ranks)
- if min_rank <= 3:
- rank_class = "top"
- elif min_rank <= title_data.get("rank_threshold", 10):
- rank_class = "high"
- if len(ranks) == 1:
- rank_text = str(ranks[0])
- else:
- rank_text = f"{min(ranks)}-{max(ranks)}"
- else:
- rank_text = "?"
- html += f"""
- <div class="new-item">
- <div class="new-item-number">{idx}</div>
- <div class="new-item-rank {rank_class}">{rank_text}</div>
- <div class="new-item-content">
- <div class="new-item-title">"""
- # 处理新增新闻的链接
- escaped_title = html_escape(title_data["title"])
- link_url = title_data.get("mobile_url") or title_data.get("url", "")
- if link_url:
- escaped_url = html_escape(link_url)
- html += f'<a href="{escaped_url}" target="_blank" class="news-link">{escaped_title}</a>'
- else:
- html += escaped_title
- html += """
- </div>
- </div>
- </div>"""
- html += """
- </div>"""
- html += """
- </div>"""
- html += """
- </div>
-
- <div class="footer">
- <div class="footer-content">
- 由 <span class="project-name">TrendRadar</span> 生成 ·
- <a href="https://github.com/sansan0/TrendRadar" target="_blank" class="footer-link">
- GitHub 开源项目
- </a>"""
- if update_info:
- html += f"""
- <br>
- <span style="color: #ea580c; font-weight: 500;">
- 发现新版本 {update_info['remote_version']},当前版本 {update_info['current_version']}
- </span>"""
- html += """
- </div>
- </div>
- </div>
-
- <script>
- async function saveAsImage() {
- const button = event.target;
- const originalText = button.textContent;
-
- try {
- button.textContent = '生成中...';
- button.disabled = true;
- window.scrollTo(0, 0);
-
- // 等待页面稳定
- await new Promise(resolve => setTimeout(resolve, 200));
-
- // 截图前隐藏按钮
- const buttons = document.querySelector('.save-buttons');
- buttons.style.visibility = 'hidden';
-
- // 再次等待确保按钮完全隐藏
- await new Promise(resolve => setTimeout(resolve, 100));
-
- const container = document.querySelector('.container');
-
- const canvas = await html2canvas(container, {
- backgroundColor: '#ffffff',
- scale: 1.5,
- useCORS: true,
- allowTaint: false,
- imageTimeout: 10000,
- removeContainer: false,
- foreignObjectRendering: false,
- logging: false,
- width: container.offsetWidth,
- height: container.offsetHeight,
- x: 0,
- y: 0,
- scrollX: 0,
- scrollY: 0,
- windowWidth: window.innerWidth,
- windowHeight: window.innerHeight
- });
-
- buttons.style.visibility = 'visible';
-
- const link = document.createElement('a');
- const now = new Date();
- const filename = `TrendRadar_热点新闻分析_${now.getFullYear()}${String(now.getMonth() + 1).padStart(2, '0')}${String(now.getDate()).padStart(2, '0')}_${String(now.getHours()).padStart(2, '0')}${String(now.getMinutes()).padStart(2, '0')}.png`;
-
- link.download = filename;
- link.href = canvas.toDataURL('image/png', 1.0);
-
- // 触发下载
- document.body.appendChild(link);
- link.click();
- document.body.removeChild(link);
-
- button.textContent = '保存成功!';
- setTimeout(() => {
- button.textContent = originalText;
- button.disabled = false;
- }, 2000);
-
- } catch (error) {
- const buttons = document.querySelector('.save-buttons');
- buttons.style.visibility = 'visible';
- button.textContent = '保存失败';
- setTimeout(() => {
- button.textContent = originalText;
- button.disabled = false;
- }, 2000);
- }
- }
-
- async function saveAsMultipleImages() {
- const button = event.target;
- const originalText = button.textContent;
- const container = document.querySelector('.container');
- const scale = 1.5;
- const maxHeight = 5000 / scale;
-
- try {
- button.textContent = '分析中...';
- button.disabled = true;
-
- // 获取所有可能的分割元素
- const newsItems = Array.from(container.querySelectorAll('.news-item'));
- const wordGroups = Array.from(container.querySelectorAll('.word-group'));
- const newSection = container.querySelector('.new-section');
- const errorSection = container.querySelector('.error-section');
- const header = container.querySelector('.header');
- const footer = container.querySelector('.footer');
-
- // 计算元素位置和高度
- const containerRect = container.getBoundingClientRect();
- const elements = [];
-
- // 添加header作为必须包含的元素
- elements.push({
- type: 'header',
- element: header,
- top: 0,
- bottom: header.offsetHeight,
- height: header.offsetHeight
- });
-
- // 添加错误信息(如果存在)
- if (errorSection) {
- const rect = errorSection.getBoundingClientRect();
- elements.push({
- type: 'error',
- element: errorSection,
- top: rect.top - containerRect.top,
- bottom: rect.bottom - containerRect.top,
- height: rect.height
- });
- }
-
- // 按word-group分组处理news-item
- wordGroups.forEach(group => {
- const groupRect = group.getBoundingClientRect();
- const groupNewsItems = group.querySelectorAll('.news-item');
-
- // 添加word-group的header部分
- const wordHeader = group.querySelector('.word-header');
- if (wordHeader) {
- const headerRect = wordHeader.getBoundingClientRect();
- elements.push({
- type: 'word-header',
- element: wordHeader,
- parent: group,
- top: groupRect.top - containerRect.top,
- bottom: headerRect.bottom - containerRect.top,
- height: headerRect.height
- });
- }
-
- // 添加每个news-item
- groupNewsItems.forEach(item => {
- const rect = item.getBoundingClientRect();
- elements.push({
- type: 'news-item',
- element: item,
- parent: group,
- top: rect.top - containerRect.top,
- bottom: rect.bottom - containerRect.top,
- height: rect.height
- });
- });
- });
-
- // 添加新增新闻部分
- if (newSection) {
- const rect = newSection.getBoundingClientRect();
- elements.push({
- type: 'new-section',
- element: newSection,
- top: rect.top - containerRect.top,
- bottom: rect.bottom - containerRect.top,
- height: rect.height
- });
- }
-
- // 添加footer
- const footerRect = footer.getBoundingClientRect();
- elements.push({
- type: 'footer',
- element: footer,
- top: footerRect.top - containerRect.top,
- bottom: footerRect.bottom - containerRect.top,
- height: footer.offsetHeight
- });
-
- // 计算分割点
- const segments = [];
- let currentSegment = { start: 0, end: 0, height: 0, includeHeader: true };
- let headerHeight = header.offsetHeight;
- currentSegment.height = headerHeight;
-
- for (let i = 1; i < elements.length; i++) {
- const element = elements[i];
- const potentialHeight = element.bottom - currentSegment.start;
-
- // 检查是否需要创建新分段
- if (potentialHeight > maxHeight && currentSegment.height > headerHeight) {
- // 在前一个元素结束处分割
- currentSegment.end = elements[i - 1].bottom;
- segments.push(currentSegment);
-
- // 开始新分段
- currentSegment = {
- start: currentSegment.end,
- end: 0,
- height: element.bottom - currentSegment.end,
- includeHeader: false
- };
- } else {
- currentSegment.height = potentialHeight;
- currentSegment.end = element.bottom;
- }
- }
-
- // 添加最后一个分段
- if (currentSegment.height > 0) {
- currentSegment.end = container.offsetHeight;
- segments.push(currentSegment);
- }
-
- button.textContent = `生成中 (0/${segments.length})...`;
-
- // 隐藏保存按钮
- const buttons = document.querySelector('.save-buttons');
- buttons.style.visibility = 'hidden';
-
- // 为每个分段生成图片
- const images = [];
- for (let i = 0; i < segments.length; i++) {
- const segment = segments[i];
- button.textContent = `生成中 (${i + 1}/${segments.length})...`;
-
- // 创建临时容器用于截图
- const tempContainer = document.createElement('div');
- tempContainer.style.cssText = `
- position: absolute;
- left: -9999px;
- top: 0;
- width: ${container.offsetWidth}px;
- background: white;
- `;
- tempContainer.className = 'container';
-
- // 克隆容器内容
- const clonedContainer = container.cloneNode(true);
-
- // 移除克隆内容中的保存按钮
- const clonedButtons = clonedContainer.querySelector('.save-buttons');
- if (clonedButtons) {
- clonedButtons.style.display = 'none';
- }
-
- tempContainer.appendChild(clonedContainer);
- document.body.appendChild(tempContainer);
-
- // 等待DOM更新
- await new Promise(resolve => setTimeout(resolve, 100));
-
- // 使用html2canvas截取特定区域
- const canvas = await html2canvas(clonedContainer, {
- backgroundColor: '#ffffff',
- scale: scale,
- useCORS: true,
- allowTaint: false,
- imageTimeout: 10000,
- logging: false,
- width: container.offsetWidth,
- height: segment.end - segment.start,
- x: 0,
- y: segment.start,
- windowWidth: window.innerWidth,
- windowHeight: window.innerHeight
- });
-
- images.push(canvas.toDataURL('image/png', 1.0));
-
- // 清理临时容器
- document.body.removeChild(tempContainer);
- }
-
- // 恢复按钮显示
- buttons.style.visibility = 'visible';
-
- // 下载所有图片
- const now = new Date();
- const baseFilename = `TrendRadar_热点新闻分析_${now.getFullYear()}${String(now.getMonth() + 1).padStart(2, '0')}${String(now.getDate()).padStart(2, '0')}_${String(now.getHours()).padStart(2, '0')}${String(now.getMinutes()).padStart(2, '0')}`;
-
- for (let i = 0; i < images.length; i++) {
- const link = document.createElement('a');
- link.download = `${baseFilename}_part${i + 1}.png`;
- link.href = images[i];
- document.body.appendChild(link);
- link.click();
- document.body.removeChild(link);
-
- // 延迟一下避免浏览器阻止多个下载
- await new Promise(resolve => setTimeout(resolve, 100));
- }
-
- button.textContent = `已保存 ${segments.length} 张图片!`;
- setTimeout(() => {
- button.textContent = originalText;
- button.disabled = false;
- }, 2000);
-
- } catch (error) {
- console.error('分段保存失败:', error);
- const buttons = document.querySelector('.save-buttons');
- buttons.style.visibility = 'visible';
- button.textContent = '保存失败';
- setTimeout(() => {
- button.textContent = originalText;
- button.disabled = false;
- }, 2000);
- }
- }
-
- document.addEventListener('DOMContentLoaded', function() {
- window.scrollTo(0, 0);
- });
- </script>
- </body>
- </html>
- """
- return html
- def render_feishu_content(
- report_data: Dict, update_info: Optional[Dict] = None, mode: str = "daily"
- ) -> str:
- """渲染飞书内容"""
- text_content = ""
- if report_data["stats"]:
- text_content += f"📊 **热点词汇统计**\n\n"
- total_count = len(report_data["stats"])
- for i, stat in enumerate(report_data["stats"]):
- word = stat["word"]
- count = stat["count"]
- sequence_display = f"<font color='grey'>[{i + 1}/{total_count}]</font>"
- if count >= 10:
- text_content += f"🔥 {sequence_display} **{word}** : <font color='red'>{count}</font> 条\n\n"
- elif count >= 5:
- text_content += f"📈 {sequence_display} **{word}** : <font color='orange'>{count}</font> 条\n\n"
- else:
- text_content += f"📌 {sequence_display} **{word}** : {count} 条\n\n"
- for j, title_data in enumerate(stat["titles"], 1):
- formatted_title = format_title_for_platform(
- "feishu", title_data, show_source=True
- )
- text_content += f" {j}. {formatted_title}\n"
- if j < len(stat["titles"]):
- text_content += "\n"
- if i < len(report_data["stats"]) - 1:
- text_content += f"\n{CONFIG['FEISHU_MESSAGE_SEPARATOR']}\n\n"
- if not text_content:
- if mode == "incremental":
- mode_text = "增量模式下暂无新增匹配的热点词汇"
- elif mode == "current":
- mode_text = "当前榜单模式下暂无匹配的热点词汇"
- else:
- mode_text = "暂无匹配的热点词汇"
- text_content = f"📭 {mode_text}\n\n"
- if report_data["new_titles"]:
- if text_content and "暂无匹配" not in text_content:
- text_content += f"\n{CONFIG['FEISHU_MESSAGE_SEPARATOR']}\n\n"
- text_content += (
- f"🆕 **本次新增热点新闻** (共 {report_data['total_new_count']} 条)\n\n"
- )
- for source_data in report_data["new_titles"]:
- text_content += (
- f"**{source_data['source_name']}** ({len(source_data['titles'])} 条):\n"
- )
- for j, title_data in enumerate(source_data["titles"], 1):
- title_data_copy = title_data.copy()
- title_data_copy["is_new"] = False
- formatted_title = format_title_for_platform(
- "feishu", title_data_copy, show_source=False
- )
- text_content += f" {j}. {formatted_title}\n"
- text_content += "\n"
- if report_data["failed_ids"]:
- if text_content and "暂无匹配" not in text_content:
- text_content += f"\n{CONFIG['FEISHU_MESSAGE_SEPARATOR']}\n\n"
- text_content += "⚠️ **数据获取失败的平台:**\n\n"
- for i, id_value in enumerate(report_data["failed_ids"], 1):
- text_content += f" • <font color='red'>{id_value}</font>\n"
- now = get_beijing_time()
- text_content += (
- f"\n\n<font color='grey'>更新时间:{now.strftime('%Y-%m-%d %H:%M:%S')}</font>"
- )
- if update_info:
- text_content += f"\n<font color='grey'>TrendRadar 发现新版本 {update_info['remote_version']},当前 {update_info['current_version']}</font>"
- return text_content
- def render_dingtalk_content(
- report_data: Dict, update_info: Optional[Dict] = None, mode: str = "daily"
- ) -> str:
- """渲染钉钉内容"""
- text_content = ""
- total_titles = sum(
- len(stat["titles"]) for stat in report_data["stats"] if stat["count"] > 0
- )
- now = get_beijing_time()
- text_content += f"**总新闻数:** {total_titles}\n\n"
- text_content += f"**时间:** {now.strftime('%Y-%m-%d %H:%M:%S')}\n\n"
- text_content += f"**类型:** 热点分析报告\n\n"
- text_content += "---\n\n"
- if report_data["stats"]:
- text_content += f"📊 **热点词汇统计**\n\n"
- total_count = len(report_data["stats"])
- for i, stat in enumerate(report_data["stats"]):
- word = stat["word"]
- count = stat["count"]
- sequence_display = f"[{i + 1}/{total_count}]"
- if count >= 10:
- text_content += f"🔥 {sequence_display} **{word}** : **{count}** 条\n\n"
- elif count >= 5:
- text_content += f"📈 {sequence_display} **{word}** : **{count}** 条\n\n"
- else:
- text_content += f"📌 {sequence_display} **{word}** : {count} 条\n\n"
- for j, title_data in enumerate(stat["titles"], 1):
- formatted_title = format_title_for_platform(
- "dingtalk", title_data, show_source=True
- )
- text_content += f" {j}. {formatted_title}\n"
- if j < len(stat["titles"]):
- text_content += "\n"
- if i < len(report_data["stats"]) - 1:
- text_content += f"\n---\n\n"
- if not report_data["stats"]:
- if mode == "incremental":
- mode_text = "增量模式下暂无新增匹配的热点词汇"
- elif mode == "current":
- mode_text = "当前榜单模式下暂无匹配的热点词汇"
- else:
- mode_text = "暂无匹配的热点词汇"
- text_content += f"📭 {mode_text}\n\n"
- if report_data["new_titles"]:
- if text_content and "暂无匹配" not in text_content:
- text_content += f"\n---\n\n"
- text_content += (
- f"🆕 **本次新增热点新闻** (共 {report_data['total_new_count']} 条)\n\n"
- )
- for source_data in report_data["new_titles"]:
- text_content += f"**{source_data['source_name']}** ({len(source_data['titles'])} 条):\n\n"
- for j, title_data in enumerate(source_data["titles"], 1):
- title_data_copy = title_data.copy()
- title_data_copy["is_new"] = False
- formatted_title = format_title_for_platform(
- "dingtalk", title_data_copy, show_source=False
- )
- text_content += f" {j}. {formatted_title}\n"
- text_content += "\n"
- if report_data["failed_ids"]:
- if text_content and "暂无匹配" not in text_content:
- text_content += f"\n---\n\n"
- text_content += "⚠️ **数据获取失败的平台:**\n\n"
- for i, id_value in enumerate(report_data["failed_ids"], 1):
- text_content += f" • **{id_value}**\n"
- text_content += f"\n\n> 更新时间:{now.strftime('%Y-%m-%d %H:%M:%S')}"
- if update_info:
- text_content += f"\n> TrendRadar 发现新版本 **{update_info['remote_version']}**,当前 **{update_info['current_version']}**"
- return text_content
- def split_content_into_batches(
- report_data: Dict,
- format_type: str,
- update_info: Optional[Dict] = None,
- max_bytes: int = None,
- mode: str = "daily",
- ) -> List[str]:
- """分批处理消息内容,确保词组标题+至少第一条新闻的完整性"""
- if max_bytes is None:
- if format_type == "dingtalk":
- max_bytes = CONFIG.get("DINGTALK_BATCH_SIZE", 20000)
- elif format_type == "feishu":
- max_bytes = CONFIG.get("FEISHU_BATCH_SIZE", 29000)
- elif format_type == "ntfy":
- max_bytes = 3800
- else:
- max_bytes = CONFIG.get("MESSAGE_BATCH_SIZE", 4000)
- batches = []
- total_titles = sum(
- len(stat["titles"]) for stat in report_data["stats"] if stat["count"] > 0
- )
- now = get_beijing_time()
- base_header = ""
- if format_type == "wework":
- base_header = f"**总新闻数:** {total_titles}\n\n\n\n"
- elif format_type == "telegram":
- base_header = f"总新闻数: {total_titles}\n\n"
- elif format_type == "ntfy":
- base_header = f"**总新闻数:** {total_titles}\n\n"
- elif format_type == "feishu":
- base_header = ""
- elif format_type == "dingtalk":
- base_header = f"**总新闻数:** {total_titles}\n\n"
- base_header += f"**时间:** {now.strftime('%Y-%m-%d %H:%M:%S')}\n\n"
- base_header += f"**类型:** 热点分析报告\n\n"
- base_header += "---\n\n"
- base_footer = ""
- if format_type == "wework":
- base_footer = f"\n\n\n> 更新时间:{now.strftime('%Y-%m-%d %H:%M:%S')}"
- if update_info:
- base_footer += f"\n> TrendRadar 发现新版本 **{update_info['remote_version']}**,当前 **{update_info['current_version']}**"
- elif format_type == "telegram":
- base_footer = f"\n\n更新时间:{now.strftime('%Y-%m-%d %H:%M:%S')}"
- if update_info:
- base_footer += f"\nTrendRadar 发现新版本 {update_info['remote_version']},当前 {update_info['current_version']}"
- elif format_type == "ntfy":
- base_footer = f"\n\n> 更新时间:{now.strftime('%Y-%m-%d %H:%M:%S')}"
- if update_info:
- base_footer += f"\n> TrendRadar 发现新版本 **{update_info['remote_version']}**,当前 **{update_info['current_version']}**"
- elif format_type == "feishu":
- base_footer = f"\n\n<font color='grey'>更新时间:{now.strftime('%Y-%m-%d %H:%M:%S')}</font>"
- if update_info:
- base_footer += f"\n<font color='grey'>TrendRadar 发现新版本 {update_info['remote_version']},当前 {update_info['current_version']}</font>"
- elif format_type == "dingtalk":
- base_footer = f"\n\n> 更新时间:{now.strftime('%Y-%m-%d %H:%M:%S')}"
- if update_info:
- base_footer += f"\n> TrendRadar 发现新版本 **{update_info['remote_version']}**,当前 **{update_info['current_version']}**"
- stats_header = ""
- if report_data["stats"]:
- if format_type == "wework":
- stats_header = f"📊 **热点词汇统计**\n\n"
- elif format_type == "telegram":
- stats_header = f"📊 热点词汇统计\n\n"
- elif format_type == "ntfy":
- stats_header = f"📊 **热点词汇统计**\n\n"
- elif format_type == "feishu":
- stats_header = f"📊 **热点词汇统计**\n\n"
- elif format_type == "dingtalk":
- stats_header = f"📊 **热点词汇统计**\n\n"
- current_batch = base_header
- current_batch_has_content = False
- if (
- not report_data["stats"]
- and not report_data["new_titles"]
- and not report_data["failed_ids"]
- ):
- if mode == "incremental":
- mode_text = "增量模式下暂无新增匹配的热点词汇"
- elif mode == "current":
- mode_text = "当前榜单模式下暂无匹配的热点词汇"
- else:
- mode_text = "暂无匹配的热点词汇"
- simple_content = f"📭 {mode_text}\n\n"
- final_content = base_header + simple_content + base_footer
- batches.append(final_content)
- return batches
- # 处理热点词汇统计
- if report_data["stats"]:
- total_count = len(report_data["stats"])
- # 添加统计标题
- test_content = current_batch + stats_header
- if (
- len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8"))
- < max_bytes
- ):
- current_batch = test_content
- current_batch_has_content = True
- else:
- if current_batch_has_content:
- batches.append(current_batch + base_footer)
- current_batch = base_header + stats_header
- current_batch_has_content = True
- # 逐个处理词组(确保词组标题+第一条新闻的原子性)
- for i, stat in enumerate(report_data["stats"]):
- word = stat["word"]
- count = stat["count"]
- sequence_display = f"[{i + 1}/{total_count}]"
- # 构建词组标题
- word_header = ""
- if format_type == "wework":
- if count >= 10:
- word_header = (
- f"🔥 {sequence_display} **{word}** : **{count}** 条\n\n"
- )
- elif count >= 5:
- word_header = (
- f"📈 {sequence_display} **{word}** : **{count}** 条\n\n"
- )
- else:
- word_header = f"📌 {sequence_display} **{word}** : {count} 条\n\n"
- elif format_type == "telegram":
- if count >= 10:
- word_header = f"🔥 {sequence_display} {word} : {count} 条\n\n"
- elif count >= 5:
- word_header = f"📈 {sequence_display} {word} : {count} 条\n\n"
- else:
- word_header = f"📌 {sequence_display} {word} : {count} 条\n\n"
- elif format_type == "ntfy":
- if count >= 10:
- word_header = (
- f"🔥 {sequence_display} **{word}** : **{count}** 条\n\n"
- )
- elif count >= 5:
- word_header = (
- f"📈 {sequence_display} **{word}** : **{count}** 条\n\n"
- )
- else:
- word_header = f"📌 {sequence_display} **{word}** : {count} 条\n\n"
- elif format_type == "feishu":
- if count >= 10:
- word_header = f"🔥 <font color='grey'>{sequence_display}</font> **{word}** : <font color='red'>{count}</font> 条\n\n"
- elif count >= 5:
- word_header = f"📈 <font color='grey'>{sequence_display}</font> **{word}** : <font color='orange'>{count}</font> 条\n\n"
- else:
- word_header = f"📌 <font color='grey'>{sequence_display}</font> **{word}** : {count} 条\n\n"
- elif format_type == "dingtalk":
- if count >= 10:
- word_header = (
- f"🔥 {sequence_display} **{word}** : **{count}** 条\n\n"
- )
- elif count >= 5:
- word_header = (
- f"📈 {sequence_display} **{word}** : **{count}** 条\n\n"
- )
- else:
- word_header = f"📌 {sequence_display} **{word}** : {count} 条\n\n"
- # 构建第一条新闻
- first_news_line = ""
- if stat["titles"]:
- first_title_data = stat["titles"][0]
- if format_type == "wework":
- formatted_title = format_title_for_platform(
- "wework", first_title_data, show_source=True
- )
- elif format_type == "telegram":
- formatted_title = format_title_for_platform(
- "telegram", first_title_data, show_source=True
- )
- elif format_type == "ntfy":
- formatted_title = format_title_for_platform(
- "ntfy", first_title_data, show_source=True
- )
- elif format_type == "feishu":
- formatted_title = format_title_for_platform(
- "feishu", first_title_data, show_source=True
- )
- elif format_type == "dingtalk":
- formatted_title = format_title_for_platform(
- "dingtalk", first_title_data, show_source=True
- )
- else:
- formatted_title = f"{first_title_data['title']}"
- first_news_line = f" 1. {formatted_title}\n"
- if len(stat["titles"]) > 1:
- first_news_line += "\n"
- # 原子性检查:词组标题+第一条新闻必须一起处理
- word_with_first_news = word_header + first_news_line
- test_content = current_batch + word_with_first_news
- if (
- len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8"))
- >= max_bytes
- ):
- # 当前批次容纳不下,开启新批次
- if current_batch_has_content:
- batches.append(current_batch + base_footer)
- current_batch = base_header + stats_header + word_with_first_news
- current_batch_has_content = True
- start_index = 1
- else:
- current_batch = test_content
- current_batch_has_content = True
- start_index = 1
- # 处理剩余新闻条目
- for j in range(start_index, len(stat["titles"])):
- title_data = stat["titles"][j]
- if format_type == "wework":
- formatted_title = format_title_for_platform(
- "wework", title_data, show_source=True
- )
- elif format_type == "telegram":
- formatted_title = format_title_for_platform(
- "telegram", title_data, show_source=True
- )
- elif format_type == "ntfy":
- formatted_title = format_title_for_platform(
- "ntfy", title_data, show_source=True
- )
- elif format_type == "feishu":
- formatted_title = format_title_for_platform(
- "feishu", title_data, show_source=True
- )
- elif format_type == "dingtalk":
- formatted_title = format_title_for_platform(
- "dingtalk", title_data, show_source=True
- )
- else:
- formatted_title = f"{title_data['title']}"
- news_line = f" {j + 1}. {formatted_title}\n"
- if j < len(stat["titles"]) - 1:
- news_line += "\n"
- test_content = current_batch + news_line
- if (
- len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8"))
- >= max_bytes
- ):
- if current_batch_has_content:
- batches.append(current_batch + base_footer)
- current_batch = base_header + stats_header + word_header + news_line
- current_batch_has_content = True
- else:
- current_batch = test_content
- current_batch_has_content = True
- # 词组间分隔符
- if i < len(report_data["stats"]) - 1:
- separator = ""
- if format_type == "wework":
- separator = f"\n\n\n\n"
- elif format_type == "telegram":
- separator = f"\n\n"
- elif format_type == "ntfy":
- separator = f"\n\n"
- elif format_type == "feishu":
- separator = f"\n{CONFIG['FEISHU_MESSAGE_SEPARATOR']}\n\n"
- elif format_type == "dingtalk":
- separator = f"\n---\n\n"
- test_content = current_batch + separator
- if (
- len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8"))
- < max_bytes
- ):
- current_batch = test_content
- # 处理新增新闻(同样确保来源标题+第一条新闻的原子性)
- if report_data["new_titles"]:
- new_header = ""
- if format_type == "wework":
- new_header = f"\n\n\n\n🆕 **本次新增热点新闻** (共 {report_data['total_new_count']} 条)\n\n"
- elif format_type == "telegram":
- new_header = (
- f"\n\n🆕 本次新增热点新闻 (共 {report_data['total_new_count']} 条)\n\n"
- )
- elif format_type == "ntfy":
- new_header = f"\n\n🆕 **本次新增热点新闻** (共 {report_data['total_new_count']} 条)\n\n"
- elif format_type == "feishu":
- new_header = f"\n{CONFIG['FEISHU_MESSAGE_SEPARATOR']}\n\n🆕 **本次新增热点新闻** (共 {report_data['total_new_count']} 条)\n\n"
- elif format_type == "dingtalk":
- new_header = f"\n---\n\n🆕 **本次新增热点新闻** (共 {report_data['total_new_count']} 条)\n\n"
- test_content = current_batch + new_header
- if (
- len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8"))
- >= max_bytes
- ):
- if current_batch_has_content:
- batches.append(current_batch + base_footer)
- current_batch = base_header + new_header
- current_batch_has_content = True
- else:
- current_batch = test_content
- current_batch_has_content = True
- # 逐个处理新增新闻来源
- for source_data in report_data["new_titles"]:
- source_header = ""
- if format_type == "wework":
- source_header = f"**{source_data['source_name']}** ({len(source_data['titles'])} 条):\n\n"
- elif format_type == "telegram":
- source_header = f"{source_data['source_name']} ({len(source_data['titles'])} 条):\n\n"
- elif format_type == "ntfy":
- source_header = f"**{source_data['source_name']}** ({len(source_data['titles'])} 条):\n\n"
- elif format_type == "feishu":
- source_header = f"**{source_data['source_name']}** ({len(source_data['titles'])} 条):\n\n"
- elif format_type == "dingtalk":
- source_header = f"**{source_data['source_name']}** ({len(source_data['titles'])} 条):\n\n"
- # 构建第一条新增新闻
- first_news_line = ""
- if source_data["titles"]:
- first_title_data = source_data["titles"][0]
- title_data_copy = first_title_data.copy()
- title_data_copy["is_new"] = False
- if format_type == "wework":
- formatted_title = format_title_for_platform(
- "wework", title_data_copy, show_source=False
- )
- elif format_type == "telegram":
- formatted_title = format_title_for_platform(
- "telegram", title_data_copy, show_source=False
- )
- elif format_type == "feishu":
- formatted_title = format_title_for_platform(
- "feishu", title_data_copy, show_source=False
- )
- elif format_type == "dingtalk":
- formatted_title = format_title_for_platform(
- "dingtalk", title_data_copy, show_source=False
- )
- else:
- formatted_title = f"{title_data_copy['title']}"
- first_news_line = f" 1. {formatted_title}\n"
- # 原子性检查:来源标题+第一条新闻
- source_with_first_news = source_header + first_news_line
- test_content = current_batch + source_with_first_news
- if (
- len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8"))
- >= max_bytes
- ):
- if current_batch_has_content:
- batches.append(current_batch + base_footer)
- current_batch = base_header + new_header + source_with_first_news
- current_batch_has_content = True
- start_index = 1
- else:
- current_batch = test_content
- current_batch_has_content = True
- start_index = 1
- # 处理剩余新增新闻
- for j in range(start_index, len(source_data["titles"])):
- title_data = source_data["titles"][j]
- title_data_copy = title_data.copy()
- title_data_copy["is_new"] = False
- if format_type == "wework":
- formatted_title = format_title_for_platform(
- "wework", title_data_copy, show_source=False
- )
- elif format_type == "telegram":
- formatted_title = format_title_for_platform(
- "telegram", title_data_copy, show_source=False
- )
- elif format_type == "feishu":
- formatted_title = format_title_for_platform(
- "feishu", title_data_copy, show_source=False
- )
- elif format_type == "dingtalk":
- formatted_title = format_title_for_platform(
- "dingtalk", title_data_copy, show_source=False
- )
- else:
- formatted_title = f"{title_data_copy['title']}"
- news_line = f" {j + 1}. {formatted_title}\n"
- test_content = current_batch + news_line
- if (
- len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8"))
- >= max_bytes
- ):
- if current_batch_has_content:
- batches.append(current_batch + base_footer)
- current_batch = base_header + new_header + source_header + news_line
- current_batch_has_content = True
- else:
- current_batch = test_content
- current_batch_has_content = True
- current_batch += "\n"
- if report_data["failed_ids"]:
- failed_header = ""
- if format_type == "wework":
- failed_header = f"\n\n\n\n⚠️ **数据获取失败的平台:**\n\n"
- elif format_type == "telegram":
- failed_header = f"\n\n⚠️ 数据获取失败的平台:\n\n"
- elif format_type == "ntfy":
- failed_header = f"\n\n⚠️ **数据获取失败的平台:**\n\n"
- elif format_type == "feishu":
- failed_header = f"\n{CONFIG['FEISHU_MESSAGE_SEPARATOR']}\n\n⚠️ **数据获取失败的平台:**\n\n"
- elif format_type == "dingtalk":
- failed_header = f"\n---\n\n⚠️ **数据获取失败的平台:**\n\n"
- test_content = current_batch + failed_header
- if (
- len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8"))
- >= max_bytes
- ):
- if current_batch_has_content:
- batches.append(current_batch + base_footer)
- current_batch = base_header + failed_header
- current_batch_has_content = True
- else:
- current_batch = test_content
- current_batch_has_content = True
- for i, id_value in enumerate(report_data["failed_ids"], 1):
- if format_type == "feishu":
- failed_line = f" • <font color='red'>{id_value}</font>\n"
- elif format_type == "dingtalk":
- failed_line = f" • **{id_value}**\n"
- else:
- failed_line = f" • {id_value}\n"
- test_content = current_batch + failed_line
- if (
- len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8"))
- >= max_bytes
- ):
- if current_batch_has_content:
- batches.append(current_batch + base_footer)
- current_batch = base_header + failed_header + failed_line
- current_batch_has_content = True
- else:
- current_batch = test_content
- current_batch_has_content = True
- # 完成最后批次
- if current_batch_has_content:
- batches.append(current_batch + base_footer)
- return batches
- def send_to_notifications(
- stats: List[Dict],
- failed_ids: Optional[List] = None,
- report_type: str = "当日汇总",
- new_titles: Optional[Dict] = None,
- id_to_name: Optional[Dict] = None,
- update_info: Optional[Dict] = None,
- proxy_url: Optional[str] = None,
- mode: str = "daily",
- html_file_path: Optional[str] = None,
- ) -> Dict[str, bool]:
- """发送数据到多个通知平台"""
- results = {}
- if CONFIG["PUSH_WINDOW"]["ENABLED"]:
- push_manager = PushRecordManager()
- time_range_start = CONFIG["PUSH_WINDOW"]["TIME_RANGE"]["START"]
- time_range_end = CONFIG["PUSH_WINDOW"]["TIME_RANGE"]["END"]
- if not push_manager.is_in_time_range(time_range_start, time_range_end):
- now = get_beijing_time()
- print(
- f"推送窗口控制:当前时间 {now.strftime('%H:%M')} 不在推送时间窗口 {time_range_start}-{time_range_end} 内,跳过推送"
- )
- return results
- if CONFIG["PUSH_WINDOW"]["ONCE_PER_DAY"]:
- if push_manager.has_pushed_today():
- print(f"推送窗口控制:今天已推送过,跳过本次推送")
- return results
- else:
- print(f"推送窗口控制:今天首次推送")
- report_data = prepare_report_data(stats, failed_ids, new_titles, id_to_name, mode)
- feishu_url = CONFIG["FEISHU_WEBHOOK_URL"]
- dingtalk_url = CONFIG["DINGTALK_WEBHOOK_URL"]
- wework_url = CONFIG["WEWORK_WEBHOOK_URL"]
- telegram_token = CONFIG["TELEGRAM_BOT_TOKEN"]
- telegram_chat_id = CONFIG["TELEGRAM_CHAT_ID"]
- email_from = CONFIG["EMAIL_FROM"]
- email_password = CONFIG["EMAIL_PASSWORD"]
- email_to = CONFIG["EMAIL_TO"]
- email_smtp_server = CONFIG.get("EMAIL_SMTP_SERVER", "")
- email_smtp_port = CONFIG.get("EMAIL_SMTP_PORT", "")
- ntfy_server_url = CONFIG["NTFY_SERVER_URL"]
- ntfy_topic = CONFIG["NTFY_TOPIC"]
- ntfy_token = CONFIG.get("NTFY_TOKEN", "")
- update_info_to_send = update_info if CONFIG["SHOW_VERSION_UPDATE"] else None
- # 发送到飞书
- if feishu_url:
- results["feishu"] = send_to_feishu(
- feishu_url, report_data, report_type, update_info_to_send, proxy_url, mode
- )
- # 发送到钉钉
- if dingtalk_url:
- results["dingtalk"] = send_to_dingtalk(
- dingtalk_url, report_data, report_type, update_info_to_send, proxy_url, mode
- )
- # 发送到企业微信
- if wework_url:
- results["wework"] = send_to_wework(
- wework_url, report_data, report_type, update_info_to_send, proxy_url, mode
- )
- # 发送到 Telegram
- if telegram_token and telegram_chat_id:
- results["telegram"] = send_to_telegram(
- telegram_token,
- telegram_chat_id,
- report_data,
- report_type,
- update_info_to_send,
- proxy_url,
- mode,
- )
- # 发送到 ntfy
- if ntfy_server_url and ntfy_topic:
- results["ntfy"] = send_to_ntfy(
- ntfy_server_url,
- ntfy_topic,
- ntfy_token,
- report_data,
- report_type,
- update_info_to_send,
- proxy_url,
- mode,
- )
- # 发送邮件
- if email_from and email_password and email_to:
- results["email"] = send_to_email(
- email_from,
- email_password,
- email_to,
- report_type,
- html_file_path,
- email_smtp_server,
- email_smtp_port,
- )
- if not results:
- print("未配置任何通知渠道,跳过通知发送")
- # 如果成功发送了任何通知,且启用了每天只推一次,则记录推送
- if (
- CONFIG["PUSH_WINDOW"]["ENABLED"]
- and CONFIG["PUSH_WINDOW"]["ONCE_PER_DAY"]
- and any(results.values())
- ):
- push_manager = PushRecordManager()
- push_manager.record_push(report_type)
- return results
- def send_to_feishu(
- webhook_url: str,
- report_data: Dict,
- report_type: str,
- update_info: Optional[Dict] = None,
- proxy_url: Optional[str] = None,
- mode: str = "daily",
- ) -> bool:
- """发送到飞书(支持分批发送)"""
- headers = {"Content-Type": "application/json"}
- proxies = None
- if proxy_url:
- proxies = {"http": proxy_url, "https": proxy_url}
- # 获取分批内容,使用飞书专用的批次大小
- batches = split_content_into_batches(
- report_data,
- "feishu",
- update_info,
- max_bytes=CONFIG.get("FEISHU_BATCH_SIZE", 29000),
- mode=mode,
- )
- print(f"飞书消息分为 {len(batches)} 批次发送 [{report_type}]")
- # 逐批发送
- for i, batch_content in enumerate(batches, 1):
- batch_size = len(batch_content.encode("utf-8"))
- print(
- f"发送飞书第 {i}/{len(batches)} 批次,大小:{batch_size} 字节 [{report_type}]"
- )
- # 添加批次标识
- if len(batches) > 1:
- batch_header = f"**[第 {i}/{len(batches)} 批次]**\n\n"
- # 将批次标识插入到适当位置(在统计标题之后)
- if "📊 **热点词汇统计**" in batch_content:
- batch_content = batch_content.replace(
- "📊 **热点词汇统计**\n\n", f"📊 **热点词汇统计** {batch_header}"
- )
- else:
- # 如果没有统计标题,直接在开头添加
- batch_content = batch_header + batch_content
- total_titles = sum(
- len(stat["titles"]) for stat in report_data["stats"] if stat["count"] > 0
- )
- now = get_beijing_time()
- payload = {
- "msg_type": "text",
- "content": {
- "total_titles": total_titles,
- "timestamp": now.strftime("%Y-%m-%d %H:%M:%S"),
- "report_type": report_type,
- "text": batch_content,
- },
- }
- try:
- response = requests.post(
- webhook_url, headers=headers, json=payload, proxies=proxies, timeout=30
- )
- if response.status_code == 200:
- result = response.json()
- # 检查飞书的响应状态
- if result.get("StatusCode") == 0 or result.get("code") == 0:
- print(f"飞书第 {i}/{len(batches)} 批次发送成功 [{report_type}]")
- # 批次间间隔
- if i < len(batches):
- time.sleep(CONFIG["BATCH_SEND_INTERVAL"])
- else:
- error_msg = result.get("msg") or result.get("StatusMessage", "未知错误")
- print(
- f"飞书第 {i}/{len(batches)} 批次发送失败 [{report_type}],错误:{error_msg}"
- )
- return False
- else:
- print(
- f"飞书第 {i}/{len(batches)} 批次发送失败 [{report_type}],状态码:{response.status_code}"
- )
- return False
- except Exception as e:
- print(f"飞书第 {i}/{len(batches)} 批次发送出错 [{report_type}]:{e}")
- return False
- print(f"飞书所有 {len(batches)} 批次发送完成 [{report_type}]")
- return True
- def send_to_dingtalk(
- webhook_url: str,
- report_data: Dict,
- report_type: str,
- update_info: Optional[Dict] = None,
- proxy_url: Optional[str] = None,
- mode: str = "daily",
- ) -> bool:
- """发送到钉钉(支持分批发送)"""
- headers = {"Content-Type": "application/json"}
- proxies = None
- if proxy_url:
- proxies = {"http": proxy_url, "https": proxy_url}
- # 获取分批内容,使用钉钉专用的批次大小
- batches = split_content_into_batches(
- report_data,
- "dingtalk",
- update_info,
- max_bytes=CONFIG.get("DINGTALK_BATCH_SIZE", 20000),
- mode=mode,
- )
- print(f"钉钉消息分为 {len(batches)} 批次发送 [{report_type}]")
- # 逐批发送
- for i, batch_content in enumerate(batches, 1):
- batch_size = len(batch_content.encode("utf-8"))
- print(
- f"发送钉钉第 {i}/{len(batches)} 批次,大小:{batch_size} 字节 [{report_type}]"
- )
- # 添加批次标识
- if len(batches) > 1:
- batch_header = f"**[第 {i}/{len(batches)} 批次]**\n\n"
- # 将批次标识插入到适当位置(在标题之后)
- if "📊 **热点词汇统计**" in batch_content:
- batch_content = batch_content.replace(
- "📊 **热点词汇统计**\n\n", f"📊 **热点词汇统计** {batch_header}\n\n"
- )
- else:
- # 如果没有统计标题,直接在开头添加
- batch_content = batch_header + batch_content
- payload = {
- "msgtype": "markdown",
- "markdown": {
- "title": f"TrendRadar 热点分析报告 - {report_type}",
- "text": batch_content,
- },
- }
- try:
- response = requests.post(
- webhook_url, headers=headers, json=payload, proxies=proxies, timeout=30
- )
- if response.status_code == 200:
- result = response.json()
- if result.get("errcode") == 0:
- print(f"钉钉第 {i}/{len(batches)} 批次发送成功 [{report_type}]")
- # 批次间间隔
- if i < len(batches):
- time.sleep(CONFIG["BATCH_SEND_INTERVAL"])
- else:
- print(
- f"钉钉第 {i}/{len(batches)} 批次发送失败 [{report_type}],错误:{result.get('errmsg')}"
- )
- return False
- else:
- print(
- f"钉钉第 {i}/{len(batches)} 批次发送失败 [{report_type}],状态码:{response.status_code}"
- )
- return False
- except Exception as e:
- print(f"钉钉第 {i}/{len(batches)} 批次发送出错 [{report_type}]:{e}")
- return False
- print(f"钉钉所有 {len(batches)} 批次发送完成 [{report_type}]")
- return True
- def strip_markdown(text: str) -> str:
- """去除文本中的 markdown 语法格式,用于个人微信推送"""
- # 去除粗体 **text** 或 __text__
- text = re.sub(r'\*\*(.+?)\*\*', r'\1', text)
- text = re.sub(r'__(.+?)__', r'\1', text)
- # 去除斜体 *text* 或 _text_
- text = re.sub(r'\*(.+?)\*', r'\1', text)
- text = re.sub(r'_(.+?)_', r'\1', text)
- # 去除删除线 ~~text~~
- text = re.sub(r'~~(.+?)~~', r'\1', text)
- # 转换链接 [text](url) -> text url(保留 URL)
- text = re.sub(r'\[([^\]]+)\]\(([^)]+)\)', r'\1 \2', text)
- # 如果不需要保留 URL,可以使用下面这行(只保留标题文本):
- # text = re.sub(r'\[([^\]]+)\]\([^)]+\)', r'\1', text)
- # 去除图片  -> alt
- text = re.sub(r'!\[(.+?)\]\(.+?\)', r'\1', text)
- # 去除行内代码 `code`
- text = re.sub(r'`(.+?)`', r'\1', text)
- # 去除引用符号 >
- text = re.sub(r'^>\s*', '', text, flags=re.MULTILINE)
- # 去除标题符号 # ## ### 等
- text = re.sub(r'^#+\s*', '', text, flags=re.MULTILINE)
- # 去除水平分割线 --- 或 ***
- text = re.sub(r'^[\-\*]{3,}\s*$', '', text, flags=re.MULTILINE)
- # 去除 HTML 标签 <font color='xxx'>text</font> -> text
- text = re.sub(r'<font[^>]*>(.+?)</font>', r'\1', text)
- text = re.sub(r'<[^>]+>', '', text)
- # 清理多余的空行(保留最多两个连续空行)
- text = re.sub(r'\n{3,}', '\n\n', text)
- return text.strip()
- def send_to_wework(
- webhook_url: str,
- report_data: Dict,
- report_type: str,
- update_info: Optional[Dict] = None,
- proxy_url: Optional[str] = None,
- mode: str = "daily",
- ) -> bool:
- """发送到企业微信(支持分批发送,支持 markdown 和 text 两种格式)"""
- headers = {"Content-Type": "application/json"}
- proxies = None
- if proxy_url:
- proxies = {"http": proxy_url, "https": proxy_url}
- # 获取消息类型配置(markdown 或 text)
- msg_type = CONFIG.get("WEWORK_MSG_TYPE", "markdown").lower()
- is_text_mode = msg_type == "text"
- if is_text_mode:
- print(f"企业微信使用 text 格式(个人微信模式)[{report_type}]")
- else:
- print(f"企业微信使用 markdown 格式(群机器人模式)[{report_type}]")
- # 获取分批内容
- batches = split_content_into_batches(report_data, "wework", update_info, mode=mode)
- print(f"企业微信消息分为 {len(batches)} 批次发送 [{report_type}]")
- # 逐批发送
- for i, batch_content in enumerate(batches, 1):
- # 添加批次标识
- if len(batches) > 1:
- if is_text_mode:
- batch_header = f"[第 {i}/{len(batches)} 批次]\n\n"
- else:
- batch_header = f"**[第 {i}/{len(batches)} 批次]**\n\n"
- batch_content = batch_header + batch_content
- # 根据消息类型构建 payload
- if is_text_mode:
- # text 格式:去除 markdown 语法
- plain_content = strip_markdown(batch_content)
- payload = {"msgtype": "text", "text": {"content": plain_content}}
- batch_size = len(plain_content.encode("utf-8"))
- else:
- # markdown 格式:保持原样
- payload = {"msgtype": "markdown", "markdown": {"content": batch_content}}
- batch_size = len(batch_content.encode("utf-8"))
- print(
- f"发送企业微信第 {i}/{len(batches)} 批次,大小:{batch_size} 字节 [{report_type}]"
- )
- try:
- response = requests.post(
- webhook_url, headers=headers, json=payload, proxies=proxies, timeout=30
- )
- if response.status_code == 200:
- result = response.json()
- if result.get("errcode") == 0:
- print(f"企业微信第 {i}/{len(batches)} 批次发送成功 [{report_type}]")
- # 批次间间隔
- if i < len(batches):
- time.sleep(CONFIG["BATCH_SEND_INTERVAL"])
- else:
- print(
- f"企业微信第 {i}/{len(batches)} 批次发送失败 [{report_type}],错误:{result.get('errmsg')}"
- )
- return False
- else:
- print(
- f"企业微信第 {i}/{len(batches)} 批次发送失败 [{report_type}],状态码:{response.status_code}"
- )
- return False
- except Exception as e:
- print(f"企业微信第 {i}/{len(batches)} 批次发送出错 [{report_type}]:{e}")
- return False
- print(f"企业微信所有 {len(batches)} 批次发送完成 [{report_type}]")
- return True
- def send_to_telegram(
- bot_token: str,
- chat_id: str,
- report_data: Dict,
- report_type: str,
- update_info: Optional[Dict] = None,
- proxy_url: Optional[str] = None,
- mode: str = "daily",
- ) -> bool:
- """发送到Telegram(支持分批发送)"""
- headers = {"Content-Type": "application/json"}
- url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
- proxies = None
- if proxy_url:
- proxies = {"http": proxy_url, "https": proxy_url}
- # 获取分批内容
- batches = split_content_into_batches(
- report_data, "telegram", update_info, mode=mode
- )
- print(f"Telegram消息分为 {len(batches)} 批次发送 [{report_type}]")
- # 逐批发送
- for i, batch_content in enumerate(batches, 1):
- batch_size = len(batch_content.encode("utf-8"))
- print(
- f"发送Telegram第 {i}/{len(batches)} 批次,大小:{batch_size} 字节 [{report_type}]"
- )
- # 添加批次标识
- if len(batches) > 1:
- batch_header = f"<b>[第 {i}/{len(batches)} 批次]</b>\n\n"
- batch_content = batch_header + batch_content
- payload = {
- "chat_id": chat_id,
- "text": batch_content,
- "parse_mode": "HTML",
- "disable_web_page_preview": True,
- }
- try:
- response = requests.post(
- url, headers=headers, json=payload, proxies=proxies, timeout=30
- )
- if response.status_code == 200:
- result = response.json()
- if result.get("ok"):
- print(f"Telegram第 {i}/{len(batches)} 批次发送成功 [{report_type}]")
- # 批次间间隔
- if i < len(batches):
- time.sleep(CONFIG["BATCH_SEND_INTERVAL"])
- else:
- print(
- f"Telegram第 {i}/{len(batches)} 批次发送失败 [{report_type}],错误:{result.get('description')}"
- )
- return False
- else:
- print(
- f"Telegram第 {i}/{len(batches)} 批次发送失败 [{report_type}],状态码:{response.status_code}"
- )
- return False
- except Exception as e:
- print(f"Telegram第 {i}/{len(batches)} 批次发送出错 [{report_type}]:{e}")
- return False
- print(f"Telegram所有 {len(batches)} 批次发送完成 [{report_type}]")
- return True
- def send_to_email(
- from_email: str,
- password: str,
- to_email: str,
- report_type: str,
- html_file_path: str,
- custom_smtp_server: Optional[str] = None,
- custom_smtp_port: Optional[int] = None,
- ) -> bool:
- """发送邮件通知"""
- try:
- if not html_file_path or not Path(html_file_path).exists():
- print(f"错误:HTML文件不存在或未提供: {html_file_path}")
- return False
- print(f"使用HTML文件: {html_file_path}")
- with open(html_file_path, "r", encoding="utf-8") as f:
- html_content = f.read()
- domain = from_email.split("@")[-1].lower()
- if custom_smtp_server and custom_smtp_port:
- # 使用自定义 SMTP 配置
- smtp_server = custom_smtp_server
- smtp_port = int(custom_smtp_port)
- # 根据端口判断加密方式:465=SSL, 587=TLS
- if smtp_port == 465:
- use_tls = False # SSL 模式(SMTP_SSL)
- elif smtp_port == 587:
- use_tls = True # TLS 模式(STARTTLS)
- else:
- # 其他端口优先尝试 TLS(更安全,更广泛支持)
- use_tls = True
- elif domain in SMTP_CONFIGS:
- # 使用预设配置
- config = SMTP_CONFIGS[domain]
- smtp_server = config["server"]
- smtp_port = config["port"]
- use_tls = config["encryption"] == "TLS"
- else:
- print(f"未识别的邮箱服务商: {domain},使用通用 SMTP 配置")
- smtp_server = f"smtp.{domain}"
- smtp_port = 587
- use_tls = True
- msg = MIMEMultipart("alternative")
- # 严格按照 RFC 标准设置 From header
- sender_name = "TrendRadar"
- msg["From"] = formataddr((sender_name, from_email))
- # 设置收件人
- recipients = [addr.strip() for addr in to_email.split(",")]
- if len(recipients) == 1:
- msg["To"] = recipients[0]
- else:
- msg["To"] = ", ".join(recipients)
- # 设置邮件主题
- now = get_beijing_time()
- subject = f"TrendRadar 热点分析报告 - {report_type} - {now.strftime('%m月%d日 %H:%M')}"
- msg["Subject"] = Header(subject, "utf-8")
- # 设置其他标准 header
- msg["MIME-Version"] = "1.0"
- msg["Date"] = formatdate(localtime=True)
- msg["Message-ID"] = make_msgid()
- # 添加纯文本部分(作为备选)
- text_content = f"""
- TrendRadar 热点分析报告
- ========================
- 报告类型:{report_type}
- 生成时间:{now.strftime('%Y-%m-%d %H:%M:%S')}
- 请使用支持HTML的邮件客户端查看完整报告内容。
- """
- text_part = MIMEText(text_content, "plain", "utf-8")
- msg.attach(text_part)
- html_part = MIMEText(html_content, "html", "utf-8")
- msg.attach(html_part)
- print(f"正在发送邮件到 {to_email}...")
- print(f"SMTP 服务器: {smtp_server}:{smtp_port}")
- print(f"发件人: {from_email}")
- try:
- if use_tls:
- # TLS 模式
- server = smtplib.SMTP(smtp_server, smtp_port, timeout=30)
- server.set_debuglevel(0) # 设为1可以查看详细调试信息
- server.ehlo()
- server.starttls()
- server.ehlo()
- else:
- # SSL 模式
- server = smtplib.SMTP_SSL(smtp_server, smtp_port, timeout=30)
- server.set_debuglevel(0)
- server.ehlo()
- # 登录
- server.login(from_email, password)
- # 发送邮件
- server.send_message(msg)
- server.quit()
- print(f"邮件发送成功 [{report_type}] -> {to_email}")
- return True
- except smtplib.SMTPServerDisconnected:
- print(f"邮件发送失败:服务器意外断开连接,请检查网络或稍后重试")
- return False
- except smtplib.SMTPAuthenticationError as e:
- print(f"邮件发送失败:认证错误,请检查邮箱和密码/授权码")
- print(f"详细错误: {str(e)}")
- return False
- except smtplib.SMTPRecipientsRefused as e:
- print(f"邮件发送失败:收件人地址被拒绝 {e}")
- return False
- except smtplib.SMTPSenderRefused as e:
- print(f"邮件发送失败:发件人地址被拒绝 {e}")
- return False
- except smtplib.SMTPDataError as e:
- print(f"邮件发送失败:邮件数据错误 {e}")
- return False
- except smtplib.SMTPConnectError as e:
- print(f"邮件发送失败:无法连接到 SMTP 服务器 {smtp_server}:{smtp_port}")
- print(f"详细错误: {str(e)}")
- return False
- except Exception as e:
- print(f"邮件发送失败 [{report_type}]:{e}")
- import traceback
- traceback.print_exc()
- return False
- def send_to_ntfy(
- server_url: str,
- topic: str,
- token: Optional[str],
- report_data: Dict,
- report_type: str,
- update_info: Optional[Dict] = None,
- proxy_url: Optional[str] = None,
- mode: str = "daily",
- ) -> bool:
- """发送到ntfy(支持分批发送,严格遵守4KB限制)"""
- # 避免 HTTP header 编码问题
- report_type_en_map = {
- "当日汇总": "Daily Summary",
- "当前榜单汇总": "Current Ranking",
- "增量更新": "Incremental Update",
- "实时增量": "Realtime Incremental",
- "实时当前榜单": "Realtime Current Ranking",
- }
- report_type_en = report_type_en_map.get(report_type, "News Report")
- headers = {
- "Content-Type": "text/plain; charset=utf-8",
- "Markdown": "yes",
- "Title": report_type_en,
- "Priority": "default",
- "Tags": "news",
- }
- if token:
- headers["Authorization"] = f"Bearer {token}"
-
- # 构建完整URL,确保格式正确
- base_url = server_url.rstrip("/")
- if not base_url.startswith(("http://", "https://")):
- base_url = f"https://{base_url}"
- url = f"{base_url}/{topic}"
- proxies = None
- if proxy_url:
- proxies = {"http": proxy_url, "https": proxy_url}
- # 获取分批内容,使用ntfy专用的4KB限制
- batches = split_content_into_batches(
- report_data, "ntfy", update_info, max_bytes=3800, mode=mode
- )
- total_batches = len(batches)
- print(f"ntfy消息分为 {total_batches} 批次发送 [{report_type}]")
- # 反转批次顺序,使得在ntfy客户端显示时顺序正确
- # ntfy显示最新消息在上面,所以我们从最后一批开始推送
- reversed_batches = list(reversed(batches))
-
- print(f"ntfy将按反向顺序推送(最后批次先推送),确保客户端显示顺序正确")
- # 逐批发送(反向顺序)
- success_count = 0
- for idx, batch_content in enumerate(reversed_batches, 1):
- # 计算正确的批次编号(用户视角的编号)
- actual_batch_num = total_batches - idx + 1
-
- batch_size = len(batch_content.encode("utf-8"))
- print(
- f"发送ntfy第 {actual_batch_num}/{total_batches} 批次(推送顺序: {idx}/{total_batches}),大小:{batch_size} 字节 [{report_type}]"
- )
- # 检查消息大小,确保不超过4KB
- if batch_size > 4096:
- print(f"警告:ntfy第 {actual_batch_num} 批次消息过大({batch_size} 字节),可能被拒绝")
- # 添加批次标识(使用正确的批次编号)
- current_headers = headers.copy()
- if total_batches > 1:
- batch_header = f"**[第 {actual_batch_num}/{total_batches} 批次]**\n\n"
- batch_content = batch_header + batch_content
- current_headers["Title"] = (
- f"{report_type_en} ({actual_batch_num}/{total_batches})"
- )
- try:
- response = requests.post(
- url,
- headers=current_headers,
- data=batch_content.encode("utf-8"),
- proxies=proxies,
- timeout=30,
- )
- if response.status_code == 200:
- print(f"ntfy第 {actual_batch_num}/{total_batches} 批次发送成功 [{report_type}]")
- success_count += 1
- if idx < total_batches:
- # 公共服务器建议 2-3 秒,自托管可以更短
- interval = 2 if "ntfy.sh" in server_url else 1
- time.sleep(interval)
- elif response.status_code == 429:
- print(
- f"ntfy第 {actual_batch_num}/{total_batches} 批次速率限制 [{report_type}],等待后重试"
- )
- time.sleep(10) # 等待10秒后重试
- # 重试一次
- retry_response = requests.post(
- url,
- headers=current_headers,
- data=batch_content.encode("utf-8"),
- proxies=proxies,
- timeout=30,
- )
- if retry_response.status_code == 200:
- print(f"ntfy第 {actual_batch_num}/{total_batches} 批次重试成功 [{report_type}]")
- success_count += 1
- else:
- print(
- f"ntfy第 {actual_batch_num}/{total_batches} 批次重试失败,状态码:{retry_response.status_code}"
- )
- elif response.status_code == 413:
- print(
- f"ntfy第 {actual_batch_num}/{total_batches} 批次消息过大被拒绝 [{report_type}],消息大小:{batch_size} 字节"
- )
- else:
- print(
- f"ntfy第 {actual_batch_num}/{total_batches} 批次发送失败 [{report_type}],状态码:{response.status_code}"
- )
- try:
- print(f"错误详情:{response.text}")
- except:
- pass
- except requests.exceptions.ConnectTimeout:
- print(f"ntfy第 {actual_batch_num}/{total_batches} 批次连接超时 [{report_type}]")
- except requests.exceptions.ReadTimeout:
- print(f"ntfy第 {actual_batch_num}/{total_batches} 批次读取超时 [{report_type}]")
- except requests.exceptions.ConnectionError as e:
- print(f"ntfy第 {actual_batch_num}/{total_batches} 批次连接错误 [{report_type}]:{e}")
- except Exception as e:
- print(f"ntfy第 {actual_batch_num}/{total_batches} 批次发送异常 [{report_type}]:{e}")
- # 判断整体发送是否成功
- if success_count == total_batches:
- print(f"ntfy所有 {total_batches} 批次发送完成 [{report_type}]")
- return True
- elif success_count > 0:
- print(f"ntfy部分发送成功:{success_count}/{total_batches} 批次 [{report_type}]")
- return True # 部分成功也视为成功
- else:
- print(f"ntfy发送完全失败 [{report_type}]")
- return False
- # === 主分析器 ===
- class NewsAnalyzer:
- """新闻分析器"""
- # 模式策略定义
- MODE_STRATEGIES = {
- "incremental": {
- "mode_name": "增量模式",
- "description": "增量模式(只关注新增新闻,无新增时不推送)",
- "realtime_report_type": "实时增量",
- "summary_report_type": "当日汇总",
- "should_send_realtime": True,
- "should_generate_summary": True,
- "summary_mode": "daily",
- },
- "current": {
- "mode_name": "当前榜单模式",
- "description": "当前榜单模式(当前榜单匹配新闻 + 新增新闻区域 + 按时推送)",
- "realtime_report_type": "实时当前榜单",
- "summary_report_type": "当前榜单汇总",
- "should_send_realtime": True,
- "should_generate_summary": True,
- "summary_mode": "current",
- },
- "daily": {
- "mode_name": "当日汇总模式",
- "description": "当日汇总模式(所有匹配新闻 + 新增新闻区域 + 按时推送)",
- "realtime_report_type": "",
- "summary_report_type": "当日汇总",
- "should_send_realtime": False,
- "should_generate_summary": True,
- "summary_mode": "daily",
- },
- }
- def __init__(self):
- self.request_interval = CONFIG["REQUEST_INTERVAL"]
- self.report_mode = CONFIG["REPORT_MODE"]
- self.rank_threshold = CONFIG["RANK_THRESHOLD"]
- self.is_github_actions = os.environ.get("GITHUB_ACTIONS") == "true"
- self.is_docker_container = self._detect_docker_environment()
- self.update_info = None
- self.proxy_url = None
- self._setup_proxy()
- self.data_fetcher = DataFetcher(self.proxy_url)
- if self.is_github_actions:
- self._check_version_update()
- def _detect_docker_environment(self) -> bool:
- """检测是否运行在 Docker 容器中"""
- try:
- if os.environ.get("DOCKER_CONTAINER") == "true":
- return True
- if os.path.exists("/.dockerenv"):
- return True
- return False
- except Exception:
- return False
- def _should_open_browser(self) -> bool:
- """判断是否应该打开浏览器"""
- return not self.is_github_actions and not self.is_docker_container
- def _setup_proxy(self) -> None:
- """设置代理配置"""
- if not self.is_github_actions and CONFIG["USE_PROXY"]:
- self.proxy_url = CONFIG["DEFAULT_PROXY"]
- print("本地环境,使用代理")
- elif not self.is_github_actions and not CONFIG["USE_PROXY"]:
- print("本地环境,未启用代理")
- else:
- print("GitHub Actions环境,不使用代理")
- def _check_version_update(self) -> None:
- """检查版本更新"""
- try:
- need_update, remote_version = check_version_update(
- VERSION, CONFIG["VERSION_CHECK_URL"], self.proxy_url
- )
- if need_update and remote_version:
- self.update_info = {
- "current_version": VERSION,
- "remote_version": remote_version,
- }
- print(f"发现新版本: {remote_version} (当前: {VERSION})")
- else:
- print("版本检查完成,当前为最新版本")
- except Exception as e:
- print(f"版本检查出错: {e}")
- def _get_mode_strategy(self) -> Dict:
- """获取当前模式的策略配置"""
- return self.MODE_STRATEGIES.get(self.report_mode, self.MODE_STRATEGIES["daily"])
- def _has_notification_configured(self) -> bool:
- """检查是否配置了任何通知渠道"""
- return any(
- [
- CONFIG["FEISHU_WEBHOOK_URL"],
- CONFIG["DINGTALK_WEBHOOK_URL"],
- CONFIG["WEWORK_WEBHOOK_URL"],
- (CONFIG["TELEGRAM_BOT_TOKEN"] and CONFIG["TELEGRAM_CHAT_ID"]),
- (
- CONFIG["EMAIL_FROM"]
- and CONFIG["EMAIL_PASSWORD"]
- and CONFIG["EMAIL_TO"]
- ),
- (CONFIG["NTFY_SERVER_URL"] and CONFIG["NTFY_TOPIC"]),
- ]
- )
- def _has_valid_content(
- self, stats: List[Dict], new_titles: Optional[Dict] = None
- ) -> bool:
- """检查是否有有效的新闻内容"""
- if self.report_mode in ["incremental", "current"]:
- # 增量模式和current模式下,只要stats有内容就说明有匹配的新闻
- return any(stat["count"] > 0 for stat in stats)
- else:
- # 当日汇总模式下,检查是否有匹配的频率词新闻或新增新闻
- has_matched_news = any(stat["count"] > 0 for stat in stats)
- has_new_news = bool(
- new_titles and any(len(titles) > 0 for titles in new_titles.values())
- )
- return has_matched_news or has_new_news
- def _load_analysis_data(
- self,
- ) -> Optional[Tuple[Dict, Dict, Dict, Dict, List, List]]:
- """统一的数据加载和预处理,使用当前监控平台列表过滤历史数据"""
- try:
- # 获取当前配置的监控平台ID列表
- current_platform_ids = []
- for platform in CONFIG["PLATFORMS"]:
- current_platform_ids.append(platform["id"])
- print(f"当前监控平台: {current_platform_ids}")
- all_results, id_to_name, title_info = read_all_today_titles(
- current_platform_ids
- )
- if not all_results:
- print("没有找到当天的数据")
- return None
- total_titles = sum(len(titles) for titles in all_results.values())
- print(f"读取到 {total_titles} 个标题(已按当前监控平台过滤)")
- new_titles = detect_latest_new_titles(current_platform_ids)
- word_groups, filter_words = load_frequency_words()
- return (
- all_results,
- id_to_name,
- title_info,
- new_titles,
- word_groups,
- filter_words,
- )
- except Exception as e:
- print(f"数据加载失败: {e}")
- return None
- def _prepare_current_title_info(self, results: Dict, time_info: str) -> Dict:
- """从当前抓取结果构建标题信息"""
- title_info = {}
- for source_id, titles_data in results.items():
- title_info[source_id] = {}
- for title, title_data in titles_data.items():
- ranks = title_data.get("ranks", [])
- url = title_data.get("url", "")
- mobile_url = title_data.get("mobileUrl", "")
- title_info[source_id][title] = {
- "first_time": time_info,
- "last_time": time_info,
- "count": 1,
- "ranks": ranks,
- "url": url,
- "mobileUrl": mobile_url,
- }
- return title_info
- def _run_analysis_pipeline(
- self,
- data_source: Dict,
- mode: str,
- title_info: Dict,
- new_titles: Dict,
- word_groups: List[Dict],
- filter_words: List[str],
- id_to_name: Dict,
- failed_ids: Optional[List] = None,
- is_daily_summary: bool = False,
- ) -> Tuple[List[Dict], str]:
- """统一的分析流水线:数据处理 → 统计计算 → HTML生成"""
- # 统计计算
- stats, total_titles = count_word_frequency(
- data_source,
- word_groups,
- filter_words,
- id_to_name,
- title_info,
- self.rank_threshold,
- new_titles,
- mode=mode,
- )
- # HTML生成
- html_file = generate_html_report(
- stats,
- total_titles,
- failed_ids=failed_ids,
- new_titles=new_titles,
- id_to_name=id_to_name,
- mode=mode,
- is_daily_summary=is_daily_summary,
- update_info=self.update_info if CONFIG["SHOW_VERSION_UPDATE"] else None,
- )
- return stats, html_file
- def _send_notification_if_needed(
- self,
- stats: List[Dict],
- report_type: str,
- mode: str,
- failed_ids: Optional[List] = None,
- new_titles: Optional[Dict] = None,
- id_to_name: Optional[Dict] = None,
- html_file_path: Optional[str] = None,
- ) -> bool:
- """统一的通知发送逻辑,包含所有判断条件"""
- has_notification = self._has_notification_configured()
- if (
- CONFIG["ENABLE_NOTIFICATION"]
- and has_notification
- and self._has_valid_content(stats, new_titles)
- ):
- send_to_notifications(
- stats,
- failed_ids or [],
- report_type,
- new_titles,
- id_to_name,
- self.update_info,
- self.proxy_url,
- mode=mode,
- html_file_path=html_file_path,
- )
- return True
- elif CONFIG["ENABLE_NOTIFICATION"] and not has_notification:
- print("⚠️ 警告:通知功能已启用但未配置任何通知渠道,将跳过通知发送")
- elif not CONFIG["ENABLE_NOTIFICATION"]:
- print(f"跳过{report_type}通知:通知功能已禁用")
- elif (
- CONFIG["ENABLE_NOTIFICATION"]
- and has_notification
- and not self._has_valid_content(stats, new_titles)
- ):
- mode_strategy = self._get_mode_strategy()
- if "实时" in report_type:
- print(
- f"跳过实时推送通知:{mode_strategy['mode_name']}下未检测到匹配的新闻"
- )
- else:
- print(
- f"跳过{mode_strategy['summary_report_type']}通知:未匹配到有效的新闻内容"
- )
- return False
- def _generate_summary_report(self, mode_strategy: Dict) -> Optional[str]:
- """生成汇总报告(带通知)"""
- summary_type = (
- "当前榜单汇总" if mode_strategy["summary_mode"] == "current" else "当日汇总"
- )
- print(f"生成{summary_type}报告...")
- # 加载分析数据
- analysis_data = self._load_analysis_data()
- if not analysis_data:
- return None
- all_results, id_to_name, title_info, new_titles, word_groups, filter_words = (
- analysis_data
- )
- # 运行分析流水线
- stats, html_file = self._run_analysis_pipeline(
- all_results,
- mode_strategy["summary_mode"],
- title_info,
- new_titles,
- word_groups,
- filter_words,
- id_to_name,
- is_daily_summary=True,
- )
- print(f"{summary_type}报告已生成: {html_file}")
- # 发送通知
- self._send_notification_if_needed(
- stats,
- mode_strategy["summary_report_type"],
- mode_strategy["summary_mode"],
- failed_ids=[],
- new_titles=new_titles,
- id_to_name=id_to_name,
- html_file_path=html_file,
- )
- return html_file
- def _generate_summary_html(self, mode: str = "daily") -> Optional[str]:
- """生成汇总HTML"""
- summary_type = "当前榜单汇总" if mode == "current" else "当日汇总"
- print(f"生成{summary_type}HTML...")
- # 加载分析数据
- analysis_data = self._load_analysis_data()
- if not analysis_data:
- return None
- all_results, id_to_name, title_info, new_titles, word_groups, filter_words = (
- analysis_data
- )
- # 运行分析流水线
- _, html_file = self._run_analysis_pipeline(
- all_results,
- mode,
- title_info,
- new_titles,
- word_groups,
- filter_words,
- id_to_name,
- is_daily_summary=True,
- )
- print(f"{summary_type}HTML已生成: {html_file}")
- return html_file
- def _initialize_and_check_config(self) -> None:
- """通用初始化和配置检查"""
- now = get_beijing_time()
- print(f"当前北京时间: {now.strftime('%Y-%m-%d %H:%M:%S')}")
- if not CONFIG["ENABLE_CRAWLER"]:
- print("爬虫功能已禁用(ENABLE_CRAWLER=False),程序退出")
- return
- has_notification = self._has_notification_configured()
- if not CONFIG["ENABLE_NOTIFICATION"]:
- print("通知功能已禁用(ENABLE_NOTIFICATION=False),将只进行数据抓取")
- elif not has_notification:
- print("未配置任何通知渠道,将只进行数据抓取,不发送通知")
- else:
- print("通知功能已启用,将发送通知")
- mode_strategy = self._get_mode_strategy()
- print(f"报告模式: {self.report_mode}")
- print(f"运行模式: {mode_strategy['description']}")
- def _crawl_data(self) -> Tuple[Dict, Dict, List]:
- """执行数据爬取"""
- ids = []
- for platform in CONFIG["PLATFORMS"]:
- if "name" in platform:
- ids.append((platform["id"], platform["name"]))
- else:
- ids.append(platform["id"])
- print(
- f"配置的监控平台: {[p.get('name', p['id']) for p in CONFIG['PLATFORMS']]}"
- )
- print(f"开始爬取数据,请求间隔 {self.request_interval} 毫秒")
- ensure_directory_exists("output")
- results, id_to_name, failed_ids = self.data_fetcher.crawl_websites(
- ids, self.request_interval
- )
- title_file = save_titles_to_file(results, id_to_name, failed_ids)
- print(f"标题已保存到: {title_file}")
- return results, id_to_name, failed_ids
- def _execute_mode_strategy(
- self, mode_strategy: Dict, results: Dict, id_to_name: Dict, failed_ids: List
- ) -> Optional[str]:
- """执行模式特定逻辑"""
- # 获取当前监控平台ID列表
- current_platform_ids = [platform["id"] for platform in CONFIG["PLATFORMS"]]
- new_titles = detect_latest_new_titles(current_platform_ids)
- time_info = Path(save_titles_to_file(results, id_to_name, failed_ids)).stem
- word_groups, filter_words = load_frequency_words()
- # current模式下,实时推送需要使用完整的历史数据来保证统计信息的完整性
- if self.report_mode == "current":
- # 加载完整的历史数据(已按当前平台过滤)
- analysis_data = self._load_analysis_data()
- if analysis_data:
- (
- all_results,
- historical_id_to_name,
- historical_title_info,
- historical_new_titles,
- _,
- _,
- ) = analysis_data
- print(
- f"current模式:使用过滤后的历史数据,包含平台:{list(all_results.keys())}"
- )
- stats, html_file = self._run_analysis_pipeline(
- all_results,
- self.report_mode,
- historical_title_info,
- historical_new_titles,
- word_groups,
- filter_words,
- historical_id_to_name,
- failed_ids=failed_ids,
- )
- combined_id_to_name = {**historical_id_to_name, **id_to_name}
- print(f"HTML报告已生成: {html_file}")
- # 发送实时通知(使用完整历史数据的统计结果)
- summary_html = None
- if mode_strategy["should_send_realtime"]:
- self._send_notification_if_needed(
- stats,
- mode_strategy["realtime_report_type"],
- self.report_mode,
- failed_ids=failed_ids,
- new_titles=historical_new_titles,
- id_to_name=combined_id_to_name,
- html_file_path=html_file,
- )
- else:
- print("❌ 严重错误:无法读取刚保存的数据文件")
- raise RuntimeError("数据一致性检查失败:保存后立即读取失败")
- else:
- title_info = self._prepare_current_title_info(results, time_info)
- stats, html_file = self._run_analysis_pipeline(
- results,
- self.report_mode,
- title_info,
- new_titles,
- word_groups,
- filter_words,
- id_to_name,
- failed_ids=failed_ids,
- )
- print(f"HTML报告已生成: {html_file}")
- # 发送实时通知(如果需要)
- summary_html = None
- if mode_strategy["should_send_realtime"]:
- self._send_notification_if_needed(
- stats,
- mode_strategy["realtime_report_type"],
- self.report_mode,
- failed_ids=failed_ids,
- new_titles=new_titles,
- id_to_name=id_to_name,
- html_file_path=html_file,
- )
- # 生成汇总报告(如果需要)
- summary_html = None
- if mode_strategy["should_generate_summary"]:
- if mode_strategy["should_send_realtime"]:
- # 如果已经发送了实时通知,汇总只生成HTML不发送通知
- summary_html = self._generate_summary_html(
- mode_strategy["summary_mode"]
- )
- else:
- # daily模式:直接生成汇总报告并发送通知
- summary_html = self._generate_summary_report(mode_strategy)
- # 打开浏览器(仅在非容器环境)
- if self._should_open_browser() and html_file:
- if summary_html:
- summary_url = "file://" + str(Path(summary_html).resolve())
- print(f"正在打开汇总报告: {summary_url}")
- webbrowser.open(summary_url)
- else:
- file_url = "file://" + str(Path(html_file).resolve())
- print(f"正在打开HTML报告: {file_url}")
- webbrowser.open(file_url)
- elif self.is_docker_container and html_file:
- if summary_html:
- print(f"汇总报告已生成(Docker环境): {summary_html}")
- else:
- print(f"HTML报告已生成(Docker环境): {html_file}")
- return summary_html
- def run(self) -> None:
- """执行分析流程"""
- try:
- self._initialize_and_check_config()
- mode_strategy = self._get_mode_strategy()
- results, id_to_name, failed_ids = self._crawl_data()
- self._execute_mode_strategy(mode_strategy, results, id_to_name, failed_ids)
- except Exception as e:
- print(f"分析流程执行出错: {e}")
- raise
- def main():
- try:
- analyzer = NewsAnalyzer()
- analyzer.run()
- except FileNotFoundError as e:
- print(f"❌ 配置文件错误: {e}")
- print("\n请确保以下文件存在:")
- print(" • config/config.yaml")
- print(" • config/frequency_words.txt")
- print("\n参考项目文档进行正确配置")
- except Exception as e:
- print(f"❌ 程序运行错误: {e}")
- raise
- if __name__ == "__main__":
- main()
|