table_name | datasize(M) |
--- | --- |
ac_in_lhcb-production_job | 0.2813M |
ac_in_lhcb-production_jobstep | 0.3906M |
ac_key_lhcb-production_job_finalmajorstatus | 0.0156M |
ac_key_lhcb-production_job_finalminorstatus | 0.0156M |
ac_key_lhcb-production_job_jobclass | 0.0156M |
ac_key_lhcb-production_job_jobgroup | 1.5156M |
ac_key_lhcb-production_job_jobtype | 0.0156M |
ac_key_lhcb-production_job_processingtype | 0.0156M |
ac_key_lhcb-production_job_site | 0.0156M |
ac_key_lhcb-production_job_user | 0.0156M |
ac_key_lhcb-production_job_usergroup | 0.0156M |
ac_key_lhcb-production_jobstep_eventtype | 0.0156M |
ac_key_lhcb-production_jobstep_finalstepstate | 0.0156M |
ac_key_lhcb-production_jobstep_jobgroup | 0.0781M |
ac_key_lhcb-production_jobstep_processingstep | 0.0156M |
ac_key_lhcb-production_jobstep_processingtype | 0.0156M |
ac_key_lhcb-production_jobstep_runnumber | 0.1875M |
ac_key_lhcb-production_jobstep_site | 0.0156M |
ac_type_lhcb-production_job | 4905M |
ac_type_lhcb-production_jobstep | 803M |
The fields in `ac_in_lhcb-production_job` and their types are as follows:

Field | Type |
--- | --- |
JobGroup | varchar(64) |
DiskSpace | bigint(20) unsigned |
InputDataSize | bigint(20) unsigned |
FinalMajorStatus | varchar(32) |
OutputDataSize | bigint(20) unsigned |
id | int(11) |
InputSandBoxSize | bigint(20) unsigned |
OutputDataFiles | int(10) unsigned |
NormCPUTime | int(10) unsigned |
takenSince | datetime |
User | varchar(32) |
taken | tinyint(1) |
JobType | varchar(32) |
JobClass | varchar(32) |
ProcessingType | varchar(32) |
ExecTime | int(10) unsigned |
CPUTime | int(10) unsigned |
startTime | int(10) unsigned |
UserGroup | varchar(32) |
FinalMinorStatus | varchar(64) |
Site | varchar(32) |
ProcessedEvents | int(10) unsigned |
OutputSandBoxSize | bigint(20) unsigned |
InputDataFiles | int(10) unsigned |
endTime | int(10) unsigned |
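To make the layout above concrete, the following sketch rebuilds a `CREATE TABLE` statement for the "in" buffer table from the field list in the dump. This is only an illustration: the exact DDL that DIRAC generates (index definitions, engine options) may differ.

```python
# Field list taken from the table above; order is illustrative.
FIELDS = [
    ("id", "INT(11) NOT NULL AUTO_INCREMENT"),
    ("taken", "TINYINT(1)"),
    ("takenSince", "DATETIME"),
    ("User", "VARCHAR(32)"),
    ("UserGroup", "VARCHAR(32)"),
    ("JobGroup", "VARCHAR(64)"),
    ("JobType", "VARCHAR(32)"),
    ("JobClass", "VARCHAR(32)"),
    ("ProcessingType", "VARCHAR(32)"),
    ("Site", "VARCHAR(32)"),
    ("FinalMajorStatus", "VARCHAR(32)"),
    ("FinalMinorStatus", "VARCHAR(64)"),
    ("CPUTime", "INT(10) UNSIGNED"),
    ("NormCPUTime", "INT(10) UNSIGNED"),
    ("ExecTime", "INT(10) UNSIGNED"),
    ("InputDataSize", "BIGINT(20) UNSIGNED"),
    ("OutputDataSize", "BIGINT(20) UNSIGNED"),
    ("InputDataFiles", "INT(10) UNSIGNED"),
    ("OutputDataFiles", "INT(10) UNSIGNED"),
    ("DiskSpace", "BIGINT(20) UNSIGNED"),
    ("InputSandBoxSize", "BIGINT(20) UNSIGNED"),
    ("OutputSandBoxSize", "BIGINT(20) UNSIGNED"),
    ("ProcessedEvents", "INT(10) UNSIGNED"),
    ("startTime", "INT(10) UNSIGNED"),
    ("endTime", "INT(10) UNSIGNED"),
]

def build_create_statement(table, fields):
    # Join each (name, type) pair into one column definition per line.
    cols = ",\n  ".join("`%s` %s" % (name, sqltype) for name, sqltype in fields)
    return "CREATE TABLE `%s` (\n  %s,\n  PRIMARY KEY (`id`)\n)" % (table, cols)

ddl = build_create_statement("ac_in_lhcb-production_job", FIELDS)
print(ddl)
```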
Compared with the two table types described above, the meaning of the `ac_key_*` tables is obvious: they store the values of the key fields. Each has only two fields, `id` and `value`. For example, the table `ac_key_lhcb-production_job_user` stores the names of the users who have submitted jobs through DIRAC, and the table `ac_key_lhcb-production_job_site` stores only the sites where jobs were executed.
Example contents of five `ac_key_*` tables (each table has only the fields `id` and `value`; the five tables are shown side by side on the shared `id` column):

id | job_jobtype | job_processingtype | job_site | job_user | job_usergroup |
--- | --- | --- | --- | --- | --- |
1 | test | uknown | LCG.LPN.fr | joel | unknown |
2 | user | unknown | LCG.Manchester.uk | paterson | lhcb_user |
3 | production | WF-Validation-Reco08-Stripping12 | LCG.Liverpool.uk | atsareg | lhcb_prod |
4 | unknown | 2010-Gen01 | LCG.Oxford.uk | acsmith | lhcb_admin |
5 | sam | Reco08-Stripping12 | LCG.RAL.uk | graciani | diracAdmin |
6 | reconstruction | Reco08-Stripping12-Merged | LCG.CESGA.es | roma | user |
7 | DataReconstruction | Reco08-Stripping12b | LCG.PIC.es | rgracian | lhcb_calibration |
8 | DataStripping | Reco08-Stripping12b-Merged | LCG.GRIDKA.de | sposs | lhcb_conf |
9 | MCStripping | 2010-Sim10Trig0x002a002aReco07-w | LCG.CERN.ch | szczypka | lhcb_data |
10 | MCSimulation | 2010-Sim08Trig0x002a002aReco07-w | LCG.Dortmund.de | adinolfi | lhcb_mc |
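The id/value pattern these key tables implement can be sketched as a small lookup-or-insert helper. This is a minimal illustration using SQLite in place of MySQL (table and function names are mine, not DIRAC's):

```python
import sqlite3

# In-memory stand-in for one key table, e.g. ac_key_..._job_site.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE job_site "
    "(id INTEGER PRIMARY KEY AUTOINCREMENT, value TEXT UNIQUE NOT NULL)"
)

def get_key_id(conn, table, value):
    # Return the existing id for a value, or insert it and return the new id.
    row = conn.execute("SELECT id FROM %s WHERE value = ?" % table, (value,)).fetchone()
    if row:
        return row[0]
    cur = conn.execute("INSERT INTO %s (value) VALUES (?)" % table, (value,))
    return cur.lastrowid

a = get_key_id(conn, "job_site", "LCG.CERN.ch")
b = get_key_id(conn, "job_site", "LCG.PIC.es")
c = get_key_id(conn, "job_site", "LCG.CERN.ch")  # same value -> same id
```

Repeated values map to the same id, so the wide `ac_type_*` and `ac_bucket_*` tables can store a small integer instead of the full string.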
About the `ac_bucket_*` tables the dump gives no information. There are some functions related to the "bucket" tables that I do not fully understand. Here are some functions from `AccountingDB.py`:

```python
def _getTableName( tableType, typeName, keyName = None ):
  """
  Generate table name
  """
  if not keyName:
    return "ac_%s_%s" % ( tableType, typeName )
  elif tableType == "key":
    return "ac_%s_%s_%s" % ( tableType, typeName, keyName )
  else:
    raise Exception( "Call to _getTableName with tableType as key but with no keyName" )
```

This function shows the naming rule for the tables in the DIRAC Accounting database.
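The naming rule can be checked against the table names seen in the dump. Below is a standalone copy of the helper (identical logic, outside the class) together with two example calls:

```python
def _getTableName(tableType, typeName, keyName=None):
    # ac_<tableType>_<typeName> for plain tables,
    # ac_key_<typeName>_<keyName> for key tables.
    if not keyName:
        return "ac_%s_%s" % (tableType, typeName)
    elif tableType == "key":
        return "ac_%s_%s_%s" % (tableType, typeName, keyName)
    else:
        raise Exception("Call to _getTableName with tableType as key but with no keyName")

in_table = _getTableName("in", "lhcb-production_job")
key_table = _getTableName("key", "lhcb-production_job", "site")
print(in_table)   # ac_in_lhcb-production_job
print(key_table)  # ac_key_lhcb-production_job_site
```

Both results match table names present in the dump above.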
From the next function we can understand the meaning of the field `taken`: while a record is pending, `taken` is 0.

```python
def markAllPendingRecordsAsNotTaken( self ):
  """
  Mark all records to be processed as not taken
  NOTE: ONLY EXECUTE THIS AT THE BEGINNING OF THE DATASTORE SERVICE!
  """
  self.log.always( "Marking all records to be processed as not taken" )
  for typeName in self.dbCatalog:
    sqlTableName = _getTableName( "in", typeName )
    result = self._update( "UPDATE `%s` SET taken=0" % sqlTableName )
    if not result[ 'OK' ]:
      return result
  return S_OK()
```
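The `taken` lifecycle can be sketched in isolation: records arrive with `taken=0`, get claimed with `taken=1`, and a DataStore service restart resets everything back to 0. A minimal stand-in using SQLite (table name and function are illustrative):

```python
import sqlite3

# Minimal "in" buffer with only the id and taken columns.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ac_in_job (id INTEGER PRIMARY KEY, taken INTEGER)")
# Simulate a crash: some records were claimed (taken=1) but never processed.
conn.executemany("INSERT INTO ac_in_job (taken) VALUES (?)", [(1,), (0,), (1,)])

def mark_all_pending_records_as_not_taken(conn):
    # Same effect as the UPDATE in markAllPendingRecordsAsNotTaken.
    conn.execute("UPDATE ac_in_job SET taken = 0")

mark_all_pending_records_as_not_taken(conn)
not_taken = conn.execute("SELECT COUNT(*) FROM ac_in_job WHERE taken = 0").fetchone()[0]
```

After the reset every record is eligible to be loaded again, which is why the docstring warns to run this only once, at service startup.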
After loading the pending records, the system sets `taken` to 1.

```python
def loadPendingRecords( self ):
  """
  Load all records pending to insertion and generate threaded jobs
  """
  gSynchro.lock()
  try:
    now = time.time()
    if now - self.__doingPendingLockTime <= 3600:
      return S_OK()
    self.__doingPendingLockTime = now
  finally:
    gSynchro.unlock()
  self.log.info( "[PENDING] Loading pending records for insertion" )
  pending = 0
  now = Time.toEpoch()
  recordsPerSlot = self.getCSOption( "RecordsPerSlot", 100 )
  for typeName in self.dbCatalog:
    self.log.info( "[PENDING] Checking %s" % typeName )
    pendingInQueue = self.__threadPool.pendingJobs()
    emptySlots = max( 0, 3000 - pendingInQueue )
    self.log.info( "[PENDING] %s in the queue, %d empty slots" % ( pendingInQueue, emptySlots ) )
    if emptySlots < 1:
      continue
    emptySlots = min( 100, emptySlots )
    sqlTableName = _getTableName( "in", typeName )
    sqlFields = [ 'id' ] + self.dbCatalog[ typeName ][ 'typeFields' ]
    sqlCond = "WHERE taken = 0 or TIMESTAMPDIFF( SECOND, takenSince, UTC_TIMESTAMP() ) > %s" % self.getWaitingRecordsLifeTime()
    result = self._query( "SELECT %s FROM `%s` %s ORDER BY id ASC LIMIT %d" % ( ", ".join( [ "`%s`" % f for f in sqlFields ] ),
                                                                                sqlTableName,
                                                                                sqlCond,
                                                                                emptySlots * recordsPerSlot ) )
    if not result[ 'OK' ]:
      self.log.error( "[PENDING] Error when trying to get pending records", "for %s : %s" % ( typeName, result[ 'Message' ] ) )
      return result
    self.log.info( "[PENDING] Got %s pending records for type %s" % ( len( result[ 'Value' ] ), typeName ) )
    dbData = result[ 'Value' ]
    idList = [ str( r[0] ) for r in dbData ]
    # If nothing to do, continue
    if not idList:
      continue
    result = self._update( "UPDATE `%s` SET taken=1, takenSince=UTC_TIMESTAMP() WHERE id in (%s)" % ( sqlTableName,
                                                                                                      ", ".join( idList ) ) )
    if not result[ 'OK' ]:
      self.log.error( "[PENDING] Error when trying set state to waiting records", "for %s : %s" % ( typeName, result[ 'Message' ] ) )
      self.__doingPendingLockTime = 0
      return result
    # Group them in groups of 10
    recordsToProcess = []
    for record in dbData:
      pending += 1
      iD = record[ 0 ]
      startTime = record[ -2 ]
      endTime = record[ -1 ]
      valuesList = list( record[ 1:-2 ] )
      recordsToProcess.append( ( iD, typeName, startTime, endTime, valuesList, now ) )
      if len( recordsToProcess ) % recordsPerSlot == 0:
        self.__threadPool.generateJobAndQueueIt( self.__insertFromINTable, args = ( recordsToProcess, ) )
        recordsToProcess = []
    if recordsToProcess:
      self.__threadPool.generateJobAndQueueIt( self.__insertFromINTable, args = ( recordsToProcess, ) )
  self.log.info( "[PENDING] Got %s records requests for all types" % pending )
  self.__doingPendingLockTime = 0
  return S_OK()
```
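The batching at the end of `loadPendingRecords` is worth isolating: pending rows are grouped into chunks of `recordsPerSlot`, and each chunk becomes one threaded job. The sketch below reproduces just that grouping logic, without the thread pool:

```python
def group_records(records, records_per_slot):
    # Collect records into full batches of records_per_slot,
    # plus one trailing partial batch if anything is left over.
    batches = []
    current = []
    for record in records:
        current.append(record)
        if len(current) % records_per_slot == 0:
            batches.append(current)
            current = []
    if current:
        batches.append(current)
    return batches

batches = group_records(list(range(7)), 3)
print(batches)  # [[0, 1, 2], [3, 4, 5], [6]]
```

In the real code each batch is handed to `generateJobAndQueueIt` with `__insertFromINTable` as the worker.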
I think this is an important function: when a new type is registered, the system creates four tables for it, "ac_in_&lt;typename&gt;", "ac_type_&lt;typename&gt;", "ac_key_&lt;typename&gt;_&lt;key&gt;" and "ac_bucket_&lt;typename&gt;".

```python
def registerType( self, name, definitionKeyFields, definitionAccountingFields, bucketsLength ):
  """
  Register a new type
  """
  gMonitor.registerActivity( "registeradded:%s" % name,
                             "Register added for %s" % " ".join( name.split( "_" ) ),
                             "Accounting", "entries", gMonitor.OP_ACUM )
  # ...
  tables = {}
  for key in definitionKeyFields:
    keyTableName = _getTableName( "key", name, key[0] )
    if keyTableName not in tablesInThere:
      self.log.info( "Table for key %s has to be created" % key[0] )
      tables[ keyTableName ] = { 'Fields' : { 'id' : 'INTEGER NOT NULL AUTO_INCREMENT',
                                              'value' : '%s UNIQUE NOT NULL' % key[1] },
                                 'UniqueIndexes' : { 'valueindex' : [ 'value' ] },
                                 'PrimaryKey' : 'id'
                               }
  # Registering type
  # ...
  bucketTableName = _getTableName( "bucket", name )
  if bucketTableName not in tablesInThere:
    tables[ bucketTableName ] = { 'Fields' : bucketFieldsDict,
                                  'Indexes' : bucketIndexes,
                                  'UniqueIndexes' : { 'UniqueConstraint' : uniqueIndexFields }
                                }
  typeTableName = _getTableName( "type", name )
  if typeTableName not in tablesInThere:
    tables[ typeTableName ] = { 'Fields' : fieldsDict }
  inTableName = _getTableName( "in", name )
  if inTableName not in tablesInThere:
    tables[ inTableName ] = { 'Fields' : inbufferDict,
                              'Indexes' : { 'idIndex' : [ 'id' ] },
                              'PrimaryKey' : 'id'
                            }
  # ...
```
To delete a type, the four kinds of tables, the corresponding row in "ac_catalog_Types" and the entry in the in-memory dbCatalog are all removed.

```python
def deleteType( self, typeName ):
  """
  Deletes a type
  """
  if self.__readOnly:
    return S_ERROR( "ReadOnly mode enabled. No modification allowed" )
  if typeName not in self.dbCatalog:
    return S_ERROR( "Type %s does not exist" % typeName )
  self.log.info( "Deleting type", typeName )
  tablesToDelete = []
  for keyField in self.dbCatalog[ typeName ][ 'keys' ]:
    tablesToDelete.append( "`%s`" % _getTableName( "key", typeName, keyField ) )
  tablesToDelete.insert( 0, "`%s`" % _getTableName( "type", typeName ) )
  tablesToDelete.insert( 0, "`%s`" % _getTableName( "bucket", typeName ) )
  tablesToDelete.insert( 0, "`%s`" % _getTableName( "in", typeName ) )
  retVal = self._query( "DROP TABLE %s" % ", ".join( tablesToDelete ) )
  if not retVal[ 'OK' ]:
    return retVal
  retVal = self._update( "DELETE FROM `%s` WHERE name='%s'" % ( _getTableName( "catalog", "Types" ), typeName ) )
  del( self.dbCatalog[ typeName ] )
  return S_OK()
```
This function inserts a record into the "in" table:

```python
def __insertInQueueTable( self, typeName, startTime, endTime, valuesList ):
  sqlFields = [ 'id', 'taken', 'takenSince' ] + self.dbCatalog[ typeName ][ 'typeFields' ]
  sqlValues = [ '0', '0', 'UTC_TIMESTAMP()' ] + valuesList + [ startTime, endTime ]
  if len( sqlFields ) != len( sqlValues ):
    numRcv = len( valuesList ) + 2
    numExp = len( self.dbCatalog[ typeName ][ 'typeFields' ] )
    return S_ERROR( "Fields mismatch for record %s. %s fields and %s expected" % ( typeName, numRcv, numExp ) )
  retVal = self.insertFields( _getTableName( "in", typeName ), sqlFields, sqlValues )
  if not retVal[ 'OK' ]:
    return retVal
  return S_OK( retVal[ 'lastRowId' ] )
```
This function does the real insert: it moves a record from the "in" table into the "type" tables and then deletes it from the "in" table.

```python
def __insertFromINTable( self, recordTuples ):
  """
  Do the real insert and delete from the in buffer table
  """
  self.log.verbose( "Received bundle to process", "of %s elements" % len( recordTuples ) )
  for record in recordTuples:
    iD, typeName, startTime, endTime, valuesList, insertionEpoch = record
    result = self.insertRecordDirectly( typeName, startTime, endTime, valuesList )
    if not result[ 'OK' ]:
      self._update( "UPDATE `%s` SET taken=0 WHERE id=%s" % ( _getTableName( "in", typeName ), iD ) )
      self.log.error( "Can't insert row", result[ 'Message' ] )
      continue
    result = self._update( "DELETE FROM `%s` WHERE id=%s" % ( _getTableName( "in", typeName ), iD ) )
    if not result[ 'OK' ]:
      self.log.error( "Can't delete row from the IN table", result[ 'Message' ] )
    gMonitor.addMark( "insertiontime", Time.toEpoch() - insertionEpoch )
```

We can see from the DIRAC web portal that the fields in the accounting tables can be divided into two parts: the items under "Plot to generate" and the items under "Group by".
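The insert-then-delete handover done by `__insertFromINTable` can be sketched in miniature. SQLite stands in for MySQL and the table layouts are reduced to the essentials; names here are illustrative, not DIRAC's:

```python
import sqlite3

# A tiny "in" buffer and a tiny "type" table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ac_in_job (id INTEGER PRIMARY KEY, taken INTEGER, cpuTime INTEGER)")
conn.execute("CREATE TABLE ac_type_job (cpuTime INTEGER)")
conn.execute("INSERT INTO ac_in_job VALUES (1, 1, 3600)")

def insert_from_in_table(conn, rec_id):
    # Copy the record into the type table, then remove it from the buffer.
    row = conn.execute("SELECT cpuTime FROM ac_in_job WHERE id = ?", (rec_id,)).fetchone()
    conn.execute("INSERT INTO ac_type_job (cpuTime) VALUES (?)", (row[0],))
    conn.execute("DELETE FROM ac_in_job WHERE id = ?", (rec_id,))

insert_from_in_table(conn, 1)
left = conn.execute("SELECT COUNT(*) FROM ac_in_job").fetchone()[0]     # 0
moved = conn.execute("SELECT COUNT(*) FROM ac_type_job").fetchone()[0]  # 1
```

In the real code the failure path matters: if the insert fails, the record is not deleted but reset to `taken=0`, so a later `loadPendingRecords` pass retries it.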
```python
class Job( BaseAccountingType ):

  def __init__( self ):
    BaseAccountingType.__init__( self )
    self.definitionKeyFields = [ ( 'User', 'VARCHAR(32)' ),
                                 ( 'UserGroup', 'VARCHAR(32)' ),
                                 ( 'JobGroup', "VARCHAR(64)" ),
                                 ( 'JobType', 'VARCHAR(32)' ),
                                 ( 'JobClass', 'VARCHAR(32)' ),
                                 ( 'ProcessingType', 'VARCHAR(32)' ),
                                 ( 'Site', 'VARCHAR(32)' ),
                                 ( 'FinalMajorStatus', 'VARCHAR(32)' ),
                                 ( 'FinalMinorStatus', 'VARCHAR(64)' ) ]
    self.definitionAccountingFields = [ ( 'CPUTime', "INT UNSIGNED" ),
                                        ( 'NormCPUTime', "INT UNSIGNED" ),
                                        ( 'ExecTime', "INT UNSIGNED" ),
                                        ( 'InputDataSize', 'BIGINT UNSIGNED' ),
                                        ( 'OutputDataSize', 'BIGINT UNSIGNED' ),
                                        ( 'InputDataFiles', 'INT UNSIGNED' ),
                                        ( 'OutputDataFiles', 'INT UNSIGNED' ),
                                        ( 'DiskSpace', 'BIGINT UNSIGNED' ),
                                        ( 'InputSandBoxSize', 'BIGINT UNSIGNED' ),
                                        ( 'OutputSandBoxSize', 'BIGINT UNSIGNED' ),
                                        ( 'ProcessedEvents', 'INT UNSIGNED' ) ]
```

Above is the definition of the Job class. The fields in `definitionKeyFields` are the items under "Group by", and the fields in `definitionAccountingFields` are the items under "Plot to generate". The accounting system creates a key table for each field in `definitionKeyFields`. Finally, according to my own understanding, I drew a flow chart to describe how the accounting system provides its service to users:
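Combining the Job class definition with the naming rule shown earlier, we can predict which key tables registration would create. The lowercase in the generated names is an assumption on my part, made to match the lowercase table names seen in the dump (it may actually come from MySQL's table-name handling rather than the code):

```python
# Subset of the Job class's definitionKeyFields, as listed above.
definitionKeyFields = [
    ("User", "VARCHAR(32)"),
    ("UserGroup", "VARCHAR(32)"),
    ("Site", "VARCHAR(32)"),
]

def key_table_names(type_name, key_fields):
    # One ac_key_<type>_<key> table per key field; lowercasing is an
    # assumption to match the dump, not confirmed from the code.
    return ["ac_key_%s_%s" % (type_name, key.lower()) for key, _ in key_fields]

names = key_table_names("lhcb-production_job", definitionKeyFields)
print(names)
# ['ac_key_lhcb-production_job_user',
#  'ac_key_lhcb-production_job_usergroup',
#  'ac_key_lhcb-production_job_site']
```

All three predicted names appear in the size table at the top of this section.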