001 /*
002 * Copyright (c) 2009 The openGion Project.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
013 * either express or implied. See the License for the specific language
014 * governing permissions and limitations under the License.
015 */
016 package org.opengion.fukurou.process;
017
018 import org.opengion.fukurou.util.Argument;
019 import org.opengion.fukurou.util.SystemParameter;
020 import org.opengion.fukurou.util.LogWriter;
021 import org.opengion.fukurou.util.HybsEntry ;
022 import org.opengion.fukurou.util.Closer;
023 import org.opengion.fukurou.model.Formatter;
024 import org.opengion.fukurou.db.ConnectionFactory;
025
026 import java.util.Map ;
027 import java.util.LinkedHashMap ;
028
029 import java.sql.Connection;
030 import java.sql.PreparedStatement;
031 import java.sql.ParameterMetaData;
032 import java.sql.SQLException;
033
034 /**
035 * Process_DBMerge は、UPDATE と INSERT を指定し ??タベ?スを追?新
036 * する、ChainProcess インターフェースの実?ラスです?
037 * 上?プロセスチェインの??タは上流から下流へと渡されます?)から
038 * 受け取っ?LineModel を?に、DBTableModel 形式ファイルを?力します?
039 *
040 * ??タベ?ス接続?等?、ParamProcess のサブクラス(Process_DBParam)に
041 * 設定された接?Connection)を使用します?
042 *
043 * 引数??中にスペ?スを含??合?、ダブルコー??ション("") で括って下さ??
044 * 引数??の ?』?前後には、スペ?スは挟めません。??key=value の様に
045 * 繋げてください?
046 *
047 * SQL?は、{@DATE.YMDH}等?シス?変数が使用できます?
048 *
049 * @og.formSample
050 * Process_DBMerge -dbid=DBGE -insertTable=GE41
051 *
052 * [ -dbid=DB接続ID ] ??-dbid=DBGE (? Process_DBParam の -configFile で?す?DBConfig.xml ファイルで規?
053 * [ -update=検索SQL? ] ??-update="UPDATE GE41 SET NAME_JA = [NAME_JA],LABEL_NAME = [LABEL_NAME]
054 * WHERE SYSTEM_ID = [SYSTEM_ID] AND CLM = [CLM]"
055 * [ -updateFile=登録SQL?ァ??? ] ??-updateFile=update.sql
056 * ?? -update ?-updateFile が指定されな??合?、エラーです?
057 * [ -update_XXXX=固定? ] ??-update_SYSTEM_ID=GE
058 * SQL?の{@XXXX}??を指定?固定?で置き換えます?
059 * WHERE SYSTEM_ID='{@SYSTEM_ID}' ?WHERE SYSTEM_ID='GE'
060 * [ -insertTable=登録???゙ルID ] ??INSERT??する?合?不要?INSERT する場合???ブルID
061 * [ -insert=検索SQL? ] ??-insert="INSERT INTO GE41 (SYSTEM_ID,CLM,NAME_JA,LABEL_NAME)
062 * VALUES ([SYSTEM_ID],[CLM],[NAME_JA],[LABEL_NAME])"
063 * [ -insertFile=登録SQL?ァ??? ] ??-insertFile=insert.sql
064 * ?? -insert ?-insertFile ??-table が指定されな??合?、エラーです?
065 * [ -insert_XXXX=固定? ] ??-insert_SYSTEM_ID=GE
066 * SQL?の{@XXXX}??を指定?固定?で置き換えます?
067 * WHERE SYSTEM_ID='{@SYSTEM_ID}' ?WHERE SYSTEM_ID='GE'
068 * [ -const_XXXX=固定? ] ??-const_FGJ=1
069 * LineModel のキー(const_ に続く??)の値に、固定?を設定します?
070 * キーが異なれ?、?のカラ?を指定できます?
071 * [ -commitCnt=commit処?定] ???数毎にコミットを発行します?0 の場合?、終?でコミットしません?
072 * [ -display=false|true ] ??結果を標準?力に表示する(true)かしな?false)?初期値:false[表示しない])
073 *
074 * @version 4.0
075 * @author Kazuhiko Hasegawa
076 * @since JDK5.0,
077 */
078 public class Process_DBMerge extends AbstractProcess implements ChainProcess {
079 private static final String UPDATE_KEY = "update_" ;
080 private static final String INSERT_KEY = "insert_" ;
081 private static final String CNST_KEY = "const_" ;
082
083 private Connection connection = null;
084 private PreparedStatement insPstmt = null ;
085 private PreparedStatement updPstmt = null ;
086 private ParameterMetaData insPmeta = null ; // 5.1.2.0 (2010/01/01) setObject に、Type を渡す?(PostgreSQL対?
087 private ParameterMetaData updPmeta = null ; // 5.1.2.0 (2010/01/01) setObject に、Type を渡す?(PostgreSQL対?
088 private boolean useParamMetaData = false; // 5.1.2.0 (2010/01/01) setObject に、Type を渡す?(PostgreSQL対?
089
090 private String dbid = null;
091 private String insert = null;
092 private String update = null;
093 private String insertTable = null;
094 private int[] insClmNos = null; // insert 時?ファイルのヘッ??のカラ?号
095 private int[] updClmNos = null; // update 時?ファイルのヘッ??のカラ?号
096 private int commitCnt = 0; // コミットするまとめ件数
097 private boolean display = false; // 表示しな?
098
099 private String[] cnstClm = null; // 固定?を設定するカラ?
100 private int[] cnstClmNos = null; // 固定?を設定するカラ?号
101 private String[] constVal = null; // カラ?号に対応した固定?
102
103 private boolean firstRow = true; // ??の?目
104 private int count = 0;
105 private int insCount = 0;
106 private int updCount = 0;
107
108 private static final Map<String,String> mustProparty ; // ?プロパティ???チェ?用 Map
109 private static final Map<String,String> usableProparty ; // ?プロパティ?整合?チェ? Map
110
111 static {
112 mustProparty = new LinkedHashMap<String,String>();
113
114 usableProparty = new LinkedHashMap<String,String>();
115 usableProparty.put( "dbid", "Process_DBParam の -configFile で?す?DBConfig.xml ファイルで規? );
116 usableProparty.put( "update", "更新SQL?sql or sqlFile ??)" +
117 CR + "? \"UPDATE GE41 " +
118 CR + "SET NAME_JA = [NAME_JA],LABEL_NAME = [LABEL_NAME] " +
119 CR + "WHERE SYSTEM_ID = [SYSTEM_ID] AND CLM = [CLM]\"" );
120 usableProparty.put( "updateFile", "更新SQLファイル(sql or sqlFile ??)? update.sql" );
121 usableProparty.put( "update_", "SQL?の{@XXXX}??を指定?固定?で置き換えます?" +
122 CR + "WHERE SYSTEM_ID='{@SYSTEM_ID}' ?WHERE SYSTEM_ID='GE'" );
123 usableProparty.put( "insert", "登録SQL?sql or sqlFile ??)" +
124 CR + "? \"INSERT INTO GE41 " +
125 CR + "(SYSTEM_ID,CLM,NAME_JA,LABEL_NAME) " +
126 CR + "VALUES ([SYSTEM_ID],[CLM],[NAME_JA],[LABEL_NAME])\"" );
127 usableProparty.put( "insertFile", "登録SQLファイル(sql or sqlFile ??)? insert.sql" );
128 usableProparty.put( "insertTable", "INSERT する場合???ブルID SQL??する?合?不要?" );
129 usableProparty.put( "insert_", "SQL?の{@XXXX}??を指定?固定?で置き換えます?" +
130 CR + "WHERE SYSTEM_ID='{@SYSTEM_ID}' ?WHERE SYSTEM_ID='GE'" );
131 usableProparty.put( "const_", "LineModel のキー(const_ に続く??)の値に、固定?? +
132 CR + "設定します?キーが異なれ?、?のカラ?を指定できます?" +
133 CR + "? -sql_SYSTEM_ID=GE" );
134 usableProparty.put( "commitCnt", "?数毎にコミットを発行します?" +
135 CR + "0 の場合?、終?でコミットしません(初期値: 0)" );
136 usableProparty.put( "display", "結果を標準?力に表示する(true)かしな?false)? +
137 CR + "(初期値:false:表示しな?" );
138 }
139
140 /**
141 * ?ォルトコンストラクター?
142 * こ?クラスは、動??されます??ォルトコンストラクターで?
143 * super クラスに対して、?な初期化を行っておきます?
144 *
145 */
146 public Process_DBMerge() {
147 super( "org.opengion.fukurou.process.Process_DBMerge",mustProparty,usableProparty );
148 }
149
150 /**
151 * プロセスの初期化を行います?初めに??、呼び出されます?
152 * 初期処?ファイルオープン??オープン?に使用します?
153 *
154 * @og.rev 5.1.2.0 (2010/01/01) setObject に ParameterMetaData の getParameterType を渡す?(PostgreSQL対?
155 * @og.rev 5.3.8.0 (2011/08/01) useParamMetaData ?ConnectionFactory経由で取得?(PostgreSQL対?
156 *
157 * @param paramProcess ??タベ?スの接続???などを持って?オブジェク?
158 */
159 public void init( final ParamProcess paramProcess ) {
160 Argument arg = getArgument();
161
162 insertTable = arg.getProparty("insertTable");
163 update = arg.getFileProparty("update","updateFile",false);
164 insert = arg.getFileProparty("insert","insertFile",false);
165 commitCnt = arg.getProparty("commitCnt",commitCnt);
166 display = arg.getProparty("display",display);
167
168 dbid = arg.getProparty("dbid");
169 connection = paramProcess.getConnection( dbid );
170 // 5.1.2.0 (2010/01/01) setObject に ParameterMetaData の getParameterType を渡す?(PostgreSQL対?
171 // useParamMetaData = ApplicationInfo.useParameterMetaData( connection );
172 useParamMetaData = ConnectionFactory.useParameterMetaData( dbid ); // 5.3.8.0 (2011/08/01)
173
174 if( insert == null && insertTable == null ) {
175 String errMsg = "insert また?、insertFile を指定しな??合?、insertTable を??してください?;
176 throw new RuntimeException( errMsg );
177 }
178
179 if( insert != null && insertTable != null ) {
180 String errMsg = "insert また?、insertFile と、insertTable は、両方同時に?できません?"
181 + insert + "],[" + insertTable + "]";
182 throw new RuntimeException( errMsg );
183 }
184
185 // 3.8.0.1 (2005/06/17) {@DATE.XXXX} 変換処??追?
186 // {@DATE.YMDH} などの??を?yyyyMMddHHmmss 型?日付に置き換えます?
187 // SQL?? {@XXXX} ??の固定?への置き換?
188 HybsEntry[] entry =arg.getEntrys(UPDATE_KEY); // 配?
189 SystemParameter sysParam = new SystemParameter( update );
190 update = sysParam.replace( entry );
191
192 if( insert != null ) {
193 entry =arg.getEntrys(INSERT_KEY); // 配?
194 sysParam = new SystemParameter( insert );
195 insert = sysParam.replace( entry );
196 }
197
198 HybsEntry[] cnstKey = arg.getEntrys( CNST_KEY ); // 配?
199 int csize = cnstKey.length;
200 cnstClm = new String[csize];
201 constVal = new String[csize];
202 for( int i=0; i<csize; i++ ) {
203 cnstClm[i] = cnstKey[i].getKey();
204 constVal[i] = cnstKey[i].getValue();
205 }
206 }
207
208 /**
209 * プロセスの終?行います??に??、呼び出されます?
210 * 終???ファイルクローズ??クローズ?に使用します?
211 *
212 * @og.rev 4.0.0.0 (2007/11/27) commit,rollback,remove 処?追?
213 * @og.rev 5.1.2.0 (2010/01/01) insPmeta , updPmeta のクリア
214 *
215 * @param isOK ト?タルで、OK?たかど?[true:成功/false:失敗]
216 */
217 public void end( final boolean isOK ) {
218 boolean flag1 = Closer.stmtClose( updPstmt );
219 updPstmt = null;
220 boolean flag2 = Closer.stmtClose( insPstmt );
221 insPstmt = null;
222
223 insPmeta = null ; // 5.1.2.0 (2010/01/01)
224 updPmeta = null ; // 5.1.2.0 (2010/01/01)
225
226 // close に失敗して?のに commit しても良??か?
227 if( isOK ) {
228 Closer.commit( connection );
229 }
230 else {
231 Closer.rollback( connection );
232 }
233 ConnectionFactory.remove( connection,dbid );
234
235 if( ! flag1 ) {
236 String errMsg = "update ス??トメントをクローズ出来ません? + CR
237 + " update=[" + update + "] , commit=[" + isOK + "]" ;
238 throw new RuntimeException( errMsg );
239 }
240
241 if( ! flag2 ) {
242 String errMsg = "insert ス??トメントをクローズ出来ません? + CR
243 + " insert=[" + insert + "] , commit=[" + isOK + "]" ;
244 throw new RuntimeException( errMsg );
245 }
246 }
247
248 /**
249 * 引数の LineModel を??るメソ?です?
250 * 変換処?? LineModel を返します?
251 * 後続??行わな?????タのフィルタリングを行う場?は?
252 * null ??タを返します?つまり?null ??タは、後続??行わな?
253 * フラグの代わりにも使用して?す?
254 * なお?変換処?? LineModel と、オリジナルの LineModel が?
255 * 同?、コピ?(クローン)か?、各処?ソ??決めて?す?
256 * ドキュメントに明記されて???合?、副作用が問題になる?合??
257 * ???とに自?コピ?(クローン)して下さ??
258 *
259 * @og.rev 5.1.2.0 (2010/01/01) setObject に ParameterMetaData の getParameterType を渡す?(PostgreSQL対?
260 * @og.rev 5.3.8.0 (2011/08/01) useParamMetaData setNull 対?PostgreSQL対?
261 * @og.rev 5.7.2.2 (2014/01/24) SQL実行エラーを少し詳細に出力します?
262 *
263 * @param data ラインモ? オリジナルのLineModel
264 *
265 * @return 処?換後?LineModel
266 */
267 public LineModel action( final LineModel data ) {
268 count++ ;
269 int updCnt = 0;
270 try {
271 if( firstRow ) {
272 makePrepareStatement( insertTable,data );
273
274 int size = cnstClm.length;
275 cnstClmNos = new int[size];
276 for( int i=0; i<size; i++ ) {
277 cnstClmNos[i] = data.getColumnNo( cnstClm[i] );
278 }
279
280 firstRow = false;
281 }
282
283 // 固定?置き換え??
284 for( int j=0; j<cnstClmNos.length; j++ ) {
285 data.setValue( cnstClmNos[j],constVal[j] );
286 }
287
288 // 5.1.2.0 (2010/01/01) setObject に ParameterMetaData の getParameterType を渡す?(PostgreSQL対?
289 if( useParamMetaData ) {
290 for( int i=0; i<updClmNos.length; i++ ) {
291 int type = updPmeta.getParameterType( i+1 );
292 // 5.3.8.0 (2011/08/01) setNull 対?
293 // updPstmt.setObject( i+1,data.getValue(updClmNos[i]),type );
294 Object val = data.getValue(updClmNos[i]);
295 if( val == null || ( val instanceof String && ((String)val).isEmpty() ) ) {
296 updPstmt.setNull( i+1, type );
297 }
298 else {
299 updPstmt.setObject( i+1, val, type );
300 }
301 }
302 }
303 else {
304 for( int i=0; i<updClmNos.length; i++ ) {
305 updPstmt.setObject( i+1,data.getValue(updClmNos[i]) );
306 }
307 }
308
309 updCnt = updPstmt.executeUpdate();
310 if( updCnt == 0 ) {
311 // 5.1.2.0 (2010/01/01) setObject に ParameterMetaData の getParameterType を渡す?(PostgreSQL対?
312 if( useParamMetaData ) {
313 for( int i=0; i<insClmNos.length; i++ ) {
314 int type = insPmeta.getParameterType( i+1 );
315 // 5.3.8.0 (2011/08/01) setNull 対?
316 // insPstmt.setObject( i+1,data.getValue(insClmNos[i]),type );
317 Object val = data.getValue(insClmNos[i]);
318 if( val == null || ( val instanceof String && ((String)val).isEmpty() ) ) {
319 insPstmt.setNull( i+1, type );
320 }
321 else {
322 insPstmt.setObject( i+1, val, type );
323 }
324 }
325 }
326 else {
327 for( int i=0; i<insClmNos.length; i++ ) {
328 insPstmt.setObject( i+1,data.getValue(insClmNos[i]) );
329 }
330 }
331 int insCnt = insPstmt.executeUpdate();
332 if( insCnt == 0 ) {
333 String errMsg = "?件も追?れませんでした?" + data.getRowNo() + "]件目" + CR
334 + " insert=[" + insert + "]" + CR
335 + " data=[" + data.dataLine() + "]" + CR ; // 5.7.2.2 (2014/01/24) エラー時に??タも?力します?
336 throw new RuntimeException( errMsg );
337 }
338 insCount++ ;
339 }
340 else if( updCnt > 1 ) {
341 String errMsg = "??" + updCnt + ")が同時に更新されました?" + data.getRowNo() + "]件目" + CR
342 + " update=[" + update + "]" + CR
343 + " data=[" + data.dataLine() + "]" + CR ; // 5.7.2.2 (2014/01/24) エラー時に??タも?力します?
344 throw new RuntimeException( errMsg );
345 }
346 else {
347 updCount ++ ;
348 }
349
350 if( commitCnt > 0 && ( count%commitCnt == 0 ) ) {
351 Closer.commit( connection );
352 }
353 if( display ) { printKey( count,updCnt,data ); }
354 }
355 catch (SQLException ex) {
356 String errMsg = "登録処?エラーが発生しました?" + data.getRowNo() + "]件目" + CR
357 + ((updCnt == 1) ?
358 " update=[" + update + "]"
359 : " insert=[" + insert + "]" + CR
360 + " insertTable=[" + insertTable + "]" )
361 + CR
362 + "errCode=[" + ex.getErrorCode() + "] State=[" + ex.getSQLState() + "]" + CR
363 + "data=[" + data.dataLine() + "]" + CR ; // 5.7.2.2 (2014/01/24) エラー時に??タも?力します?
364 throw new RuntimeException( errMsg,ex );
365 }
366 return data;
367 }
368
369 /**
370 * ?で使用する PreparedStatement を作?します?
371 * 引数?? SQL また?、LineModel から作?した SQL より構築します?
372 *
373 * @og.rev 5.1.2.0 (2010/01/01) setObject に ParameterMetaData の getParameterType を渡す?(PostgreSQL対?
374 * @og.rev 5.7.2.2 (2014/01/24) SQL実行エラーを少し詳細に出力します?
375 *
376 * @param table 処?象の??ブルID
377 * @param data ラインモ? 処?象のLineModel
378 */
379 private void makePrepareStatement( final String table,final LineModel data ) {
380 if( insert == null ) {
381 StringBuilder buf = new StringBuilder();
382 String[] names = data.getNames();
383 int size = names.length;
384
385 buf.append( "INSERT INTO " ).append( table ).append( " (" );
386 buf.append( names[0] );
387 for( int i=1; i<size; i++ ) {
388 buf.append( "," ).append( names[i] );
389 }
390 buf.append( " ) VALUES ( ?" );
391 for( int i=1; i<size; i++ ) {
392 buf.append( ",?" );
393 }
394 buf.append( " )" );
395 insert = buf.toString();
396
397 // カラ?号を設定します?
398 insClmNos = new int[size];
399 for( int i=0; i<size; i++ ) {
400 insClmNos[i] = i;
401 }
402 }
403 else {
404 Formatter format = new Formatter( data );
405 format.setFormat( insert );
406 insert = format.getQueryFormatString();
407 insClmNos = format.getClmNos();
408 }
409
410 Formatter format = new Formatter( data );
411 format.setFormat( update );
412 update = format.getQueryFormatString();
413 updClmNos = format.getClmNos();
414
415 try {
416 insPstmt = connection.prepareStatement( insert );
417 updPstmt = connection.prepareStatement( update );
418 // 5.1.2.0 (2010/01/01) setObject に ParameterMetaData の getParameterType を渡す?(PostgreSQL対?
419 if( useParamMetaData ) {
420 insPmeta = insPstmt.getParameterMetaData();
421 updPmeta = updPstmt.getParameterMetaData();
422 }
423 }
424 catch (SQLException ex) {
425 // 5.7.2.2 (2014/01/24) SQL実行エラーを少し詳細に出力します?
426 String errMsg = "PreparedStatement を取得できませんでした? + CR
427 + "errMsg=[" + ex.getMessage() + "]" + CR
428 + "errCode=[" + ex.getErrorCode() + "] State=[" + ex.getSQLState() + "]" + CR
429 + "insert=[" + insert + "]" + CR
430 + "update=[" + update + "]" + CR
431 + "table=[" + table + "]" + CR
432 + "nameLine=[" + data.nameLine() + "]" + CR
433 + "data=[" + data.dataLine() + "]" + CR ;
434 throw new RuntimeException( errMsg,ex );
435 }
436 }
437
438 /**
439 * 画面出力用のフォーマットを作?します?
440 *
441 * @param rowNo ??タ読み取り件数
442 * @param updCnt 更新件数
443 * @param data ラインモ?
444 */
445 private void printKey( final int rowNo , final int updCnt , final LineModel data ) {
446 StringBuilder buf = new StringBuilder();
447
448 if( updCnt > 0 ) { buf.append( "UPDATE " ); }
449 else { buf.append( "INSERT " ); }
450
451 buf.append( "row=[" ).append( rowNo ).append( "] : " );
452 for( int i=0; i < updClmNos.length; i++ ) {
453 if( i == 0 ) { buf.append( "key: " ); }
454 else { buf.append( " and " ); }
455 buf.append( data.getName( updClmNos[i] ) );
456 buf.append( " = " );
457 buf.append( data.getValue( updClmNos[i] ) );
458 }
459
460 println( buf.toString() );
461 }
462
463 /**
464 * プロセスの処?果のレポ?ト表現を返します?
465 * 処??ログラ?、?力件数、?力件数などの??です?
466 * こ???をそのまま、標準?力に出すことで、結果レポ?トと出来るよ?
467 * 形式で出してください?
468 *
469 * @return 処?果のレポ??
470 */
471 public String report() {
472 String report = "[" + getClass().getName() + "]" + CR
473 + TAB + "DBID : " + dbid + CR
474 + TAB + "Input Count : " + count + CR
475 + TAB + "Update Count : " + updCount + CR
476 + TAB + "Insert Count : " + insCount ;
477
478 return report ;
479 }
480
481 /**
482 * こ?クラスの使用方法を返します?
483 *
484 * @return こ?クラスの使用方?
485 */
486 public String usage() {
487 StringBuilder buf = new StringBuilder();
488
489 buf.append( "Process_DBMerge は、UPDATE と INSERT を指定し ??タベ?スを追?新" ).append( CR );
490 buf.append( "する、ChainProcess インターフェースの実?ラスです?" ).append( CR );
491 buf.append( "上?プロセスチェインの??タは上流から下流へと渡されます?)から" ).append( CR );
492 buf.append( "受け取っ?LineModel を?に、データベ?スの存在チェ?を行い? ).append( CR );
493 buf.append( "下流への処?振り?けます?" ).append( CR );
494 buf.append( CR );
495 buf.append( "??タベ?ス接続?等?、ParamProcess のサブクラス(Process_DBParam)に" ).append( CR );
496 buf.append( "設定された接?Connection)を使用します?" ).append( CR );
497 buf.append( CR );
498 buf.append( "引数??中に空白を含??合?、ダブルコー??ション(\"\") で括って下さ??" ).append( CR );
499 buf.append( "引数??の ?』?前後には、空白は挟めません。??key=value の様に" ).append( CR );
500 buf.append( "繋げてください? ).append( CR );
501 buf.append( CR );
502 buf.append( "SQL?は、{@DATE.YMDH}等?シス?変数が使用できます?" ).append( CR );
503 buf.append( CR ).append( CR );
504 buf.append( getArgument().usage() ).append( CR );
505
506 return buf.toString();
507 }
508
509 /**
510 * こ?クラスは、main メソ?から実行できません?
511 *
512 * @param args コマンド引数配?
513 */
514 public static void main( final String[] args ) {
515 LogWriter.log( new Process_DBMerge().usage() );
516 }
517 }