c# - MSMQ System.Messaging high resource usage -
i have make c# console application uses timer connects msmq every 10 seconds data insert oracle database. issue log in , log off domain , create high cpu create security audit log waste resources.
my console application runs task schedule. code below
using system; using system.collections.generic; using system.linq; using system.text; using system.messaging; using system.xml; using system.io; using system.timers; using oracle.dataaccess.client; using system.data; namespace msmq_news { class program { private static system.timers.timer atimer; static void main(string[] args) { try { // create timer ten second interval. atimer = new system.timers.timer(60000);//10000 // hook elapsed event timer. atimer.elapsed += new elapsedeventhandler(ontimedevent); // set interval 2 seconds (2000 milliseconds). //atimer.interval = 10000; atimer.enabled = true; atimer.start(); console.writeline("press enter key exit program."); console.readline(); } catch (exception ex) { log(" main -- " + ex.message); } } private static void ontimedevent(object source, elapsedeventargs e) { // in case wants inherit class , lock ... object _padlock = new object(); try { atimer.stop(); lock (_padlock) { console.writeline("the elapsed event raised @ {0}", e.signaltime); processqueuemsgs(); } } catch (exception ex) { log(" ontimedevent -- " + ex.message); } { atimer.start(); } } private static void processqueuemsgs() { try { while ((datetime.now.hour >= 06) && (datetime.now.hour <= 16)) { datetime dt = datetime.now; receivenewsdetail(dt); receivenewsheader(dt); } closeapp(); } catch (exception ex) { log(" processqueuemsgs -- " + ex.message); } } static bool queueexist(string queuename) { try { if (messagequeue.exists(queuename)) return true; else return false; } catch (exception ex) { log(" queueexist -- " + ex.message); return false; } } private static void receivenewsheader(datetime dt) { try { messagequeue mqnewsheader = null; string value = "", _tmp = ""; _tmp = "<newsheader></newsheader> "; /*if (queueexist(@".\q_ws_ampnewsheaderrep"))*/ mqnewsheader = new messagequeue(@".\q_ws_ampnewsheaderrep"); int msgcount = getmessagecount(mqnewsheader, @".\q_ws_ampnewsheaderrep"); (int = 0; < msgcount; i++) { message msg = mqnewsheader.receive(); msg.formatter = new activexmessageformatter(); //need avoid ??? arabic characters using (streamreader strdr = new streamreader(msg.bodystream, system.text.encoding.default)) { value = strdr.readtoend(); } value = value.replace("\0", string.empty); if (value != _tmp) { loadnewsheader(value, dt); } } } catch (exception ex) { log("from receivenewsheader -- " + ex.message); } } private static void receivenewsdetail(datetime dt) { try { messagequeue mqnewsdetails = null; string value = "", _tmp = ""; _tmp = "<news></news> "; /*if (queueexist(@".\q_ws_ampnewsrep"))*/ mqnewsdetails = new messagequeue(@".\q_ws_ampnewsrep"); int msgcount = getmessagecount(mqnewsdetails, @".\q_ws_ampnewsrep"); (int = 0; < msgcount; i++) { message msg = mqnewsdetails.receive(); msg.formatter = new activexmessageformatter(); //need avoid ??? arabic characters using (streamreader strdr = new streamreader(msg.bodystream, system.text.encoding.default)) { value = strdr.readtoend(); } value = value.replace("\0", string.empty); if (value != _tmp) { loadnewsdetail(value, dt); } } } catch (exception ex) { log("from receivenewsdetail -- " + ex.message); } } private static void loadnewsheader(string text , datetime dt) { try { //text = replacespecialcharacters(text); //text = clean(text); //xmldocument _xmldoc = new xmldocument(); //_xmldoc.loadxml(text); //string filename = "newsheader.xml"; text = text.replace("<arabicfields>", "<arabicfields>\n\t\t"); //createxmlfile(filename, text); xmldocument _xmldoc = loadxmldoc(text); string sql = ""; xmlnodelist newsheaderlist = _xmldoc.selectnodes("newsheader/newsheaderrep"); if (newsheaderlist.count > 0) { oracleparameter ptruncate = new oracleparameter("p_table_name", oracledbtype.varchar2); ptruncate.value = "companies_news"; databaseoperation(commandtype.storedprocedure, "truncate_table", ptruncate); } foreach (xmlnode news in newsheaderlist) { xmlnodelist newsidlist = news.selectnodes("newsid"); sql = "insert companies_news(newsid, newsid_seqno, newsstatus, language_cd, sec_cd, releasedate, releasetime, title, stg_time) values("; foreach (xmlnode newsid in newsidlist) { sql += "'" + newsid["id"].innertext + "',"; sql += "" + newsid["seqno"].innertext + ","; } sql += "'" + news["newsstatus"].innertext + "',"; xmlnodelist newsitemlist = news.selectnodes("newsitem"); foreach (xmlnode newsitem in newsitemlist) { sql += "'" + newsitem["languageid"].innertext + "',"; if (newsitem["reseccode"] != null) sql += "'" + newsitem["reseccode"].innertext + "',"; else sql += "' ',"; xmlnodelist releasetimelist = newsitem.selectnodes("releasetime"); foreach (xmlnode releasetime in releasetimelist) { sql += "to_date('" + releasetime["date"].innertext + "','yyyymmdd'),"; sql += "" + releasetime["time"].innertext + ","; } } xmlnodelist arabicfieldslist = news.selectnodes("arabicfields"); foreach (xmlnode arabicfields in arabicfieldslist) { sql += "'" + revertspecialcharacters(arabicfields["title_ar"].innertext) + "',"; } sql += "to_date('" + dt.tostring() + "','mm/dd/yyyy hh12:mi:ss pm'))"; databaseoperation(commandtype.text, sql, null); console.writeline("header : " + datetime.now.tostring()); } if (sql != "") //recordcount("select count(*) companies_news_details") > 0 { oracleparameter prefresh = new oracleparameter("p_table_names", oracledbtype.varchar2); prefresh.value = "companies_news"; databaseoperation(commandtype.storedprocedure, "refresh_vw_all", prefresh); } } catch (exception ex) { log("from loadnewsheader -- " + ex.message); } } private static void loadnewsdetail(string text, datetime dt) { try { //string filename = "newsdetail.xml"; text = text.replace("<arabicfields>", "<arabicfields>\n\t\t"); //text = createxmlfile(filename); //text = text.replace("<arabicfields>", "<arabicfields>\n\t\t"); xmldocument _xmldoc = loadxmldoc(text); string sql = ""; xmlnodelist newslist = _xmldoc.selectnodes("news/newsrep"); if (newslist.count > 0) { oracleparameter ptruncate = new oracleparameter("p_table_name", oracledbtype.varchar2); ptruncate.value = "companies_news_details"; databaseoperation(commandtype.storedprocedure, "truncate_table", ptruncate); } foreach (xmlnode news in newslist) { xmlnodelist newsidlist = news.selectnodes("newsid"); sql = "insert companies_news_details(newsid_id, newsid_seqno, newstext_1,newstext_2,stg_time) values("; foreach (xmlnode newsid in newsidlist) { sql += "" + newsid["id"].innertext + ","; sql += "" + newsid["seqno"].innertext + ","; } xmlnodelist arabicfieldslist = news.selectnodes("arabicfields"); foreach (xmlnode arabicfields in arabicfieldslist) { // log(" before arabic text data -- :" + arabicfields["newstext_ar"].innertext); if (arabicfields["newstext_ar"].innertext.length > 4000) { sql += "'" + revertspecialcharacters(arabicfields["newstext_ar"].innertext.substring(0, 3999)).replace("\n",environment.newline) + "',"; sql += "'" + revertspecialcharacters(arabicfields["newstext_ar"].innertext.substring(3999, arabicfields["newstext_ar"].innertext.length)).replace("\n", environment.newline) + "',"; sql += "to_date('" + dt.tostring() + "','mm/dd/yyyy hh12:mi:ss pm')"; } else { sql += "'" + revertspecialcharacters(arabicfields["newstext_ar"].innertext).replace("\n", environment.newline) + "','',"; sql += "to_date('" + dt.tostring() + "','mm/dd/yyyy hh12:mi:ss pm')"; } sql += ")"; databaseoperation(commandtype.text, sql, null); console.writeline("detail : " + datetime.now.tostring()); } } if (sql != "") //recordcount("select count(*) companies_news_details") > 0 { oracleparameter prefresh = new oracleparameter("p_table_names", oracledbtype.varchar2); prefresh.value = "companies_news_details"; databaseoperation(commandtype.storedprocedure, "refresh_vw_all", prefresh); } } catch (exception ex) { log("from loadnewsdetail -- " + ex.message); } } private static void closeapp() { system.environment.exit(0); } protected static int getmessagecount(messagequeue q, string queuename) { var _messagequeue = new messagequeue(queuename, queueaccessmode.peek); _messagequeue.refresh(); //done correct count sends 0 var x = _messagequeue.getmessageenumerator2(); int icount = 0; while (x.movenext()) { icount++; } return icount; } private static void databaseoperation(commandtype cmdtype, string sql, oracleparameter param) { string oracleconnectionstring = system.configuration.configurationsettings.appsettings["oracleconnectionstring"]; using (oracleconnection con = new oracleconnection()) { con.connectionstring = oracleconnectionstring; con.open(); oraclecommand command = con.createcommand(); command.commandtype = cmdtype; command.commandtext = sql; if (param != null) command.parameters.add(param); command.executenonquery(); command.dispose(); con.close(); } } private static string revertspecialcharacters(string pvalue) { string _retval = string.empty; _retval = pvalue.replace("'", "''"); return _retval; } public static void log(string message) { // create writer , open file: streamwriter log; //c:\software\msmq_new_news_fix if (!file.exists(@"c:\msmq_new_news_fix\log.txt")) { log = new streamwriter(@"c:\msmq_new_news_fix\log.txt"); } else { log = file.appendtext(@"c:\msmq_new_news_fix\log.txt"); } // write file: log.writeline(datetime.now.tostring() + " : " + message); // close stream: log.close(); } public static xmldocument loadxmldoc(string xmltext) { xmldocument doc = new xmldocument(); try { string xmltoload = parsexmlfile(xmltext); doc.loadxml(xmltoload); } catch (exception ex) { log("from loadxmldoc -- " + ex.message); } return doc; } private static string parsexmlfile(string xmltext) { stringbuilder formatedxml = new stringbuilder(); try { stringreader xmlreader = new stringreader(xmltext); while (xmlreader.peek() >= 0) formatedxml.append(replacespecialchars(xmlreader.readline()) + "\n"); } catch (exception ex) { log("from parsexmlfile -- " + ex.message); } return formatedxml.tostring(); } private static string replacespecialchars(string xmldata) { try { //if (xmldata.contains("objectref")) return "<objectref></objectref>"; int grtrposat = xmldata.indexof(">"); int closeposat = xmldata.indexof("</"); int lenthtoreplace = 0; if (grtrposat > closeposat) return xmldata; lenthtoreplace = (closeposat <= 0 && grtrposat <= 0) ? xmldata.length : (closeposat - grtrposat) - 1; //get string between xml element. e.g. <contactname>hanna moos</contactname>, //you 'hanna moos' string data = xmldata.substring(grtrposat + 1, lenthtoreplace); string formatteddata = data.replace("&", "&").replace("<", "<") .replace(">", ">").replace("'", "'"); if (lenthtoreplace > 0) xmldata = xmldata.replace(data, formatteddata); return xmldata; } catch (exception ex) { log("from replacespecialchars -- " + ex.message); return ""; } } } }
how can solve above issue
why not host queue reader process in windows service. continually poll queue each 10 seconds.
then use windows scheduler start/stop service @ relevant times create service window.
this means won't need complicated in scheduled task, , won't loading , unloading time.
Comments
Post a Comment