
Notes on the organization of the DIRAC accounting data

Introduction

In this space we document our understanding of the current organization of the DIRAC accounting data using the relational model. The accounting database schema is documented and the role of each table is explained.

This space is mainly maintained by Gang and reflects his findings while studying the DIRAC accounting system.

-- FabioHernandez - 2012-08-24

Information about the dump

This dump is composed of 35M job accounting records. After importing it into a MySQL database named testdb, the total volume of the data is about 5 GB. There are 20 tables in this database; their names and sizes are listed below:

table_name                                     datasize (MB)
ac_in_lhcb-production_job                      0.2813
ac_in_lhcb-production_jobstep                  0.3906
ac_key_lhcb-production_job_finalmajorstatus    0.0156
ac_key_lhcb-production_job_finalminorstatus    0.0156
ac_key_lhcb-production_job_jobclass            0.0156
ac_key_lhcb-production_job_jobgroup            1.5156
ac_key_lhcb-production_job_jobtype             0.0156
ac_key_lhcb-production_job_processingtype      0.0156
ac_key_lhcb-production_job_site                0.0156
ac_key_lhcb-production_job_user                0.0156
ac_key_lhcb-production_job_usergroup           0.0156
ac_key_lhcb-production_jobstep_eventtype       0.0156
ac_key_lhcb-production_jobstep_finalstepstate  0.0156
ac_key_lhcb-production_jobstep_jobgroup        0.0781
ac_key_lhcb-production_jobstep_processingstep  0.0156
ac_key_lhcb-production_jobstep_processingtype  0.0156
ac_key_lhcb-production_jobstep_runnumber       0.1875
ac_key_lhcb-production_jobstep_site            0.0156
ac_type_lhcb-production_job                    4905
ac_type_lhcb-production_jobstep                803
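The sizes above can be reproduced from the testdb import with a query on information_schema. A minimal sketch of our own (not DIRAC code; the connection parameters are placeholders, and the exact numbers depend on the storage engine and MySQL version):

import MySQLdb

# Reproduce the table/size listing above from the imported dump
conn = MySQLdb.connect( db = "testdb" )   # host/user/password omitted on purpose
cur = conn.cursor()
cur.execute( """SELECT table_name,
                       ROUND( ( data_length + index_length ) / 1024 / 1024, 4 )
                FROM information_schema.tables
                WHERE table_schema = 'testdb'
                ORDER BY table_name""" )
for name, sizeMB in cur.fetchall():
  print( "%-48s %s" % ( name, sizeMB ) )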

As we can see, these tables fall into three categories: ac_in_*, ac_type_* and ac_key_*. Most of the data is stored in the two tables "ac_type_lhcb-production_job" and "ac_type_lhcb-production_jobstep".

At first we did not know the exact meaning of ac_in_* and ac_type_*. They have almost the same fields; the difference is that ac_in_* has three more fields than ac_type_*, named id, taken and takenSince. We inferred that all the historical job information is stored in the table "ac_type_lhcb-production_job", while the most recent jobs are in "ac_in_lhcb-production_job", and that after some time the records in "ac_in_lhcb-production_job" are inserted into "ac_type_lhcb-production_job". This was confirmed by the DIRAC developers, who said in an email: "All tables start with ac_<table type>_<DIRAC setup>_<accounting type name>_... the accounting type defines the object that will be accounted. For instance ac_in_lhcb-production_job is the table that acts as an incoming buffer for job records for LHCb-Production setup. ac_key_lhcb-production_job_jobgroup is an indexed table for the jobgroup attribute for job records in the LHCb-Production setup. First the records are inserted in the "in" table and then processed asynchronously to be inserted in the "type" table."
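To make the naming convention concrete, here is a small parsing sketch (our own code, not DIRAC's); the regular expression is an assumption derived from the table list above, with the setup fixed to lhcb-production:

import re

# ac_<table type>_<DIRAC setup>_<accounting type name>[_<key field>]
TABLE_RE = re.compile( r"^ac_(in|type|key|bucket)_(lhcb-production)_(job|jobstep)(?:_(.+))?$" )

def parseTableName( tableName ):
  m = TABLE_RE.match( tableName )
  if not m:
    return None
  tableType, setup, accountingType, keyField = m.groups()
  return { 'tableType' : tableType, 'setup' : setup,
           'accountingType' : accountingType, 'keyField' : keyField }

print( parseTableName( "ac_key_lhcb-production_job_jobgroup" ) )
# {'tableType': 'key', 'setup': 'lhcb-production', 'accountingType': 'job', 'keyField': 'jobgroup'}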

We also do not know the difference between 'job' and 'jobstep', as in the two tables "ac_in_lhcb-production_job" and "ac_in_lhcb-production_jobstep". In the web portal it seems that only the fields in "ac_*_lhcb-production_job" are used for generating plots; what "ac_*_lhcb-production_jobstep" is used for is something we will explore in a next step. We only know from the DIRAC developers that "job step is an old type that LHCb uses. It's not defined in DIRAC anymore. Just look at ac_[a-z]+_lhcb-production_job(_.+)? tables". So we will focus on the "ac_[a-z]+_lhcb-production_job" tables, which can be selected as shown below.
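As a sketch of that selection, the developers' pattern can be applied directly to the table list (our own code; the list is abridged from the dump):

import re

JOB_TABLE_RE = re.compile( r"ac_[a-z]+_lhcb-production_job(_.+)?$" )

tables = [ "ac_in_lhcb-production_job",
           "ac_in_lhcb-production_jobstep",
           "ac_key_lhcb-production_job_user",
           "ac_key_lhcb-production_jobstep_site",
           "ac_type_lhcb-production_job" ]
jobTables = [ t for t in tables if JOB_TABLE_RE.match( t ) ]
# keeps the three *_job tables and drops the two *_jobstep ones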

The fields of "ac_in_lhcb-production_job" and their types are as follows:

ac_in_lhcb-production_job

Field              Type
JobGroup           varchar(64)
DiskSpace          bigint(20) unsigned
InputDataSize      bigint(20) unsigned
FinalMajorStatus   varchar(32)
OutputDataSize     bigint(20) unsigned
id                 int(11)
InputSandBoxSize   bigint(20) unsigned
OutputDataFiles    int(10) unsigned
NormCPUTime        int(10) unsigned
takenSince         datetime
User               varchar(32)
taken              tinyint(1)
JobType            varchar(32)
JobClass           varchar(32)
ProcessingType     varchar(32)
ExecTime           int(10) unsigned
CPUTime            int(10) unsigned
startTime          int(10) unsigned
UserGroup          varchar(32)
FinalMinorStatus   varchar(64)
Site               varchar(32)
ProcessedEvents    int(10) unsigned
OutputSandBoxSize  bigint(20) unsigned
InputDataFiles     int(10) unsigned
endTime            int(10) unsigned


Compared with the two types described above, the meaning of the ac_key_* type is obvious: these tables store the values of the key fields. They have only two columns, id and value. For example, the table "ac_key_lhcb-production_job_user" stores the names of the users who have used DIRAC to submit jobs, and "ac_key_lhcb-production_job_site" stores only the sites where jobs were executed.

Here is some real data extracted from these tables:

job_jobtype              job_processingtype                      job_site                 job_user       job_usergroup
id  value                id  value                               id  value                id  value      id  value
1   test                 1   uknown                              1   LCG.LPN.fr           1   joel       1   unknown
2   user                 2   unknown                             2   LCG.Manchester.uk    2   paterson   2   lhcb_user
3   production           3   WF-Validation-Reco08-Stripping12    3   LCG.Liverpool.uk     3   atsareg    3   lhcb_prod
4   unknown              4   2010-Gen01                          4   LCG.Oxford.uk        4   acsmith    4   lhcb_admin
5   sam                  5   Reco08-Stripping12                  5   LCG.RAL.uk           5   graciani   5   diracAdmin
6   reconstruction       6   Reco08-Stripping12-Merged           6   LCG.CESGA.es         6   roma       6   user
7   DataReconstruction   7   Reco08-Stripping12b                 7   LCG.PIC.es           7   rgracian   7   lhcb_calibration
8   DataStripping        8   Reco08-Stripping12b-Merged          8   LCG.GRIDKA.de        8   sposs      8   lhcb_conf
9   MCStripping          9   2010-Sim10Trig0x002a002aReco07-w    9   LCG.CERN.ch          9   szczypka   9   lhcb_data
10  MCSimulation         10  2010-Sim08Trig0x002a002aReco07-w    10  LCG.Dortmund.de      10  adinolfi   10  lhcb_mc
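Such an extract is easy to reproduce from the testdb import, since each key table has only the id and value columns. A minimal sketch (our own code; connection parameters are placeholders):

import MySQLdb

conn = MySQLdb.connect( db = "testdb" )
cur = conn.cursor()
cur.execute( "SELECT id, value FROM `ac_key_lhcb-production_job_user` ORDER BY id LIMIT 10" )
for rowId, value in cur.fetchall():
  print( "%s %s" % ( rowId, value ) )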

-- ZhangGang - 2012-09-06

Source code analysis

We can get more information from DIRAC/AccountingSystem/DB/AccountingDB.py.

AccountingDB.py defines the basic operations on the accounting database. There are five accounting types: data operation, Job, WMS history, Pilot and SRM space token. When a new type is registered, four kinds of tables must be created for it: "ac_in_*_typename", "ac_type_*_typename", "ac_key_*_typename" and "ac_bucket_*_typename".

There is also a table named "ac_catalog_Types" (the name follows from _getTableName( "catalog", "Types" ), shown further below) that stores the information about these types. Here is its definition in AccountingDB.py:

self._createTables( { self.catalogTableName : { 'Fields' : { 'name' : "VARCHAR(64) UNIQUE NOT NULL",
                                                             'keyFields' : "VARCHAR(256) NOT NULL",
                                                             'valueFields' : "VARCHAR(256) NOT NULL",
                                                             'bucketsLength' : "VARCHAR(256) NOT NULL",
                                                           },
                                                'PrimaryKey' : 'name'
                                              }
                    } )

About "ac_bucket_*_typename", the dump contains no information. There are some functions about the "bucket" tables that we do not clearly understand yet. Here we show some functions from AccountingDB.py:

def _getTableName( tableType, typeName, keyName = None ):
  """
  Generate table name
  """
  if not keyName:
    return "ac_%s_%s" % ( tableType, typeName )
  elif tableType == "key":
    return "ac_%s_%s_%s" % ( tableType, typeName, keyName )
  else:
    raise Exception( "Call to _getTableName with tableType as key but with no keyName" )

This function shows the naming rule for the tables in the DIRAC accounting database.
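For example, two illustrative calls (our own, not from the DIRAC source; the type name is assumed to include the setup prefix, as in the dump):

print( _getTableName( "in", "lhcb-production_job" ) )
# -> ac_in_lhcb-production_job
print( _getTableName( "key", "lhcb-production_job", "user" ) )
# -> ac_key_lhcb-production_job_user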

def markAllPendingRecordsAsNotTaken( self ):
  """
  Mark all records to be processed as not taken
  NOTE: ONLY EXECUTE THIS AT THE BEGINNING OF THE DATASTORE SERVICE!
  """
  self.log.always( "Marking all records to be processed as not taken" )
  for typeName in self.dbCatalog:
    sqlTableName = _getTableName( "in", typeName )
    result = self._update( "UPDATE `%s` SET taken=0" % sqlTableName )
    if not result[ 'OK' ]:
      return result
  return S_OK()

From this function we can understand the meaning of the field "taken": while a record is pending (not yet picked up for processing), taken = 0.

def loadPendingRecords( self ):
  """
  Load all records pending to insertion and generate threaded jobs
  """
  gSynchro.lock()
  try:
    now = time.time()
    if now - self.__doingPendingLockTime <= 3600:
      return S_OK()
    self.__doingPendingLockTime = now
  finally:
    gSynchro.unlock()
  self.log.info( "[PENDING] Loading pending records for insertion" )
  pending = 0
  now = Time.toEpoch()
  recordsPerSlot = self.getCSOption( "RecordsPerSlot", 100 )
  for typeName in self.dbCatalog:
    self.log.info( "[PENDING] Checking %s" % typeName )
    pendingInQueue = self.__threadPool.pendingJobs()
    emptySlots = max( 0, 3000 - pendingInQueue )
    self.log.info( "[PENDING] %s in the queue, %d empty slots" % ( pendingInQueue, emptySlots ) )
    if emptySlots < 1:
      continue
    emptySlots = min( 100, emptySlots )
    sqlTableName = _getTableName( "in", typeName )
    sqlFields = [ 'id' ] + self.dbCatalog[ typeName ][ 'typeFields' ]
    sqlCond = "WHERE taken = 0 or TIMESTAMPDIFF( SECOND, takenSince, UTC_TIMESTAMP() ) > %s" % self.getWaitingRecordsLifeTime()
    result = self._query( "SELECT %s FROM `%s` %s ORDER BY id ASC LIMIT %d" % ( ", ".join( [ "`%s`" % f for f in sqlFields ] ),
                                                                                sqlTableName,
                                                                                sqlCond,
                                                                                emptySlots * recordsPerSlot ) )
    if not result[ 'OK' ]:
      self.log.error( "[PENDING] Error when trying to get pending records", "for %s : %s" % ( typeName, result[ 'Message' ] ) )
      return result
    self.log.info( "[PENDING] Got %s pending records for type %s" % ( len( result[ 'Value' ] ), typeName ) )
    dbData = result[ 'Value' ]
    idList = [ str( r[0] ) for r in dbData ]
    # If nothing to do, continue
    if not idList:
      continue
    result = self._update( "UPDATE `%s` SET taken=1, takenSince=UTC_TIMESTAMP() WHERE id in (%s)" % ( sqlTableName,
                                                                                                      ", ".join( idList ) ) )
    if not result[ 'OK' ]:
      self.log.error( "[PENDING] Error when trying set state to waiting records", "for %s : %s" % ( typeName, result[ 'Message' ] ) )
      self.__doingPendingLockTime = 0
      return result
    # Group them in groups of 10
    recordsToProcess = []
    for record in dbData:
      pending += 1
      iD = record[ 0 ]
      startTime = record[ -2 ]
      endTime = record[ -1 ]
      valuesList = list( record[ 1:-2 ] )
      recordsToProcess.append( ( iD, typeName, startTime, endTime, valuesList, now ) )
      if len( recordsToProcess ) % recordsPerSlot == 0:
        self.__threadPool.generateJobAndQueueIt( self.__insertFromINTable,
                                                 args = ( recordsToProcess, ) )
        recordsToProcess = []
    if recordsToProcess:
      self.__threadPool.generateJobAndQueueIt( self.__insertFromINTable,
                                               args = ( recordsToProcess, ) )
  self.log.info( "[PENDING] Got %s records requests for all types" % pending )
  self.__doingPendingLockTime = 0
  return S_OK()

After loading the pending records, the function sets taken = 1 and takenSince = UTC_TIMESTAMP(), so that the same records are not picked up twice; records that have been taken for longer than getWaitingRecordsLifeTime() are selected again.

def registerType( self, name, definitionKeyFields, definitionAccountingFields, bucketsLength ):
  """
  Register a new type
  """
  gMonitor.registerActivity( "registeradded:%s" % name,
                             "Register added for %s" % " ".join( name.split( "_" ) ),
                             "Accounting",
                             "entries",
                             gMonitor.OP_ACUM )
  ...
  tables = {}
  for key in definitionKeyFields:
    keyTableName = _getTableName( "key", name, key[0] )
    if keyTableName not in tablesInThere:
      self.log.info( "Table for key %s has to be created" % key[0] )
      tables[ keyTableName ] = { 'Fields' : { 'id' : 'INTEGER NOT NULL AUTO_INCREMENT',
                                              'value' : '%s UNIQUE NOT NULL' % key[1]
                                            },
                                 'UniqueIndexes' : { 'valueindex' : [ 'value' ] },
                                 'PrimaryKey' : 'id'
                               }
  # Registering type
  ...
  bucketTableName = _getTableName( "bucket", name )
  if bucketTableName not in tablesInThere:
    tables[ bucketTableName ] = { 'Fields' : bucketFieldsDict,
                                  'Indexes' : bucketIndexes,
                                  'UniqueIndexes' : { 'UniqueConstraint' : uniqueIndexFields }
                                }
  typeTableName = _getTableName( "type", name )
  if typeTableName not in tablesInThere:
    tables[ typeTableName ] = { 'Fields' : fieldsDict }
  inTableName = _getTableName( "in", name )
  if inTableName not in tablesInThere:
    tables[ inTableName ] = { 'Fields' : inbufferDict,
                              'Indexes' : { 'idIndex' : [ 'id' ] },
                              'PrimaryKey' : 'id'
                            }
  ...
  return S_OK( True )

I think this is an important function: when a new type is registered, the system creates four kinds of tables for it, "ac_in_*_typename", "ac_type_*_typename", "ac_key_*_typename" and "ac_bucket_*_typename" (one key table per key field).

def deleteType( self, typeName ):
  """
  Deletes a type
  """
  if self.__readOnly:
    return S_ERROR( "ReadOnly mode enabled. No modification allowed" )
  if typeName not in self.dbCatalog:
    return S_ERROR( "Type %s does not exist" % typeName )
  self.log.info( "Deleting type", typeName )
  tablesToDelete = []
  for keyField in self.dbCatalog[ typeName ][ 'keys' ]:
    tablesToDelete.append( "`%s`" % _getTableName( "key", typeName, keyField ) )
  tablesToDelete.insert( 0, "`%s`" % _getTableName( "type", typeName ) )
  tablesToDelete.insert( 0, "`%s`" % _getTableName( "bucket", typeName ) )
  tablesToDelete.insert( 0, "`%s`" % _getTableName( "in", typeName ) )
  retVal = self._query( "DROP TABLE %s" % ", ".join( tablesToDelete ) )
  if not retVal[ 'OK' ]:
    return retVal
  retVal = self._update( "DELETE FROM `%s` WHERE name='%s'" % ( _getTableName( "catalog", "Types" ), typeName ) )
  del( self.dbCatalog[ typeName ] )
  return S_OK()

To delete a type, the four kinds of tables are dropped, and the corresponding entries are removed from "ac_catalog_Types" and from the in-memory dbCatalog.

def __insertInQueueTable( self, typeName, startTime, endTime, valuesList ):
  sqlFields = [ 'id', 'taken', 'takenSince' ] + self.dbCatalog[ typeName ][ 'typeFields' ]
  sqlValues = [ '0', '0', 'UTC_TIMESTAMP()' ] + valuesList + [ startTime, endTime ]
  if len( sqlFields ) != len( sqlValues ):
    numRcv = len( valuesList ) + 2
    numExp = len( self.dbCatalog[ typeName ][ 'typeFields' ] )
    return S_ERROR( "Fields mismatch for record %s. %s fields and %s expected" % ( typeName,
                                                                                   numRcv,
                                                                                   numExp ) )
  retVal = self.insertFields( _getTableName( "in", typeName ),
                              sqlFields,
                              sqlValues )
  if not retVal[ 'OK' ]:
    return retVal
  return S_OK( retVal[ 'lastRowId' ] )

This function inserts a record into the "in" table, with taken initialized to 0.

def __insertFromINTable( self, recordTuples ):
  """
  Do the real insert and delete from the in buffer table
  """
  self.log.verbose( "Received bundle to process", "of %s elements" % len( recordTuples ) )
  for record in recordTuples:
    iD, typeName, startTime, endTime, valuesList, insertionEpoch = record
    result = self.insertRecordDirectly( typeName, startTime, endTime, valuesList )
    if not result[ 'OK' ]:
      self._update( "UPDATE `%s` SET taken=0 WHERE id=%s" % ( _getTableName( "in", typeName ), iD ) )
      self.log.error( "Can't insert row", result[ 'Message' ] )
      continue
    result = self._update( "DELETE FROM `%s` WHERE id=%s" % ( _getTableName( "in", typeName ), iD ) )
    if not result[ 'OK' ]:
      self.log.error( "Can't delete row from the IN table", result[ 'Message' ] )
    gMonitor.addMark( "insertiontime", Time.toEpoch() - insertionEpoch )

This function does the real insertion: it moves each record from the "in" table to the "type" table and then deletes it from the "in" table. If the insertion fails, the record is set back to taken = 0 so that it can be retried later.
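Putting the three functions together, the in -> type flow can be summarized with a toy model (our own code, greatly simplified; the real implementation works through SQL and a thread pool):

# Toy model of the buffer flow: plain lists stand in for the MySQL tables
inTable = []     # stands for ac_in_lhcb-production_job
typeTable = []   # stands for ac_type_lhcb-production_job

def insertInQueue( values ):
  # __insertInQueueTable(): new records enter the buffer with taken = 0
  inTable.append( { 'id' : len( inTable ) + 1, 'taken' : 0, 'values' : values } )

def loadPending():
  # loadPendingRecords(): select pending records and mark them taken = 1
  pendingRecords = [ r for r in inTable if r[ 'taken' ] == 0 ]
  for r in pendingRecords:
    r[ 'taken' ] = 1
  return pendingRecords

def insertFromINTable( records ):
  # __insertFromINTable(): move each record to the "type" table, then delete it
  for r in records:
    typeTable.append( r[ 'values' ] )
    inTable.remove( r )

insertInQueue( { 'User' : 'joel', 'Site' : 'LCG.CERN.ch', 'CPUTime' : 42 } )
insertFromINTable( loadPending() )
print( "%s %s" % ( len( inTable ), len( typeTable ) ) )   # -> 0 1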

We can see from the DIRAC web portal that the fields in the accounting tables can be divided into two groups: one group provides the items for "Plot to generate" and the other provides the items for "Group by".

class Job( BaseAccountingType ):
  def __init__( self ):
    BaseAccountingType.__init__( self )
    self.definitionKeyFields = [ ( 'User', 'VARCHAR(32)' ),
                                 ( 'UserGroup', 'VARCHAR(32)' ),
                                 ( 'JobGroup', "VARCHAR(64)" ),
                                 ( 'JobType', 'VARCHAR(32)' ),
                                 ( 'JobClass', 'VARCHAR(32)' ),
                                 ( 'ProcessingType', 'VARCHAR(32)' ),
                                 ( 'Site', 'VARCHAR(32)' ),
                                 ( 'FinalMajorStatus', 'VARCHAR(32)' ),
                                 ( 'FinalMinorStatus', 'VARCHAR(64)' )
                               ]
    self.definitionAccountingFields = [ ( 'CPUTime', "INT UNSIGNED" ),
                                        ( 'NormCPUTime', "INT UNSIGNED" ),
                                        ( 'ExecTime', "INT UNSIGNED" ),
                                        ( 'InputDataSize', 'BIGINT UNSIGNED' ),
                                        ( 'OutputDataSize', 'BIGINT UNSIGNED' ),
                                        ( 'InputDataFiles', 'INT UNSIGNED' ),
                                        ( 'OutputDataFiles', 'INT UNSIGNED' ),
                                        ( 'DiskSpace', 'BIGINT UNSIGNED' ),
                                        ( 'InputSandBoxSize', 'BIGINT UNSIGNED' ),
                                        ( 'OutputSandBoxSize', 'BIGINT UNSIGNED' ),
                                        ( 'ProcessedEvents', 'INT UNSIGNED' )
                                      ]

Above is the definition of the Job class. The fields in definitionKeyFields are the items in "Group by", and the fields in definitionAccountingFields are the items in "Plot to generate". The accounting system creates a key table for each field in definitionKeyFields; note that the nine fields listed here match the nine ac_key_lhcb-production_job_* tables in the dump, as the sketch below illustrates.

Finally, according to my own understanding, I drew a flow chart to describe how the accounting system provides its services to users:

-- ZhangGang - 2012-09-12
